HBase High Performance Cookbook

By Ruchir Choudhry

About this book

Apache HBase is a non-relational NoSQL database management system that runs on top of HDFS. It is an open source, distributed, versioned, column-oriented store written in Java that provides random, real-time access to big data.

We’ll start off by ensuring you have a solid understanding of the basics of HBase, followed by a thorough explanation of architecting an HBase cluster as per our project specifications. Next, we will explore the scalable structure of tables and learn how to communicate with HBase through its client. After this, we’ll show you the intricacies of MapReduce and the art of performance tuning with HBase. Following this, we’ll explain the concepts pertaining to scaling with HBase. Finally, you will get an understanding of how to integrate HBase with other tools such as ElasticSearch.

By the end of this book, you will have learned enough to exploit HBase to boost system performance.

Publication date: January 2017
Publisher: Packt
Pages: 350
ISBN: 9781783983063

 

Chapter 1. Configuring HBase

In this chapter, we will cover the following topics:

  • Configuring and deploying HBase

  • Using the file system

  • Administering clusters

  • Managing clusters

 

Introduction


HBase is inspired by Google's Bigtable architecture and is fundamentally a non-relational, open source, column-oriented, distributed NoSQL database. Written in Java, it is designed and developed by many engineers under the umbrella of the Apache Software Foundation. Architecturally, it sits on top of Apache Hadoop and uses the Hadoop Distributed File System (HDFS) as its foundation.

It is a column-oriented database, backed by a fault-tolerant distributed file system, HDFS. In addition, it provides advanced features such as auto-sharding, load balancing, in-memory caching, replication, compression, near real-time lookups, and strong consistency (using multi-versioning). It uses a block cache and Bloom filters to respond faster to online/real-time requests. It supports multiple clients running on heterogeneous platforms by providing user-friendly APIs.

In this chapter, we will discuss how to effectively set up a mid-size or large HBase cluster on top of the Hadoop/HDFS framework.

This chapter will help you set up HBase on a fully distributed cluster. For the cluster setup, we will consider RHEL (Red Hat Enterprise Linux 6.2, 64-bit), and we will be using six nodes.

 

Configuring and deploying HBase


Before we start HBase in fully distributed mode, we will first set up Hadoop 2.2.0 in distributed mode, and then set up HBase on top of the Hadoop cluster, because HBase stores its data in HDFS.

Getting ready

The first step is to create a directory at /u/HBaseB and download the TAR file from the location given below. The location can be local, on a mount point, or in a cloud environment; it can also be block storage:

wget -b http://apache.mirrors.pair.com/hadoop/common/hadoop-2.2.0/hadoop-2.2.0.tar.gz

Tip

The -b option downloads the tar file as a background process. The output is written to wget-log. You can follow this log file using tail -200f wget-log.

Untar it using the following commands:

tar -xzvf hadoop-2.2.0.tar.gz

This extracts the archive into a folder named hadoop-2.2.0 in your current directory.

Once the untar process is done, for clarity it's recommended to use two different folders, one for the NameNode and the other for the DataNode.

Tip

I am assuming that app is a user and app is a group on a Linux platform with read/write/execute access to these locations. If not, please create the user app and the group app if you have sudo su - or root/admin access. If you don't, please ask your administrator to create this user and group for you on all the nodes and directories you will be accessing.

To keep the NameNode data and the DataNode data separate, let's create two folders inside /u/HBaseB by using the following command:

mkdir NameNodeData DataNodeData

NameNodeData will hold the data used by the NameNode, and DataNodeData will hold the data used by the DataNodes. Running ls -ltr shows the following:

drwxrwxr-x 2 app app  4096 Jun 19 22:22 NameNodeData
drwxrwxr-x 2 app app  4096 Jun 19 22:22 DataNodeData

-bash-4.1$ pwd
/u/HBaseB/hadoop-2.2.0
-bash-4.1$ ls -ltr
total 60K
drwxr-xr-x 2 app app 4.0K Mar 31 08:49 bin
drwxrwxr-x 2 app app 4.0K Jun 19 22:22 DataNodeData
drwxr-xr-x 3 app app 4.0K Mar 31 08:49 etc

Planning a Hadoop cluster involves deciding on the following:

  1. Hardware details required for it

  2. Software required to do the setup

  3. OS required to do the setup

  4. Configuration steps

The HDFS core architecture is master/slave: an HDFS cluster comprises a single NameNode, which acts as the master node and is responsible for orchestrating the file system namespace and controlling access to files by clients. It performs this task by recording every modification to the underlying file system as log entries appended to a native file system file called edits. The SecondaryNameNode is designed to merge the fsimage and the edits log files regularly and to keep the size of the edits log within an acceptable limit.

In a true cluster/distributed environment, the SecondaryNameNode runs on a different machine from the NameNode. It works as the checkpointing node in HDFS.
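The checkpoint idea described above can be illustrated with a toy model. This is only a sketch under simplifying assumptions: the namespace is a plain dictionary, the edit log is a list of tuples, and the operation names (create, delete) are hypothetical and are not the real HDFS edit log format.

```python
# Toy model of an HDFS-style checkpoint (not the real on-disk format):
# fsimage is a snapshot of the namespace, edits is an append-only log of
# the changes made since that snapshot was taken.

def apply_edit(namespace, edit):
    """Apply one logged operation to a namespace dict (path -> metadata)."""
    op, path = edit[0], edit[1]
    if op == "create":
        namespace[path] = {"size": edit[2]}
    elif op == "delete":
        namespace.pop(path, None)
    else:
        raise ValueError(f"unknown operation: {op}")


def checkpoint(fsimage, edits):
    """Merge the edit log into a new fsimage; return it with an empty log."""
    merged = dict(fsimage)  # copy, so the previous image stays intact
    for edit in edits:
        apply_edit(merged, edit)
    return merged, []


fsimage = {"/app": {"size": 0}}
edits = [
    ("create", "/app/hello.txt", 12),
    ("create", "/tmp/x", 5),
    ("delete", "/tmp/x"),
]
new_image, new_edits = checkpoint(fsimage, edits)
print(sorted(new_image))
```

After the merge, the edit log is empty again, which is how regular checkpoints keep it from growing without bound.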

We will require the following for the NameNode, the Secondary NameNode, and the DataNodes:

| Components | Details | Used for nodes/systems |
| --- | --- | --- |
| Operating system | Red Hat 6.2 Linux x86_64 GNU/Linux, or another standard Linux kernel | All the setup for Hadoop/HBase and other components used |
| Hardware/CPUs | 16 to 32 CPU cores | NameNode/Secondary NameNode |
| | 2 quad-/hex-/octo-core CPUs | DataNodes |
| Hardware/RAM | 128 to 256 GB; in special cases, 128 GB to 512 GB of RAM | NameNode/Secondary NameNode |
| | 128 GB to 512 GB of RAM | DataNodes |
| Hardware/storage | It's pivotal to have the NameNode server on a robust and reliable storage platform, as it is responsible for many key activities such as edit-log journaling. Because these machines are so important and the NameNode plays a central role in orchestrating everything, RAID or any other robust storage device is acceptable. | NameNode/Secondary NameNode |
| | 2 to 4 TB hard disks in a JBOD | DataNodes |

RAID stands for redundant array of inexpensive (or independent) disks. There are many RAID levels, but for a master or a NameNode, RAID 1 will be enough.

JBOD stands for just a bunch of disks. The design is to have multiple hard drives stacked together with no redundancy; the calling software needs to take care of failure and redundancy. In essence, it works as a single logical volume.

Before we start the cluster setup, a quick recap of the Hadoop setup, with brief descriptions, is essential.

How to do it…

Let's create a directory where you will have all the software components to be downloaded:

  1. For simplicity, let's take it as /u/HBaseB.

  2. Create different users for different purposes.

  3. The format is user/group; this is required to separate different roles for specific purposes:

    • hdfs/hadoop is for handling the Hadoop-related setup

    • yarn/hadoop is for the YARN-related setup

    • hbase/hadoop

    • pig/hadoop

    • hive/hadoop

    • zookeeper/hadoop

    • hcat/hadoop

  4. Set up directories for Hadoop cluster. Let's assume /u as a shared mount point. We can create specific directories that will be used for specific purposes.

    Tip

    Please make sure that you have adequate privileges on the folder to add, edit, and execute commands. Also, you must set up passwordless communication between the different machines, for example from the NameNode to the DataNodes and from the HBase master to all the region server nodes.

    Once the structure mentioned earlier is created and the tar files are downloaded and extracted, the directory listing looks like the following:

    -bash-4.1$ ls -ltr
    total 32
    
    drwxr-xr-x  9 app app 4096 hadoop-2.2.0
    drwxr-xr-x 10 app app 4096 zookeeper-3.4.6
    drwxr-xr-x 15 app app 4096 pig-0.12.1
    
    drwxrwxr-x  7 app app 4096 hbase-0.98.3-hadoop2
    drwxrwxr-x  8 app app 4096 apache-hive-0.13.1-bin
    drwxrwxr-x  7 app app 4096 Jun 30 01:04 mahout-distribution-0.9
    
  5. You can download these tar files from the following locations:

    wget https://archive.apache.org/dist/hbase/hbase-0.98.3/hbase-0.98.3-hadoop2-bin.tar.gz
    wget https://www.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
    wget https://archive.apache.org/dist/mahout/0.9/mahout-distribution-0.9.tar.gz
    wget https://archive.apache.org/dist/hive/hive-0.13.1/apache-hive-0.13.1-bin.tar.gz
    wget https://archive.apache.org/dist/pig/pig-0.12.1/pig-0.12.1.tar.gz
    


Let's assume that there is a /u directory and that you have downloaded the entire software stack into it. Go to /u/HBaseB/hadoop-2.2.0/etc/hadoop/ and look for the file core-site.xml.

Place the following lines in this configuration file:

<configuration>
<property>
    <name>fs.default.name</name>
    <value>hdfs://addressofbsdnsofmynamenode-hadoop:9001</value>
 </property>
</configuration>

Tip

You can specify a port that you want to use, and it should not clash with the ports that are already in use by the system for various purposes.

Save the file. This helps us create a master/NameNode.

Now, let's set up the Secondary NameNode. Go to /u/HBaseB/hadoop-2.2.0/etc/hadoop/ and edit the file core-site.xml:

<configuration>
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://custom-location-of-your-hdfs</value>
</property>
<property>
    <name>fs.checkpoint.dir</name>
    <value>/u/HBaseB/dn001/hadoop/hdfs/secdn,/u/HBaseB/dn002/hadoop/hdfs/secdn</value>
</property>
</configuration>

Note

The directories are kept separate to cleanly separate the HDFS block storage and to keep the configuration as simple as possible. This also makes proper maintenance easier.

Now, let's change the setup for HDFS; the file location is /u/HBaseB/hadoop-2.2.0/etc/hadoop/hdfs-site.xml.

Add these properties in hdfs-site.xml:

For NameNode:

<property>
<name>dfs.name.dir</name>
<value>/u/HBaseB/nn01/hadoop/hdfs/nn,/u/HBaseB/nn02/hadoop/hdfs/nn</value>
</property>

For DataNode:

<property>
<name>dfs.data.dir</name>
<value>/u/HBaseB/dnn01/hadoop/hdfs/dn,/u/HBaseB/dnn02/hadoop/hdfs/dn</value>
</property>

Now, let's set the HTTP addresses through which the NameNode and the Secondary NameNode can be accessed:

<property>
<name>dfs.http.address</name>
<value>yournamenode.full.hostname:50070</value>
</property>
<property>
<name>dfs.secondary.http.address</name>
<value>secondary.yournamenode.full.hostname:50090</value>
</property>
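Writing these property blocks by hand is error-prone, so a small generator can help. The following is a minimal sketch, assuming the properties are held in a dictionary; build_site_xml is a hypothetical helper name, and the dfs.name.dir value is an illustrative placeholder rather than a path from this recipe. The two HTTP address entries reuse the names and placeholder hosts shown above.

```python
import xml.etree.ElementTree as ET


def build_site_xml(properties):
    """Return a Hadoop-style <configuration> document as a string."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")


hdfs_site = build_site_xml({
    # Illustrative placeholder directories, not taken from the recipe.
    "dfs.name.dir": "/data/nn01,/data/nn02",
    "dfs.http.address": "yournamenode.full.hostname:50070",
    "dfs.secondary.http.address": "secondary.yournamenode.full.hostname:50090",
})
print(hdfs_site)
```

Generating the file this way guarantees matching open and close tags and keeps comma-separated directory lists on a single line with no stray whitespace.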

We can also set up HTTPS for the NameNode, but let's keep that optional for now.

Let's set up the YARN ResourceManager:

  1. Open the YARN configuration file:

    /u/HBaseB/hadoop-2.2.0/etc/hadoop/yarn-site.xml
  2. For the resource tracker, which is part of the YARN ResourceManager:

    <property>
    <name>yarn.resourcemanager.resource-tracker.address</name>
    <value>youryarnresourcemanager.full.hostname:8025</value>
    </property>
  3. For the scheduler, which is part of the YARN ResourceManager:

    <property>
    <name>yarn.resourcemanager.scheduler.address</name>
    <value>yourresourcemanager.full.hostname:8030</value>
    </property>
  4. For the ResourceManager address:

    <property>
    <name>yarn.resourcemanager.address</name>
    <value>yourresourcemanager.full.hostname:8050</value>
    </property>
  5. For the ResourceManager admin address:

    <property>
    <name>yarn.resourcemanager.admin.address</name>
    <value>yourresourcemanager.full.hostname:8041</value>
    </property>
  6. To set up the local directories:

    <property>
    <name>yarn.nodemanager.local-dirs</name>
    <value>/u/HBaseB/dnn01/hadoop/hdfs/yarn,/u/HBaseB/dnn02/hadoop/hdfs/yarn</value>
    </property>
  7. To set up the log location:

    <property>
    <name>yarn.nodemanager.log-dirs</name>
    <value>/u/HBaseB/var/log/hadoop/yarn</value>
    </property>

    This completes the configuration changes required for YARN.

Now, let's make the changes for MapReduce:

  1. Open mapred-site.xml:

    /u/HBaseB/hadoop-2.2.0/etc/hadoop/mapred-site.xml
  2. Place the following property in mapred-site.xml, between the opening and closing configuration tags:

    <configuration>
    <property>
    <name>mapreduce.jobhistory.address</name>
    <value>yourjobhistoryserver.full.hostname:10020</value>
    </property>
    </configuration>
  3. Once we have configured the MapReduce job history details, we can move on to configuring HBase.

  4. Go to the path /u/HBaseB/hbase-0.98.3-hadoop2/conf and open hbase-site.xml.

    You will see a template containing the following:

    <configuration>
    </configuration>
  5. We need to add the following lines between the starting and ending tags:

    <property>
    <name>hbase.rootdir</name>
    <value>hdfs://hbase.yournamenode.full.hostname:8020/apps/hbase/data</value>
    </property>

    <property>
    <name>hbase.master.info.bindAddress</name>
    <value>$hbase.yourmaster.full.hostname</value>
    </property>
  6. This completes the HBase changes.

ZooKeeper: Now, let's focus on the setup of ZooKeeper. In a distributed environment, go to the following location and rename zoo_sample.cfg to zoo.cfg:

/u/HBaseB/zookeeper-3.4.6/conf

Open zoo.cfg with vi zoo.cfg and add the following details; this defines two ZooKeeper servers:

server.1=zoo1:2888:3888
server.2=zoo2:2888:3888

If you want to test this setup locally, please use different port combinations. In a production-like setup as mentioned earlier, server.1=zoo1:2888:3888 follows the format server.id=host:port:port:

server.1 = server.id
zoo1 = host
2888 = port that followers use to connect to the leader
3888 = port used for leader election
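The server.id=host:port:port format can be checked with a few lines of code before the file is copied to the nodes. This is a minimal sketch; parse_server_line is a hypothetical helper, and the field names quorum_port and election_port are labels chosen here for the two ports, not names taken from ZooKeeper itself.

```python
def parse_server_line(line):
    """Split 'server.<id>=<host>:<port>:<port>' into its parts."""
    key, _, value = line.strip().partition("=")
    prefix, _, server_id = key.strip().partition(".")
    if prefix != "server" or not server_id.isdigit():
        raise ValueError(f"not a server entry: {line!r}")
    host, quorum_port, election_port = value.strip().split(":")
    return {
        "id": int(server_id),
        "host": host,
        "quorum_port": int(quorum_port),      # followers connect to the leader
        "election_port": int(election_port),  # leader election traffic
    }


entries = [parse_server_line(text) for text in (
    "server.1=zoo1:2888:3888",
    "server.2=zoo2:2888:3888",
)]
print(entries)
```

A line with a missing port or a non-numeric id raises ValueError, which surfaces typos early instead of at ZooKeeper start-up.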

ZooKeeper's atomic broadcast is a messaging system that keeps all the servers in sync and provides reliable delivery, total order, causal order, and so on.

Region servers: Before concluding, let's go through the region server setup process.

Go to the folder /u/HBaseB/hbase-0.98.3-hadoop2/conf and edit the regionservers file.

Specify the region servers accordingly:

RegionServer1 
RegionServer2 
RegionServer3 
RegionServer4

Note

RegionServer1 is the IP address or fully qualified CNAME of the first region server.

You can have as many region servers as you need (1..N; N=4 in our case), but each one's CNAME and its mapping in the regionservers file need to be different.
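The requirement that every entry be distinct is easy to verify mechanically. The sketch below assumes the file holds one host name per line; read_region_servers is a hypothetical helper, and skipping blank lines and lines starting with # is an assumption of this sketch rather than documented HBase behavior.

```python
def read_region_servers(text):
    """Return the hosts listed one per line, rejecting duplicate entries."""
    hosts = []
    for line in text.splitlines():
        host = line.strip()
        if not host or host.startswith("#"):
            continue  # assumption: ignore blank lines and comment lines
        hosts.append(host)
    duplicates = sorted({h for h in hosts if hosts.count(h) > 1})
    if duplicates:
        raise ValueError(f"duplicate region server entries: {duplicates}")
    return hosts


sample = """RegionServer1
RegionServer2
RegionServer3
RegionServer4
"""
print(read_region_servers(sample))
```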

Copy all the configuration files of HBase and ZooKeeper to the respective hosts dedicated to HBase and ZooKeeper. As the setup is in fully distributed cluster mode, we will be using a different host for HBase and its components and a dedicated host for ZooKeeper.

Next, we validate the setup we've worked on by adding the following to the bashrc; this makes sure that we are later able to configure the NameNode as expected:

Tip

It is preferable to set this in your profile, essentially /etc/profile; this makes sure that only the shell in use is impacted.

Now let's format the NameNode:

sudo su $HDFS_USER
/u/HBaseB/hadoop-2.2.0/bin/hadoop namenode -format

HDFS is implemented on top of the existing local file system of your cluster. When you start the Hadoop setup for the first time, you need to start with a clean slate, and hence any existing data needs to be formatted and erased.

Before formatting, we need to take care of the following.

Check whether there is a Hadoop cluster already running and using the same HDFS; if it is formatted accidentally, all the data will be lost.

Once the format is done, start the NameNode:

/u/HBaseB/hadoop-2.2.0/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR start namenode

Now let's go to the Secondary NameNode:

sudo su $HDFS_USER
/u/HBaseB/hadoop-2.2.0/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR start secondarynamenode

Repeat the same procedure on each DataNode:

sudo su $HDFS_USER
/u/HBaseB/hadoop-2.2.0/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR start datanode

Test 01: See if you can reach the NameNode from your browser at http://namenode.full.hostname:50070.

Test 02: Create a file as $HDFS_USER:

sudo -u $HDFS_USER touch /tmp/hello.txt

Now the hello.txt file is created in the /tmp location. Next, create a specific directory for this application user in the HDFS file system (/app/apphduser):

/u/HBaseB/hadoop-2.2.0/bin/hadoop dfs -mkdir -p /app
/u/HBaseB/hadoop-2.2.0/bin/hadoop dfs -mkdir -p /app/apphduser

Then copy the file into that directory and list its contents:

/u/HBaseB/hadoop-2.2.0/bin/hadoop dfs -copyFromLocal /tmp/hello.txt /app/apphduser
/u/HBaseB/hadoop-2.2.0/bin/hadoop dfs -ls /app/apphduser

Tip

apphduser is a directory created in HDFS for a specific user, so that the data is separated by user; in a true production environment, many users will be using the cluster.

Tip

You can also use hdfs dfs -ls / style commands if the hadoop dfs command is reported as deprecated.

You should see hello.txt once the command executes.

Test 03: Browse to http://datanode.full.hostname:50075/browseDirectory.jsp?namenodeInfoPort=50070&dir=/&nnaddr=$datanode.full.hostname:8020

Tip

It is important to change the DataNode host name and other parameters accordingly.

Once you open the preceding URL, you should see the details on the DataNode.

Validate the YARN/MapReduce setup by executing this command on the ResourceManager node:

<login as $YARN_USER>
/u/HBaseB/hadoop-2.2.0/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR start resourcemanager

Execute the following command on each NodeManager node:

<login as $YARN_USER>
/u/HBaseB/hadoop-2.2.0/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR start nodemanager

Executing the following commands will create the directories in the hdfs and apply the respective access rights:

cd /u/HBaseB/hadoop-2.2.0/bin
hadoop fs -mkdir /app-logs              # creates the directory in HDFS
hadoop fs -chown $YARN_USER /app-logs   # changes the ownership
hadoop fs -chmod 1777 /app-logs         # world-writable, with the sticky bit set
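The octal mode 1777 used in the chmod command can be decoded with Python's standard stat module. This is a small sketch to show what the leading 1 means; describe_mode is a hypothetical helper name.

```python
import stat


def describe_mode(mode):
    """Render an octal directory mode the way ls -l would display it."""
    return stat.filemode(stat.S_IFDIR | mode)


mode = 0o1777
print(describe_mode(mode))          # drwxrwxrwt
print(bool(mode & stat.S_ISVTX))    # True: the sticky bit is set
```

The trailing t shows the sticky bit: everyone may write into the directory, but users can delete or rename only their own entries.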

Execute MapReduce

Start the job history server:

<login as $MAPRED_USER>
/u/HBaseB/hadoop-2.2.0/sbin/mr-jobhistory-daemon.sh --config $HADOOP_CONF_DIR start historyserver

Let's run a few tests to be sure we have configured everything properly:

Test 01: From the browser or using curl, browse to http://yourresourcemanager.full.hostname:8088/.

Test 02:

sudo su $HDFS_USER
/u/HBaseB/hadoop-2.2.0/bin/hadoop jar /u/HBaseB/hadoop-2.2.0/hadoop-mapreduce/hadoop-mapreduce-examples-2.0.2.1-alpha.jar teragen 100 /test/10gsort/input
/u/HBaseB/hadoop-2.2.0/bin/hadoop jar /u/HBaseB/hadoop-2.2.0/hadoop-mapreduce/hadoop-mapreduce-examples-2.0.2.1-alpha.jar

Validate the HBase setup:

Log in as $HDFS_USER:

/u/HBaseB/hadoop-2.2.0/bin/hadoop fs -mkdir -p /apps/hbase
/u/HBaseB/hadoop-2.2.0/bin/hadoop fs -chown -R app:app /apps/hbase

Now log in as $HBASE_USER:

/u/HBaseB/hbase-0.98.3-hadoop2/bin/hbase-daemon.sh --config $HBASE_CONF_DIR start master

This command will start the master node. Now let's move to the HBase region server nodes:

/u/HBaseB/hbase-0.98.3-hadoop2/bin/hbase-daemon.sh --config $HBASE_CONF_DIR start regionserver

This command will start the region servers.

Note

For a single machine, sudo ./hbase master start can also be used directly.

Please check the log files at /opt/HBaseB/hbase-0.98.5-hadoop2/logs for any errors.

Now let's log in using:

sudo su - $HBASE_USER
/u/HBaseB/hbase-0.98.3-hadoop2/bin/hbase shell

This connects the HBase shell to the master.

Validate the ZooKeeper setup. If you want to use an external or existing ZooKeeper that is not managed by HBase, make sure that no internal HBase-managed ZooKeeper is running alongside it.

For this, you have to edit /opt/HBaseB/hbase-0.98.5-hadoop2/conf/hbase-env.sh.

Set the following statement to false (HBASE_MANAGES_ZK=false):

# Tell HBase whether it should manage its own instance of Zookeeper or not.
export HBASE_MANAGES_ZK=false

Once this is done, we can add zoo.cfg to HBase's CLASSPATH.

HBase looks into zoo.cfg as the default lookup for configurations:

# this is the place where the zooData will be present
dataDir=/opt/HBaseB/zookeeper-3.4.6/zooData
# IP and ports for server 01
server.1=172.28.182.45:2888:3888
# IP and ports for server 02
server.2=172.29.75.37:4888:5888

You can edit the log4j.properties file, which is located at /opt/HBaseB/zookeeper-3.4.6/conf, and point it to the location where you want to keep the logs:

# Define some default values that can be overridden by system properties:
zookeeper.root.logger=INFO, CONSOLE
zookeeper.console.threshold=INFO
zookeeper.log.dir=.
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=DEBUG
# you can specify the trace log location here
zookeeper.tracelog.dir=.
zookeeper.tracelog.file=zookeeper_trace.log

Once this is done you start zookeeper with the following command:

-bash-4.1$ sudo /u/HBaseB/zookeeper-3.4.6/bin/zkServer.sh start
Starting zookeeper ... STARTED

You can also redirect the output to the ZooKeeper logs:

/u/logs//u/HBaseB/zookeeper-3.4.6/zoo.out 2>&1

2: refers to file descriptor 2 of the process, that is, stderr.

>: means redirect.

&1: means the target of the redirection should be the same location as file descriptor 1, that is, stdout.
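The effect of 2>&1 can be reproduced from Python with the standard subprocess module, where stderr=subprocess.STDOUT plays the same role. This is a minimal sketch; run_merged is a hypothetical helper, and the child command is a throwaway Python one-liner rather than zkServer.sh.

```python
import subprocess
import sys


def run_merged(args):
    """Run a command with stderr sent to the same place as stdout (2>&1)."""
    result = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # equivalent of 2>&1
        text=True,
    )
    return result.stdout


child = [
    sys.executable,
    "-c",
    "import sys; print('to stdout'); print('to stderr', file=sys.stderr)",
]
output = run_merged(child)
print(output)
```

Both messages end up in the single captured stream; their relative order depends on buffering, so it should not be relied upon.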

How it works…

Note

Sizing the environment is critical to the success of any project, and optimizing it to your needs is a complex task.

We dissect it into two parts, the master setup and the slave setup, which can be further divided as follows:

Master-NameNode
Master-Secondary NameNode
Master-Jobtracker
Master-Yarn Resource Manager
Master-HBase Master 
Slave-DataNode
Slave-Map Reduce Tasktracker
Slave-Yarn Node Manager
Slave-HBase Region server
  • NameNode: The Hadoop architecture lets us set up a fully fault-tolerant, highly available (HA) Hadoop/HBase cluster, which requires a master and slave setup. In a full HA setup, the NameNodes are configured in an active/passive arrangement: one node is active at any given point in time and the other remains passive.

    The active node is the one that interacts with the clients and works as their coordinator. The standby node keeps itself synchronized with the active node so that its state stays intact and live; in case of failover, it is ready to take the load without any downtime.

    We have to make sure that when the passive node takes over in the event of a failure, it is in perfect sync with the active node that was taking the traffic. This is done by JournalNodes (JNs), which use daemon threads to keep the primary and secondary in sync.

  • JournalNode: By design, JournalNodes allow only a single NameNode, the active/primary one, to be a writer at a time. If the active/primary fails, the passive NameNode immediately takes charge and becomes active, which means that this newly active node starts writing to the JournalNodes. This prevents the other NameNode from staying in the active state and confirms that the newly active node works as the failover node.

  • JobTracker: This is an integral part of the Hadoop ecosystem. It works as a service that farms out MapReduce tasks to specific nodes in the cluster.

  • ResourceManager (RM): Its responsibility is limited to scheduling, that is, mediating the available resources in the system between the competing needs of applications. This includes registering new nodes and retiring dead nodes, which it does by constantly monitoring heartbeats based on its internal configuration. This explicit separation of responsibilities, together with clear modularity and the built-in, robust scheduler API, allows the ResourceManager to scale and support different design needs on the one hand and to cater to different programming models on the other.

  • HBase Master: The Master server is the main orchestrator for all the region servers in the HBase cluster. Usually, it's placed on the ZooKeeper nodes. In a real cluster configuration, you will have 5 to 6 ZooKeeper nodes.

  • DataNode: It's the real workhorse and does most of the heavy lifting; it runs the MapReduce jobs and stores the chunks of HDFS data. The core design objective of the DataNode is to run on commodity hardware and to be tolerant of failures.

    It keeps some of the HDFS data, and multiple copies of the same data are spread around the cluster. This makes the DataNode architecture fully fault tolerant, which is the reason a DataNode can use JBOD rather than rely on expensive RAID.

  • MapReduce: Jobs are run on these DataNodes in parallel as subtasks. These subtasks keep the data consistent across the cluster.

We will discuss this in more detail in Chapter 3, Working with Large Distributed Systems Part 1.
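The single-writer rule for JournalNodes can be illustrated with a heavily simplified model. This sketch assumes one journal instead of a quorum and uses an increasing epoch number to decide which NameNode is the current writer; JournalSketch and its method names are hypothetical and do not mirror the Hadoop classes.

```python
class JournalSketch:
    """A single journal that accepts edits only from the newest writer."""

    def __init__(self):
        self.current_epoch = 0
        self.edits = []

    def become_writer(self, epoch):
        """A NameNode claims the writer role by presenting a higher epoch."""
        if epoch <= self.current_epoch:
            return False
        self.current_epoch = epoch
        return True

    def write(self, epoch, edit):
        """Append an edit only if the caller still holds the newest epoch."""
        if epoch != self.current_epoch:
            return False  # a stale writer is fenced off
        self.edits.append(edit)
        return True


journal = JournalSketch()
journal.become_writer(1)                  # first NameNode starts as active
journal.write(1, "mkdir /app")
journal.become_writer(2)                  # standby takes over after a failure
accepted_old = journal.write(1, "mkdir /stale")          # old active retries
accepted_new = journal.write(2, "mkdir /app/apphduser")  # new active writes
print(accepted_old, accepted_new, journal.edits)
```

Once the standby has claimed a newer epoch, writes from the previous active node are rejected, so two NameNodes can never both act as writers.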

There's more…

Apache YARN is a robust, distributed application management framework that supersedes the traditional Apache Hadoop MapReduce framework for processing data in large Hadoop clusters.

This change was needed because of the way MapReduce works: during the map phase, the data is chunked into small discrete packets that can be processed, followed by a second phase, reduce, in which this split data is aggregated to produce the desired results. This works well with small, mid-sized, and to some extent large clusters, but for very large clusters (more than 4,000 nodes), unpredictable behavior starts to surface. The core issue was the replication of data during cascading failures.

YARN thus helps with reliability, scalability, and sharing. It takes the multiple responsibilities of the JobTracker (resource management, job monitoring, and scheduling) and splits them into more granular, distributed components: the ResourceManager and the ApplicationMaster.

The ResourceManager works in sync with the per-node NodeManager and the per-application ApplicationMaster.

The NodeManager takes remote invocations from the ResourceManager and manages the resources available on a single node.

The ApplicationMaster is responsible for negotiating resources with the ResourceManager and works with the NodeManager to start the containers.

HBase provides low-latency random reads and writes on top of HDFS. As a large-scale key-value store, its main differentiating factor is that it can scan petabytes of data at very high speed. It also comes with a built-in autosharding capability, splitting tables dynamically when they become too large.

This enables HBase to scale horizontally. The unit of scaling is the region: a region is a portion of table data that is stored together for efficiency. The slave servers in HBase are the region servers. They do a fair bit of the work and provide true distribution across the different regions. A region server can serve one or more regions based on need; each region is assigned to a region server on start-up.
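How a row key maps to a region, and what a split does to that mapping, can be sketched with a sorted list of region start keys. This is an illustration under simplifying assumptions: RegionMap, the server names rs1 and rs2, and the split key are hypothetical, and real HBase keeps this information in the META table.

```python
import bisect


class RegionMap:
    """Maps row keys to regions; each region starts at a sorted start key."""

    def __init__(self, first_server):
        # One region covering the whole key space; "" sorts before any key.
        self.start_keys = [""]
        self.servers = [first_server]

    def locate(self, row_key):
        """Return (region start key, server) for the region holding row_key."""
        index = bisect.bisect_right(self.start_keys, row_key) - 1
        return self.start_keys[index], self.servers[index]

    def split(self, split_key, new_server):
        """Split the region containing split_key; the upper half moves."""
        index = bisect.bisect_right(self.start_keys, split_key)
        if self.start_keys[index - 1] == split_key:
            raise ValueError("a region already starts at this key")
        self.start_keys.insert(index, split_key)
        self.servers.insert(index, new_server)


regions = RegionMap("rs1")
regions.split("m", "rs2")          # table grew too large: split at key "m"
print(regions.locate("apple"))
print(regions.locate("zebra"))
```

Because each region owns a contiguous key range, a binary search over the start keys is enough to find the region server responsible for any row.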

HBase 0.96 removed the concept of a ROOT table containing the META table location; instead, the location was moved to ZooKeeper, as the META table cannot split and can live in only a single region. The main HBase components are as follows:

  • HMaster: This performs administrative operations and coordinates the cluster.

  • HTable: It allows the client to get, put, delete, and perform other data manipulation operations. It interacts directly with the region server; essentially, it finds the region server that is responsible for serving a particular row range.

  • HFile: This is the physical representation of data in HBase; data is always read through the region servers. It's generated by a flush or by compactions. There are two versions of HFile, V2 and V3.

  • HFile V2: The main issue with HFile V1 was that it had to load all of its monolithic indexes and large Bloom filters into memory. V2 was introduced to be more efficient than V1 when handling large amounts of data, by using multilevel indexes and block-level Bloom filters. It also improves caching and memory utilization. The index is moved to the block level, which essentially means that each block has its own leaf index, allowing a multilevel index. The multilevel index is like a B+ tree and uses the last key of each block to build the intermediate levels. The detailed explanation is beyond the scope of this book.

  • MemStore: It collects data edits as they're received and buffers them in memory. This lets the system push the data to disk in one go and, at the same time, keeps the data in memory for subsequent access, avoiding expensive disk seeks. It also helps in keeping the data block size aligned with the specified HDFS block size. It is also worth mentioning that it sorts the data before flushing it to an HFile.

  • Block cache: For efficient I/O usage, HBase is programmed to read an entire block in one go and keep it in memory (JVM memory) on each region server. The cache is initialized during region server startup and stays the same for the lifetime of the server.

  • LruBlockCache: The data blocks are cached in memory (JVM heap). The cache is divided into three areas sized at 25% (single access), 50% (multi-access), and 25% (in-memory) of the total cache size, respectively.

  • SlabCache: It's an off-heap cache that uses memory outside the JVM heap through DirectByteBuffer.

    SlabCache minimizes fragmentation, but the other parts of HBase that are JVM-dependent can still cause fragmentation. The main advantage is that it reduces the frequency of stop-the-world GC pauses, which can cause region servers to miss heartbeats and be marked as dead; this can be catastrophic in an actual production system. Data is read from the SlabCache using a "copy on read" approach: if the data is already present on the JVM heap, it is read from there; otherwise, it is copied onto the heap from the slab: http://en.wikipedia.org/wiki/XOR_swap_algorithm.

    SlabCache works as an L2 cache and replaces the FS cache. The on-heap JVM cache works as the L1 cache.

    This approach allows us to use a large amount of memory without losing system performance, and it reduces the chance of missed heartbeats caused by stop-the-world GC pauses.

    This is mainly achieved through the direct ByteBuffer class available in the java.nio package, which allows us to allocate memory outside the normal Java heap, very similar to malloc() in C programming. The garbage collection process will not remove unreferenced objects when the memory is allocated by a direct ByteBuffer.

  • Bucket cache: It's an implementation of the block cache similar to LruBlockCache. It can also be used as a secondary cache to expand the cache space. The blocks of data can be stored in memory or on the file system. It significantly reduces the CMS and heap fragmentation pressure caused by the Java garbage collection (GC) process.

  • Multilevel caching: It's a design strategy for effective management of a large cache. The first level is the L1 cache, which is LruBlockCache. The second level is L2. The two cache levels operate independently of each other, and both are checked when evicting and retrieving blocks of data.
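The L1/L2 idea can be sketched with two small least-recently-used caches. This is a simplified illustration, not HBase's implementation: LruCache and TwoLevelCache are hypothetical names, capacities are counted in blocks rather than bytes, and moving L1 evictions into L2 is one possible policy assumed here for the example.

```python
from collections import OrderedDict


class LruCache:
    """Fixed-capacity cache that evicts the least recently used block."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.blocks = OrderedDict()

    def get(self, key):
        if key not in self.blocks:
            return None
        self.blocks.move_to_end(key)  # mark as most recently used
        return self.blocks[key]

    def put(self, key, block):
        """Store a block and return the (key, block) pairs that were evicted."""
        self.blocks[key] = block
        self.blocks.move_to_end(key)
        evicted = []
        while len(self.blocks) > self.capacity:
            evicted.append(self.blocks.popitem(last=False))
        return evicted


class TwoLevelCache:
    """L1 is checked first; blocks evicted from L1 are kept in L2."""

    def __init__(self, l1_capacity, l2_capacity):
        self.l1 = LruCache(l1_capacity)
        self.l2 = LruCache(l2_capacity)

    def put(self, key, block):
        for old_key, old_block in self.l1.put(key, block):
            self.l2.put(old_key, old_block)

    def get(self, key):
        block = self.l1.get(key)
        return block if block is not None else self.l2.get(key)


cache = TwoLevelCache(l1_capacity=2, l2_capacity=2)
for name in ("a", "b", "c"):
    cache.put(name, name.upper())
print(cache.get("a"), list(cache.l1.blocks), list(cache.l2.blocks))
```

With an L1 capacity of two, inserting a third block pushes the oldest one into L2, where a later read can still find it without going to disk.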

See Also

Refer to the following chapter:

  • Working with Large Distributed Systems

 

Using the filesystem


HBase depends on the Hadoop Distributed File System (HDFS).

HDFS is fundamentally a distributed file system, which relies on the following core principles:

Getting ready

The following are the benefits of using HDFS:

  • It's designed to work as a fault-tolerant system and is rack aware.

  • It works on the low-cost commodity hardware.

  • HDFS relaxes some POSIX requirements to allow streaming access to file system data.

  • It's designed for data that is written once and read many times. It also supports parallel reading and processing of the data (read, write, and append). It doesn't support random writes of data.

  • It's designed to scale to a very large level, which means data sizes on the order of petabytes.

  • It works with minimum data motion. MapReduce processes the data on the machine/node where the data is actually present. This intelligent invocation avoids or minimizes network I/O and keeps the expensive I/O operations localized (within the same rack or on the local disk).

  • HDFS checksums the file system at the block level. If an inconsistency between the checksum and the block contents is observed, it is reported to the HDFS master, which coordinates the creation of a new replica of the affected block as well as the immediate removal of the corrupted block.
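
The block-level checksum idea in the last point can be illustrated with a short Python sketch. It assumes CRC32 from the standard zlib module and a tiny 8-byte block size purely for readability; the helper names are hypothetical and this is not how HDFS itself stores or verifies checksums.

```python
import zlib

BLOCK_SIZE = 8  # bytes per block; tiny on purpose so the example is easy to follow


def split_blocks(data, size=BLOCK_SIZE):
    """Cut a byte string into fixed-size blocks (the last one may be shorter)."""
    return [data[i:i + size] for i in range(0, len(data), size)]


def checksum_blocks(blocks):
    """Compute one CRC32 checksum per block."""
    return [zlib.crc32(block) for block in blocks]


def find_corrupt_blocks(blocks, checksums):
    """Return the indexes of blocks whose contents no longer match their checksum."""
    return [
        index
        for index, (block, expected) in enumerate(zip(blocks, checksums))
        if zlib.crc32(block) != expected
    ]


blocks = split_blocks(b"hello hbase on hdfs blocks")
checksums = checksum_blocks(blocks)

# Simulate corruption by changing the first byte of the second block.
damaged = list(blocks)
damaged[1] = b"X" + damaged[1][1:]

healthy_report = find_corrupt_blocks(blocks, checksums)
corrupt_report = find_corrupt_blocks(damaged, checksums)
```

In a real cluster, the indexes in such a report would correspond to the blocks for which a fresh replica has to be created from a healthy copy.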

A lot of work is continuously happening on the core implementation of HDFS; some of it is as follows:

  • More granular file-level permissions and authentication.

  • Rack awareness was added to optimize the physical location when scheduling tasks and allocating storage.

  • For administrative purposes, a new feature known as Safemode was added.

  • In addition to these, a diagnostics service such as fsck was added for administrators; it enables us to analyze the missing blocks of a file system.

  • The Rebalancer tool is an internal distribution mechanism that redistributes the load across the DataNodes, which become unbalanced due to the continuous movement of data between them.

  • An upgrade and rollback step was added for administrators, which allows reverting to the old version of HDFS in case of any unforeseen situation caused by the upgrade; this gives us a safe and painless recovery.

  • The concept of checkpoints by the secondary NameNode was introduced to make sure that the size of the file that holds the log of HDFS changes stays within the specified limits at the NameNode.

    More information can be obtained at http://hadoop.apache.org/.

    Tip

    We are not considering a local setup of HBase as we are more focused on the HA and larger scale fully distributed setup.

Data in HDFS is not placed homogeneously across the distributed DataNodes. The most obvious reason is the addition of new DataNodes to a preexisting cluster. Internally, the system (NameNode) takes several considerations into account before it starts sending data/new blocks to a DataNode, which are listed as follows:

  • One replica of a block is kept on the same node that is writing the block.

    To make sure that the fault-tolerant design is complied with, the replicas are spread across different racks within the cluster.

  • To reduce cross-network chattiness, one replica is placed on the same rack as the node writing to the file. This also helps to keep the HDFS data homogeneous in a very large distributed DataNode cluster.

  • In some scenarios, there can be competing considerations, and this may cause non-uniform data across DataNodes.

    To overcome this scenario, the new HDFS framework provides administrators with tools that can be used to rebalance and check the data across different DataNodes.
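
The placement considerations listed above can be sketched in Python as follows. This is a deliberately simplified illustration of the three points as stated in this section (local node, same rack, other racks); the function name, the topology dictionary, and the node names are hypothetical, and the real HDFS block placement policy differs in its details.

```python
def place_replicas(writer, topology, replication=3):
    """Pick target nodes for one block.

    topology maps a rack name to the list of node names on that rack.
    May return fewer targets than requested when the cluster is too small.
    """
    writer_rack = next(rack for rack, nodes in topology.items() if writer in nodes)

    targets = [writer]                              # 1. the node writing the block

    same_rack = [node for node in topology[writer_rack] if node != writer]
    if same_rack and len(targets) < replication:
        targets.append(same_rack[0])                # 2. another node on the same rack

    for rack, nodes in topology.items():            # 3. one node on each other rack
        if len(targets) >= replication:
            break
        if rack != writer_rack and nodes:
            targets.append(nodes[0])
    return targets


topology = {
    "rack1": ["dn1", "dn2"],
    "rack2": ["dn3", "dn4"],
}

from_dn1 = place_replicas("dn1", topology)
from_dn4 = place_replicas("dn4", topology)
single_copy = place_replicas("dn1", topology, replication=1)
```

Because the first replica always lands on the writing node, a cluster whose writers are unevenly loaded ends up with unevenly filled DataNodes, which is why a rebalancing tool is needed.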

You need to set up Hadoop 2.2.0 in a fully distributed mode, as discussed in the previous section. The web interface can also be used for browsing the file system.

How to do it…

To use the file system, follow these steps:

  1. Log in to the NameNode instance as follows:

    ssh [email protected] 
    (you can use the IP address or the fully qualified machine name)
    then type cd /u/HBaseB/hadoop-2.2.0/bin
    
  2. Let's run some commands related to dfs:

    # Note: this makes sure that the setup is proper and that we are able to interact with it
    /u/HBaseB/hadoop-2.2.0/bin/hadoop dfs -ls /
    drwxr-xr-x   - hadoop supergroup  0 2014-08-13 22:48 /nn01
    drwxr-xr-x   - hadoop supergroup  0 2014-08-17 23:28 /nn02
    

    To put a file into HDFS and check its size:

    /u/HBaseB/hadoop-2.2.0/bin/hadoop dfs -put hello.txt /nn02/hello.txt
    /u/HBaseB/hadoop-2.2.0/bin/hadoop dfs -du /nn01/ /nn02
    0  /nn02/hello.txt
    0  /nn01/hello.txt
    

    For the recursive version:

    /u/HBaseB/hadoop-2.2.0/bin/hadoop dfs -lsr /
    drwxr-xr-x   - hadoop supergroup  0 2014-08-13 22:48 /nn01
    -rw-r--r--   3 hadoop supergroup  0 2014-08-13 22:48 /nn01/hello.txt
    drwxr-xr-x   - hadoop supergroup  0 2014-08-17 23:39 /nn02
    -rw-r--r--   3 hadoop supergroup  0 2014-08-17 23:39 /nn02/hello.txt
    

    Similarly you can use the following commands:

    touchz, text, tail, stat, setrep, rmr, rm, put, mv, moveFromLocal, mkdir, lsr, ls, getmerge, get, dus, expunge, du, copyToLocal, chown, chmod, chgrp, cat.
    
  3. Let us take a look at the fsck command:

    hdfs fsck [GENERIC_OPTIONS] <path> [-move | -delete | -openforwrite] [-files [-blocks [-locations | -racks]]]
    
    • -move: This moves the corrupted files to /lost+found

    • -delete: This deletes the corrupted files

    • -openforwrite: This prints out the files opened for write

    • -files: This prints out the files being checked

    • -blocks: This prints the block report

    • -locations: This prints the location of every block

    • -racks: This prints the network topology for the DataNode locations

  4. Let's take a look at some NameNode commands:

    hadoop namenode [-format] | [-upgrade] | [-rollback] | [-finalize] | [-importCheckpoint]
    hadoop namenode -format            # formats the NameNode
    hadoop namenode -upgrade           # upgrades the NameNode first, then distributes and starts the new NameNode
    hadoop namenode -rollback          # rolls the NameNode back to the previous version; use only after stopping the cluster and distributing the old Hadoop version
    hadoop namenode -finalize          # makes the most recent upgrade permanent
    hadoop namenode -importCheckpoint  # loads the image from a checkpoint directory and saves it into the current one
    
  5. Let's consider secondarynamenode:

    hadoop secondarynamenode [-checkpoint [force]] | [-geteditsize]
    hadoop secondarynamenode -geteditsize         # prints the EditLog size
    hadoop secondarynamenode -checkpoint [force]  # checkpoints the secondary NameNode if the EditLog size >= fs.checkpoint.size; with force, it checkpoints irrespective of the EditLog size
    
  6. We have discussed the DataNode and its functions:

    hadoop datanode [-rollback]
    # Rolls the DataNode back to the previous version. This should only be used after stopping all the DataNodes and distributing the old Hadoop version.
    
  7. The jobtracker command runs the MapReduce JobTracker node:

    hadoop jobtracker
    

The HBase setup

Configuring HBase in a fully distributed environment has the following prerequisites:

  • The Hadoop/HDFS cluster is healthy

  • The NameNode, DataNode, and secondary NameNode are set up as discussed earlier

  • Passwordless access is set up between the NameNode, DataNode, and secondary NameNode

  • The directory structure has the appropriate access levels

  • The paths are set as described earlier

Just for recap you can run this command, and it must show the following details:

Tip

Please check the compatibility of Hadoop and HBase.

In this book, we used hadoop-2.2.0 and HBase 0.98.5-hadoop2.

  1. Let's go to the NameNode of Hadoop/HDFS and open the following file:

    vi /u/HBaseB/hadoop-2.2.0/etc/hadoop/hdfs-site.xml
    

    The setup should be like this:

    These are the data nodes that we will use as region servers later on. We will use the NameNode as the HBase master node.

    vi /u/HBaseB/hadoop-2.2.0/etc/hadoop/slaves
    
      It should list the nodes that will be used as data nodes:
      your-datanode01
      your-datanode02

The following steps will help you to implement the same:

  1. Copy the hdfs-site.xml file from the Hadoop setup to the HBase configuration directory:

    cd $HBASE_HOME/conf
    
  2. Also, copy it to all the region servers. Then edit the regionservers file on the HMaster server:

    vi $HBASE_HOME/conf/regionservers
    
  3. Place the IP or the fully qualified name of each region server in this file. Then open hbase-env.sh and set the following:

    vi hbase-env.sh
    export HBASE_MANAGES_ZK=true
    
  4. This will allow HBase to manage ZooKeeper internally on port 2181.

Starting the cluster

To start the HBase cluster, run the following:

  cd $HBASE_HOME/bin
  ./start-hbase.sh

This will start the entire cluster and its region servers.

Tip

Please check the logs in the log folder just to make sure the cluster starts properly:

cd $HBASE_LOGS/
ls -ltr
-rw-rw-r--. 1 hadoop hadoop      0 Aug 29 19:22 SecurityAuth.audit
-rw-rw-r--. 1 hadoop hadoop  92590 Aug 30 15:04 hbase-hadoop-zookeeper-your-hbase-master.log
-rw-rw-r--. 1 hadoop hadoop 484092 Aug 30 16:31 hbase-hadoop-master-rchoudhry-your-hbase-master.log

tail -200 hbase-hadoop-zookeeper-your-hbase-master.log

You should see no binding errors or exceptions.

tail -200 hbase-hadoop-master-rchoudhry-your-hbase-master.log

There should be no errors or exceptions.

Validating the cluster

Let's validate the HBase setup; on the master node, run jps, and it will show the following:

[[email protected] logs]$ jps
960 SecondaryNameNode  // secondary name node is up
8467 NameNode // Name node is up
11892 HQuorumPeer // zookeeper is running in Quorum mode
25318 Jps // the jps process itself; ignore this
12008 HMaster // HBase Master is running successfully
8699 ResourceManager // Resource manager is running 
12171 HRegionServer  // HBase Region server is running
8974 JobHistoryServer // JobHistory Server is running

This confirms that all the services on the master are running. We have a region server on the master node; hence, we see HRegionServer in the preceding list.

On a region server running on a different node, use the same command and you will see the following:

13026 NodeManager
12425 Jps
12778 DataNode
13567 HRegionServer
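
The manual check of the jps output can also be scripted. The following Python sketch parses jps-style output and reports which expected daemons are missing; the function name missing_daemons and the expected sets are hypothetical, and the sample text is the region server listing shown above.

```python
def missing_daemons(jps_output, expected):
    """Return the expected process names that do not appear in jps output."""
    running = set()
    for line in jps_output.splitlines():
        parts = line.split()
        # A jps line looks like "<pid> <name>"; anything after the name is ignored.
        if len(parts) >= 2 and parts[0].isdigit():
            running.add(parts[1])
    return sorted(set(expected) - running)


region_server_jps = """\
13026 NodeManager
12425 Jps
12778 DataNode
13567 HRegionServer
"""

region_server_check = missing_daemons(
    region_server_jps, {"NodeManager", "DataNode", "HRegionServer"}
)
master_check = missing_daemons(region_server_jps, {"HMaster", "NameNode"})
```

An empty list means that every expected daemon is running on that node; on a real node, the text would come from running jps rather than from a string literal.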

We will make sure that all the region servers are working.

Basic operations on the cluster:

On the HBase Master:

cd $HBASE_HOME/bin
[[email protected] bin]$ hbase shell -d
hbase(main):001:0>

This is the HBase shell command line. We are using the -d option to run it in debug mode. In production, this should be avoided; instead, check the log files to make sure that they show no connection errors to any of the components:

hbase(main):001:0> list
City_Table
MyClickStream
t1
3 row(s) in 1.1270 seconds

["City_Table", "MyClickStream", "t1"]
hbase(main):002:0> status
hbase(main):003:0> status 'simple'
hbase(main):004:0> status 'summary'
hbase(main):005:0> status 'detailed'
hbase(main):006:0> describe 'MyClickStream'
hbase(main):007:0> create 'yourtablename', 'cf01', 'cf02'
hbase(main):008:0> scan 'yourtablename'

There are many more commands that we can run from the HBase shell command line; we will discuss them as we go through the following chapters.

The preceding tables are created in the following section. It's just for reference.

The following is the snapshot process:

  • We will consider snapshots first from a Hadoop and then from an HBase perspective. A snapshot can be taken once the directory is marked as ready to snapshot, which essentially means that it is not receiving any read/write operations at that particular time.

  • A snapshot can be taken on any directory within the Hadoop/HBase data ecosystem. A snapshottable directory can hold up to 65,536 simultaneous snapshots. There is no limit on the number of snapshottable directories (however, file descriptor or other OS limitations can come into the picture). Administrators can set any directory to be snapshottable.

    Note

    If a snapshottable directory has snapshots, it can be neither deleted nor renamed until all of its snapshots are deleted.

  • There is a system limitation that doesn't allow nested snapshottable directories.
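
The three rules above (a per-directory snapshot limit, no deletion while snapshots exist, and no nesting) can be captured in a small bookkeeping model. The Python sketch below is only an illustration: the SnapshotRegistry class and its method names are hypothetical and are not part of HDFS.

```python
import posixpath

MAX_SNAPSHOTS_PER_DIRECTORY = 65536


class SnapshotRegistry:
    """Toy bookkeeping for snapshottable directories (not the HDFS implementation)."""

    def __init__(self):
        self.directories = {}  # snapshottable path -> set of snapshot names

    @staticmethod
    def _is_inside(child, parent):
        return child.startswith(parent.rstrip("/") + "/")

    def allow_snapshot(self, path):
        path = posixpath.normpath(path)
        for existing in self.directories:
            if existing == path:
                continue
            if self._is_inside(path, existing) or self._is_inside(existing, path):
                raise ValueError(f"nested snapshottable directories: {path} and {existing}")
        self.directories.setdefault(path, set())

    def create_snapshot(self, path, name):
        names = self.directories[posixpath.normpath(path)]
        if len(names) >= MAX_SNAPSHOTS_PER_DIRECTORY:
            raise ValueError("snapshot limit reached for this directory")
        names.add(name)

    def delete_snapshot(self, path, name):
        self.directories[posixpath.normpath(path)].discard(name)

    def can_delete_directory(self, path):
        # A directory that still holds snapshots can be neither deleted nor renamed.
        return not self.directories.get(posixpath.normpath(path))


registry = SnapshotRegistry()
registry.allow_snapshot("/snapshot")
registry.create_snapshot("/snapshot", "s1")
blocked_while_snapshot_exists = not registry.can_delete_directory("/snapshot")

try:
    registry.allow_snapshot("/snapshot/daily")
    nested_rejected = False
except ValueError:
    nested_rejected = True

registry.delete_snapshot("/snapshot", "s1")
deletable_after_cleanup = registry.can_delete_directory("/snapshot")
```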

Create a snapshottable directory and take a snapshot of it:

hdfs dfs -mkdir /snapshot
# enable snapshots on the directory
hdfs dfsadmin -allowSnapshot /snapshot
hdfs dfs -createSnapshot /snapshot [<snapshotName>]

Deleting a snapshot:

Delete a snapshot from a snapshottable directory.

This can only be done with the owner's privilege on the snapshottable directory:

  hdfs dfs -deleteSnapshot <path> <snapshotName>

Snapshots in HBase:

To reduce the impact on the region servers, HBase snapshots by design give the flexibility to clone a table without making data copies. In addition to this, we can export the table to another cluster, which also avoids any impact on the region servers.

Configuring HBase Snapshot:

<property>
    <name>hbase.snapshot.enabled</name>
    <value>true</value>
</property>

We are assuming that a table MyClickStream is created in HBase. We can also create the table if it's not present:

./bin/hbase shell
hbase> create 'MyClickStream', 'cf01', 'cf02'

cf01 -> represents column family 01
cf02 -> represents column family 02

./bin/hbase shell -d
hbase> disable 'MyClickStream'
hbase> snapshot 'MyClickStream', 'MyClickStreamSnapshot-08302014'

Listing a Snapshot: List all the snapshots taken:

./bin/hbase shell
hbase> list_snapshots

  • Deleting a Snapshot: We can remove the unwanted snapshots by running the following command:

    ./bin/hbase shell
    hbase> delete_snapshot 'MyClickStreamSnapshot-08212014'
    
  • Clone a table from Snapshot: Cloning allows us to create a new table with the same dataset as when the snapshot was taken. Changes to the cloned table are isolated to itself, and changes in the original table do not impact the snapshot:

    ./bin/hbase shell
    hbase> clone_snapshot 'MyClickStreamSnapshot-08212014', 'MyClickStreamSnapshot01-08212014'
    
  • Restoring Snapshots: This can only be performed when the table is disabled. The result of this process is that the table comes back in the same state it had when we took the snapshot:

    ./bin/hbase shell
    hbase> disable 'MyClickStream'    # the name of the table
    

    This disables the table for active use; no read/write operations are served at this point:

    hbase> restore_snapshot 'MyClickStreamSnapshot-08212014'
    

    Internally, replication and snapshots work differently.

    Replication is performed at the log level, whereas snapshots always work at the file system level. Thus, it's essential to sync the states from the master, because once a restore operation is done, the replica will be different from the master. If we perform a restore operation, it's pivotal to stop the replication process first and perform the bootstrapping operation again.

    In the scenario of limited data loss caused by a client, it's recommended to clone the table from the existing snapshot and run a MapReduce job that copies the data from the clone to the main table. This way, we don't have to go for a full restore, which first requires the table to be disabled.

    To export a snapshot, specify the hbase.rootdir of the other cluster:

    ./bin/hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot MyClickStreamSnapshot-08212014 -copy-to hdfs://mynamenode-server02:8082/hbase -mappers 8
    

    In a highly used production environment, it's advisable to restrict bandwidth consumption while exporting a snapshot.

    This can be achieved by invoking the preceding command with the bandwidth parameter, as shown next; the unit of measure is megabytes per second and the value is an integer:

    ./bin/hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot MyClickStreamSnapshot-08212014 -copy-to hdfs://mynamenode-server02:8082/hbase -mappers 8 -bandwidth 200
    

How it works…

To better understand the concepts, I have broken the topic down into the following parts:

  • Web interface: This shows the details of the NameNode and DataNodes and displays basic information about the cluster. The URL will be http://your-namenode-name:50070/. You can also use the same interface to navigate the filesystem within the NameNode.

  • Snapshots: Snapshots in HDFS are always read-only and represent the state of the files at the time the snapshot was taken. You can restrict a snapshot to a limited scope of a filesystem, or it can span the entire filesystem.

  • HBase Snapshots: A snapshot is a set of metadata used by administrators to restore the previous state of the table on which it was taken. Technically, it's not a copy of the table; it's a set of operations that keep track of the metadata (which is nothing but the table and regions) and the actual data (HFiles, memstore, WALs).

  • Offline Snapshots: The standard scenario is to take the snapshot when the table is disabled. This makes sure that all the data is flushed to disk, and that no writes or reads are accepted on this dataset. This means that taking a snapshot is just a matter of working through the table metadata and the HFiles that reside on disk and keeping a reference to them. The master invokes this operation, and the time it takes is governed by the time the HDFS NameNode needs to provide the list of the files.

  • Online Snapshots: This type of snapshot works differently; the tables are enabled and the regions are serving reads and writes, or in other words, puts and gets from the live traffic. When the master receives the request for a snapshot, it coordinates it by asking all the region servers to take a snapshot of their regions. This works on a simple flush and does not provide causal consistency. This type of snapshot has minimal performance overhead.
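
The point that an HBase snapshot keeps references to files rather than copying data can be illustrated with a toy model in Python. The Table class, the helper functions, and the file names below are hypothetical and exist only for this sketch; the one assumption carried over from HBase is that store files are immutable once written.

```python
class Table:
    """Toy table: just a name and the list of immutable store files it references."""

    def __init__(self, name, hfiles=()):
        self.name = name
        self.hfiles = list(hfiles)

    def flush(self, hfile):
        # New data never rewrites old files; it only adds a new file.
        self.hfiles.append(hfile)


def take_snapshot(table):
    """Record metadata only: the table name and references to its current files."""
    return {"table": table.name, "hfiles": tuple(table.hfiles)}


def clone_snapshot(snapshot, new_name):
    """Create a new table that points at the same files; no data is copied."""
    return Table(new_name, snapshot["hfiles"])


table = Table("MyClickStream", ["hfile-1", "hfile-2"])
snapshot = take_snapshot(table)

table.flush("hfile-3")                       # the original table keeps changing
clone = clone_snapshot(snapshot, "MyClickStreamClone")
clone.flush("hfile-4")                       # the clone changes independently
```

Because the files are never modified in place, the snapshot, the original table, and the clone can all share the older files safely.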

DFS Administration commands:

  • bin/hadoop dfsadmin -help: Lists all the commands.

  • bin/hadoop dfsadmin -report: Provides statistics and file information.

  • bin/hadoop dfsadmin -safemode enter | leave | get | wait: Controls safe mode.

  • Safe mode: Immediately blocks changes to the namespace and makes it read-only. It also blocks replication and any delete operations on the data blocks.

    Note

    An important point to note about safe mode is that it is turned on automatically during the startup process, and is switched back to normal once the process detects that the minimum condition is fulfilled. You can also trigger safe mode manually, but in that case you have to switch it off manually too.

  • bin/hadoop dfsadmin -saveNamespace: This command requires superuser permission; it saves the current namespace and resets the edit logs.

  • bin/hadoop dfsadmin -rollEdits: This rolls the edit logs. Note that this requires superuser permission.

  • bin/hadoop dfsadmin -restoreFailedStorage: This takes one of three arguments (true/false/check) and attempts to restore failed storage replicas only if they become available.

    Note

    This can only be done with superuser permission.

  • bin/hadoop dfsadmin -refreshNodes: This command updates the NameNode with the set of DataNodes that are allowed to connect to it.

  • bin/hadoop dfsadmin -finalizeUpgrade: This concludes the upgrade of HDFS. It invokes an internal process that instructs the DataNodes to delete their previous version working directories, and then has the NameNode do the same. This finishes the upgrade process.

  • bin/hadoop dfsadmin -deleteBlockPool: The arguments are datanodehost:port, the block pool ID, and an optional argument, force. If force is passed, the block pool directory for the given block pool ID on the given DataNode is deleted along with its contents; otherwise, the directory is deleted only if it is empty. The command will fail if the DataNode is still serving the block pool. Refer to refreshNamenodes to shut down a block pool service on a DataNode.

  • bin/hadoop dfsadmin -help: Displays help for the commands.

Let's discuss other important components:

  • SecondaryNameNode: The NameNode stores changes to the file system as a log in a native file system file (edits). During the startup process, the HDFS state is read from the image file, commonly known as fsimage, and the changes from the edit log are applied to it. The latest state of HDFS is then written back to fsimage, and normal operation starts with a blank edit log file. In essence, the NameNode combines these two files (fsimage and edits) only during startup, so the secondary NameNode performs the same merge periodically as a checkpoint. This keeps the edit log within limits and makes the next restart faster.

  • Rebalancer: The HDFS cluster gets easily imbalanced due to the following reasons:

    When a new DataNode joins the cluster, any map task assigned to that machine most likely does not access local data, thus consuming more network bandwidth. When some DataNodes become full, new data blocks are placed only on the DataNodes that are not full, thus reducing the read parallelism.

  • Rack Awareness: As the NameNode is designed for HA/fault tolerance, the system attempts to spread the replicas of a block across multiple racks. Using the variable dfs.network.script, the administrator can govern these settings.

  • Safemode: Puts HDFS into a read-only state.

  • fsck: It's designed to report problems such as missing blocks and under-replicated blocks; fsck ignores open files. Depending on the need, it can be run on a subsection of files or on the entire file system under the NameNode.

  • Snapshotting: We will consider this from the Hadoop and then from the HBase perspective.

    The snapshot process is very flexible and robust; it allows snapshots at the directory level and at the cascaded directory level. A total of 65,536 simultaneous snapshots can be accommodated per snapshottable directory. In essence, there is no limit on the number of snapshottable directories.

    Tip

    Nested snapshottable directories are currently not possible.

    The export tool copies a snapshot to another cluster and helps us duplicate the data between clusters. The data copied is HFiles, logs, and snapshot metadata. This works at the file system (HDFS) level, thus it's not necessary to have the HBase cluster fully online. This is also the reason it does not impact the RegionServer workload.

    In the preceding section, we discussed the core file system, which is the foundation of HBase. We discussed HDFS and how it's related to the Hadoop ecosystem, and then how HBase relies on the Hadoop/HDFS foundation to work. In doing so, we discussed the internal structure of HDFS and the HBase integration points. In steps 1 to 7, we discussed the HDFS/Hadoop commands in a fully distributed mode. This is needed to make sure that HBase runs in a fully distributed environment. We cannot run HBase in this mode if we don't have the Hadoop setup; however, for development purposes, we can run HBase using a standalone installation, or alternatively in pseudo-distributed mode.
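
The merge of fsimage and the edit log described under SecondaryNameNode can be sketched as follows. This Python model is an assumption-laden simplification: the namespace is a plain dictionary from path to file size, the edit log is a list of tuples, and the function names are hypothetical rather than anything taken from the Hadoop code base.

```python
def apply_edits(fsimage, edits):
    """Return a new namespace image with the logged edits applied in order."""
    image = dict(fsimage)
    for operation, path, size in edits:
        if operation == "create":
            image[path] = size
        elif operation == "delete":
            image.pop(path, None)
        else:
            raise ValueError(f"unknown operation: {operation}")
    return image


def checkpoint(fsimage, edits):
    """Merge the edit log into the image and start over with an empty log."""
    return apply_edits(fsimage, edits), []


fsimage = {"/nn01/hello.txt": 0}
edits = [
    ("create", "/nn02/hello.txt", 0),
    ("delete", "/nn01/hello.txt", None),
]

new_image, new_edits = checkpoint(fsimage, edits)
```

The shorter the edit log is at restart time, the less work the startup replay has to do, which is why periodic checkpoints speed up the next restart.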

There is more…

The entire process helps us set up the Hadoop/HDFS file system; later on, HBase can sit on top of it and get the benefits of the HDFS distributed architecture.

See also

Refer to the following chapter:

  • Working with Large Distributed Systems.

 

Administering clusters


It's pivotal at this point to know more about the HBase administrative process. HBase stores petabytes of data in distributed locations, and the system must work smoothly, regardless of where the data lives, in a multithreaded, easy-to-manage environment that also spans the hardware, OS, JDK, and other hardware and software components.

HBase provides an administrative GUI that shows the current state of the cluster, and plenty of command-line tools that give us in-depth knowledge of what is going on in the cluster.

Getting ready

We must have an HDFS/Hadoop setup in cluster or fully distributed mode as the first step, which we did in the earlier section. The second step is to have an HBase setup in a fully distributed mode.

Tip

It's assumed that we have set up password-less communication between the master and the region servers on the HBase side. If you are using the same nodes for the Hadoop/HDFS setup, the Hadoop user also needs a password-less setup from the NameNode to the Secondary NameNode, DataNodes, and so on.

We must have a full HBase cluster running on top of a Hadoop/HDFS cluster.

How to do it…

HBase provides multiple ways to do administration work on the fully distributed clustered environment:

  • WebUI-based environment

  • Command line-based environment

  • HBase Admin UI

The Master web UI runs on port 60010 by default, and the web interface is looked up on this port only. It needs to be started on the HMaster node.

The master UI gives a dashboard of what's going on in the HBase cluster.

The UI contains the following details:

  • HBase home

  • Table details

  • Local logs

  • Log levels

  • Debug dump

  • Metric dump

  • HBase configuration

We will go through it in the following sections.

  • HBase Home: It contains the dashboard that gives a holistic picture of the HBase cluster.

    • The region server

    • The backup master

    • Tables

    • A task

    • Software attributes

  • Region Server: The image shows the region servers and also provides a tabbed view of various metrics (basic stats, memory, requests, storefiles, compactions):

A detailed discussion is out of scope at this point. For more details, see later sections. For our purpose, we will skip the backup master:

Let's list all the user-created tables using Tables. It provides the details about User Tables (tables that are created by the users/actors) and Catalog Tables (these contain hbase:meta and hbase:namespace), as seen in the following figure:

When we click any of the tables listed earlier, other important details are shown, such as table attributes, table regions, and region-by-region server details. Actions such as compaction and split can be taken using the admin UI.

Tasks provides the details of the tasks that are running. We can see Monitored Tasks, RPC Tasks, RPC Handler Tasks, Active RPC Calls, Client Operations, and a JSON response:

The following Software Attributes page provides the details of the software used in the HBase cluster:

After clicking on zk_dump, it provides further details about the ZooKeeper quorum stats as follows:

HBase provides various command-line tools for administrating, debugging, and doing an analysis on the HBase cluster.

The first tool is the hbase script, and the details are as follows:

bin/hbase
Usage: hbase [<options>] <command> [<args>]

Identify inconsistencies with hbck; this tool checks the consistency of the cluster and looks for data corruption, and it runs against the cluster.

bin/hbase hbck

This runs against the cluster and provides the details of inconsistencies between the regions and masters.

bin/hbase hbck -details

The -details option provides insight into the splits that happen in all the tables.

  bin/hbase hbck MyClickStream

The preceding line restricts the check to the MyClickStream table. To view the HFile content in a text format, use the following:

   bin/hbase org.apache.hadoop.hbase.io.hfile.HFile

Use FSHLog for manual splitting and dumping:

hbase org.apache.hadoop.hbase.regionserver.wal.FSHLog --dump
hbase org.apache.hadoop.hbase.regionserver.wal.FSHLog --split

Test whether a compressor works with the compression test tool:

hbase org.apache.hadoop.hbase.util.CompressionTest hdfs://host/path/to/hbase snappy

Enable compression on a column family, either on an existing table or while creating a table:

hbase> disable 'MyClickStream'
hbase> alter 'MyClickStream', {NAME => 'cf', COMPRESSION => 'GZ'}
hbase> enable 'MyClickStream'
hbase> create 'MyClickStream', {NAME => 'cf2', COMPRESSION => 'SNAPPY'}

Load Test Tool usage: The following are some of the commands that can be used to do a quick load test of your compression performance:

bin/hbase org.apache.hadoop.hbase.util.LoadTestTool -h

usage: bin/hbase org.apache.hadoop.hbase.util.LoadTestTool <options>

Options include -batchupdate and the following:

-compression <arg>: The compression type; the argument can be LZO, GZ, NONE, or SNAPPY

We will limit ourselves to the preceding commands.

A good example will be as follows:

hbase org.apache.hadoop.hbase.util.LoadTestTool -write 2:20:20 -num_keys 500000 -read 60:30 -num_tables 1 -data_block_encoding NONE -tn load_test_tool_NONE
-write 2:20:20 is read as follows:
2 is the <avg_cols_per_key>
20 is the <avg_data_size>
20 is the number of parallel threads to be used
-num_keys takes an integer argument, here 500000; this is the number of keys to read and write.

Now let's look at a read:

-read 60:30 is read as follows: 60 is the verify percent

30 is the number of threads

-num_tables takes a positive integer, which is the number of tables to be loaded in parallel.
-data_block_encoding takes one of several encoding algorithms as its argument; this allows the data blocks to be encoded based on the need. Some of them are [NONE, PREFIX, DIFF, FAST_DIFF, PREFIX_TREE].
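
To make the colon-separated arguments of -write and -read easier to read, the following Python sketch splits them into named fields. The function names are hypothetical, and the default of 20 threads when the thread count is omitted is an assumption of this sketch; check the tool's -h output for the defaults of your version.

```python
DEFAULT_THREADS = 20  # assumed default for illustration only


def parse_write_spec(spec):
    """Parse '<avg_cols_per_key>:<avg_data_size>[:<threads>]'."""
    parts = [int(part) for part in spec.split(":")]
    if len(parts) not in (2, 3):
        raise ValueError(f"expected 2 or 3 fields, got {len(parts)}: {spec!r}")
    return {
        "avg_cols_per_key": parts[0],
        "avg_data_size": parts[1],
        "threads": parts[2] if len(parts) == 3 else DEFAULT_THREADS,
    }


def parse_read_spec(spec):
    """Parse '<verify_percent>[:<threads>]'."""
    parts = [int(part) for part in spec.split(":")]
    if len(parts) not in (1, 2):
        raise ValueError(f"expected 1 or 2 fields, got {len(parts)}: {spec!r}")
    return {
        "verify_percent": parts[0],
        "threads": parts[1] if len(parts) == 2 else DEFAULT_THREADS,
    }


write_spec = parse_write_spec("2:20:20")   # the -write value from the example
read_spec = parse_read_spec("60:30")       # the -read value from the example
```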

-tn is a table name prefix.

Exporting: This tool exports the content and data of a table to HDFS in a sequence file, using this:

HBase org.apache.hadoop.HBase .mapreduce.Export <tablename> <outputdir> [<versions> [<starttime> [<endtime>]]] 

Note

You can configure hbase.client.scanner.caching in the job configuration; this applies to all the scans.

Importing: This tool loads data that has been exported back into HBase.

This can be done by the following command:

hbase org.apache.hadoop.hbase.mapreduce.Import <tablename> <inputdir>

<tablename>: The name of the table to be imported by the tool.

<inputdir>: The input directory to be used.

Utility to replay WAL files into HBase:

hbase org.apache.hadoop.hbase.mapreduce.WALPlayer [options] <wal inputdir> <tables> [<tableMappings>]
hbase org.apache.hadoop.hbase.mapreduce.WALPlayer /backuplogdir MyClickStream newMyClickStream
wal inputdir: /backuplogdir
tables: MyClickStream
tableMappings: newMyClickStream

hbase clean: This is dangerous and should be avoided in a production setup.

hbase clean

It takes the following options as parameters:

  --cleanZk   cleans HBase related data from zookeeper.
  --cleanHdfs cleans HBase related data from hdfs.
  --cleanAll  cleans HBase related data from both zookeeper and hdfs.

hbase pe: This is a shortcut to run the performance evaluation tool.

hbase ltt: This command is a shortcut provided to run the org.apache.hadoop.hbase.util.LoadTestTool utility. It was introduced in version 0.98.

View the details of the table as shown here:

Go to the log tab on the HBase Admin UI home page, and you will see the following details. Alternatively, you can log in to the directory using the Linux shell to tail the logs.

Directory: /logs/:

SecurityAuth.audit 0 bytes  Aug 29, 2014 7:22:00 PM
hbase-hadoop-master-rchoudhry-linux64.com.log 691391 bytes   Sep 2, 2014 11:01:34 AM
hbase-hadoop-master-rchoudhry-linux64.com.out 419 bytes   Aug 29, 2014 7:31:21 PM
hbase-hadoop-regionserver-rchoudhry-linux64.com.log 1048281 bytes   Sep 2, 2014 11:01:23 AM
hbase-hadoop-regionserver-rchoudhry-linux64.com.out 419 bytes   Aug 29, 2014 7:31:23 PM
hbase-hadoop-zookeeper-rchoudhry-linux64.log 149832 bytes   Aug 31, 2014 12:51:42 AM
hbase-hadoop-zookeeper-rchoudhry-linux64.com.out 419 bytes   Aug 29, 2014 7:31:19 PM
hbase-hadoop-zookeeper-rchoudhry-linux64.com.out.1 1146 bytes   Aug 29, 2014 7:26:29 PM

Get and set the log levels as required at runtime:

Log dump

Have a look at what is going on in the cluster with Log dump:

Master status for hbase-hadoop-master-rchoudhry-linux64.com,60000,1409365881345 as of Tue Sep 02 11:17:33 PDT 2014
Version Info:
===========================================================
HBase 0.98.5-hadoop2
Subversion file:///var/tmp/0.98.5RC0/hbase-0.98.5 -r Unknown
Compiled by apurtell on Mon Aug  4 23:58:06 PDT 2014
Hadoop 2.2.0
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768
Compiled by hortonmu on 2013-10-07T06:28Z


Tasks:
===========================================================
Task: RpcServer.reader=1,port=60000
Status: WAITING:Waiting for a call
Running for 315970s

Task: RpcServer.reader=2,port=60000
Status: WAITING:Waiting for a call
Running for 315969s

Metrics dump

Metrics dump exposes the JMX details for the following components in JSON format:

  • Start-up progress

  • Balancer

  • Assignment Manager

  • Java Management Extensions (JMX) details

  • Java Runtime Implementation System Properties

    Note

    These are various system properties; a discussion of them is beyond the scope of this book.
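
The same JSON can be read programmatically. The following is a minimal sketch, assuming the master's info server serves the dump at the /jmx path as a document with a top-level "beans" array; the helper names and the sample payload are illustrative only.

```python
import json
from urllib.request import urlopen


def fetch_jmx(base_url, timeout=5.0):
    """Download the raw JMX JSON from an info server, e.g. http://host:60010."""
    with urlopen(base_url.rstrip("/") + "/jmx", timeout=timeout) as resp:
        return resp.read().decode("utf-8")


def bean_names(payload):
    """Return the sorted names of all beans found in a JMX JSON payload."""
    doc = json.loads(payload)
    return sorted(bean["name"] for bean in doc.get("beans", []) if "name" in bean)


if __name__ == "__main__":
    # Made-up payload so that the sketch runs without a live cluster.
    sample = json.dumps(
        {
            "beans": [
                {"name": "java.lang:type=Memory"},
                {"name": "Hadoop:service=HBase,name=Master,sub=Server"},
            ]
        }
    )
    for name in bean_names(sample):
        print(name)
```

Against a running master, bean_names(fetch_jmx("http://your-HBase-master:60010")) would list the beans that the Metrics dump page shows.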

How it works…

When the browser points to http://your-HBase-master:60010/master-status, the web interface of the HBase admin is loaded.

Internally, it connects to ZooKeeper and collects the details from the ZooKeeper interface, wherein ZooKeeper tracks the region servers as per the region server configuration in the region server file. These are the values set in hbase-site.xml. The data for Hadoop/HDFS, the region servers, and the ZooKeeper quorum is continuously looked up through the RPC calls that the master makes via ZooKeeper.

In hbase-site.xml, the user sets the master, the backup master, various other software attributes, the refresh time, memory allocations, store files, compactions, requests, ZooKeeper dumps, and so on.

Node or cluster view: Here, the user chooses to monitor either the HMaster or the region server data view. The HMaster view contains data and graphics about the node status. The region server view is the main and most important one because it allows monitoring of all the region server aspects.

You can point the browser to http://your-HBase-master:60030/rs-status, which loads the admin UI for the region servers.

The metrics that are captured here are as follows:

  • Region server metrics (base stats, memory, requests, HLog, store files, queues)

  • Tasks happening in region servers (monitored, RPC, RPC handler, active RPC, JSON, client operations)

  • Block cache, which provides different options for on-heap and off-heap caching: LruBlockCache is on-heap, and BucketCache is typically off-heap.

A backup master is a design/architecture choice that needs careful consideration before it is enabled. HBase is, by design, a fault-tolerant distributed system that assumes hardware failure in the network topology.

However, HBase does provide various options for situations such as:

  • Data center-level failure

  • Accidental deletion of records/data

  • For audit purpose

See also

Refer to the following chapter:

  • Working with Large Distributed Systems.

 

Managing clusters


In the HBase ecosystem, it is a must to monitor clusters in order to control and improve their performance and state as they grow. As HBase sits on top of the Hadoop ecosystem and serves real-time user traffic, it is essential to see the performance of the cluster at any given point in time. This allows us to detect problems well in advance and take corrective action before they occur.

Getting ready

It is important to know some of the details of Ganglia and its distributed components before we get into the details of managing clusters.

gmond

gmond is short for Ganglia Monitoring Daemon, a low-footprint service. This service needs to be installed on each node from which we want to pull metrics. This daemon is the actual workhorse and collects the data of each host using a listen/announce protocol. It also helps collect some of the core metrics, such as disk, active processes, network, memory, and CPU/vCPUs.

gmetad

gmetad is short for Ganglia Meta Daemon. It is a service that collects data from other gmetad and gmond sources and merges it into a single meta-cluster image. The formats used to store and exchange the data are RRD and XML. This enables browsing by client applications.

gweb

gweb is a PHP-based web interface, or view, to the data that is collected by the two services described earlier. It requires the following:

  • Apache web server

  • PHP 5.2 or later

  • The PHP json extension

How to do it…

We will divide this recipe into two parts. In the first part, we will install Ganglia on all the nodes.

Once that is done, we will integrate it with HBase so that the relevant metrics are available.

Ganglia setup

To install Ganglia, it is best to use the prebuilt binary packages that are available from the vendor distributions. This helps in dealing with the prerequisite libraries. Alternatively, it can be downloaded from the Ganglia website, http://sourceforge.net/projects/ganglia/files/latest/download?source=files.

To download it from the command prompt, use the following command:

wget http://downloads.sourceforge.net/project/ganglia/ganglia%20monitoring%20core/3.0.7%20%28Fossett%29/ganglia-3.0.7.tar.gz

Enter the wget command as a single line in your shell. Use sudo in case you don't have privileges for the current directory, or download the file to /tmp and copy it to the respective location later.

  1. tar -xzvf ganglia-3.0.7.tar.gz -C /opt/HBaseB

  2. rm -rf ganglia-3.0.7.tar.gz      # deletes the tar file, which is no longer needed

  3. Now let's install the dependencies:

    sudo apt-get -y install build-essential libapr1-dev libconfuse-dev libexpat1-dev python-dev
    

    The -y option means that apt-get won't wait for the user's confirmation. It assumes yes whenever a confirmation prompt would appear.

  4. Build and install the downloaded and extracted source:

    cd /opt/HBaseB/ganglia-3.0.7
    ./configure      # checks the build environment and generates the makefiles
    make
    sudo make install
    
  5. Once the preceding step is completed, you can generate a default configuration file as follows:

    gmond --default_config > /etc/gmond.conf

    Use sudo su - beforehand in case there is a privilege issue. It makes you the root user, which allows gmond.conf to be written to the system directory /etc.
    
  6. Open the file with vi /etc/gmond.conf and change the following:

    globals
    {
    user = ganglia      # in place of the default user
    }
    

    Note

    If you use a specific user to perform Ganglia tasks, set that user here in place of the default.

  7. We recommend creating this user with the following command and then setting the cluster details in /etc/gmond.conf:

    sudo adduser --disabled-login --no-create-home ganglia

    cluster {
    name = "HBaseB"      # name of your cluster
    owner = "HBaseB Company"
    url = "http://yourHBasebMaster.ganglia-monitor.com/"      # URL of the main monitor or the CNAME
    }
    
  8. The UDP multicast setup, which is the default, is good for fewer than 120 nodes. For more than 120 nodes, we have to switch to unicast.

    The setup is as follows (change this in /etc/gmond.conf):

    udp_send_channel
    {
    #mcast_join=--your IP address to join in  
    host = yourHBasebMaster.ganglia-monitor.com
    port = 8649
    # ttl=1 
    }
    udp_recv_channel
    {
    #mcast_join=--your IP address to join in  
    port = 8649
    # bind =--your IP address to join in  
    }
  9. Start the monitoring daemon with:

    sudo gmond
    

    We can test it with nc <hostname> 8649 or telnet <hostname> 8649.

    Note

    To stop the daemon, find its process ID with ps -ef | grep gmond, and then kill that process:

    Execute sudo kill -9 <PID>

  10. Now we have to install the Ganglia meta daemon (gmetad). One gmetad is enough if the cluster has fewer than 100 nodes. It is a workhorse that requires a powerful machine with decent compute power, as it is responsible for creating the graphs.

  11. Let's move ahead:

    cd /u/HBaseB/ganglia-3.0.7
    ./configure --with-gmetad
    make
    sudo make install
    sudo cp /u/HBaseB/ganglia-3.0.7/gmetad/gmetad.conf /etc/gmetad.conf
    
  12. Open the file using sudo vi /etc/gmetad.conf and change the following:

    setuid_username "ganglia"
    data_source "HBaseB" yourHBasebMaster.ganglia-monitor.com
    gridname "<our grid name, say HBaseB Grid>"
  13. Now we need to create the directories that will store the data in round-robin databases (rrds):

    mkdir -p /var/lib/ganglia/rrds
    

    Now let's change the ownership to the ganglia user so that it can read and write as needed.

    chown -R ganglia:ganglia /var/lib/ganglia/
    
  14. Let's start the daemon:

    gmetad
    

    Note

    To stop the daemon, find its process ID with ps -ef | grep gmetad, and then kill that process:

    Execute sudo kill -9 <PID>

  15. Now, let's focus on the Ganglia web interface:

    sudo apt-get -y install rrdtool apache2 php5-mysql libapache2-mod-php5 php5-gd
    

    Tip

    Note that this will install rrdtool (the round-robin database tool), Apache/httpd, the PHP5 module for Apache, the php5-mysql drivers, and so on.

  16. Copy the PHP-based files to the following location and restart Apache:

    cp -r /u/HBaseB/ganglia-3.0.7/web  /var/www/ganglia
    sudo /etc/init.d/apache2 restart      # other actions that can be used are status and stop
    
  17. Point your browser to http://HBasebMaster.ganglia-monitor.com/ganglia. You should see only the basic graphs, as the HBase setup is not done yet.

  18. Integrate HBase and Ganglia:

    vi /u/HBaseB/hbase-0.98.5-hadoop2/conf/hadoop-metrics2-hbase.properties
    
  19. Change the following parameters to publish the different metrics to Ganglia:

    hbase.extendedperiod=3600
    hbase.class=org.apache.hadoop.metrics2.sink.FileSink
    hbase.period=5
    hbase.servers=master2:8649
    # jvm context will provide memory used, thread count in JVM, and so on.
    jvm.class=org.apache.hadoop.metrics2.sink.FileSink
    jvm.period=5
    jvm.servers=master2:8649
    # enable rpc context to see the metrics on each HBase rpc method invocation.
    rpc.class=org.apache.hadoop.metrics2.sink.FileSink
    rpc.period=5
    rpc.servers=master2:8649
    
  20. Copy /u/HBaseB/HBaseB/hbase-0.98.5-hadoop2/conf/hadoop-metrics2-hbase.properties to all the nodes and restart HMaster and all the region servers.

How it works…

As the system grows from a few nodes to tens or hundreds of nodes, or becomes a very large cluster with many hundreds of nodes, it is pivotal to have a holistic view, a drill-down view, and a historical view of the cluster at any given point in time, in a graphical representation. In a large or very large installation, administrators are more concerned about redundancy, which avoids a single point of failure. HBase and the underlying HDFS are designed to handle node failures gracefully, but it is equally important to monitor these failures, as they can bring down the cluster if corrective action is not taken in time. HBase exposes various metrics to JMX and Ganglia, such as HMaster and region server statistics, JVM (Java Virtual Machine), RPC (remote procedure call), Hadoop/HDFS, and MapReduce details. Taking all these points and Ganglia's other salient features into consideration, we chose Ganglia.

Ganglia provides the following advantages:

  1. It provides near-real-time monitoring for all the vital information of a very large cluster.

  2. It runs on commodity hardware and is suited to most of the popular operating systems.

  3. It is open source and relatively easy to install.

  4. It integrates easily with traditional monitoring systems.

  5. It provides an overall view of all nodes in a grid and all nodes in the cluster.

  6. The monitored data is available in both textual and graphic formats.

  7. It works on a multicast listen/announce protocol.

  8. It works with the following open standards:

    • JSON

    • XML

    • XDR

    • RRDTool

    • APR – Apache portable runtime

    • Apache HTTPD server

    • PHP-based web interface

HBase works only with Ganglia version 3.0.x and higher; hence, we used version 3.0.7.

In step 3, we installed the library dependencies that Ganglia requires in order to compile.

In step 4, we compiled and installed Ganglia by running the configure command, followed by make and then make install.

In step 5, we created the gmond.conf file, and in steps 6 to 8, we changed the settings so that it points to the HBase master node. We also configured port 8649 and a ganglia user who can read from the cluster. By commenting out the multicast address and the TTL (time to live), we changed the default UDP-based multicasting to unicasting, which enables us to expand the cluster beyond 120 nodes. We also added a master gmond node in this config file.
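
A small check can confirm that a gmond.conf really is in unicast mode before it is copied to every node. This is a rough sketch rather than a full parser for the configuration format: it assumes # comments and simple key = value lines inside the two UDP channel blocks, and all function names are hypothetical.

```python
import re

BLOCK_RE = re.compile(r"(udp_send_channel|udp_recv_channel)\s*\{([^}]*)\}")


def strip_comments(text):
    """Drop everything after a '#' on each line."""
    return "\n".join(line.split("#", 1)[0] for line in text.splitlines())


def parse_channels(conf_text):
    """Return (block_name, settings) pairs for each UDP channel block."""
    channels = []
    for match in BLOCK_RE.finditer(strip_comments(conf_text)):
        settings = {}
        for line in match.group(2).splitlines():
            if "=" in line:
                key, value = line.split("=", 1)
                settings[key.strip()] = value.strip().strip('"')
        channels.append((match.group(1), settings))
    return channels


def unicast_problems(conf_text):
    """List settings that would keep the channels from working in unicast mode."""
    problems = []
    for name, settings in parse_channels(conf_text):
        if "mcast_join" in settings:
            problems.append(name + ": mcast_join is still active")
        if name == "udp_send_channel" and "host" not in settings:
            problems.append(name + ": no host set")
        if "port" not in settings:
            problems.append(name + ": no port set")
    return problems


if __name__ == "__main__":
    with open("/etc/gmond.conf") as handle:
        for problem in unicast_problems(handle.read()):
            print(problem)
```

A misspelled key, such as post instead of port, shows up as "no port set" for the affected block.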

In step 9, we started gmond and got some core monitoring metrics, such as the CPU, disk, network, memory, and load average of the nodes.
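
The nc or telnet test works because gmond answers a TCP connection on its port with an XML dump of everything it knows. The sketch below reads that dump and picks out a few core metrics per host. It assumes the usual GANGLIA_XML layout of CLUSTER, HOST, and METRIC elements with NAME and VAL attributes; the metric names chosen as defaults are examples.

```python
import socket
import xml.etree.ElementTree as ET


def read_gmond_xml(host, port=8649, timeout=5.0):
    """Connect to gmond and return the raw XML dump as bytes."""
    chunks = []
    with socket.create_connection((host, port), timeout=timeout) as sock:
        while True:
            data = sock.recv(65536)
            if not data:  # gmond closes the connection after the dump
                break
            chunks.append(data)
    return b"".join(chunks)


def host_metrics(xml_data, wanted=("load_one", "mem_free", "cpu_idle")):
    """Map each host name to the selected metric values (kept as strings)."""
    root = ET.fromstring(xml_data)
    result = {}
    for host in root.iter("HOST"):
        metrics = {m.get("NAME"): m.get("VAL") for m in host.iter("METRIC")}
        result[host.get("NAME")] = {k: metrics[k] for k in wanted if k in metrics}
    return result


if __name__ == "__main__":
    dump = read_gmond_xml("localhost")
    for name, values in sorted(host_metrics(dump).items()):
        print(name, values)
```

If a host shows up with an empty dictionary, gmond knows the node but has not yet received the selected metrics from it.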

In step 11, we went back to /u/HBaseB/ganglia-3.0.7/ and reran the configuration, but this time with ./configure --with-gmetad, so that gmetad is compiled as well. In the same step, we used sudo to copy gmetad.conf from /u/HBaseB/ganglia-3.0.7/gmetad/gmetad.conf to /etc/gmetad.conf.

In step 12, we added the ganglia user and the master details in data_source "HBaseB" yourHBasebMaster.ganglia-monitor.com.

In steps 13 and 14, we created the rrds directory that holds the data in round-robin databases and then started the gmetad daemon on the master node.
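
Once gmetad has been running for a while, the rrds directory can be inspected to confirm that data is arriving from every node. The following sketch assumes the common layout of one directory per cluster, one per host below it, and one .rrd file per metric; the __SummaryInfo__ directories that gmetad keeps for aggregates are skipped.

```python
from pathlib import Path


def rrd_inventory(root="/var/lib/ganglia/rrds"):
    """Map (cluster, host) to the sorted metric names that have an .rrd file."""
    inventory = {}
    for rrd in sorted(Path(root).glob("*/*/*.rrd")):
        cluster, host = rrd.parts[-3], rrd.parts[-2]
        if host == "__SummaryInfo__":  # aggregate data, not a real host
            continue
        inventory.setdefault((cluster, host), []).append(rrd.stem)
    return inventory


if __name__ == "__main__":
    for (cluster, host), metrics in sorted(rrd_inventory().items()):
        print(cluster, host, len(metrics), "metrics")
```

A host that is missing from the output has not delivered any metrics to gmetad yet.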

In step 15, we installed all the dependencies that are required to run the web interface.

In step 16, we copied the PHP files of the web interface from their existing location (/u/HBaseB/ganglia-3.0.7/web) to /var/www/ganglia and restarted the Apache instance.

In step 17, we saw all the basic graphs, which provide the details of the nodes and the hosts, but not the HBase details. We also copied the configuration to all the nodes so that they are configured alike and the Ganglia master gets the data from the child nodes.

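In steps 18 and 19, we changed the settings in hadoop-metrics2-hbase.properties so that HBase starts collecting the metrics and sending them to the Ganglia servers on port 8649. The class configured there is org.apache.hadoop.metrics2.sink.FileSink, together with its properties. Note that FileSink on its own writes metrics to a file; in the metrics2 framework, the sinks that emit to Ganglia are GangliaSink30 and GangliaSink31 in the org.apache.hadoop.metrics2.sink.ganglia package.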
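
For comparison, Hadoop metrics2 sinks are conventionally declared with keys of the form [prefix].sink.[instance].[option]. The sketch below renders a Ganglia sink in that style so that the same fragment can be generated for every node. It is an illustration under assumptions and not the configuration used in this recipe: the instance name ganglia, the function name, and the default period are choices made here, and the sink class should be matched to the installed Ganglia version.

```python
def ganglia_sink_properties(
    servers,
    period=10,
    sink_class="org.apache.hadoop.metrics2.sink.ganglia.GangliaSink30",
):
    """Render metrics2-style properties that send HBase metrics to Ganglia.

    servers    -- list of "host:port" strings of the receiving gmond/gmetad
    period     -- emit interval in seconds
    sink_class -- GangliaSink30 for the 3.0.x wire format, GangliaSink31 for 3.1+
    """
    if not servers:
        raise ValueError("at least one Ganglia server is required")
    lines = [
        "*.sink.ganglia.class=" + sink_class,
        "*.sink.ganglia.period=" + str(period),
        "hbase.sink.ganglia.period=" + str(period),
        "hbase.sink.ganglia.servers=" + ",".join(servers),
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # master2:8649 is the server address used in the listing of this recipe.
    print(ganglia_sink_properties(["master2:8649"], period=5), end="")
```

Review the printed fragment against the metrics documentation for your HBase version before placing it in hadoop-metrics2-hbase.properties.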

Now we point the browser at the URL of the master, and once the page is rendered, it shows the graphs as depicted in the image HBase-Ganglia-MasterAndRegion01-01.png. It shows the following graphs:

  • Memory and CPU usage

  • JVM details (GC cycle, memory consumed by JVM, threads used, heap consumed, and so on)

  • HBase Master details

  • HBase Region compaction queue details

  • Region server flush queue utilizations

  • Region servers IO

There is more…

Ganglia is used for monitoring very large clusters, and in the world of Hadoop/HBase, it can be very useful as it provides details of the following:

  • JVM

  • HDFS

  • Map reduce

  • Region compaction time

  • Region store files

  • Region block cache hit ratio

  • Master split size

  • Master split number of operations

  • Region block free

  • Name Node activities

  • Secondary name node details

  • Disk status
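
Some of these values, such as the block cache hit ratio, are derived from raw counters. The sketch below shows the arithmetic on a dictionary of region server counters, for example one bean taken from the JMX JSON output. The counter names blockCacheHitCount and blockCacheMissCount are assumptions and should be checked against the metrics your HBase version actually exposes.

```python
def block_cache_hit_ratio(
    counters,
    hit_key="blockCacheHitCount",    # assumed counter name
    miss_key="blockCacheMissCount",  # assumed counter name
):
    """Return hits / (hits + misses), or None when nothing has been read yet."""
    hits = counters.get(hit_key, 0)
    misses = counters.get(miss_key, 0)
    total = hits + misses
    if total == 0:
        return None
    return hits / total


if __name__ == "__main__":
    sample = {"blockCacheHitCount": 90, "blockCacheMissCount": 10}
    print(block_cache_hit_ratio(sample))
```

Because the counters are cumulative, a ratio computed over the difference between two samples describes recent behavior better than one computed over the totals.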

About the Author

  • Ruchir Choudhry

    Ruchir Choudhry is a principal architect at one of the largest e-commerce companies and specializes in leading and articulating technology vision, strategizing, and implementing very large-scale software engineering-driven technology changes, with a track record of over 16 years of success.

    He was responsible for leading the strategy, architecture, engineering, and operations of multitenant e-commerce sites and platforms in the US, UK, Brazil, and other major markets for Walmart. The sites helped Walmart enter new markets and/or grow its market share. Combined, the sites serve millions of customers and take in orders with annual revenues exceeding $2.0 billion. His personal interest is in performance and scalability. Recently, he has become obsessed with technology as a vehicle to drive and prioritize optimization across organizations and in the world.

    He is a core team member in conceptualizing, designing, and reshaping a new platform that will serve the next generation of frontend engineering, based on the cutting edge technology in WalMart.com and NBC/GE/VF Image ware.

    He has led some of the most complex and technologically challenging R&D and innovative projects in VF Image Ware, Walmart.com and in GE/NBC (China and Vancouver Olympic websites), Hiper World Cyber Tech Limited (which created the first wireless-based payment gateway of India that worked on non-smart phones, which was presented at Berlin in 1999).

    He is the author of more than eight white papers, which span from biometric-based single sign-on to Java Cards, performance tuning, and JVM tuning, among others. He has presented on the JVM and on performance optimization using JBoss in Berlin and various other places.

    Ruchir Choudhry did his BE at Bapuji Institute of Technology, his MBA in information technology at National Institute of Engineering and Technology, and his MS Systems at BITS Pilani.

    He is currently working and consulting on HBase, Spark, and Cassandra.
