HBase Administration, Performance Tuning

Master HBase configuration and administration for optimum database performance with this book and ebook

Setting up Hadoop to spread disk I/O

Modern servers usually have multiple disk devices to provide large storage capacities. These disks are usually configured as RAID arrays, as their factory settings. This is good for many cases but not for Hadoop.

The Hadoop slave node stores HDFS data blocks and MapReduce temporary files on its local disks. These local disk operations benefit from using multiple independent disks to spread disk I/O.

In this recipe, we will describe how to set up Hadoop to use multiple disks to spread its disk I/O.

Getting ready

We assume you have multiple disks for each DataNode node. These disks are in a JBOD (Just a Bunch Of Disks) or RAID0 configuration. Assume that the disks are mounted at /mnt/d0, /mnt/d1, …, /mnt/dn, and the user who starts HDFS has write permission on each mount point.

How to do it...

In order to set up Hadoop to spread disk I/O, follow these instructions:

  1. On each DataNode node, create directories on each disk for HDFS to store its data blocks:
  2. hadoop$ mkdir -p /mnt/d0/dfs/data
    hadoop$ mkdir -p /mnt/d1/dfs/data

    hadoop$ mkdir -p /mnt/dn/dfs/data

  3. Add the following code to the HDFS configuration file (hdfs-site.xml):
  4. hadoop@master1$ vi $HADOOP_HOME/conf/hdfs-site.xml

  5. Sync the modified hdfs-site.xml file across the cluster:
  6. hadoop@master1$ for slave in `cat $HADOOP_HOME/conf/slaves`
    rsync -avz $HADOOP_HOME/conf/ $slave:$HADOOP_HOME/conf/

  7. Restart HDFS:
  8. hadoop@master1$ $HADOOP_HOME/bin/stop-dfs.sh
    hadoop@master1$ $HADOOP_HOME/bin/start-dfs.sh

How it works...

We recommend JBOD or RAID0 for the DataNode disks, because you don't need the redundancy of RAID, as HDFS ensures its data redundancy using replication between nodes. So, there is no data loss when a single disk fails.

Which one to choose, J BOD or RAID0? You will theoretically get better performance from a JBOD configuration than from a RAID configuration. This is because, in a RAID configuration, you have to wait for the slowest disk in the array to complete before the entire write operation can complete, which makes the average I/O time equivalent to the slowest disk's I/O time. In a JBOD configuration, operations on a faster disk will complete independently of the slower ones, which makes the average I/O time faster than the slowest one. However, enterprise-class RAID cards might make big differences. You might want to benchmark your JBOD and RAID0 configurations before deciding which one to go with.

For both JBOD and RAID0 configurations, you will have the disks mounted at different paths. The key point here is to set the dfs.data.dirproperty to all the directories created on each disk. The dfs.data.dirproperty specifies where the DataNode should store its local blocks. By setting it to comma-separated multiple directories, DataNode stores its blocks across all the disks in round robin fashion. This causes Hadoop to efficiently spread disk I/O to all the disks.

Do not leave blanks between the directory paths in the dfs.data.dir property value, or it won't work as expected.

You will need to sync the changes across the cluster and restart HDFS to apply them.

There's more...

If you run MapReduce, as MapReduce stores its temporary files on TaskTracker's local file system, you might also like to set up MapReduce to spread its disk I/O:

  1. On each TaskTracker node, create directories on each disk for MapReduce to store its intermediate data files:

    hadoop$ mkdir -p /mnt/d0/mapred/local
    hadoop$ mkdir -p /mnt/d1/mapred/local

    hadoop$ mkdir -p /mnt/dn/mapred/local

  2. Add the following to MapReduce's configuration file (mapred-site.xml):
  3. hadoop@master1$ vi $HADOOP_HOME/conf/mapred-site.xml

  4. Sync the modified mapred-site.xml file across the cluster and restart MapReduce.

MapReduce generates a lot of temporary files on TaskTrackers' local disks during its execution. Like HDFS, setting up multiple directories on different disks helps spread MapReduce disk I/O significantly.

Using network topology script to make Hadoop rack-aware

Hadoop has the concept of "Rack Awareness ". Administrators are able to define the rack of each DataNode in the cluster. Making Hadoop rack-aware is extremely important because:

  • Rack awareness prevents data loss
  • Rack awareness improves network performance

In this recipe, we will describe how to make Hadoop rack-aware and why it is important.

Getting ready

You will need to know the rack to which each of your slave nodes belongs. Log in to the master node as the user who started Hadoop.

How to do it...

The following steps describe how to make Hadoop rack-aware:

  1. Create a topology.sh script and store it under the Hadoop configuration directory. Change the path for topology.data, in line 3, to fit your environment:

    hadoop@master1$ vi $HADOOP_HOME/conf/topology.sh
    while [ $# -gt 0 ] ; do
    exec< /usr/local/hadoop/current/conf/topology.data
    while read line ; do
    ar=( $line )
    if [ "${ar[0]}" = "$nodeArg" ] ; then
    if [ -z "$result" ] ; then
    echo -n "/default/rack "
    echo -n "$result "

    Don't forget to set the execute permission on the script file:

    hadoop@master1$ chmod +x $HADOOP_HOME/conf/topology.sh

  2. Create a topology.data file, as shown in the following snippet; change the IP addresses and racks to fit your environment:
  3. hadoop@master1$ vi $HADOOP_HOME/conf/topology.data /dc1/rack1 /dc1/rack2 /dc1/rack3

  4. Add the following to the Hadoop core configuration file (core-site.xml):
  5. hadoop@master1$ vi $HADOOP_HOME/conf/core-site.xml

  6. Sync the modified files across the cluster and restart HDFS and MapReduce.
  7. Make sure HDFS is now rack-aware. If everything works well, you should be able to find something like the following snippet in your NameNode log file:
        2012-03-10 13:43:17,284 INFO org.apache.hadoop.net.NetworkTopology: 
        Adding a new node: /dc1/rack3/
        2012-03-10 13:43:17,297 INFO org.apache.hadoop.net.NetworkTopology: 
        Adding a new node: /dc1/rack1/
        2012-03-10 13:43:17,429 INFO org.apache.hadoop.net.NetworkTopology: 
        Adding a new node: /dc1/rack2/
  9. Make sure MapReduce is now rack-aware. If everything works well, you should be able to find something like the following snippet in your JobTracker log file:
        2012-03-10 13:50:38,341 INFO org.apache.hadoop.net.NetworkTopology: 
        Adding a new node: /dc1/rack3/ip-10-160-19-149.us-west-1.compute.internal
        2012-03-10 13:50:38,485 INFO org.apache.hadoop.net.NetworkTopology: 
        Adding a new node: /dc1/rack1/ip-10-161-30-108.us-west-1.compute.internal
        2012-03-10 13:50:38,569 INFO org.apache.hadoop.net.NetworkTopology: 
        Adding a new node: /dc1/rack2/ip-10-166-221-198.us-west-1.compute.internal

How it works...

The following diagram shows the concept of Hadoop rack awareness:

Each block of the HDFS files will be replicated to multiple DataNodes, to prevent loss of all the data copies due to failure of one machine. However, if all copies of data happen to be replicated on DataNodes in the same rack, and that rack fails, all the data copies will be lost. So to avoid this, the NameNode needs to know the network topology in order to use that information to make intelligent data replication.

As shown in the previous diagram, with the default replication factor of three, two data copies will be placed on the machines in the same rack, and another one will be put on a machine in a different rack. This ensures that a single rack failure won't result in the loss of all data copies. Normally, two machines in the same rack have more bandwidth and lower latency between them than two machines in different racks. With the network topology information, Hadoop is able to maximize network performance by reading data from proper DataNodes. If data is available on the local machine, Hadoop will read data from it. If not, Hadoop will try reading data from a machine in the same rack, and if it is available on neither, data will be read from machines in different racks.

In step 1, we create a topology.sh script. The script takes DNS names as arguments and returns network topology (rack) names as the output. The mapping of DNS names to network topology is provided by the topology.data file, which was created in step 2. If an entry is not found in the topology.data file, the script returns /default/rack as a default rack name.

Note that we use IP addresses, and not hostnames in the topology. data file. There is a known bug that Hadoop does not correctly process hostnames that start with letters "a" to "f". Check HADOOP-6682 for more details.

In step 3, we set the topology.script.file.name property in core-site.xml, telling Hadoop to invoke topology.sh to resolve DNS names to network topology names.

After restarting Hadoop, as shown in the logs of steps 5 and 6, HDFS and MapReduce add the correct rack name as a prefix to the DNS name of slave nodes. This indicates that the HDFS and MapReduce rack awareness work well with the aforementioned settings.

Mounting disks with noatime and nodiratime

If you are mounting disks purely for Hadoop and you use ext3 or ext4, or the XFS file system, we recommend that you mount the disks with the noatime and nodiratime attributes.

If you mount the disks as noatime, the access timestamps are not updated when a file is read on the filesystem. In the case of the nodiratimeattribute, mounting disks does not update the directory inode access times on the filesystem. As there is no more disk I/O for updating the access timestamps, this speeds up filesystem reads.

In this recipe, we will describe why the noatimeand nodiratimeoptions are recommended for Hadoop, and how to mount disks with noatimeand nodiratime.

Getting ready

You will need root privileges on your slave nodes. We assume you have two disks for only Hadoop—/dev/xvdc and /dev/xvdd. The two disks are mounted at /mnt/is1 and /mnt/is2, respectively. Also, we assume you are using the ext3 filesystem.

How to do it...

To mount disks with noatimeand nodiratime, execute the following instructions on every slave node in the cluster:

  1. Add the following to the /etc/fstab file:
  2. $ sudo vi /etc/fstab
    /dev/xvdc /mnt/is1 ext3 defaults,noatime,nodiratime 0 0
    /dev/xvdd /mnt/is2 ext3 defaults,noatime,nodiratime 0 0

  3. Unmount the disks and mount them again to apply the changes:
  4. $ sudo umount /dev/xvdc
    $ sudo umount /dev/xvdd

    $ sudo mount /dev/xvdc
    $ sudo mount /dev/xvdd

  5. Check that the mount options have been applied:
  6. $ mount
    /dev/xvdc on /mnt/is1 type ext3 (rw,noatime,nodiratime)
    /dev/xvdd on /mnt/is2 type ext3 (rw,noatime,nodiratime)

How it works...

As Hadoop (HDFS) manages the metadata (inode) of its filesystem with NameNode, any access time information kept by Hadoop is independent of the atimeattribute of individual blocks. So, the access timestamps in DataNode's local filesystem makes no sense here. That's why we recommend that you mount disks with noatimeand nodiratime, if the disks are used purely for Hadoop. Mounting disks with noatimeand nodiratimesaves the write I/O every time a local file is accessed.

These options are set in the /etc/fstab file. Do not forget to unmount and mount the disks again, for the changes to be applied.

With these options enabled, an improvement in the performance of the HDFS read is expected. As HBase stores its data on HDFS, the HBase read performance is also expected to improve.

There's more...

Another method for optimization is to reduce the percentage of reserved blocks of the ext3 or ext4 filesystems. By default, some of the filesystem blocks are reserved for use by privileged processes. This is to avoid situations wherein user processes fill up the disk spaces that are required by the system daemons, in order to keep working. This is very important for the disks hosting the operating system but less useful for the disks only used by Hadoop.

Usually these Hadoop-only disks have a very large storage space. Reducing the percentage of the reserved blocks can add quite a few storage capacities to the HDFS cluster. Normally, the default percentage of reserved blocks is 5%. It can be reduced to 1%.

Do not reduce the reserved blocks on the disks hosting the operating system.

To achieve this, run the following command on each disk of each slave node in the cluster:

$ sudo tune2fs -m 1 /dev/xvdc
tune2fs 1.41.12 (17-May-2010)
Setting reserved blocks percentage to 1% (1100915 blocks)

Setting vm.swappiness to 0 to avoid swap

Linux moves the memory pages that have not been accessed for some time to the swap space, even if there is enough free memory available. This is called swap out. On the other hand, reading swapped out data from the swap space to memory is called swap in. Swapping is necessary in many situations, but as Java Virtual Machine(JVM) does not behave well under swapping, HBase may run into trouble if swapped. The ZooKeeper session expiring is a typical problem that may be introduced by a swap.

In this recipe, we will describe how to tune the Linux vm.swappinessparameter to avoid swap.

Getting ready

Make sure you have root privileges on your nodes in the cluster.

How to do it...

To tune the Linux parameter to avoid swapping, invoke the following on each node in the cluster:

  1. Execute the following command to set vm.swappinessto 0:
  2. root# sysctl -w vm.swappiness=0
    vm.swappiness = 0

    This change will persist until the next reboot of the server.

  3. Add the following to the /etc/sysctl.conf file, so that the setting will be enabled whenever the system boots:

    root# echo "vm.swappiness = 0" >> /etc/sysctl.conf

How it works...

The vm.swappiness parameter can be used to define how aggressively memory pages are swapped to disk. It accepts any value from 0 to 100—a low value means that the kernel will be less likely to swap, whereas a higher value will make the kernel swap out applications more often. The default value is 60.

We set vm.swappiness to 0 in step 1, which will cause the kernel to avoid swapping processes out of physical memory for as long as possible. This is good for HBase, because HBase processes consume a large amount of memory. A higher vm.swappiness value will make HBase swap a lot and encounter very slow garbage collection. This is likely to result in the RegionServer process being killed off, as the ZooKeeper session times out. We recommend that you set it to 0 or any other low number (for example, 10) and observe the state of swapping.

Note that the value set by the sysctlcommand only persists until the next reboot of the server. You need to set vm.swappinessin the /etc/sysctl.conf file, so that the setting is enabled whenever the system reboots.

Java GC and HBase heap settings

As HBase runs within JVM, the JVM Garbage Collection(GC) settings are very important for HBase to run smoothly, with high performance. In addition to the general guideline for configuring HBase heap settings, it's also important to have the HBase processes output their GC logs and then tune the JVM settings based on the GC logs' output.

We will describe the most important HBase JVM heap settings as well as how to enable and understand GC logging, in this recipe. We will also cover some general guidelines to tune Java GC settings for HBase.

Getting ready

Log in to your HBase region server.

How to do it...

The following are the recommended Java GC and HBase heap settings:

  1. Give HBase enough heap size by editing the hbase-env.sh file. For example, the following snippet configures a 8000-MB heap size for HBase:

    $ vi $HBASE_HOME/conf/hbase-env.sh
    export HBASE_HEAPSIZE=8000

  2. Enable GC logging by using the following command:
  3. $ vi $HBASE_HOME/conf/hbase-env.sh
    export HBASE_OPTS="$HBASE_OPTS -verbose:gc -XX:+PrintGCDetails
    -XX:+PrintGCTimeStamps -Xloggc:/usr/local/hbase/logs/gc-hbase.log"

  4. Add the following code to start the Concurrent-Mark-Sweep GC(CMS) earlier than the default:

    $ vi $HBASE_HOME/conf/hbase-env.sh
    export HBASE_OPTS="$HBASE_OPTS -XX:CMSInitiatingOccupancyFraction=60"

  5. Sync the changes across the cluster and restart HBase.
  6. Check that the GC logs were output to the specified log file (/usr/local/hbase/logs/gc-hbase.log).
    The GC log looks like the following screenshot:


How it works...

In step 1, we configure the HBase heap memory size. By default, HBase uses a heap size of 1GB, which is too low for modern machines. A heap size of more than 4GB is good for HBase, while our recommendation is 8GB or larger, but under 16GB.

In step 2, we enable the JVM logging. With that setting, you will get a region server's JVM logs, similar to what we have shown in step 5. Basic knowledge about JVM memory allocation and garbage collection is required to understand the log output. The following is a diagram of the JVM generational garbage collection system:

There are three heap generations: the Perm(or Permanent) generation , the Old Generation (or Tenured) generation , and the Young Generation. The young generation section consists of three separate spaces: the Eden space and two survivor spaces, S0 and S1.

Usually, objects are allocated in the Eden space of the young generation. If an allocation fails (the Eden is full), all Java threads are halted and a young generation GC (Minor GC) is invoked. All surviving objects in the young generation (Eden and S0 space) are copied to the S1 space. If the S1 space is full, objects are copied (promoted) to the old generation. The old generation is collected (Major/Full GC) when a promotion fails. The permanent and old generations are usually collected together. The permanent generation is used to hold class and method definitions for objects.

Back to our sample in step 5, the minor GC output for the aformentioned options is produced in the following form:

<timestamp>: [GC [<collector>: <starting occupancy1> -> <ending occupancy1>, <pause time1> secs] 
<starting occupancy3> -> <ending occupancy3>, <pause time3> secs] 
[Times: <user time> <system time>, <real time>]

In this output:

  • <timestamp> is the times at which the GCs happen, relative to the start of the application.
  • <collector> is an internal name for the collector used in the minor collection.
  • <starting occupancy1> is the occupancy of the young generation before the collection.
  • <ending occupancy1> is the occupancy of the young generation after the collection.
  • <pause time1> is the pause time in seconds for the minor collection.
  • <starting occupancy3> is the occupancy of the entire heap before the collection.
  • <ending occupancy3> is the occupancy of the entire heap after the collection.
  • <pause time3> is the pause time for the entire garbage collection. This would include the time for a major collection.
  • [Time:] explains the time spend in GC collection, user time, system time, and real time.

The first line of our output in step 5 indicates a minor GC, which pauses the JVM for 0.0764200 seconds. It has reduced the young generation space from about 14.8MB to 1.6MB.

Following that, we see the CMS GC logs. HBase uses CMS GC as its default garbage collector for the old generation.

CMS GC performs the following steps :

  1. Initial mark
  2. Concurrent marking
  3. Remark
  4. Concurrent sweeping

CMS halts the application's threads only during the initial mark and remark phases. During the concurrent marking and sweeping phases, the CMS thread runs along with the application's threads.

The second line in the example indicates that the CMS initial mark took 0.0100050 seconds and the concurrent marking has taken 6.496 seconds. Note that it is a concurrent marking; Java was not paused.

There is a pause at the line that starts with 1441.435: [GC[YG occupancy:…] in the earlier screenshot of the GC log. The pause here is 0.0413960 seconds to remark the heap. After that, you can see the sweep starts. The CMS sweep took 3.446 seconds, but the heap size didn't change that much (it kept on occupying about 150MB) here.

The tuning point here is to keep all these pauses low. To keep the pauses low, you may need to adjust the young generation space's size via the -XX:NewSize and -XX:MaxNewSize JVM flags, to set them to relative small values (for example, up to several hundred MB). If the server has more CPU power, we recommend using the Parallel New Collector by setting the -XX:+UseParNewGC option. You may also want to tune the parallel GC thread number for the young generation, via the -XX:ParallelGCThreads JVM flag.

We recommend adding the aforementioned settings to the HBASE_REGIONSERVER_OPTS variable , instead of the HBASE_OPTS variable in the hbase-env.sh file. The HBASE_REGIONSERVER_OPTS variable only affects the region server processes, which is good, because the HBase master neither handles heavy tasks nor participates in the data process.

For the old generation, the concurrent collection (CMS) generally cannot be sped up, but it can be started earlier. CMS starts running when the percentage of allocated space in the old generation crosses a threshold. This threshold is automatically calculated by the collector. For some situations, especially during loading, if the CMS starts too late, HBase may run into full garbage collection. To avoid this, we recommend setting the -XX:CMSInitiatingOccupancyFraction JVM flag to explicitly specify at what percentage the CMS should be started, as what we did in step 3. Starting at 60 or 70 percent is a good practice. When using CMS for an old generation, the default young generation GC will be set to the Parallel New Collector.

There's more...

If you are using an HBase version prior to 0.92, consider enabling the MemStore-Local Allocation Buffer to prevent old generation heap fragmentation under heavy write loads:

$ vi $HBASE_HOME/conf/hbase-site.xml

This feature is enabled by default in HBase 0.92.

Using compression

One of the most important features of HBase is the use of data compression. It's important because:

  • Compression reduces the number of bytes written to/read from HDFS
  • Saves disk usage
  • Improves the efficiency of network bandwidth when getting data from a remote server

HBase supports the GZip and LZO codec. Our suggestion is to use the LZO compression algorithm because of its fast data decompression and low CPU usage. As a better compression ratio is preferred for the system, you should consider GZip.


Unfortunately, HBase cannot ship with LZO because of a license issue. HBase is Apache-licensed, whereas LZO is GPL-licensed. Therefore, we need to install LZO ourselves. We will use the hadoop-lzo library, which brings splittable LZO compression to Hadoop.

In this recipe, we will describe how to install LZO and how to configure HBase to use LZO compression.

Getting ready

Make sure Java is installed on the machine on which hadoop-lzo is to be built. Apache Ant is required to build hadoop-lzo from source. Install Ant by running the following command:

$ sudo apt-get -y install ant

All nodes in the cluster need to have native LZO library installed. You can install it by using the following command:

$ sudo apt-get -y install liblzo2-dev

How to do it...

We will use the hadoop-lzo library to add LZO compression support to HBase:

  1. Get the latest hadoop-lzo source from https://github.com/toddlipcon/hadoop-lzo.
  2. Build the native and Java hadoop-lzo libraries from source. Depending on your OS, you should either choose to build 32-bit or 64-bit binaries. For example, to build 32-bit binaries, run the following commands:
  3. $ export JAVA_HOME="/usr/local/jdk1.6"
    $ export CFLAGS="-m32"
    $ export CXXFLAGS="-m32"
    $ cd hadoop-lzo
    $ ant compile-native
    $ ant jar

    These commands will create the hadoop-lzo/build/native directory and the hadoop-lzo/build/hadoop-lzo-x.y.z.jar file. In order to build 64-bit binaries, just change the value of CFLAGS and CXXFLAGS to -m64.

  4. Copy the built libraries to the $HBASE_HOME/lib and $HBASE_HOME/lib/native directories on your master node:
  5. hadoop@master1$ cp hadoop-lzo/build/hadoop-lzo-x.y.z.jar $HBASE_
    hadoop@master1$ mkdir $HBASE_HOME/lib/native/Linux-i386-32
    hadoop@master1$ cp hadoop-lzo/build/native/Linux-i386-32/lib/*

    For a 64-bit OS, change Linux-i386-32(in the previous step) to Linux-amd64-64.

  6. Add the configuration of hbase.regionserver.codecs to your hbase-site.xml file:
  7. hadoop@master1$ vi $HBASE_HOME/conf/hbase-site.xml

  8. Sync the $HBASE_HOME/conf and $HBASE_HOME/lib directories across the cluster.
  9. HBase ships with a tool to test whether compression is set up properly. Use this tool to test the LZO setup on each node of the cluster. If everything is configured accurately, you will get the SUCCESS output:
  10. hadoop@client1$ $HBASE_HOME/bin/hbase org.apache.hadoop.hbase.util.CompressionTest /tmp/lzotest lzo
    12/03/11 11:01:08 INFO hfile.CacheConfig: Allocating LruBlockCache with maximum size 249.6m
    12/03/11 11:01:08 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
    12/03/11 11:01:08 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev Unknown build revision]
    12/03/11 11:01:08 INFO compress.CodecPool: Got brand-new compressor
    12/03/11 11:01:18 INFO compress.CodecPool: Got brand-new decompressor

  11. Test the configuration by creating a table with LZO compression, and verify it in HBase Shell:
  12. $ hbase> create 't1', {NAME => 'cf1', COMPRESSION => 'LZO'}
    $ hbase> describe 't1'
    {NAME => 't1', FAMILIES => [{NAME => 'cf1', BLOOMFILTER =>
    'NONE', true
    MIN_VERSIONS => '0', TTL => '2147483647', BLOCKSIZE => '65536',
    IN _MEMORY => 'false', BLOCKCACHE => 'true'}]}
    1 row(s) in 0.0790 seconds

How it works...

We built the hadoop-lzo Java and native libraries and installed them under the $HBASE_HOME/lib and $HBASE_HOME/lib/native directories, respectively. By adding LZO compression support, HBase StoreFiles (HFiles) will use LZO compression on blocks as they are written. HBase uses the native LZO library to perform the compression, while the native library is loaded by HBase via the hadoop-lzo Java library that we built.

In order to avoid starting a node with any codec missing or misinstalled, we add LZO to the hbase.regionserver.codecs setting in the hbase-site.xml file. This setting will cause a failed startup of a region server if LZO is not installed properly. If you see logs such as "Could not load native gpl library", there is an issue with the LZO installation. In order to fix it, make sure that the native LZO libraries are installed and the path is configured properly.

A compression algorithm is specified on a per-column family basis. As shown in step 7, we create a table, t1, with a single column family, cf1, which uses LZO compression on it. Although it adds a read-time penalty as the data blocks probably is decompressed when reading, LZO is fast enough as a real-time compression library. We recommend using LZO as the default compression algorithm in production HBase.

There's more...

Another compression option is to use the recently released Snappy compression library and Hadoop Snappy integration. As the setup is basically the same as what we did before, we will skip the details. Check the following URL to know how to add Snappy compression to HBase:


Managing compactions

An HBase table has the following physical storage structure:

It consists of multiple regions. While a region may have several Stores, each holds a single column family. An edit first writes to the hosting region store's in-memory space, which is called MemStore. When the size of MemStore reaches a threshold, it is flushed to StoreFiles on HDFS.

As data increases, there may be many StoreFiles on HDFS, which is not good for its performance. Thus, HBase will automatically pick up a couple of the smaller StoreFiles and rewrite them into a bigger one. This process is called minor compaction. For certain situations, or when triggered by a configured interval (once a day by default), major compaction runs automatically. Major compaction will drop the deleted or expired cells and rewrite all the StoreFiles in the Store into a single StoreFile; this usually improves the performance.

However, as major compaction rewrites all of the Stores' data, lots of disk I/O and network traffic might occur during the process. This is not acceptable on a heavy load system. You might want to run it at a lower load time of your system.

In this recipe, we will describe how to turn off this automatic major compaction feature, and run it manually.

Getting ready

Log in to your HBase master server as the user who starts the cluster.

How to do it...

The following steps describe how to disable automatic major compaction:

  1. Add the following to hbase-site.xml file:
  2. $ vi $HBASE_HOME/conf/hbase-site.xml

  3. Sync the changes across the cluster and restart HBase.
  4. With the aforementioned setting, automatic major compaction will be disabled; you will now need to run it explicitly.
  5. In order to manually run a major compaction on a particular region through HBase Shell, run the following command:
  6. $ echo "major_compact 'hly_temp,,1327118470453.5ef67f6d2a792fb0bd7
    37863dc00b6a7.'" | $HBASE_HOME/bin/hbase shell

    HBase Shell; enter 'help<RETURN>' for list of supported commands.
    Type "exit<RETURN>" to leave the HBase Shell Version 0.92.0, r1231986, Tue Jan 17 02:30:24 UTC 2012

    major_compact 'hly_temp,,1327118470453.5ef67f6d2a792fb0bd737863dc0

    0 row(s) in 1.7070 seconds

How it works...

The hbase.hregion.majorcompaction property specifies the time (in milliseconds) between major compactions of all the StoreFiles in a region. The default value is 86400000, which means once a day. We set it to 0 in step 1 to disable the automatic major compaction. This will prevent a major compaction from running during a heavy load time, for example when the MapReduce jobs are running over the HBase cluster.

On the other hand, major compaction is required to help performance. In step 4, we've shown an example of how to manually trigger a major compaction on a particular region, via HBase Shell. In this example, we have passed a region name to the major_compact command to invoke the major compaction only on a single region. It is also possible to run major compaction on all regions of a table, by passing the table name to the command. The major_compact command queues the specified tables or regions for major compaction; this will be executed in the background by the region server hosting them.

As we mentioned earlier, you might want to execute major compaction manually only during a low load time. This can be done easily by invoking the major_compact command from a cron job.

There's more...

Another approach to invoke major compaction is to use the majorCompact API provided by the org.apache.hadoop.hbase.client.HBaseAdmin class. It is easy to call this API in Java, thus you can manage complex major compaction scheduling from Java.

Managing a region split

Usually an HBase table starts with a single region. However, as data keeps growing and the region reaches its configured maximum size, it is automatically split into two halves, so that they can handle more data. The following diagram shows an HBase region splitting:

This is the default behavior of HBase region splitting. This mechanism works well for many cases, however there are situations wherein it encounters problems, such as the split/ compaction storms issue.

With a roughly uniform data distribution and growth, eventually all the regions in the table will need to be split at the same time. Immediately following a split, compactions will run on the daughter regions to rewrite their data into separate files. This causes a large amount of disk I/O and network traffic.

In order to avoid this situation, you can turn off automatic splitting and manually invoke it. As you can control at what time to invoke the splitting, it helps spread the I/O load. Another advantage is that, manually splitting lets you have better control of the regions, which helps you trace and fix region-related issues.

In this recipe, we will describe how to turn off automatic region splitting and invoke it manually.

Getting ready

Log in to your HBase master server as the user who starts the cluster.

How to do it...

To turn off automatic region splitting and invoke it manually, follow these steps:

  1. Add the following to the hbase-site.xml file:
  2. $ vi $HBASE_HOME/conf/hbase-site.xml

  3. Sync the changes across the cluster and restart HBase.
  4. With the aforementioned setting, region splitting will not happen until a region's size reaches the configured 100GB threshold. You will need to explicitly trigger it on selected regions.
  5. To run a region split through HBase Shell, use the following command:
  6. $ echo "split 'hly_temp,,1327118470453.5ef67f6d2a792fb0bd737863dc00b6a7.'" | $HBASE_HOME/bin/hbase shell
    HBase Shell; enter 'help<RETURN>' for list of supported commands.
    Type "exit<RETURN>" to leave the HBase Shell Version 0.92.0, r1231986, Tue Jan 17 02:30:24 UTC 2012
    split 'hly_temp,,1327118470453.5ef67f6d2a792fb0bd737863dc00b6a7.'
    0 row(s) in 1.6810 seconds

How it works...

The hbase.hregion.max.filesize property specifies the maximum region size in bytes. By default, the value is 1GB (256MB for versions prior to HBase 0.92), which means that when a region exceeds this size, it is split into two. We set this maximum region size to 100GB in step 1, which is a very high number.

As splitting won't happen until regions reach the 100GB upper boundary, we need to invoke it explicitly. In step 4, we invoke splitting on a specified region via HBase Shell, using the split command.

Do not forget to split large regions. A region is the basic unit of data distribution and balancing in HBase. Regions should be split into proper size and at low load time.

On the other hand, too much splitting is not good. Having too many regions on a region server lowers its performance.

You might also want to trigger major compaction and balancing, after manually splitting regions.

There's more...

The setting that we set previously causes the entire cluster to have a default maximum region size of 100GB. Besides changing the entire cluster, it is also possible to specify the MAX_FILESIZE property on a column family basis, when creating a table:

  $ hbase> create 't1', {NAME => 'cf1', MAX_FILESIZE => '107374182400'}

Like major compaction, you can also use the split API provided by the org.apache.hadoop.hbase.client.HBaseAdmin Java class.

Books to Consider

comments powered by Disqus

An Introduction to 3D Printing

Explore the future of manufacturing and design  - read our guide to 3d printing for free