Understanding MapReduce

Key/value pairs

This section explains why MapReduce operations take their input and produce their output in terms of key/value pairs.

What it means

Firstly, we will clarify just what we mean by key/value pairs by highlighting similar concepts in the Java standard library. The java.util.Map interface is the parent of commonly used classes such as HashMap and (through some library backward reengineering) even the original Hashtable.

For any Java Map object, its contents are a set of mappings from a given key of a specified type to a related value of a potentially different type. A HashMap object could, for example, contain mappings from a person's name (String) to his or her birthday (Date).

In the context of Hadoop, we are referring to data that also comprises keys that relate to associated values. This data is stored in such a way that the various values in the data set can be sorted and rearranged across a set of keys. If we are using key/value data, it will make sense to ask questions such as the following:

  • Does a given key have a mapping in the data set?
  • What are the values associated with a given key?
  • What is the complete set of keys?
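Each of these questions corresponds to a method on the java.util.Map interface. The following is a minimal sketch using a hypothetical name-to-birthday map; the names and dates are invented for illustration, and LocalDate is used here in place of the Date type mentioned earlier.

```java
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class BirthdayLookup {
    // Hypothetical sample data: a person's name (key) mapped to a birthday (value).
    static Map<String, LocalDate> sampleBirthdays() {
        Map<String, LocalDate> birthdays = new HashMap<>();
        birthdays.put("Alice", LocalDate.of(1980, 5, 17));
        birthdays.put("Bob", LocalDate.of(1975, 11, 2));
        return birthdays;
    }

    public static void main(String[] args) {
        Map<String, LocalDate> birthdays = sampleBirthdays();

        // Does a given key have a mapping in the data set?
        System.out.println(birthdays.containsKey("Alice"));

        // What is the value associated with a given key?
        System.out.println(birthdays.get("Bob"));

        // What is the complete set of keys?
        Set<String> names = birthdays.keySet();
        System.out.println(names);
    }
}
```

A plain Map holds one value per key; the MapReduce model generalizes this to a list of values per key.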

We will go into Wordcount in detail shortly, but the output of the program is clearly a set of key/value relationships; for each word (the key), there is a count (the value) of its number of occurrences. Think about this simple example and some important features of key/value data will become apparent, as follows:

  • Keys must be unique but values need not be
  • Each value must be associated with a key, but a key could have no values (though not in this particular example)
  • Careful definition of the key is important; deciding on whether or not the counts are applied with case sensitivity will give different results

Note that we need to define carefully what we mean by keys being unique here. This does not mean the key occurs only once; in our data set we may see a key occur numerous times and, as we shall see, the MapReduce model has a stage where all values associated with each key are collected together. The uniqueness of keys guarantees that if we collect together every value seen for any given key, the result will be an association from a single instance of the key to every value mapped in such a way, and none will be omitted.

Why key/value data?

Using key/value data as the foundation of MapReduce operations allows for a powerful programming model that is surprisingly widely applicable, as can be seen by the adoption of Hadoop and MapReduce across a wide variety of industries and problem scenarios. Much data is either intrinsically key/value in nature or can be represented in such a way. It is a simple model with broad applicability and semantics straightforward enough that programs defined in terms of it can be applied by a framework like Hadoop.

Of course, the data model itself is not the only thing that makes Hadoop useful; its real power lies in how it uses the techniques of parallel execution, and divide and conquer. We can have a large number of hosts on which we can store data and execute tasks, and even use a framework that manages the division of the larger task into smaller chunks, and the combination of partial results into the overall answer. But we need this framework to provide us with a way of expressing our problems that doesn't require us to be an expert in the execution mechanics; we want to express the transformations required on our data and then let the framework do the rest. MapReduce, with its key/value interface, provides such a level of abstraction, whereby the programmer only has to specify these transformations and Hadoop handles the complex process of applying them to arbitrarily large data sets.

Some real-world examples

To become less abstract, let's think of some real-world data that takes the form of key/value pairs:

  • An address book relates a name (key) to contact information (value)
  • A bank account uses an account number (key) to associate with the account details (value)
  • The index of a book relates a word (key) to the pages on which it occurs (value)
  • On a computer filesystem, filenames (keys) allow access to any sort of data, such as text, images, and sound (values)

These examples are intentionally broad in scope, to help and encourage you to think that key/value data is not some very constrained model used only in high-end data mining but a very common model that is all around us.

We would not be having this discussion if this was not important to Hadoop. The bottom line is that if the data can be expressed as key/value pairs, it can be processed by MapReduce.

MapReduce as a series of key/value transformations

You may have come across MapReduce described in terms of key/value transformations, in particular the intimidating one looking like this:

{K1,V1} -> {K2, List<V2>} -> {K3,V3}

We are now in a position to understand what this means:

  • The input to the map method of a MapReduce job is a series of key/value pairs that we'll call K1 and V1.
  • The output of the map method (and hence input to the reduce method) is a series of keys and an associated list of values that are called K2 and V2. Note that each mapper simply outputs a series of individual key/value outputs; these are combined into a key and list of values in the shuffle stage.
  • The final output of the MapReduce job is another series of key/value pairs, called K3 and V3.

These sets of key/value pairs don't have to be different; it would be quite possible to input, say, names and contact details and output the same, with perhaps some intermediary format used in collating the information. Keep this three-stage model in mind as we explore the Java API for MapReduce next. We will first walk through the main parts of the API you will need and then do a systematic examination of the execution of a MapReduce job.
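The three-stage transformation can be sketched in plain Java without any Hadoop classes. This is an in-memory illustration only, assuming word counting as the task; the class and method names are hypothetical, and the offset used as K1 is a simple character count rather than anything Hadoop computes.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class KeyValuePipeline {
    // Map stage: one (K1 = offset, V1 = line) record yields (K2 = word, V2 = 1) pairs.
    static List<Map.Entry<String, Integer>> map(long offset, String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.split(" ")) {
            out.add(new AbstractMap.SimpleEntry<>(word, 1));
        }
        return out;
    }

    // Shuffle stage: gather every V2 emitted for the same K2 into one list.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce stage: one (K2, List<V2>) group yields the V3 for a final (K3, V3) pair.
    static int reduce(String word, List<Integer> counts) {
        int total = 0;
        for (int count : counts) {
            total += count;
        }
        return total;
    }

    // Chain the three stages over a small in-memory input.
    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> intermediate = new ArrayList<>();
        long offset = 0;
        for (String line : lines) {
            intermediate.addAll(map(offset, line));
            offset += line.length() + 1; // illustrative offset: characters plus a newline
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : shuffle(intermediate).entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {This=1, Yes=1, a=1, is=2, test=1, this=1}
        System.out.println(run(Arrays.asList("This is a test", "Yes this is")));
    }
}
```

In a real job only the map and reduce logic is written by the programmer; the grouping done here by shuffle is performed by the framework.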

The Hadoop Java API for MapReduce

Hadoop underwent a major API change in its 0.20 release, and the API introduced there is the primary interface in the 1.0 version. Though the prior API was certainly functional, the community felt it was unwieldy and unnecessarily complex in some regards.

The new API, sometimes referred to as context objects for reasons we'll see later, is the future of MapReduce development in Java. Note one caveat: there are parts of the pre-0.20 MapReduce libraries that have not been ported to the new API, so we will use the old interfaces when we need to examine any of these.

The 0.20 MapReduce Java API

The 0.20 and above versions of MapReduce API have most of the key classes and interfaces either in the org.apache.hadoop.mapreduce package or its subpackages.

In most cases, the implementation of a MapReduce job will provide job-specific subclasses of the Mapper and Reducer base classes found in this package.

We'll stick to the commonly used K1/K2/K3 terminology, though more recently the Hadoop API has, in places, used terms such as KEYIN/VALUEIN and KEYOUT/VALUEOUT instead. K1/K2/K3 makes it easier to follow the end-to-end data flow.

The Mapper class

This is a cut-down view of the base Mapper class provided by Hadoop. For our own mapper implementations, we will subclass this base class and override the specified method as follows:

    class Mapper<K1, V1, K2, V2> {
        void map(K1 key, V1 value, Mapper.Context context)
                throws IOException, InterruptedException {..}
    }

Although the use of Java generics can make this look a little opaque at first, there is actually not that much going on. The class is defined in terms of the key/value input and output types, and then the map method takes an input key/value pair in its parameters. The other parameter is an instance of the Context class that provides various mechanisms to communicate with the Hadoop framework, one of which is to output the results of a map or reduce method.

Notice that the map method only refers to a single instance of K1 and V1 key/value pairs. This is a critical aspect of the MapReduce paradigm in which you write classes that process single records and the framework is responsible for all the work required to turn an enormous data set into a stream of key/value pairs. You will never have to write map or reduce classes that try to deal with the full data set. Hadoop also provides mechanisms through its InputFormat and OutputFormat classes that provide implementations of common file formats and likewise remove the need to write file parsers for any but custom file types.

Three additional methods may sometimes need to be overridden.

    protected void setup(Mapper.Context context) throws IOException, InterruptedException

This method is called once before any key/value pairs are presented to the map method. The default implementation does nothing.

    protected void cleanup(Mapper.Context context) throws IOException, InterruptedException

This method is called once after all key/value pairs have been presented to the map method. The default implementation does nothing.

    protected void run(Mapper.Context context) throws IOException, InterruptedException

This method controls the overall flow of task processing within a JVM. The default implementation calls the setup method once before repeatedly calling the map method for each key/value pair in the split, and then finally calls the cleanup method.
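The order of calls made by the default run method can be illustrated with a simplified stand-in. The class below is hypothetical and is not Hadoop's Mapper; it records each call so that the setup, map, cleanup sequence is visible, and it takes plain strings instead of key/value pairs and a Context.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// A simplified stand-in for the task lifecycle only; not Hadoop's Mapper class.
class LifecycleSketch {
    final List<String> calls = new ArrayList<>();

    // Called once before any record is processed.
    protected void setup() {
        calls.add("setup");
    }

    // Called once per record in the split.
    protected void map(String record) {
        calls.add("map:" + record);
    }

    // Called once after the last record.
    protected void cleanup() {
        calls.add("cleanup");
    }

    // Mirrors the default flow described above: setup, map per record, cleanup.
    public void run(Iterable<String> split) {
        setup();
        for (String record : split) {
            map(record);
        }
        cleanup();
    }

    public static void main(String[] args) {
        LifecycleSketch task = new LifecycleSketch();
        task.run(Arrays.asList("first line", "second line"));
        // prints [setup, map:first line, map:second line, cleanup]
        System.out.println(task.calls);
    }
}
```

A subclass that needs per-task state, such as a lookup table, would typically build it in setup and release it in cleanup, leaving run untouched.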

The Reducer class

The Reducer base class works very similarly to the Mapper class, and usually requires subclasses to override only a single reduce method. Here is the cut-down class definition:

    public class Reducer<K2, V2, K3, V3> {
        void reduce(K2 key, Iterable<V2> values, Reducer.Context context)
                throws IOException, InterruptedException {..}
    }

Again, notice the class definition in terms of the broader data flow (the reduce method accepts K2/V2 as input and provides K3/V3 as output) while the actual reduce method takes only a single key and its associated list of values. The Context object is again the mechanism to output the result of the method.

This class also has the setup, run, and cleanup methods with similar default implementations as with the Mapper class that can optionally be overridden:

    protected void setup(Reducer.Context context) throws IOException, InterruptedException

This method is called once before any key/lists of values are presented to the reduce method. The default implementation does nothing.

    protected void cleanup(Reducer.Context context) throws IOException, InterruptedException

This method is called once after all key/lists of values have been presented to the reduce method. The default implementation does nothing.

    protected void run(Reducer.Context context) throws IOException, InterruptedException

This method controls the overall flow of task processing within a JVM. The default implementation calls the setup method once, then calls the reduce method once for each key and list of values provided to the Reducer class, and finally calls the cleanup method.

The Driver class

Although our mapper and reducer implementations are all we need to perform the MapReduce job, there is one more piece of code required: the driver that communicates with the Hadoop framework and specifies the configuration elements needed to run a MapReduce job. This involves aspects such as telling Hadoop which Mapper and Reducer classes to use, where to find the input data and in what format, and where to place the output data and how to format it.

There is no default parent Driver class to subclass; the driver logic usually exists in the main method of the class written to encapsulate a MapReduce job. Take a look at the following code snippet as an example driver. Don't worry about how each line works, though you should be able to work out generally what each is doing:

    public class ExampleDriver {
        ...
        public static void main(String[] args) throws Exception {
            // Create a Configuration object that is used to set other options
            Configuration conf = new Configuration();
            // Create the object representing the job
            Job job = new Job(conf, "ExampleJob");
            // Set the name of the main class in the job jarfile
            job.setJarByClass(ExampleDriver.class);
            // Set the mapper class
            job.setMapperClass(ExampleMapper.class);
            // Set the reducer class
            job.setReducerClass(ExampleReducer.class);
            // Set the types for the final output key and value
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // Set input and output file paths
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Execute the job and wait for it to complete
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

Given our previous talk of jobs, it is not surprising that much of the setup involves operations on a Job object. This includes setting the job name and specifying which classes are to be used for the mapper and reducer implementations.

Certain input/output configurations are set and, finally, the arguments passed to the main method are used to specify the input and output locations for the job. This is a very common model that you will see often.

There are a number of default values for configuration options, and we are implicitly using some of them in the preceding class. Most notably, we don't say anything about the file format of the input files or how the output files are to be written. These are defined through the InputFormat and OutputFormat classes mentioned earlier; we will explore them in detail later. The default input and output formats are text files that suit our WordCount example. There are multiple ways of expressing the format within text files in addition to particularly optimized binary formats.

A common model for less complex MapReduce jobs is to have the Mapper and Reducer classes as inner classes within the driver. This allows everything to be kept in a single file, which simplifies the code distribution.

Writing MapReduce programs

We have been using and talking about WordCount for quite some time now; let's actually write an implementation, compile, and run it, and then explore some modifications.

Time for action – setting up the classpath

To compile any Hadoop-related code, we will need to refer to the standard Hadoop-bundled classes.

Add the hadoop-core-1.0.4.jar file from the distribution to the Java classpath as follows:

    $ export CLASSPATH=.:${HADOOP_HOME}/hadoop-core-1.0.4.jar:${CLASSPATH}

What just happened?

This adds the hadoop-core-1.0.4.jar file explicitly to the classpath alongside the current directory and the previous contents of the CLASSPATH environment variable.

Once again, it would be good to put this in your shell startup file or a standalone file to be sourced.

We will later need to also have many of the supplied third-party libraries that come with Hadoop on our classpath, and there is a shortcut to do this. For now, the explicit addition of the core JAR file will suffice.

Time for action – implementing WordCount

We will explore our own Java implementation by performing the following steps:

  1. Enter the following code into the WordCount1.java file:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCount1 {

        public static class WordCountMapper
                extends Mapper<Object, Text, Text, IntWritable> {

            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Split the line on single spaces and emit <word, 1> for each piece
                String[] words = value.toString().split(" ");
                for (String str : words) {
                    word.set(str);
                    context.write(word, one);
                }
            }
        }

        public static class WordCountReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {

            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                // Count how many values were collected for this key
                int total = 0;
                for (IntWritable val : values) {
                    total++;
                }
                context.write(key, new IntWritable(total));
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "word count");
            job.setJarByClass(WordCount1.class);
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

  2. Now compile it by executing the following command:

    $ javac WordCount1.java

What just happened?

This is our first complete MapReduce job. Look at the structure and you should recognize the elements we have previously discussed: the overall Job class with the driver configuration in its main method and the Mapper and Reducer implementations defined as inner classes.

We'll do a more detailed walkthrough of the mechanics of MapReduce in the next section, but for now let's look at the preceding code and think of how it realizes the key/value transformations we talked about earlier.

The input to the Mapper class is arguably the hardest to understand, as the key is not actually used. The job uses TextInputFormat, the default input format, which delivers to the mapper data where the key is the byte offset of the line within the file and the value is the text of that line. In reality, you may never actually see a mapper that uses that offset key, but it is provided.

The mapper is executed once for each line of text in the input source, and every time it takes the line and breaks it into words. It then uses the Context object to output (more commonly known as emitting) each new key/value of the form <word, 1>. These are our K2/V2 values.

We said before that the input to the reducer is a key and a corresponding list of values, and there is some magic that happens between the map and reduce methods to collect together the values for each key that facilitates this, which we'll not describe right now. Hadoop executes the reducer once for each key and the preceding reducer implementation simply counts the numbers in the Iterable object and gives output for each word in the form of <word, count>. These are our K3/V3 values.

Take a look at the signatures of our mapper and reducer classes: the WordCountMapper class takes Object and Text as input and gives Text and IntWritable as output. The WordCountReducer class takes Text and IntWritable as both input and output. This is again quite a common pattern, where the map method performs an inversion on the key and values, and instead emits a series of data pairs on which the reducer performs aggregation.

The driver is more meaningful here, as we have real values for the parameters. We use arguments passed to the class to specify the input and output locations.

Time for action – building a JAR file

Before we run our job in Hadoop, we must collect the required class files into a single JAR file that we will submit to the system.

Create a JAR file from the generated class files.

$ jar cvf wc1.jar WordCount1*class

What just happened?

We must always package our class files into a JAR file before submitting to Hadoop, be it local or on Elastic MapReduce.

Be careful with the jar command and file paths. If you include class files from a subdirectory in a JAR file, the classes may not be stored with the path you expect. This is especially common when using a catch-all classes directory into which all source code gets compiled. It may be useful to write a script that changes into the directory, packages the required files into JAR files, and moves the JAR files to the required location.

Time for action – running WordCount on a local Hadoop cluster

Now that we have generated the class files and collected them into a JAR file, we can run the application by performing the following steps:

  1. Submit the new JAR file to Hadoop for execution.

    $ hadoop jar wc1.jar WordCount1 test.txt output

  2. Check the output file; it should be as follows:

    $ hadoop fs -cat output/part-r-00000
    This    1
    Yes     1
    a       1
    is      2
    test    1
    this    1

What just happened?

This is the first time we have used the hadoop jar command with our own code. There are four arguments:

  1. The name of the JAR file.
  2. The name of the driver class within the JAR file.
  3. The location, on HDFS, of the input file (a relative reference to the /user/hadoop home folder, in this case).
  4. The desired location of the output folder (again, a relative path).

The name of the driver class is only required if a main class has not (as in this case) been specified within the JAR file manifest.

Time for action – running WordCount on EMR

We will now show you how to run this same JAR file on EMR. Remember, as always, that this costs money!

  1. Go to the AWS console at http://aws.amazon.com/console/, sign in, and select S3.
  2. You'll need two buckets: one to hold the JAR file and another for the job output. You can use existing buckets or create new ones.
  3. Open the bucket where you will store the job file, click on Upload, and add the wc1.jar file created earlier.
  4. Return to the main console home page, and then go to the EMR portion of the console by selecting Elastic MapReduce.
  5. Click on the Create a New Job Flow button and you'll see a familiar screen as shown in the following screenshot:

  6. Previously, we used a sample application; to run our code, we need to perform different steps. Firstly, select the Run your own application radio button.
  7. In the Select a Job Type combobox, select Custom JAR.
  8. Click on the Continue button and you'll see a new form, as shown in the following screenshot:

We now specify the arguments to the job. Within our uploaded JAR file, our code (particularly the driver class) specifies aspects such as the Mapper and Reducer classes.

What we need to provide is the path to the JAR file and the input and output paths for the job. In the JAR Location field, put the location where you uploaded the JAR file. If the JAR file is called wc1.jar and you uploaded it into a bucket called mybucket, the path would be mybucket/wc1.jar.

In the JAR Arguments field, you need to enter the name of the main class and the input and output locations for the job. For files on S3, we can use URLs of the form s3://bucketname/objectname. Click on Continue and the familiar screen to specify the virtual machines for the job flow appears, as shown in the following screenshot:

Now continue through the job flow setup and execution.

What just happened?

The important lesson here is that we can reuse the code written on and for a local Hadoop cluster in EMR. Also, besides these first few steps, the majority of the EMR console is the same regardless of the source of the job code to be executed.

Through the remainder of this article, we will not explicitly show code being executed on EMR and will instead focus more on the local cluster, because running a JAR file on EMR is very easy.

The pre-0.20 Java MapReduce API

We'll take a quick look at the older APIs for two reasons:

  1. Many online examples and other reference materials are written for the older APIs.
  2. Several areas within the MapReduce framework are not yet ported to the new API, and we will need to use the older APIs to explore them.

The older API's classes are found primarily in the org.apache.hadoop.mapred package.

The new API classes use concrete Mapper and Reducer classes, while the older API had this responsibility split across abstract classes and interfaces.

An implementation of a Mapper class will subclass the abstract MapReduceBase class and implement the Mapper interface, while a custom Reducer class will subclass the same MapReduceBase abstract class but implement the Reducer interface.

We'll not explore MapReduceBase in much detail as its functionality deals with job setup and configuration, which aren't really core to understanding the MapReduce model. But the interfaces of pre-0.20 Mapper and Reducer are worth showing:

    public interface Mapper<K1, V1, K2, V2> {
        void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
                throws IOException;
    }

    public interface Reducer<K2, V2, K3, V3> {
        void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output, Reporter reporter)
                throws IOException;
    }

There are a few points to understand here:

  • The generic parameters to the OutputCollector class show more explicitly how the result of the methods is presented as output.
  • The old API used the OutputCollector class for this purpose, and the Reporter class to write status and metrics information to the Hadoop framework. The 0.20 API combines these responsibilities in the Context class.
  • The Reducer interface uses an Iterator object instead of an Iterable object; this was changed as the latter works with the Java for each syntax and makes for cleaner code.
  • Neither the map nor the reduce method could throw InterruptedException in the old API.
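The difference between Iterator and Iterable is easiest to see side by side. The following sketch uses plain Java collections rather than Hadoop types, and the class and method names are hypothetical; it sums the same values once in each style.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class IterationStyles {
    // Old-API style: an Iterator must be advanced explicitly with hasNext()/next().
    static int sumWithIterator(Iterator<Integer> values) {
        int total = 0;
        while (values.hasNext()) {
            total += values.next();
        }
        return total;
    }

    // New-API style: an Iterable can be used directly in a for-each loop.
    static int sumWithIterable(Iterable<Integer> values) {
        int total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3);
        System.out.println(sumWithIterator(values.iterator())); // prints 6
        System.out.println(sumWithIterable(values));            // prints 6
    }
}
```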

As you can see, the changes between the APIs alter how MapReduce programs are written but don't change the purpose or responsibilities of mappers or reducers. Don't feel obliged to become an expert in both APIs unless you need to.

Hadoop-provided mapper and reducer implementations

We don't always have to write our own Mapper and Reducer classes from scratch. Hadoop provides several common Mapper and Reducer implementations that can be used in our jobs. If we don't override any of the methods in the Mapper and Reducer classes in the new API, the default implementations are the identity Mapper and Reducer classes, which simply output the input unchanged.

Note that more such prewritten Mapper and Reducer implementations may be added over time, and currently the new API does not have as many as the older one.

The mappers are found at org.apache.hadoop.mapreduce.lib.map, and include the following:

  • InverseMapper: This outputs (value, key)
  • TokenCounterMapper: This counts the number of discrete tokens in each line of input

The reducers are found at org.apache.hadoop.mapreduce.lib.reduce, and currently include the following:

  • IntSumReducer: This outputs the sum of the list of integer values per key
  • LongSumReducer: This outputs the sum of the list of long values per key

Time for action – WordCount the easy way

Let's revisit WordCount, but this time use some of these predefined map and reduce implementations:

  1. Create a new WordCountPredefined.java file containing the following code:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
    import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

    public class WordCountPredefined {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "word count1");
            job.setJarByClass(WordCountPredefined.class);
            job.setMapperClass(TokenCounterMapper.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

  2. Now compile, create the JAR file, and run it as before.
  3. Don't forget to delete the output directory before running the job, if you want to use the same location. Use hadoop fs -rmr output, for example.

What just happened?

Given the ubiquity of WordCount as an example in the MapReduce world, it's perhaps not entirely surprising that there are predefined Mapper and Reducer implementations that together realize the entire WordCount solution. The TokenCounterMapper class simply breaks each input line into a series of (token, 1) pairs and the IntSumReducer class provides a final count by summing the values for each key.
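The reducer in WordCount1 counted how many values it received, whereas a summing reducer adds the values together. The two approaches agree only while every value is 1. The following plain-Java sketch, with hypothetical names and no Hadoop types, shows where they diverge.

```java
import java.util.Arrays;
import java.util.List;

final class CountVersusSum {
    // Counts how many values are present, ignoring what they are.
    static int countValues(Iterable<Integer> values) {
        int total = 0;
        for (int ignored : values) {
            total++;
        }
        return total;
    }

    // Adds the values together.
    static int sumValues(Iterable<Integer> values) {
        int total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> allOnes = Arrays.asList(1, 1, 1);
        System.out.println(countValues(allOnes) + " " + sumValues(allOnes)); // prints 3 3

        // Hypothetical case where one value already represents two occurrences.
        List<Integer> partial = Arrays.asList(2, 1);
        System.out.println(countValues(partial) + " " + sumValues(partial)); // prints 2 3
    }
}
```

Summing is the more general choice, since it stays correct if any value ever stands for more than one occurrence.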

There are two important things to appreciate here:

  • Though WordCount was doubtless an inspiration for these implementations, they are in no way specific to it and can be widely applicable
  • This model of having reusable mapper and reducer implementations is one thing to remember, especially in combination with the fact that often the best starting point for a new MapReduce job implementation is an existing one

Walking through a run of WordCount

To explore the relationship between mapper and reducer in more detail, and to expose some of Hadoop's inner workings, we'll now go through just how WordCount (or indeed any MapReduce job) is executed.


The call to Job.waitForCompletion() in the driver is where all the action starts. The driver is the only piece of code that runs on our local machine, and this call starts the communication with the JobTracker. Remember that the JobTracker is responsible for all aspects of job scheduling and execution, so it becomes our primary interface when performing any task related to job management. The JobTracker communicates with the NameNode on our behalf and manages all interactions relating to the data stored on HDFS.

Splitting the input

The first of these interactions happens when the JobTracker looks at the input data and determines how to assign it to map tasks. Recall that HDFS files are usually split into blocks of at least 64 MB and the JobTracker will assign each block to one map task.

Our WordCount example, of course, used a trivial amount of data that was well within a single block. Picture a much larger input file measured in terabytes, and the split model makes more sense. Each segment of the file (or split, in MapReduce terminology) is processed by exactly one map task.
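As a rough illustration of the scale involved, the number of map tasks can be estimated by dividing the file size by the block size and rounding up. This sketch assumes exactly one split per 64 MB block; the real split calculation takes additional settings into account, and the class and method names are hypothetical.

```java
final class SplitEstimate {
    // 64 MB block size, as described in the text.
    static final long BLOCK_SIZE_BYTES = 64L * 1024 * 1024;

    // Estimated number of map tasks if each block becomes exactly one split.
    // Treating an empty file as needing no tasks is an assumption of this sketch.
    static long mapTasksFor(long fileSizeBytes) {
        if (fileSizeBytes <= 0) {
            return 0;
        }
        // Ceiling division: a partially filled final block still needs a task.
        return (fileSizeBytes + BLOCK_SIZE_BYTES - 1) / BLOCK_SIZE_BYTES;
    }

    public static void main(String[] args) {
        long oneKilobyte = 1024L;
        long oneTerabyte = 1024L * 1024 * 1024 * 1024;
        System.out.println(mapTasksFor(oneKilobyte)); // prints 1
        System.out.println(mapTasksFor(oneTerabyte)); // prints 16384
    }
}
```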

Once it has computed the splits, the JobTracker places them and the JAR file containing the Mapper and Reducer classes into a job-specific directory on HDFS, whose path will be passed to each task as it starts.

Task assignment

Once the JobTracker has determined how many map tasks will be needed, it looks at the number of hosts in the cluster, how many TaskTrackers are working, and how many map tasks each can concurrently execute (a user-definable configuration variable). The JobTracker also looks to see where the various input data blocks are located across the cluster and attempts to define an execution plan that maximizes the cases when a TaskTracker processes a split/block located on the same physical host, or, failing that, it processes at least one in the same hardware rack.

This data locality optimization is a huge reason behind Hadoop's ability to efficiently process such large datasets. Recall also that, by default, each block is replicated across three different hosts, so the likelihood of producing a task/host plan that sees most blocks processed locally is higher than it may seem at first.
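The locality preference described above (same host first, then same rack, then anywhere) can be sketched as a simple ranking. This is an illustration only and not the JobTracker's actual scheduling code; the Host type, host names, and rack names are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;

final class LocalityPreference {
    // Declared in order of preference, best first.
    enum Locality { NODE_LOCAL, RACK_LOCAL, OFF_RACK }

    // Hypothetical description of a host: its name and the rack it sits in.
    static final class Host {
        final String name;
        final String rack;

        Host(String name, String rack) {
            this.name = name;
            this.rack = rack;
        }
    }

    // Classify how close a candidate host is to any replica of a block.
    static Locality classify(Host candidate, List<Host> replicaHosts) {
        boolean sameRack = false;
        for (Host replica : replicaHosts) {
            if (replica.name.equals(candidate.name)) {
                return Locality.NODE_LOCAL;
            }
            if (replica.rack.equals(candidate.rack)) {
                sameRack = true;
            }
        }
        return sameRack ? Locality.RACK_LOCAL : Locality.OFF_RACK;
    }

    // Pick the free host with the best locality; returns null if none is free.
    static Host choose(List<Host> freeHosts, List<Host> replicaHosts) {
        Host best = null;
        Locality bestLocality = null;
        for (Host host : freeHosts) {
            Locality locality = classify(host, replicaHosts);
            if (best == null || locality.compareTo(bestLocality) < 0) {
                best = host;
                bestLocality = locality;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Three replicas of one block, spread over two racks.
        List<Host> replicas = Arrays.asList(
                new Host("h1", "r1"), new Host("h2", "r1"), new Host("h3", "r2"));
        List<Host> free = Arrays.asList(new Host("h5", "r3"), new Host("h4", "r1"));
        System.out.println(choose(free, replicas).name); // prints h4 (rack-local)
    }
}
```

With three replicas per block, each block has three node-local candidates, which is why a mostly local plan is achievable more often than it might first appear.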

Task startup

Each TaskTracker then starts up a separate Java virtual machine to execute the tasks. This does add a startup time penalty, but it isolates the TaskTracker from problems caused by misbehaving map or reduce tasks, and the JVM can be configured to be shared between subsequently executed tasks.

If the cluster has enough capacity to execute all the map tasks at once, they will all be started and given a reference to the split they are to process and the job JAR file. Each TaskTracker then copies the split to the local filesystem.

If there are more tasks than the cluster capacity, the JobTracker will keep a queue of pending tasks and assign them to nodes as they complete their initially assigned map tasks.

We are now ready to see the map tasks execute. If this all sounds like a lot of work, it is, and it explains why any MapReduce job takes a non-trivial amount of time to get going while the system performs all these steps.

Ongoing JobTracker monitoring

The JobTracker doesn't just stop work now and wait for the TaskTrackers to execute all the mappers and reducers. It is constantly exchanging heartbeat and status messages with the TaskTrackers, looking for evidence of progress or problems. It also collects metrics from the tasks throughout the job execution, some provided by Hadoop and others specified by the developer of the map and reduce tasks, though we don't use any in this example.

Mapper input

For the rest of this walkthrough, let's assume the input was a not-much-less trivial two-line text file:

This is a test
Yes this is

The driver class specifies the format and structure of the input file by using TextInputFormat, and from this Hadoop knows to treat it as text, with the position of each line in the file as the key and the line contents as the value. Strictly speaking, the key is the byte offset of the line; simple line numbers are shown here for readability. The two invocations of the mapper will therefore be given the following input:

1 This is a test
2 Yes this is

Mapper execution

The key/value pairs received by the mapper are the offset in the file of the line and the line contents respectively because of how the job is configured. Our implementation of the map method in WordCountMapper discards the key as we do not care where each line occurred in the file and splits the provided value into words using the split method on the standard Java String class. Note that better tokenization could be provided by use of regular expressions or the StringTokenizer class, but for our purposes this simple approach will suffice.

For each individual word, the mapper then emits a key comprised of the actual word itself, and a value of 1.
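The map step just described can be sketched in plain Java. This is only an illustration that avoids the Hadoop API: the class name MapSketch and the use of Map.Entry to stand in for an emitted pair are assumptions, not the WordCountMapper code itself.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the map step; it does not use the Hadoop API.
class MapSketch {
    // Emits one (word, 1) pair per word; the key (file position) is ignored.
    static List<Map.Entry<String, Integer>> map(long key, String line) {
        List<Map.Entry<String, Integer>> output = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {           // a blank line yields one empty token
                output.add(new SimpleEntry<>(word, 1));
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(map(0L, "This is a test"));
        System.out.println(map(15L, "Yes this is"));
    }
}
```

Each call corresponds to one invocation of the map method on one line of input.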

We add a few optimizations that we'll mention here, but don't worry too much about them at this point. You will see that we don't create the IntWritable object containing the value 1 each time; instead, we create it as a static variable and reuse it in each invocation. Similarly, we use a single Text object and reset its contents for each execution of the method. Though this doesn't help much for our tiny input file, the processing of a huge data set could see the mapper called thousands or millions of times. If each invocation created new objects for both the key and value output, this would become a resource issue and likely cause much more frequent pauses due to garbage collection. Reusing a single value object is safe because we know the Context.write method will not alter it.

Mapper output and reduce input

The output of the mapper is a series of pairs of the form (word, 1); in our example these will be:

(This, 1), (is, 1), (a, 1), (test, 1), (Yes, 1), (this, 1), (is, 1)

These output pairs from the mapper are not passed directly to the reducer. Between mapping and reducing is the shuffle stage where much of the magic of MapReduce occurs.
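To make the effect of the shuffle concrete, the sketch below groups a list of (word, 1) pairs by key in plain Java. It is a single-process approximation only: the class name ShuffleSketch is hypothetical, and a TreeMap with String ordering stands in for the sorting Hadoop performs on its own key types.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java approximation of what the shuffle delivers to the reduce step.
class ShuffleSketch {
    // Collects every value seen for a key into one list, with keys kept sorted.
    static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                   .add(pair.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : "This is a test Yes this is".split(" ")) {
            pairs.add(new SimpleEntry<>(word, 1));
        }
        // prints {This=[1], Yes=[1], a=[1], is=[1, 1], test=[1], this=[1]}
        System.out.println(groupByKey(pairs));
    }
}
```

Each entry of the resulting map corresponds to one invocation of the reduce method: a single key together with all of its values.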


One of the implicit guarantees of the Reduce interface is that a single reducer will be given all the values associated with a given key. With multiple reduce tasks running across a cluster, each mapper output must therefore be partitioned into the separate outputs destined for each reducer. These partitioned files are stored on the local node filesystem.

The number of reduce tasks across the cluster is not as dynamic as that of mappers, and indeed we can specify the value as part of our job submission. Each TaskTracker therefore knows how many reducers are in the cluster and from this how many partitions the mapper output should be split into.

At this point an obvious question is what happens to this calculation if a reducer fails. The answer is that the JobTracker will ensure that any failed reduce tasks are reexecuted, potentially on a different node so a transient failure will not be an issue. A more serious issue, such as that caused by a data-sensitive bug or very corrupt data in a split will, unless certain steps are taken, cause the whole job to fail.

The optional partition function

Within the org.apache.hadoop.mapreduce package is the Partitioner class, an abstract class with the following signature:

public abstract class Partitioner<Key, Value>
{
    public abstract int getPartition(Key key, Value value, int numPartitions) ;
}

By default, Hadoop will use a strategy that hashes the output key to perform the partitioning. This functionality is provided by the HashPartitioner class within the org.apache.hadoop.mapreduce.lib.partition package, but it is necessary in some cases to provide a custom subclass of Partitioner with application-specific partitioning logic. This would be particularly true if, for example, the data provided a very uneven distribution when the standard hash function was applied.
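The hashing strategy can be sketched without Hadoop as follows. The class name is hypothetical and String.hashCode stands in for the hash of a Hadoop key type, so the partition numbers printed here will not match those on a real cluster; the point is the arithmetic that maps any hash to a valid partition.

```java
// Plain-Java sketch of hash partitioning; not the Hadoop class itself.
class HashPartitionSketch {
    static int getPartition(Object key, int numPartitions) {
        // Masking off the sign bit keeps the result non-negative even when
        // hashCode() is negative (Math.abs would fail for Integer.MIN_VALUE).
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int reducers = 3;   // assumed number of reduce tasks
        for (String word : "This is a test Yes this is".split(" ")) {
            System.out.println(word + " -> partition " + getPartition(word, reducers));
        }
    }
}
```

Because the result depends only on the key and the number of partitions, every occurrence of a given word is sent to the same reducer, which is what the reduce guarantee requires.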

Reducer input

The reducer TaskTracker receives updates from the JobTracker that tell it which nodes in the cluster hold map output partitions which need to be processed by its local reduce task. It then retrieves these from the various nodes and merges them into a single file that will be fed to the reduce task.

Reducer execution

Our WordCountReducer class is very simple; for each word it simply counts the number of elements in the array and emits the final (Word, count) output for each word.

We don't worry about any sort of optimization to avoid excess object creation here. The number of reduce invocations is typically smaller than the number of mappers, and consequently the overhead is less of a concern. However, feel free to do so if you find yourself with very tight performance requirements.

For our invocation of WordCount on our sample input, all but one word have only one value in the list of values; the word "is" has two.

Note that the words "this" and "This" had separate counts because we did not attempt to ignore case. Similarly, ending each sentence with a period would have stopped "is" from having a count of two, as "is" would be different from "is." with its trailing period. Always be careful when working with textual data: capitalization, punctuation, hyphenation, pagination, and other aspects can skew how the data is perceived. In such cases, it's common to have a precursor MapReduce job that applies a normalization or clean-up strategy to the data set.
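One possible normalization, sketched in plain Java, lowercases each token and strips punctuation from its ends. The class name and the exact rule are assumptions made for illustration; the right clean-up strategy depends on the data.

```java
import java.util.Locale;

// Hypothetical token clean-up that a mapper could apply before emitting a key.
class NormalizeSketch {
    static String normalize(String token) {
        // Remove leading and trailing characters that are neither letters nor
        // digits, keep interior ones (such as hyphens), then lowercase.
        return token
            .replaceAll("^[^\\p{L}\\p{N}]+|[^\\p{L}\\p{N}]+$", "")
            .toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(normalize("This"));       // prints this
        System.out.println(normalize("is."));        // prints is
        System.out.println(normalize("well-known")); // prints well-known
    }
}
```

With a rule like this applied, the two spellings of each word in the sample input would share a single count. Tokens that consist only of punctuation normalize to the empty string and would need to be skipped.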

Reducer output

The final set of reducer output for our example is therefore:

(This, 1), (is, 2), (a, 1), (test, 1), (Yes, 1), (this, 1)

This data will be output to partition files within the output directory specified in the driver that will be formatted using the specified OutputFormat implementation. Each reduce task writes to a single file with the filename part-r-nnnnn, where nnnnn starts at 00000 and is incremented.
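The naming pattern is easy to reproduce when a later program needs to locate the output of a particular reduce task. The helper below is a hypothetical convenience, not a Hadoop API.

```java
import java.util.Locale;

// Hypothetical helper that builds the output file name for a reduce task.
class PartFileName {
    static String forReducer(int partition) {
        // Five digits, zero padded: part-r-00000, part-r-00001, ...
        return String.format(Locale.ROOT, "part-r-%05d", partition);
    }

    public static void main(String[] args) {
        System.out.println(forReducer(0));   // prints part-r-00000
        System.out.println(forReducer(12));  // prints part-r-00012
    }
}
```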


Once all tasks have completed successfully, the JobTracker outputs the final state of the job to the client, along with the final aggregates of some of the more important counters that it has been aggregating along the way. The full job and task history is available in the log directory on each node or, more accessibly, via the JobTracker web UI; point your browser to port 50030 on the JobTracker node.

That's all there is to it!

As you've seen, each MapReduce program sits atop a significant amount of machinery provided by Hadoop and the sketch provided is in many ways a simplification. As before, much of this isn't hugely valuable for such a small example, but never forget that we can use the same software and mapper/reducer implementations to do a WordCount on a much larger data set across a huge cluster, be it local or on EMR. The work that Hadoop does for you at that point is enormous and is what makes it possible to perform data analysis on such datasets; otherwise, the effort to manually implement the distribution, synchronization, and parallelization of code will be immense.

Apart from the combiner…maybe

There is one additional, and optional, step that we omitted previously. Hadoop allows the use of a combiner class to perform some early aggregation of the output from the map method before it is retrieved by the reducer.

Why have a combiner?

Much of Hadoop's design is predicated on reducing the expensive parts of a job that usually equate to disk and network I/O. The output of the mapper is often large; it's not infrequent to see it many times the size of the original input. Hadoop does allow configuration options to help reduce the impact of the reducers transferring such large chunks of data across the network. The combiner takes a different approach, where it is possible to perform early aggregation to require less data to be transferred in the first place.

The combiner does not have its own interface; a combiner must have the same signature as the reducer and hence also subclasses the Reducer class from the org.apache.hadoop.mapreduce package. The effect of this is to basically perform a mini-reduce on the mapper for the output destined for each reducer.

Hadoop does not guarantee whether the combiner will be executed. At times, it may not be executed at all, while at times it may be used once, twice, or more times depending on the size and number of output files generated by the mapper for each reducer.

Time for action – WordCount with a combiner

Let's add a combiner to our first WordCount example. In fact, let's use our reducer as the combiner. Since the combiner must have the same interface as the reducer, this is something you'll often see, though note that the type of processing involved in the reducer will determine if it is a true candidate for a combiner; we'll discuss this later. Since we are looking to count word occurrences, we can do a partial count on the map node and pass these subtotals to the reducer.

  1. Copy WordCount1.java to WordCount2.java and change the driver class to register the reducer as the combiner: between the lines that set the Mapper and Reducer classes, add a call to the job's setCombinerClass method, passing WordCountReducer.class.


  2. Also change the class name to WordCount2 and then compile it.

    $ javac WordCount2.java

  3. Create the JAR file.

    $ jar cvf wc2.jar WordCount2*class

  4. Run the job on Hadoop.

    $ hadoop jar wc2.jar WordCount2 test.txt output

  5. Examine the output.

    $ hadoop fs -cat output/part-r-00000

What just happened?

This output may not be what you expected, as the value for the word "is" is now incorrectly specified as 1 instead of 2.

The problem lies in how the combiner and reducer will interact. The value provided to the reducer, which was previously (is, 1, 1), is now (is, 2) because our combiner did its own summation of the number of elements for each word. However, our reducer does not look at the actual values in the Iterable object; it simply counts how many there are.

When you can use the reducer as the combiner

You need to be careful when writing a combiner. Remember that Hadoop makes no guarantees on how many times it may be applied to map output; it may be 0, 1, or more. It is therefore critical that the operation performed by the combiner gives the same final result however many times it is applied. Operations that are both associative and commutative, such as summation or finding a maximum, are usually safe, but, as shown previously, ensure the reduce logic isn't making implicit assumptions that might break this property.

Time for action – fixing WordCount to work with a combiner

Let's make the necessary modifications to WordCount to correctly use a combiner.

Copy WordCount2.java to a new file called WordCount3.java and change the reduce method as follows:

public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException
{
    // Add up the values themselves rather than counting how many there are
    int total = 0 ;
    for (IntWritable val : values)
    {
        total += val.get() ;
    }
    context.write(key, new IntWritable(total)) ;
}

Remember to also change the class name to WordCount3 and then compile, create the JAR file, and run the job as before.

What just happened?

The output is now as expected. Any map-side invocations of the combiner perform successfully and the reducer correctly produces the overall output value.

Would this have worked if the original reducer was used as the combiner and the new reduce implementation as the reducer? The answer is no, though our test example would not have demonstrated it. Because the combiner may be invoked multiple times on the map output data, the same errors would arise in the map output if the dataset was large enough, but didn't occur here due to the small input size. Fundamentally, the original reducer was incorrect, but this wasn't immediately obvious; watch out for such subtle logic flaws. This sort of issue can be really hard to debug as the code will reliably work on a development box with a subset of the data set and fail on the much larger operational cluster. Carefully craft your combiner classes and never rely on testing that only processes a small sample of the data.
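The reasoning above can be checked with a small plain-Java simulation. It is a deliberately simplified model: the class and method names are hypothetical, a "combiner pass" here collapses all current values for one key into a single partial result, and the final reducer always sums.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Simulates applying a combiner 0, 1, or 2 times before a summing reducer.
class CombinerReuse {
    // Logic of the original reducer: counts how many values arrived.
    static int count(List<Integer> values) {
        return values.size();
    }

    // Logic of the fixed reducer: adds up the values themselves.
    static int sum(List<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    static int run(List<Integer> mapValues, boolean combinerSums, int combinerPasses) {
        List<Integer> current = mapValues;
        for (int i = 0; i < combinerPasses; i++) {
            int partial = combinerSums ? sum(current) : count(current);
            current = Collections.singletonList(partial);
        }
        return sum(current);   // the reducer always uses the fixed logic
    }

    public static void main(String[] args) {
        List<Integer> valuesForIs = Arrays.asList(1, 1);
        for (int passes = 0; passes <= 2; passes++) {
            System.out.println(passes + " passes: counting combiner -> "
                + run(valuesForIs, false, passes)
                + ", summing combiner -> " + run(valuesForIs, true, passes));
        }
    }
}
```

In this model the counting combiner gives the right answer of 2 when it runs zero times or once, and the wrong answer of 1 when it runs twice, while the summing combiner gives 2 in every case. That is the pattern that makes such a bug invisible on small test data.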

Reuse is your friend

In the previous section we took the existing job class file and made changes to it. This is a small example of a very common Hadoop development workflow; use an existing job file as the starting point for a new one. Even if the actual mapper and reducer logic is very different, it's often a timesaver to take an existing working job as this helps you remember all the required elements of the mapper, reducer, and driver implementations.

Hadoop-specific data types

Up to this point we've glossed over the actual data types used as the input and output of the map and reduce classes. Let's take a look at them now.

The Writable and WritableComparable interfaces

If you browse the Hadoop API for the org.apache.hadoop.io package, you'll see some familiar classes such as Text and IntWritable along with others with the Writable suffix.

This package also contains the Writable interface specified as follows:

import java.io.DataInput ;
import java.io.DataOutput ;
import java.io.IOException ;

public interface Writable
{
    void write(DataOutput out) throws IOException ;
    void readFields(DataInput in) throws IOException ;
}

The main purpose of this interface is to provide mechanisms for the serialization and deserialization of data as it is passed across the network or read and written from the disk. Every data type to be used as a value input or output from a mapper or reducer (that is, V1, V2, or V3) must implement this interface.
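The serialization round trip can be tried out with nothing but the JDK, because DataOutput and DataInput come from java.io. The sketch below uses a hypothetical WordCountPair class with the same two method signatures; it does not declare implements Writable only so that it compiles without Hadoop on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical value type with Writable-style write/readFields methods.
class WordCountPair {
    private String word = "";
    private int count;

    WordCountPair() { }   // an empty instance to be filled by readFields

    WordCountPair(String word, int count) {
        this.word = word;
        this.count = count;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(count);
    }

    public void readFields(DataInput in) throws IOException {
        // Fields must be read in the same order they were written.
        word = in.readUTF();
        count = in.readInt();
    }

    String getWord() { return word; }
    int getCount() { return count; }

    // Serializes to a byte array and deserializes into a fresh object.
    static WordCountPair roundTrip(WordCountPair original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            WordCountPair copy = new WordCountPair();
            copy.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WordCountPair copy = roundTrip(new WordCountPair("is", 2));
        System.out.println(copy.getWord() + " " + copy.getCount()); // prints is 2
    }
}
```

The no-argument constructor matters: the object is created empty and then populated by readFields.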

Data to be used as keys (K1, K2, K3) has a stricter requirement: in addition to Writable, it must also provide an implementation of the standard Java Comparable interface. This has the following specifications:

public interface Comparable
{
    public int compareTo(Object obj) ;
}

The compareTo method returns a negative integer, zero, or a positive integer depending on whether the current object is less than, equal to, or greater than the compared object.

As a convenience interface, Hadoop provides the WritableComparable interface in the org.apache.hadoop.io package.

public interface WritableComparable extends Writable, Comparable { }

Introducing the wrapper classes

Fortunately, you don't have to start from scratch; as you've already seen, Hadoop provides classes that wrap the Java primitive types and implement WritableComparable. They are provided in the org.apache.hadoop.io package.

Primitive wrapper classes

These classes are conceptually similar to the primitive wrapper classes, such as Integer and Long found in java.lang. They hold a single primitive value that can be set either at construction or via a setter method.

  • BooleanWritable
  • ByteWritable
  • DoubleWritable
  • FloatWritable
  • IntWritable
  • LongWritable
  • VIntWritable – a variable length integer type
  • VLongWritable – a variable length long type

Array wrapper classes

These classes provide writable wrappers for arrays of other Writable objects. For example, an instance of either could hold an array of IntWritable or DoubleWritable but not arrays of the raw int or float types. A specific subclass for the required Writable class will be required. They are as follows:

  • ArrayWritable
  • TwoDArrayWritable

Map wrapper classes

These classes allow implementations of the java.util.Map interface to be used as keys or values. Note that they are defined as Map<Writable, Writable> and effectively manage a degree of internal runtime type checking. This does mean that compile-time type checking is weakened, so be careful.

  • AbstractMapWritable: This is a base class for other concrete Writable map implementations
  • MapWritable: This is a general purpose map mapping Writable keys to Writable values
  • SortedMapWritable: This is a specialization of the MapWritable class that also implements the SortedMap interface

Time for action – using the Writable wrapper classes

Let's write a class to show some of these wrapper classes in action:

  1. Create the following as WritablesTest.java:

    import org.apache.hadoop.io.* ;
    import java.util.* ;

    public class WritablesTest
    {
        // ArrayWritable subclass with a no-argument constructor
        public static class IntArrayWritable extends ArrayWritable
        {
            public IntArrayWritable()
            {
                super(IntWritable.class) ;
            }
        }

        public static void main(String[] args)
        {
            System.out.println("*** Primitive Writables ***") ;
            BooleanWritable bool1 = new BooleanWritable(true) ;
            ByteWritable byte1 = new ByteWritable( (byte)3) ;
            System.out.printf("Boolean:%s Byte:%d\n", bool1, byte1.get()) ;

            IntWritable i1 = new IntWritable(5) ;
            IntWritable i2 = new IntWritable( 17) ;
            System.out.printf("I1:%d I2:%d\n", i1.get(), i2.get()) ;
            i1.set(i2.get()) ;
            System.out.printf("I1:%d I2:%d\n", i1.get(), i2.get()) ;
            Integer i3 = new Integer( 23) ;
            i1.set( i3) ;   // the Integer is unboxed to an int
            System.out.printf("I1:%d I2:%d\n", i1.get(), i2.get()) ;

            System.out.println("*** Array Writables ***") ;
            ArrayWritable a = new ArrayWritable( IntWritable.class) ;
            a.set( new IntWritable[]{ new IntWritable(1),
                new IntWritable(3), new IntWritable(5)}) ;

            IntWritable[] values = (IntWritable[])a.get() ;

            for (IntWritable i: values)
            {
                System.out.println(i) ;
            }

            IntArrayWritable ia = new IntArrayWritable() ;
            ia.set( new IntWritable[]{ new IntWritable(1),
                new IntWritable(3), new IntWritable(5)}) ;

            IntWritable[] ivalues = (IntWritable[])ia.get() ;

            // Accepted by the compiler: set takes any Writable array
            ia.set(new LongWritable[]{new LongWritable(1000L)}) ;

            System.out.println("*** Map Writables ***") ;
            MapWritable m = new MapWritable() ;
            IntWritable key1 = new IntWritable(5) ;
            NullWritable value1 = NullWritable.get() ;
            m.put(key1, value1) ;
            System.out.println(m.containsKey(key1)) ;
            System.out.println(m.get(key1)) ;
            m.put(new LongWritable(1000000000), key1) ;
            Set<Writable> keys = m.keySet() ;

            for(Writable w: keys)
            {
                System.out.println(w.getClass()) ;
            }
        }
    }

  2. Compile and run the class, and you should get the following output:

    *** Primitive Writables ***
    Boolean:true Byte:3
    I1:5 I2:17
    I1:17 I2:17
    I1:23 I2:17
    *** Array Writables ***
    *** Map Writables ***
    class org.apache.hadoop.io.LongWritable
    class org.apache.hadoop.io.IntWritable

What just happened?

This output should be largely self-explanatory. We create various Writable wrapper objects and show their general usage. There are several key points:

  • As mentioned, there is no type-safety beyond Writable itself. So it is possible to have an array or map that holds multiple types, as shown previously.
  • We can use autounboxing, for example, by supplying an Integer object to methods on IntWritable that expect an int variable.
  • The inner class demonstrates what is needed if an ArrayWritable class is to be used as an input to a reduce function; a subclass with such a default constructor must be defined.

Other wrapper classes

  • CompressedWritable: This is a base class to allow for large objects that should remain compressed until their attributes are explicitly accessed
  • ObjectWritable: This is a general-purpose generic object wrapper
  • NullWritable: This is a singleton object representation of a null value
  • VersionedWritable: This is a base implementation to allow writable classes to track versions over time

Have a go hero – playing with Writables

Write a class that exercises the NullWritable and ObjectWritable classes in the same way as the previous example does for the other wrapper classes.

Making your own

As you have seen from the Writable and Comparable interfaces, the required methods are pretty straightforward; don't be afraid of adding this functionality if you want to use your own custom classes as keys or values within a MapReduce job.
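As a sketch of the comparison side of a custom key, the following plain-Java class orders keys by a year and then by a word. Everything about it is hypothetical, including the choice of fields. To be used in a real job it would also need the write and readFields methods, a no-argument constructor, and non-final fields, all omitted here so that the example compiles without Hadoop.

```java
import java.util.Objects;

// Hypothetical composite key showing the comparison half of a custom key.
class YearWordKey implements Comparable<YearWordKey> {
    private final int year;
    private final String word;

    YearWordKey(int year, String word) {
        this.year = year;
        this.word = word;
    }

    @Override
    public int compareTo(YearWordKey other) {
        // Order by year first, then alphabetically by word.
        int byYear = Integer.compare(year, other.year);
        return byYear != 0 ? byYear : word.compareTo(other.word);
    }

    // equals and hashCode must agree with compareTo so that equal keys
    // are grouped together and hash to the same partition.
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof YearWordKey)) {
            return false;
        }
        YearWordKey other = (YearWordKey) obj;
        return year == other.year && word.equals(other.word);
    }

    @Override
    public int hashCode() {
        return Objects.hash(year, word);
    }

    public static void main(String[] args) {
        YearWordKey a = new YearWordKey(2012, "is");
        YearWordKey b = new YearWordKey(2013, "a");
        System.out.println(a.compareTo(b) < 0);   // prints true
    }
}
```

Overriding hashCode consistently is worth the effort because hash-based partitioning relies on equal keys producing equal hashes.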


There is one aspect of our driver classes that we have mentioned several times without getting into a detailed explanation: the format and structure of the data input into and output from MapReduce jobs.

Files, splits, and records

We have talked about files being broken into splits as part of the job startup and the data in a split being sent to the mapper implementation. However, this overlooks two aspects: how the data is stored in the file and how the individual keys and values are passed to the mapper structure.

InputFormat and RecordReader

Hadoop has the concept of an InputFormat for the first of these responsibilities. The InputFormat abstract class in the org.apache.hadoop.mapreduce package provides two methods as shown in the following code:

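public abstract class InputFormat<K, V>
{
    public abstract List<InputSplit> getSplits(JobContext context)
        throws IOException, InterruptedException ;
    public abstract RecordReader<K, V> createRecordReader(InputSplit split,
        TaskAttemptContext context) throws IOException, InterruptedException ;
}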

These methods display the two responsibilities of the InputFormat class:

  • To provide the details on how to split an input file into the splits required for map processing
  • To create a RecordReader class that will generate the series of key/value pairs from a split

The RecordReader class is also an abstract class within the org.apache.hadoop.mapreduce package:

public abstract class RecordReader<Key, Value> implements Closeable
{
    public abstract void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException ;
    public abstract boolean nextKeyValue()
        throws IOException, InterruptedException ;
    public abstract Key getCurrentKey()
        throws IOException, InterruptedException ;
    public abstract Value getCurrentValue()
        throws IOException, InterruptedException ;
    public abstract float getProgress()
        throws IOException, InterruptedException ;
    public abstract void close() throws IOException ;
}

A RecordReader instance is created for each split. The framework calls nextKeyValue, which returns a Boolean indicating whether another key/value pair is available; if so, the getCurrentKey and getCurrentValue methods are used to access the key and value respectively.

The combination of the InputFormat and RecordReader classes is therefore all that is required to bridge between any kind of input data and the key/value pairs required by MapReduce.
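The calling pattern can be illustrated with a plain-Java stand-in that reads lines from an in-memory string. The class LineReaderSketch is hypothetical and mirrors only the three methods discussed above. It also assumes single-byte characters, so that character positions equal byte offsets.

```java
// Plain-Java stand-in for a line-oriented record reader; not the Hadoop class.
class LineReaderSketch {
    private final String data;
    private int position = 0;
    private long currentKey;
    private String currentValue;

    LineReaderSketch(String data) {
        this.data = data;
    }

    // Advances to the next line; returns false when the input is exhausted.
    boolean nextKeyValue() {
        if (position >= data.length()) {
            return false;
        }
        int end = data.indexOf('\n', position);
        if (end < 0) {
            end = data.length();      // last line has no trailing newline
        }
        currentKey = position;        // offset of the start of the line
        currentValue = data.substring(position, end);
        position = end + 1;
        return true;
    }

    long getCurrentKey() { return currentKey; }
    String getCurrentValue() { return currentValue; }

    public static void main(String[] args) {
        LineReaderSketch reader = new LineReaderSketch("This is a test\nYes this is\n");
        // The same loop shape the framework uses to feed a mapper.
        while (reader.nextKeyValue()) {
            System.out.println(reader.getCurrentKey() + "\t" + reader.getCurrentValue());
        }
    }
}
```

For the two-line sample input this yields keys 0 and 15, the offsets at which each line begins.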

Hadoop-provided InputFormat

There are some Hadoop-provided InputFormat implementations within the org.apache.hadoop.mapreduce.lib.input package:

  • FileInputFormat: This is an abstract base class that can be the parent of any file-based input
  • SequenceFileInputFormat: This is an efficient binary file format that will be discussed in an upcoming section
  • TextInputFormat: This is used for plain text files

The pre-0.20 API has additional InputFormats defined in the org.apache.hadoop.mapred package.

Note that InputFormats are not restricted to reading from files; FileInputFormat is itself a subclass of InputFormat. It is possible to have Hadoop use data that is not based on the files as the input to MapReduce jobs; common sources are relational databases or HBase.

Hadoop-provided RecordReader

Similarly, Hadoop provides a few common RecordReader implementations, which are also present within the org.apache.hadoop.mapreduce.lib.input package:

  • LineRecordReader: This implementation is the default RecordReader class for text files; it presents the byte offset of each line as the key and the line contents as the value
  • SequenceFileRecordReader: This implementation reads the key/value from the binary SequenceFile container

Again, the pre-0.20 API has additional RecordReader classes in the org.apache.hadoop.mapred package, such as KeyValueLineRecordReader, that have not yet been ported to the new API.

OutputFormat and RecordWriter

There is a similar pattern for writing the output of a job coordinated by subclasses of OutputFormat and RecordWriter from the org.apache.hadoop.mapreduce package. We'll not explore these in any detail here, but the general approach is similar, though OutputFormat does have a more involved API as it has methods for tasks such as validation of the output specification.

It is this validation step that causes a job to fail if a specified output directory already exists. If you wanted different behavior, it would require a subclass of OutputFormat that overrides the checkOutputSpecs method.

Hadoop-provided OutputFormat

The following OutputFormats are provided in the org.apache.hadoop.mapreduce.lib.output package:

  • FileOutputFormat: This is the base class for all file-based OutputFormats
  • NullOutputFormat: This is a dummy implementation that discards the output and writes nothing to the file
  • SequenceFileOutputFormat: This writes to the binary SequenceFile format
  • TextOutputFormat: This writes a plain text file

Note that these classes define their required RecordWriter implementations as inner classes so there are no separately provided RecordWriter implementations.

Don't forget Sequence files

The SequenceFile class within the org.apache.hadoop.io package provides an efficient binary file format that is often useful as an output from a MapReduce job. This is especially true if the output from the job is processed as the input of another job. The Sequence files have several advantages, as follows:

  • As binary files, they are intrinsically more compact than text files
  • They additionally support optional compression, which can also be applied at different levels, that is, compress each record or an entire split
  • They can be split and processed in parallel

This last characteristic is important, as most binary formats, particularly those that are compressed or encrypted, cannot be split and must be read as a single linear stream of data. Using such files as input to a MapReduce job means that a single mapper will be used to process the entire file, causing a potentially large performance hit. In such a situation, it is preferable to either use a splittable format such as SequenceFile, or, if you cannot avoid receiving the file in the other format, do a preprocessing step that converts it into a splittable format. This will be a trade-off, as the conversion will take time; but in many cases, especially with complex map tasks, this will be outweighed by the time saved.


We have covered a lot of ground in this article and we now have the foundation to explore MapReduce in more detail. Specifically, we learned how key/value pairs are a broadly applicable data model that is well suited to MapReduce processing. We also learned how to write mapper and reducer implementations using the 0.20 and above versions of the Java API.

We then moved on and saw how a MapReduce job is processed and how the map and reduce methods are tied together by significant coordination and task-scheduling machinery. We also saw how certain MapReduce jobs require specialization in the form of a custom partitioner or combiner.

We also learned how Hadoop reads data from and writes data to the filesystem. It uses the concept of InputFormat and OutputFormat to handle the file as a whole and RecordReader and RecordWriter to translate the format to and from key/value pairs.

You've been reading an excerpt of:

Hadoop Beginner's Guide