Advanced Hadoop MapReduce Administration


Tuning Hadoop configurations for cluster deployments

Getting ready

Shut down the Hadoop cluster, if it is already running, by executing the bin/stop-dfs.sh and bin/stop-mapred.sh commands from HADOOP_HOME.

How to do it...

We can control Hadoop configurations through the following three configuration files:

  • conf/core-site.xml: This contains the configurations common to the whole Hadoop distribution

  • conf/hdfs-site.xml: This contains configurations for HDFS

  • conf/mapred-site.xml: This contains configurations for MapReduce

Each configuration file has name-value pairs expressed in an XML format, and they define the workings of different aspects of Hadoop. The following code snippet shows an example of a property in a configuration file. Here, the <configuration> tag is the top-level XML container, and the <property> tags that define individual properties go as child elements of the <configuration> tag:

<configuration>
  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>2</value>
  </property>
</configuration>

The following instructions show how to change the directory to which we write Hadoop logs and configure the maximum number of map and reduce tasks:

  1. Create a directory to store the logfiles. For example, /root/hadoop_logs.

  2. Uncomment the line that includes HADOOP_LOG_DIR in HADOOP_HOME/conf/hadoop-env.sh and point it to the new directory.

  3. Add the following lines to the HADOOP_HOME/conf/mapred-site.xml file:

    <property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>2</value>
    </property>
    <property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>2</value>
    </property>

  4. Restart the Hadoop cluster by running the bin/start-dfs.sh and bin/start-mapred.sh commands from the HADOOP_HOME directory.

  5. You can verify the number of processes created using OS process monitoring tools. If you are in Linux, run the watch ps -ef | grep hadoop command. If you are in Windows or macOS, use the Task Manager or Activity Monitor.
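The name-value pair format used by these files is easy to generate and check programmatically. The following Python sketch (illustrative only; the helper names are my own, not part of Hadoop) builds a minimal *-site.xml body with the two TaskTracker properties and parses it back:

```python
import xml.etree.ElementTree as ET

def build_site_xml(props):
    """Build a Hadoop-style *-site.xml document from a dict of properties."""
    conf = ET.Element("configuration")
    for name, value in props.items():
        prop = ET.SubElement(conf, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(conf, encoding="unicode")

def read_site_xml(xml_text):
    """Parse the name-value pairs back into a dict."""
    conf = ET.fromstring(xml_text)
    return {p.findtext("name"): p.findtext("value")
            for p in conf.findall("property")}

xml_text = build_site_xml({
    "mapred.tasktracker.map.tasks.maximum": "2",
    "mapred.tasktracker.reduce.tasks.maximum": "2",
})
print(read_site_xml(xml_text))
```

A check like this can catch malformed XML before a cluster restart picks up the file.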

How it works...

HADOOP_LOG_DIR redefines the location to which Hadoop writes its logs. The mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum properties define the maximum number of map and reduce tasks that can run within a single TaskTracker at a given moment.

These and other server-side parameters are defined in the HADOOP_HOME/conf/*-site.xml files. Hadoop reloads these configurations after a restart.

There's more...

There are many similar configuration properties defined in Hadoop. You can see some of them in the following tables.

The configuration properties for conf/core-site.xml are listed as follows:

  • fs.inmemory.size.mb: This is the amount of memory allocated, in MBs, to the in-memory filesystem that is used to merge map outputs at the reducers

  • io.sort.factor (default: 10): This is the maximum number of streams merged while sorting files

  • io.file.buffer.size (default: 4096): This is the size of the read/write buffer used by sequence files

The configuration properties for conf/mapred-site.xml are listed as follows:

  • mapred.reduce.parallel.copies (default: 5): This is the maximum number of parallel copies the reduce step will execute to fetch output from many parallel jobs

  • mapred.map.child.java.opts: This is for passing Java options into the map JVM

  • mapred.reduce.child.java.opts: This is for passing Java options into the reduce JVM

  • io.sort.mb (default: 100): This is the memory limit, in MBs, while sorting data

The configuration properties for conf/hdfs-site.xml are listed as follows:

  • dfs.block.size (default: 67108864): This is the HDFS block size, in bytes

  • dfs.namenode.handler.count (default: 10): This is the number of server threads to handle RPC calls in the NameNode
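As a quick illustration of what dfs.block.size means, the following Python sketch (a hypothetical helper, not part of Hadoop) computes how many HDFS blocks a file occupies at the default 64 MB block size:

```python
def num_blocks(file_size_bytes, block_size_bytes=64 * 1024 * 1024):
    """Number of HDFS blocks needed to store a file (the last block may be partial)."""
    if file_size_bytes == 0:
        return 0
    return -(-file_size_bytes // block_size_bytes)  # ceiling division

print(num_blocks(200 * 1024 * 1024))  # a 200 MB file with 64 MB blocks -> 4
```

Raising dfs.block.size reduces the number of blocks (and NameNode metadata entries) per file, at the cost of coarser map-task granularity.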

Running benchmarks to verify the Hadoop installation

The Hadoop distribution comes with several benchmarks. We can use them to verify our Hadoop installation and measure Hadoop's performance. This recipe introduces these benchmarks and explains how to run them.

Getting ready

Start the Hadoop cluster. You can run these benchmarks either on a cluster setup or on a pseudo-distributed setup.

How to do it...

Let us run the sort benchmark. The sort benchmark consists of two jobs. First, we generate some random data using the randomwriter Hadoop job, and then we sort it using the sort sample.

  1. Change the directory to HADOOP_HOME.

  2. Run the randomwriter Hadoop job using the following command:

    >bin/hadoop jar hadoop-examples-1.0.0.jar randomwriter
    -Dtest.randomwriter.maps_per_host=10 /data/unsorted-data

    Here the two parameters, test.randomwrite.bytes_per_map and test.randomwriter.maps_per_host, specify the size of the data generated by each map and the number of maps per host, respectively.

  3. Run the sort program:

    >bin/hadoop jar hadoop-examples-1.0.0.jar sort /data/unsorted-data /data/sorted-data

  4. Verify the final results by running the following command:

    >bin/hadoop jar hadoop-test-1.0.0.jar testmapredsort -sortInput /data/unsorted-data -sortOutput /data/sorted-data

Finally, when everything is successful, the following message will be displayed:

The job took 66 seconds.
SUCCESS! Validated the MapReduce framework's 'sort' successfully.

How it works...

First, the randomwriter application runs a Hadoop job to generate random data that can be used by the second sort program. Then, we verify the results through the testmapredsort job. If your computer has more capacity, you may run the initial randomwriter step with increased output sizes.
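The validation step essentially checks that every output key is not smaller than the one before it. The following Python toy version (not the actual testmapredsort code) mimics the three phases of generate, sort, and validate:

```python
import random

def is_sorted(records):
    """Return True if every key is >= the previous key, as a sort validator checks."""
    return all(records[i] <= records[i + 1] for i in range(len(records) - 1))

# generate some "unsorted data", sort it, and validate -- the same three
# phases as randomwriter, sort, and testmapredsort
random.seed(42)
unsorted = [random.randrange(1_000_000) for _ in range(10_000)]
sorted_data = sorted(unsorted)
print(is_sorted(sorted_data))
```

The real testmapredsort additionally verifies that the sorted output contains exactly the same records as the input, not just that it is ordered.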

There's more...

Hadoop includes several other benchmarks.

  • TestDFSIO: This tests the input output (I/O) performance of HDFS

  • nnbench: This checks the NameNode hardware

  • mrbench: This runs many small jobs

  • TeraSort: This sorts one terabyte of data

More information about these benchmarks can be found at http://www.michael-noll.com/blog/2011/04/09/benchmarking-and-stress-testing-an-hadoop-cluster-with-terasort-testdfsio-nnbench-mrbench/.

Reusing Java VMs to improve the performance

In its default configuration, Hadoop starts a new JVM for each map or reduce task. However, running multiple tasks from the same JVM can sometimes significantly speed up the execution. This recipe explains how to control this behavior.

How to do it...

  1. Run the WordCount sample by passing the following option as an argument:

    >bin/hadoop jar hadoop-examples-1.0.0.jar wordcount -Dmapred.job.reuse.jvm.num.tasks=-1 /data/input1 /data/output1

  2. Monitor the number of processes created by Hadoop (through the ps -ef | grep hadoop command in Unix, or the Task Manager in Windows). Hadoop starts only a single JVM per task slot and then reuses it for an unlimited number of tasks in the job.

    However, passing arguments through the -D option only works if the job implements the org.apache.hadoop.util.Tool interface. Otherwise, you should set the option through the JobConf.setNumTasksToExecutePerJvm(-1) method.

How it works...

By setting the mapred.job.reuse.jvm.num.tasks job configuration property, we can control the number of tasks run by each JVM that Hadoop starts. When the value is set to -1, Hadoop runs an unlimited number of tasks in the same JVM.
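The effect on process counts can be illustrated with a toy model. The following Python sketch (my own simplification, not Hadoop code) counts JVM start-ups for a job, given the number of task slots and the reuse setting:

```python
def jvm_launches(num_tasks, num_slots, reuse_limit):
    """Count JVM start-ups for a job: reuse_limit tasks per JVM, -1 = unlimited."""
    tasks_per_slot = -(-num_tasks // num_slots)  # tasks assigned per slot (ceiling)
    if reuse_limit == -1:
        return num_slots  # one JVM per slot, reused for every task
    launches_per_slot = -(-tasks_per_slot // reuse_limit)
    return launches_per_slot * num_slots

print(jvm_launches(100, 2, 1))   # default: one JVM per task
print(jvm_launches(100, 2, -1))  # unlimited reuse: one JVM per slot
```

For jobs with many short tasks, avoiding 98 of 100 JVM start-ups is where the speedup comes from.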

Fault tolerance and speculative execution

The primary advantage of using Hadoop is its support for fault tolerance. When you run a job, especially a large job, parts of the execution can fail due to external causes such as network failures, disk failures, and node failures.

When a job is started, Hadoop's JobTracker monitors the TaskTrackers to which it has submitted the tasks of the job. If any TaskTracker is not responsive, Hadoop resubmits the tasks handled by the unresponsive TaskTracker to a new TaskTracker.

Generally, a Hadoop system may be composed of heterogeneous nodes, and as a result there can be very slow nodes as well as fast nodes. Potentially, a few slow nodes can slow down an execution significantly.

To avoid this, Hadoop supports speculative execution. If most of the map tasks have completed and Hadoop is waiting for a few more map tasks, the Hadoop JobTracker also starts these pending tasks on new nodes. The tracker uses the result from the first task that finishes and stops any other identical tasks.

However, this model is feasible only if the map tasks are free of side effects. If such parallel executions are undesirable, Hadoop lets users turn off speculative execution.
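The scheduling idea can be illustrated with a toy model. In the following Python sketch (a simplification; real speculative execution uses task progress rates, not a fixed threshold), tasks slower than a threshold get a duplicate that is assumed to run at the median task speed, and the faster copy wins:

```python
def run_with_speculation(task_times, straggler_threshold):
    """Toy model: tasks slower than the threshold get a speculative duplicate
    (assumed to run at the median speed); the scheduler keeps the faster copy."""
    median = sorted(task_times)[len(task_times) // 2]
    finish_times = []
    for t in task_times:
        if t > straggler_threshold:
            finish_times.append(min(t, median))  # first of the two copies wins
        else:
            finish_times.append(t)
    return max(finish_times)  # the job finishes when its last task finishes

times = [10, 11, 9, 12, 95]             # one very slow node
print(run_with_speculation(times, 30))  # job time drops from 95 to 12
```

Without speculation the job is pinned to the 95-unit straggler; with it, the duplicate copy on a healthy node finishes first.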

How to do it...

Run the WordCount sample by passing the following options as arguments to turn off speculative execution:

bin/hadoop jar hadoop-examples-1.0.0.jar wordcount
-Dmapred.map.tasks.speculative.execution=false
-Dmapred.reduce.tasks.speculative.execution=false /data/input1 /data/output1

However, this only works if the job implements the org.apache.hadoop.util.Tool interface. Otherwise, you should set the parameters through the JobConf.set(name, value) method.

How it works...

When these options are specified and set to false, Hadoop turns off speculative execution. Otherwise, it performs speculative execution by default.

Debug scripts – analyzing task failures

A Hadoop job may consist of many map tasks and reduce tasks. Therefore, debugging a Hadoop job is often a complicated process. It is a good practice to first test a Hadoop job using unit tests by running it with a subset of the data.

However, sometimes it is necessary to debug a Hadoop job in a distributed mode. To support such cases, Hadoop provides a mechanism called debug scripts. This recipe explains how to use debug scripts.

Getting ready

Start the Hadoop cluster.

How to do it...

A debug script is a shell script, and Hadoop executes the script whenever a task encounters an error. The script has access to the $script, $stdout, $stderr, $syslog, and $jobconf values, which Hadoop populates as environment variables. You can find a sample script in resources/chapter3/debugscript. We can use debug scripts to copy all the logfiles to a single location, e-mail them to a single e-mail account, or perform some analysis:

echo "Run the script" >> $LOG_FILE
echo $script >> $LOG_FILE
echo $stdout>> $LOG_FILE
echo $stderr>> $LOG_FILE
echo $syslog >> $LOG_FILE
echo $jobconf>> $LOG_FILE

  1. Write your own debug script using the above example. In the above example, edit HADOOP_HOME to point to your HADOOP_HOME directory.

    src/chapter3/WordcountWithDebugScript.java extends the WordCount sample to use debug scripts. The following listing shows the code.

    The following code uploads the job scripts to HDFS and configures the job to use these scripts. Also, it sets up the distributed cache.

    private static final String scriptFileLocation =
        "resources/chapter3/debugscript";

    public static void setupFailedTaskScript(JobConf conf)
        throws Exception {
      // create a directory on HDFS where we'll upload the fail script
      FileSystem fs = FileSystem.get(conf);
      Path debugDir = new Path("/debug");
      // who knows what's already in this directory; let's just
      // clear it.
      if (fs.exists(debugDir)) {
        fs.delete(debugDir, true);
      }
      // ...and then make sure it exists again
      fs.mkdirs(debugDir);
      // upload the local script into HDFS
      fs.copyFromLocalFile(new Path(scriptFileLocation),
          new Path("/debug/fail-script"));
      conf.setMapDebugScript("./fail-script");
      URI fsUri = fs.getUri();
      String mapUriStr = fsUri.toString()
          + "/debug/fail-script#fail-script";
      URI mapUri = new URI(mapUriStr);
      DistributedCache.addCacheFile(mapUri, conf);
      DistributedCache.createSymlink(conf);
    }

    The following code runs the Hadoop job. The only difference is that here, we have called the preceding method to configure the failed task scripts.

    public static void main(String[] args) throws Exception {
      JobConf conf = new JobConf();
      setupFailedTaskScript(conf);
      Job job = new Job(conf, "word count");
      // configure the mapper, reducer, and output types
      // as in the WordCount sample
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      job.waitForCompletion(true);
    }

  2. Compile the code base by running Ant from the home directory of the source code. Copy the build/lib/hadoop-cookbook-chapter3.jar file to HADOOP_HOME.

  3. Then run the job by running the following command:

    >bin/hadoop jar hadoop-cookbook-chapter3.jar chapter3.WordcountWithDebugScript /data/input /data/output1

    The job will run the FaultyWordCount task that will always fail. Then Hadoop will execute the debug script, and you can find the results of the debug script from HADOOP_HOME.

How it works...

We configured the debug script through conf.setMapDebugScript("./fail-script"). Note that the input value is not a file location, but the command that needs to be run on the machine when an error occurs. If the script you want to run is already present on all machines at the same path, you can simply pass that path to the conf.setMapDebugScript() method.

However, Hadoop runs the mappers on multiple nodes, and often on a machine different from the one running the job's client. Therefore, for the debug script to work, we need to get the script to all the nodes that run the mappers.

We do this using the distributed cache. Users can add files that are in the HDFS filesystem to the distributed cache, and Hadoop automatically copies those files to each node that runs map tasks. However, the distributed cache copies the files to the mapred.local.dir of the MapReduce setup, while the job runs from a different location. Therefore, we link the cached file to the working directory by creating a symlink through the DistributedCache.createSymlink(conf) command.

Hadoop then copies the script file to each mapper node and symlinks it into the working directory of the job. When an error occurs, Hadoop runs the ./fail-script command, which executes the script file that was copied to the node through the distributed cache. The debug script carries out the tasks you have programmed whenever an error occurs.
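The copy-then-symlink behavior can be mimicked locally. The following Python sketch (directory names are made up for illustration) copies a script into a stand-in cache directory and symlinks it into a job working directory, as the distributed cache does for ./fail-script:

```python
import os
import shutil
import tempfile

# Toy model of what the distributed cache does for the debug script: copy the
# file into a node-local cache directory, then symlink it into the job's
# working directory so that "./fail-script" resolves.
workspace = tempfile.mkdtemp()
cache_dir = os.path.join(workspace, "mapred_local", "cache")  # stand-in for mapred.local.dir
working_dir = os.path.join(workspace, "job_working_dir")
os.makedirs(cache_dir)
os.makedirs(working_dir)

script_src = os.path.join(workspace, "debugscript")
with open(script_src, "w") as f:
    f.write("echo 'Run the script'\n")

cached = shutil.copy(script_src, os.path.join(cache_dir, "fail-script"))
os.symlink(cached, os.path.join(working_dir, "fail-script"))  # createSymlink step

print(os.path.exists(os.path.join(working_dir, "fail-script")))
```

The symlink is what lets the job refer to the script by the relative name ./fail-script regardless of where the cache actually placed it.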

Setting failure percentages and skipping bad records

When processing a large amount of data, there may be cases where a small number of map tasks fail, but the final results still make sense without the failed tasks. This can happen due to a number of reasons, such as:

  • Bugs in the map task

  • A small percentage of data records are not well formed

  • Bugs in third-party libraries

In the first case, it is best to debug, find the cause for failures, and fix it. However, in the second and third cases, such errors may be unavoidable. It is possible to tell Hadoop that the job should succeed even if some small percentage of map tasks have failed.

This can be done in two ways:

  • Setting the failure percentages

  • Asking Hadoop to skip bad records

This recipe explains how to configure this behavior.

Getting ready

Start the Hadoop setup.

How to do it...

Run the WordCount sample by passing the following options:

>bin/hadoop jar hadoop-examples-1.0.0.jar wordcount
-Dmapred.skip.map.max.skip.records=1
-Dmapred.skip.reduce.max.skip.groups=1 /data/input1 /data/output1

However, this only works if the job implements the org.apache.hadoop.util.Tool interface. Otherwise, you should set the parameters through the JobConf.set(name, value) method.

How it works...

Hadoop does not support skipping bad records by default. We can turn on bad record skipping by setting the following parameters to positive values:

  • mapred.skip.map.max.skip.records: This sets the number of records to skip near a bad record, including the bad record

  • mapred.skip.reduce.max.skip.groups: This sets the number of acceptable skip groups surrounding a bad group
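The record-skipping behavior can be sketched as follows. This Python toy model (my own simplification of mapred.skip.map.max.skip.records; Hadoop's real skip mode narrows the window through retries) skips a fixed window around each bad record:

```python
def process_with_skipping(records, is_bad, max_skip_records):
    """Toy model of record skipping: when a record fails, skip a window of up
    to max_skip_records records (including the bad one) instead of failing."""
    processed, skipped = [], []
    i = 0
    while i < len(records):
        if is_bad(records[i]):
            if max_skip_records <= 0:
                raise ValueError("task failed on record %d" % i)
            skipped.extend(records[i:i + max_skip_records])
            i += max_skip_records
        else:
            processed.append(records[i])
            i += 1
    return processed, skipped

records = [1, 2, -1, 4, 5]  # -1 stands in for a malformed record
done, skipped = process_with_skipping(records, lambda r: r < 0, 1)
print(done, skipped)
```

With skipping disabled (the default), the same bad record fails the whole task; with it enabled, only the skipped window is lost.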

There's more...

You can also limit the percentage of failures in map or reduce tasks by setting the JobConf.setMaxMapTaskFailuresPercent(percent) and JobConf.setMaxReduceTaskFailuresPercent(percent) options.

Also, Hadoop repeats a task in case of failure. You can control the number of attempts through methods such as JobConf.setMaxMapAttempts(5).
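The failure-percentage check itself is simple arithmetic. The following Python sketch (illustrative only, not Hadoop's code) decides whether a job succeeds given the maximum allowed failure percentage:

```python
def job_succeeds(total_map_tasks, failed_map_tasks, max_failures_percent):
    """Toy model of setMaxMapTaskFailuresPercent: the job succeeds as long as
    the failed fraction stays within the allowed percentage."""
    return failed_map_tasks * 100 <= max_failures_percent * total_map_tasks

print(job_succeeds(1000, 30, 5))  # 3% failed, 5% allowed
print(job_succeeds(1000, 80, 5))  # 8% failed, 5% allowed
```

With the default of zero percent allowed, a single permanently failing task fails the whole job.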

Shared-user Hadoop clusters – using fair and other schedulers

When a user submits a job to Hadoop, the job needs to be assigned resources (computers/hosts) before execution. This process is called scheduling, and a scheduler decides when resources are assigned to a given job.

Hadoop is by default configured with a First in First out (FIFO) scheduler, which executes jobs in the same order as they arrive. However, for a deployment that is running many MapReduce jobs and shared by many users, more complex scheduling policies are needed.

The good news is that the Hadoop scheduler is pluggable, and Hadoop ships with two other schedulers. If required, it is also possible to write your own scheduler. The two additional schedulers are as follows:

  • Fair scheduler: This defines pools; over time, each pool gets around the same amount of resources.

  • Capacity scheduler: This defines queues, and each queue has a guaranteed capacity. The capacity scheduler shares computer resources allocated to a queue with other queues if those resources are not in use.
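The fair scheduler's core idea of even shares with redistribution can be sketched in a few lines. The following Python toy model (my own simplification; the real scheduler also handles weights, minimum shares, and preemption) splits task slots across pools:

```python
def fair_shares(total_slots, pool_demands):
    """Toy model of fair scheduling: split slots evenly across pools, and
    redistribute any share a pool cannot use to the still-hungry pools."""
    shares = {pool: 0 for pool in pool_demands}
    remaining = total_slots
    hungry = dict(pool_demands)
    while remaining > 0 and hungry:
        even = max(remaining // len(hungry), 1)
        for pool in list(hungry):
            give = min(even, hungry[pool], remaining)
            shares[pool] += give
            hungry[pool] -= give
            remaining -= give
            if hungry[pool] == 0:
                del hungry[pool]
            if remaining == 0:
                break
    return shares

# 12 slots, three pools; "ads" only wants 2, so its leftover share
# is redistributed between the other two pools
print(fair_shares(12, {"research": 10, "ads": 2, "etl": 10}))
```

The capacity scheduler behaves similarly at the queue level, except that each queue's share is a configured guarantee rather than an even split.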

This recipe describes how to change the scheduler in Hadoop.

Getting ready

For this recipe, you need a working Hadoop deployment. Set up Hadoop.

How to do it...

  1. Shut down the Hadoop cluster.

  2. You need the hadoop-fairscheduler-1.0.0.jar file in HADOOP_HOME/lib. From Hadoop 1.0.0 onward, this JAR file already ships in the right place in the Hadoop distribution.

  3. Add the following code to the HADOOP_HOME/conf/mapred-site.xml file:

    <property>
    <name>mapred.jobtracker.taskScheduler</name>
    <value>org.apache.hadoop.mapred.FairScheduler</value>
    </property>
  4. Restart Hadoop.

  5. Verify that the new scheduler has been applied by going to http://<jobtracker-host>:50030/scheduler in your installation. If the scheduler has been properly applied, the page will have the heading "Fair Scheduler Administration".

How it works...

When you follow the preceding steps, Hadoop loads the new scheduler settings when it starts. The fair scheduler shares an equal amount of resources between users, unless it has been configured otherwise.

The fair scheduler can be configured in two ways. There are several parameters of the mapred.fairscheduler.* form, which we can set in HADOOP_HOME/conf/mapred-site.xml. Additional parameters can be configured via HADOOP_HOME/conf/fair-scheduler.xml. More details about the fair scheduler can be found in HADOOP_HOME/docs/fair_scheduler.html.

There's more...

Hadoop also includes another scheduler called capacity scheduler that provides more fine-grained control than the fair scheduler. More details about the capacity scheduler can be found from HADOOP_HOME/docs/capacity_scheduler.html.

Hadoop security – integrating with Kerberos

Hadoop by default runs without security. However, it also supports Kerberos-based setup, which provides full security. This recipe describes how to configure Hadoop with Kerberos for security.

A Kerberos setup includes a Hadoop cluster (NameNode, DataNodes, JobTracker, and TaskTrackers) and a Kerberos server. We define users as principals in the Kerberos server. A user can obtain a ticket from the Kerberos server and use that ticket to log in to any server in Hadoop. We map each Kerberos principal to a Unix user. Once logged in, authorization is performed based on the Unix user and group permissions associated with each user.

Getting ready

Set up Hadoop. We need a machine to use as the Kerberos node, for which you have root access. Furthermore, the machine should have its domain name already configured (we will assume the DNS name is hadoop.kbrelam.com, but you can replace it with another domain). If you want to try this out on a single machine only, you can set up the DNS name by adding it against your IP address in your /etc/hosts file.

How to do it...

  1. Install Kerberos on your machine. Refer to http://web.mit.edu/kerberos/krb5-1.8/krb5-1.8.6/doc/krb5-install.html for further instructions on setting up Kerberos.

    Provide hadoop.kbrelam.com as the realm and as the administrative server when the installation asks for them. Then run the following command to create a realm:

    >sudo krb5_newrealm

  2. In Kerberos, we call users "principals". Create a new principal by running the following command:

    >kadmin.local: addprinc srinath/admin

  3. Edit /etc/krb5kdc/kadm5.acl to include the line srinath/admin@hadoop.kbrelam.com * to grant all the permissions.

  4. Restart the Kerberos server by running the following command:

    >sudo /etc/init.d/krb5-admin-server restart

  5. You can test the new principal by running the following commands:

    >kinit srinath/admin
    >klist
  6. Kerberos will use Unix users in the Hadoop machines as Kerberos principals and use local Unix-level user permissions to do authorization. Create the following users and groups, with the permissions shown, in all the machines on which you plan to run MapReduce.

    We will have three users: hdfs to run the HDFS server, mapred to run the MapReduce server, and bob to submit jobs.

    >usermod -g hadoop hdfs
    >usermod -g hadoop mapred
    >useradd -G mapred bob
    >usermod -a -G hadoop bob

  7. Now let us create Kerberos principals for these users:

    >kadmin.local: addprinc -randkey hdfs/hadoop.kbrelam.com
    >kadmin.local: addprinc -randkey mapred/hadoop.kbrelam.com
    >kadmin.local: addprinc -randkey host/hadoop.kbrelam.com
    >kadmin.local: addprinc -randkey bob/hadoop.kbrelam.com

  8. Now, we will create a key tab file that contains credentials for Kerberos principals. We will use these credentials to avoid entering the passwords at Hadoop startup.

    >kadmin.local: xst -norandkey -k hdfs.keytab hdfs/hadoop.kbrelam.com host/hadoop.kbrelam.com
    >kadmin.local: xst -norandkey -k mapred.keytab mapred/hadoop.kbrelam.com host/hadoop.kbrelam.com
    >kadmin.local: xst -norandkey -k bob.keytab bob/hadoop.kbrelam.com
    >kadmin.local: exit

  9. Deploy the key tab files by moving them into the HADOOP_HOME/conf directory. Change the directory to HADOOP_HOME and run the following commands to set the permissions for the key tab files:

    >chown hdfs:hadoop conf/hdfs.keytab
    >chown mapred:hadoop conf/mapred.keytab
    >chmod 400 conf/*.keytab
  10. Now, set permissions in the filesystem and Hadoop. Change the directory to HADOOP_HOME and run the following commands:

    >chown hdfs:hadoop /opt/hadoop-work/name/
    >chown hdfs:hadoop /opt/hadoop-work/data
    >chown mapred:hadoop /opt/hadoop-work/local/
    >bin/hadoop fs -chown hdfs:hadoop /
    >bin/hadoop fs -chmod 755 /
    >bin/hadoop fs -mkdir /mapred
    >bin/hadoop fs -mkdir /mapred/system/
    >bin/hadoop fs -chown mapred:hadoop /mapred/system
    >bin/hadoop fs -chmod -R 700 /mapred/system
    >bin/hadoop fs -chmod 777 /tmp

  11. Install the Unlimited Strength Java Cryptography Extension (JCE) Policy Files by downloading the policy files from http://www.oracle.com/technetwork/java/javase/downloads/index.html and copying the JAR files in the distribution to JAVA_HOME/jre/lib/security.

  12. Configure Hadoop properties by adding the following properties to the associated configuration files. Replace the HADOOP_HOME value with the corresponding location. Here, Hadoop will replace _HOST with the localhost name. The following code snippet adds properties to core-site.xml:

    <property>
    <name>hadoop.security.authentication</name>
    <value>kerberos</value>
    </property>
    <property>
    <name>hadoop.security.authorization</name>
    <value>true</value>
    </property>
  13. Copy the configuration parameters defined in resources/chapter3/kerberos-hdfs-site.xml of the source code for this chapter to HADOOP_HOME/conf/hdfs-site.xml. Replace the HADOOP_HOME value with the corresponding location. Here, Hadoop will replace _HOST with the localhost name.

  14. Start the NameNode by running the following command from HADOOP_HOME:

    >sudo -u hdfs bin/hadoop namenode &

  15. Test the HDFS setup by doing some metadata operations:

    >kinit hdfs/hadoop.kbrelam.com -k -t conf/hdfs.keytab
    >kinit -R

    In the first command, we specify the name of the principal (for example, hdfs/hadoop.kbrelam.com) to apply operations to that principal. The first command is theoretically sufficient. However, there is a bug that stops Hadoop from reading the credentials. We can work around this with the second command, which rewrites the credentials in a more readable format. Now let's run HDFS commands:

    >bin/hadoop fs -ls /

  16. Start the DataNode (this must be done as root) by running the following commands:

    >su - root
    >cd /opt/hadoop-1.0.3/
    >export HADOOP_SECURE_DN_USER=hdfs
    >export HADOOP_DATANODE_USER=hdfs
    >bin/hadoop datanode &

  17. Configure MapReduce by adding the following code to conf/mapred-site.xml. Replace HADOOP_HOME with the corresponding location:

    <!-- JobTracker security configs -->
    <property>
    <name>mapreduce.jobtracker.keytab.file</name>
    <value>HADOOP_HOME/conf/mapred.keytab</value><!-- path to the MapReduce keytab -->
    </property>
    <!-- TaskTracker security configs -->
    <property>
    <name>mapreduce.tasktracker.keytab.file</name>
    <value>HADOOP_HOME/conf/mapred.keytab</value><!-- path to the MapReduce keytab -->
    </property>
    <!-- TaskController settings -->
    <property>
    <name>mapred.task.tracker.task-controller</name>
    <value>org.apache.hadoop.mapred.LinuxTaskController</value>
    </property>

  18. Configure the Linux task controller, which must be used for the Kerberos setup:

    >mkdir /etc/hadoop
    >cp conf/taskcontroller.cfg /etc/hadoop/taskcontroller.cfg
    >chmod 755 /etc/hadoop/taskcontroller.cfg

  19. Add the following code to /etc/hadoop/taskcontroller.cfg:

    mapred.local.dir=/opt/hadoop-work/local/
    hadoop.log.dir=HADOOP_HOME/logs
    mapreduce.tasktracker.group=mapred
    banned.users=mapred,hdfs,bin
    min.user.id=1000

    Set up the permissions by running the following commands from HADOOP_HOME, and verify that the final permissions of bin/task-controller are rwsr-x---. Otherwise, the jobs will fail to execute.

    >chmod 4750 bin/task-controller
    >ls -l bin/task-controller
    >-rwsr-x--- 1 root mapred 63374 May 9 02:05 bin/task-controller

  20. Start the JobTracker and the TaskTracker:

    >sudo -u mapred bin/hadoop jobtracker

    Wait for the JobTracker to start up and then run the following command:

    >sudo -u mapred bin/hadoop tasktracker

  21. Run the job by running the following commands from HADOOP_HOME. If all commands run successfully, you will see the WordCount output.

    >su bob
    >kinit bob/hadoop.kbrelam.com -k -t conf/bob.keytab
    >kinit -R
    >bin/hadoop fs -mkdir /data
    >bin/hadoop fs -mkdir /data/job1
    >bin/hadoop fs -mkdir /data/job1/input
    >bin/hadoop fs -put README.txt /data/job1/input
    >bin/hadoop jar hadoop-examples-1.0.3.jar wordcount /data/job1/input /data/job1/output

How it works...

By running the kinit command, the client obtains a Kerberos ticket and stores it in the filesystem. When we run a Hadoop command, the client uses the Kerberos ticket to get access to the Hadoop nodes and submit jobs. Hadoop resolves the permissions based on the user and group permissions of the Linux user that matches the Kerberos principal.
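The principal-to-user mapping can be sketched as follows. This Python toy version (a simplification of Hadoop's default rule; real deployments can customize the mapping) takes the short name before the first '/' or '@' as the local Unix user:

```python
def principal_to_unix_user(principal):
    """Toy version of the default principal-to-user mapping: the short name
    before the first '/' or '@' becomes the local Unix user."""
    for sep in ("/", "@"):
        if sep in principal:
            principal = principal.split(sep, 1)[0]
    return principal

print(principal_to_unix_user("hdfs/hadoop.kbrelam.com@HADOOP.KBRELAM.COM"))
print(principal_to_unix_user("bob@HADOOP.KBRELAM.COM"))
```

This is why the recipe creates matching Unix users (hdfs, mapred, bob) on every node: the short name must resolve to a real local account for authorization to work.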

Hadoop Kerberos security settings have many pitfalls. The two tools that might be useful are as follows:

  • You can enable Kerberos debugging by adding -Dsun.security.krb5.debug=true to the HADOOP_OPTS environment variable

  • There is a very useful resource that has descriptions for all the Hadoop security error codes

    Also, when you change something, make sure you restart all the relevant processes by first killing any running processes.

Using the Hadoop Tool interface

Hadoop jobs are often executed through the command line. Therefore, each Hadoop job has to support reading, parsing, and processing command-line arguments. To avoid each developer having to rewrite this code, Hadoop provides the org.apache.hadoop.util.Tool interface.

How to do it...

  1. See the following code:

    public class WordcountWithTools extends
        Configured implements Tool {

      public int run(String[] args) throws Exception {
        if (args.length < 2) {
          System.out.println("chapter3.WordcountWithTools <inDir> <outDir>");
          ToolRunner.printGenericCommandUsage(System.out);
          return -1;
        }
        Job job = new Job(getConf(), "word count");
        // configure the mapper, reducer, and output types
        // as in the WordCount sample
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
        return 0;
      }

      public static void main(String[] args)
          throws Exception {
        int res = ToolRunner.run(
            new Configuration(), new WordcountWithTools(), args);
        System.exit(res);
      }
    }

  2. Set up an input folder in HDFS with /data/input/README.txt, if it doesn't already exist. It can be done through the following commands:

    bin/hadoop fs -mkdir /data/output
    bin/hadoop fs -mkdir /data/input
    bin/hadoop fs -put README.txt /data/input

  3. Try to run the WordCount sample without any options, and it will list the available options:

    bin/hadoop jar hadoop-cookbook-chapter3.jar chapter3.WordcountWithTools

    Wordcount <inDir> <outDir>
    Generic options supported are
    -conf <configuration file> specify an application configuration file
    -D <property=value> use value for given property
    -fs <local|namenode:port> specify a namenode
    -jt <local|jobtracker:port> specify a job tracker
    -files <comma separated list of files> specify comma separated
    files to be copied to the map reduce cluster
    -libjars <comma separated list of jars> specify comma separated
    jar files to include in the classpath.
    -archives <comma separated list of archives> specify comma
    separated archives to be unarchived on the compute machines.
    The general command line syntax is
    bin/hadoop command [genericOptions] [commandOptions]

  4. Run the WordCount sample with the mapred.job.reuse.jvm.num.tasks option to limit the number of JVMs created by the job, as we learned in an earlier recipe:

    bin/hadoop jar hadoop-cookbook-chapter3.jar chapter3.WordcountWithTools
    -D mapred.job.reuse.jvm.num.tasks=1 /data/input /data/output

How it works...

When a job implements the Tool interface and is executed through ToolRunner, Hadoop intercepts the command-line arguments, parses the options, and configures the JobConf object accordingly. Therefore, the job supports the standard generic options.
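What the generic options parsing does can be sketched in a few lines. The following Python toy model (not the actual GenericOptionsParser) pulls -D property=value pairs out of the argument list, the way ToolRunner does before calling run():

```python
def parse_generic_options(args):
    """Toy model of generic option handling: collect -D property=value pairs
    into a configuration dict and return the remaining program arguments."""
    conf, remaining = {}, []
    i = 0
    while i < len(args):
        if args[i] == "-D":
            # separated form: -D name=value
            name, value = args[i + 1].split("=", 1)
            conf[name] = value
            i += 2
        elif args[i].startswith("-D"):
            # attached form: -Dname=value
            name, value = args[i][2:].split("=", 1)
            conf[name] = value
            i += 1
        else:
            remaining.append(args[i])
            i += 1
    return conf, remaining

conf, rest = parse_generic_options(
    ["-D", "mapred.job.reuse.jvm.num.tasks=1", "/data/input", "/data/output"])
print(conf, rest)
```

The run() method then only sees the remaining positional arguments, which is why the WordcountWithTools example can treat args[0] and args[1] as the input and output paths.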


This article described how to perform advanced administration steps for your Hadoop cluster.

You've been reading an excerpt of:

Hadoop MapReduce Cookbook
