Storm Real-time Processing Cookbook

By Quinton Anderson

About this book

Storm is a free and open source distributed real-time computation system. Storm makes it easy to reliably process unbounded streams of data, doing for real-time processing what Hadoop did for batch processing. Storm is simple, can be used with any programming language, and is a lot of fun to use!
Storm Real-time Processing Cookbook offers basic to advanced recipes for real-time computation with Storm.

The book begins with setting up the development environment and then teaches log stream processing. It then covers a real-time payments workflow, distributed RPC, integration with other software such as Hadoop and Apache Camel, and more.

Publication date:
August 2013
Publisher
Packt
Pages
254
ISBN
9781782164425

 

Chapter 1. Setting Up Your Development Environment

In this chapter we will cover:

  • Setting up your development environment

  • Distributed version control

  • Creating a "Hello World" topology

  • Creating a Storm cluster – provisioning the machines

  • Creating a Storm cluster – provisioning Storm

  • Deriving basic click statistics

  • Unit testing a bolt

  • Implementing an integration test

  • Deploying to the cluster

 

Introduction


This chapter provides a basic, practical introduction to the Storm processor. It covers everything from setting up your development environment to basic operational concerns in deploying your topologies, as well as basic quality practices such as unit and integration testing of your Storm topology. Upon completion of this chapter, you will be able to build, test, and deliver basic Storm topologies.

This book does not provide a theoretical introduction to the Storm processor and its primitives and architecture. The author assumes that the readers have orientated themselves through online resources such as the Storm wiki.

Tip

Delivery of a system is only achieved once it is delivering business value in a production environment consistently and reliably. In order to achieve this, quality and operational concerns must always be taken into account while developing your Storm topologies.

 

Setting up your development environment


A development environment consists of all the tools and systems that are required in order to start building Storm topologies. This book concentrates on the technology and on delivery of Storm by an individual developer; the development environment for a software development team, be it centralized or distributed, requires much more tooling and process to be effective and is considered outside the scope of this book.

The following classes of tools and processes are required in order to effectively set up the development environment, not only from an on-going perspective, but also in terms of implementing the recipes in this book:

  • SDK(s)

  • Version control

  • Build environment

  • System provisioning tooling

  • Cluster provisioning tooling

The provisioning and installation recipes in this book are based on Ubuntu; they are, however, quite portable to other Linux distributions. If you have any issues working with another distribution using these instructions, please seek support from the Storm mailing list at https://groups.google.com/forum/#!forum/storm-user.

Tip

Environmental variables are the enemy of maintainable and available systems. Developing on one environment type and deploying on another is a very risky example of such a variable. Developing on your target type should be done whenever possible.

How to do it…

  1. Download the latest J2SE 6 SDK from Oracle's website (http://www.oracle.com/technetwork/java/javase/downloads/index.html) and install it as follows:

    chmod 775 jdk-6u35-linux-x64.bin
    yes | ./jdk-6u35-linux-x64.bin
    sudo mv jdk1.6.0_35 /opt
    sudo ln -s /opt/jdk1.6.0_35/bin/java /usr/bin
    sudo ln -s /opt/jdk1.6.0_35/bin/javac /usr/bin
    JAVA_HOME=/opt/jdk1.6.0_35
    export JAVA_HOME
    PATH=$PATH:$JAVA_HOME/bin
    export PATH
    
  2. The version control system, Git, must then be installed:

    sudo apt-get install git
    
  3. The installation should then be followed by Maven, the build system:

    sudo apt-get install maven
    
  4. Puppet, Vagrant, and VirtualBox must then be installed in order to provide application and environment provisioning:

    sudo apt-get install virtualbox puppet vagrant
    
  5. Finally, you need to install an IDE:

    sudo apt-get install eclipse
    

    Note

    There is currently a debate around which fork of the Java SDK should be used since Sun was acquired by Oracle. While the author understands the need for OpenJDK, the recipes in this book have been tested using the Oracle JDK. In general, there is no difference between OpenJDK and the Oracle JDK, apart from the Oracle JDK being more stable but lagging behind in terms of features.

How it works…

The JDK is required for any Java development to take place. Git is an open source distributed version control system that has received wide adoption in recent years. A brief introduction to Git will be presented shortly.

Maven is a widely used build system that prefers convention over configuration. Maven includes many useful features including the Project Object Model (POM), which allows us to manage our libraries, dependencies, and versions in an effective manner. Maven is backed by many binary repositories on the Internet that allow us to transparently maintain binary dependencies correctly and package our topologies for deployment.

Within the growing arena of DevOps and Continuous Delivery, the Puppet system is widely used to provide declarative server provisioning of Linux and other operating systems and applications. Puppet provides us with the ability to program the state of our servers and deployment environments. This is important because our server's state can then be maintained within a version control system such as Git, and manual changes to servers can be safely removed. This provides many advantages, including a deterministic Mean Time to Recovery (MTTR) and an audit trail, which, in general, make systems more stable. This is also an important step on the path towards continuous delivery.

Vagrant is a very useful tool within development environments. It allows the automation of provisioning of VirtualBox virtual machines. Within the context of the Storm processor, this is important, given that it is a cluster-based technology. In order to test a cluster, you must either build an actual cluster of machines or provision many virtual machines. Vagrant allows us to do this locally in a deterministic and declarative way.

Note

A virtual machine is an extremely useful abstraction within IT infrastructure, operations, and development. However, while reduced performance is expected and acceptable within locally hosted VMs, their usability at all times depends entirely on the availability of RAM. Processing power is not a key concern, especially with most modern processors being extremely underutilized, although this is not necessarily the case once your topologies are working. It is recommended that you ensure your computer has at least 8 GB of RAM.

 

Distributed version control


Traditional version control systems are centralized. Each client contains a checkout of the files at their current version, depending on what branch the client is using. All previous versions are stored on the server. This has worked well, in that it allows teams to collaborate closely and know to some degree what other members of the team are doing.

Centralized servers have some distinct drawbacks that have led to the rise of distributed version control systems. Firstly, the centralized server represents a single point of failure; if the server goes down or becomes unavailable for any reason, it becomes difficult for developers to work using their existing workflows. Secondly, if the data on the server is corrupted or lost for any reason, the history of the code base is lost.

Open source projects have been a large driver of distributed version control, for both reasons, but mostly because of the collaboration models that distribution enables. Developers can follow a disciplined set of workflows on their local environments and then distribute these changes to one or many remote repositories when it is convenient to do so, in both a flat and hierarchical manner.

The obvious additional advantage is that there naturally exist many backups of the repository because each client has a complete mirror of the repository; therefore, if any client or server dies, it can simply be replicated back, once it has been restored.

How to do it…

Git is used in this book as the distributed version control system. In order to create a repository, you need to either clone or initialize a repository. For a new project that you create, the repository should be initialized.

  1. First, let's create our project directory, as follows:

    mkdir FirstGitProject
    cd FirstGitProject
    git init
    
  2. In order to test if the workflow is working, we need some files in our repository.

    touch README.txt
    vim README.txt
    

    Using vim, or any other text editor, add some descriptive text. In vim, press the Insert key (or i) to enter insert mode before typing. Once you have finished typing, hit the Esc key, type a colon followed by wq, and then hit the Enter key.

  3. Before you commit, review the status of the repository.

    git status
    

    This should give you an output that looks similar to the following:

    # On branch master
    # Initial commit
    # Untracked files:
    #    README.txt
    
  4. Git requires that you add all files and folders manually; you can do it as follows:

    git add README.txt
    
  5. Then commit the file using the following:

    git commit -a
    
  6. This will open a vim editor and allow you to add your commit message.

    Tip

    You can specify the commit message directly while issuing the command, using the -m flag.

Without pushing this repository to a remote host, you will essentially be placing it under the same risk as that of a centralized host. It is therefore important to push the repository to a remote host. Both www.github.com and www.bitbucket.org are good options for free-hosted Git services, provided that you aren't pushing your corporate intellectual property there for public consumption. This book uses bitbucket.org. In order to push your repository to this remote host, simply navigate there in your browser and sign up for an account.

Once the registration process is complete, create a new repository using the menu system.

Enter the following values in order to create the repository:

Once the repository is created, you need to add the remote repository to your local repository and push the changes to the remote repository.

git remote add origin https://[user]@bitbucket.org/[user]/firstgitproject.git
git push origin master

You must replace [user] in the preceding command with your registered username.

Tip

Cloning of a repository will be covered in later recipes, as will some standard version control workflows.

 

Creating a "Hello World" topology


The "Hello World" topology, as with all "Hello World" applications, is of no real use to anyone, except to illustrate some really basic concepts. The "Hello World" topology will show how to create a Storm project including a simple spout and bolt, build it, and execute it in the local cluster mode.

How to do it…

  1. Create a new project folder and initialize your Git repository.

    mkdir HelloWorld
    cd HelloWorld
    git init
    
  2. We must then create the Maven project file as follows:

    vim pom.xml
    
  3. Using vim, or any other text editor, you need to create the basic XML tags and project metadata for the "Hello World" project.

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>storm.cookbook</groupId>
      <artifactId>hello-world</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <name>hello-world</name>
      <url>https://bitbucket.org/[user]/hello-world</url>
    
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
      
    </project>
  4. We then need to declare which Maven repositories we need to fetch our dependencies from. Add the following to the pom.xml file within the project tags:

    <repositories>
    
        <repository>
          <id>github-releases</id>
          <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
        </repository>
    
        <repository>
          <id>clojars.org</id>
          <url>http://clojars.org/repo</url>
        </repository>
    
        <repository>
          <id>twitter4j</id>
          <url>http://twitter4j.org/maven2</url>
        </repository>
    </repositories>

    Tip

    You can override these repositories using your .m2 and settings.xml files, the details of which are outside the scope of this book; however, this is extremely useful within development teams where dependency management is key.

  5. We then need to declare our dependencies by adding them within the project tags:

    <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
    
        <dependency>
          <groupId>storm</groupId>
          <artifactId>storm</artifactId>
          <version>0.8.1</version>
          <!-- keep storm out of the jar-with-dependencies -->
          <scope>provided</scope>
        </dependency>
    
        <dependency>
          <groupId>com.googlecode.json-simple</groupId>
          <artifactId>json-simple</artifactId>
          <version>1.1</version>
        </dependency>
    
    </dependencies>
  6. Finally we need to add the build plugin definitions for Maven:

    <build>
      <plugins>
          <!-- 
          bind the maven-assembly-plugin to the package phase
          this will create a jar file without the Storm dependencies suitable for deployment to a cluster.
           -->
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
              <manifest>
                <mainClass></mainClass>
              </manifest>
            </archive>
          </configuration>
          <executions>
            <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
    
        <plugin>
          <groupId>com.theoryinpractise</groupId>
          <artifactId>clojure-maven-plugin</artifactId>
          <version>1.3.8</version>
          <extensions>true</extensions>
          <configuration>
           <sourceDirectories>
            <sourceDirectory>src/clj</sourceDirectory>
           </sourceDirectories>
          </configuration>
          <executions>
          <execution>
              <id>compile</id>
              <phase>compile</phase>
              <goals>
                  <goal>compile</goal>
              </goals>
            </execution>
              <execution>
              <id>test</id>
              <phase>test</phase>
              <goals>
                 <goal>test</goal>
              </goals>
            </execution>
          </executions>
    
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <configuration>
            <source>1.6</source>
            <target>1.6</target>
          </configuration>
        </plugin>
      </plugins>
    </build>
  7. With the POM file complete, save it using the Esc + : + wq + Enter key sequence and complete the required folder structure for the Maven project:

    mkdir src
    cd src
    mkdir test
    mkdir main
    cd main
    mkdir java
    
  8. Then return to the project root folder and generate the Eclipse project files using the following:

    mvn eclipse:eclipse
    

    Tip

    The Eclipse project files are a generated artifact, much like a .class file, and should not be included in your Git check-ins, especially since they contain client-machine-specific paths.

  9. You must now start your Eclipse environment and import the generated project files into the workspace:

  10. You must then create your first spout by creating a new class named HelloWorldSpout, which extends BaseRichSpout and is located in the storm.cookbook package. Eclipse will generate default method stubs for the spout. The spout will simply generate tuples based on random probability. Create the following member variables and construct the object:

    private SpoutOutputCollector collector;
    private int referenceRandom;
    private static final int MAX_RANDOM = 10;

    public HelloWorldSpout() {
        final Random rand = new Random();
        referenceRandom = rand.nextInt(MAX_RANDOM);
    }
  11. After construction, the Storm cluster will open the spout; provide the following implementation for the open method:

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
    }
  12. The Storm cluster will repeatedly call the nextTuple method, which will do all the work of the spout. Provide the following implementation for the method:

    public void nextTuple() {
        // Pause briefly so that the spout emits roughly ten tuples per second
        Utils.sleep(100);
        final Random rand = new Random();
        int instanceRandom = rand.nextInt(MAX_RANDOM);
        if (instanceRandom == referenceRandom) {
            collector.emit(new Values("Hello World"));
        } else {
            collector.emit(new Values("Other Random Word"));
        }
    }
  13. Finally, you need to tell the Storm cluster which fields this spout emits within the declareOutputFields method:

    declarer.declare(new Fields("sentence"));
  14. Once you have resolved all the required imports for the class, you need to create HelloWorldBolt. This class will consume the produced tuples and implement the required counting logic. Create the new class within the storm.cookbook package; it should extend the BaseRichBolt class. Declare a private int member variable named myCount and provide the following implementation for the execute method, which does the work for this bolt:

    public void execute(Tuple input) {
        String test = input.getStringByField("sentence");
        if ("Hello World".equals(test)) {
            myCount++;
            System.out.println("Found a Hello World! My Count is now: " + Integer.toString(myCount));
        }
    }
  15. Finally, you need to bring the elements together and declare the Storm topology. Create a main class named HelloWorldTopology within the same package and provide the following main implementation:

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 10);
    builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 2)
            .shuffleGrouping("randomHelloWorld");

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
        conf.setNumWorkers(3);

        StormSubmitter.submitTopology(args[0], conf,
                builder.createTopology());
    } else {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf,
                builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology("test");
        cluster.shutdown();
    }

    This will essentially set up the topology and submit it to either a local or remote Storm cluster, depending on the arguments passed to the main method.

  16. After you have resolved the compiler issues, you can execute the topology in a local cluster by issuing the following command from the project's root folder:

    mvn compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=storm.cookbook.HelloWorldTopology
    

How it works…

The following diagram describes the "Hello World" topology:

The spout essentially emits a stream containing one of the following two sentences:

  • Other Random Word

  • Hello World

The choice is based on random probability. The spout generates a random number upon construction and then generates subsequent random numbers to test against the value of that member variable. When a number matches, Hello World is emitted; during the remaining executions, Other Random Word is emitted.
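
The selection logic can be tried out in isolation. The following is a minimal sketch in plain Java that uses no Storm classes; the class name SpoutLogicSketch and its helper methods are hypothetical, and a single seeded Random is shared between draws (unlike the spout, which creates a new Random on every call) so that runs are repeatable.

```java
import java.util.Random;

// Plain-Java sketch of the spout's selection logic; no Storm classes are used.
// SpoutLogicSketch and its methods are illustrative names, not part of the recipe.
class SpoutLogicSketch {
    static final int MAX_RANDOM = 10;

    // Mirrors the decision in nextTuple(): draw a number and compare it with the reference.
    static String nextSentence(Random rand, int referenceRandom) {
        int instanceRandom = rand.nextInt(MAX_RANDOM);
        return instanceRandom == referenceRandom ? "Hello World" : "Other Random Word";
    }

    // Counts how many of the given number of draws produce "Hello World".
    static int countHelloWorld(long seed, int draws) {
        Random rand = new Random(seed);
        // Chosen once, as in the spout's constructor.
        int referenceRandom = rand.nextInt(MAX_RANDOM);
        int hits = 0;
        for (int i = 0; i < draws; i++) {
            if ("Hello World".equals(nextSentence(rand, referenceRandom))) {
                hits++;
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        int draws = 10000;
        int hits = countHelloWorld(42L, draws);
        // With MAX_RANDOM = 10, roughly one draw in ten is expected to match.
        System.out.println(hits + " of " + draws + " sentences were Hello World");
    }
}
```

Because each draw matches the reference with a probability of one in MAX_RANDOM, about a tenth of the emitted tuples should carry Hello World, whichever reference number was chosen at construction.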

The bolt simply matches and counts the instances of Hello World. In the current implementation, you will notice sequential increments being printed from the bolt. In order to scale this bolt, you simply need to increase the parallelism hint for the topology by updating the following line:

builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 3)
                .shuffleGrouping("randomHelloWorld");

The key parameter here is parallelism_hint, which you can adjust upwards. If you execute the topology again, you will then notice three separate counts that are printed independently and interleaved with each other.
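
The reason for the separate counts is that each bolt instance keeps its own myCount. The following is a minimal sketch in plain Java, without Storm; ParallelCountSketch and its methods are hypothetical names, and picking an instance at random is only an approximation of how a shuffle grouping spreads tuples.

```java
import java.util.Random;

// Plain-Java sketch: every simulated bolt instance owns a separate counter.
// ParallelCountSketch is an illustrative name; random selection only
// approximates the distribution performed by a shuffle grouping.
class ParallelCountSketch {

    // Sends each matching tuple to one randomly chosen instance and
    // returns the per-instance counts.
    static int[] distribute(int helloWorldTuples, int parallelism, long seed) {
        Random rand = new Random(seed);
        int[] myCount = new int[parallelism];
        for (int i = 0; i < helloWorldTuples; i++) {
            int instance = rand.nextInt(parallelism);
            myCount[instance]++;
        }
        return myCount;
    }

    // Adds up the independent counters.
    static int total(int[] counts) {
        int sum = 0;
        for (int count : counts) {
            sum += count;
        }
        return sum;
    }

    public static void main(String[] args) {
        int[] counts = distribute(30, 3, 7L);
        for (int i = 0; i < counts.length; i++) {
            System.out.println("Instance " + i + " counted " + counts[i]);
        }
        System.out.println("Sum of all instances: " + total(counts));
    }
}
```

No single instance sees every Hello World tuple, so a global count would need either a parallelism of one or a further aggregation step.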

Tip

You can scale a cluster after deployment by updating these hints using the Storm GUI or CLI; however, you can't change the topology structure without recompiling and redeploying the JAR. For the command-line option, please see the CLI documentation on the wiki available at the following link:

https://github.com/nathanmarz/storm/wiki/Command-line-client

It is important to ensure that your project dependencies are declared correctly within your POM. The Storm JARs must be declared with the provided scope; if not, they would be packaged into your JAR, which would result in duplicate class files on the classpath within a deployed node of the cluster. Note that Storm checks for this classpath duplication; it will fail to start if you have included Storm in your distribution.

Tip

Downloading the example code

You can download the example code files for all Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.

Open source versions of the code are maintained by the author at his Bitbucket account at https://bitbucket.org/qanderson.

 

Creating a Storm cluster – provisioning the machines


Testing the cluster in the local mode is useful for debugging and verifying the basic functional logic of the cluster. It doesn't, however, give you a realistic view as to the operation of the cluster. Moreover, any development effort is only complete once the system is running in a production environment. This is a key consideration for any developer and is the cornerstone of the entire DevOps movement; regardless of the methodology, however, you must be able to reliably deploy your code into an environment. This recipe demonstrates how to create and provision an entire cluster directly from version control. There are many key principles in doing this:

  • The state of any given server must be known at all times. It isn't acceptable that people can log into a server and make changes to its settings or files without strict version control being in place.

  • Servers should be fundamentally immutable, with the state in some kind of separate volume. This allows deterministic recovery times of a server.

  • If something causes problems in the delivery process, do it more often. In software development and IT operations, this applies heavily to disaster recovery and integration. Both tasks can only be performed often if they are automated.

  • This book assumes that your destination production environment is a cluster (based on Amazon Web Services (AWS) EC2), which enables automatic scaling. Elastic auto-scaling is only possible where provisioning is automated.

The deployment of Storm topologies to an AWS cluster is the subject for a later chapter; however, the fundamentals will be presented in this recipe in a development environment.

How to do it...

Let's start by creating a new project as follows:

  1. Create a new project named vagrant-storm-cluster with the following folder structure (the later steps place files in its data, manifests, and scripts folders):

  2. Using your favorite editor, create a file in the project root called Vagrantfile. Inside the file, you must create the file header and the configuration for the virtual machines that we want to create. We need at least one nimbus node, two supervisor nodes, and a zookeeper node:

    # -*- mode: ruby -*-
    # vi: set ft=ruby :
    boxes = [
      { :name => :nimbus, :ip => '192.168.33.100', :cpus =>2, :memory => 512 },
      { :name => :supervisor1, :ip => '192.168.33.101', :cpus =>4, :memory => 1024 },
      { :name => :supervisor2, :ip => '192.168.33.102', :cpus =>4, :memory => 1024 },
      { :name => :zookeeper1, :ip => '192.168.33.201', :cpus =>1, :memory => 512 }
    ]

    Tip

    Note that the use of a single zookeeper node is only for development environments, as this cluster is not highly available. The purpose of this cluster is to test your topology logic in a realistic setting and identify stability issues.

  3. You must then create the virtual machine provisioning for each machine, specialized by the previous configuration at execution time. The first set of properties defines the hardware, networking, and operating system:

    Vagrant::Config.run do |config|
      boxes.each do |opts|
        config.vm.define opts[:name] do |config|
          config.vm.box = "ubuntu12"
          config.vm.box_url =
            "http://dl.dropbox.com/u/1537815/precise64.box"
          config.vm.network :hostonly, opts[:ip]
          config.vm.host_name = "storm.%s" % opts[:name].to_s
          config.vm.share_folder "v-data", "/vagrant_data", "./data", :transient => false
          config.vm.customize ["modifyvm", :id, "--memory", opts[:memory]]
          config.vm.customize ["modifyvm", :id, "--cpus", opts[:cpus] ] if opts[:cpus]
  4. The provisioning of the application is then configured using a combination of the bash and Puppet scripts:

          config.vm.provision :shell, :inline => "cp -fv /vagrant_data/hosts /etc/hosts"

          config.vm.provision :shell, :inline => "apt-get update"
          # Check if the jdk has been provided
          if File.exist?("./data/jdk-6u35-linux-x64.bin") then
            config.vm.provision :puppet do |puppet|
              puppet.manifests_path = "manifests"
              puppet.manifest_file = "jdk.pp"
            end
          end

          config.vm.provision :puppet do |puppet|
            puppet.manifests_path = "manifests"
            puppet.manifest_file = "provisioningInit.pp"
          end

          # Ask puppet to do the provisioning now.
          config.vm.provision :shell, :inline => "puppet apply /tmp/storm-puppet/manifests/site.pp --verbose --modulepath=/tmp/storm-puppet/modules/ --debug"

        end
      end
    end

    The Vagrant file simply defines the hypervisor-level configuration and provisioning; the remaining provisioning is done through Puppet and is defined at two levels. The first level makes the base Ubuntu installation ready for application provisioning. The second level contains the actual application provisioning. In order to create the first level of provisioning, you need to create the JDK provisioning bash script and the provisioning initialization Puppet script.

  5. In the scripts folder of the project, create the installJdk.sh file and populate it with the following code:

    #!/bin/sh
    echo "Installing JDK!"
    chmod 775 /vagrant_data/jdk-6u35-linux-x64.bin
    cd /root
    yes | /vagrant_data/jdk-6u35-linux-x64.bin
    /bin/mv /root/jdk1.6.0_35 /opt
    /bin/rm -rv /usr/bin/java
    /bin/rm -rv /usr/bin/javac
    /bin/ln -s /opt/jdk1.6.0_35/bin/java /usr/bin
    /bin/ln -s /opt/jdk1.6.0_35/bin/javac /usr/bin
    JAVA_HOME=/opt/jdk1.6.0_35
    export JAVA_HOME
    PATH=$PATH:$JAVA_HOME/bin
    export PATH

    This will simply be invoked by the Puppet script in a declarative manner.

  6. In the manifests folder, create a file called jdk.pp:

    $JDK_VERSION = "1.6.0_35" 
    package {"openjdk":
      ensure  =>  absent,
    } 
    exec { "installJdk":
      command => "installJdk.sh",
        path => "/vagrant/scripts",
        logoutput => true, 
        creates => "/opt/jdk${JDK_VERSION}",
    }
  7. In the manifests folder, create the provisioningInit.pp file and define the required packages and static variable values:

    $CLONE_URL = "https://bitbucket.org/qanderson/storm-puppet.git"
    $CHECKOUT_DIR="/tmp/storm-puppet"
    
    package {git:ensure=> [latest,installed]}
    package {puppet:ensure=> [latest,installed]}
    package {ruby:ensure=> [latest,installed]}
    package {rubygems:ensure=> [latest,installed]}
    package {unzip:ensure=> [latest,installed]}
    
    exec { "install_hiera":
      command => "gem install hiera hiera-puppet",
        path => "/usr/bin",
        require => Package['rubygems'],
    }

    Note

    For more information on Hiera, please see the Puppet documentation page at http://docs.puppetlabs.com/hiera/1/index.html.

  8. You must then clone the repository, which contains the second level of provisioning:

    exec { "clone_storm-puppet":
      command => "git clone ${CLONE_URL}",
      cwd => "/tmp",
        path => "/usr/bin",
        creates => "${CHECKOUT_DIR}",
        require => Package['git'],
    }
  9. You must now configure a Puppet plugin called Hiera, which is used to externalize properties from the provisioning scripts in a hierarchical manner:

    exec {"/bin/ln -s /var/lib/gems/1.8/gems/hiera-puppet-1.0.0/ /tmp/storm-puppet/modules/hiera-puppet":
      creates => "/tmp/storm-puppet/modules/hiera-puppet",
      require => [Exec['clone_storm-puppet'],Exec['install_hiera']]
    }
    
    
    #install hiera and the storm configuration
    file { "/etc/puppet/hiera.yaml":
        source => "/vagrant_data/hiera.yaml",
        replace => true,
        require => Package['puppet']
    }
    
    file { "/etc/puppet/hieradata":
      ensure => directory,
      require => Package['puppet'] 
    }
    
    file {"/etc/puppet/hieradata/storm.yaml": 
      source => "${CHECKOUT_DIR}/modules/storm.yaml",
        replace => true,
        require => [Exec['clone_storm-puppet'],File['/etc/puppet/hieradata']]
    }
  10. Finally, you need to populate the data folder. Create the Hiera base configuration file, hiera.yaml:

    ---
    :hierarchy:
        - %{operatingsystem}
        - storm
    :backends:
        - yaml
    :yaml:
        :datadir: '/etc/puppet/hieradata'
  11. The final data file required is the hosts file, which acts as the DNS in our local cluster:

    127.0.0.1       localhost
    192.168.33.100  storm.nimbus
    192.168.33.101  storm.supervisor1
    192.168.33.102  storm.supervisor2
    192.168.33.103  storm.supervisor3
    192.168.33.104  storm.supervisor4
    192.168.33.105  storm.supervisor5
    192.168.33.201  storm.zookeeper1
    192.168.33.202  storm.zookeeper2
    192.168.33.203  storm.zookeeper3
    192.168.33.204  storm.zookeeper4

    Tip

    The hosts file is not required in properly configured environments; however, it works nicely in our local "host only" development network.

    The project is now complete, in that it will provision the correct virtual machines and install the base required packages; however, we need to create the Application layer provisioning, which is contained in a separate repository.

  12. Initialize your Git repository for this project and push it to bitbucket.org.

How it works...

Provisioning is performed on three distinct layers:

This recipe only works in the bottom two layers, with the Application layer presented in the next recipe. A key reason for the separation is that you will typically create different provisioning at these layers depending on the Hypervisor you are using for deployment. Once the VMs are provisioned, however, the application stack provisioning should be consistent through all your environments. This is key, in that it allows us to test our deployments hundreds of times before we get to production, and ensure that they are in a repeatable and version-controlled state.

In the development environment, VirtualBox is the Hypervisor, with Vagrant and Puppet providing the provisioning. Vagrant works by specializing a VirtualBox base image. This base image represents a version-controlled artifact. For each box defined in our Vagrantfile, the following parameters are specified:

  • The base box

  • The network settings

  • Shared folders

  • Memory and CPU settings for the VM

    Tip

    This base provisioning does not include any of the baseline controls you would expect in a production environment, such as security, access controls, housekeeping, and monitoring. You must provision these before proceeding beyond your development environment. You can find these kinds of recipes on Puppet Forge (http://forge.puppetlabs.com/).

Provisioning agents are then invoked to perform the remaining heavy lifting:

config.vm.provision :shell, :inline => "cp -fv /vagrant_data/hosts /etc/hosts"

The preceding command installs the hosts file, which resolves the names of our cluster nodes. It is followed by:

config.vm.provision :shell, :inline => "apt-get update"

This refreshes the package index used by apt-get within the Ubuntu installation. Vagrant then proceeds to install the JDK and the base provisioning. Finally, it invokes the application provisioning.

Note

The base VM image could contain the entire base provisioning already, thus making this portion of the provisioning unnecessary. However, it is important to understand the process of creating an appropriate base image and also to balance the amount of specialization in the base images you control; otherwise, they will proliferate.

 

Creating a Storm cluster – provisioning Storm


Once you have a base set of virtual machines that are ready for application provisioning, you need to install and configure the appropriate packages on each node.

How to do it…

  1. Create a new project named storm-puppet with the following folder structure:

  2. The entry point into the Puppet execution on the provisioned node is site.pp. Create it in the manifests folder:

    node 'storm.nimbus' {
      $cluster = 'storm1'
      include storm::nimbus
      include storm::ui
    }
    
    node /storm.supervisor[1-9]/ {
      $cluster = 'storm1'
      include storm::supervisor
    }
    
    node /storm.zookeeper[1-9]/ {
      include storm::zoo
    }
  3. Next, you need to define the storm module. A module exists in the modules folder and has its own manifests and template folder structure, much as with the structure found at the root level of the Puppet project. Within the storm module, create the required manifests (modules/storm/manifests), starting with the init.pp file:

    class storm {
      include storm::install
      include storm::config
    }
  4. The installation of the Storm application is the same on each storm node; only the configurations are adjusted where required, via templating. Next create the install.pp file, which will download the required binaries and install them:

    class storm::install {
    
      $BASE_URL="https://bitbucket.org/qanderson/storm-deb-packaging/downloads/"
      $ZMQ_FILE="libzmq0_2.1.7_amd64.deb"
      $JZMQ_FILE="libjzmq_2.1.7_amd64.deb"
      $STORM_FILE="storm_0.8.1_all.deb"
    
      package { "wget": ensure => latest }
      
      # call fetch for each file
      exec { "wget_storm": 
        command => "/usr/bin/wget ${BASE_URL}${STORM_FILE}" }
      exec {"wget_zmq": 
        command => "/usr/bin/wget ${BASE_URL}${ZMQ_FILE}" }
      exec { "wget_jzmq": 
        command => "/usr/bin/wget ${BASE_URL}${JZMQ_FILE}" }
      
      #call package for each file
      package { "libzmq0":
        provider => dpkg,
        ensure => installed,
        source => "${ZMQ_FILE}",
        require => Exec['wget_zmq']
      }
      #call package for each file
      package { "libjzmq":
        provider => dpkg,
        ensure => installed,
        source => "${JZMQ_FILE}",
        require => [Exec['wget_jzmq'],Package['libzmq0']]
      }
      #call package for each file
      package { "storm":
        provider => dpkg,
        ensure => installed,
        source => "${STORM_FILE}",
        require => [Exec['wget_storm'], Package['libjzmq']]
      } 
    }

    Tip

    The install manifest here assumes the existence of Debian packages for Ubuntu. These were built using scripts and can be tweaked based on your requirements. The binaries and creation scripts can be found at https://bitbucket.org/qanderson/storm-deb-packaging.

    The installation consists of the following packages:

  5. The configuration of each node is done through the template-based generation of the configuration files. In the storm manifests, create config.pp:

    class storm::config {
      require storm::install
      include storm::params
      file { '/etc/storm/storm.yaml':
        require => Package['storm'],
        content => template('storm/storm.yaml.erb'),
        owner   => 'root',
        group   => 'root',
        mode    => '0644'
      }
      file { '/etc/default/storm':
        require => Package['storm'],
        content => template('storm/default.erb'),
        owner   => 'root',
        group   => 'root',
        mode    => '0644'
      }
    }
  6. All the storm parameters are defined using Hiera, with the Hiera configuration invoked from params.pp in the storm manifests:

    class storm::params {
      #_ STORM DEFAULTS _#
      $java_library_path = hiera_array('java_library_path', 
          ['/usr/local/lib', '/opt/local/lib', '/usr/lib'])
    }

    Tip

    Due to the sheer number of properties, the file has been truncated. For the complete file, please refer to the Git repository at https://bitbucket.org/qanderson/storm-puppet/src.

  7. Each class of node is then specified; here we will specify the nimbus class:

    class storm::nimbus {
      require storm::install
      include storm::config
      include storm::params
    
      # Install nimbus /etc/default
      storm::service { 'nimbus':
        start      => 'yes',
        jvm_memory => $storm::params::nimbus_mem
      }
    
    }

    Specify the supervisor class:

    class storm::supervisor {
      require storm::install
      include storm::config
      include storm::params
    
      # Install supervisor /etc/default
      storm::service { 'supervisor':
        start      => 'yes',
        jvm_memory => $storm::params::supervisor_mem
      }
    
    }

    Specify the ui class:

    class storm::ui {
      require storm::install
      include storm::config
      include storm::params
      # Install ui /etc/default
      storm::service { 'ui':
        start      => 'yes',
        jvm_memory => $storm::params::ui_mem
      }
    
    }

    And finally, specify the zoo class (for a zookeeper node):

    class storm::zoo {
      package {['zookeeper','zookeeper-bin','zookeeperd']:
        ensure => latest,
      }
    }
  8. Once all the files have been created, initialize the Git repository and push it to bitbucket.org.

  9. In order to actually run the provisioning, navigate to the vagrant-storm-cluster folder and run the following command:

    vagrant up
    
  10. If you would like to ssh into any of the nodes, simply specify the following command:

    vagrant ssh nimbus
    

    Replace nimbus with your required node name.

How it works…

There are various patterns that can be applied when using Puppet. The simplest one is a distributed model, whereby nodes provision themselves, as opposed to a centralized model using Puppet Master. In the distributed model, updating server configuration simply requires that you update your provisioning manifests and push them to your central Git repository. The various nodes will then pull and apply this configuration. This can be achieved through cron jobs, triggers, or a Continuous Delivery tool such as Jenkins, Bamboo, or Go. Provisioning in the development environment is explicitly invoked by Vagrant through the following command:

config.vm.provision :shell, :inline => "puppet apply /tmp/storm-puppet/manifests/site.pp --verbose --modulepath=/tmp/storm-puppet/modules/ --debug"

The manifest is then applied by Puppet. Puppet is declarative, in that each language element specifies the desired state together with the methods for getting there. This means that, when the system is already in the required state, that particular provisioning step is skipped, which avoids the adverse effects of duplicate provisioning.
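The skip-when-already-satisfied behaviour can be sketched in a few lines. This illustrates the idea only, not how Puppet is implemented; the class and method names are hypothetical, and the existence check plays the role of the creates attribute used in the exec resources of the previous recipe:

```java
import java.io.File;
import java.io.IOException;

// Illustrative only: a provisioning step that is skipped when its target already exists.
class GuardedStep {
    private final File creates;   // the path that marks the step as done

    GuardedStep(File creates) {
        this.creates = creates;
    }

    // Returns true if the action ran, false if the desired state was already in place.
    boolean apply() throws IOException {
        if (creates.exists()) {
            return false;                 // desired state reached: skip the step
        }
        return creates.createNewFile();   // stand-in for the real command
    }

    public static void main(String[] args) throws IOException {
        File marker = File.createTempFile("guarded-step", ".marker");
        marker.delete();                  // start from the "not provisioned" state
        GuardedStep step = new GuardedStep(marker);
        System.out.println(step.apply()); // first run performs the action
        System.out.println(step.apply()); // second run is skipped
        marker.delete();
    }
}
```

Running the step twice performs the action only once, which is why a manifest can be applied repeatedly without side effects.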

The storm-puppet project is therefore cloned onto the node and then the manifest is applied locally. Each node only applies provisioning for itself, based on the hostname specified in the site.pp manifest, for example:

node 'storm.nimbus' {
  $cluster = 'storm1'
  include storm::nimbus
  include storm::ui
}

In this case, the nimbus node will include the Hiera configurations for the storm1 cluster, and the installation defined by the nimbus and ui classes will be performed. Any combination of classes can be included in the node definition, thus allowing the complete environment to be succinctly defined.
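The way a hostname selects its classes can be sketched as a small lookup. This is an illustration rather than Puppet's implementation: the class name is hypothetical, and it assumes that a quoted node name matches exactly while a regular-expression node name matches anywhere in the hostname:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Illustrative only: maps a hostname to the classes listed in site.pp.
class NodeClassifierSketch {
    private final Map<Pattern, List<String>> nodes =
            new LinkedHashMap<Pattern, List<String>>();

    NodeClassifierSketch() {
        // The quoted node name is an exact match; the regex definitions are used as written.
        nodes.put(Pattern.compile("^" + Pattern.quote("storm.nimbus") + "$"),
                Arrays.asList("storm::nimbus", "storm::ui"));
        nodes.put(Pattern.compile("storm.supervisor[1-9]"),
                Arrays.asList("storm::supervisor"));
        nodes.put(Pattern.compile("storm.zookeeper[1-9]"),
                Arrays.asList("storm::zoo"));
    }

    // Returns the classes of the first node definition that matches the hostname.
    List<String> classesFor(String hostname) {
        for (Map.Entry<Pattern, List<String>> node : nodes.entrySet()) {
            if (node.getKey().matcher(hostname).find()) {
                return node.getValue();
            }
        }
        return new ArrayList<String>();   // no matching node definition
    }
}
```

With this sketch, storm.supervisor3 resolves to storm::supervisor only, whereas storm.nimbus resolves to both storm::nimbus and storm::ui.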

 

Deriving basic click statistics


The click topology is designed to gather basic website-usage statistics, specifically:

  • The number of visitors

  • The number of unique visitors

  • The number of visitors for a given country

  • The number of visitors for a given city

  • The percentage of visitors for each city in a given country

The system assumes a limited possible visitor population and prefers server-side client keys as opposed to client-side cookies. The topology derives the geographic information from the IP address and a public IP resolution service.

The click topology also uses Redis to store click events being sent into the topology, specifically as a persistent queue, and it also leverages Redis in order to persistently recall the previous visitors to the site.

Note

For more information on Redis, please visit Redis.io.
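The queue behaviour that the topology relies on can be illustrated without a Redis server. The following is a minimal sketch, assuming that the web server adds events with LPUSH (the producer side is not shown in this recipe) and that the spout removes them with RPOP, as the nextTuple method in this recipe does. A java.util.ArrayDeque stands in for the Redis list, and the class name and payload strings are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// In-memory stand-in for a Redis list used as a FIFO queue (illustration only).
class ClickQueueSketch {
    private final Deque<String> list = new ArrayDeque<String>();

    // Mirrors LPUSH: the new element becomes the head of the list.
    void lpush(String value) {
        list.addFirst(value);
    }

    // Mirrors RPOP: removes and returns the tail, or null when the list is empty.
    String rpop() {
        return list.pollLast();
    }

    public static void main(String[] args) {
        ClickQueueSketch queue = new ClickQueueSketch();
        queue.lpush("click-1");
        queue.lpush("click-2");
        System.out.println(queue.rpop()); // the oldest event comes out first
        System.out.println(queue.rpop());
        System.out.println(queue.rpop()); // null: nothing left to read
    }
}
```

Pushing on one end and popping from the other preserves arrival order, so the spout sees clicks in the order in which they were queued.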

Getting ready

Before you proceed, you must install Redis (Version 2.6 or greater):

wget http://download.redis.io/redis-stable.tar.gz 
tar xvzf redis-stable.tar.gz 
cd redis-stable 
make
sudo cp redis-server /usr/local/bin/
sudo cp redis-cli /usr/local/bin/

Then start the Redis server.

How to do it…

  1. Create a new Java project named click-topology, and create the pom.xml file and folder structure as per the "Hello World" topology project.

  2. In the pom.xml file, update the project name and references, and then add the following dependencies to the <dependencies> tag:

    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>4.11</version>
       <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.jmock</groupId>
        <artifactId>jmock-junit4</artifactId>
        <version>2.5.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.jmock</groupId>
        <artifactId>jmock-legacy</artifactId>
        <version>2.5.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
       <groupId>redis.clients</groupId>
       <artifactId>jedis</artifactId>
       <version>2.1.0</version>
    </dependency>
  3. Take a special note of the scope definitions of JUnit and JMock so as to not include them in your final deployable JAR.

  4. In the src/main/java folder, create the ClickTopology main class in the storm.cookbook package. This class defines the topology and provides the mechanisms to launch the topology into a cluster or in a local mode. Create the class as follows:

    public ClickTopology(){
        builder.setSpout("clickSpout", new ClickSpout(), 10);

        //First layer of bolts
        builder.setBolt("repeatsBolt", new RepeatVisitBolt(), 10)
               .shuffleGrouping("clickSpout");
        builder.setBolt("geographyBolt", new GeographyBolt(new HttpIPResolver()), 10)
               .shuffleGrouping("clickSpout");

        //second layer of bolts, commutative in nature
        builder.setBolt("totalStats", new VisitStatsBolt(), 1)
               .globalGrouping("repeatsBolt");
        builder.setBolt("geoStats", new GeoStatsBolt(), 10)
               .fieldsGrouping("geographyBolt", new Fields(storm.cookbook.Fields.COUNTRY));
        conf.put(Conf.REDIS_PORT_KEY, DEFAULT_JEDIS_PORT);
    }

    public void runLocal(int runTime){
        conf.setDebug(true);
        conf.put(Conf.REDIS_HOST_KEY, "localhost");
        cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
        if(runTime > 0){
            Utils.sleep(runTime);
            shutDownLocal();
        }
    }

    public void shutDownLocal(){
        if(cluster != null){
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }

    public void runCluster(String name, String redisHost)
            throws AlreadyAliveException, InvalidTopologyException {
        conf.setNumWorkers(20);
        conf.put(Conf.REDIS_HOST_KEY, redisHost);
        StormSubmitter.submitTopology(name, conf, builder.createTopology());
    }
  5. This is followed by the main method, which is guided by the number of arguments passed at runtime:

    public static void main(String[] args) throws Exception {
         ClickTopology topology = new ClickTopology();
         
          if(args!=null && args.length > 1) {
              topology.runCluster(args[0], args[1]);
          } else {
              if(args!=null && args.length == 1)
                System.out.println("Running in local mode, redis ip missing for cluster run");
              topology.runLocal(10000);
          }
    
    }
  6. The topology assumes that the web server pushes messages onto a Redis queue. You must create a spout to inject these into the Storm cluster as a stream. In the storm.cookbook package, create the ClickSpout class, which connects to Redis when it is opened by the cluster:

    public class ClickSpout extends BaseRichSpout {

        public static Logger LOG = Logger.getLogger(ClickSpout.class);

        private Jedis jedis;
        private String host;
        private int port;
        private SpoutOutputCollector collector;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP,
                    storm.cookbook.Fields.URL,
                    storm.cookbook.Fields.CLIENT_KEY));
        }

        @Override
        public void open(Map conf, TopologyContext topologyContext,
                         SpoutOutputCollector spoutOutputCollector) {
            host = conf.get(Conf.REDIS_HOST_KEY).toString();
            port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
            this.collector = spoutOutputCollector;
            connectToRedis();
        }

        private void connectToRedis() {
            jedis = new Jedis(host, port);
        }
  7. The cluster will then poll the spout for new tuples through the nextTuple method:

    @Override
    public void nextTuple() {
        String content = jedis.rpop("count");
        if(content == null || "nil".equals(content)) {
            //nothing queued; back off briefly before polling again
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                //restore the interrupt flag so that the caller can react to it
                Thread.currentThread().interrupt();
            }
        } else {
            JSONObject obj = (JSONObject) JSONValue.parse(content);
            String ip = obj.get(storm.cookbook.Fields.IP).toString();
            String url = obj.get(storm.cookbook.Fields.URL).toString();
            String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY).toString();
            collector.emit(new Values(ip, url, clientKey));
        }
    }
  8. Next, we need to create the bolts that will enrich the basic data through database or remote API lookups. Let us start with the repeat visit bolt. This bolt will check the client's ID against previous visit records and emit the enriched tuple with a flag set for unique visits. Create the RepeatVisitBolt class, providing the prepare method and the Redis connection logic:

    public class RepeatVisitBolt extends BaseRichBolt {
    
        private OutputCollector collector;
    
        private Jedis jedis;
        private String host;
        private int port;
    
        @Override
        public void prepare(Map conf,
         TopologyContext topologyContext, OutputCollector 
         outputCollector) {
            this.collector = outputCollector;
            host = conf.get(Conf.REDIS_HOST_KEY).toString();
            port = Integer.valueOf(conf.
                   get(Conf.REDIS_PORT_KEY).toString());
            connectToRedis();
        }
    
        private void connectToRedis() {
            jedis = new Jedis(host, port);
            jedis.connect();
        }
  9. In the execute method, the tuple from the ClickSpout class is provided by the cluster. The bolt needs to look up the previous visit flags from Redis, based on the fields in the tuple, and emit the enriched tuple:

    public void execute(Tuple tuple) {
        String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
        String clientKey = tuple.getStringByField(storm.cookbook.Fields.CLIENT_KEY);
        String url = tuple.getStringByField(storm.cookbook.Fields.URL);
        String key = url + ":" + clientKey;
        String value = jedis.get(key);
        if(value == null){
            jedis.set(key, "visited");
            collector.emit(new Values(clientKey, url, Boolean.TRUE.toString()));
        } else {
            collector.emit(new Values(clientKey, url, Boolean.FALSE.toString()));
        }
    }
  10. Next, the geographic enrichment bolt must be created. This bolt will emit an enriched tuple by looking up the country and city of the client's IP address through a remote API call. The GeographyBolt class delegates the actual call to an injected IP resolver in order to increase the testability of the class. In the storm.cookbook package, create the GeographyBolt class, extending the BaseRichBolt class, and implement the execute method:

    public void execute(Tuple tuple) {
        String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
        JSONObject json = resolver.resolveIP(ip);
        String city = (String) json.get(storm.cookbook.Fields.CITY);
        String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME);
        collector.emit(new Values(country, city));
    }
  11. Provide a resolver by implementing the IPResolver interface as HttpIPResolver, and inject it into GeographyBolt at design time:

    public class HttpIPResolver implements IPResolver, Serializable {
        static String url =
                "http://api.hostip.info/get_json.php";
        @Override
        public JSONObject resolveIP(String ip) {
          URL geoUrl = null;
          BufferedReader in = null;
          try {
            geoUrl = new URL(url + "?ip=" + ip);
            URLConnection connection = geoUrl.openConnection();
            in = new BufferedReader(new InputStreamReader(
                                    connection.getInputStream()));
            JSONObject json = (JSONObject) JSONValue.parse(in);
            in.close();
            return json;
            } catch (IOException e) {
                e.printStackTrace();
            }
            finally {
                if(in != null){
                    try {
                        in.close();
                    } catch (IOException e) {}
                }
            }
            return null;
        }
    }
  12. Next, we need to derive the geographic stats. The GeoStatsBolt class simply receives the enriched tuple from GeographyBolt and maintains an in-memory structure of the data. It also emits the updated counts to any interested party. The GeoStatsBolt class is designed such that the total population of the countries can be split between many bolts; however, all cities within each country must arrive at the same bolt. The topology, therefore, splits streams into the bolt on this basis:

    builder.setBolt("geoStats", new GeoStatsBolt(), 10).fieldsGrouping("geographyBolt", new Fields(storm.cookbook.Fields.COUNTRY));
  13. Create the GeoStatsBolt class and provide the implementation for the execute method:

    public void execute(Tuple tuple) {
        String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY);
        String city = tuple.getStringByField(storm.cookbook.Fields.CITY);
        if(!stats.containsKey(country)){
            stats.put(country, new CountryStats(country));
        }
        stats.get(country).cityFound(city);
        collector.emit(new Values(country, stats.get(country).getCountryTotal(),
                city, stats.get(country).getCityTotal(city)));
    }
  14. The bulk of logic is contained in the inner-model class that maintains an in-memory model of the city and country:

    private class CountryStats {
        private static final int COUNT_INDEX = 0;
        private static final int PERCENTAGE_INDEX = 1;
        private int countryTotal = 0;
        private String countryName;
        private Map<String, List<Integer>> cityStats = new HashMap<String, List<Integer>>();

        public CountryStats(String countryName){
            this.countryName = countryName;
        }

        public void cityFound(String cityName){
            countryTotal++;
            if(cityStats.containsKey(cityName)){
                cityStats.get(cityName).set(COUNT_INDEX,
                    cityStats.get(cityName).get(COUNT_INDEX).intValue() + 1);
            } else {
                List<Integer> list = new LinkedList<Integer>();
                //initial values: a count of 1 and a placeholder percentage
                list.add(1);
                list.add(0);
                cityStats.put(cityName, list);
            }

            //store the city's share of the country total as a whole-number percentage
            double percent = (double)cityStats.get(cityName).get(COUNT_INDEX)
                    / (double)countryTotal;
            cityStats.get(cityName).set(PERCENTAGE_INDEX, (int)(percent * 100));
        }

        public int getCountryTotal(){ return countryTotal; }

        public int getCityTotal(String cityName){
            return cityStats.get(cityName).get(COUNT_INDEX).intValue();
        }
    }
  15. Finally, the VisitStatsBolt class provides the final counting functionality for visitors and unique visits, based on the enriched stream from the RepeatVisitBolt class. This bolt needs to receive all the count information in order to maintain a single in-memory count, which is reflected in the topology definition:

    builder.setBolt("totalStats", new VisitStatsBolt(), 1).globalGrouping("repeatsBolt");
  16. In order to implement the VisitStatsBolt class, create the class and define two member-level integers, total and uniqueCount; then implement the execute method:

    public void execute(Tuple tuple) {
       boolean unique = Boolean.parseBoolean(tuple
        .getStringByField(storm.cookbook.Fields.UNIQUE));
       total++;
       if(unique)uniqueCount++;
       collector.emit(new Values(total,uniqueCount));
      }
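The per-country model from the steps above can also be tried out on its own. The following is a dependency-free sketch rather than the recipe's class: the class and method names are hypothetical, it keeps a plain count per city, and it derives the percentage on demand instead of storing it:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative, dependency-free variant of the per-country model.
class CountryStatsSketch {
    private int countryTotal = 0;
    private final Map<String, Integer> cityCounts = new HashMap<String, Integer>();

    // Records one visit from the given city.
    void cityFound(String city) {
        countryTotal++;
        Integer current = cityCounts.get(city);
        cityCounts.put(city, current == null ? 1 : current + 1);
    }

    int getCountryTotal() {
        return countryTotal;
    }

    int getCityTotal(String city) {
        Integer count = cityCounts.get(city);
        return count == null ? 0 : count;
    }

    // Share of the country's visits that came from the city, as a whole-number percentage.
    int getCityPercentage(String city) {
        if (countryTotal == 0) {
            return 0;
        }
        return (int) Math.round(100.0 * getCityTotal(city) / countryTotal);
    }
}
```

Deriving the percentage when it is read keeps every city's share current, because each new visit changes the share of all cities in that country and not only the one that was just counted.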

How it works…

The following diagram illustrates the click topology:

The spout emits the click events from the web server into the topology, through a shuffle grouping, to both the geography and repeat bolts. This ensures that the load is evenly distributed around the cluster, which matters especially for these slow, high-latency processes.
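To make the difference between the groupings used in this topology concrete, here is a simplified model of how a tuple could be assigned to one of a bolt's tasks. It is not Storm's implementation: it assumes that a fields grouping derives the task from a hash of the grouping field, and it approximates a shuffle grouping with a simple round-robin. The class and method names are hypothetical:

```java
// Simplified model of task selection; not Storm's actual implementation.
class GroupingSketch {
    private final int numTasks;
    private int next = 0;

    GroupingSketch(int numTasks) {
        this.numTasks = numTasks;
    }

    // Fields grouping: equal field values always map to the same task index.
    int fieldsGrouping(String fieldValue) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    // Shuffle grouping, approximated as round-robin: tuples are spread evenly
    // across the tasks regardless of their content.
    int shuffleGrouping() {
        int task = next;
        next = (next + 1) % numTasks;
        return task;
    }
}
```

Under this model, every tuple for a given country reaches the same geoStats task, which is what allows that task to keep the complete city counts for the country in memory.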

Tip

It is important to understand the commutative versus associative nature of your data model, together with any other concerns that are in your streams and inherent models before designing your topology.
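As a small illustration of the commutative case, the counters kept by the visit statistics bolt produce the same totals whatever order the tuples arrive in. The sketch below uses a hypothetical class name and plain booleans in place of tuples:

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: counting is commutative, so arrival order does not change the result.
class VisitCounterSketch {
    private int total = 0;
    private int uniqueCount = 0;

    void visit(boolean unique) {
        total++;
        if (unique) {
            uniqueCount++;
        }
    }

    int getTotal() {
        return total;
    }

    int getUniqueCount() {
        return uniqueCount;
    }

    // Feeds a sequence of unique-visit flags through a fresh counter.
    static VisitCounterSketch count(List<Boolean> flags) {
        VisitCounterSketch counter = new VisitCounterSketch();
        for (boolean flag : flags) {
            counter.visit(flag);
        }
        return counter;
    }

    public static void main(String[] args) {
        VisitCounterSketch a = count(Arrays.asList(true, false, true, false));
        VisitCounterSketch b = count(Arrays.asList(false, false, true, true));
        // Both lines print the same totals, although the order of arrival differs.
        System.out.println(a.getTotal() + "/" + a.getUniqueCount());
        System.out.println(b.getTotal() + "/" + b.getUniqueCount());
    }
}
```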

It is important to understand the parallelism of Storm while setting up the topology structure. There is an excellent summary of this on the wiki (https://github.com/nathanmarz/storm/wiki/Understanding-the-parallelism-of-a-Storm-topology). The key points to take into account are:

  • The number of worker processes for the topology (TOPOLOGY_WORKERS).

  • The number of executors (threads) per component of the topology. This is set using the parallelism hint. Note that this sets only the initial value (number of threads); this can be increased at runtime using topology rebalancing (through the UI or CLI). You can limit the number of executors using the Config#setMaxTaskParallelism() method.

  • The number of tasks is set by default to 1 per executor. You can adjust this value when you declare a component, using the ComponentConfigurationDeclarer#setNumTasks() method.

These are the key elements to consider when sizing your cluster. The cluster will try to distribute work across worker processes, each containing many executors that may be executing one or more tasks. The number of executors per worker is therefore the total number of executors divided by the number of workers. A good example of this can be seen in the previously mentioned wiki page.

Using these numbers, you can size your cluster in terms of nodes and cores per node, where ideally you should have one core per thread (executor) in the cluster.
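As a worked example, the arithmetic can be applied to the parallelism hints and the worker count used in the click topology. This is a rough sketch with hypothetical class and method names; it ignores any acker executors that Storm adds and assumes that executors are spread across workers as evenly as possible:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sizing arithmetic for the click topology; acker executors are ignored.
class SizingSketch {
    // Sums the parallelism hints of all components.
    static int totalExecutors(Map<String, Integer> parallelismHints) {
        int total = 0;
        for (int hint : parallelismHints.values()) {
            total += hint;
        }
        return total;
    }

    // Upper bound on the executors in any one worker under an even spread.
    static int maxExecutorsPerWorker(int executors, int workers) {
        return (executors + workers - 1) / workers;   // ceiling division
    }

    public static void main(String[] args) {
        Map<String, Integer> hints = new LinkedHashMap<String, Integer>();
        hints.put("clickSpout", 10);
        hints.put("repeatsBolt", 10);
        hints.put("geographyBolt", 10);
        hints.put("totalStats", 1);
        hints.put("geoStats", 10);

        int executors = totalExecutors(hints);
        System.out.println(executors);                            // 41
        System.out.println(maxExecutorsPerWorker(executors, 20)); // 3
    }
}
```

With 41 executors and the guideline of one core per executor, the cluster would need roughly 41 cores across its supervisor nodes to run this topology without threads contending for CPU.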

 

Unit testing a bolt


Unit testing is an essential part of any delivery; the logic contained in the bolts must also be unit tested.

Getting ready

Unit testing often involves a process called mocking that allows you to use dynamically generated fake instances of objects as dependencies in order to ensure that a particular class is tested on a unit basis. This book illustrates unit testing using JUnit 4 and JMock. Please take the time to read up on JMock's recipes online at http://jmock.org/cookbook.html.

How to do it…

  1. In the src/test/java folder, create the storm.cookbook package and create the StormTestCase class. This class is a simple abstraction of some of the initialization code:

    public class StormTestCase {
    
      protected Mockery context = new Mockery() {{
            setImposteriser(ClassImposteriser.INSTANCE);
        }};
        
    
        protected Tuple getTuple(){
            final Tuple tuple = context.mock(Tuple.class);
            return tuple;
        }
    }
  2. Then create the TestRepeatVisitBolt class that extends from StormTestCase, and mark it with the parameterized runner annotation:

    @RunWith(value = Parameterized.class)
    public class TestRepeatVisitBolt extends StormTestCase {
  3. The test case logic of the class is contained in a single testExecute method:

    @Test
    public void testExecute(){
        jedis = new Jedis("localhost",6379);
        RepeatVisitBolt bolt = new RepeatVisitBolt();
        Map config = new HashMap();
        config.put("redis-host", "localhost");
        config.put("redis-port", "6379");
        final OutputCollector collector = context.mock(OutputCollector.class);
        bolt.prepare(config, null, collector);

        assertEquals(true, bolt.isConnected());

        final Tuple tuple = getTuple();
        context.checking(new Expectations(){{
            oneOf(tuple).getStringByField(Fields.IP); will(returnValue(ip));
            oneOf(tuple).getStringByField(Fields.CLIENT_KEY); will(returnValue(clientKey));
            oneOf(tuple).getStringByField(Fields.URL); will(returnValue(url));
            oneOf(collector).emit(new Values(clientKey, url, expected));
        }});

        bolt.execute(tuple);
        context.assertIsSatisfied();

        if(jedis != null)
            jedis.disconnect();
    }
  4. Next, the parameters must be defined:

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        Object[][] data = new Object[][] {
            { "192.168.33.100", "Client1", "myintranet.com", "false" },
            { "192.168.33.100", "Client1", "myintranet.com", "false" },
            { "192.168.33.101", "Client2", "myintranet1.com", "true" },
            { "192.168.33.102", "Client3", "myintranet2.com", "false" }};
        return Arrays.asList(data);
    }
  5. The base provisioning of the values must be done before the tests using Redis:

    @BeforeClass
        public static void setupJedis(){
            Jedis jedis = new Jedis("localhost",6379);
            jedis.flushDB();
            Iterator<Object[]> it = data().iterator();
            while(it.hasNext()){
                Object[] values = it.next();
                if(values[3].equals("false")){
                    String key = values[2] + ":" + values[1];
                    jedis.set(key, "visited");//not unique, meaning the key must already exist
                }
            }
        }

    Tip

    It is always useful to leave data in the stack after the test completes in order to review and debug, clearing again only on the next run.

How it works…

Firstly, the unit test works by defining a set of test data. This allows us to test many different cases without unnecessary abstractions or duplication. Before the tests execute, the static data is populated into the Redis DB, thus allowing the tests to run deterministically. The test method is then executed once per line of parameterized data; many different cases are verified.
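The effect of seeding can be traced without Redis. In the following sketch, a HashMap stands in for the Redis DB and the class and method names are hypothetical; the seeding rule and the uniqueness check follow the ones used in this recipe:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a HashMap stands in for Redis to show why seeding makes the test deterministic.
class RepeatVisitSketch {
    private final Map<String, String> store = new HashMap<String, String>();

    // Rows are {ip, clientKey, url, expectedUnique}, as in the parameterized data.
    void seed(String[][] rows) {
        for (String[] row : rows) {
            if ("false".equals(row[3])) {
                store.put(row[2] + ":" + row[1], "visited");
            }
        }
    }

    // Same decision as the bolt: a missing key means a first (unique) visit.
    String isUnique(String clientKey, String url) {
        String key = url + ":" + clientKey;
        if (store.get(key) == null) {
            store.put(key, "visited");
            return "true";
        }
        return "false";
    }
}
```

Because every row that expects "false" is seeded up front, each row produces its expected flag regardless of the order in which the parameterized cases run.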

JMock provides mock instances of the collector and the tuples to be emitted by the bolt. The expected behavior is then defined in terms of these mocked objects and their interactions:

context.checking(new Expectations(){{
    oneOf(tuple).getStringByField(Fields.IP); will(returnValue(ip));
    oneOf(tuple).getStringByField(Fields.CLIENT_KEY); will(returnValue(clientKey));
    oneOf(tuple).getStringByField(Fields.URL); will(returnValue(url));
    oneOf(collector).emit(new Values(clientKey, url, expected));
}});

Although these are separate lines of code, within the bounds of the expectations they should be read declaratively: I expect the getStringByField method of the tuple to be called exactly once with the field name Fields.IP, and the mock object must then return the value ip to the class being tested.

This mechanism provides a clean way to exercise the bolt.

Tip

There are many different kinds of unit tests, and it is often necessary to test against a DB in this manner. Where possible, however, mock out all the dependencies of the class and implement a true unit test. This would be possible with the geography bolt because of the resolver abstraction.
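A hand-written fake is one way to exploit such an abstraction. The sketch below is not the recipe's code: the IpLookup interface is a hypothetical stand-in for the resolver that returns a plain Map, and the "country_name" and "city" keys are assumed for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the recipe's resolver abstraction, returning a plain Map.
interface IpLookup {
    Map<String, String> resolve(String ip);
}

// Hand-written fake: no network access, always the same answer.
class FixedIpLookup implements IpLookup {
    public Map<String, String> resolve(String ip) {
        Map<String, String> result = new HashMap<String, String>();
        result.put("country_name", "AUSTRALIA");   // assumed key name
        result.put("city", "SYDNEY");              // assumed key name
        return result;
    }
}

// The logic under test depends only on the interface, so the fake can be injected.
class GeographyLogicSketch {
    private final IpLookup lookup;

    GeographyLogicSketch(IpLookup lookup) {
        this.lookup = lookup;
    }

    // Returns {country, city} for the given IP address.
    String[] countryAndCity(String ip) {
        Map<String, String> json = lookup.resolve(ip);
        return new String[] { json.get("country_name"), json.get("city") };
    }
}
```

A test built this way needs neither a network connection nor a running database, so it exercises only the logic of the class itself.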

 

Implementing an integration test


Integration testing can mean many different things depending on the situation and audience. For the purposes of this book, integration testing is a means of testing the topology from end to end, with defined input and output points within a local cluster. This allows for a full functional verification before deploying to an actual cluster.

How to do it…

  1. Create the IntegrationTestTopology class in the src/test/java folder in the storm.cookbook package. Set up a local topology by adding in a testing utility bolt:

    @BeforeClass
    public static void setup() {
        // We want all output tuples coming to the mock for testing purposes
        topology.getBuilder().setBolt("testBolt", testBolt, 1)
                .globalGrouping("geoStats")
                .globalGrouping("totalStats");
        // Run in local mode, but we will shut the cluster down when we are finished
        topology.runLocal(0);
        // Jedis is required for input and output of the cluster
        jedis = new Jedis("localhost", Integer.parseInt(ClickTopology.DEFAULT_JEDIS_PORT));
        jedis.connect();
        jedis.flushDB();
        // Give it some time to start up before running the tests
        Utils.sleep(5000);
    }
  2. Then, define the expected parameters as a set of arrays arranged in pairs:

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        // Each row is a pair: {input, expectations}
        Object[][] data = new Object[][] {
            { new Object[] { "165.228.250.178", "internal.com", "Client1" }, // input
              new Object[] { "AUSTRALIA", 1L, "SYDNEY", 1L, 1L, 1L } },      // expectations
            { new Object[] { "165.228.250.178", "internal.com", "Client1" }, // input, repeated
              new Object[] { "AUSTRALIA", 2L, "SYDNEY", 2L, 2L, 1L } },      // expectations
            { new Object[] { "4.17.136.0", "internal.com", "Client1" },      // input, same client, different location
              new Object[] { "UNITED STATES", 1L, "DERRY, NH", 1L, 3L, 1L } }, // expectations
            { new Object[] { "4.17.136.0", "internal.com", "Client2" },      // input, different client, same location
              new Object[] { "UNITED STATES", 2L, "DERRY, NH", 2L, 4L, 2L } }  // expectations
        };
        return Arrays.asList(data);
    }

    Object[] input;
    Object[] expected;

    public IntegrationTestTopology(Object[] input, Object[] expected) {
        this.input = input;
        this.expected = expected;
    }
  3. The test logic can then be based on these parameters:

    @Test
    public void inputOutputClusterTest() {
        JSONObject content = new JSONObject();
        content.put("ip", input[0]);
        content.put("url", input[1]);
        content.put("clientKey", input[2]);

        // Inject the click into the topology through Redis
        jedis.rpush("count", content.toJSONString());

        // Give the topology time to process the click
        Utils.sleep(3000);

        int count = 0;
        String data = jedis.rpop("TestTuple");

        while (data != null) {
            JSONArray values = (JSONArray) JSONValue.parse(data);

            if (values.get(0).toString().contains("geoStats")) {
                count++;
                assertEquals(expected[0], values.get(1).toString().toUpperCase());
                assertEquals(expected[1], values.get(2));
                assertEquals(expected[2], values.get(3).toString().toUpperCase());
                assertEquals(expected[3], values.get(4));
            } else if (values.get(0).toString().contains("totalStats")) {
                count++;
                assertEquals(expected[4], values.get(1));
                assertEquals(expected[5], values.get(2));
            }
            data = jedis.rpop("TestTuple");
        }
        // Exactly one geoStats tuple and one totalStats tuple are expected per click
        assertEquals(2, count);
    }

How it works…

The integration test works by creating a local cluster and then injecting input values into the cluster through Redis, in the same way as a real web server would for the given design. It then adds a specific testing bolt to the end of the topology that receives all the output tuples and tests these against the expected values.

Once the TestBolt instance is submitted to the cluster, it is no longer accessible from the test; therefore, the outputs can only be accessed through persistence. TestBolt persists the tuples it receives to Redis, prefixed with the name of the component that emitted them, where the test case can read and validate them. The logic within TestBolt is as follows:

public void execute(Tuple input) {
    List<Object> objects = input.getValues();
    objects.add(0, input.getSourceComponent());
    jedis.rpush("TestTuple", JSONArray.toJSONString(objects));
}

This is then read by the test and validated against the expected values:

String data = jedis.rpop("TestTuple");

while (data != null) {
    JSONArray values = (JSONArray) JSONValue.parse(data);

    if (values.get(0).toString().contains("geoStats")) {
        count++;
        assertEquals(expected[0], values.get(1).toString().toUpperCase());
        assertEquals(expected[1], values.get(2));
        assertEquals(expected[2], values.get(3).toString().toUpperCase());
        assertEquals(expected[3], values.get(4));
    } else if (values.get(0).toString().contains("totalStats")) {
        count++;
        assertEquals(expected[4], values.get(1));
        assertEquals(expected[5], values.get(2));
    }
    data = jedis.rpop("TestTuple");
}
assertEquals(2, count);
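
The hand-off between TestBolt and the test can also be sketched without a running Redis instance. The example below uses a java.util.ArrayDeque as an in-memory stand-in for the Redis list, relying on the fact that RPUSH appends to the tail of a list and RPOP removes from the tail, so the most recently written tuple is drained first. FakeRedisList and TupleHandOffSketch are hypothetical names, and the JSON strings are illustrative rather than captured output.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical in-memory stand-in for a single Redis list.
class FakeRedisList {
    private final Deque<String> items = new ArrayDeque<>();

    // Like RPUSH: append to the tail of the list.
    void rpush(String value) {
        items.addLast(value);
    }

    // Like RPOP: remove from the tail, or return null when the list is empty.
    String rpop() {
        return items.pollLast();
    }
}

class TupleHandOffSketch {
    // Drains the list as the test loop does and counts tuples from known sources.
    static int drainAndCount(FakeRedisList list) {
        int count = 0;
        String data = list.rpop();
        while (data != null) {
            if (data.contains("geoStats") || data.contains("totalStats")) {
                count++;
            }
            data = list.rpop();
        }
        return count;
    }

    public static void main(String[] args) {
        FakeRedisList list = new FakeRedisList();
        // What a testing bolt would persist: source component first, then the values.
        list.rpush("[\"geoStats\",\"AUSTRALIA\",1,\"SYDNEY\",1]");
        list.rpush("[\"totalStats\",1,1]");
        System.out.println(drainAndCount(list));
    }
}
```

Because the loop branches on the source component rather than on position, the order in which the tuples are drained does not affect the assertions.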
 

Deploying to the cluster


The final step in the development process is to functionally test the topology in a cluster before promoting it to the next environment.

How to do it…

  1. First you need to configure the Storm client on your host development machine by creating the .storm folder in your user home directory. Create storm.yaml in this folder with the following content:

    storm.local.dir: "/mnt/storm"
    nimbus.host: "192.168.33.100"
  2. Package your topology using the following command within the project's root:

    mvn package
    
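  3. This will produce a completely packaged JAR in the target folder of the project. You can deploy this to the cluster using the storm client command, passing the JAR, the class whose main method submits the topology, and any arguments for that class:

    storm jar jarName.jar [TopologyMainClass] [Args]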
    

How it works…

The storm command-line client provides you with all the tools you need to control the cluster's functionality. Part of this is the ability to deploy packaged topologies. For more information on the storm CLI, please review the detailed documentation on the wiki at https://github.com/nathanmarz/storm/wiki/Command-line-client.

About the Author

  • Quinton Anderson

    Quinton Anderson is a software engineer with a background and focus on real-time computational systems. His career has been split between building real-time communication systems for defense systems and building enterprise applications within financial services and banking. Quinton does not align himself with any particular technology or programming language, but rather prefers to focus on sound engineering and polyglot development. He is passionate about open source, and is an active member of the Storm community; he has also enjoyed delivering various Storm-based solutions. Quinton's next area of focus is machine learning; specifically, Deep Belief networks, as they pertain to robotics. Please follow his blog entries on Computational Theory, general IT concepts, and Deep Belief networks for more information. You can find more information on Quinton via his LinkedIn profile (http://au.linkedin.com/pub/quinton-anderson/37/422/11b/) or more importantly, view and contribute to the source code available at his GitHub (https://github.com/quintona) and Bitbucket (https://bitbucket.org/qanderson) accounts.
