Exploring HDFS

In this article by Tanmay Deshpande, the author of the book Hadoop Real World Solutions Cookbook - Second Edition, we'll cover the following recipes:

  • Loading data from a local machine to HDFS
  • Exporting HDFS data to a local machine
  • Changing the replication factor of an existing file in HDFS
  • Setting the HDFS block size for all the files in a cluster
  • Setting the HDFS block size for a specific file in a cluster
  • Enabling transparent encryption for HDFS
  • Importing data from another Hadoop cluster
  • Recycling deleted data from trash to HDFS
  • Saving compressed data in HDFS

Hadoop has two important components:

  • Storage: This includes HDFS
  • Processing: This includes MapReduce

HDFS takes care of the storage part of Hadoop. So, let's explore the internals of HDFS through various recipes.

Loading data from a local machine to HDFS

In this recipe, we are going to load data from a local machine's disk to HDFS.

Getting ready

To perform this recipe, you should already have a running Hadoop cluster.

How to do it...

Performing this recipe is as simple as copying data from one folder to another. There are a couple of ways to copy data from the local machine to HDFS.

  • Using the copyFromLocal command

    To copy the file to HDFS, let's first create a directory in HDFS and then copy the file. Here are the commands to do this:

    hadoop fs -mkdir /mydir1
    hadoop fs -copyFromLocal /usr/local/hadoop/LICENSE.txt /mydir1
    
  • Using the put command

    We will first create the directory, and then put the local file in HDFS:

    hadoop fs -mkdir /mydir2
    hadoop fs -put /usr/local/hadoop/LICENSE.txt /mydir2
    

    You can validate that the files have been copied to the correct folders by listing the files:

    hadoop fs -ls /mydir1
    hadoop fs -ls /mydir2
    

How it works...

When you use HDFS copyFromLocal or the put command, the following things will occur:

  1. First of all, the HDFS client (the command prompt, in this case) contacts NameNode because it needs to copy the file to HDFS.
  2. The file is then broken into chunks of the cluster's block size. In Hadoop 2.X, the default block size is 128MB.
  3. Based on the capacity and availability of space in DataNodes, NameNode decides where these blocks should be copied.
  4. Then, the client starts copying data to the specified DataNodes for a specific block. The blocks are copied sequentially, one after another.
  5. When a single block is copied, it is sent to DataNode in packets (64KB each by default). A checksum is sent with each packet; once the packet has been copied, it is verified against the checksum. The packets are then forwarded to the next DataNode where the block will be replicated.
  6. The HDFS client is responsible for copying the data to the first node only; replication is taken care of by the respective DataNodes. Thus, the data block is pipelined from one DataNode to the next.
  7. While block copying and replication take place, the file's metadata in NameNode is updated as DataNodes report the blocks they have received.
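
The splitting in step 2 comes down to simple arithmetic. The following is a minimal sketch, assuming the 128MB default block size mentioned above; the function names block_count and last_block_bytes and the 300MB example file are illustrative and are not part of Hadoop:

```shell
#!/bin/sh
# block_count SIZE_BYTES BLOCK_BYTES
# Prints how many HDFS blocks a file of SIZE_BYTES occupies
# (ceiling division; an empty file occupies no blocks).
block_count() {
    echo $(( ($1 + $2 - 1) / $2 ))
}

# last_block_bytes SIZE_BYTES BLOCK_BYTES
# Prints the size of the final, possibly partial, block.
last_block_bytes() {
    if [ "$1" -eq 0 ]; then
        echo 0
    elif [ $(( $1 % $2 )) -eq 0 ]; then
        echo "$2"
    else
        echo $(( $1 % $2 ))
    fi
}

BLOCK=$(( 128 * 1024 * 1024 ))   # 128MB, the Hadoop 2.X default
FILE=$(( 300 * 1024 * 1024 ))    # hypothetical 300MB file

echo "blocks: $(block_count "$FILE" "$BLOCK")"
echo "last block bytes: $(last_block_bytes "$FILE" "$BLOCK")"
```

For the hypothetical 300MB file, this gives two full 128MB blocks and one final 44MB block. A small file such as LICENSE.txt fits in a single block that is only as large as the file itself.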

Exporting data from HDFS to a local machine

In this recipe, we are going to export/copy data from HDFS to the local machine.

Getting ready

To perform this recipe, you should already have a running Hadoop cluster.

How to do it...

Performing this recipe is as simple as copying data from one folder to the other. There are a couple of ways in which you can export data from HDFS to the local machine.

  • Using the copyToLocal command:
    hadoop fs -copyToLocal /mydir1/LICENSE.txt /home/ubuntu
  • Using the get command:
    hadoop fs -get /mydir1/LICENSE.txt /home/ubuntu

How it works...

When you use HDFS copyToLocal or the get command, the following things occur:

  1. First of all, the client contacts NameNode because it needs a specific file in HDFS.
  2. NameNode then checks whether such a file exists in its FSImage. If the file is not present, an error is returned to the client.
  3. If the file exists, NameNode checks the metadata for blocks and replica placements in DataNodes.
  4. NameNode then points the client to the DataNodes from which the blocks can be read, one by one. The data is copied directly from DataNode to the client machine; it never passes through NameNode, which avoids a bottleneck.
  5. Thus, the file is exported to the local machine from HDFS.

Changing the replication factor of an existing file in HDFS

In this recipe, we are going to take a look at how to change the replication factor of a file in HDFS. The default replication factor is 3.

Getting ready

To perform this recipe, you should already have a running Hadoop cluster.

How to do it...

Sometimes, there might be a need to increase or decrease the replication factor of a specific file in HDFS. In this case, we'll use the setrep command.

This is how you can use the command:

hadoop fs -setrep [-R] [-w] <noOfReplicas> <path> ...

In this command, a path can either be a file or a directory; if it's a directory, the command recursively sets the replication factor for all the files under it.

  • The -w option makes the command wait until the replication is complete
  • The -R option is accepted for backward compatibility

First, let's check the replication factor of the file we copied to HDFS in the previous recipe:

hadoop fs -ls /mydir1/LICENSE.txt
-rw-r--r--   3 ubuntu supergroup      15429 2015-10-29 03:04 /mydir1/LICENSE.txt

Once you list the file, it will show you the read/write permissions on this file, and the very next parameter is the replication factor. We have the replication factor set to 3 for our cluster; hence, the number is 3.

Let's change it to 2 using this command:

hadoop fs -setrep -w 2 /mydir1/LICENSE.txt

It will wait till the replication is adjusted. Once done, you can verify this again by running the ls command:

hadoop fs -ls /mydir1/LICENSE.txt
-rw-r--r--   2 ubuntu supergroup      15429 2015-10-29 03:04 /mydir1/LICENSE.txt

How it works...

Once the setrep command is executed, NameNode is notified, and it then decides whether replicas need to be added to or removed from certain DataNodes. When you use the -w option, this process may take a long time if the file is very big.
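
If you want to check the replication factor from a script rather than by eye, the second column of the listing can be extracted directly. This is a minimal sketch that works on the two listing lines shown in this recipe; the function name replication_of is illustrative:

```shell
#!/bin/sh
# replication_of LISTING_LINE
# Prints the second whitespace-separated field of a 'hadoop fs -ls' line,
# which holds the replication factor for a file.
replication_of() {
    printf '%s\n' "$1" | awk '{ print $2 }'
}

# Listing lines copied from the recipe, before and after setrep.
before='-rw-r--r--   3 ubuntu supergroup      15429 2015-10-29 03:04 /mydir1/LICENSE.txt'
after='-rw-r--r--   2 ubuntu supergroup      15429 2015-10-29 03:04 /mydir1/LICENSE.txt'

echo "before: $(replication_of "$before")"
echo "after:  $(replication_of "$after")"
```

On a live cluster, you could pipe the output of hadoop fs -ls into the same awk expression; hadoop fs -stat %r <path> should also print the replication factor on its own.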

Setting the HDFS block size for all the files in a cluster

In this recipe, we are going to take a look at how to set a block size at the cluster level.

Getting ready

To perform this recipe, you should already have a running Hadoop cluster.

How to do it...

The HDFS block size is configurable for all files in the cluster or for a single file as well. To change the block size at the cluster level itself, we need to modify the hdfs-site.xml file.

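By default, the HDFS block size is 128MB. In case we want to modify this, we need to update this property, as shown in the following code. This property changes the default block size to 64MB (67108864 bytes). In Hadoop 2.X, the property is also available under the newer name dfs.blocksize; dfs.block.size is the older name, which is still accepted: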

<property>
    <name>dfs.block.size</name>
    <value>67108864</value>
    <description>HDFS Block size</description>
</property>

If you have a multi-node Hadoop cluster, you should update this file on all the nodes, that is, NameNode and the DataNodes. Make sure you save these changes and restart the HDFS daemons:

/usr/local/hadoop/sbin/stop-dfs.sh
/usr/local/hadoop/sbin/start-dfs.sh

This will set the block size for files that are added to the HDFS cluster from now on. Note that this does not change the block size of the files that are already present in HDFS. There is no way to change the block size of an existing file in place.

How it works...

By default, the HDFS block size is 128MB for Hadoop 2.X. Sometimes, we may want to change this default block size for optimization purposes. When this configuration is successfully updated, all the new files will be saved into blocks of this size. Note that these changes do not affect the files that are already present in HDFS; their block size was defined at the time they were copied.
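
The block size property takes its value in bytes, so it helps to compute the number rather than type it from memory. Here is a minimal sketch; the helper name mb_to_bytes is illustrative:

```shell
#!/bin/sh
# mb_to_bytes MB
# Converts a size in megabytes (1MB = 1024 * 1024 bytes) to bytes,
# the unit used for the block size value in hdfs-site.xml.
mb_to_bytes() {
    echo $(( $1 * 1024 * 1024 ))
}

echo "64MB  = $(mb_to_bytes 64) bytes"    # value used in this recipe
echo "128MB = $(mb_to_bytes 128) bytes"   # Hadoop 2.X default
```

After restarting the daemons, running hdfs getconf -confKey dfs.blocksize should report the value the cluster is actually using.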

Setting the HDFS block size for a specific file in a cluster

In this recipe, we are going to take a look at how to set the block size for a specific file only.

Getting ready

To perform this recipe, you should already have a running Hadoop cluster.

How to do it...

In the previous recipe, we learned how to change the block size at the cluster level. But this is not always required. HDFS provides us with the facility to set the block size for a single file as well. The following command copies a file called myfile to HDFS, setting the block size to 1MB:

hadoop fs -Ddfs.block.size=1048576  -put /home/ubuntu/myfile /

Once the file is copied, you can verify whether the block size is set to 1MB and the file has been broken into chunks of that size:

hdfs fsck -blocks /myfile
    Connecting to namenode via http://localhost:50070/fsck?ugi=ubuntu&blocks=1&path=%2Fmyfile
    FSCK started by ubuntu (auth:SIMPLE) from /127.0.0.1 for path /myfile at Thu Oct 29 14:58:00 UTC 2015
    .Status: HEALTHY
    Total size:    17276808 B
    Total dirs:    0
    Total files:   1
    Total symlinks:                0
    Total blocks (validated):      17 (avg. block size 1016282 B)
    Minimally replicated blocks:   17 (100.0 %)
    Over-replicated blocks:        0 (0.0 %)
    Under-replicated blocks:       0 (0.0 %)
    Mis-replicated blocks:         0 (0.0 %)
    Default replication factor:    1
    Average block replication:     1.0
    Corrupt blocks:                0
    Missing replicas:              0 (0.0 %)
    Number of data-nodes:          3
    Number of racks:               1
    FSCK ended at Thu Oct 29 14:58:00 UTC 2015 in 2 milliseconds

    The filesystem under path '/myfile' is HEALTHY

How it works...

When we specify the block size at the time of copying a file, it overrides the default block size and copies the file to HDFS by breaking it into chunks of the given size. Generally, these modifications are made in order to perform other optimizations. Make these changes only if you are aware of their consequences. If the block size is too small, it will increase the parallelization, but it will also increase the load on NameNode as it would have more entries in FSImage. On the other hand, if the block size is too big, it will reduce the parallelization and degrade the processing performance.
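
To make the trade-off concrete, we can count how many blocks NameNode has to track for the same file at different block sizes. This is a minimal sketch using the 17276808-byte file from the fsck output in this recipe; the helper names are illustrative, and the 64MB and 128MB rows are included only for comparison:

```shell
#!/bin/sh
# block_count SIZE_BYTES BLOCK_BYTES
# Number of blocks needed for a file (ceiling division).
block_count() {
    echo $(( ($1 + $2 - 1) / $2 ))
}

# avg_block_bytes SIZE_BYTES BLOCK_BYTES
# Average block size (rounded down), comparable to the "avg. block size"
# figure that fsck prints.
avg_block_bytes() {
    echo $(( $1 / $(block_count "$1" "$2") ))
}

SIZE=17276808   # "Total size" from the fsck output

for mb in 1 64 128; do
    bytes=$(( mb * 1024 * 1024 ))
    echo "${mb}MB blocks: $(block_count "$SIZE" "$bytes") block(s)," \
         "avg $(avg_block_bytes "$SIZE" "$bytes") B"
done
```

With a 1MB block size, the file needs 17 blocks averaging 1016282 bytes, which matches the fsck report; with the 128MB default, it would need only one. Multiplied across millions of files, that difference is what drives the load on NameNode.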

Enabling transparent encryption for HDFS

When handling sensitive data, it is always important to consider the security measures. Hadoop allows us to encrypt sensitive data that's present in HDFS. In this recipe, we are going to see how to encrypt data in HDFS.

Getting ready

To perform this recipe, you should already have a running Hadoop cluster.

How to do it...

For many applications that hold sensitive data, it is very important to adhere to standards such as PCI, HIPAA, FISMA, and so on. To enable this, HDFS provides a feature called encryption zones: a directory can be designated as an encryption zone so that the data in it is encrypted on write and decrypted on read.

To use this encryption facility, we first need to enable Hadoop Key Management Server (KMS):

/usr/local/hadoop/sbin/kms.sh start

This would start KMS in the Tomcat web server.

Next, we need to append the following properties in core-site.xml and hdfs-site.xml.

In core-site.xml, add the following property:

<property>
    <name>hadoop.security.key.provider.path</name>
    <value>kms://http@localhost:16000/kms</value>
</property>

In hdfs-site.xml, add the following property:

<property>
    <name>dfs.encryption.key.provider.uri</name>
    <value>kms://http@localhost:16000/kms</value>
</property>

Restart the HDFS daemons:

/usr/local/hadoop/sbin/stop-dfs.sh
/usr/local/hadoop/sbin/start-dfs.sh

Now, we are all set to use KMS. Next, we need to create a key that will be used for the encryption:

hadoop key create mykey

This will create a key and save it on KMS. Next, we have to create an encryption zone, which is a directory in HDFS where all the encrypted data is saved:

hadoop fs -mkdir /zone
hdfs crypto -createZone -keyName mykey -path /zone

We will change the ownership to the current user:

hadoop fs -chown ubuntu:ubuntu /zone

If we put any file into this directory, it will be encrypted on write and decrypted at the time of reading:

hadoop fs -put myfile /zone
hadoop fs -cat /zone/myfile

How it works...

Various types of encryption can be used to comply with security standards, for example, application-level, database-level, file-level, and disk-level encryption.

HDFS transparent encryption sits between the database-level and file-level encryptions. KMS acts as a proxy between HDFS clients and HDFS's encryption provider via HTTP REST APIs. There are two types of keys used for encryption: the Encryption Zone Key (EZK) and the Data Encryption Key (DEK). The EZK is used to encrypt the DEK; the result is called the Encrypted Data Encryption Key (EDEK). This is then saved on NameNode.

When a file needs to be written to the HDFS encryption zone, the client gets the EDEK from NameNode and has KMS decrypt it with the EZK to obtain the DEK, which is used to encrypt the data stored in HDFS (the encrypted zone).

When an encrypted file needs to be read, the client again needs the DEK, which is recovered from the EDEK (held by NameNode) using the EZK (held by KMS). Thus, encryption and decryption are handled automatically by HDFS, and the end user does not need to worry about executing them on their own.

You can read more on this topic at http://blog.cloudera.com/blog/2015/01/new-in-cdh-5-3-transparent-encryption-in-hdfs/.

Importing data from another Hadoop cluster

Sometimes, we may want to copy data from one HDFS to another either for development, testing, or production migration. In this recipe, we will learn how to copy data from one HDFS cluster to another.

Getting ready

To perform this recipe, you should already have a running Hadoop cluster.

How to do it...

Hadoop provides a utility called DistCp, which helps us copy data from one cluster to another. Using this utility is as simple as copying from one folder to another:

hadoop distcp hdfs://hadoopCluster1:9000/source hdfs://hadoopCluster2:9000/target

This would use a MapReduce job to copy data from one cluster to another. You can also specify multiple source files to be copied to the target. There are a couple of other options that we can also use:

  • -update: When we use DistCp with the update option, it will copy only those files from the source that are not part of the target or differ from the target.
  • -overwrite: When we use DistCp with the overwrite option, it overwrites the target directory with the source.

How it works...

When DistCp is executed, it uses MapReduce to copy the data and also assists in error handling and reporting. It expands the list of source files and directories and inputs them to map tasks. When copying from multiple sources, collisions are resolved in the destination based on the option (update/overwrite) that's provided. By default, a file is skipped if it is already present at the target. Once the copying is complete, the count of skipped files is presented.
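
The per-file behavior described above can be summarized as a small decision function. This is a simplified model of the rules stated in this recipe, not DistCp's actual implementation; the function name distcp_action and its yes/no arguments are hypothetical, and how DistCp determines that two files differ is not modeled here:

```shell
#!/bin/sh
# distcp_action MODE EXISTS DIFFERS
#   MODE:    default | update | overwrite
#   EXISTS:  yes if the file is already present at the target, else no
#   DIFFERS: yes if the target copy differs from the source, else no
# Prints "copy" or "skip".
distcp_action() {
    mode=$1; exists=$2; differs=$3
    if [ "$exists" = no ]; then
        echo copy                 # missing at the target: always copied
        return
    fi
    case $mode in
        overwrite) echo copy ;;   # -overwrite: replace whatever is there
        update)    if [ "$differs" = yes ]; then echo copy; else echo skip; fi ;;
        *)         echo skip ;;   # default: existing files are skipped
    esac
}

echo "default,   present, differs: $(distcp_action default yes yes)"
echo "update,    present, differs: $(distcp_action update yes yes)"
echo "update,    present, same:    $(distcp_action update yes no)"
echo "overwrite, present, same:    $(distcp_action overwrite yes no)"
```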

You can read more on DistCp at https://hadoop.apache.org/docs/current/hadoop-distcp/DistCp.html.

Recycling deleted data from trash to HDFS

In this recipe, we are going to see how to recover deleted data from the trash to HDFS.

Getting ready

To perform this recipe, you should already have a running Hadoop cluster.

How to do it...

To recover accidentally deleted data from HDFS, we first need to enable the trash folder, which is not enabled by default in HDFS. This can be achieved by adding the following property to core-site.xml:

<property>
    <name>fs.trash.interval</name>
    <value>120</value>
</property>

Then, restart the HDFS daemons:

/usr/local/hadoop/sbin/stop-dfs.sh
/usr/local/hadoop/sbin/start-dfs.sh

This will set the deleted file retention to 120 minutes.

Now, let's try to delete a file from HDFS:

hadoop fs -rmr /LICENSE.txt
    15/10/30 10:26:26 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 120 minutes, Emptier interval = 0 minutes.
    Moved: 'hdfs://localhost:9000/LICENSE.txt' to trash at: hdfs://localhost:9000/user/ubuntu/.Trash/Current

We have 120 minutes to recover this file before it is permanently deleted from HDFS. To restore the file to its original location, we can execute the following commands.

First, let's confirm whether the file exists:

hadoop fs -ls /user/ubuntu/.Trash/Current
    Found 1 items
    -rw-r--r--   1 ubuntu supergroup      15429 2015-10-30 10:26 /user/ubuntu/.Trash/Current/LICENSE.txt

Now, restore the deleted file or folder; it's better to use the distcp command instead of copying each file one by one:

hadoop distcp hdfs://localhost:9000/user/ubuntu/.Trash/Current/LICENSE.txt hdfs://localhost:9000/

This will start a MapReduce job to restore data from the trash to the original HDFS folder. Check the HDFS path; the deleted file should be back in its original location.
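
Because the trash keeps each file under its original path, the restore target can be derived from the trash path itself. This is a minimal sketch, assuming the /user/<name>/.Trash/Current layout shown in the listing above; the function name original_path is illustrative, and the script only prints the distcp command rather than running it:

```shell
#!/bin/sh
# original_path TRASH_PATH
# Strips the leading /user/<name>/.Trash/Current prefix from a trash path,
# leaving the path the file had before it was deleted.
original_path() {
    printf '%s\n' "$1" | sed 's#^/user/[^/][^/]*/\.Trash/Current##'
}

NN=hdfs://localhost:9000   # NameNode URI used in this recipe
trashed=/user/ubuntu/.Trash/Current/LICENSE.txt
restored=$(original_path "$trashed")

# Print the restore command instead of executing it.
echo "hadoop distcp ${NN}${trashed} ${NN}${restored}"
```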

How it works...

Enabling trash enforces a file retention policy for a specified amount of time. So, when trash is enabled, HDFS does not delete or move any blocks immediately; it only updates the metadata of the file and its location. This way, accidentally deleted files can still be recovered; make sure that trash is enabled before experimenting with this recipe.

Saving compressed data in HDFS

In this recipe, we are going to take a look at how to store and process compressed data in HDFS.

Getting ready

To perform this recipe, you should already have a running Hadoop cluster.

How to do it...

It's always good to use compression while storing data in HDFS. HDFS supports various compression algorithms such as LZO, BZIP2, Snappy, GZIP, and so on. Every algorithm has its own pros and cons when you consider the time taken to compress and decompress and the space efficiency. These days, people often prefer Snappy compression as it aims for very high speed with a reasonable amount of compression.

We can easily store and process any number of files in HDFS. To store compressed data, we don't need to make any specific changes to the Hadoop cluster. You can simply copy the compressed data to HDFS in the same way as any other file. Here is an example of this:

hadoop fs -mkdir /compressed
hadoop fs -put file.bz2 /compressed

Now, we'll run a sample program to take a look at how Hadoop automatically uncompresses the file and processes it:

hadoop jar /usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.0.jar wordcount /compressed /compressed_out

Once the job is complete, you can verify the output.

How it works...

Hadoop explores native libraries to find the support needed for various codecs and their implementations. Native libraries are specific to the platform that you run Hadoop on. You don't need to make any configuration changes to enable compression algorithms. As mentioned earlier, Hadoop supports various compression algorithms that are already familiar to the computer world. Based on your needs and requirements (more space or more time), you can choose your compression algorithm.
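
When reading input files, Hadoop typically selects the codec from the file name extension, which is why file.bz2 needed no extra options in the wordcount run. The sketch below mirrors that lookup in shell for illustration only; the function name codec_for and the extension table are assumptions, and the codecs actually available depend on how your cluster is configured:

```shell
#!/bin/sh
# codec_for FILENAME
# Maps a file name extension to the compression format it conventionally
# denotes; prints "none" for anything unrecognized.
codec_for() {
    case $1 in
        *.gz)     echo gzip ;;
        *.bz2)    echo bzip2 ;;
        *.snappy) echo snappy ;;
        *.lzo)    echo lzo ;;
        *)        echo none ;;
    esac
}

for f in file.bz2 logs.gz events.snappy notes.txt; do
    echo "$f -> $(codec_for "$f")"
done
```

To see which native codec libraries your installation can load, the hadoop checknative -a command should list them.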

Take a look at http://comphadoop.weebly.com/ for more information on this.

Summary

In this article, we covered the major aspects of working with HDFS through recipes that help us load, export, import, recover, and compress data in HDFS. We also covered enabling transparent encryption for HDFS as well as adjusting the block size of an HDFS cluster.
