HDFS and MapReduce

A complete, hands-on guide to building and maintaining large Apache Hadoop clusters using Cloudera Manager and CDH5

Essentials of HDFS

HDFS is a distributed filesystem that has been designed to run on top of a cluster of industry-standard hardware. The architecture of HDFS is such that there is no specific need for high-end hardware. HDFS is a highly fault-tolerant system and can handle failures of nodes in a cluster without loss of data. The primary goal behind the design of HDFS is to serve large data files efficiently. HDFS achieves this efficiency and high throughput in data transfer by enabling streaming access to the data in the filesystem.

The following are the important features of HDFS:

  • Fault tolerance: Many computers working together as a cluster are bound to have hardware failures. Hardware failures such as disk failures, network connectivity issues, and RAM failures could disrupt processing and cause major downtime. This could lead to data loss as well as slippage of critical SLAs. HDFS is designed to withstand such hardware failures by detecting faults and taking recovery actions as required.

    The data in HDFS is split across the machines in the cluster as chunks of data called blocks. These blocks are replicated across multiple machines of the cluster for redundancy. So, even if a node/machine becomes completely unusable and shuts down, the processing can go on with the copy of the data present on the nodes where the data was replicated.

  • Streaming data: Streaming access enables data to be transferred in the form of a steady and continuous stream. This means if data from a file in HDFS needs to be processed, HDFS starts sending the data as it reads the file and does not wait for the entire file to be read. The client consuming this data starts processing it immediately as it receives the stream from HDFS. This makes data processing really fast.

  • Large data store: HDFS is used to store large volumes of data. HDFS functions best when the individual data files stored are large files, rather than a large number of small files. File sizes in most Hadoop clusters range from gigabytes to terabytes. The storage scales linearly as more nodes are added to the cluster.

  • Portable: HDFS is a highly portable system. Since it is written in Java, any machine or operating system that can run Java should be able to run HDFS. Even at the hardware layer, HDFS is flexible and runs on most of the commonly available hardware platforms. Most production-level clusters have been set up on commodity hardware.

  • Easy interface: The HDFS command-line interface is very similar to any Linux/Unix system. The commands are similar in most cases. So, if one is comfortable performing basic file actions in Linux/Unix, using commands with HDFS should be very easy.

The following two daemons are responsible for operations on HDFS:

  • Namenode

  • Datanode

The namenode and datanode daemons talk to each other via TCP/IP.

Configuring HDFS

All HDFS-related configuration is done by adding/updating the properties in the hdfs-site.xml file that is found in the conf folder under the Hadoop installation folder.

The following are the different properties that are part of the hdfs-site.xml file:

  • dfs.namenode.servicerpc-address: This specifies the unique namenode RPC address in the cluster. Services/daemons such as the secondary namenode and datanode daemons use this address to connect to the namenode daemon whenever they need to communicate with it. This property is shown in the following code snippet:

    <property>
      <name>dfs.namenode.servicerpc-address</name>
      <value>node1.hcluster:8022</value>
    </property>

  • dfs.namenode.http-address: This specifies the URL that can be used to monitor the namenode daemon from a browser. This property is shown in the following code snippet:

    <property>
      <name>dfs.namenode.http-address</name>
      <value>node1.hcluster:50070</value>
    </property>

  • dfs.replication: This specifies the replication factor for data block replication across the datanode daemons. The default is 3 as shown in the following code snippet:

    <property>
      <name>dfs.replication</name>
      <value>3</value>
    </property>

  • dfs.blocksize: This specifies the block size. In the following example, the size is specified in bytes (134,217,728 bytes is 128 MB):

    <property>
      <name>dfs.blocksize</name>
      <value>134217728</value>
    </property>

  • fs.permissions.umask-mode: This specifies the umask value that will be used when creating files and directories in HDFS. This property is shown in the following code snippet:

    <property>
      <name>fs.permissions.umask-mode</name>
      <value>022</value>
    </property>
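
All of the preceding property elements sit together inside a single <configuration> root element in the hdfs-site.xml file. The following is a minimal sketch that reuses the illustrative values shown above:

    <?xml version="1.0"?>
    <configuration>
      <property>
        <name>dfs.namenode.servicerpc-address</name>
        <value>node1.hcluster:8022</value>
      </property>
      <property>
        <name>dfs.replication</name>
        <value>3</value>
      </property>
      <property>
        <name>dfs.blocksize</name>
        <value>134217728</value>
      </property>
    </configuration>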

The read/write operational flow in HDFS

To get a better understanding of HDFS, we need to understand the flow of operations for the following two scenarios:

  • A file is written to HDFS

  • A file is read from HDFS

HDFS follows a write-once, read-many model, where files are written once and read several times. The data cannot be altered once written. However, data can be appended to a file by reopening it. All files in HDFS are stored as data blocks.

Writing files in HDFS

The following sequence of steps occur when a client tries to write a file to HDFS:

  1. The client informs the namenode daemon that it wants to write a file. The namenode daemon checks to see whether the file already exists.

  2. If it exists, an appropriate message is sent back to the client. If it does not exist, the namenode daemon makes a metadata entry for the new file.

  3. The file to be written is split into data packets at the client end and a data queue is built. The packets in the queue are then streamed to the datanodes in the cluster.

  4. The list of datanodes is given by the namenode daemon, which is prepared based on the data replication factor configured. A pipeline is built to perform the writes to all datanodes provided by the namenode daemon.

  5. The first packet from the data queue is then transferred to the first datanode daemon. The block is stored on the first datanode daemon and is then copied to the next datanode daemon in the pipeline. This process goes on till the packet is written to the last datanode daemon in the pipeline.

  6. The sequence is repeated for all the packets in the data queue. For every packet written on the datanode daemon, a corresponding acknowledgement is sent back to the client.

  7. If a packet fails to be written to one of the datanodes, that datanode daemon is removed from the pipeline and the remaining packets are written to the healthy datanodes. The namenode daemon notices the under-replication of the block and arranges for another datanode daemon where the block can be replicated.

  8. After all the packets are written, the client performs a close action, indicating that the packets in the data queue have been completely transferred.

  9. The client informs the namenode daemon that the write operation is now complete.
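
From the application's point of view, this entire pipeline is handled transparently by the HDFS client library. The following is a minimal, illustrative sketch of writing a file through the Java FileSystem API; the namenode URI, file path, and content are placeholder assumptions:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsWriteExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // The namenode URI and file path below are placeholders.
            FileSystem fs = FileSystem.get(URI.create("hdfs://node1.hcluster:8020"), conf);
            // create() asks the namenode daemon for a datanode pipeline; the returned
            // stream splits the data into packets and pushes them down that pipeline.
            try (FSDataOutputStream out = fs.create(new Path("/user/hadoop/sample.txt"))) {
                out.writeBytes("She sells sea shells on the sea shore where she also sells cookies.\n");
            }
            // Closing the filesystem signals that the client is done with the write.
            fs.close();
        }
    }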

The following diagram shows the data block replication process across the datanodes during a write operation in HDFS:

Reading files in HDFS

The following steps occur when a client tries to read a file in HDFS:

  1. The client contacts the namenode daemon to get the location of the data blocks of the file it wants to read.

  2. The namenode daemon returns the list of addresses of the datanodes for the data blocks.

  3. For any read operation, HDFS tries to return the node with the data block that is closest to the client. Here, closest refers to network proximity between the datanode daemon and the client.

  4. Once the client has the list, it connects to the closest datanode daemon and starts reading the data block using a stream.

  5. After the block is read completely, the connection to that datanode daemon is terminated, the datanode daemon that hosts the next block in the sequence is identified, and that data block is streamed. This goes on until the last data block for that file is read.
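
As with writes, the client library hides these steps behind a simple input stream. The following is a minimal, illustrative sketch of reading a file through the Java FileSystem API; the namenode URI and file path are placeholder assumptions:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    public class HdfsReadExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // The namenode URI and file path below are placeholders.
            FileSystem fs = FileSystem.get(URI.create("hdfs://node1.hcluster:8020"), conf);
            // open() fetches the block locations from the namenode daemon; the stream
            // then reads each block from the closest datanode daemon holding a replica.
            try (FSDataInputStream in = fs.open(new Path("/user/hadoop/sample.txt"))) {
                IOUtils.copyBytes(in, System.out, 4096, false);
            }
            fs.close();
        }
    }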

The following diagram shows the read operation of a file in HDFS:

Understanding the namenode UI

Hadoop provides web interfaces for each of its services. The namenode UI or the namenode web interface is used to monitor the status of the namenode and can be accessed using the following URL:

http://<namenode-server>:50070/

The namenode UI has the following sections:

  • Overview: The general information section provides basic information about the namenode daemon with options to browse the filesystem and the namenode logs.

    The following is the screenshot of the Overview section of the namenode UI:

    The Cluster ID parameter displays the identification number of the cluster. This number is the same across all the nodes within the cluster.

    A block pool is a set of blocks that belong to a single namespace. The Block Pool Id parameter is used to segregate the block pools in case there are multiple namespaces configured when using HDFS federation. In HDFS federation, multiple namenodes are configured to scale the name service horizontally. These namenodes are configured to share datanodes amongst themselves. We will be exploring HDFS federation in detail a bit later.

  • Summary: The following is the screenshot of the cluster's summary section from the namenode web interface:

    Under the Summary section, the first parameter is related to the security configuration of the cluster. If Kerberos (the authentication system used in Hadoop) is configured, the parameter will show as Security is on. If Kerberos is not configured, the parameter will show as Security is off.

    The next parameter displays information related to files and blocks in the cluster. Along with this information, the heap and non-heap memory utilization is also displayed. The other parameters displayed in the Summary section are as follows:

    • Configured Capacity: This displays the total capacity (storage space) of HDFS.

    • DFS Used: This displays the total space used in HDFS.

    • Non DFS Used: This displays the amount of space used by other files that are not part of HDFS. This is the space used by the operating system and other files.

    • DFS Remaining: This displays the total space remaining in HDFS.

    • DFS Used%: This displays the total HDFS space utilization shown as a percentage.

    • DFS Remaining%: This displays the total HDFS space remaining shown as a percentage.

    • Block Pool Used: This displays the total space utilized by the current namespace.

    • Block Pool Used%: This displays the total space utilized by the current namespace shown as a percentage. As you can see in the preceding screenshot, in this case, the value matches that of the DFS Used% parameter. This is because there is only one namespace (one namenode) and HDFS is not federated.

    • DataNodes usages% (Min, Median, Max, stdDev): This displays the usage statistics (minimum, median, maximum, and standard deviation) across all datanodes in the cluster. This helps administrators identify unbalanced nodes, which may occur when data is not uniformly placed across the datanodes. Administrators have the option to rebalance the datanodes using the balancer.

    • Live Nodes: This link displays all the datanodes in the cluster as shown in the following screenshot:

    • Dead Nodes: This link displays all the datanodes that are currently in a dead state in the cluster. A dead state for a datanode daemon is when the datanode daemon is not running or has not sent a heartbeat message to the namenode daemon. Datanodes are unable to send heartbeats if there exists a network connection issue between the machines that host the datanode and namenode daemons. Excessive swapping on the datanode machine causes the machine to become unresponsive, which also prevents the datanode daemon from sending heartbeats.

    • Decommissioning Nodes: This link lists all the datanodes that are being decommissioned.

    • Number of Under-Replicated Blocks: This represents the number of blocks that have not been replicated as per the replication factor configured in the hdfs-site.xml file.

  • Namenode Journal Status: The journal status provides location information of the fsimage file and the state of the edits logfile. The following screenshot shows the NameNode Journal Status section:

  • NameNode Storage: The namenode storage table provides the location of the fsimage file along with the type of the location. In this case, it is IMAGE_AND_EDITS, which means the same location is used to store the fsimage file as well as the edits logfile. The other types of locations are IMAGE, which stores only the fsimage file, and EDITS, which stores only the edits logfile. The following screenshot shows the NameNode Storage information:

Understanding the secondary namenode UI

The secondary namenode is a checkpoint service for the namenode daemon that performs periodic merging of the edits log and the fsimage file. The secondary namenode UI can be accessed using the following URL:

http://<secondary-namenode-server>:50090/

The following screenshot shows the secondary namenode UI:

Just like the namenode UI, the secondary namenode UI also displays the Hadoop version. All checkpoint-related information is available in this UI, as follows:

  • Name Node Address: This is the RPC address of the primary namenode daemon. The secondary namenode daemon uses this address to connect to the primary namenode daemon.

  • Start Time: This is the start timestamp of the secondary namenode service.

  • Last Checkpoint Time: This is the timestamp of the last checkpoint action performed by the secondary namenode daemon.

  • Checkpoint Period: This property defines the schedule to perform the checkpoint. In the preceding screenshot, the value is 3,600 seconds. This means that every 3,600 seconds (1 hour), the secondary namenode daemon will perform the checkpoint operation.

  • Checkpoint Size: If the edits logfile reaches this checkpoint size, the secondary namenode daemon will perform the checkpoint even if the checkpoint period has not elapsed.

  • Checkpoint Dirs: This is the location of the fsimage file stored by the secondary namenode daemon.

  • Checkpoint Edit Dirs: This is the location of the edits logfiles stored by the secondary namenode daemon.
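
The checkpoint schedule shown in this UI is driven by configuration. As a hedged illustration, the one-hour period in the preceding example corresponds to the dfs.namenode.checkpoint.period property, which could be set in hdfs-site.xml as follows:

    <property>
      <name>dfs.namenode.checkpoint.period</name>
      <value>3600</value>
    </property>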

Exploring HDFS commands

To perform filesystem related tasks, the commands begin with hdfs dfs. The filesystem commands have been designed to behave similarly to the corresponding Unix/Linux filesystem commands.

What is a URI? URI stands for Uniform Resource Identifier. In the commands that are listed as follows, you will observe the use of URI for file locations. The URI syntax to access a file in HDFS is hdfs://namenodehost/parent/child/<file>.

Commonly used HDFS commands

The following are some of the most commonly used HDFS commands:

  • ls: This command lists files in HDFS.

    The syntax of the ls command is hdfs dfs -ls <args>. The following is the screenshot showing an example of the ls command:

  • cat: This command displays the contents of file/files in the terminal.

    The syntax of the cat command is hdfs dfs -cat URI [URI …]. The following is a sample output of the cat command:

  • copyFromLocal: This command copies a file/files from the local filesystem to HDFS.

    The syntax of the copyFromLocal command is hdfs dfs -copyFromLocal <localsrc> URI. The following is the screenshot showing an example of the copyFromLocal command:

  • copyToLocal: This command copies a file/files from HDFS to the local filesystem.

    The syntax of the copyToLocal command is hdfs dfs -copyToLocal URI <localdst>. The following is the screenshot showing an example of the copyToLocal command:

  • cp: This command copies files within HDFS.

    The syntax of the cp command is hdfs dfs -cp URI [URI …] <dest>. The following is the screenshot showing an example of the cp command:

  • mkdir: This command creates a directory in HDFS.

    The syntax of the mkdir command is hdfs dfs -mkdir <paths>. The following is the screenshot showing an example of the mkdir command:

  • mv: This command moves files within HDFS.

    The syntax of the mv command is hdfs dfs -mv URI [URI …] <dest>. The following is the screenshot showing an example of the mv command:

  • rm: This command deletes files from HDFS.

    The syntax of the rm command is hdfs dfs -rm URI [URI …]. The following is the screenshot showing an example of the rm command:

  • rm -r: This command deletes a directory from HDFS.

    The syntax of the rm -r command is hdfs dfs -rm -r URI [URI …]. The following is the screenshot showing an example of the rm -r command:

  • setrep: This command sets the replication factor for a file in HDFS.

    The syntax of the setrep command is hdfs dfs -setrep [-R] <rep> <path>. The following is the screenshot showing an example of the setrep command:

  • tail: This command displays the trailing kilobyte of the contents of a file in HDFS.

    The syntax of the tail command is hdfs dfs -tail [-f] URI. The following is the screenshot showing an example of the tail command:
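
For reference, a short terminal session that strings several of these commands together might look like the following; the paths and file names are placeholders:

    $ hdfs dfs -mkdir /user/hadoop/demo
    $ hdfs dfs -copyFromLocal sales.csv /user/hadoop/demo
    $ hdfs dfs -ls /user/hadoop/demo
    $ hdfs dfs -cat /user/hadoop/demo/sales.csv
    $ hdfs dfs -cp /user/hadoop/demo/sales.csv /user/hadoop/demo/sales_copy.csv
    $ hdfs dfs -setrep 2 /user/hadoop/demo/sales.csv
    $ hdfs dfs -rm /user/hadoop/demo/sales_copy.csv
    $ hdfs dfs -copyToLocal /user/hadoop/demo/sales.csv /tmp/sales.csv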

Commands to administer HDFS

Hadoop provides several commands to administer HDFS. The following are two of the commonly used administration commands in HDFS:

  • balancer: In a cluster, new datanodes can be added. The addition of new datanodes provides more storage space for the cluster. However, when a new datanode is added, the datanode does not have any files. Due to the addition of the new datanode, the data blocks across all the datanodes are in a state of imbalance, that is, they are not evenly spread across the datanodes. The administrator can use the balancer command to balance the cluster.

    The syntax of the balancer command is hdfs balancer -threshold <threshold>. Here, threshold is the balancing threshold expressed as a percentage. The threshold is specified as a float value that ranges from 0 to 100. The default threshold value is 10. The balancer tries to distribute blocks to the underutilized datanodes. For example, if the average utilization of all the datanodes in the cluster is 50 percent, the balancer, by default, will try to pick up blocks from nodes that have a utilization of above 60 percent (50 percent + 10 percent) and move them to nodes that have a utilization of below 40 percent (50 percent - 10 percent).

  • dfsadmin: The dfsadmin command is used to run administrative commands on HDFS.

    The syntax of the dfsadmin command is hdfs dfsadmin <options>. Let's understand a few of the important command options and the actions they perform:

    • [-report]: This generates a report of the basic filesystem information and statistics.

    • [-safemode <enter | leave | get | wait>]: This enters, leaves, gets the status of, or waits for safe mode, a namenode state in which the namenode does not accept changes to the namespace (read-only) and does not replicate or delete blocks.

    • [-saveNamespace]: This saves the current state of the namespace to a storage directory and resets the edits log.

    • [-rollEdits]: This forces a rollover of the edits log, that is, it saves the state of the current edits log and creates a fresh edits log for new transactions.

    • [-restoreFailedStorage true|false|check]: This sets, unsets, or checks the flag that controls whether the namenode attempts to restore failed storage replicas.

    • [-refreshNodes]: This updates the namenode daemon with the set of datanodes allowed to connect to the namenode daemon.

    • [-setQuota <quota> <dirname>...<dirname>]: This sets the quota (the number of items) for the directory/directories.

    • [-clrQuota <dirname>...<dirname>]: This clears the set quota for the directory/directories.

    • [-setSpaceQuota <quota> <dirname>...<dirname>]: This sets the disk space quota for the directory/directories.

    • [-clrSpaceQuota <dirname>...<dirname>]: This clears the disk space quota for the directory/directories.

    • [-refreshServiceAcl]: This refreshes the service-level authorization policy file. We will be learning more about authorization later.

    • [-printTopology]: This prints the tree of the racks and their nodes as reported by the namenode daemon.

    • [-refreshNamenodes datanodehost:port]: This reloads the configuration files for a datanode daemon, stops serving the removed block pools, and starts serving new block pools. A block pool is a set of blocks that belong to a single namespace. We will be looking into this concept a bit later.

    • [-deleteBlockPool datanodehost:port blockpoolId [force]]: This deletes a block pool of a datanode daemon.

    • [-setBalancerBandwidth <bandwidth>]: This sets the bandwidth limit to be used by the balancer. The bandwidth is the value in bytes per second that the balancer should use for data block movement.

    • [-fetchImage <local directory>]: This gets the latest fsimage file from namenode and saves it to the specified local directory.

    • [-help [cmd]]: This displays help for the given command or all commands if a command is not specified.
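
    To illustrate, the following shows a few of these options in use; the directory name and quota value are placeholders:

    $ hdfs dfsadmin -report
    $ hdfs dfsadmin -safemode get
    $ hdfs dfsadmin -setSpaceQuota 10g /user/hadoop/projects
    $ hdfs dfsadmin -refreshNodes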

Getting acquainted with MapReduce

Now that you have a solid knowledge base in HDFS, it is time to dive into the processing module of Hadoop known as MapReduce. Once we have the data in the cluster, we need a programming model to perform advanced operations on it. This is done using Hadoop's MapReduce.

The MapReduce programming model concept has been in existence for quite some time now. This model was designed to process large volumes of data in parallel. Google implemented a version of MapReduce in-house to process data stored on GFS (the Google File System). Later, Google released a paper explaining their implementation. Hadoop's MapReduce implementation is based on this paper.

MapReduce in Hadoop is a Java-based distributed programming framework that leverages the features of HDFS to execute high performance batch processing of the data stored in HDFS.

The processing can be divided into two major functions:

  • Map

  • Reduce

We will focus on the MapReduce architecture and how it works together with HDFS to process large volumes of data.

Understanding the map phase

In a MapReduce application, all the data read in the map function is read in the form of key and value pairs. The processed output of the map function is also in the form of key and value pairs. The processing of data as key and value pairs works well in a distributed computing environment.

Let's understand how MapReduce works with the help of an example. The word counting program is known as the Hello, World program for MapReduce. The program counts the number of words in an input set of text files.

For this example, let's consider a file with the following line in it:

She sells sea shells on the sea shore where she also sells cookies.

So, if the preceding text is provided as an input to the word count program, the expected output would be as follows:

she, 2   sells, 2   sea, 2   shells, 1   on, 1   the, 1   shore, 1   where, 1   also, 1   cookies, 1

The three major components of a MapReduce program are:

  • Driver

  • Mapper

  • Reducer

The driver component of a MapReduce program is responsible for setting up the job configurations and submitting it to the Hadoop cluster. This part of the program runs on the client computer.

The driver component of the word count program would take two parameters to submit the job to the Hadoop cluster:

  • The location of the input files

  • The location of the output file
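
A minimal, illustrative driver sketch is shown below. The class names are assumptions, and it relies on the mapper and reducer classes sketched in the following sections:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriver {
        public static void main(String[] args) throws Exception {
            // args[0] is the input location and args[1] is the output location.
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCountDriver.class);
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Submit the job to the cluster and wait for it to finish.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }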

Once the job is submitted to the cluster, the mapper reads every line in the file as <key, value> pairs. So, if we consider a file with the line mentioned earlier, the key will be the offset of the line and the value will be the entire sentence.

The mapper reads the line as follows:

<0000, She sells sea shells on the sea shore where she also sells cookies>

Once read, the mapper logic would emit the <key, value> pairs for each word in the sentence as follows:

<she, 1> <sells, 1> <sea, 1> <shells, 1> <on, 1> <the, 1> <sea, 1> <shore, 1> <where, 1> <she, 1> <also, 1> <sells, 1> <cookies, 1>

The mapping function has emitted each word in the sentence as a key and the constant number 1 as the value for each key.
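
A minimal, illustrative mapper sketch for the word count example; the class name and the whitespace-based tokenization are assumptions:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            // The input key is the byte offset of the line; the value is the line itself.
            for (String token : line.toString().toLowerCase().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    // Emit <word, 1> for every word in the line.
                    context.write(word, ONE);
                }
            }
        }
    }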

Understanding the reduce phase

The reduce function reads the intermediate <key, value> pairs emitted by the mapper and produces the final result.

The intermediate pairs are grouped by key and taken as input by the reducer in sorted order of the keys. The reducer logic then works on each key group; in this case, it sums up the values for each key and produces the final result as follows:

she, 2   sells, 2   sea, 2   shells, 1   on, 1   the, 1   shore, 1   where, 1   also, 1   cookies, 1
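
A minimal, illustrative reducer sketch for the word count example; the class name is an assumption:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
                throws IOException, InterruptedException {
            // The framework groups the intermediate pairs by key, so the reducer
            // receives one word together with all of its emitted counts.
            int sum = 0;
            for (IntWritable count : counts) {
                sum += count.get();
            }
            context.write(word, new IntWritable(sum));
        }
    }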

The following is a functional representation of the map and reduce functions:

Function    Input               Output
map         <k1, v1>            list(k2, v2)
reduce      <k2, list(v2)>      list(<k3, v3>)

The following diagram shows the flow of a MapReduce job starting from an input file right up to the generation of an output file:

In the preceding diagram, you see a very simple flow of MapReduce. However, in real production scenarios, there are multiple mappers and reducers.

When there are multiple mappers and reducers involved, there is a phase between the mapper and reducer known as the shuffle and sort phase. In this phase, all the keys are sorted and sent to the reducers. Each reducer works on the set of keys and values provided as input and generates their own output file as shown in the following diagram:

Summary

In this article, we have learned the essentials of HDFS, such as file operations on HDFS and how to configure HDFS. We looked at the namenode and secondary namenode web interfaces and explored a few HDFS commands. We also covered the MapReduce architecture.
