In this chapter we will cover:
Setting up your development environment
Distributed version control
Creating a "Hello World" topology
Creating a Storm cluster – provisioning the machines
Creating a Storm cluster – provisioning Storm
Deriving basic click statistics
Unit testing a bolt
Implementing an integration test
Deploying to the cluster
This chapter provides a basic, practical introduction to the Storm processor. It covers everything from setting up your development environment to basic operational concerns, such as deploying your topologies, and basic quality practices, such as unit and integration testing of your Storm topology. Upon completion of this chapter, you will be able to build, test, and deliver basic Storm topologies.
This book does not provide a theoretical introduction to the Storm processor, its primitives, or its architecture. The author assumes that readers have oriented themselves using online resources such as the Storm wiki.
A development environment consists of all the tools and systems required to start building Storm topologies. This book focuses on individual delivery of Storm, with an emphasis on the technology; it must be noted, however, that the development environment for a software development team, be it centralized or distributed, would require much more tooling and process to be effective, and is considered outside the scope of this book.
The following classes of tools and processes are required to set up the development environment effectively, both on an ongoing basis and for implementing the recipes in this book:
SDK(s)
Version control
Build environment
System provisioning tooling
Cluster provisioning tooling
The provisioning and installation recipes in this book are based on Ubuntu; they are, however, quite portable to other Linux distributions. If you have any issues working with another distribution using these instructions, please seek support from the Storm mailing list at https://groups.google.com/forum/#!forum/storm-user.
Tip
Variability between environments is the enemy of maintainable and available systems. Developing on one environment type and deploying on another is a particularly risky example of such variability. Develop on your target environment type whenever possible.
Download the latest Java SE 6 SDK from Oracle's website (http://www.oracle.com/technetwork/java/javase/downloads/index.html) and install it as follows:
chmod 775 jdk-6u35-linux-x64.bin
yes | ./jdk-6u35-linux-x64.bin
mv jdk1.6.0_35 /opt
ln -s /opt/jdk1.6.0_35/bin/java /usr/bin
ln -s /opt/jdk1.6.0_35/bin/javac /usr/bin
JAVA_HOME=/opt/jdk1.6.0_35
export JAVA_HOME
PATH=$PATH:$JAVA_HOME/bin
export PATH
The version control system, Git, must then be installed:
sudo apt-get install git
The installation should then be followed by Maven, the build system:
sudo apt-get install maven
Puppet, Vagrant, and VirtualBox must then be installed in order to provide application and environment provisioning:
sudo apt-get install virtualbox puppet vagrant
Finally, you need to install an IDE:
sudo apt-get install eclipse
Note
There is an ongoing debate around which fork of the Java SDK should be used since Sun was acquired by Oracle. While the author understands the need for OpenJDK, the recipes in this book have been tested using the Oracle JDK. In general, there is little difference between OpenJDK and the Oracle JDK, apart from the Oracle JDK being more stable while lagging behind in terms of features.
The JDK is required for any Java development to take place. Git is an open source distributed version control system that has received wide adoption in recent years. A brief introduction to Git is presented shortly.
Maven is a widely used build system that prefers convention over configuration. Maven includes many useful features including the Project Object Model (POM), which allows us to manage our libraries, dependencies, and versions in an effective manner. Maven is backed by many binary repositories on the Internet that allow us to transparently maintain binary dependencies correctly and package our topologies for deployment.
Within the growing arena of DevOps and Continuous Delivery, Puppet is widely used to provide declarative provisioning of Linux and other operating systems and applications. Puppet gives us the ability to program the state of our servers and deployment environments. This is important because the state of our servers can then be maintained within a version control system such as Git, and manual changes to servers can be safely removed. This provides many advantages, including a deterministic Mean Time to Recovery (MTTR) and an audit trail, which, in general, make systems more stable. It is also an important step on the path towards continuous delivery.
Vagrant is a very useful tool within development environments. It allows the automation of provisioning of VirtualBox virtual machines. Within the context of the Storm processor, this is important, given that it is a cluster-based technology. In order to test a cluster, you must either build an actual cluster of machines or provision many virtual machines. Vagrant allows us to do this locally in a deterministic and declarative way.
Note
A virtual machine is an extremely useful abstraction within IT infrastructure, operations, and development. It must be noted, however, that while reduced performance is expected and acceptable for locally hosted VMs, their usability depends heavily on the availability of RAM. Processing power is less of a concern, given that most modern processors are extremely underutilized, although this may no longer hold once your topologies are working. It is recommended that you ensure your computer has at least 8 GB of RAM.
Traditional version control systems are centralized. Each client contains a checkout of the files at their current version, depending on which branch the client is using, while all previous versions are stored on the server. This model has worked well in that it allows teams to collaborate closely and know, to some degree, what other members of the team are doing.
Centralized servers have some distinct downfalls that have led to the rise of distributed version control systems. Firstly, the centralized server represents a single point of failure: if the server goes down or becomes unavailable for any reason, it becomes difficult for developers to work using their existing workflows. Secondly, if the data on the server is corrupted or lost for any reason, the history of the code base is lost.
Open source projects have been a large driver of distributed version control, for both of these reasons, but mostly because of the collaboration models that distribution enables. Developers can follow a disciplined set of workflows in their local environments and then distribute their changes to one or many remote repositories when it is convenient to do so, in both flat and hierarchical arrangements.
An obvious additional advantage is that many backups of the repository naturally exist, because each client holds a complete mirror of it; if any client or server is lost, the repository can simply be replicated back once the machine is restored.
Git is used in this book as the distributed version control system. In order to create a repository, you need to either clone or initialize a repository. For a new project that you create, the repository should be initialized.
First, let's create our project directory, as follows:
mkdir FirstGitProject
cd FirstGitProject
git init
In order to test if the workflow is working, we need some files in our repository.
touch README.txt
vim README.txt
Using vim, or any other text editor, press the Insert key and add some descriptive text. Once you have finished typing, hit the Esc key, then type a colon followed by wq, and hit the Enter key. Before you commit, review the status of the repository:
git status
This should give you an output that looks similar to the following:
# On branch master
#
# Initial commit
#
# Untracked files:
#   README.txt
Git requires that you add all files and folders manually; you can do it as follows:
git add README.txt
Then commit the file using the following:
git commit -a
This will open a vim editor and allow you to add your comments.
Without pushing this repository to a remote host, you are essentially exposed to the same risk as with a centralized host: there is only a single copy. It is therefore important to push the repository to a remote host. Both www.github.com and www.bitbucket.org are good options for free hosted Git services, provided that you aren't pushing your corporate intellectual property there for public consumption. This book uses bitbucket.org. In order to push your repository to this remote host, simply navigate there in your browser and sign up for an account.
Once the registration process is complete, create a new repository using the menu system.
Enter the following values in order to create the repository:
Once the repository is created, you need to add the remote repository to your local repository and push the changes to the remote repository.
git remote add origin https://[user]@bitbucket.org/[user]/firstgitproject.git
git push origin master
You must replace [user] in the preceding command with your registered username.
The "Hello World" topology, as with all "Hello World" applications, is of no real use to anyone, except to illustrate some really basic concepts. The "Hello World" topology will show how to create a Storm project including a simple spout and bolt, build it, and execute it in the local cluster mode.
Create a new project folder and initialize your Git repository.
mkdir HelloWorld
cd HelloWorld
git init
We must then create the Maven project file as follows:
vim pom.xml
Using vim, or any other text editor, you need to create the basic XML tags and project metadata for the "Hello World" project:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>storm.cookbook</groupId>
  <artifactId>hello-world</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>hello-world</name>
  <url>https://bitbucket.org/[user]/hello-world</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
</project>
We then need to declare which Maven repositories we will fetch our dependencies from. Add the following to the pom.xml file within the project tags:

<repositories>
  <repository>
    <id>github-releases</id>
    <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
  </repository>
  <repository>
    <id>clojars.org</id>
    <url>http://clojars.org/repo</url>
  </repository>
  <repository>
    <id>twitter4j</id>
    <url>http://twitter4j.org/maven2</url>
  </repository>
</repositories>
We then need to declare our dependencies by adding them within the project tags:
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.8.1</version>
    <!-- keep storm out of the jar-with-dependencies -->
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>com.googlecode.json-simple</groupId>
    <artifactId>json-simple</artifactId>
    <version>1.1</version>
  </dependency>
</dependencies>
Finally, we need to add the build plugin definitions for Maven:

<build>
  <plugins>
    <!--
      Bind the maven-assembly-plugin to the package phase.
      This will create a jar file without the Storm dependencies,
      suitable for deployment to a cluster.
    -->
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass></mainClass>
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>com.theoryinpractise</groupId>
      <artifactId>clojure-maven-plugin</artifactId>
      <version>1.3.8</version>
      <extensions>true</extensions>
      <configuration>
        <sourceDirectories>
          <sourceDirectory>src/clj</sourceDirectory>
        </sourceDirectories>
      </configuration>
      <executions>
        <execution>
          <id>compile</id>
          <phase>compile</phase>
          <goals>
            <goal>compile</goal>
          </goals>
        </execution>
        <execution>
          <id>test</id>
          <phase>test</phase>
          <goals>
            <goal>test</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.6</source>
        <target>1.6</target>
      </configuration>
    </plugin>
  </plugins>
</build>
With the POM file complete, save it using the Esc, :, wq, Enter key sequence, and complete the required folder structure for the Maven project:

mkdir src
cd src
mkdir test
mkdir main
cd main
mkdir java
Then return to the project root folder and generate the Eclipse project files using the following:
mvn eclipse:eclipse
You must now start your Eclipse environment and import the generated project files into the workspace:
You must then create your first spout by creating a new class named HelloWorldSpout, which extends BaseRichSpout and is located in the storm.cookbook package. Eclipse will generate the default spout methods for you. The spout will simply generate tuples based on random probability. Create the following member variables and constructor:

private SpoutOutputCollector collector;
private int referenceRandom;
private static final int MAX_RANDOM = 10;

public HelloWorldSpout(){
    final Random rand = new Random();
    referenceRandom = rand.nextInt(MAX_RANDOM);
}
After construction, the Storm cluster will open the spout; provide the following implementation for the open method:

public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
    this.collector = collector;
}
The Storm cluster will repeatedly call the nextTuple method, which does all the work of the spout. Provide the following implementation for the method:

Utils.sleep(100);
final Random rand = new Random();
int instanceRandom = rand.nextInt(MAX_RANDOM);
if(instanceRandom == referenceRandom){
    collector.emit(new Values("Hello World"));
} else {
    collector.emit(new Values("Other Random Word"));
}
Finally, you need to tell the Storm cluster which fields this spout emits, within the declareOutputFields method:

declarer.declare(new Fields("sentence"));
Once you have resolved all the required imports for the class, you need to create HelloWorldBolt. This class will consume the produced tuples and implement the required counting logic. Create the new class within the storm.cookbook package; it should extend the BaseRichBolt class. Declare a private member variable, myCount, and provide the following implementation for the execute method, which does the work of this bolt:

String test = input.getStringByField("sentence");
if("Hello World".equals(test)){
    myCount++;
    System.out.println("Found a Hello World! My Count is now: "
            + Integer.toString(myCount));
}
Finally, you need to bring the elements together and declare the Storm topology. Create a main class named HelloWorldTopology within the same package and provide the following main implementation:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 10);
builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 2)
        .shuffleGrouping("randomHelloWorld");

Config conf = new Config();
conf.setDebug(true);

if(args != null && args.length > 0) {
    conf.setNumWorkers(3);
    StormSubmitter.submitTopology(args[0], conf,
            builder.createTopology());
} else {
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());
    Utils.sleep(10000);
    cluster.killTopology("test");
    cluster.shutdown();
}
This essentially sets up the topology and submits it to either a local or a remote Storm cluster, depending on the arguments passed to the main method. After you have resolved the compiler issues, you can execute the cluster by issuing the following command from the project's root folder:
mvn compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=storm.cookbook.HelloWorldTopology
The following diagram describes the "Hello World" topology:
The spout essentially emits a stream containing one of the following two sentences:
Other Random Word
Hello World
The spout works on random probability: it generates a random number upon construction and then generates a new random number on each execution, testing it against the stored member variable's value. When they match, Hello World is emitted; during the remaining executions, the other random sentence is emitted.
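To make the emission probability concrete, the following standalone Java sketch (illustrative class and method names; no Storm dependencies) replays the spout's logic: it fixes a reference value once at construction time, then draws a fresh random value per emission, just as nextTuple does:

```java
import java.util.Random;

public class SpoutOddsSketch {
    private static final int MAX_RANDOM = 10;

    /** Simulates the spout: fix a reference value once, then draw a fresh
     *  random value per "emission", returning the fraction of matches. */
    public static double simulate(int emissions, long seed) {
        final Random rand = new Random(seed);
        // As in the spout's constructor: the reference value is fixed once.
        final int referenceRandom = rand.nextInt(MAX_RANDOM);
        int helloWorldCount = 0;
        for (int i = 0; i < emissions; i++) {
            // As in nextTuple(): a new draw is tested against the reference.
            if (rand.nextInt(MAX_RANDOM) == referenceRandom) {
                helloWorldCount++;
            }
        }
        return (double) helloWorldCount / emissions;
    }

    public static void main(String[] args) {
        System.out.println("Hello World ratio: " + simulate(100000, 42L));
    }
}
```

With MAX_RANDOM set to 10, the printed ratio converges on roughly 0.1, which is why approximately one in every ten emitted tuples is Hello World.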
The bolt simply matches and counts the instances of Hello World. In the current implementation, you will notice sequential increments being printed by the bolt. In order to scale this bolt, you simply need to increase the parallelism hint for the topology by updating the following line:
builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 3)
        .shuffleGrouping("randomHelloWorld");
The key parameter here is parallelism_hint (the final argument to setBolt), which you can adjust upwards. If you execute the cluster again, you will notice three separate counts being printed independently, interleaved with each other.
Tip
You can scale a cluster after deployment by updating these hints using the Storm GUI or CLI; however, you can't change the topology structure without recompiling and redeploying the JAR. For the command-line option, please see the CLI documentation on the wiki available at the following link:
https://github.com/nathanmarz/storm/wiki/Command-line-client
It is important to ensure that your project dependencies are declared correctly within your POM. The Storm JARs must be declared with the provided scope; if not, they will be packaged into your JAR, resulting in duplicate class files on the classpath of a deployed node of the cluster. Note that Storm checks for this classpath duplication and will fail to start if you have included Storm in your distribution.
Tip
Downloading the example code
You can download the example code files for all Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.
Open source versions of the code are maintained by the author at his Bitbucket account at https://bitbucket.org/qanderson.
Testing the cluster in the local mode is useful for debugging and verifying the basic functional logic of the cluster. It doesn't, however, give you a realistic view of the operation of the cluster. Moreover, any development effort is only complete once the system is running in a production environment. This is a key consideration for any developer and is the cornerstone of the entire DevOps movement; regardless of methodology, you must be able to reliably deploy your code into an environment. This recipe demonstrates how to create and provision an entire cluster directly from version control. There are some key principles involved in doing this:
The state of any given server must be known at all times. It isn't acceptable that people can log into a server and make changes to its settings or files without strict version control being in place.
Servers should be fundamentally immutable, with the state in some kind of separate volume. This allows deterministic recovery times of a server.
If something causes problems in the delivery process, do it more often. In software development and IT operations, this applies heavily to disaster recovery and integration. Both tasks can only be performed often if they are automated.
This book assumes that your destination production environment is a cluster (based on Amazon Web Services (AWS) EC2), which enables automatic scaling. Elastic auto-scaling is only possible where provisioning is automated.
The deployment of Storm topologies to an AWS cluster is the subject for a later chapter; however, the fundamentals will be presented in this recipe in a development environment.
Let's start by creating a new project as follows:
Create a new project named vagrant-storm-cluster, containing data, manifests, and scripts folders. Using your favorite editor, create a file in the project root called Vagrantfile. Inside the file, you must create the file header and the configuration for the virtual machines that we want to create. We need at least one nimbus node, two supervisor nodes, and a zookeeper node:

# -*- mode: ruby -*-
# vi: set ft=ruby :

boxes = [
  { :name => :nimbus, :ip => '192.168.33.100', :cpus => 2, :memory => 512 },
  { :name => :supervisor1, :ip => '192.168.33.101', :cpus => 4, :memory => 1024 },
  { :name => :supervisor2, :ip => '192.168.33.102', :cpus => 4, :memory => 1024 },
  { :name => :zookeeper1, :ip => '192.168.33.201', :cpus => 1, :memory => 512 }
]
You must then create the virtual machine provisioning for each machine, specialized by the previous configuration at execution time. The first set of properties defines the hardware, networking, and operating system:
Vagrant::Config.run do |config|
  boxes.each do |opts|
    config.vm.define opts[:name] do |config|
      config.vm.box = "ubuntu12"
      config.vm.box_url = "http://dl.dropbox.com/u/1537815/precise64.box"
      config.vm.network :hostonly, opts[:ip]
      config.vm.host_name = "storm.%s" % opts[:name].to_s
      config.vm.share_folder "v-data", "/vagrant_data", "./data",
        :transient => false
      config.vm.customize ["modifyvm", :id, "--memory", opts[:memory]]
      config.vm.customize ["modifyvm", :id, "--cpus", opts[:cpus]] if opts[:cpus]
The provisioning of the application is then configured using a combination of the bash and Puppet scripts:
      config.vm.provision :shell, :inline => "cp -fv /vagrant_data/hosts /etc/hosts"
      config.vm.provision :shell, :inline => "apt-get update"

      # Check if the JDK has been provided
      if File.exist?("./data/jdk-6u35-linux-x64.bin") then
        config.vm.provision :puppet do |puppet|
          puppet.manifests_path = "manifests"
          puppet.manifest_file = "jdk.pp"
        end
      end

      config.vm.provision :puppet do |puppet|
        puppet.manifests_path = "manifests"
        puppet.manifest_file = "provisioningInit.pp"
      end

      # Ask puppet to do the provisioning now.
      config.vm.provision :shell, :inline => "puppet apply /tmp/storm-puppet/manifests/site.pp --verbose --modulepath=/tmp/storm-puppet/modules/ --debug"
    end
  end
end
The Vagrant file simply defines the hypervisor-level configuration and provisioning; the remaining provisioning is done through Puppet and is defined at two levels. The first level makes the base Ubuntu installation ready for application provisioning. The second level contains the actual application provisioning. In order to create the first level of provisioning, you need to create the JDK provisioning bash script and the provisioning initialization Puppet script.
In the scripts folder of the project, create the installJdk.sh file and populate it with the following code:

#!/bin/sh
echo "Installing JDK!"
chmod 775 /vagrant_data/jdk-6u35-linux-x64.bin
cd /root
yes | /vagrant_data/jdk-6u35-linux-x64.bin
/bin/mv /root/jdk1.6.0_35 /opt
/bin/rm -rv /usr/bin/java
/bin/rm -rv /usr/bin/javac
/bin/ln -s /opt/jdk1.6.0_35/bin/java /usr/bin
/bin/ln -s /opt/jdk1.6.0_35/bin/javac /usr/bin
JAVA_HOME=/opt/jdk1.6.0_35
export JAVA_HOME
PATH=$PATH:$JAVA_HOME/bin
export PATH
This will simply be invoked by the Puppet script in a declarative manner.
In the manifests folder, create a file called jdk.pp:

$JDK_VERSION = "1.6.0_35"

package { "openjdk":
  ensure => absent,
}

exec { "installJdk":
  command => "installJdk.sh",
  path => "/vagrant/scripts",
  logoutput => true,
  creates => "/opt/jdk${JDK_VERSION}",
}
In the manifests folder, create the provisioningInit.pp file and define the required packages and static variable values:

$CLONE_URL = "https://bitbucket.org/qanderson/storm-puppet.git"
$CHECKOUT_DIR = "/tmp/storm-puppet"

package { git: ensure => [latest, installed] }
package { puppet: ensure => [latest, installed] }
package { ruby: ensure => [latest, installed] }
package { rubygems: ensure => [latest, installed] }
package { unzip: ensure => [latest, installed] }

exec { "install_hiera":
  command => "gem install hiera hiera-puppet",
  path => "/usr/bin",
  require => Package['rubygems'],
}
Note
For more information on Hiera, please see the Puppet documentation page at http://docs.puppetlabs.com/hiera/1/index.html.
You must then clone the repository, which contains the second level of provisioning:
exec { "clone_storm-puppet":
  command => "git clone ${CLONE_URL}",
  cwd => "/tmp",
  path => "/usr/bin",
  creates => "${CHECKOUT_DIR}",
  require => Package['git'],
}
You must now configure a Puppet plugin called Hiera, which is used to externalize properties from the provisioning scripts in a hierarchical manner:
exec { "/bin/ln -s /var/lib/gems/1.8/gems/hiera-puppet-1.0.0/ /tmp/storm-puppet/modules/hiera-puppet":
  creates => "/tmp/storm-puppet/modules/hiera-puppet",
  require => [Exec['clone_storm-puppet'], Exec['install_hiera']]
}

# install hiera and the storm configuration
file { "/etc/puppet/hiera.yaml":
  source => "/vagrant_data/hiera.yaml",
  replace => true,
  require => Package['puppet']
}

file { "/etc/puppet/hieradata":
  ensure => directory,
  require => Package['puppet']
}

file { "/etc/puppet/hieradata/storm.yaml":
  source => "${CHECKOUT_DIR}/modules/storm.yaml",
  replace => true,
  require => [Exec['clone_storm-puppet'], File['/etc/puppet/hieradata']]
}
Finally, you need to populate the data folder. Create the Hiera base configuration file, hiera.yaml:

---
:hierarchy:
  - %{operatingsystem}
  - storm
:backends:
  - yaml
:yaml:
  :datadir: '/etc/puppet/hieradata'
The final datafile required is the hosts file, which acts as DNS in our local cluster:
127.0.0.1 localhost
192.168.33.100 storm.nimbus
192.168.33.101 storm.supervisor1
192.168.33.102 storm.supervisor2
192.168.33.103 storm.supervisor3
192.168.33.104 storm.supervisor4
192.168.33.105 storm.supervisor5
192.168.33.201 storm.zookeeper1
192.168.33.202 storm.zookeeper2
192.168.33.203 storm.zookeeper3
192.168.33.204 storm.zookeeper4
Tip
The hosts file is not required in properly configured environments; however, it works nicely in our local "host only" development network.
The project is now complete, in that it will provision the correct virtual machines and install the base required packages; however, we need to create the Application layer provisioning, which is contained in a separate repository.
Initialize your Git repository for this project and push it to bitbucket.org.
Provisioning is performed on three distinct layers: the Hypervisor layer that creates the virtual machines, the base operating system layer, and the Application layer.
This recipe works only in the bottom two layers, with the Application layer presented in the next recipe. A key reason for the separation is that you will typically create different provisioning at these layers depending on the Hypervisor you are using for deployment. Once the VMs are provisioned, however, the application stack provisioning should be consistent across all your environments. This is key, in that it allows us to test our deployments hundreds of times before we get to production and ensure that they are in a repeatable and version-controlled state.
In the development environment, VirtualBox is the Hypervisor, with Vagrant and Puppet providing the provisioning. Vagrant works by specializing a VirtualBox base image; this base image represents a version-controlled artifact. For each box defined in our Vagrant file, the following parameters are specified:
The base box
The network settings
Shared folders
Memory and CPU settings for the VM
Tip
This base provisioning does not include any of the baseline controls you would expect in a production environment, such as security, access controls, housekeeping, and monitoring. You must provision these before proceeding beyond your development environment. You can find these kinds of recipes on Puppet Forge (http://forge.puppetlabs.com/).
Provisioning agents are then invoked to perform the remaining heavy lifting:
config.vm.provision :shell, :inline => "cp -fv /vagrant_data/hosts /etc/hosts"
The preceding command installs the hosts file, which provides name resolution for our cluster:
config.vm.provision :shell, :inline => "apt-get update"
This updates all the packages in the apt-get cache within the Ubuntu installation. Vagrant then proceeds to install the JDK and the base provisioning; finally, it invokes the application provisioning.
Note
The base VM image could already contain the entire base provisioning, making this portion of the provisioning unnecessary. However, it is important to understand the process of creating an appropriate base image, and also to balance the amount of specialization in the base images you control; otherwise, they will proliferate.
Once you have a base set of virtual machines that are ready for application provisioning, you need to install and configure the appropriate packages on each node.
Create a new project named storm-puppet, containing manifests and modules folders. The entry point into the Puppet execution on the provisioned node is site.pp. Create it in the manifests folder:

node 'storm.nimbus' {
  $cluster = 'storm1'
  include storm::nimbus
  include storm::ui
}

node /storm.supervisor[1-9]/ {
  $cluster = 'storm1'
  include storm::supervisor
}

node /storm.zookeeper[1-9]/ {
  include storm::zoo
}
Next, you need to define the storm module. A module exists in the modules folder and has its own manifests and templates folder structure, much like the structure found at the root level of the Puppet project. Within the storm module, create the required manifests (modules/storm/manifests), starting with the init.pp file:

class storm {
  include storm::install
  include storm::config
}
The installation of the Storm application is the same on each storm node; only the configurations are adjusted where required, via templating. Next, create the install.pp file, which will download the required binaries and install them:

class storm::install {
  $BASE_URL = "https://bitbucket.org/qanderson/storm-deb-packaging/downloads/"
  $ZMQ_FILE = "libzmq0_2.1.7_amd64.deb"
  $JZMQ_FILE = "libjzmq_2.1.7_amd64.deb"
  $STORM_FILE = "storm_0.8.1_all.deb"

  package { "wget": ensure => latest }

  # call fetch for each file
  exec { "wget_storm":
    command => "/usr/bin/wget ${BASE_URL}${STORM_FILE}"
  }
  exec { "wget_zmq":
    command => "/usr/bin/wget ${BASE_URL}${ZMQ_FILE}"
  }
  exec { "wget_jzmq":
    command => "/usr/bin/wget ${BASE_URL}${JZMQ_FILE}"
  }

  # call package for each file
  package { "libzmq0":
    provider => dpkg,
    ensure => installed,
    source => "${ZMQ_FILE}",
    require => Exec['wget_zmq']
  }
  package { "libjzmq":
    provider => dpkg,
    ensure => installed,
    source => "${JZMQ_FILE}",
    require => [Exec['wget_jzmq'], Package['libzmq0']]
  }
  package { "storm":
    provider => dpkg,
    ensure => installed,
    source => "${STORM_FILE}",
    require => [Exec['wget_storm'], Package['libjzmq']]
  }
}
Tip
The install manifest here assumes the existence of prebuilt Debian packages for Ubuntu. These were built using scripts and can be tweaked based on your requirements. The binaries and creation scripts can be found at https://bitbucket.org/qanderson/storm-deb-packaging.

The installation consists of the following packages:
Storm
ZeroMQ: http://www.zeromq.org/
Java-ZeroMQ
The configuration of each node is done through template-based generation of the configuration files. In the storm manifests, create config.pp:

class storm::config {
  require storm::install
  include storm::params

  file { '/etc/storm/storm.yaml':
    require => Package['storm'],
    content => template('storm/storm.yaml.erb'),
    owner => 'root',
    group => 'root',
    mode => '0644'
  }
  file { '/etc/default/storm':
    require => Package['storm'],
    content => template('storm/default.erb'),
    owner => 'root',
    group => 'root',
    mode => '0644'
  }
}
All the storm parameters are defined using Hiera, with the Hiera configuration invoked from params.pp in the storm manifests:

class storm::params {
  #_ STORM DEFAULTS _#
  $java_library_path = hiera_array('java_library_path',
    ['/usr/local/lib', '/opt/local/lib', '/usr/lib'])
}
Tip
Due to the sheer number of properties, the file has been truncated here. For the complete file, please refer to the Git repository at https://bitbucket.org/qanderson/storm-puppet/src.
Each class of node is then specified; here we will specify the nimbus class:

class storm::nimbus {
  require storm::install
  include storm::config
  include storm::params

  # Install nimbus /etc/default
  storm::service { 'nimbus':
    start      => 'yes',
    jvm_memory => $storm::params::nimbus_mem
  }
}
class storm::supervisor {
  require storm::install
  include storm::config
  include storm::params

  # Install supervisor /etc/default
  storm::service { 'supervisor':
    start      => 'yes',
    jvm_memory => $storm::params::supervisor_mem
  }
}
class storm::ui {
  require storm::install
  include storm::config
  include storm::params

  # Install ui /etc/default
  storm::service { 'ui':
    start      => 'yes',
    jvm_memory => $storm::params::ui_mem
  }
}
And finally, specify the zoo class (for a zookeeper node):

class storm::zoo {
  package { ['zookeeper', 'zookeeper-bin', 'zookeeperd']:
    ensure => latest,
  }
}
Once all the files have been created, initialize the Git repository and push it to bitbucket.org.
In order to actually run the provisioning, navigate to the vagrant-storm-cluster folder and run the following command:

vagrant up
If you would like to ssh into any of the nodes, simply use the following command:

vagrant ssh nimbus
Replace nimbus with your required node name.
There are various patterns that can be applied when using Puppet. The simplest one is using a distributed model, whereby nodes provision themselves as opposed to a centralized model using Puppet Master. In the distributed model, updating server configuration simply requires that you update your provisioning manifests and push them to your central Git repository. The various nodes will then pull and apply this configuration. This can either be achieved through cron jobs, triggers, or through the use of a Continuous Delivery tool such as Jenkins, Bamboo, or Go. Provisioning in the development environment is explicitly invoked by Vagrant through the following command:
config.vm.provision :shell, :inline => "puppet apply /tmp/storm-puppet/manifests/site.pp --verbose --modulepath=/tmp/storm-puppet/modules/ --debug"
The manifest is then applied by Puppet. Puppet is declarative, in that each language element specifies the desired state together with the means of getting there. This means that, when the system is already in the required state, a particular provisioning step will be skipped, avoiding the adverse effects of duplicate provisioning.
The storm-puppet project is therefore cloned onto the node and the manifest is applied locally. Each node only applies provisioning for itself, based on the hostname specified in the site.pp manifest, for example:
node 'storm.nimbus' {
  $cluster = 'storm1'
  include storm::nimbus
  include storm::ui
}
In this case, the nimbus node will include the Hiera configuration for the storm1 cluster, and the nimbus and ui roles will be installed. Any combination of classes can be included in the node definition, thus allowing the complete environment to be succinctly defined.
The click topology is designed to gather basic website-usage statistics, specifically:
The number of visitors
The number of unique visitors
The number of visitors for a given country
The number of visitors for a given city
The percentage of visitors for each city in a given country
The system assumes a limited possible visitor population and prefers server-side client keys as opposed to client-side cookies. The topology derives the geographic information from the IP address and a public IP resolution service.
The click topology also uses Redis to store click events being sent into the topology, specifically as a persistent queue, and it also leverages Redis in order to persistently recall the previous visitors to the site.
Note
For more information on Redis, please visit Redis.io.
Before you proceed, you must install Redis (Version 2.6 or greater):
wget http://download.redis.io/redis-stable.tar.gz
tar xvzf redis-stable.tar.gz
cd redis-stable
make
sudo cp src/redis-server /usr/local/bin/
sudo cp src/redis-cli /usr/local/bin/
Then start the Redis server.
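The spout shown later in this recipe pops JSON click events off the Redis list named count, expecting ip, url, and clientKey fields. The producer side is not part of the recipe, but a minimal sketch of the message format a web server would push looks like this (the class name, helper method, and plain string formatting are illustrative assumptions; the topology itself uses the json-simple library):

```java
// Illustrative sketch: builds the JSON click event that a web server
// would RPUSH onto the Redis "count" list for the spout to consume.
// The helper name and hand-rolled formatting are assumptions, not the
// recipe's actual code.
public class ClickEventSketch {

    public static String buildClickEvent(String ip, String url, String clientKey) {
        return String.format(
            "{\"ip\":\"%s\",\"url\":\"%s\",\"clientKey\":\"%s\"}",
            ip, url, clientKey);
    }

    public static void main(String[] args) {
        String event = buildClickEvent("10.0.0.1", "internal.com", "Client1");
        System.out.println(event);
        // With a Jedis connection, this would be queued with:
        // jedis.rpush("count", event);
    }
}
```

The matching rpop in the spout's nextTuple method then parses these fields back out of the JSON object.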
Create a new Java project named click-topology, and create the pom.xml file and folder structure as per the "Hello World" topology project.

In the pom.xml file, update the project name and references, and then add the following dependencies to the <dependencies> tag:

<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.11</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.jmock</groupId>
  <artifactId>jmock-junit4</artifactId>
  <version>2.5.1</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.jmock</groupId>
  <artifactId>jmock-legacy</artifactId>
  <version>2.5.1</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>2.1.0</version>
</dependency>
Take special note of the scope definitions of JUnit and JMock so as not to include them in your final deployable JAR.

In the src/main/java folder, create the ClickTopology main class in the storm.cookbook package. This class defines the topology and provides the mechanisms to launch the topology into a cluster or in a local mode. Create the class as follows:

public ClickTopology() {
    builder.setSpout("clickSpout", new ClickSpout(), 10);

    // First layer of bolts
    builder.setBolt("repeatsBolt", new RepeatVisitBolt(), 10)
           .shuffleGrouping("clickSpout");
    builder.setBolt("geographyBolt", new GeographyBolt(new HttpIPResolver()), 10)
           .shuffleGrouping("clickSpout");

    // Second layer of bolts, commutative in nature
    builder.setBolt("totalStats", new VisitStatsBolt(), 1)
           .globalGrouping("repeatsBolt");
    builder.setBolt("geoStats", new GeoStatsBolt(), 10)
           .fieldsGrouping("geographyBolt",
               new Fields(storm.cookbook.Fields.COUNTRY));

    conf.put(Conf.REDIS_PORT_KEY, DEFAULT_JEDIS_PORT);
}

public void runLocal(int runTime) {
    conf.setDebug(true);
    conf.put(Conf.REDIS_HOST_KEY, "localhost");
    cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());
    if (runTime > 0) {
        Utils.sleep(runTime);
        shutDownLocal();
    }
}

public void shutDownLocal() {
    if (cluster != null) {
        cluster.killTopology("test");
        cluster.shutdown();
    }
}

public void runCluster(String name, String redisHost)
        throws AlreadyAliveException, InvalidTopologyException {
    conf.setNumWorkers(20);
    conf.put(Conf.REDIS_HOST_KEY, redisHost);
    StormSubmitter.submitTopology(name, conf, builder.createTopology());
}
This is followed by the main method, which is guided by the number of arguments passed at runtime:

public static void main(String[] args) throws Exception {
    ClickTopology topology = new ClickTopology();
    if (args != null && args.length > 1) {
        topology.runCluster(args[0], args[1]);
    } else {
        if (args != null && args.length == 1)
            System.out.println("Running in local mode, redis ip missing for cluster run");
        topology.runLocal(10000);
    }
}
The topology assumes that the web server pushes messages onto a Redis queue. You must create a spout to inject these into the Storm cluster as a stream. In the storm.cookbook package, create the ClickSpout class, which connects to Redis when it is opened by the cluster:

public class ClickSpout extends BaseRichSpout {

    public static Logger LOG = Logger.getLogger(ClickSpout.class);

    private Jedis jedis;
    private String host;
    private int port;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP,
                storm.cookbook.Fields.URL,
                storm.cookbook.Fields.CLIENT_KEY));
    }

    @Override
    public void open(Map conf, TopologyContext topologyContext,
                     SpoutOutputCollector spoutOutputCollector) {
        host = conf.get(Conf.REDIS_HOST_KEY).toString();
        port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
        this.collector = spoutOutputCollector;
        connectToRedis();
    }

    private void connectToRedis() {
        jedis = new Jedis(host, port);
    }
The cluster will then poll the spout for new tuples through the nextTuple method:

public void nextTuple() {
    String content = jedis.rpop("count");
    if (content == null || "nil".equals(content)) {
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
        }
    } else {
        JSONObject obj = (JSONObject) JSONValue.parse(content);
        String ip = obj.get(storm.cookbook.Fields.IP).toString();
        String url = obj.get(storm.cookbook.Fields.URL).toString();
        String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY).toString();
        collector.emit(new Values(ip, url, clientKey));
    }
}
Next, we need to create the bolts that will enrich the basic data through database or remote API lookups. Let us start with the repeat-visit bolt. This bolt will check the client's ID against previous visit records and emit the enriched tuple with a flag set for unique visits. Create the RepeatVisitBolt class, providing the open and Redis connection logic:

public class RepeatVisitBolt extends BaseRichBolt {

    private OutputCollector collector;
    private Jedis jedis;
    private String host;
    private int port;

    @Override
    public void prepare(Map conf, TopologyContext topologyContext,
                        OutputCollector outputCollector) {
        this.collector = outputCollector;
        host = conf.get(Conf.REDIS_HOST_KEY).toString();
        port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
        connectToRedis();
    }

    private void connectToRedis() {
        jedis = new Jedis(host, port);
        jedis.connect();
    }
In the execute method, the tuple from the ClickSpout class is provided by the cluster. The bolt needs to look up the previous visit flags from Redis, based on the fields in the tuple, and emit the enriched tuple:

public void execute(Tuple tuple) {
    String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
    String clientKey = tuple.getStringByField(storm.cookbook.Fields.CLIENT_KEY);
    String url = tuple.getStringByField(storm.cookbook.Fields.URL);
    String key = url + ":" + clientKey;
    String value = jedis.get(key);
    if (value == null) {
        jedis.set(key, "visited");
        collector.emit(new Values(clientKey, url, Boolean.TRUE.toString()));
    } else {
        collector.emit(new Values(clientKey, url, Boolean.FALSE.toString()));
    }
}
Next, the geographic enrichment bolt must be created. This bolt will emit an enriched tuple by looking up the country and city of the client's IP address through a remote API call. The GeographyBolt class delegates the actual call to an injected IP resolver in order to increase the testability of the class. In the storm.cookbook package, create the GeographyBolt class, extending from the BaseRichBolt class, and implement the execute method:

public void execute(Tuple tuple) {
    String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
    JSONObject json = resolver.resolveIP(ip);
    String city = (String) json.get(storm.cookbook.Fields.CITY);
    String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME);
    collector.emit(new Values(country, city));
}
Provide the resolver by implementing HttpIPResolver and injecting it into the GeographyBolt class at design time:

public class HttpIPResolver implements IPResolver, Serializable {

    static String url = "http://api.hostip.info/get_json.php";

    @Override
    public JSONObject resolveIP(String ip) {
        URL geoUrl = null;
        BufferedReader in = null;
        try {
            geoUrl = new URL(url + "?ip=" + ip);
            URLConnection connection = geoUrl.openConnection();
            in = new BufferedReader(new InputStreamReader(
                    connection.getInputStream()));
            JSONObject json = (JSONObject) JSONValue.parse(in);
            return json;
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                }
            }
        }
        return null;
    }
}
Next, we need to derive the geographic stats. The GeoStatsBolt class simply receives the enriched tuple from GeographyBolt and maintains an in-memory structure of the data. It also emits the updated counts to any interested party. The GeoStatsBolt class is designed such that the total population of the countries can be split between many bolts; however, all cities within each country must arrive at the same bolt. The topology, therefore, splits streams into the bolt on this basis:

builder.setBolt("geoStats", new GeoStatsBolt(), 10)
       .fieldsGrouping("geographyBolt",
           new Fields(storm.cookbook.Fields.COUNTRY));
When creating the GeoStatsBolt class, provide the implementation for the execute method:

public void execute(Tuple tuple) {
    String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY);
    String city = tuple.getStringByField(Fields.CITY);
    if (!stats.containsKey(country)) {
        stats.put(country, new CountryStats(country));
    }
    stats.get(country).cityFound(city);
    collector.emit(new Values(country,
            stats.get(country).getCountryTotal(),
            city,
            stats.get(country).getCityTotal(city)));
}
The bulk of the logic is contained in an inner model class that maintains the in-memory city and country statistics:

private class CountryStats {

    private static final int COUNT_INDEX = 0;
    private static final int PERCENTAGE_INDEX = 1;

    private int countryTotal = 0;
    private String countryName;
    private Map<String, List<Integer>> cityStats =
            new HashMap<String, List<Integer>>();

    public CountryStats(String countryName) {
        this.countryName = countryName;
    }

    public void cityFound(String cityName) {
        countryTotal++;
        if (cityStats.containsKey(cityName)) {
            cityStats.get(cityName).set(COUNT_INDEX,
                    cityStats.get(cityName).get(COUNT_INDEX).intValue() + 1);
        } else {
            List<Integer> list = new LinkedList<Integer>();
            // initialize the count and percentage for a new city
            list.add(1);
            list.add(0);
            cityStats.put(cityName, list);
        }
        double percent = (double) cityStats.get(cityName).get(COUNT_INDEX)
                / (double) countryTotal;
        // store the percentage as a whole number; without scaling by 100,
        // the integer cast would always truncate the ratio to zero
        cityStats.get(cityName).set(PERCENTAGE_INDEX, (int) (percent * 100));
    }

    public int getCountryTotal() {
        return countryTotal;
    }

    public int getCityTotal(String cityName) {
        return cityStats.get(cityName).get(COUNT_INDEX).intValue();
    }
}
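To make the percentage bookkeeping concrete, here is a small standalone sketch of the calculation (the class and method names are mine, not the recipe's): the city percentage is the city count over the country total, scaled by 100 before the integer cast — without the scaling, the cast would truncate every ratio below 1.0 to zero.

```java
// Standalone sketch of the city-percentage calculation maintained by
// CountryStats: percentage = cityCount / countryTotal, stored as a
// whole number in the 0-100 range. Names here are illustrative.
public class CityPercentageSketch {

    public static int cityPercentage(int cityCount, int countryTotal) {
        double percent = (double) cityCount / (double) countryTotal;
        return (int) (percent * 100); // truncate to a whole-number percentage
    }

    public static void main(String[] args) {
        System.out.println(cityPercentage(1, 4)); // 1 of 4 visits -> 25
        System.out.println(cityPercentage(2, 4)); // 2 of 4 visits -> 50
    }
}
```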
Finally, the VisitStatsBolt class provides the final counting functionality for visitors and unique visits, based on the enriched stream from the RepeatVisitBolt class. This bolt needs to receive all the count information in order to maintain a single in-memory count, which is reflected in the topology definition:

builder.setBolt("totalStats", new VisitStatsBolt(), 1)
       .globalGrouping("repeatsBolt");
In order to implement the VisitStatsBolt class, create the class and define two member-level integers, total and uniqueCount; then implement the execute method:

public void execute(Tuple tuple) {
    boolean unique = Boolean.parseBoolean(tuple
            .getStringByField(storm.cookbook.Fields.UNIQUE));
    total++;
    if (unique)
        uniqueCount++;
    collector.emit(new Values(total, uniqueCount));
}
The following diagram illustrates the click topology:
The spout emits the click events from the web server into the topology, through a shuffle grouping, to both the geography and repeat-visit bolts. This ensures that the load is evenly distributed around the cluster, which is especially important for slow or high-latency processes such as these.
Tip
It is important to understand the commutative versus associative nature of your data model, together with any other concerns inherent in your streams and models, before designing your topology.
It is important to understand the parallelism of Storm while setting up the topology structure. There is an excellent summary of this on the wiki (https://github.com/nathanmarz/storm/wiki/Understanding-the-parallelism-of-a-Storm-topology). The key points to take into account are:
The number of worker processes for the topology (TOPOLOGY_WORKERS).
The number of executors (threads) per component of the topology. This is set using the parallelism hint. Note that this sets only the initial number of threads; it can be increased at runtime using topology rebalancing (through the UI or CLI). You can limit the number of executors using the Config#setMaxTaskParallelism() method.
The number of tasks, which defaults to 1 per executor. You can adjust this value when you declare a component, using the ComponentConfigurationDeclarer#setNumTasks() method.
These are the key elements to consider when sizing your cluster. The cluster will try to distribute work to worker processes, each containing many executors that may be executing one or more tasks. The number of executors per worker is therefore the total number of executors divided by the number of workers. A good example of this can be seen in the previously mentioned wiki page.
Using these numbers, you can size your cluster in terms of nodes and cores per node, where ideally you should have one core per thread (executor) in the cluster.
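The arithmetic can be made concrete using the click topology's own figures: its parallelism hints sum to 10 + 10 + 10 + 1 + 10 = 41 executors, and runCluster calls conf.setNumWorkers(20), giving roughly 2 executors per worker (integer division gives the floor; the class name below is illustrative):

```java
// Cluster-sizing arithmetic for the click topology: executors per
// worker is the total number of executors divided by the number of
// workers. The parallelism hints below are taken from ClickTopology.
public class ClusterSizingSketch {

    public static int executorsPerWorker(int totalExecutors, int workers) {
        return totalExecutors / workers; // floor value
    }

    public static void main(String[] args) {
        int totalExecutors = 10   // clickSpout
                           + 10   // repeatsBolt
                           + 10   // geographyBolt
                           + 1    // totalStats
                           + 10;  // geoStats
        int workers = 20;         // conf.setNumWorkers(20) in runCluster

        System.out.println(totalExecutors);                              // -> 41
        System.out.println(executorsPerWorker(totalExecutors, workers)); // -> 2
    }
}
```

On the one-core-per-executor guideline, this topology would therefore run comfortably on a cluster offering around 41 cores in total.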
Unit testing is an essential part of any delivery; the logic contained in the bolts must also be unit tested.
Unit testing often involves a process called mocking that allows you to use dynamically generated fake instances of objects as dependencies in order to ensure that a particular class is tested on a unit basis. This book illustrates unit testing using JUnit 4 and JMock. Please take the time to read up on JMock's recipes online at http://jmock.org/cookbook.html.
In the src/test/java folder, create the storm.cookbook package and create the StormTestCase class. This class is a simple abstraction of some of the initialization code:

public class StormTestCase {

    protected Mockery context = new Mockery() {{
        setImposteriser(ClassImposteriser.INSTANCE);
    }};

    protected Tuple getTuple() {
        final Tuple tuple = context.mock(Tuple.class);
        return tuple;
    }
}
Then create the TestRepeatVisitBolt class that extends from StormTestCase, and mark it with the parameterized runner annotation:

@RunWith(value = Parameterized.class)
public class TestRepeatVisitBolt extends StormTestCase {
The test case logic of the class is contained in a single execute method:

public void testExecute() {
    jedis = new Jedis("localhost", 6379);
    RepeatVisitBolt bolt = new RepeatVisitBolt();
    Map config = new HashMap();
    config.put("redis-host", "localhost");
    config.put("redis-port", "6379");
    final OutputCollector collector = context.mock(OutputCollector.class);
    bolt.prepare(config, null, collector);
    assertEquals(true, bolt.isConnected());

    final Tuple tuple = getTuple();
    context.checking(new Expectations() {{
        oneOf(tuple).getStringByField(Fields.IP); will(returnValue(ip));
        oneOf(tuple).getStringByField(Fields.CLIENT_KEY); will(returnValue(clientKey));
        oneOf(tuple).getStringByField(Fields.URL); will(returnValue(url));
        oneOf(collector).emit(new Values(clientKey, url, expected));
    }});

    bolt.execute(tuple);
    context.assertIsSatisfied();
    if (jedis != null)
        jedis.disconnect();
}
Next, the parameters must be defined:
@Parameterized.Parameters
public static Collection<Object[]> data() {
    Object[][] data = new Object[][] {
        { "192.168.33.100", "Client1", "myintranet.com", "false" },
        { "192.168.33.100", "Client1", "myintranet.com", "false" },
        { "192.168.33.101", "Client2", "myintranet1.com", "true" },
        { "192.168.33.102", "Client3", "myintranet2.com", "false" } };
    return Arrays.asList(data);
}
The base provisioning of the values must be done before the tests using Redis:
@BeforeClass
public static void setupJedis() {
    Jedis jedis = new Jedis("localhost", 6379);
    jedis.flushDB();
    Iterator<Object[]> it = data().iterator();
    while (it.hasNext()) {
        Object[] values = it.next();
        if (values[3].equals("false")) {
            // not a unique visit, so the key must already exist
            String key = values[2] + ":" + values[1];
            jedis.set(key, "visited");
        }
    }
}
Firstly, the unit test works by defining a set of test data. This allows us to test many different cases without unnecessary abstraction or duplication. Before the tests execute, the static data is populated into the Redis DB, allowing the tests to run deterministically. The test method is then executed once per row of parameterized data, so that many different cases are verified.
JMock provides mock instances of the collector and the tuples to be emitted by the bolt. The expected behavior is then defined in terms of these mocked objects and their interactions:
context.checking(new Expectations() {{
    oneOf(tuple).getStringByField(Fields.IP); will(returnValue(ip));
    oneOf(tuple).getStringByField(Fields.CLIENT_KEY); will(returnValue(clientKey));
    oneOf(tuple).getStringByField(Fields.URL); will(returnValue(url));
    oneOf(collector).emit(new Values(clientKey, url, expected));
}});
Although these are separate lines of code, within the bounds of the expectations they should be read declaratively: I expect the getStringByField method of the tuple to be called exactly once, and the mock object must then return the value ip to the class being tested.
This mechanism provides a clean way to exercise the bolt.
Integration testing can mean many different things depending on the situation and audience. For the purposes of this book, integration testing is a means of testing the topology from end to end, with defined input and output points, within a local cluster. This allows full functional verification of the topology before deploying it to an actual cluster.
Create the IntegrationTestTopology class in the src/test/java folder in the storm.cookbook package. Set up a local topology by adding in a testing utility bolt:

@BeforeClass
public static void setup() {
    // We want all output tuples coming to the mock for testing purposes
    topology.getBuilder().setBolt("testBolt", testBolt, 1)
            .globalGrouping("geoStats")
            .globalGrouping("totalStats");
    // Run in local mode, but we will shut the cluster down
    // when we are finished
    topology.runLocal(0);
    // Jedis is required for input and output of the cluster
    jedis = new Jedis("localhost",
            Integer.parseInt(ClickTopology.DEFAULT_JEDIS_PORT));
    jedis.connect();
    jedis.flushDB();
    // Give it some time to start up before running the tests
    Utils.sleep(5000);
}
Then, define the expected parameters as a set of arrays arranged in pairs:
@Parameterized.Parameters
public static Collection<Object[]> data() {
    Object[][] data = new Object[][] {
        // input, then expectations
        { new Object[] { "165.228.250.178", "internal.com", "Client1" },
          new Object[] { "AUSTRALIA", new Long(1), "SYDNEY",
                  new Long(1), new Long(1), new Long(1) } },
        { new Object[] { "165.228.250.178", "internal.com", "Client1" },
          new Object[] { "AUSTRALIA", new Long(2), "SYDNEY",
                  new Long(2), new Long(2), new Long(1) } },
        // same client, different location
        { new Object[] { "4.17.136.0", "internal.com", "Client1" },
          new Object[] { "UNITED STATES", new Long(1), "DERRY, NH",
                  new Long(1), new Long(3), new Long(1) } },
        // different client, same location
        { new Object[] { "4.17.136.0", "internal.com", "Client2" },
          new Object[] { "UNITED STATES", new Long(2), "DERRY, NH",
                  new Long(2), new Long(4), new Long(2) } } };
    return Arrays.asList(data);
}

Object[] input;
Object[] expected;

public IntegrationTestTopology(Object[] input, Object[] expected) {
    this.input = input;
    this.expected = expected;
}
The test logic can then be based on these parameters:
@Test
public void inputOutputClusterTest() {
    JSONObject content = new JSONObject();
    content.put("ip", input[0]);
    content.put("url", input[1]);
    content.put("clientKey", input[2]);
    jedis.rpush("count", content.toJSONString());
    Utils.sleep(3000);
    int count = 0;
    String data = jedis.rpop("TestTuple");
    while (data != null) {
        JSONArray values = (JSONArray) JSONValue.parse(data);
        if (values.get(0).toString().contains("geoStats")) {
            count++;
            assertEquals(expected[0], values.get(1).toString().toUpperCase());
            assertEquals(expected[1], values.get(2));
            assertEquals(expected[2], values.get(3).toString().toUpperCase());
            assertEquals(expected[3], values.get(4));
        } else if (values.get(0).toString().contains("totalStats")) {
            count++;
            assertEquals(expected[4], values.get(1));
            assertEquals(expected[5], values.get(2));
        }
        data = jedis.rpop("TestTuple");
    }
    assertEquals(2, count);
}
The integration test works by creating a local cluster and then injecting input values into the cluster through Redis, in the same way as a real web server would for the given design. It then adds a specific testing bolt to the end of the topology that receives all the output tuples and tests these against the expected values.
Once TestBolt is submitted to the cluster, it is no longer accessible from the test; therefore, its outputs can only be accessed through persistence. TestBolt persists the received tuples to Redis, where the test case can read and validate them. The logic within TestBolt is as follows:
public void execute(Tuple input) {
    List objects = input.getValues();
    objects.add(0, input.getSourceComponent());
    jedis.rpush("TestTuple", JSONArray.toJSONString(objects));
}
This is then read by the test and validated against the expected values:
String data = jedis.rpop("TestTuple");
while (data != null) {
    JSONArray values = (JSONArray) JSONValue.parse(data);
    if (values.get(0).toString().contains("geoStats")) {
        count++;
        assertEquals(expected[0], values.get(1).toString().toUpperCase());
        assertEquals(expected[1], values.get(2));
        assertEquals(expected[2], values.get(3).toString().toUpperCase());
        assertEquals(expected[3], values.get(4));
    } else if (values.get(0).toString().contains("totalStats")) {
        count++;
        assertEquals(expected[4], values.get(1));
        assertEquals(expected[5], values.get(2));
    }
    data = jedis.rpop("TestTuple");
}
assertEquals(2, count);
The final step in the development process is to functionally test the topology in a cluster before promoting it to the next environment.
First you need to configure the Storm client on your host development machine by creating the .storm folder in your user home directory. Create storm.yaml in this folder with the following content:

storm.local.dir: "/mnt/storm"
nimbus.host: "192.168.33.100"
Package your topology using the following command within the project's root:
mvn package
This will produce a completely packaged JAR in the target folder of the project. You can deploy this to the cluster using the storm client command:

storm jar jarName.jar [TopologyName] [Args]
The storm command-line client provides you with all the tools you need to control the cluster. Part of this is the ability to deploy packaged topologies. For more information on the storm CLI, please review the detailed documentation on the wiki at https://github.com/nathanmarz/storm/wiki/Command-line-client.