Using Node.js and Hadoop to store distributed data

Harri Siirak

October 19th, 2015

In my previous post we had a brief introduction to using Node.js to store and retrieve data in HDFS (Hadoop Distributed File System) over its WebHDFS REST API. This time let's see how we can use Hadoop to its full potential with its MapReduce functionality, and how Node.js can take part in that process.

In short, MapReduce is a framework for processing parallelizable problems across huge datasets using a large cluster of computing nodes (computers). It allows for distributed processing of the map and reduce operations. When each mapping operation is independent of the others, all maps can be performed in parallel – although in practice this is limited by the number of data sources and/or the number of CPUs near each source.
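
To make the flow concrete, here is a minimal, purely in-memory sketch of the map, shuffle and reduce steps in plain JavaScript (no Hadoop involved, and all names are just for illustration):

// A tiny in-memory illustration of the map, shuffle and reduce steps (no Hadoop involved)
var lines = [ 'the whale', 'the white whale' ];

// Map: emit a (word, 1) pair for every word
var pairs = [];
lines.forEach(function (line) {
    line.split(' ').forEach(function (word) {
        pairs.push([ word, 1 ]);
    });
});

// Shuffle: group the emitted values by key (Hadoop does this between the map and reduce phases)
var groups = {};
pairs.forEach(function (pair) {
    var word = pair[0];
    if (!groups[word]) {
        groups[word] = [];
    }
    groups[word].push(pair[1]);
});

// Reduce: sum the grouped values for every key
Object.keys(groups).forEach(function (word) {
    var total = groups[word].reduce(function (sum, n) {
        return sum + n;
    }, 0);
    console.log(word + '\t' + total); // "the 2", "whale 2", "white 1"
});

The mapper and reducer scripts below follow exactly this shape, except that the data flows through STDIN and STDOUT instead of in-memory arrays, and Hadoop takes care of the shuffle.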

Apache Hadoop's MapReduce and HDFS components were inspired by Google's papers on MapReduce and the Google File System.

Counting word occurrences is a popular and trivial use case that is often used to demonstrate basic MapReduce usage and flow. I can't resist doing the same - so here is yet another WordCount example.

Global configuration

To run the final MapReduce job, we need to set the $HADOOP_HOME environment variable (if you have already set it, you can skip this part). That is the path where your Hadoop installation is located.

For example, I'm running it on OS X and I installed the latest version (2.7.0 as of this writing) from Homebrew:

export HADOOP_HOME=/usr/local/Cellar/hadoop/2.7.0

If you're running Hadoop on Linux, your $HADOOP_HOME will probably be (it may differ if you installed it manually):

export HADOOP_HOME=/usr/local/hadoop
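
If you want to double-check from Node.js that the variable is actually visible before kicking off a job, a one-off check like this will do (check-env.js is just a hypothetical helper name):

// check-env.js - verify that HADOOP_HOME is visible to Node.js
if (!process.env.HADOOP_HOME) {
    console.error('HADOOP_HOME is not set - the streaming job below will not find its jar');
    process.exit(1);
}

console.log('Using Hadoop installation at ' + process.env.HADOOP_HOME);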

Mapper

The mapper's goal is to transform each input element individually into an output data element. In this example we're emitting word and occurrence count pairs, separated by tab characters.

Here I'm processing whole chunks of the data, counting all of the words, and finally outputting corresponding word and count pairs. It's also possible to solve this by emitting every occurrence separately, but the final results will be the same.

/**
* Mapper chunk processing function.
* Reads a chunk from STDIN and writes word/count pairs to STDOUT.
* (Named processChunk so it doesn't shadow the global process object.)
*/
function processChunk () {
   var chunk = process.stdin.read(); // Read a chunk
   if (chunk !== null) {
       // Replace all newlines and tab chars with spaces
       [ '\n', '\t'].forEach(function (char) {
           chunk = chunk.replace(new RegExp(char, 'g'), ' ');
       });

       // Split it
       var words = chunk.trim().split(' ');
       var counts = {};

       // Count words
       words.forEach(function (word) {
           word = word.trim();

           if (word.length) {
               if (!counts[word]) {
                   counts[word] = 0;
               }

               counts[word]++;
           }
       });

       // Emit results
       Object.keys(counts).forEach(function (word) {
           var count = counts[word];
           process.stdout.write(word + '\t' + count + '\n');
       });
   }
}

process.stdin.setEncoding('utf8');
process.stdin.on('readable', processChunk); // Set STDIN processing handler
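
Before touching Hadoop at all, the mapper can be exercised on its own. The small harness below (test-mapper.js is a hypothetical name, and it assumes mapper.js sits in the same directory) feeds a couple of sample lines to the script's STDIN, exactly as Hadoop Streaming would:

// test-mapper.js - feed sample lines into mapper.js over STDIN and print what it emits
var spawn = require('child_process').spawn;

var mapper = spawn('node', [ './mapper.js' ]);

mapper.stdout.pipe(process.stdout); // Expect tab-separated "word<TAB>count" lines

mapper.stdin.write('the whale\nthe white whale\n');
mapper.stdin.end();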

Reducer

The reducer, as its name suggests, collects and aggregates the values emitted by the mappers, finally producing a single output value for each key (here, for each word).

/**
* Reducer chunk processing function.
* Reads mapper output lines from STDIN and writes aggregated word/count pairs to STDOUT.
* (Named processChunk so it doesn't shadow the global process object.)
*/
function processChunk () {
   var chunk = process.stdin.read(); // Read a chunk
   if (chunk !== null) {
       // Split it
       var lines = chunk.trim().split('\n');
       var counts = {};

// Sum the counts for each word
       lines.forEach(function (line) {
           line = line.trim();

           var atom = line.split('\t');
           var word = atom[0];
           var count = +atom[1];

           if (word !== null && word.length) {
               if (!counts[word]) {
                   counts[word] = 0;
               }

               counts[word] += count;
           }
       });

       // Emit results
       Object.keys(counts).forEach(function (word) {
           var count = counts[word];
           process.stdout.write(word + '\t' + count + '\n');
       });
   }
}

process.stdin.setEncoding('utf8');
process.stdin.on('readable', processChunk); // Set STDIN processing handler
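
The whole streaming flow can likewise be simulated locally before submitting anything to a cluster. The sketch below (local-pipeline.js is a hypothetical name) pipes a local text file through mapper.js, the system sort utility (Hadoop performs an equivalent sort/shuffle between the two phases) and reducer.js. Because both scripts process chunk by chunk, the local result is only an approximation of a single-reducer run, but it is good enough for a sanity check:

// local-pipeline.js - simulate the streaming flow on a local file: mapper | sort | reducer
// Usage: node local-pipeline.js <some-text-file>
var fs = require('fs');
var spawn = require('child_process').spawn;

var mapper = spawn('node', [ './mapper.js' ]);
var sorter = spawn('sort');                      // Hadoop sorts mapper output by key before the reduce phase
var reducer = spawn('node', [ './reducer.js' ]);

fs.createReadStream(process.argv[2]).pipe(mapper.stdin);
mapper.stdout.pipe(sorter.stdin);
sorter.stdout.pipe(reducer.stdin);
reducer.stdout.pipe(process.stdout);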

Prepare input file

As input data we're using Herman Melville's "Moby Dick", which is freely available and downloadable from Project Gutenberg.

wget http://www.gutenberg.org/files/2701/old/moby10b.txt

Next, copy the downloaded file into HDFS:

hdfs dfs -put moby10b.txt

This file will be copied into your HDFS home directory, /user/<username>/ (in my case /user/harri/).
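
If you prefer to stay in Node.js, the same upload can be done over WebHDFS with the webhdfs module covered in the previous post. A minimal sketch, assuming the NameNode's WebHDFS endpoint runs on localhost:50070:

// put-input.js - upload the input file over WebHDFS instead of using the CLI
var fs = require('fs');
var WebHDFS = require('webhdfs');

var hdfs = WebHDFS.createClient({
    user: 'harri',        // your HDFS user name
    host: 'localhost',    // NameNode host (assumption)
    port: 50070,          // default WebHDFS port in Hadoop 2.x (assumption)
    path: '/webhdfs/v1'
});

var local = fs.createReadStream('./moby10b.txt');
var remote = hdfs.createWriteStream('/user/harri/moby10b.txt');

local.pipe(remote);

remote.on('error', function (err) {
    console.error('Upload failed:', err);
});

remote.on('finish', function () {
    console.log('moby10b.txt uploaded');
});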

Execute job

Hadoop comes with a streaming utility, which allows users to create and run jobs with any executable (e.g. shell utilities) as the mapper and/or the reducer, so we can plug our two Node.js scripts straight in (a sketch for launching the same job from Node.js itself follows the option list below):

hadoop jar $HADOOP_HOME/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar \
   -Dmapreduce.job.maps=10 \
   -Dmapreduce.job.reduces=10 \
   -files ./mapper.js,./reducer.js \
   -mapper "node ./mapper.js" \
   -reducer "node ./reducer.js" \
   -input ./moby10b.txt \
   -output ./moby10b.out

About the provided options:

  • -Dmapreduce.job.maps=10 - the number of map tasks to use
  • -Dmapreduce.job.reduces=10 - the number of reduce tasks to use
  • -files ./mapper.js,./reducer.js - script files that will be shipped with the job
  • -mapper "node ./mapper.js" - the command that will be executed in the map phase
  • -reducer "node ./reducer.js" - the command that will be executed in the reduce phase
  • -input ./moby10b.txt - input file(s) for the job
  • -output ./moby10b.out - output path for the job
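
If you would rather trigger the job from Node.js instead of typing the command by hand (handy when the job is part of a bigger pipeline), it can be spawned like any other process. This is only a sketch; run-job.js is a hypothetical name and it assumes the same file layout and Hadoop 2.7.0 paths as above:

// run-job.js - launch the streaming job from Node.js
var path = require('path');
var spawn = require('child_process').spawn;

// Assumes $HADOOP_HOME is set and the Homebrew 2.7.0 layout used above
var streamingJar = path.join(process.env.HADOOP_HOME,
    'libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar');

var job = spawn('hadoop', [
    'jar', streamingJar,
    '-Dmapreduce.job.maps=10',
    '-Dmapreduce.job.reduces=10',
    '-files', './mapper.js,./reducer.js',
    '-mapper', 'node ./mapper.js',
    '-reducer', 'node ./reducer.js',
    '-input', './moby10b.txt',
    '-output', './moby10b.out'
], { stdio: 'inherit' }); // Show Hadoop's progress output in this terminal

job.on('close', function (code) {
    console.log('Streaming job finished with exit code ' + code);
});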

Retrieve job results

Results can be retrieved with the Hadoop CLI utility (or via the WebHDFS API, whichever fits you better).

hdfs dfs -get moby10b.out/part-00000

And here is a sample from one of the output files:

$ cat moby10b.out/part-00000

wolfish 2
wolves 1
wolves, 1
woman   4
woman's 4
woman, 2
womb.   1
women   7
women's 1
women, 2
won't   29
wonder 20
wonder, 8
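
And since this series started with the WebHDFS REST API, the same results can also be streamed straight from HDFS in Node.js, just like the upload earlier. Again a minimal sketch, assuming the NameNode's WebHDFS endpoint on localhost:50070 and the output under /user/harri:

// read-results.js - stream one of the output parts over WebHDFS
var WebHDFS = require('webhdfs');

var hdfs = WebHDFS.createClient({
    user: 'harri',        // your HDFS user name
    host: 'localhost',    // NameNode host (assumption)
    port: 50070,          // default WebHDFS port in Hadoop 2.x (assumption)
    path: '/webhdfs/v1'
});

var remote = hdfs.createReadStream('/user/harri/moby10b.out/part-00000');
remote.pipe(process.stdout);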

What's next?

Now you have some idea of what MapReduce is and how it can be used for the greater good, so you can proceed with building your applications more efficiently by taking advantage of big data and its massive, horizontally scalable processing capabilities.

About the Author

Harri Siirak is a mission architect and partner at nodeSWAT.com. He is a senior Node.js/JavaScript developer among a talented team of full-stack developers who specialize in building scalable and secure Node.js-based solutions. He can be found on GitHub at harrisiirak.