Learning Storm

By Ankit Jain, Anand Nalya
About this book

Starting with the very basics of Storm, you will learn how to set up Storm on a single machine and move on to deploying Storm on your cluster. You will understand how Kafka can be integrated with Storm using the Kafka spout.

You will then proceed to explore the Trident abstraction tool with Storm to perform stateful stream processing, guaranteeing single message processing in every topology. You will move ahead to learn how to integrate Hadoop with Storm. Next, you will learn how to integrate Storm with other well-known Big Data technologies such as HBase, Redis, and Kafka to realize the full potential of Storm.

Finally, you will perform in-depth case studies on Apache log processing and machine learning with a focus on Storm, and through these case studies, you will discover Storm's realm of possibilities.

Publication date: August 2014
Publisher: Packt
Pages: 252
ISBN: 9781783981328

 

Chapter 1. Setting Up Storm on a Single Machine

With the exponential growth in the amount of data being generated and advanced data-capturing capabilities, enterprises are facing the challenge of making sense out of this mountain of raw data. On the batch processing front, Hadoop has emerged as the go-to framework to deal with Big Data. Until recently, there has been a void when one looks for frameworks to build real-time stream processing applications. Such applications have become an integral part of a lot of businesses as they enable them to respond swiftly to events and adapt to changing situations. Examples of this are monitoring social media to analyze public response to any new product that you launch and predicting the outcome of an election based on the sentiments of the election-related posts.

Apache Storm has emerged as the platform of choice for the industry leaders to develop such distributed, real-time, data processing platforms. It provides a set of primitives that can be used to develop applications that can process a very large amount of data in real time in a highly scalable manner.

Storm is to real-time processing what Hadoop is to batch processing. It is open source software, currently being incubated at the Apache Software Foundation. Being in incubation does not mean that it is not yet ready for production use. Indeed, it has been deployed to meet real-time processing needs by companies such as Twitter, Yahoo!, and Flipboard. Storm was first developed by Nathan Marz at BackType, a company that provided social search applications. BackType was later acquired by Twitter, and Storm is now a critical part of Twitter's infrastructure. Storm can be used for the following use cases:

  • Stream processing: Storm is used to process a stream of data and update a variety of databases in real time. This processing occurs in real time and the processing speed needs to match the input data speed.

  • Continuous computation: Storm can do continuous computation on data streams and stream the results into clients in real time. This might require processing each message as it comes or creating small batches over a little time. An example of continuous computation is streaming trending topics on Twitter into browsers.

  • Distributed RPC: Storm can parallelize an intense query so that you can compute it in real time.

  • Real-time analytics: Storm can analyze and respond to data that comes from different data sources as they happen in real time.

In this chapter, we will cover the following topics:

  • Features of Storm

  • Various components of a Storm cluster

  • What is a Storm topology

  • Local and remote operational modes to execute Storm topologies

  • Setting up a development environment to develop a Storm topology

  • Developing a sample topology

  • Setting up a single-node Storm cluster and its prerequisites

  • Deploying the sample topology

 

Features of Storm


The following are some of the features of Storm that make it a perfect solution to process streams of data in real time:

  • Fast: Storm has been reported to process up to 1 million tuples per second per node.

  • Horizontally scalable: Being fast is a necessary feature to build a high volume/velocity data processing platform, but a single node will have an upper limit on the number of events that it can process per second. A node represents a single machine in your setup that executes Storm applications. Storm, being a distributed platform, allows you to add more nodes to your Storm cluster and increase the processing capacity of your application. Also, it is linearly scalable, which means that you can double the processing capacity by doubling the nodes.

  • Fault tolerant: Units of work are executed by worker processes in a Storm cluster. When a worker dies, Storm will restart that worker, and if the node on which the worker is running dies, Storm will restart that worker on some other node in the cluster. The worker process is described in the Configuring the parallelism of a topology section of Chapter 2, Setting Up a Storm Cluster.

  • Guaranteed data processing: Storm provides strong guarantees that each message passed on to it to process will be processed at least once. In the event of failures, Storm will replay the lost tuples. Also, it can be configured so that each message will be processed only once.

  • Easy to operate: Storm is simple to deploy and manage. Once the cluster is deployed, it requires little maintenance.

  • Programming language agnostic: Even though the Storm platform runs on Java Virtual Machine, the applications that run over it can be written in any programming language that can read and write to standard input and output streams.

 

Storm components


A Storm cluster follows a master-slave model where the master and slave processes are coordinated through ZooKeeper. The following are the components of a Storm cluster.

Nimbus

The Nimbus node is the master in a Storm cluster. It is responsible for distributing the application code across various worker nodes, assigning tasks to different machines, monitoring tasks for any failures, and restarting them as and when required.

Nimbus is stateless and stores all of its data in ZooKeeper. There is a single Nimbus node in a Storm cluster. It is designed to be fail-fast, so when Nimbus dies, it can be restarted without having any effects on the already running tasks on the worker nodes. This is unlike Hadoop, where if the JobTracker dies, all the running jobs are left in an inconsistent state and need to be executed again.

Supervisor nodes

Supervisor nodes are the worker nodes in a Storm cluster. Each supervisor node runs a supervisor daemon that is responsible for creating, starting, and stopping worker processes to execute the tasks assigned to that node. Like Nimbus, a supervisor daemon is also fail-fast and stores all of its state in ZooKeeper so that it can be restarted without any state loss. A single supervisor daemon normally handles multiple worker processes running on that machine.

The ZooKeeper cluster

In any distributed application, various processes need to coordinate with each other and share some configuration information. ZooKeeper is an application that provides all these services in a reliable manner. Being a distributed application, Storm also uses a ZooKeeper cluster to coordinate various processes. All of the state associated with the cluster and the various tasks submitted to Storm is stored in ZooKeeper. Nimbus and supervisor nodes do not communicate directly with each other but through ZooKeeper. As all data is stored in ZooKeeper, both Nimbus and the supervisor daemons can be killed abruptly without adversely affecting the cluster.

The following is an architecture diagram of a Storm cluster:

A Storm cluster's architecture

 

The Storm data model


The basic unit of data that can be processed by a Storm application is called a tuple. Each tuple consists of a predefined list of fields. The value of each field can be a byte, char, integer, long, float, double, Boolean, or byte array. Storm also provides an API to define your own data types, which can be serialized as fields in a tuple.

A tuple is dynamically typed, that is, you just need to define the names of the fields in a tuple and not their data type. The choice of dynamic typing helps to simplify the API and makes it easy to use. Also, since a processing unit in Storm can process multiple types of tuples, it's not practical to declare field types.

Each of the fields in a tuple can be accessed by its name, using getValueByField(String), or by its positional index in the tuple, using getValue(int). Tuples also provide convenient methods such as getIntegerByField(String) that save you from typecasting the objects. For example, if you have a Fraction(numerator, denominator) tuple, representing fractional numbers, then you can get the value of the numerator by using either getIntegerByField("numerator") or getInteger(0).

You can see the full set of operations supported by backtype.storm.tuple.Tuple in the javadoc located at https://storm.incubator.apache.org/apidocs/backtype/storm/tuple/Tuple.html.
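
To make the two access styles concrete, the following is a minimal plain-Java sketch of a tuple that keeps a list of field names next to a list of values. It does not use the Storm API: TupleSketch, SimpleTuple, and the fraction helper are hypothetical names made up for this illustration, and only the method names getValue, getValueByField, getInteger, and getIntegerByField are borrowed from the Tuple interface described above.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: a hypothetical stand-in, not backtype.storm.tuple.Tuple.
class TupleSketch {

    static final class SimpleTuple {
        private final List<String> fields; // field names, declared once
        private final List<Object> values; // values, in the same order as the names

        SimpleTuple(List<String> fields, List<Object> values) {
            if (fields.size() != values.size()) {
                throw new IllegalArgumentException("fields and values must have the same length");
            }
            this.fields = fields;
            this.values = values;
        }

        // Positional access, mirroring getValue(int).
        Object getValue(int index) {
            return values.get(index);
        }

        // Access by name, mirroring getValueByField(String).
        Object getValueByField(String field) {
            int index = fields.indexOf(field);
            if (index < 0) {
                throw new IllegalArgumentException("unknown field: " + field);
            }
            return values.get(index);
        }

        // Typed helpers that spare the caller a cast.
        Integer getInteger(int index) {
            return (Integer) getValue(index);
        }

        Integer getIntegerByField(String field) {
            return (Integer) getValueByField(field);
        }
    }

    // Builds the Fraction(numerator, denominator) tuple used in the text.
    static SimpleTuple fraction(int numerator, int denominator) {
        return new SimpleTuple(
                Arrays.asList("numerator", "denominator"),
                Arrays.<Object>asList(numerator, denominator));
    }

    public static void main(String[] args) {
        SimpleTuple tuple = fraction(3, 4);
        System.out.println(tuple.getIntegerByField("numerator")); // prints 3
        System.out.println(tuple.getInteger(0));                  // prints 3
    }
}
```

Only the field names are declared; the values are stored as plain objects, which is what dynamic typing means here. The typed helpers simply cast on the way out, so asking for the wrong type fails at runtime rather than at compile time.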

Definition of a Storm topology

In Storm terminology, a topology is an abstraction that defines the graph of the computation. You create a Storm topology and deploy it on a Storm cluster to process the data. A topology can be represented by a directed acyclic graph, where each node does some kind of processing and forwards its output to the next node(s) in the flow. The following is a sample Storm topology:

Graphical representation of the Storm topology
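
The graph view of a topology can also be sketched in a few lines of plain Java. The sketch below is not Storm code: TopologyGraphSketch and the component names (spout, parse-bolt, count-bolt, store-bolt) are hypothetical and chosen only for illustration. It records which component consumes the output of which, and then uses Kahn's algorithm to list the components in an order in which data can flow, rejecting any graph that contains a cycle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: a hypothetical model of a topology as a directed graph.
class TopologyGraphSketch {
    // Component name -> names of the components that consume its output.
    private final Map<String, List<String>> downstream = new LinkedHashMap<>();

    void addComponent(String name) {
        downstream.putIfAbsent(name, new ArrayList<>());
    }

    // Declares that 'to' subscribes to the stream emitted by 'from'.
    void connect(String from, String to) {
        addComponent(from);
        addComponent(to);
        downstream.get(from).add(to);
    }

    // Kahn's algorithm: returns the components in data-flow order,
    // or throws if the graph is not acyclic.
    List<String> processingOrder() {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String name : downstream.keySet()) {
            inDegree.put(name, 0);
        }
        for (List<String> targets : downstream.values()) {
            for (String target : targets) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }

        // Components with no incoming stream (the spouts) are ready first.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String current = ready.remove();
            order.add(current);
            for (String next : downstream.get(current)) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        if (order.size() != downstream.size()) {
            throw new IllegalStateException("topology contains a cycle");
        }
        return order;
    }

    // One source feeding a bolt whose output is consumed by two other bolts.
    static TopologyGraphSketch sample() {
        TopologyGraphSketch graph = new TopologyGraphSketch();
        graph.connect("spout", "parse-bolt");
        graph.connect("parse-bolt", "count-bolt");
        graph.connect("parse-bolt", "store-bolt");
        return graph;
    }

    public static void main(String[] args) {
        // prints [spout, parse-bolt, count-bolt, store-bolt]
        System.out.println(sample().processingOrder());
    }
}
```

In this model, the edges play the role of streams and the nodes play the role of the processing components.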

The following are the components of a Storm topology:

  • Stream: The key abstraction in Storm is that of a stream. A stream is an unbounded sequence of tuples that can be processed in parallel by Storm. Each stream can be processed by a single or multiple types of bolts (the processing units in Storm, which are defined later in this section). Thus, Storm can also be viewed as a platform to transform streams. In the preceding diagram, streams are represented by arrows.

    Each stream in a Storm application is given an ID and the bolts can produce and consume tuples from these streams on the basis of their ID. Each stream also has an associated schema for the tuples that will flow through it.

  • Spout: A spout is the source of tuples in a Storm topology. It is responsible for reading or listening to data from an external source, for example, by reading from a logfile or listening for new messages in a queue and publishing them—emitting, in Storm terminology—into streams. A spout can emit multiple streams, each of different schemas. For example, it can read 10-field records from a logfile and emit them as different streams of 7-tuples and 4-tuples each.

    The backtype.storm.spout.ISpout interface is the interface used to define spouts. If you are writing your topology in Java, then you should use backtype.storm.topology.IRichSpout as it declares methods to use the TopologyBuilder API. Whenever a spout emits a tuple, Storm tracks all the tuples generated while processing this tuple, and when the execution of all the tuples in the graph of this source tuple is complete, it will send back an acknowledgement to the spout. This tracking happens only if a message ID was provided while emitting the tuple. If null was used as message ID, this tracking will not happen.

    A tuple-processing timeout can also be defined for a topology, and if a tuple is not processed within the specified timeout, a fail message will be sent back to the spout. Again, this will happen only if you define a message ID. A small performance gain can be extracted out of Storm at the risk of some data loss by disabling the message acknowledgements, which can be done by skipping the message ID while emitting tuples.

    The important methods of spout are:

    • nextTuple(): This method is called by Storm to get the next tuple from the input source. Inside this method, you will have the logic of reading data from the external sources and emitting them to an instance of backtype.storm.spout.ISpoutOutputCollector. The schema for streams can be declared by using the declareStream method of backtype.storm.topology.OutputFieldsDeclarer.

      If a spout wants to emit data to more than one stream, it can declare multiple streams using the declareStream method and specify a stream ID while emitting the tuple. If there are no more tuples to emit at the moment, this method should return immediately rather than block. Also, if this method does not emit a tuple, then Storm will wait for 1 millisecond before calling it again. This waiting time can be configured using the topology.sleep.spout.wait.strategy.time.ms setting.

    • ack(Object msgId): This method is invoked by Storm when the tuple with the given message ID is completely processed by the topology. At this point, the user should mark the message as processed and do the required cleaning up such as removing the message from the message queue so that it does not get processed again.

    • fail(Object msgId): This method is invoked by Storm when it identifies that the tuple with the given message ID has not been processed successfully or has not been processed within the configured timeout interval. In such scenarios, the user should do the required processing so that the messages can be emitted again by the nextTuple method. A common way to do this is to put the message back in the incoming message queue.

    • open(): This method is called only once—when the spout is initialized. If it is required to connect to an external source for the input data, define the logic to connect to the external source in the open method, and then keep fetching the data from this external source in the nextTuple method to emit it further.

    Another point to note while writing your spout is that none of the methods should be blocking, as Storm calls all the methods in the same thread. Every spout has an internal buffer to keep track of the status of the tuples emitted so far. The spout will keep the tuples in this buffer until they are either acknowledged or failed, calling the ack or fail method respectively. Storm will call the nextTuple method only when this buffer is not full.

  • Bolt: A bolt is the processing powerhouse of a Storm topology and is responsible for transforming a stream. Ideally, each bolt in the topology should be doing a simple transformation of the tuples, and many such bolts can coordinate with each other to exhibit a complex transformation.

    The backtype.storm.task.IBolt interface is preferably used to define bolts, and if a topology is written in Java, you should use the backtype.storm.topology.IRichBolt interface. A bolt can subscribe to multiple streams of other components—either spouts or other bolts—in the topology and similarly can emit output to multiple streams. Output streams can be declared using the declareStream method of backtype.storm.topology.OutputFieldsDeclarer.

    The important methods of a bolt are:

    • execute(Tuple input): This method is executed for each tuple that comes through the subscribed input streams. In this method, you can do whatever processing is required for the tuple and then produce the output either in the form of emitting more tuples to the declared output streams or other things such as persisting the results in a database.

      You are not required to process the tuple as soon as this method is called, and the tuples can be held until required. For example, while joining two streams, when a tuple arrives, you can hold it until its counterpart also comes, and then you can emit the joined tuple.

      The metadata associated with the tuple can be retrieved by the various methods defined in the Tuple interface. If a message ID is associated with a tuple, the execute method must publish an ack or fail event using OutputCollector for the bolt or else Storm will not know whether the tuple was processed successfully or not. The backtype.storm.topology.IBasicBolt interface is a convenient interface that sends an acknowledgement automatically after the completion of the execute method. In the case that a fail event is to be sent, this method should throw backtype.storm.topology.FailedException.

    • prepare(Map stormConf, TopologyContext context, OutputCollector collector): A bolt can be executed by multiple workers in a Storm topology. The instance of a bolt is created on the client machine and then serialized and submitted to Nimbus. When Nimbus creates the worker instances for the topology, it sends this serialized bolt to the workers. The worker will deserialize the bolt and call the prepare method. In this method, you should make sure the bolt is properly configured to execute tuples. Any state that you want to maintain can be stored as instance variables of the bolt, which can be serialized/deserialized later.
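
The interplay between nextTuple, ack, and fail described for spouts can be modeled with a small, self-contained Java sketch. This is an illustration under simplifying assumptions, not an implementation of ISpout: the ReliableSpoutSketch class, its in-memory queue, and the choice to return the message ID from nextTuple are all hypothetical. In Storm itself, nextTuple returns nothing and emits through the output collector.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: a hypothetical model of a reliable spout.
class ReliableSpoutSketch {
    // Stand-in for the external source, for example a message queue.
    private final Deque<String> source = new ArrayDeque<>();
    // Messages that were emitted but are neither acknowledged nor failed yet.
    private final Map<Long, String> pending = new HashMap<>();
    // Stand-in for the output collector: records every emitted value.
    private final List<String> emitted = new ArrayList<>();
    private long nextMsgId = 0;

    ReliableSpoutSketch(List<String> messages) {
        source.addAll(messages);
    }

    // Emits one message if available and returns its message ID.
    // Returns null right away when the source is empty, so it never blocks.
    Long nextTuple() {
        String message = source.poll();
        if (message == null) {
            return null;
        }
        long msgId = nextMsgId++;
        pending.put(msgId, message);
        emitted.add(message);
        return msgId;
    }

    // The tuple was fully processed: forget the message.
    void ack(long msgId) {
        pending.remove(msgId);
    }

    // Processing failed or timed out: put the message back so that
    // a later nextTuple() call emits it again.
    void fail(long msgId) {
        String message = pending.remove(msgId);
        if (message != null) {
            source.add(message);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        ReliableSpoutSketch spout = new ReliableSpoutSketch(Arrays.asList("a", "b"));
        Long first = spout.nextTuple();   // emits "a"
        Long second = spout.nextTuple();  // emits "b"
        spout.ack(first);                 // "a" is done
        spout.fail(second);               // "b" goes back to the source
        spout.nextTuple();                // emits "b" a second time
        System.out.println(spout.emitted()); // prints [a, b, b]
    }
}
```

Because a failed message is emitted again, downstream components may see the same value more than once. This is the at-least-once behavior that message IDs make possible.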

Operation modes

Operation modes indicate how the topology is deployed in Storm. Storm supports two types of operation modes to execute the Storm topology:

  • The local mode: In the local mode, Storm topologies run on the local machine in a single JVM. This mode simulates a Storm cluster in a single JVM and is used for the testing and debugging of a topology.

  • The remote mode: In the remote mode, we will use the Storm client to submit the topology to the master along with all the necessary code required to execute the topology. Nimbus will then take care of distributing your code.

Setting up your development environment

Before you can start developing Storm topologies, you must first check/set up your development environment, which involves installing the following software packages on your development computer:

  • Java SDK 6

  • Maven

  • Git: Distributed version control

  • Spring Tool Suite: IDE

The following installation steps are valid for CentOS, and going forward, all the commands used in this book are valid for CentOS.

Installing Java SDK 6

Perform the following steps to install the Java SDK 6 on your machine:

  1. Download the Java SDK 6 RPM from Oracle's site (http://www.oracle.com/technetwork/java/javase/downloads/index.html).

  2. Install the Java jdk-6u31-linux-amd64.rpm file on your CentOS machine using the following command:

    sudo rpm -ivh jdk-6u31-linux-amd64.rpm
    
  3. Add the environment variable in the ~/.bashrc file:

    export JAVA_HOME=/usr/java/jdk1.6.0_31/
    
  4. Add the path of the bin directory of the JDK in the PATH system environment variable in the ~/.bashrc file:

    export PATH=$JAVA_HOME/bin:$PATH
    

    Note

    The PATH variable is the system variable that your operating system uses to locate the required executables from the command line or terminal window.

  5. Run the following command to reload the bashrc file on the current login terminal:

    source ~/.bashrc
    
  6. Check the Java installation as follows:

    java -version
    

    The output of the preceding command is:

    java version "1.6.0_31"
    Java(TM) SE Runtime Environment (build 1.6.0_31-b04)
    Java HotSpot(TM) 64-Bit Server VM (build 20.6-b01, mixed mode)
    

Installing Maven

Apache Maven is a software dependency management tool and is used to manage the project's build, reporting, and documentation. We are using it so that we do not need to download all the dependencies manually. Perform the following steps to install Maven on your machine:

  1. Download the stable release of Maven from Maven's site (http://maven.apache.org/download.cgi).

  2. Once you have downloaded the latest version, unzip it. Now, set the MAVEN_HOME environment variable in the ~/.bashrc file to make the setting up of Maven easier.

    export MAVEN_HOME=/home/root/apache-maven-3.0.4
    
  3. Add the path to the bin directory of Maven in the $PATH environment variable in the ~/.bashrc file:

    export PATH=$JAVA_HOME/bin:$PATH:$MAVEN_HOME/bin
    
  4. Run the following command to reload the bashrc file on the current login terminal:

    source ~/.bashrc
    
  5. Check the Maven installation as follows:

    mvn --version
    

    The following information will be displayed:

    Apache Maven 3.0.4 (r1232337; 2012-01-17 14:14:56+0530)
    Maven home: /home/root/apache-maven-3.0.4
    Java version: 1.6.0_31, vendor: Sun Microsystems Inc.
    Java home: /usr/java/jdk1.6.0_31/jre
    Default locale: en_US, platform encoding: UTF-8
    OS name: "linux", version: "2.6.32-279.22.1.el6.x86_64", arch: "amd64", family: "unix"
    

Installing Git – distributed version control

Git is one of the most used open source version control systems. It is used to track content such as files and directories and allows multiple users to work on the same file. Perform the following steps to install Git on your machine:

  1. The command to install Git on a CentOS machine is:

    sudo yum install git
    
  2. Check the installation of Git using the following command:

    git --version
    

    The preceding command's output is:

    git version 1.7.1 
    

Installing the STS IDE

The STS IDE is an integrated development environment and is used to develop applications. We will be using this to develop all the examples in this book. Perform the following steps to install the STS IDE on your machine:

  1. Download the latest version of STS from the Spring site (https://spring.io/tools/sts/all).

  2. Once you have downloaded the latest version, unzip it.

  3. Start the STS IDE.

  4. Go to Window | Preferences | Maven | Installations and add the path of maven-3.0.4, as shown in the following screenshot:

    Add maven-3.0.4 to launch Maven

  5. Go to Window | Preferences | Java | Installed JREs and add the path of Java Runtime Environment 6 (JRE 6), as shown in the following screenshot:

    Add jdk1.6.0_31 to the build path

From now on, we will use the Spring Tool Suite to develop all the sample Storm topologies.

Developing a sample topology

The sample topology shown in the following diagram will cover how to create a basic Storm project, including a spout and bolt, build it, and execute it:

A sample Hello World topology

Perform the following steps to create and execute a sample topology:

  1. Start your STS IDE and create a Maven project as shown in the following screenshot:

    Create a Maven project

  2. Specify com.learningstorm as Group Id and storm-example as Artifact Id, as shown in the following screenshot:

    Specify Archetype Parameters

  3. Add the following Maven dependencies in the pom.xml file:

    <dependencies>
      <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
      </dependency>
      <dependency>
        <groupId>storm</groupId>
        <artifactId>storm</artifactId>
        <version>0.9.0.1</version>
        <scope>provided</scope>
      </dependency>
    </dependencies>
  4. Add the following Maven repository in the pom.xml file:

    <repositories>
      <repository>
        <id>clojars.org</id>
        <url>http://clojars.org/repo</url>
      </repository>
    </repositories>
  5. Add the following Maven build plugins in the pom.xml file:

    <build>
      <plugins>
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>2.2.1</version>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
              <manifest>
                <mainClass />
              </manifest>
            </archive>
          </configuration>
          <executions>
            <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>
  6. Write your first sample spout by creating a LearningStormSpout class in the com.learningstorm.storm_example package. The LearningStormSpout class extends the serializable BaseRichSpout class. This spout does not connect to an external source to fetch data but randomly generates the data and emits a continuous stream of records. The following is the source code of the LearningStormSpout class with an explanation:

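    package com.learningstorm.storm_example;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class LearningStormSpout extends BaseRichSpout {
      private static final long serialVersionUID = 1L;
      private SpoutOutputCollector spoutOutputCollector;
      // Lookup table of the sample values that this spout emits.
      private static final Map<Integer, String> map = new HashMap<Integer, String>();
      static {
        map.put(0, "google");
        map.put(1, "facebook");
        map.put(2, "twitter");
        map.put(3, "youtube");
        map.put(4, "linkedin");
      }

      public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        // Open the spout and keep the collector for use in nextTuple()
        this.spoutOutputCollector = spoutOutputCollector;
      }

      public void nextTuple() {
        // Storm cluster repeatedly calls this method to emit a continuous
        // stream of tuples.
        final Random rand = new Random();
        // generate a random number from 0 to 4.
        int randomNumber = rand.nextInt(5);
        // No message ID is passed, so Storm does not track this tuple.
        spoutOutputCollector.emit(new Values(map.get(randomNumber)));
      }

      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // emit the tuple with field "site"
        declarer.declare(new Fields("site"));
      }
    }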
  7. Write your first sample bolt by creating a LearningStormBolt class within the same package. The LearningStormBolt class extends the serializable BaseBasicBolt class. This bolt consumes the tuples emitted by the LearningStormSpout spout and prints the value of the field "site" on the console. The following is the source code of the LearningStormBolt class with explanatory comments:

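    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;

    public class LearningStormBolt extends BaseBasicBolt {

      private static final long serialVersionUID = 1L;

      public void execute(Tuple input, BasicOutputCollector collector) {
        // Fetch the field "site" from the input tuple.
        String site = input.getStringByField("site");
        // Print the value of the field "site" on the console.
        System.out.println("Name of input site is : " + site);
      }

      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt emits nothing, so there are no output fields to declare.
      }
    }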
  8. Create a main LearningStormTopology class within the same package. This class creates instances of the spout and bolt classes and chains them together using the TopologyBuilder class. The following is the implementation of the main class:

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.topology.TopologyBuilder;

    public class LearningStormTopology {
      public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        // create an instance of the TopologyBuilder class
        TopologyBuilder builder = new TopologyBuilder();
        // set the spout class
        builder.setSpout("LearningStormSpout", new LearningStormSpout(), 2);
        // set the bolt class
        builder.setBolt("LearningStormBolt", new LearningStormBolt(), 4).shuffleGrouping("LearningStormSpout");

        Config conf = new Config();
        conf.setDebug(true);
        // create an instance of the LocalCluster class for
        // executing the topology in local mode.
        LocalCluster cluster = new LocalCluster();

        // LearningStormTopology is the name of the submitted topology.
        cluster.submitTopology("LearningStormTopology", conf, builder.createTopology());
        try {
          // let the topology run for 10 seconds
          Thread.sleep(10000);
        } catch (InterruptedException exception) {
          System.out.println("Thread interrupted exception : " + exception);
        }
        // kill the LearningStormTopology
        cluster.killTopology("LearningStormTopology");
        // shut down the Storm test cluster
        cluster.shutdown();
      }
    }
  9. Go to your project's home directory and run the following commands to execute the topology in the local mode:

    mvn compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=com.learningstorm.storm_example.LearningStormTopology
    

Tip

Downloading the example code

You can download the example code files for all Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.

Alternatively, we can execute the topology by running the main class directly from the STS IDE.

In the preceding example, we used a utility called LocalCluster to execute the topology in a single JVM. The LocalCluster class simulates the Storm cluster and starts all the Storm processes in a single JVM.
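
As a rough illustration of what running everything in a single JVM means, the following plain-Java sketch passes values from a producer thread to a consumer through an in-memory queue. It is a toy analogy under stated assumptions, not how LocalCluster is implemented: the class SingleJvmPipelineSketch and its methods nextSite, describe, and run are hypothetical names, and only the site values and the printed message are taken from the sample spout and bolt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy analogy only: a producer thread and a consumer loop sharing one JVM.
// This is NOT how LocalCluster works internally; all names here are hypothetical.
class SingleJvmPipelineSketch {
    // The same five values that the sample spout emits.
    static final String[] SITES = {"google", "facebook", "twitter", "youtube", "linkedin"};

    // Stand-in for the spout's nextTuple(): pick one site at random.
    static String nextSite(Random rand) {
        return SITES[rand.nextInt(SITES.length)];
    }

    // Stand-in for the bolt's execute(): build the line the bolt prints.
    static String describe(String site) {
        return "Name of input site is : " + site;
    }

    // Emit tupleCount values on one thread and consume them on the caller's thread.
    static List<String> run(int tupleCount, long seed) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Random rand = new Random(seed);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < tupleCount; i++) {
                queue.add(nextSite(rand));
            }
        });
        producer.start();
        List<String> lines = new ArrayList<>();
        try {
            for (int i = 0; i < tupleCount; i++) {
                lines.add(describe(queue.take()));
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining the queue", e);
        }
        return lines;
    }

    public static void main(String[] args) {
        for (String line : run(5, 42L)) {
            System.out.println(line);
        }
    }
}
```

Running main prints five lines in the same format as the sample bolt's console output. In the real local mode, Storm itself creates the threads and routes the tuples between spout and bolt tasks.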

We have submitted a topology in a simulated cluster by calling the submitTopology method of the LocalCluster class. The submitTopology method takes the name of a topology, a configuration for the topology, and then the topology itself as arguments.

The topology name is used to identify the topology in the Storm cluster. Hence, it is good practice to use a unique name for each topology.

Running the Storm infrastructure in local mode is useful when we want to test and debug the topology.

The upcoming sections will cover the deployment of ZooKeeper, Storm native dependencies, and Storm, and how we can submit the topology on a single-node Storm cluster.

Setting up ZooKeeper

This section describes how to set up ZooKeeper. Here, we deploy ZooKeeper in standalone mode; for a distributed cluster, it is recommended that you run a ZooKeeper ensemble of at least three nodes to support failover and high availability. Perform the following steps to set up ZooKeeper on your machine:

  1. Download the latest stable ZooKeeper release from the ZooKeeper site (http://www.apache.org/dyn/closer.cgi/zookeeper/); at the time of writing, the latest version is ZooKeeper 3.4.5.

  2. Once you have downloaded the latest version, unzip it and set the ZK_HOME environment variable.

  3. Create the configuration file, zoo.cfg, in the $ZK_HOME/conf directory using the following commands:

    cd $ZK_HOME/conf
    touch zoo.cfg
    
  4. Add the following three properties in the zoo.cfg file:

    tickTime=2000
    dataDir=/tmp/zookeeper
    clientPort=2181
    

    The following are the definitions of each of these properties:

    • tickTime: This is the basic time unit, in milliseconds, used by ZooKeeper. It is used for heartbeats, and the minimum session timeout is twice the tickTime value.

    • dataDir: This is the directory, initially empty, in which ZooKeeper stores the in-memory database snapshots and the transaction log.

    • clientPort: This is the port used to listen for client connections.

  5. The command to start the ZooKeeper node is as follows:

    bin/zkServer.sh start
    

    The following information is displayed:

    JMX enabled by default
    Using config: /home/root/zookeeper-3.4.5/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    
  6. At this point, the ZooKeeper Java process should be running. Verify this with the jps command:

    jps
    

    The following information is displayed:

    23074 QuorumPeerMain
    
  7. The command to check the status of the running ZooKeeper node is as follows:

    bin/zkServer.sh status
    

    The following information is displayed:

    JMX enabled by default
    Using config: ../conf/zoo.cfg
    Mode: standalone
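
The zoo.cfg file is a plain key=value file, so the relationship between tickTime and the minimum session timeout described in step 4 can be checked with java.util.Properties. The following is a minimal sketch, assuming the three properties shown earlier; the class ZooCfgSketch and its methods parse and minSessionTimeoutMs are hypothetical names used only for illustration, and the configuration text is inlined so that the example runs without a ZooKeeper installation.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper for illustration; it is not part of ZooKeeper.
class ZooCfgSketch {
    // The three properties from step 4, inlined so the sketch needs no file.
    static final String SAMPLE_CFG =
            "tickTime=2000\n"
            + "dataDir=/tmp/zookeeper\n"
            + "clientPort=2181\n";

    // Read key=value pairs from the configuration text.
    static Properties parse(String cfgText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new IllegalStateException("could not read configuration text", e);
        }
        return props;
    }

    // The minimum session timeout is twice the tickTime value.
    static int minSessionTimeoutMs(Properties props) {
        int tickTime = Integer.parseInt(props.getProperty("tickTime").trim());
        return 2 * tickTime;
    }

    public static void main(String[] args) {
        Properties props = parse(SAMPLE_CFG);
        System.out.println("clientPort = " + props.getProperty("clientPort"));
        System.out.println("minimum session timeout (ms) = " + minSessionTimeoutMs(props));
    }
}
```

With tickTime=2000, the minimum session timeout works out to 4000 milliseconds.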
    

Setting up Storm on a single development machine

This section describes how to install Storm on a single machine. Download the latest stable Storm release from https://storm.incubator.apache.org/downloads.html; at the time of writing, the latest version is storm-0.9.0.1. Perform the following steps to set up Storm on a single development machine:

  1. Once you have downloaded the latest version, unzip it and set the STORM_HOME environment variable.

  2. Run the following commands to open the storm.yaml configuration file for editing:

    cd $STORM_HOME/conf
    vi storm.yaml
    

    Add the following information:

    storm.zookeeper.servers:
         - "127.0.0.1"
    storm.zookeeper.port: 2181
    nimbus.host: "127.0.0.1"
    storm.local.dir: "/tmp/storm-data"
    java.library.path: "/usr/local/lib"
    storm.messaging.transport: backtype.storm.messaging.netty.Context
    supervisor.slots.ports:
         - 6700
         - 6701
         - 6702
         - 6703
    
  3. The following are the definitions of the properties used in the storm.yaml file:

    • storm.zookeeper.servers: This property contains the IP addresses of ZooKeeper servers.

    • storm.zookeeper.port: This property contains the ZooKeeper client port.

    • storm.local.dir: The Nimbus and supervisor daemons require a directory on the local disk to store small amounts of state (such as JARs, CONFs, and more).

    • java.library.path: This is used to load the Java native libraries that Storm uses (ZeroMQ and JZMQ). The default location of Storm native libraries is /usr/local/lib:/opt/local/lib:/usr/lib.

    • nimbus.host: This specifies the IP address of the master (Nimbus) node.

    • supervisor.slots.ports: For every worker machine, this property configures how many workers run on that machine. Each worker binds to a single port and uses that port to receive incoming messages.

  4. Start the master node using the following commands:

    cd $STORM_HOME
    bin/storm nimbus
    
  5. Start the supervisor node using the following commands:

    cd $STORM_HOME
    bin/storm supervisor
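
As a simplified mental model of supervisor.slots.ports, each listed port can be treated as one worker slot, so the four ports in the storm.yaml file above allow up to four workers on this machine. The following sketch only does this bookkeeping; the class WorkerSlotsSketch and its methods fits and freeSlotsAfter are hypothetical names, and Storm's actual scheduling logic is not modeled.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model: one port in supervisor.slots.ports = one worker slot.
// The class and method names are hypothetical; Storm's scheduler is not modeled.
class WorkerSlotsSketch {
    // Ports taken from the storm.yaml file shown above.
    static final List<Integer> SLOT_PORTS = Arrays.asList(6700, 6701, 6702, 6703);

    // True if the supervisor has at least one slot per requested worker.
    static boolean fits(int requestedWorkers) {
        return requestedWorkers >= 0 && requestedWorkers <= SLOT_PORTS.size();
    }

    // Slots left over once each requested worker occupies one port.
    static int freeSlotsAfter(int requestedWorkers) {
        if (!fits(requestedWorkers)) {
            throw new IllegalArgumentException(
                    "requested " + requestedWorkers + " workers but only "
                    + SLOT_PORTS.size() + " slots are configured");
        }
        return SLOT_PORTS.size() - requestedWorkers;
    }

    public static void main(String[] args) {
        System.out.println("free slots after 3 workers: " + freeSlotsAfter(3));
    }
}
```

Under this model, a topology that asks for three workers leaves one of the four configured slots unused.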
    

Deploying the sample topology on a single-node cluster

In the previous example, we executed the Storm topology in the local mode. Now, we will deploy the topology on the single-node Storm cluster.

  1. We will first create a LearningStormSingleNodeTopology class within the same package. The following LearningStormSingleNodeTopology class will use the submitTopology method of the StormSubmitter class to deploy the topology on the Storm cluster:

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.topology.TopologyBuilder;

    public class LearningStormSingleNodeTopology {
      public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // set the spout class
        builder.setSpout("LearningStormSpout", new LearningStormSpout(), 4);
        // set the bolt class
        builder.setBolt("LearningStormBolt", new LearningStormBolt(), 2).shuffleGrouping("LearningStormSpout");

        Config conf = new Config();
        conf.setNumWorkers(3);
        try {
          // This statement submits the topology to the remote cluster.
          // args[0] = name of the topology
          StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } catch (AlreadyAliveException alreadyAliveException) {
          System.out.println(alreadyAliveException);
        } catch (InvalidTopologyException invalidTopologyException) {
          System.out.println(invalidTopologyException);
        }
      }
    }
  2. Build your Maven project by running the following command in the project's home directory:

    mvn clean install
    

    The output of the preceding command is:

    -----------------------------------------------------------------------
    [INFO] ----------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ----------------------------------------------------------------
    [INFO] Total time: 58.326s
    [INFO] Finished at: Mon Jan 20 00:55:52 IST 2014
    [INFO] Final Memory: 14M/116M
    [INFO] ----------------------------------------------------------------
    
  3. We can deploy the topology to the cluster using the following Storm client command:

    bin/storm jar jarName.jar [TopologyMainClass] [Args]
    

    The preceding command runs the main method of TopologyMainClass with the given arguments (Args). The main function of TopologyMainClass defines the topology and submits it to Nimbus. The storm jar command takes care of connecting to Nimbus and uploading the JAR file.

  4. Go to the $STORM_HOME directory and run the following command to deploy LearningStormSingleNodeTopology to the Storm cluster:

    bin/storm jar $PROJECT_HOME/target/storm-example-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.learningstorm.storm_example.LearningStormSingleNodeTopology LearningStormSingleNodeTopology
    

    The following information is displayed:

    0    [main] INFO  backtype.storm.StormSubmitter  - Jar not uploaded to master yet. Submitting jar...
    7    [main] INFO  backtype.storm.StormSubmitter  - Uploading topology jar /home/root/storm-example/target/storm-example-0.0.1-SNAPSHOT-jar-with-dependencies.jar to assigned location: /tmp/storm-data/nimbus/inbox/stormjar-dfce742b-ca0b-4121-bcbe-1856dc1846a4.jar
    19   [main] INFO  backtype.storm.StormSubmitter  - Successfully uploaded topology jar to assigned location: /tmp/storm-data/nimbus/inbox/stormjar-dfce742b-ca0b-4121-bcbe-1856dc1846a4.jar
    19   [main] INFO  backtype.storm.StormSubmitter  - Submitting topology LearningStormSingleNodeTopology in distributed mode with conf {"topology.workers":3}
    219  [main] INFO  backtype.storm.StormSubmitter  - Finished submitting topology: LearningStormSingleNodeTopology
    
  5. Run the jps command to see the number of running JVM processes as follows:

    jps
    

    The preceding command's output is:

    26827 worker
    26530 supervisor
    26824 worker
    26468 nimbus
    15987 QuorumPeerMain
    26822 worker
    
  6. Storm supports deactivating a topology. In the deactivated state, spouts do not emit any new tuples into the pipeline, but the processing of already emitted tuples continues. The following is the command to deactivate a running topology:

    bin/storm deactivate topologyName
    
  7. Deactivate LearningStormSingleNodeTopology using the following command:

    bin/storm deactivate LearningStormSingleNodeTopology
    

    The following information is displayed:

    0    [main] INFO  backtype.storm.thrift  - Connecting to Nimbus at localhost:6627
    76   [main] INFO  backtype.storm.command.deactivate  - Deactivated topology: LearningStormSingleNodeTopology
    
  8. Storm also supports activating a topology. When a topology is activated, spouts will again start emitting tuples. The following is the command to activate the topology:

    bin/storm activate topologyName
    
  9. Activate LearningStormSingleNodeTopology using the following command:

    bin/storm activate LearningStormSingleNodeTopology
    

    The following information is displayed:

    0    [main] INFO  backtype.storm.thrift  - Connecting to Nimbus at localhost:6627
    65   [main] INFO  backtype.storm.command.activate  - Activated topology: LearningStormSingleNodeTopology
    
  10. Storm topologies are never-ending processes. To stop a topology, we need to kill it. When killed, the topology first enters into the deactivation state, processes all the tuples already emitted into it, and then stops. Run the following command to kill LearningStormSingleNodeTopology:

    bin/storm kill LearningStormSingleNodeTopology
    

    The following information is displayed:

    0    [main] INFO  backtype.storm.thrift  - Connecting to Nimbus at localhost:6627
    80   [main] INFO  backtype.storm.command.kill-topology  - Killed topology: LearningStormSingleNodeTopology
    
  11. Now, run the jps command again to see the remaining JVM processes as follows:

    jps
    

    The preceding command's output is:

    26530 supervisor
    27193 Jps
    26468 nimbus
    15987 QuorumPeerMain
    
  12. To update a running topology, the only option available is to kill the currently running topology and submit a new one.
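
The activate, deactivate, and kill commands described in the preceding steps can be summarized as a small state machine. The following is a simplified sketch, assuming a submitted topology starts out active; the class TopologyLifecycleSketch and its members are hypothetical names, the sketch does not talk to Nimbus, and the brief deactivated phase that a topology passes through while being killed is collapsed into a single transition.

```java
// Simplified model of the lifecycle commands described in the steps above.
// Names are hypothetical; this does not call the Storm client or Nimbus.
class TopologyLifecycleSketch {
    enum State { ACTIVE, INACTIVE, KILLED }

    // Assumption: a freshly submitted topology is modeled as active.
    private State state = State.ACTIVE;

    State state() {
        return state;
    }

    // Spouts emit new tuples only while the topology is active.
    boolean spoutsEmit() {
        return state == State.ACTIVE;
    }

    // Models "bin/storm deactivate topologyName".
    void deactivate() {
        requireNotKilled();
        state = State.INACTIVE;
    }

    // Models "bin/storm activate topologyName".
    void activate() {
        requireNotKilled();
        state = State.ACTIVE;
    }

    // Models "bin/storm kill topologyName"; a killed topology cannot be revived.
    void kill() {
        requireNotKilled();
        state = State.KILLED;
    }

    private void requireNotKilled() {
        if (state == State.KILLED) {
            throw new IllegalStateException("topology has been killed; submit a new one");
        }
    }

    public static void main(String[] args) {
        TopologyLifecycleSketch topology = new TopologyLifecycleSketch();
        topology.deactivate();
        System.out.println("after deactivate, spouts emit: " + topology.spoutsEmit());
        topology.activate();
        System.out.println("after activate, spouts emit: " + topology.spoutsEmit());
        topology.kill();
        System.out.println("final state: " + topology.state());
    }
}
```

The rejection of any command after kill mirrors step 12: once a topology is killed, the only way forward is to submit a new one.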

 

Summary


In this chapter, we introduced you to the basics of Storm and the various components that make up a Storm cluster. We saw the different operation modes in which a Storm cluster can operate. We deployed a single-node Storm cluster and also developed a sample topology to run on it.

In the next chapter, we will set up a three-node Storm cluster to run the sample topology. We will also see the different types of stream groupings supported by Storm and the message processing guarantees provided by Storm.

About the Authors
  • Ankit Jain

    Ankit Jain currently works as a senior research scientist at Uber AI Labs, the machine learning research arm of Uber. His work primarily involves the application of deep learning methods to a variety of Uber's problems, ranging from forecasting and food delivery to self-driving cars. Previously, he has worked in a variety of data science roles at the Bank of America, Facebook, and other start-ups. He has been a featured speaker at many of the top AI conferences and universities, including UC Berkeley, O'Reilly AI conference, and others. He has a keen interest in teaching and has mentored over 500 students in AI through various start-ups and bootcamps. He completed his MS at UC Berkeley and his BS at IIT Bombay (India).

  • Anand Nalya

    Anand Nalya is a full stack engineer with over 8 years of extensive experience in designing, developing, deploying, and benchmarking Big Data and web-scale applications for both start-ups and enterprises. He focuses on reducing the complexity in getting things done with brevity in code.

    He blogs about Big Data, web applications, and technology in general at http://anandnalya.com/. You can also follow him on Twitter at @anandnalya. When not working on projects, he can be found stargazing or reading.
