Integration with Hadoop

In this article by Cyrus Dasadia, author of the book, MongoDB Cookbook Second Edition, we will cover the following recipes:

  • Executing our first sample MapReduce job using the mongo-hadoop connector
  • Writing our first Hadoop MapReduce job

(For more resources related to this topic, see here.)

Hadoop is a well-known open source software for the processing of large datasets. It also has an API for the MapReduce programming model, which is widely used. Nearly all the big data solutions have some sort of support to integrate them with Hadoop in order to use its MapReduce framework. MongoDB too has a connector that integrates with Hadoop and lets us write MapReduce jobs using the Hadoop MapReduce API, process the data residing in the MongoDB/MongoDB dumps, and write the result back to the MongoDB/MongoDB dump files. In this article, we will be looking at some recipes around the basic MongoDB and Hadoop integration.

Executing our first sample MapReduce job using the mongo-hadoop connector

In this recipe, we will see how to build the Mongo-Hadoop connector from the source and set up Hadoop just for the purpose of running the examples in a standalone mode. The connector is the backbone that runs Hadoop MapReduce jobs on Hadoop using the data in Mongo.

Getting ready

There are various distributions of Hadoop; however, we will use Apache Hadoop (http://hadoop.apache.org/). The installation will be done on Ubuntu Linux. For production, Apache Hadoop always runs on the Linux environment and Windows is not tested for production systems. For development purposes, however, Windows can be used. If you are a Windows user, I would recommend installing a virtualization environment such as VirtualBox (https://www.virtualbox.org/), set up a Linux environment, and then install Hadoop on it. Setting up VirtualBox and Linux on it is not shown in this recipe, but this is not a tedious task. The prerequisite for this recipe is a machine with a Linux operating system on it and an Internet connection. The version that we will set up here is 2.4.0 of Apache Hadoop. The latest version of Apache Hadoop that is supported by the mongo-hadoop connector is 2.4.0.

A Git client is needed to clone the repository of the mongo-hadoop connector to the local filesystem. Refer to http://git-scm.com/book/en/Getting-Started-Installing-Git to install Git.

You will also need MongoDB to be installed on your operating system. Refer to http://docs.mongodb.org/manual/installation/ and install it accordingly. Start the mongod instance listening to port 27017. It is not expected for you to be an expert in Hadoop but some familiarity with it will be helpful. Knowing the concept of MapReduce is important and knowing about the Hadoop MapReduce API will be an advantage. In this recipe, we will be explaining what is needed to get the work done. You can get more details on Hadoop and its MapReduce API from other sources. The Wikipedia page, http://en.wikipedia.org/wiki/MapReduce, gives good enough information about the MapReduce programming.

How to do it…

  1. We will first install Java, Hadoop, and the required packages. We will start with installing JDK on the operating system.  Type the following on the command prompt of the operating system:
    $ javac –version

    If the program doesn't execute and you are told about the various packages that contain javac and program, then we need to install them as follows:

    $ sudo apt-get install default-jdk

    This is all we need to do to install Java

  2. Visit the URL, http://www.apache.org/dyn/closer.cgi/hadoop/common/, and download version 2.4 (or the latest mongo-hadoop connector supports).
  3. After the .tar.gz file has been downloaded, execute the following on the command prompt:
    $ tar –xvzf <name of the downloaded .tar.gz file>
    
    $ cd <extracted directory>
  4. Open the etc/hadoop/hadoop-env.sh file and replace export JAVA_HOME = ${JAVA_HOME} with export JAVA_HOME = /usr/lib/jvm/default-java. We will now get the mongo-hadoop connector code from GitHub on our local filesystem. Note that you don't need a GitHub account to clone a repository. Clone the git project from the operating system command prompt as follows:
    $git clone https://github.com/mongodb/mongo-hadoop.git
    $cd mongo-hadoop
  5. Create a soft link; the Hadoop installation directory is the same as the one that we extracted in step 3:
    $ln –s <hadoop installation directory> ~/hadoop-binaries

    For example, if Hadoop is extracted/installed in the home directory, then this is the command to be executed:

    $ln –s ~/hadoop-2.4.0 ~/hadoop-binaries

    By default, the mongo-hadoop connector will look for a Hadoop distribution in the ~/hadoop-binaries folder. So, even if the Hadoop archive is extracted elsewhere, we can create a soft link to it. Once this link is created, we should have the Hadoop binaries in the ~/hadoop-binaries/hadoop-2.4.0/bin path.

  6. We will now build the mongo-hadoop connector from the source for the Apache Hadoop version 2.4.0. The build-by-default builds for the latest version, so as of now, the -Phadoop_version parameter can be left out as 2.4 is the latest anyways.
    $./gradlew jar –Phadoop_version='2.4'

    This build process would take some time to get completed.

  7. Once the build is completed successfully, we are ready to execute our first MapReduce job. We will be doing it using a treasuryYield sample provided with the mongo-hadoop connector project. The first activity is to import the data to a collection in Mongo.
  8. Assuming that the mongod instance is up and running and listening to port 27017 for connections and the current directory is the root of the mongo-hadoop connector code base, execute the following command:
    $ mongoimport -c yield_historical.in -d mongo_hadoop --drop examples/treasury_yield/src/main/resources/yield_historical_in.json
  9. Once the import action is successful, we are left with copying two JAR files to the lib directory. Execute the following from the operating system shell:
    $ wget http://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/2.12.0/mongo...
    
    $ cp core/build/libs/mongo-hadoop-core-1.2.1-SNAPSHOT-hadoop_2.4.jar ~/hadoop-binaries/hadoop-2.4.0/lib/
    
    $ mv mongo-java-driver-2.12.0.jar ~/hadoop-binaries/hadoop-2.4.0/lib

    The jar built for the mongo-hadoop core to be copied was named as above for the trunk version of the code and built for hadoop-2.4.0. Change the name of the JAR accordingly when you build it yourselves for a different version of the connector and Hadoop. The Mongo driver can be the latest version. The version 2.12.0 is the latest version.

  10. Now, execute the following command on the command prompt of the operating system shell:
     ~/hadoop-binaries/hadoop-2.4.0/bin/hadoop     jar     examples/treasury_yield/build/libs/treasury_yield-1.2.1-SNAPSHOT-hadoop_2.4.jar  \
    com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig  \
    -Dmongo.input.split_size=8     -Dmongo.job.verbose=true  \
    -Dmongo.input.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.in  \
    -Dmongo.output.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.out
  11. The output should print out a lot of things; however, the following line in the output should tell us that the MapReduce job is successful:
  12.  14/05/11 21:38:54 INFO mapreduce.Job: Job job_local1226390512_0001 completed successfully
    Connect the mongod instance running on localhost from the mongo client and execute a find on the following collection:
    $ mongo
    
    > use mongo_hadoop
    
    switched to db mongo_hadoop
    
    > db.yield_historical.out.find()

How it works…

Installing Hadoop is not a trivial task and we don't need to get into this to try our samples for the mongo-hadoop connector. To learn about Hadoop and its installation, there are dedicated books and articles available. For the purpose of this article, we will simply be downloading the archive and extracting and running the MapReduce jobs in a standalone mode. This is the quickest way to get going with Hadoop. All the steps up to step 6 are needed to install Hadoop. In the next couple of steps, we will simply clone the mongo-hadoop connector recipe. You can also download a stable, built version for your version of Hadoop from https://github.com/mongodb/mongo-hadoop/releases if you prefer not to build from the source. We will then build the connector for our version of Hadoop (2.4.0) till step 13. From step 14 onwards is what we will do to run the actual MapReduce job in order to work on the data in MongoDB. We imported the data to the yield_historical.in collection, which would be used as an input to the MapReduce job. Go ahead and query the collection from the Mongo shell using the mongo_hadoop database to see a document. Don't worry if you don't understand the contents; we want to see in this example what we intend to do with this data.

The next step was to invoke the MapReduce operation on the data. The hadoop command was executed giving one jar's path, (examples/treasury_yield/build/libs/treasury_yield-1.2.1-SNAPSHOT-hadoop_2.4.jar). This is the jar that contains the classes implementing a sample MapReduce operation for the treasury yield. The com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig class in this JAR file is the bootstrap class containing the main method. We will visit this class soon. There are lots of configurations supported by the connector.For now, we will just remember that mongo.input.uri and mongo.output.uri are the collections for the input and output for the MapReduce operations.

With the project cloned, you can import it to any Java IDE of your choice. We are particularly interested in the project at /examples/treasury_yield and the core present in the root of the cloned repository.

Let's look at the com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig class. This is the entry point to the MapReduce method and has a main method in it. To write MapReduce jobs for mongo using the mongo-hadoop connector, the main class always has to extend from com.mongodb.hadoop.util.MongoTool. This class implements the org.apache.hadoop.Tool interface, which has the run method and is implemented for us by the MongoTool class. All that the main method needs to do is execute this class using the org.apache.hadoop.util.ToolRunner class by invoking its static run method passing the instance of our main class (which is an instance of Tool).

There is a static block that loads the configurations from two XML files, hadoop-local.xml and mongo-defaults.xml. The format of these files (or any XML file) is as follows. The root node of the file is the configuration node and multiple property nodes under it.

<configuration>

  <property>

    <name>{property name}</name>

    <value>{property value}</value>

  </property>

  ...

</configuration>

The property values that make sense in this context are all those that we mentioned in the URL provided earlier. We instantiate com.mongodb.hadoop.MongoConfig wrapping an instance of org.apache.hadoop.conf.Configuration in the constructor of the bootstrap class, TreasuryYieldXmlConfig. The MongoConfig class provides sensible defaults that are enough to satisfy the majority of the use cases. Some of the most important things that we need to set in the MongoConfig instance are to set the output and input format, mapper and reducer classes, output key, and value of the mapper, output key, and reducer. The input format and output format will always be the com.mongodb.hadoop.-MongoInputFormat and com.mongodb.hadoop.MongoOutputFormat classes, which are provided by the mongo-hadoop connector library. For the mapper and reducer output key and its value, we have the org.apache.hadoop.io.Writable implementation. Refer to the Hadoop documentation for different types of writable implementations in the org.apache.hadoop.io package. Apart from these, the mongo-hadoop connector also provides us with some implementations in the com.mongodb.hadoop.io package. For the treasury yield example, we used the BSONWritable instance. These configurable values can either be provided in the XML file that we saw earlier or be programmatically set. Finally, we have the option to provide them as vm arguments that we did for mongo.input.uri and mongo.output.uri. These parameters can be provided either in the XML or invoked directly from the code on the MongoConfig instance; the two methods are setInputURI and setOutputURI, respectively.

We will now look at the mapper and reducer class implementation. We will copy the important portion of the class here to analyze. Refer to the cloned project for the entire implementation.

public class TreasuryYieldMapper

    extends Mapper<Object, BSONObject, IntWritable, DoubleWritable> {

 

    @Override

    public void map(final Object pKey,

                    final BSONObject pValue,

                    final Context pContext)

        throws IOException, InterruptedException {

        final int year = ((Date) pValue.get("_id")).getYear() + 1900;

        double bid10Year = ((Number) pValue.get("bc10Year")).doubleValue();

        pContext.write(new IntWritable(year), new DoubleWritable(bid10Year));

    }

}

Our mapper extends the org.apache.hadoop.mapreduce.Mapper class. The four generic parameters are for the key class, type of the input value, type of the output key, and output value. The body of the map method reads the _id value from the input document, which is the date and extracts the year out of it. Then, it gets the double value from the document for the bc10Year field and simply writes to the context key value pair, where the key is the year and the value is the double. The implementation here doesn't rely on the value of the pKey parameter passed, which can be used as the key instead of hardcoding the _id value in the implementation. This value is basically the same field that would be set using the mongo.input.key property in XML or using the MongoConfig.setInputKey method. If none is set, _id is anyways the default value.

Let's look at the reducer implementation (with the logging statements removed):

public class TreasuryYieldReducer

    extends Reducer<IntWritable, DoubleWritable, IntWritable, BSONWritable> {

 

    @Override

    public void reduce(final IntWritable pKey, final Iterable<DoubleWritable> pValues, final Context pContext)

        throws IOException, InterruptedException {

        int count = 0;

        double sum = 0;

        for (final DoubleWritable value : pValues) {

            sum += value.get();

            count++;

        }

        final double avg = sum / count;

        BasicBSONObject output = new BasicBSONObject();

        output.put("count", count);

        output.put("avg", avg);

        output.put("sum", sum);

        pContext.write(pKey, new BSONWritable(output));

    }

}

This class extends from org.apache.hadoop.mapreduce.Reducer and has four generic parameters again for the input key, input value, output key, and output value. The input to the reducer is the output from the mapper, and if you notice carefully, the type of the first two generic parameters are the same as the last two generic parameters of the mapper that we saw earlier. The third and fourth parameters in this case are the type of the key and the value emitted from the reducer. The type of the value is BSONDocument, and thus we have BSONWritable as the type.

We now have the reduce method that has two parameters: the first one is the key, which is same as the key emitted from the map function, and the second parameter is java.lang.Iterable of the values emitted for the same key. This is how standard MapReduce functions work. For instance, if the map function gave the following key value pairs, (1950, 10), (1960, 20), (1950, 20), (1950, 30), then reduce will be invoked with two unique keys, 1950 and 1960, and the values for the key 1950 will be an iterable with (10, 20, 30), where the value of 1960 will be an iterable of a single element, (20). The reducer's reduce function simply iterates though this iterable of doubles, finds the sum and count of these numbers, and writes one key value pair, where the key is the same as the incoming key and the output value is BasicBSONObject with the sum, count, and average in it for the computed values.

There are some good samples, including the enron dataset, in the examples of the cloned mongo-hadoop connector. If you would like to play around a bit, I would recommend that you to take a look at these example projects too and run them.

There's more…

What we saw here is a readymade sample that we executed. There is nothing like writing one MapReduce job ourselves for our understanding. In the next recipe, we will write one sample MapReduce job using the Hadoop API in Java and see it in action.

See also

If you're wondering what the writable interface is all about and why you should not use plain old serialization instead, then refer to this URL, which gives the explanation by the creator of Hadoop himself:

http://www.mail-archive.com/hadoop-user@lucene.apache.org/msg00378.html

Writing our first Hadoop MapReduce job

In this recipe, we will write our first MapReduce job using the Hadoop MapReduce API and run it using the mongo-hadoop connector getting the data from MongoDB.

Getting ready

Refer to the previous recipe, Executing our first sample MapReduce job using mongo-hadoop connector, for the setting up of the mongo-hadoop connector. This is a maven project and thus maven needs to be set up and installed. This project, however, is built on Ubuntu Linux and you need to execute the following command from the operating system shell to get maven:

$ sudo apt-get install maven

How to do it…

  1. We have a Java mongo-hadoop-mapreduce-test project that can be downloaded from the Packt site. We invoked that MapReduce job using the Python and Java client on previous occasions.
  2. With the current directory at the root of the project where the pom.xml file is present, execute the following command on the command prompt:
  3. $ mvn clean package
    The JAR file, mongo-hadoop-mapreduce-test-1.0.jar, will be built and kept in the target directory.
  4. With the assumption that the CSV file is already imported to the postalCodes collection, execute the following command with the current directory still at the root of the mongo-hadoop-mapreduce-test project that we just built:
    ~/hadoop-binaries/hadoop-2.4.0/bin/hadoop \
    
     jar target/mongo-hadoop-mapreduce-test-1.0.jar \
    
     com.packtpub.mongo.cookbook.TopStateMapReduceEntrypoint \
    
     -Dmongo.input.split_size=8 \
    
    -Dmongo.job.verbose=true \
    
    -Dmongo.input.uri=mongodb://localhost:27017/test.postalCodes \
    
    -Dmongo.output.uri=mongodb://localhost:27017/test.postalCodesHadoopmrOut
  5. Once the MapReduce job is completed, open the Mongo shell by typing the following command on the operating system command prompt and execute the following query from the shell:
    $ mongo
    
    > db.postalCodesHadoopmrOut.find().sort({count:-1}).limit(5)
  6. Compare the output to the ones that we got earlier when we executed the MapReduce jobs using Mongo's MapReduce framework.

How it works…

We have kept the classes very simple and with the bare minimum things that we needed. We just have three classes in our project, TopStateMapReduceEntrypoint, TopStateReducer, and TopStatesMapper, all in the same com.packtpub.mongo.cookbook package. The mapper's map function just writes a key value pair to the context, where the key is the name of the state and value is an integer value, 1. The following is the code snippet from the mapper function:

context.write(new Text((String)value.get("state")), new IntWritable(1));

What the reducer gets is the same key that is the list of states and an iterable of an integer value, 1. All we do is write the same name of the state and the sum of the iterables to the context. Now, as there is no size method in the iterable that can give the count in constant time, we are left with adding up all the ones that we get in the linear time. The following is the code in the reducer method:

int sum = 0;

for(IntWritable value : values) {

  sum += value.get();

}

BSONObject object = new BasicBSONObject();

object.put("count", sum);

context.write(text, new BSONWritable(object));

We will write the text string that is the key and the value that is the JSON document containing the count to the context. The mongo-hadoop connector is then responsible for writing to the output collection that we have postalCodesHadoopmrOut, the document with the _id field same as the key emitted. Thus, when we execute the following, we get the top five states with the most number of cities in our database:

> db. postalCodesHadoopmrOut.find().sort({count:-1}).limit(5)
{ "_id" : "Maharashtra", "count" : 6446 }
{ "_id" : "Kerala", "count" : 4684 }
{ "_id" : "Tamil Nadu", "count" : 3784 }
{ "_id" : "Andhra Pradesh", "count" : 3550 }
{ "_id" : "Karnataka", "count" : 3204 }

Finally, the main method of the main entry point class is as follows:

Configuration conf = new Configuration();

MongoConfig config = new MongoConfig(conf);

config.setInputFormat(MongoInputFormat.class);

config.setMapperOutputKey(Text.class);

config.setMapperOutputValue(IntWritable.class);

config.setMapper(TopStatesMapper.class);

config.setOutputFormat(MongoOutputFormat.class);

config.setOutputKey(Text.class);

config.setOutputValue(BSONWritable.class);

config.setReducer(TopStateReducer.class);

ToolRunner.run(conf, new TopStateMapReduceEntrypoint(), args);

All we do is wrap the org.apache.hadoop.conf.Configuration object with the com.mongodb.hadoop.MongoConfig instance to set the various properties and then submit the MapReduce job for the execution using ToolRunner.

See also

We executed a simple MapReduce job on Hadoop using the Hadoop API and sourcing the data from MongoDB and writing the data to the MongoDB collection in the recipe. What if we want to write the map and reduce the functions in a different language? Fortunately, this is possible by using a concept called Hadoop streaming, where stdout is used as a means to communicate between the program and the Hadoop MapReduce framework.

Summary

In this article, you learned about executing our first sample MapReduce job using the mongo-Hadoop connector and writing our first Hadoop MapReduce job. You can also refer to the following books related to MongoDB that are available on our website:

Resources for Article:


Further resources on this subject:


You've been reading an excerpt of:

MongoDB Cookbook - Second Edition

Explore Title