In this chapter, you will get acquainted with the following topics:
An overview of Storm
The "before Storm" era and key features of Storm
Storm cluster modes
Storm installation
Starting various daemons
Playing with Storm configurations
Over the course of the chapter, you will learn why Storm is creating a buzz in the industry, why it is relevant in present-day scenarios, and what real-time computation means. We will also explain Storm's cluster modes, the installation, and the approach to configuration.
Storm is a distributed, fault-tolerant, and highly scalable platform for processing streaming data in real time. It became an Apache top-level project in September 2014, having been an Apache Incubator project since September 2013.
Real-time processing on a massive scale has become a business requirement. Apache Storm provides the capability to process data (a stream of tuples) as it arrives, in real time, with distributed computing options. The ability to add more machines to the Storm cluster makes Storm scalable. The third important property of Storm is fault tolerance. If the Storm program (also known as a topology) is equipped with a reliable spout, it can reprocess tuples that failed or were lost due to machine failure. This tracking is based on XOR magic, which will be explained in Chapter 2, The Storm Anatomy.
Storm was originally created by Nathan Marz and his team at BackType. The project was made open source after BackType was acquired by Twitter. Interestingly, Storm came to be tagged as the real-time Hadoop.
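The XOR idea mentioned above can be illustrated with a small sketch. This is not Storm's implementation, only the arithmetic as it is generally described: every tuple in a tree gets a random 64-bit ID, the tracker XORs that ID into a running value once when the tuple is emitted and once when it is acked, and the value returns to zero only when every emitted tuple has been acked. The class name AckTracker and its methods are hypothetical.

```python
import random


class AckTracker:
    """Toy tracker for one tuple tree (hypothetical, not Storm's acker)."""

    def __init__(self):
        self.ack_val = 0  # running XOR of all emitted and acked tuple IDs

    def emit(self):
        """Register a new tuple and return its random 64-bit ID."""
        tuple_id = random.getrandbits(64) or 1  # avoid the degenerate ID 0
        self.ack_val ^= tuple_id
        return tuple_id

    def ack(self, tuple_id):
        """XOR the ID in a second time, cancelling the emit."""
        self.ack_val ^= tuple_id

    def fully_processed(self):
        return self.ack_val == 0


tracker = AckTracker()
ids = [tracker.emit() for _ in range(3)]
tracker.ack(ids[0])
tracker.ack(ids[1])
pending = tracker.fully_processed()  # one tuple is still un-acked
tracker.ack(ids[2])
done = tracker.fully_processed()     # every ID has now been XORed in twice
```

The appeal of this design is that the tracker needs only a single fixed-size value per tuple tree, no matter how many tuples the tree contains.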
Storm is best suited for many real-time use cases. A few of its interesting use cases are explained here:
ETL pipeline: ETL stands for Extract, Transform, and Load. It is a very common use case of Storm. Data can be extracted or read from any source. Here, the data can be complex XML, a JDBC result set row, or simply a few key-value records. Data (also known as tuples in Storm) can be enriched on the fly with more information, transformed into the required storage format, and stored in a NoSQL/RDBMS data store. All of this can be achieved at very high throughput, in real time, with simple Storm programs. Using a Storm ETL pipeline, you can ingest data into a big data warehouse at high speed.
Trending topic analysis: Twitter uses this kind of analysis to find the topics that are trending right now or within a given time frame. Many applications need to find top trends in real time, and Storm fits such use cases well. You can also perform a running aggregation of values with the help of any database.
Regulatory check engine: Real-time event data can pass through a business-specific regulatory algorithm, which can perform a compliance check in a real-time manner. Banks use these for trade data checks in real time.
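To make the trending-topic use case above more concrete, here is a minimal plain-Python sketch of a running aggregation over a time window. It does not use any Storm API; the class TrendingTopics, its method names, and the sample data are all hypothetical, and timestamps are assumed to arrive in non-decreasing order.

```python
from collections import Counter, deque


class TrendingTopics:
    """Counts topics seen within the last `window_seconds` (hypothetical helper)."""

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.events = deque()    # (timestamp, topic) in arrival order
        self.counts = Counter()  # running count per topic inside the window

    def add(self, timestamp, topic):
        self.events.append((timestamp, topic))
        self.counts[topic] += 1
        self._expire(timestamp)

    def _expire(self, now):
        # Drop events that fell out of the window and undo their counts.
        while self.events and self.events[0][0] <= now - self.window_seconds:
            _, old_topic = self.events.popleft()
            self.counts[old_topic] -= 1
            if self.counts[old_topic] == 0:
                del self.counts[old_topic]

    def top(self, n):
        return self.counts.most_common(n)


trends = TrendingTopics(window_seconds=60)
for ts, topic in [(0, "storm"), (10, "kafka"), (20, "storm"), (70, "storm")]:
    trends.add(ts, topic)

top_topics = trends.top(2)
```

In a real topology, the counting step would typically live in a bolt and the counts might be kept in an external database, as the text suggests.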
Storm is an ideal fit for any use case that needs data processed quickly and reliably as soon as it arrives, at a rate of more than 10,000 messages per second. Actually, 10,000+ is a small number. Twitter is able to process millions of tweets per second on a large cluster. The achievable rate depends on how well the Storm topology is written, how well it is tuned, and the cluster size.
Storm programs (a.k.a. topologies) are designed to run 24x7 and will not stop until someone stops them explicitly.
Storm is written in both Clojure and Java. Clojure is a Lisp, a functional programming language that runs on the JVM and is well suited to concurrent and parallel programming. Storm leverages mature Java libraries built over the last 10 years. All of these can be found inside the storm/lib folder.
Before Storm became popular, real-time or near-real-time processing problems were solved using intermediate brokers and message queues. Listener or worker processes were written in languages such as Python or Java. For parallel processing, code depended on the threading model supplied by the programming language itself. This old style of working often did not utilize CPU and memory very well. In some cases, mainframes were used as well, but they also became outdated over time. Distributed computing was not easy, there were many intermediate outputs or hops, and there was no way to replay failed messages automatically. Storm addressed all of these pain areas very well. It is one of the best real-time computation frameworks available for use.
Here are Storm's key features; they address the aforementioned problems:
Simple to program: It's easy to learn the Storm framework. You can write code in the programming language of your choice and can also use the existing libraries of that programming language. There is no compromise.
Support for most programming languages: Storm already supports most programming languages. Even if a language is not supported, it can be used by supplying code and configuration through the JSON protocol defined in the Storm Data Specification Language (DSL).
Horizontal scalability or distributed computing is possible: Computation can be multiplied by adding more machines to the Storm cluster without stopping running programs, also known as topologies.
Fault tolerant: Storm manages worker and machine-level failure. Heartbeats of each process are tracked to manage different types of failure, such as task failure on one machine or an entire machine's failure.
Guaranteed message processing: Storm processes can acknowledge (ACK) messages (tuples) either automatically or explicitly. If an ACK is not received, Storm can replay the message.
Free, open source, and lots of open source community support: Being an Apache project, Storm has free distribution and modifying rights without any worry about the legal aspect. Storm gets a lot of attention from the open source community and is attracting a large number of good developers to contribute to the code.
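The guaranteed message processing feature in the list above can be pictured with a toy message source that remembers every message until it is acknowledged and queues it again on failure. This is a plain-Python sketch of the idea only, not a Storm spout; the class ReplayingSource and its method names are hypothetical.

```python
from collections import deque


class ReplayingSource:
    """Toy message source that replays failed messages (not a Storm API)."""

    def __init__(self, messages):
        self.queue = deque(enumerate(messages))  # (msg_id, payload) waiting to be sent
        self.pending = {}                        # msg_id -> payload awaiting an ACK

    def next_message(self):
        if not self.queue:
            return None
        msg_id, payload = self.queue.popleft()
        self.pending[msg_id] = payload
        return msg_id, payload

    def ack(self, msg_id):
        # Processing succeeded: forget the message.
        self.pending.pop(msg_id, None)

    def fail(self, msg_id):
        # Processing failed or timed out: queue the message again.
        if msg_id in self.pending:
            self.queue.append((msg_id, self.pending.pop(msg_id)))


source = ReplayingSource(["a", "b"])
first = source.next_message()
second = source.next_message()
source.ack(first[0])
source.fail(second[0])  # "b" goes back on the queue
replayed = source.next_message()
```

The key design point is that a message is only forgotten on ACK, so a crash between sending and acknowledging never loses data; it can only cause a message to be processed more than once.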
The Storm cluster can be set up in four flavors, depending on the requirement. If you want to set up a large cluster, go for a distributed installation. If you want to learn Storm, go for a single-machine installation. If you want to connect to an existing Storm cluster, use client mode. Finally, if you want to develop in an IDE, simply unzip the Storm TAR and point to all dependencies of the Storm library. In the initial learning phase, a single-machine Storm installation is actually what you need.
A developer can download Storm from the distribution site, unzip it somewhere in $HOME, and simply submit the Storm topology in local mode. Once the topology is successfully tested locally, it can be submitted to run over the cluster.
This flavor is best for students and medium-scale computation. Here, everything runs on a single machine, including Zookeeper, Nimbus, and Supervisor. Storm/bin is used to run all commands. Also, no extra Storm client is required. You can do everything from the same machine. This case is well demonstrated in the following figure:

This option is required when you have a large-scale computation requirement. It is a horizontal scaling option. The following figure explains this case in detail. In this figure, we have five physical machines, and to increase fault tolerance in the system, we are running Zookeeper on two machines. As shown in the diagram, Machine 1 and Machine 2 are a group of Zookeeper machines; one of them is the leader at any point in time, and when it dies, the other becomes the leader. Nimbus is a lightweight process, so it can run on either machine, 1 or 2. We also have Machine 3, Machine 4, and Machine 5 dedicated to performing the actual processing. Each of these machines (3, 4, and 5) requires a supervisor daemon to run on it. Machines 3, 4, and 5 should know where the Nimbus and Zookeeper daemons are running, and that entry should be present in their storm.yaml.

So, each physical machine (3, 4, and 5) runs one supervisor daemon, and each machine's storm.yaml points to the IP address of the machine where Nimbus is running (this can be 1 or 2). All Supervisor machines must add the Zookeeper IP addresses (1 and 2) to storm.yaml. The Storm UI daemon should run on the Nimbus machine (this can be 1 or 2).
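As a rough illustration of the entries each supervisor machine needs, the following sketch renders the relevant storm.yaml lines from a Nimbus host and a list of Zookeeper hosts. The setting names are the ones used later in this chapter; the function name and the host names machine1 and machine2 are hypothetical stand-ins for Machine 1 and Machine 2.

```python
def render_supervisor_yaml(nimbus_host, zookeeper_hosts, zookeeper_port=2181):
    """Return storm.yaml text for a supervisor machine (illustrative only)."""
    lines = ["storm.zookeeper.servers:"]
    lines += ['  - "{}"'.format(host) for host in zookeeper_hosts]
    lines.append("storm.zookeeper.port: {}".format(zookeeper_port))
    lines.append('nimbus.host: "{}"'.format(nimbus_host))
    return "\n".join(lines) + "\n"


# Hypothetical host names standing in for Machine 1 and Machine 2.
yaml_text = render_supervisor_yaml("machine1", ["machine1", "machine2"])
print(yaml_text)
```

The same text would be placed on Machines 3, 4, and 5, so generating it from one place helps keep the supervisors consistent.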
The Storm client is required only when you have a Storm cluster of multiple machines. To start the client, unzip the Storm distribution and add the Nimbus IP address to the storm.yaml file. The Storm client can be used to submit Storm topologies and check the status of running topologies from command-line options. Storm versions older than 0.9 should put the yaml file inside $STORM_HOME/.storm/storm.yaml (not required for newer versions).
Installing Java and Python is easy. Let's assume our Linux machine is ready with Java and Python:
A Linux machine (Storm version 0.9 and later can also run on Windows machines)
Java 6 (set export PATH=$PATH:$JAVA_HOME/bin)
Python 2.6 (required to run Storm daemons and management commands)
We will be making lots of changes in the Storm configuration file (that is, storm.yaml), which is present under $STORM_HOME/conf. First, we start the Zookeeper process, which carries out coordination between Nimbus and the Supervisors. Then, we start the Nimbus master daemon, which distributes code in the Storm cluster. Next, we start the Supervisor daemon, which listens for work assigned (by Nimbus) to the node it runs on and starts and stops the worker processes as necessary.
ZeroMQ/JZMQ and Netty are inter-JVM communication libraries that permit two machines or two JVMs to send and receive process data (tuples) between each other. JZMQ is a Java binding of ZeroMQ. The latest versions of Storm (0.9+) have moved to Netty. If you download an old version of Storm, installing ZeroMQ and JZMQ is required. In this book, we will be considering only the latest versions of Storm, so you don't really require ZeroMQ/JZMQ.
Zookeeper is a coordinator for the Storm cluster. The interaction between Nimbus and worker nodes is done through Zookeeper. The installation of Zookeeper is well explained on the official website at http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html#sc_InstallingSingleMode.
The setup can be downloaded from https://archive.apache.org/dist/zookeeper/zookeeper-3.3.5/zookeeper-3.3.5.tar.gz. After downloading, edit the zoo.cfg file.
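For orientation, a standalone Zookeeper typically needs only three settings in zoo.cfg: tickTime, dataDir, and clientPort. The sketch below renders such a file as text; the function name and the data directory /var/zookeeper are hypothetical, and the official guide linked above remains the reference for the actual values to use.

```python
def render_zoo_cfg(data_dir, client_port=2181, tick_time=2000):
    """Return the text of a minimal standalone zoo.cfg (illustrative only)."""
    return (
        "tickTime={}\n"
        "dataDir={}\n"
        "clientPort={}\n"
    ).format(tick_time, data_dir, client_port)


# Hypothetical data directory; pick any writable location.
cfg = render_zoo_cfg("/var/zookeeper")
print(cfg)
```

The clientPort value here must match storm.zookeeper.port in storm.yaml.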
The following are the Zookeeper commands that are used:
Alternatively, use jps to find <pid> and then use kill -9 <pid> to kill the processes.
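The jps approach above can be scripted. The following sketch parses jps-style output, where each line is a PID followed by a main class name, and returns the matching PIDs. The sample output is made up for illustration; QuorumPeerMain is the main class under which a Zookeeper server normally appears, but the names shown on your machine may differ.

```python
def find_pids(jps_output, main_class):
    """Return PIDs from jps-style output whose main class equals `main_class`."""
    pids = []
    for line in jps_output.splitlines():
        parts = line.split()
        if len(parts) == 2 and parts[1] == main_class:
            pids.append(int(parts[0]))
    return pids


# Made-up sample output; real PIDs and process names will differ.
sample = "4021 QuorumPeerMain\n4188 nimbus\n4302 Jps\n"
zookeeper_pids = find_pids(sample, "QuorumPeerMain")
```

The returned PIDs could then be passed to kill, though stopping a daemon through its own stop command is usually the gentler option.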
Storm can be installed in either of these two ways:
Download directly from the following link: https://storm.apache.org/downloads.html
Storm configurations can be done using storm.yaml, which is present in the conf folder.
The following are the configurations for a single-machine Storm cluster installation.
Port 2181 is the default port of Zookeeper. To add more than one Zookeeper server, list each one as a separate entry prefixed with a hyphen (-):
storm.zookeeper.servers:
  - "localhost"
# You must change 2181 to another value if Zookeeper is running on another port.
storm.zookeeper.port: 2181
# In single-machine mode, Nimbus runs locally, so we keep it as localhost.
# In distributed mode, change localhost to the name of the machine where the Nimbus daemon is running.
nimbus.host: "localhost"
# Here Storm will generate logs of workers, Nimbus, and Supervisor.
storm.local.dir: "/var/stormtmp"
java.library.path: "/usr/local/lib"
# Allocating 4 ports for workers. More ports can also be added.
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
# Memory is allocated to each worker. In the case below, we allocate 768 MB per worker.
worker.childopts: "-Xmx768m"
# Memory for the Nimbus daemon. Here we give 512 MB to Nimbus.
nimbus.childopts: "-Xmx512m"
# Memory for the Supervisor daemon. Here we give 256 MB to the Supervisor.
supervisor.childopts: "-Xmx256m"
Note
Notice supervisor.childopts: "-Xmx256m". In this setting, we reserved four supervisor ports, which means that a maximum of four worker processes can run on this machine.
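A quick back-of-the-envelope check of what this configuration asks of the machine can be done in a few lines. The sketch below counts the worker slots and adds up the maximum JVM heap sizes from the settings above; the helper name heap_mb is hypothetical, and the total covers heap only, not other JVM or operating system memory.

```python
import re


def heap_mb(childopts):
    """Extract the -Xmx value in megabytes from a JVM options string."""
    match = re.search(r"-Xmx(\d+)m", childopts)
    if match is None:
        raise ValueError("no -Xmx<N>m setting found in: " + childopts)
    return int(match.group(1))


# Values copied from the single-machine configuration above.
config = {
    "supervisor.slots.ports": [6700, 6701, 6702, 6703],
    "worker.childopts": "-Xmx768m",
    "nimbus.childopts": "-Xmx512m",
    "supervisor.childopts": "-Xmx256m",
}

max_workers = len(config["supervisor.slots.ports"])
total_heap_mb = (
    max_workers * heap_mb(config["worker.childopts"])
    + heap_mb(config["nimbus.childopts"])
    + heap_mb(config["supervisor.childopts"])
)
print(max_workers, total_heap_mb)
```

With four slots at 768 MB each plus 512 MB for Nimbus and 256 MB for the Supervisor, the worst case is 3840 MB of heap, which is worth comparing against the machine's physical memory before adding more ports.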
storm.local.dir: This directory location should be cleaned if there is a problem with starting Nimbus and Supervisor. In the case of running a topology on the local IDE on a Windows machine, C:\Users\<User-Name>\AppData\Local\Temp should be cleaned.
Netty enables inter JVM communication and it is very simple to use.
You don't really need to install anything extra for Netty. This is because it's a pure Java-based communication library. All new versions of Storm support Netty.
Add the following lines to your storm.yaml file. Configure and adjust the values to best suit your use case:
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100
Storm daemons are the processes that must be running before you submit your program to the cluster. When you run a topology program in a local IDE, these daemons start automatically on predefined ports, but on a cluster, they must run at all times:
Start the master daemon, nimbus. Go to the bin directory of the Storm installation and execute the following command (assuming that zookeeper is running):
./storm nimbus
Alternatively, to run in the background, use the same command with nohup, like this:
nohup ./storm nimbus &
Now we have to start the supervisor daemon. Go to the bin directory of the Storm installation and execute this command:
./storm supervisor
To run in the background, use the following command:
nohup ./storm supervisor &
Let's start the Storm UI. The Storm UI is an optional process. It helps us to see the Storm statistics of a running topology. You can see how many executors and workers are assigned to a particular topology. The command needed to run the Storm UI is as follows:
./storm ui
Alternatively, to run in the background, use this line with nohup:
nohup ./storm ui &
To access the Storm UI, visit http://localhost:8080.
We will now start the Storm logviewer. The logviewer is another optional process, used for seeing the logs from the browser. You can also see the Storm logs from the command line in the $STORM_HOME/logs folder. To start logviewer, use this command:
./storm logviewer
To run in the background, use the following line with nohup:
nohup ./storm logviewer &
DRPC daemon: DRPC is another optional service. DRPC stands for Distributed Remote Procedure Call. You will require the DRPC daemon if you want to supply an argument to the Storm topology externally through the DRPC client. Note that an argument can be supplied only once per call, and the DRPC client may wait a long time until the Storm topology finishes processing and returns. DRPC is not a popular option in projects because, firstly, it blocks the client, and secondly, you can supply only one argument at a time. DRPC is not supported by Python and Petrel.
Summarizing, the steps for starting processes are as follows:
First, all the Zookeeper daemons.
Nimbus daemons.
Supervisor daemon on one or more machines.
The UI daemon where Nimbus is running (optional).
The Logviewer daemon (optional).
Submitting the topology.
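The startup order summarized above can be captured in a small helper script. This is only a sketch: it assumes Zookeeper is already running, it uses the same storm subcommands shown earlier in this chapter, and the install path /opt/storm/bin/storm is hypothetical.

```python
import subprocess

# Daemons in the order listed above; Zookeeper is assumed to be running already.
STARTUP_ORDER = ["nimbus", "supervisor", "ui", "logviewer"]


def daemon_command(storm_bin, daemon):
    """Build the argv for one daemon, for example [storm_bin, 'nimbus']."""
    return [storm_bin, daemon]


def start_all(storm_bin, include_optional=True):
    """Launch each daemon as a background process and return the Popen handles."""
    daemons = STARTUP_ORDER if include_optional else STARTUP_ORDER[:2]
    return [subprocess.Popen(daemon_command(storm_bin, d)) for d in daemons]


# Hypothetical install location; adjust to your own $STORM_HOME/bin/storm.
commands = [daemon_command("/opt/storm/bin/storm", d) for d in STARTUP_ORDER]
for command in commands:
    print(" ".join(command))
```

Calling start_all("/opt/storm/bin/storm") would actually launch the processes; the loop above only prints the commands so that the order can be reviewed first.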
You can restart the nimbus daemon anytime without any impact on existing processes or topologies. You can restart the supervisor daemon and can also add more supervisor machines to the Storm cluster anytime.
To submit a jar to the Storm cluster, go to the bin directory of the Storm installation and execute the following command:
./storm jar <path-to-topology-jar> <class-with-the-main> <arg1> … <argN>
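When submission is automated, it can help to build this command as an argument list instead of a single string, so that paths containing spaces are handled safely. The sketch below follows the command shape shown above; the jar path, class name, and topology name are hypothetical examples.

```python
def storm_jar_command(jar_path, main_class, *args, storm_bin="./storm"):
    """Build the argv for `storm jar`, following the command shape shown above."""
    return [storm_bin, "jar", jar_path, main_class] + [str(a) for a in args]


# Hypothetical jar, main class, and topology name.
cmd = storm_jar_command(
    "target/wordcount.jar", "com.example.WordCountTopology", "wordcount"
)
print(" ".join(cmd))
```

The resulting list can be handed directly to subprocess.run from the standard library.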
All the previous settings are required to start the cluster, but there are many other settings that are optional and can be tuned based on the topology's requirement. A prefix can help find the nature of a configuration. The complete list of default yaml configurations is available at https://github.com/apache/storm/blob/master/conf/defaults.yaml.
Configurations can be identified by how the prefix starts. For example, all UI configurations start with ui.*.
Nature of the configuration | Prefix to look into |
---|---|
General | storm.* |
Nimbus | nimbus.* |
UI | ui.* |
Log viewer | logviewer.* |
DRPC | drpc.* |
Supervisor | supervisor.* |
Topology | topology.* |
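The prefix convention lends itself to a simple lookup. The sketch below maps a setting name to the nature of the configuration by its first dotted component; it assumes the prefixes used in defaults.yaml (storm, nimbus, ui, logviewer, drpc, supervisor, and topology), and the function name nature_of is hypothetical.

```python
# Assumed prefixes, following the naming used in defaults.yaml.
PREFIX_TO_NATURE = {
    "storm": "General",
    "nimbus": "Nimbus",
    "ui": "UI",
    "logviewer": "Log viewer",
    "drpc": "DRPC",
    "supervisor": "Supervisor",
    "topology": "Topology",
}


def nature_of(setting):
    """Classify a setting such as 'ui.port' by its prefix."""
    prefix = setting.split(".", 1)[0]
    return PREFIX_TO_NATURE.get(prefix, "Other")


for name in ["ui.port", "topology.workers", "nimbus.childopts", "java.library.path"]:
    print(name, "->", nature_of(name))
```

Settings that do not follow the convention, such as java.library.path, fall into the "Other" bucket here.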
All of these optional configurations can be added to STORM_HOME/conf/storm.yaml for any change other than the default values. All settings that start with topology.* can be set either programmatically from the topology or from storm.yaml. All other settings can be set only from the storm.yaml file. For example, the following table shows three different ways to play with these parameters. All three do the same thing:
/conf/storm.yaml | Topology builder | Custom yaml |
---|---|---|
Changing storm.yaml (impacts all the topologies of the cluster) | Changing the topology builder while writing code (impacts only the current topology) | Supplying a custom yaml file (impacts only the current topology) |
 | This is supplied through Python code | Create Python: |
Any configuration change in storm.yaml will affect all running topologies, but when using the conf.setXXX option in code, each topology can override that option with the value best suited to it.
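The layering described above can be sketched as a simple dictionary merge: defaults first, then the cluster-wide storm.yaml, then per-topology overrides, which are restricted to topology.* settings. This is an illustration of the precedence only, not how Storm itself loads configuration; the function name effective_config is hypothetical.

```python
def effective_config(defaults, storm_yaml, topology_conf):
    """Merge settings so that later layers override earlier ones."""
    merged = dict(defaults)
    merged.update(storm_yaml)
    for key, value in topology_conf.items():
        # Only topology.* settings may be changed for a single topology.
        if not key.startswith("topology."):
            raise ValueError("only topology.* settings can be set per topology: " + key)
        merged[key] = value
    return merged


defaults = {"topology.workers": 1, "ui.port": 8080}
cluster = {"topology.workers": 2}  # from storm.yaml, affects every topology
conf = effective_config(defaults, cluster, {"topology.workers": 4})
print(conf)
```

Here the cluster-wide value of 2 workers applies to every topology, while this one topology ends up with 4.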
This brings us to the end of the first chapter. It gave an overview of how applications were developed before Storm came into existence, explained what real-time computation is, and showed why Storm, as a programming framework, is becoming so popular. The chapter taught you to perform Storm configurations. It also gave you details about the daemons of Storm, Storm clusters, and their setup. In the next chapter, we will explore the details of Storm's anatomy.