Deploying Storm on Hadoop for Advertising Analysis


Establishing the architecture

The recent componentization within Hadoop allows any distributed system to use it for resource management. In Hadoop 1.0, resource management was embedded into the MapReduce framework as shown in the following diagram:

Hadoop 2.0 separates out resource management into YARN, allowing other distributed processing frameworks to run on the resources managed under the Hadoop umbrella. In our case, this allows us to run Storm on YARN as shown in the following diagram:

As shown in the preceding diagram, Storm fulfills the same function as MapReduce. It provides a framework for the distributed computation. In this specific use case, we use Pig scripts to articulate the ETL/analysis that we want to perform on the data. We will convert that script into a Storm topology that performs the same function, and then we will examine some of the intricacies involved in doing that transformation.

To understand this better, it is worth examining the nodes in a Hadoop cluster and the purpose of the processes running on those nodes. Assume that we have a cluster as depicted in the following diagram:

There are two different components/subsystems shown in the diagram. The first is YARN, which is the new resource management layer introduced in Hadoop 2.0. The second is HDFS. Let's first delve into HDFS since that has not changed much since Hadoop 1.0.

Examining HDFS

HDFS is a distributed filesystem. It distributes blocks of data across a set of slave nodes. The NameNode is the catalog. It maintains the directory structure and the metadata indicating which nodes have what information. The NameNode does not store any data itself, it only coordinates create, read, update, and delete (CRUD) operations across the distributed filesystem. Storage takes place on each of the slave nodes that run DataNode processes. The DataNode processes are the workhorses in the system. They communicate with each other to rebalance, replicate, move, and copy data. They react and respond to the CRUD operations of clients.
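The division of labor can be pictured with a toy model (purely illustrative; this is not the Hadoop API): the NameNode answers only the question of where data lives, while the bytes themselves are served by the DataNodes.

```java
import java.util.*;

// Toy model of the NameNode's role: it stores only metadata
// (which DataNodes hold a file's blocks), never the data itself.
public class NameNodeCatalog {
    private final Map<String, List<String>> blockLocations = new HashMap<>();

    // Record which DataNodes hold the blocks for a path
    public void register(String path, List<String> dataNodes) {
        blockLocations.put(path, dataNodes);
    }

    // A client asks the NameNode where to read; the actual bytes
    // are then fetched directly from the DataNodes.
    public List<String> locate(String path) {
        return blockLocations.getOrDefault(path, Collections.emptyList());
    }
}
```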

Examining YARN

YARN is the resource management system. It monitors the load on each of the nodes and coordinates the distribution of new jobs to the slaves in the cluster. The ResourceManager collects status information from the NodeManagers. The ResourceManager also services job submissions from clients.

One additional abstraction within YARN is the concept of an ApplicationMaster. An ApplicationMaster manages resource and container allocation for a specific application. The ApplicationMaster negotiates with the ResourceManager for the assignment of resources. Once the resources are assigned, the ApplicationMaster coordinates with the NodeManagers to instantiate containers. The containers are logical holders for the processes that actually perform the work.

The ApplicationMaster is a processing-framework-specific library. Storm-YARN provides the ApplicationMaster for running Storm processes on YARN. HDFS distributes the ApplicationMaster as well as the Storm framework itself. Presently, Storm-YARN expects an external ZooKeeper. Nimbus starts up and connects to the ZooKeeper when the application is deployed.

The following diagram depicts the Hadoop infrastructure running Storm via Storm-YARN:

As shown in the preceding diagram, YARN is used to deploy the Storm application framework. At launch, Storm Application Master is started within a YARN container. That, in turn, creates an instance of Storm Nimbus and the Storm UI.

After that, Storm-YARN launches supervisors in separate YARN containers. Each of these supervisor processes can spawn workers within its container.

Both the ApplicationMaster and the Storm framework are distributed via HDFS. Storm-YARN provides command-line utilities to start the Storm cluster, launch supervisors, and configure Storm for topology deployment. We will see these facilities later in this article.

To complete the architectural picture, we need to layer in the batch and real-time processing mechanisms: Pig and Storm topologies, respectively. We also need to depict the actual data.

Often a queuing mechanism such as Kafka is used to queue work for a Storm cluster. To simplify things, we will use data stored in HDFS. The following depicts our use of Pig, Storm, YARN, and HDFS for our use case, omitting elements of the infrastructure for clarity. To fully realize the value of converting from Pig to Storm, we would convert the topology to consume from Kafka instead of HDFS as shown in the following diagram:

As the preceding diagram depicts, our data will be stored in HDFS. The dashed lines depict the batch process for analysis, while the solid lines depict the real-time system. In each of the systems, the following steps take place:



Step | Pig Equivalent | Storm-YARN Equivalent
The processing frameworks are deployed | The MapReduce ApplicationMaster is deployed and started | Storm-YARN launches an ApplicationMaster and distributes the Storm framework
The specific analytics are launched | The Pig script is compiled to MapReduce jobs and submitted | Topologies are deployed to the cluster
The resources are reserved | Map and reduce tasks are created in YARN containers | Supervisors are instantiated with workers
The analysis reads the data from storage and performs the analysis | Pig reads the data out of HDFS | Storm reads the work, typically from Kafka; in this case, the topology reads it from a flat file

Another analogy can be drawn between Pig and Trident. Pig scripts compile down into MapReduce jobs, while Trident topologies compile down into Storm topologies.

For more information on the Storm-YARN project, visit the following URL:

Configuring the infrastructure

First, we need to configure the infrastructure. Since Storm will run on the YARN infrastructure, we will first configure YARN and then show how to configure Storm-YARN for deployment on that cluster.

The Hadoop infrastructure

To configure a set of machines, you will need a copy of Hadoop residing on them or a copy that is accessible to each of them. First, download the latest copy of Hadoop and unpack the archive. For this example, we will use Version 2.1.0-beta.

Assuming that you have uncompressed the archive into /home/user/hadoop, add the following environment variables on each of the nodes in the cluster:

export HADOOP_PREFIX=/home/user/hadoop
export HADOOP_YARN_HOME=/home/user/hadoop
export HADOOP_CONF_DIR=/home/user/hadoop/etc/hadoop

Add YARN to your execute path as follows:

export PATH=$PATH:$HADOOP_YARN_HOME/bin

All the Hadoop configuration files are located in $HADOOP_CONF_DIR. The three key configuration files for this example are: core-site.xml, yarn-site.xml, and hdfs-site.xml.

In this example, we will assume that we have a master node named master and four slave nodes named slave01 through slave04.

Test the YARN configuration by executing the following command line:

$ yarn version

You should see output similar to the following:

Hadoop 2.1.0-beta
Subversion -r 1514472
Compiled by hortonmu on 2013-08-15T20:48Z
Compiled with protoc 2.5.0
From source with checksum 8d753df8229fd48437b976c5c77e80a
This command was run using /Users/bone/tools/hadoop-2.1.0-beta/share/hadoop/common/hadoop-common-2.1.0-beta.jar

Configuring HDFS

As per the architecture diagram, to configure HDFS you need to start the NameNode and then connect one or more DataNodes to it.

Configuring the NameNode

To start the NameNode, you need to specify a host and port. Configure the host and port in the core-site.xml file by using the following elements:

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://master:8020</value>
  </property>
</configuration>

Additionally, configure where the NameNode stores its metadata. This configuration is stored in the hdfs-site.xml file, in the dfs.namenode.name.dir variable.

To keep the example simple, we will also disable security on the distributed filesystem. To do this, we set dfs.permissions to false. After these edits, the HDFS configuration file looks like the following code snippet:

<configuration>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>/home/user/hadoop/name/data</value>
  </property>
  <property>
    <name>dfs.permissions</name>
    <value>false</value>
  </property>
</configuration>

The final step before starting the NameNode is the formatting of the distributed filesystem. Do this with the following command:

hdfs namenode -format <cluster_name>

Finally, we are ready to start the NameNode. Do so with the following command:

$HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start namenode

The last line of the startup will indicate where the logs are located:

starting namenode, logging to /home/user/hadoop/logs/hadoop-master.

Despite the message, the logs will actually be located in another file with the same name, but with the suffix .log instead of .out.

Also, ensure that the name directory you declared in the configuration exists; otherwise, you will receive the following error in the logfile:

org.apache.hadoop.hdfs.server.common.InconsistentFSStateException: Directory /home/user/hadoop-2.1.0-beta/name/data is in an inconsistent state: storage directory does not exist or is not accessible.

Verify that the NameNode has started with the following code snippet:

boneill@master:~-> jps
30080 NameNode

Additionally, you should be able to navigate to the UI in a web browser. By default, the server starts on port 50070. Navigate to http://master:50070 in a browser. You should see the following screenshot:

Clicking on the Live Nodes link will show the nodes available and the space allocation per node, as shown in the following screenshot:

Finally, from the main page, you can also browse the filesystem by clicking on Browse the filesystem.

Configuring the DataNode

In general, it is easiest to share the core configuration file between nodes in the cluster. The data nodes will use the host and port defined in the core-site.xml file to locate the NameNode and connect to it.

Additionally, each DataNode needs to configure the location for local storage. This is defined in the following element within the hdfs-site.xml file:

<configuration>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/vol/local/storage/</value>
  </property>
</configuration>

If this location is consistent across slave machines, then this configuration file can be shared as well. With this set, you can start the DataNode with the following command:

$HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start datanode

Once again, verify that the DataNode is running using jps and monitor the logs for any errors. In a few moments, the DataNode should appear in the Live Nodes screen of the NameNode as previously shown.

Configuring YARN

With HDFS up and running, it is now time to turn our attention to YARN. Similar to what we did with HDFS, we will first get the ResourceManager running and then we will attach slave nodes by running NodeManager.

Configuring the ResourceManager

The ResourceManager has various subcomponents, each of which acts as a server that requires a host and port on which to run. All of the servers are configured within the yarn-site.xml file.

For this example, we will use the following YARN configuration:

<configuration>
  <property>
    <name>yarn.resourcemanager.address</name>
    <value>master:8022</value>
  </property>
  <property>
    <name>yarn.resourcemanager.admin.address</name>
    <value>master:8033</value>
  </property>
  <property>
    <name>yarn.resourcemanager.resource-tracker.address</name>
    <value>master:8025</value>
  </property>
  <property>
    <name>yarn.resourcemanager.scheduler.address</name>
    <value>master:8030</value>
  </property>
  <property>
    <name>yarn.acl.enable</name>
    <value>false</value>
  </property>
  <property>
    <name>yarn.nodemanager.local-dirs</name>
    <value>/home/user/hadoop_work/mapred/nodemanager</value>
    <final>true</final>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce.shuffle</value>
  </property>
</configuration>

The first four variables in the preceding configuration file assign hosts and ports for the subcomponents. Setting the yarn.acl.enable variable to false disables security on the YARN cluster. The yarn.nodemanager.local-dirs variable specifies the place on the local filesystem where YARN will place data.

Finally, the yarn.nodemanager.aux-services variable starts an auxiliary service within the NodeManager's runtime to support MapReduce jobs. Since our Pig scripts compile down into MapReduce jobs, they depend on this variable.

Like the NameNode, start the ResourceManager with the following command line:

$HADOOP_YARN_HOME/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR start resourcemanager

Again, check for the existence of the process with jps, monitor the logs for exceptions, and then you should be able to navigate to the UI which by default runs on port 8088.

The UI is shown in the following screenshot:

Configuring the NodeManager

The NodeManager uses the same configuration file (yarn-site.xml) to locate the respective servers. Thus, it is safe to copy or share that file between the nodes in the cluster.

Start the NodeManager with the following command:

$HADOOP_YARN_HOME/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR start nodemanager

After all NodeManagers register with the ResourceManager, you will be able to see them in the ResourceManager UI after clicking on Nodes, as shown in the following screenshot:

Deploying the analytics

With Hadoop in place, we can now focus on the distributed processing frameworks that we will use for analysis.

Performing a batch analysis with the Pig infrastructure

The first of the distributed processing frameworks that we will examine is Pig. Pig is a framework for data analysis. It allows the user to articulate analysis in a simple high-level language. These scripts then compile down to MapReduce jobs.

Although Pig can read data from a few different systems (for example, S3), we will use HDFS as our data storage mechanism in this example. Thus, the first step in our analysis is to copy the data into HDFS.

To do this, we issue the following Hadoop commands:

hadoop fs -mkdir /user/bone/temp
hadoop fs -copyFromLocal click_thru_data.txt /user/bone/temp/

The preceding commands create a directory for the data file and copy the click-thru data file into that directory.

To execute a Pig script against that data, we will need to install Pig. For this, we simply download Pig and expand the archive on that machine configured with Hadoop. For this example, we will use Version 0.11.1.

Just like we did with Hadoop, we will add the following environment variables to our environment:

export PIG_CLASSPATH=/home/user/hadoop/etc/hadoop
export PIG_HOME=/home/user/pig
export PATH=$PATH:$HOME/bin:$PIG_HOME/bin:$HADOOP_YARN_HOME/bin

The PIG_CLASSPATH variable tells Pig where to find Hadoop.

Once you have those variables in your environment, you should be able to test your Pig installation with the following commands:

boneill@master:~-> pig
2013-10-07 23:35:41,179 [main] INFO org.apache.pig.Main - Apache Pig version 0.11.1 (r1459641) compiled Mar 22 2013, 02:13:53...
2013-10-07 23:35:42,639 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://master:8020
grunt>

By default, Pig will read the Hadoop configuration and connect to the distributed filesystem. You can see that in the previous output. It is connected to our distributed filesystem at hdfs://master:8020.

Via Pig, you can interact with HDFS in the same way as you would with a regular filesystem. For example, ls and cat both work, as shown in the following code snippet:

grunt> ls /user/bone/temp/
hdfs://master:8020/user/bone/temp/click_thru_data.txt<r 3>  157
grunt> cat /user/bone/temp/click_thru_data.txt
boneill campaign7 productX true
lisalis campaign10 productX false
boneill campaign6 productX true
owen campaign6 productX false
collin campaign7 productY true
maya campaign8 productY true
boneill campaign7 productX true
owen campaign6 productX true
olive campaign6 productX false
maryanne campaign7 productY true
dennis campaign7 productY true
patrick campaign7 productX false
charity campaign10 productY false
drago campaign7 productY false

Performing a real-time analysis with the Storm-YARN infrastructure

Now that we have infrastructure working for batch processing, let's leverage the exact same infrastructure for real-time processing. Storm-YARN makes it easy to reuse the Hadoop infrastructure for Storm.

Since Storm-YARN is a new project, it is best to build from source and create the distribution using the instructions in the README file found at the following URL:

After building the distribution, you need to copy the Storm framework into HDFS. This allows Storm-YARN to deploy the framework to each of the nodes in the cluster. By default, Storm-YARN will look for the Storm library as a ZIP file in the launching user's directory on HDFS. Storm-YARN provides a copy of a compatible Storm in the lib directory of its distribution.

Assuming that you are in the Storm-YARN directory, you can copy the ZIP file into the correct HDFS directory with the following commands:

hadoop fs -mkdir /user/bone/lib/
hadoop fs -copyFromLocal ./lib/ /user/bone/lib/

You can then verify that the Storm framework is in HDFS by browsing the filesystem through the Hadoop administration interface. You should see the following screenshot:

With the Storm framework staged on HDFS, the next step is to configure the local YAML file for Storm-YARN. The YAML file used with Storm-YARN is the configuration for both Storm-YARN and Storm itself. The Storm-specific parameters in the YAML file are passed along to Storm.

 "master"
master.thrift.port: 9000
master.initial-num-supervisors: 2
master.container.priority: 0
master.container.size-mb: 5120
master.heartbeat.interval.millis: 1000
master.timeout.secs: 1000
10000
10000
ui.port: 7070
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.messaging.netty.buffer_size: 1048576
storm.messaging.netty.max_retries: 100
storm.messaging.netty.min_wait_ms: 1000
storm.messaging.netty.max_wait_ms: 5000
storm.zookeeper.servers:
  - "zkhost"

Many of the parameters are self-descriptive. However, take note of the last variable in particular. This is the location of the ZooKeeper host. Although it might not be the case always, for now Storm-YARN assumes you have a pre-existing ZooKeeper.

To monitor whether Storm-YARN will continue to require a preexisting ZooKeeper instance, go through the information available at the following link:

With the Storm framework in HDFS and the YAML file configured, the command line to launch Storm on YARN is the following:

storm-yarn launch ../your.yaml --queue default --appname storm-yarn-2.1.0-beta-demo --stormZip

You specify the location of the YAML file, the queue for YARN, a name for the application, and the location of the ZIP file, which is relative to the user directory unless a full path is specified.

Queues in YARN are beyond the scope of this discussion, but by default YARN is configured with a default queue that is used in the preceding command line. If you are running Storm on a preexisting cluster, examine capacity-scheduler.xml in the YARN configuration to locate potential queue names.

After executing the preceding command line, you should see the application deployed in the YARN administration screen, as shown in the following screenshot:

Clicking on the application shows where the application master is deployed. Examine the node value for the Application Master. This is where you will find the Storm UI as shown in the following screenshot:

Drilling down one more level, you will be able to see the logfiles for Storm, as shown in the following screenshot:

With any luck, the logs will show a successful startup of Nimbus and the UI. Examining the standard output stream, you will see Storm-YARN launching the supervisors:

13/10/09 21:40:10 INFO yarn.StormAMRMClient: Use NMClient to launch supervisors in container.
13/10/09 21:40:10 INFO impl.ContainerManagementProtocolProxy: Opening proxy : slave05:35847
13/10/09 21:40:12 INFO yarn.StormAMRMClient: Supervisor log: http://slave05:8042/node/containerlogs/container_1381197763696_0004_01_000002/boneill/supervisor.log
13/10/09 21:40:14 INFO yarn.MasterServer: HB: Received allocated containers (1)
13/10/09 21:40:14 INFO yarn.MasterServer: HB: Supervisors are to run, so queueing (1) containers...
13/10/09 21:40:14 INFO yarn.MasterServer: LAUNCHER: Taking container with id (container_1381197763696_0004_01_000004) from the queue.
13/10/09 21:40:14 INFO yarn.MasterServer: LAUNCHER: Supervisors are to run, so launching container id (container_1381197763696_0004_01_000004)
13/10/09 21:40:16 INFO yarn.StormAMRMClient: Use NMClient to launch supervisors in container.
13/10/09 21:40:16 INFO impl.ContainerManagementProtocolProxy: Opening proxy : dlwolfpack02.
13/10/09 21:40:16 INFO yarn.StormAMRMClient: Supervisor log: http://slave02:8042/node/containerlogs/container_1381197763696_0004_01_000004/boneill/supervisor.log

The key lines in the preceding output are the supervisor log URLs. If you navigate to those URLs, you will see the supervisor logs for the respective instances. Looking back at the YAML file we used to launch Storm-YARN, notice that we specified the following:

master.initial-num-supervisors: 2

Navigate to the UI on the node that hosts the ApplicationMaster, using the UI port specified in the YAML file used for launch (ui.port: 7070). In a browser, open http://node:7070/, where node is the host for the ApplicationMaster. You should see the familiar Storm UI as shown in the following screenshot:

The infrastructure is now ready for use. To kill the Storm deployment on YARN, you can use the following command:

./storm-yarn shutdown -appId application_1381197763696_0002

In the preceding statement, the appId parameter corresponds to the application ID assigned to the Storm-YARN application, which is visible in the Hadoop administration screen.

Storm-YARN will use the local Hadoop configuration to locate the master Hadoop node. If you are launching from a machine that is not a part of the Hadoop cluster, you will need to configure that machine with the Hadoop environment variables and configuration files. Specifically, it launches through the ResourceManager, so you will need the following variable configured in yarn-site.xml:

<property>
  <name>yarn.resourcemanager.address</name>
  <value>master:8022</value>
</property>

Performing the analytics

With both the batch and real-time infrastructure in place, we can focus on the analytics. First, we will take a look at the processing in Pig, and then we will translate the Pig script into a Storm topology.

Executing the batch analysis

For the batch analysis, we use Pig. The Pig script calculates the effectiveness of a campaign by computing the ratio between the number of distinct customers that clicked through and the total number of impressions.

The Pig script is shown in the following code snippet:

click_thru_data = LOAD '../click_thru_data.txt' USING PigStorage(' ')
  AS (cookie_id:chararray, campaign_id:chararray, product_id:chararray, click:chararray);

click_thrus = FILTER click_thru_data BY click == 'true';
distinct_click_thrus = DISTINCT click_thrus;
distinct_click_thrus_by_campaign = GROUP distinct_click_thrus BY campaign_id;
count_of_click_thrus_by_campaign = FOREACH distinct_click_thrus_by_campaign GENERATE group, COUNT($1);
-- dump count_of_click_thrus_by_campaign;

impressions_by_campaign = GROUP click_thru_data BY campaign_id;
count_of_impressions_by_campaign = FOREACH impressions_by_campaign GENERATE group, COUNT($1);
-- dump count_of_impressions_by_campaign;

joined_data = JOIN count_of_impressions_by_campaign BY $0 LEFT OUTER, count_of_click_thrus_by_campaign BY $0 USING 'replicated';
-- dump joined_data;

result = FOREACH joined_data GENERATE $0 AS campaign, ($3 IS NULL ? 0 : $3) AS clicks, $1 AS impressions, (double)$3/(double)$1 AS effectiveness:double;

dump result;

Let's take a closer look at the preceding code.

The first LOAD statement specifies the location of the data and a schema with which to load the data. Typically, Pig loads denormalized data. The location for the data is a URL. When operating in local mode, as previously shown, this is a relative path. When running in MapReduce mode, the URL will most likely be a location in HDFS. When running a Pig script against Amazon Web Services (AWS), this will most likely be an S3 URL.

In the lines after the LOAD statement, the script calculates the distinct click-thrus. First, it filters the dataset for only the rows in which the click column is true, indicating that the impression resulted in a click-thru. After filtering, the rows are reduced to distinct entries, grouped by campaign, and each distinct click-thru is counted per campaign. The results of this analysis are stored in the alias count_of_click_thrus_by_campaign.

The second dimension of the problem is then computed in the subsequent lines. No filter is necessary since we simply want a count of the impressions by campaign. The results of this are stored in the alias count_of_impressions_by_campaign.

Executing the Pig script yields the following output:

(campaign6,2,4,0.5)
(campaign7,4,7,0.5714285714285714)
(campaign8,1,1,1.0)
(campaign10,0,2,)

The first element in the output is the campaign identifier, followed by the count of distinct click-thrus and the total number of impressions. The last element is the effectiveness, which is the ratio of distinct click-thrus to total impressions.
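As a sanity check, the same filter/distinct/group/count pipeline can be replayed over the sample data shown earlier in plain Java. This sketch is illustrative only; the class and method names are made up and are not part of the Pig or Storm code:

```java
import java.util.*;

// Replays the batch analysis over the sample click-thru rows:
// FILTER (click == true) -> DISTINCT -> GROUP BY campaign -> COUNT.
public class EffectivenessCheck {
    static final String[] SAMPLE = {
        "boneill campaign7 productX true",   "lisalis campaign10 productX false",
        "boneill campaign6 productX true",   "owen campaign6 productX false",
        "collin campaign7 productY true",    "maya campaign8 productY true",
        "boneill campaign7 productX true",   "owen campaign6 productX true",
        "olive campaign6 productX false",    "maryanne campaign7 productY true",
        "dennis campaign7 productY true",    "patrick campaign7 productX false",
        "charity campaign10 productY false", "drago campaign7 productY false"
    };

    // campaign -> {distinct clicks, impressions, effectiveness}
    static Map<String, double[]> analyze(String[] rows) {
        Map<String, Integer> impressions = new HashMap<>();
        Set<String> distinctTrueRows = new LinkedHashSet<>();
        for (String row : rows) {
            impressions.merge(row.split(" ")[1], 1, Integer::sum); // GROUP + COUNT
            if (row.endsWith(" true")) distinctTrueRows.add(row);  // FILTER + DISTINCT
        }
        Map<String, Integer> clicks = new HashMap<>();
        for (String row : distinctTrueRows)
            clicks.merge(row.split(" ")[1], 1, Integer::sum);
        Map<String, double[]> result = new TreeMap<>();
        for (Map.Entry<String, Integer> e : impressions.entrySet()) {
            int c = clicks.getOrDefault(e.getKey(), 0);
            result.put(e.getKey(), new double[] { c, e.getValue(), (double) c / e.getValue() });
        }
        return result;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, double[]> e : analyze(SAMPLE).entrySet())
            System.out.println(e.getKey() + " -> " + Arrays.toString(e.getValue()));
    }
}
```

Running this reproduces the counts in the Pig output, for example 4 distinct click-thrus over 7 impressions for campaign7.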

Executing real-time analysis

Now, let's translate the batch analysis into real-time analysis. A strict interpretation of the Pig script might result in the following topology:

Stream inputStream = topology.newStream("clickthru", spout);

Stream click_thru_stream = inputStream
    .each(new Fields("cookie", "campaign", "product", "click"),
          new Filter("click", "true"))
    .each(new Fields("cookie", "campaign", "product", "click"),
          new Distinct())
    .groupBy(new Fields("campaign"))
    .persistentAggregate(new MemoryMapState.Factory(),
          new Count(), new Fields("click_thru_count"))
    .newValuesStream();

Stream impressions_stream = inputStream
    .groupBy(new Fields("campaign"))
    .persistentAggregate(new MemoryMapState.Factory(),
          new Count(), new Fields("impression_count"))
    .newValuesStream();

topology.join(click_thru_stream, new Fields("campaign"),
              impressions_stream, new Fields("campaign"),
              new Fields("campaign", "click_thru_count", "impression_count"))
    .each(new Fields("campaign", "click_thru_count", "impression_count"),
          new CampaignEffectiveness(), new Fields(""));

In the preceding topology, we fork the stream into two separate streams: click_thru_stream and impressions_stream. The click_thru_stream contains the count of distinct impressions. The impressions_stream contains the total count of impressions. Those two streams are then joined using the topology.join method.

The issue with the preceding topology is the join. In Pig, since the sets are static they can easily be joined. Joins within Storm are done on a per batch basis. This would not necessarily be a problem. However, the join is also an inner join, which means records are only emitted if there are corresponding tuples between the streams. In this case, we are filtering records from the click_thru_stream because we only want distinct records. Thus, the cardinality of that stream is smaller than that of the impressions_stream, which means that tuples are lost in the join process.
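The tuple loss is easy to see with plain sets standing in for the per-batch join keys; the following is illustrative code only, not the Storm join API:

```java
import java.util.*;

// Demonstrates inner-join semantics: only keys present on BOTH
// sides survive, so a campaign with impressions but no distinct
// click-thrus disappears from the joined output.
public class JoinLoss {
    static Set<String> innerJoinKeys(Set<String> left, Set<String> right) {
        Set<String> joined = new TreeSet<>(left);
        joined.retainAll(right); // drop keys missing from the other side
        return joined;
    }

    public static void main(String[] args) {
        // Per-campaign keys as the two streams might emit them in one batch:
        // campaign10 had impressions but produced no click-thrus.
        Set<String> clickCampaigns = Set.of("campaign6", "campaign7", "campaign8");
        Set<String> impressionCampaigns =
            Set.of("campaign6", "campaign7", "campaign8", "campaign10");
        System.out.println(innerJoinKeys(impressionCampaigns, clickCampaigns));
    }
}
```

campaign10 silently vanishes from the joined result, which is exactly the record loss described above.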

Operations such as join are well defined for discrete sets, but it is unclear how to translate their definitions into a real-time world of infinite event streams. For more on this, visit the following URLs:

Instead, we will use Trident's state construct to share the counts between the streams.

This is shown in the corrected topology in the following diagram:

The code for this topology is as follows:

StateFactory clickThruMemory = new MemoryMapState.Factory();
ClickThruSpout spout = new ClickThruSpout();
Stream inputStream = topology.newStream("clithru", spout);

TridentState clickThruState = inputStream
    .each(new Fields("cookie", "campaign", "product", "click"),
          new Filter("click", "true"))
    .each(new Fields("cookie", "campaign", "product", "click"),
          new Distinct())
    .groupBy(new Fields("campaign"))
    .persistentAggregate(clickThruMemory, new Count(),
          new Fields("click_thru_count"));

inputStream.groupBy(new Fields("campaign"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(),
          new Fields("impression_count"))
    .newValuesStream()
    .stateQuery(clickThruState, new Fields("campaign"),
          new MapGet(), new Fields("click_thru_count"))
    .each(new Fields("campaign", "impression_count", "click_thru_count"),
          new CampaignEffectiveness(), new Fields(""));

Let's first take a look at the spout. It simply reads the file, parses the rows, and emits the tuples, as shown in the following code snippet:

public class ClickThruEmitter implements Emitter<Long>, Serializable {
    ...
    @Override
    public void emitBatch(TransactionAttempt tx, Long coordinatorMeta,
                          TridentCollector collector) {
        File file = new File("click_thru_data.txt");
        try {
            BufferedReader br = new BufferedReader(new FileReader(file));
            String line = null;
            while ((line = br.readLine()) != null) {
                String[] data = line.split(" ");
                List<Object> tuple = new ArrayList<Object>();
                tuple.add(data[0]); // cookie
                tuple.add(data[1]); // campaign
                tuple.add(data[2]); // product
                tuple.add(data[3]); // click
                collector.emit(tuple);
            }
            br.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

In a real system, the preceding spout would most likely read from a Kafka queue. Alternatively, a spout could read directly from HDFS if we sought to recreate exactly what the batch processing mechanism was doing.

There is some preliminary work on a spout that can read from HDFS; check out the following URL for more information:

To compute the distinct count of all the click-thru, the topology first filters the stream for only those impressions that resulted in a click-thru.

The code for this filter is as follows:

public class Filter extends BaseFilter {
    private static final long serialVersionUID = 1L;
    private String fieldName = null;
    private String value = null;

    public Filter(String fieldName, String value) {
        this.fieldName = fieldName;
        this.value = value;
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        String tupleValue = tuple.getStringByField(fieldName);
        return tupleValue.equals(this.value);
    }
}

Then, the stream filters for only distinct click-thrus. In this example, it uses an in-memory cache to filter for distinct tuples. In reality, this should use distributed state and/or a grouping operation to direct like tuples to the same host. Without persistent storage, the example would eventually run out of memory in the JVM.
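The in-memory approach boils down to the semantics of Set.add, which returns false for anything already seen; the set grows with every unique tuple, which is exactly the memory risk just described. A minimal sketch (hypothetical class, not taken from the topology code):

```java
import java.util.*;

// Toy model of an in-memory distinct filter: keep only first occurrences.
public class DistinctSketch {
    private final Set<String> seen = Collections.synchronizedSet(new HashSet<>());

    // Mirrors the isKeep contract: true for the first occurrence of an id,
    // false for every duplicate.
    public boolean isKeep(String tupleId) {
        return seen.add(tupleId);
    }

    // The set grows without bound as unique tuples arrive.
    public int stateSize() {
        return seen.size();
    }
}
```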

There is active work on algorithms to approximate distinct sets against data streams. For more information on Streaming Quotient Filter (SQF), check out the following URL:

For our example, the Distinct function is shown in the following code snippet:

public class Distinct extends BaseFilter {
    private static final long serialVersionUID = 1L;
    private Set<String> distincter = Collections.synchronizedSet(new HashSet<String>());

    @Override
    public boolean isKeep(TridentTuple tuple) {
        String id = this.getId(tuple);
        // keep the tuple only if this id has not been seen before
        return distincter.add(id);
    }

    // (assumed completion: build the identity by concatenating the tuple's fields)
    private String getId(TridentTuple tuple) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < tuple.size(); i++) {
            sb.append(tuple.getValue(i)).append(":");
        }
        return sb.toString();
    }
}

Once it has all the distinct click-thrus, Storm persists that information into Trident state using a call to persistentAggregate. This collapses the stream by using the Count operator. In the example, we use a MemoryMapState. However, in a real system we would most likely apply a distributed storage mechanism such as Memcached or Cassandra.

The result of processing the initial stream is a TridentState object that contains the count of distinct click-thrus grouped by the campaign identifier. The critical line that joins the two streams is shown as follows:

.stateQuery(clickThruState, new Fields("campaign"), new MapGet(), new Fields("click_thru_count"))

This incorporates the state developed in the initial stream into the analysis developed by the second stream. Effectively, the second stream queries the state mechanism for the distinct count of all the click-thru for that campaign and adds it as a field to the tuples processed in this stream. That field can then be leveraged in the effectiveness computation, which is encapsulated in the following class:

public class CampaignEffectiveness extends BaseFunction {
    private static final long serialVersionUID = 1L;

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String campaign = (String) tuple.getValue(0);
        Long impressions_count = (Long) tuple.getValue(1);
        Long click_thru_count = (Long) tuple.getValue(2);
        if (click_thru_count == null)
            click_thru_count = new Long(0);
        double effectiveness = (double) click_thru_count / (double) impressions_count;
        Log.error("[" + campaign + "," + String.valueOf(click_thru_count) + ","
                + impressions_count + ", " + effectiveness + "]");
        List<Object> values = new ArrayList<Object>();
        values.add(campaign);
        collector.emit(values);
    }
}

As shown in the preceding code, this class computes effectiveness by computing the ratio between the field that contains the total count and the field introduced by the state query.
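The null guard is the subtle part: campaigns with impressions but no click-thrus come back from the state query with a null count. A minimal sketch of just that calculation follows; the class and method names are illustrative, not from the topology:

```java
public class Effectiveness {
    // Mirrors the null guard in CampaignEffectiveness: a missing
    // click-thru count is treated as zero rather than causing a
    // NullPointerException during unboxing.
    public static double compute(Long clickThruCount, long impressionsCount) {
        long clicks = (clickThruCount == null) ? 0L : clickThruCount;
        return (double) clicks / (double) impressionsCount;
    }
}
```

With this guard, campaign10 from the sample data yields an effectiveness of 0.0 instead of an exception.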


In this article, we saw a few different things. First, we saw the blueprint for converting a batch processing mechanism that leverages Pig into a real-time system that is implemented in Storm.

Secondly, and perhaps most importantly, we examined Storm-YARN; it allows a user to reuse the Hadoop infrastructure to deploy Storm. Not only does this provide a means for existing Hadoop users to quickly transition to Storm, it also allows a user to capitalize on cloud mechanisms for Hadoop such as Amazon's Elastic MapReduce (EMR). Using EMR, Storm can be deployed quickly to cloud infrastructure and scaled to meet demand.

Finally, as future work, the community is exploring methods to run Pig scripts directly on Storm. This would allow users to directly port their existing analytics over to Storm.

To monitor this work, visit

You've been reading an excerpt of:

Storm Blueprints: Patterns for Distributed Real-time Computation
