
Hadoop Blueprints

By Anurag Shrivastava , Tanmay Deshpande
About this book
If you have a basic understanding of Hadoop and want to put your knowledge to use to build fantastic Big Data solutions for business, then this book is for you. Build six real-life, end-to-end solutions using the tools in the Hadoop ecosystem, and take your knowledge of Hadoop to the next level. Start off by understanding various business problems which can be solved using Hadoop. You will also get acquainted with the common architectural patterns which are used to build Hadoop-based solutions. Build a 360-degree view of the customer by working with different types of data, and build an efficient fraud detection system for a financial institution. You will also develop a system in Hadoop to improve the effectiveness of marketing campaigns. Build a churn detection system for a telecom company, develop an Internet of Things (IoT) system to monitor the environment in a factory, and build a data lake – all making use of the concepts and techniques mentioned in this book. The book covers other technologies and frameworks like Apache Spark, Hive, Sqoop, and more, and how they can be used in conjunction with Hadoop. You will be able to try out the solutions explained in the book and use the knowledge gained to extend them further in your own problem space.
Publication date:
September 2016
Publisher
Packt
Pages
316
ISBN
9781783980307

 

Chapter 1. Hadoop and Big Data

Hadoop has become the heart of the big data ecosystem. It is gradually evolving into a full-fledged data operating system. While there is no standard definition of big data, the term generally refers to data of huge volume, typically several petabytes in size; data arriving at huge velocity, such as several thousand clickstream events per second; or data that combines variety with volume, such as images, click data, mails, blogs, tweets, and Facebook posts. A big data-processing system has to deal with any combination of volume, velocity, and variety. These are known as the 3Vs of big data and are often used to characterize a big data system. Some analysts and companies, most notably IBM, have added a fourth V, veracity, to signify the correctness and accuracy problems associated with big datasets, which exist at much lower levels in enterprise datasets.

In this chapter, we will introduce you to the explosive growth of data around the turn of the century and the technological evolution that has led to the development of Hadoop. We will cover the following topics in this chapter:

  • The technical evolution of Hadoop

  • The rise of enterprise Hadoop

  • Hadoop design and tools

  • Developing a program to run on Hadoop

  • The overview of solution blueprints

  • Hadoop architectural patterns

 

The beginning of the big data problem


The origin of Hadoop goes back to the beginning of the century, when the number of Internet searches started growing exponentially and Google emerged as the most popular Internet search engine. In 1998, when Google started offering an Internet search service, it was receiving only 10,000 search queries per day. By 2004, when Google did its IPO, it was serving 200 million queries per day. By the year 2006, Google users were submitting 10,000 queries per second to this popular search engine. One thousand computers processed a search query in just 0.2 seconds. It should be fairly obvious, by the massive numbers of queries and 50% average year to year growth between 2002 and 2006, that Google could not rely upon traditional relational database systems for its data processing needs.

Limitations of RDBMS systems

A relational database management system (RDBMS) stores data in tables. RDBMSs are the preferred choice for storing data in a structured form, but their high price and lower performance become limiting factors in big data use cases, where data comes in both structured and unstructured forms. RDBMSs were designed in a period when the cost of computing and data storage was very high, and data of business relevance was generally available in a structured form. Unstructured data such as documents, drawings, and photos was stored on LAN-based file servers.

As the complexity of queries and the size of datasets grow, RDBMSs require investment in more powerful servers whose costs can go up to several hundred thousand USD per unit. When the size of data grows and the system still has to be reliable, businesses invest in Storage Area Networks (SANs), which are an expensive technology to buy. RDBMSs need more RAM and CPUs to scale up. This kind of upward scaling is called vertical scaling. As the size of RAM and the number of CPUs increase in a single server, the server hardware becomes more expensive. Such servers gradually take the shape of a proprietary hardware solution and create severe vendor lock-in.

Hadoop and many other NoSQL databases meet higher performance and storage requirements by following a scale out model, which is also called horizontal scaling. In this model, more servers are added in the cluster instead of adding more RAM and CPUs to a server.

Scaling out a database at Google

Google engineers designed and developed Bigtable to store massive volumes of data. Bigtable is a distributed storage system, which is designed to run on commodity servers. In the context of Hadoop, you will often hear the term commodity servers. Commodity servers are inexpensive servers that are widely available through a number of vendors. These servers have cheap replaceable parts. There is no standard definition for commodity servers but we can say that they should cost less than 7000 to 8000 USD per unit.

The scalability and performance of Bigtable and the ability to scale it linearly made it popular among users at Google. Bigtable has been in production since 2005, and more than 60 applications make use of it, including services such as Google Earth and Google Analytics. These applications place very different size and latency demands on Bigtable. The data can range from satellite images to web page addresses. Latency requirements range from batch processing of bulk data at one end of the spectrum to real-time data serving at the other. Bigtable demonstrated that it could successfully serve workloads requiring a wide range of classes of service.

In 2006, Google published a paper titled Bigtable: A Distributed Storage System for Structured Data (Fay Chang, 2015), which established that it was possible to build a distributed storage system for structured data using commodity servers. Apache HBase, which is a NoSQL key-value store on top of the Hadoop Distributed File System (HDFS), is modeled after Bigtable, which is built on top of the Google File System (GFS). The goal of the HBase project is to build a storage system to store billions of rows and millions of columns with real-time querying capabilities.

Parallel processing of large datasets

With the growing popularity of Google as the search engine preferred by Internet users, the key concern of engineers at Google became keeping its search results up to date and relevant. As the number of queries grew exponentially together with the searchable information on the World Wide Web, Google needed a fast system to index web pages. In 2004, Google published a paper titled MapReduce: Simplified Data Processing on Large Clusters (Dean & Ghemawat, 2004). This paper described a new programming model named MapReduce to process large data sets. In MapReduce, data processing is mainly done in two phases, which are known as Map and Reduce. In the Map phase, a map function specified by the user takes an input key/value pair and produces a set of intermediate key/value pairs. In the Reduce phase, all intermediate values that share the same key are merged to produce the results of processing.
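The two phases can be illustrated with a small in-memory sketch. This is not Hadoop code: it is a plain Python simulation of the programming model on a single machine, and the function names (`map_phase`, `shuffle`, `reduce_phase`, `word_count_map`, `word_count_reduce`, `run_word_count`) are hypothetical names chosen for this illustration. The grouping step in the middle stands in for the work the framework does between the two phases.

```python
from collections import defaultdict


def map_phase(records, map_fn):
    """Apply the user-supplied map function to every input key/value pair."""
    intermediate = []
    for key, value in records:
        intermediate.extend(map_fn(key, value))
    return intermediate


def shuffle(intermediate):
    """Group intermediate values by key (done by the framework between phases)."""
    groups = defaultdict(list)
    for key, value in intermediate:
        groups[key].append(value)
    return groups


def reduce_phase(groups, reduce_fn):
    """Merge all values that share a key into one result per key."""
    return {key: reduce_fn(key, values) for key, values in groups.items()}


# Hypothetical user functions for a word count job.
def word_count_map(_line_number, line):
    # Emit one (word, 1) pair for every word in the line.
    return [(word.lower(), 1) for word in line.split()]


def word_count_reduce(_word, counts):
    # Sum the 1s emitted for this word.
    return sum(counts)


def run_word_count(lines):
    # Input key/value pairs are (line number, line text).
    records = list(enumerate(lines))
    intermediate = map_phase(records, word_count_map)
    return reduce_phase(shuffle(intermediate), word_count_reduce)
```

Calling `run_word_count(["big data", "Big Hadoop"])` counts each word across both lines. On a real cluster, the map and reduce calls would run in parallel on many machines; the user still writes only the two small functions.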

MapReduce-based programming jobs can run on anything from a single computer to thousands of commodity servers, each costing a few thousand dollars. Programmers find MapReduce easy to use because they can benefit from parallel processing without understanding the intricacies of complex parallel processing algorithms. A typical Hadoop cluster will be used to process from a few terabytes to several hundreds of petabytes of data.

Note

Nutch project 

From 2002 to 2004, Doug Cutting and Mike Cafarella were working on the Nutch project. The goal of the Nutch project was to develop an open source, web-scale, crawler-based search engine. Cutting and Cafarella were able to demonstrate that Nutch could search 100 million pages on four nodes. In 2004, after the publication of the MapReduce white paper, they added a distributed file system (DFS) and MapReduce to Nutch. This considerably improved the performance of Nutch. On 20 nodes, Nutch was able to search several hundred million web pages, but it was still far from web-scale performance.

 

Building open source Hadoop


In 2006, Doug Cutting joined Yahoo in a team led by Eric Baldeschwieler (also known as eric14 or e14). This team had grid computing experts and users. Eric was in charge of figuring out how to build a next generation search grid computing framework for web searches. Here is a quote from a Yahoo employee that describes the situation prevailing at that time:

"Fortunately, and I remember the day well, Eric14 assembled the merry bunch of Grid (then called 'Utility Computing') engineers, and started down the path of rethinking the strategy - focussing on figuring out how to make Hadoop functional, featureful, and robust, instead." (Kumar, 2011)

The new team split Hadoop out of Nutch under the leadership of Doug Cutting and created an open source Hadoop framework based upon the Hadoop Distributed File System as its storage system and the MapReduce paradigm as its parallel computing model. Yahoo put more than 300 person-years of effort into Hadoop projects between 2006 and 2011. A team of nearly 100 people worked on Apache Hadoop and related projects such as Pig, ZooKeeper, Hive, HBase, and Oozie.

In 2011, Yahoo was running Hadoop on over 40,000 machines (>300,000 cores). Hadoop had over a thousand regular users who used it for search-related research, advertising, spam detection, and personalization, among many other purposes. Hadoop has proven itself at Yahoo in many revenue-driving improvement projects.

Figure 1 Timeline of Hadoop evolution

Nowadays, Hadoop is a top-level project at the Apache Software Foundation. Hadoop is a software library that contains programs that allow processing of very large datasets, also known as big data, on a large cluster of commodity servers using a simple programming model known as MapReduce. At the time of writing this book, Hadoop 2.7.1 is the latest stable version.

It should be evident from the history of Hadoop that it was invented to solve the problem of searching and indexing massive data sets in large Internet companies. The purpose of Hadoop was to store and process the information inside Yahoo. Yahoo decided to make Hadoop open source so that the Hadoop project could benefit from the innovative ideas and involvement of the open source community.

 

Enterprise Hadoop


Large enterprises have traditionally stored data in data warehouse systems for reporting and analysis. These data warehouse systems store data on the order of hundreds of gigabytes, but they rarely match the scale of the storage and processing challenge that Hadoop was intended to take on. Enterprises spend a considerable part of their budget on procuring and running ETL systems, data warehousing software, and the hardware required to run it. Commercial vendors of Hadoop see the opportunity to grab a share of the data warehousing spending, and to increase their market share by catering to the storage and processing of big data.

Let's examine, in the next two sections, the factors which have led to the rise of Hadoop in enterprises.

Social media and mobile channels

Social media and mobile channels have emerged as the prime media through which to conduct business, and to market products and services. This trend is evident across all sectors of industry. For example, airlines use mobile channels for bookings and check-ins, and banks use social media such as Facebook to inform customers about their latest offerings and to provide customer support. These channels create new kinds of customer interactions with the business that happen several times per week and contain valuable information about customer behavior and preferences in raw form. Analyzing this data with the help of Hadoop is an attractive proposition for businesses because of the lower cost of storage and the ability to analyze data quickly.

Data storage cost reduction

Enterprise Data Warehouse Systems procured from the software vendors bring the software license costs of DBMS software, ETL tooling and schedulers with them. A resilient and high performing Enterprise data warehouse hardware setup for a Fortune 500 company could cost several million dollars. Also, 10% to 20% of procurement cost would be paid in the form of annual support services and the salary cost of operational support personnel.

Enterprise Hadoop vendors expect to derive their revenues from Hadoop taking over the storage and workload of an Enterprise Data Warehouse system, in part or in full, and thereby contributing to the reduction of IT costs.

Open source Hadoop was not designed with the requirements of large enterprises in mind. Business enterprises need fine-grained security in Hadoop and ease of integration with other enterprise systems. When Hadoop supports important business processes, the availability of training and of round-the-clock service and support is considered very important for enterprise adoption. Hadoop vendors emerged to fill the gaps in the Hadoop ecosystem and developed a business model to sell service and support to enterprises. They are also working on strengthening the Hadoop ecosystem to make it appealing for the enterprise market. By contributing to open source Hadoop, or by developing proprietary products that enhance the appeal of their specific offering to enterprise customers, Hadoop vendors are trying to make inroads into the enterprise.

At the time of writing this book, several vendors were active in the Hadoop market as described in the next section.

Enterprise software vendors

Enterprise software vendors such as IBM, Teradata, Oracle, and SAS have adopted Hadoop as the standard platform for big data processing. They are promoting Hadoop as a complementary offering to their existing enterprise data warehouse solutions.

The IBM InfoSphere BigInsights product suite is one such example. It packages open source Hadoop with proprietary products such as InfoSphere Streams for streaming analytics, and BigSheets as a Microsoft Excel-like spreadsheet for ad hoc analysis of data from a Hadoop cluster. IBM leverages its long experience in Enterprise Data Warehouse systems to provide solutions for security and data lineage in Hadoop.

SAS Visual Analytics is another example in which SAS packages Hadoop as the data store for their line of analytics and visualization products. SAP positions its in-memory analytics system, SAP HANA, as the storage for high-value, often used data such as customer master data, and Hadoop as a system to store information for archiving and retrieval of weblogs, and other unstructured and unprocessed data, because storing such data in-memory on the system would be expensive, and not of much direct value.

Pure Play Hadoop vendors

Pure Play Hadoop vendors have emerged in the past six years. Vendors such as Cloudera, MapR, and Hortonworks fall in this category. These vendors are also very active contributors to open source Hadoop and its ecosystem of other tools. Despite falling into the same category, these vendors are trying to carve out their own niche in Hadoop business.

These vendors do not have a long record of accomplishment in developing and supporting enterprise software where large vendors such as IBM, SAS or SAP enjoy superiority. The familiarity of Enterprise Software vendors with complex integration and compliance challenges in large enterprises bestows on them an edge over Pure Play Hadoop vendors in the lucrative market where Pure Play vendors are relatively inexperienced.

Pure Play Hadoop vendors have a different revenue and growth model. Hortonworks, which is a spinoff company from Yahoo, focuses on providing services on the Hadoop framework to enterprises, but also to Enterprise Software Vendors such as Microsoft, who have bundled Hadoop in their offering. Hortonworks has repackaged Apache Hadoop and related tools in a product called Hortonworks Data Platform.

Pure Play Hadoop vendor Cloudera is No. 2 in the market in terms of revenue. Cloudera has developed proprietary tools for Hadoop monitoring and data encryption. They earn a fee for licensing these products and providing support for their Hadoop distribution. They had more than 200 paying customers as of Q1 2014, some of whom have deployments as large as 1,000 nodes supporting more than a petabyte of data. (Olavsrud, 2014)

MapR is another Pure Play Hadoop player. MapR lacks the aggressive marketing and presence that Hortonworks and Cloudera have. They started early on enhancing the enterprise features of Hadoop when Hadoop implementations were in their infancy in enterprises. MapR has introduced performance improvements in HBase and support for the network filesystem in Hadoop.

Pure Play Hadoop vendors may not be as dominant in enterprises as they would like to be, but they are still the driving force behind Hadoop innovations and making Hadoop a popular data platform by contributing to training courses, conferences, literature, and webinars.

Cloud Hadoop vendors

Amazon was the first company to offer Hadoop as a cloud service with Amazon EMR (Elastic MapReduce). Amazon is very successful with the EC2 service for in-cloud computing and S3 for in-cloud storage. EMR leverages these existing Amazon services and offers a pay-for-actual-use model. In addition, Amazon has Amazon Kinesis as a streaming platform and Amazon Redshift as a data warehousing platform in the cloud, which are part of the Amazon big data roadmap.

The hosted Hadoop provided by Amazon EMR allows you to instantly provision Hadoop with the right capacity for different workloads. You can access Amazon EMR by using the AWS Management Console, Command Line Tools, SDKs, or the EMR API, which should be familiar to those who are already using the other Amazon cloud services.

Microsoft HDInsight is a Hadoop implementation on the Microsoft Azure cloud. In terms of service offering, like Amazon it leverages existing Azure services and other Microsoft applications. BI Tools such as Microsoft Excel, SQL Server Analysis Services, and SQL Server Reporting Services integrate with HDInsight. HDInsight uses the Hortonworks Data Platform (HDP) for Hadoop distribution.

These cloud-based Hadoop solutions require little setup and management effort. We can upscale or downscale the capacity based upon our workload. The relatively low cost of initial setup makes this offering very attractive for startups and small enterprises who would like to analyze big data but lack the financial resources to set up their own dedicated Hadoop infrastructure.

Despite the benefits of cloud-based Hadoop in terms of lower setup and management costs, the laws of various legal jurisdictions restrict the kind of data that can be stored in the cloud. The laws of the land also severely restrict the kind of analytics permitted on data sets if they involve customers' personal data, healthcare records, or financial history. This restriction does not affect the choice of cloud-based Hadoop vendors alone, but all other Hadoop vendors too. However, storing data in a cloud outside the data center of the enterprise and in different legal jurisdictions makes compliance with privacy laws the foremost concern. To make cloud-based Hadoop successful in the enterprise, the vendors need to address the compliance-related concerns in various legal jurisdictions.

 

The design of the Hadoop system


In this section, we will discuss the design of the Hadoop core components. Hadoop runs on the Java platform. At its core, Hadoop has the Hadoop Distributed File System, or HDFS, as the distributed data storage system, and the MapReduce APIs, which make distributed parallel processing of the data on HDFS possible. In addition to the Hadoop core components, we will cover the other essential components that perform crucial process coordination among the cluster of computers. The Hadoop ecosystem is undergoing rapid change driven by community-based innovation.

Note

This book is on Hadoop 2.x and therefore Hadoop refers to Hadoop 2.x releases in this book. If we refer to the older versions of Hadoop then we will make it explicit.

The Hadoop Distributed File System (HDFS)

The Hadoop Distributed File System, or HDFS, enables data storage over a cluster of computers. The computers in the HDFS cluster are regular commodity servers, which are available from hardware vendors such as Dell, HP, and Acer through their published hardware catalogs. These servers come with hard disk drives for data storage. HDFS does not require a RAID configuration because it manages failover and redundancy in the application layer. HDFS is essentially a distributed file system designed to hold very large amounts of data (terabytes or even petabytes) and to provide high-throughput access to this information. Files are split into blocks and stored in a redundant fashion across multiple computers. This ensures their durability in the face of failure and their high availability to parallel applications.

Another example of a distributed file system is the Network File System (NFS). NFS allows a server to share its storage, in the form of shared directories and files, with other client machines connected to the network. With the help of NFS, the other machines access the files over the network as if they were stored on local storage. A server that intends to share its files or directories lists them in a configuration file. This file is called /etc/exports on Unix systems.

The client machine mounts the exported file system, which enables users and programs to access the resources in the file system locally. The use of NFS lowers data storage costs because the data does not have to be replicated on several machines for multiple users to get access. However, accessing the files over the network leads to heavy data traffic over the network so it requires a good network design in order that the network can deliver optimum performance when several users access the shared file system over the network.

In spite of similarities between HDFS and NFS, the most striking difference between them is the lack of built-in redundancy in NFS. NFS shares the filesystem of one server. If for any reason the server fails or the network goes down, then the file system becomes immediately unavailable to the client machine. If the client machine was in the middle of processing a file from an NFS-based server when the failure took place, then the client program must respond appropriately in the program logic to recover from the failure.

HDFS has the following characteristics, which give it the upper hand in storing a large volume of data reliably in a business critical environment:

  • It is designed to run on commodity servers with just a bunch of disks (JBOD). JBOD is a name for multiple hard drives used either separately or as one volume without a RAID configuration.

  • It is designed to minimize disk seeks, which makes it suitable for handling large files.

  • It has a built-in mechanism to partition and store data on multiple nodes in a redundant fashion.

  • It has built-in replication of data to the remaining available nodes when one node fails.

  • It uses a write-once-read-many access model for files, which enables high-throughput data access.

The design of the HDFS interface is influenced by the Unix filesystem design but close adherence to Unix file system specification was abandoned in favor of improved performance of applications.

Like any other filesystem, HDFS should keep track of the location of data on a large network of computers. HDFS stores this tracking information on a separate system known as NameNode. Other computers in the network store the data and are known as DataNodes. Without NameNode, it is impossible to access the information stored on HDFS because there is no reliable way to determine how data has been distributed on the DataNodes.

When an application needs to process data on HDFS, the computation is done close to where the data is stored. This reduces congestion over the network and increases the overall throughput. This is particularly useful when the datasets stored on HDFS are huge in size. Distributing processing over multiple nodes enables the parallel processing of data and thereby reduces the overall processing time.
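The idea of moving computation to the data can be sketched as a small scheduling decision. The snippet below is a simplified illustration, not the scheduler Hadoop actually uses: the function name `pick_node_for_task` and the node names are hypothetical, and real schedulers also weigh factors such as rack placement and available resources.

```python
def pick_node_for_task(block_replicas, free_nodes):
    """Choose where to run a task that reads one block.

    block_replicas: nodes that store a copy of the block.
    free_nodes: set of nodes that currently have spare capacity.
    Returns (node, is_local); node is None when no node is free.
    """
    # Prefer a free node that already holds the data: no network transfer needed.
    for node in block_replicas:
        if node in free_nodes:
            return node, True
    # Otherwise fall back to any free node; the block must be read over the network.
    if free_nodes:
        return sorted(free_nodes)[0], False
    return None, False
```

For a block stored on `dn2` and `dn5`, with `dn1` and `dn5` free, the sketch picks `dn5` and reports a local read. If neither replica holder is free, the task runs elsewhere and pays the network cost that data locality is meant to avoid.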

Data organization in HDFS

The Unix file system organizes data into blocks. For example, the ext3 filesystem on Linux has a default block size of 4,096 bytes, and Solaris uses a default block size of 8,192 bytes. HDFS also organizes data in blocks. The default block size for HDFS is 128 MB, and it is configurable. In a native filesystem, the block size is the smallest amount of space a file occupies, even if the file is smaller than the block. HDFS differs here: a file smaller than the block size does not occupy a full block of underlying storage. For example, a 1 MB file stored with the default block size takes up about 1 MB of disk space on a DataNode, although it still counts as one block in the NameNode metadata. A file larger than the block size takes more than one block to store. HDFS stores a whole block on a single machine. It never splits a block to store it on two or more machines. HDFS sits on top of the filesystem of the operating system, so the OS filesystem stores HDFS blocks in smaller chunks that correspond to the block size of the native filesystem.

Figure 2 NameNode acts as master controlling the slave DataNodes

HDFS is designed to process huge volumes of data in an architecture that is easy to scale out. The choice of a relatively large block size supports this intended use of HDFS.

Every block of data stored on HDFS requires a corresponding entry in the NameNode central metadata directory so that when a program needs to access a file on HDFS, the location of the blocks can be tracked to compose the full file as stored. The large block size means that there are fewer entries in the central metadata directory. This speeds up file access when we need to access large files on HDFS.

Figure 3 HDFS splits files in blocks of fixed size

HDFS is a resilient filesystem, which can withstand the failure of a DataNode. A DataNode may experience failure caused by a defective hard disk drive, system failure, or network failure. HDFS keeps multiple copies of the same block on different nodes as a backup to cope with failures. In the event of a failure, HDFS uses these backup copies of the block to reconstruct the original file. HDFS uses a default replication factor of three, which implies that each block of a file in HDFS is stored on three different nodes if the cluster topology so permits.
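To make the effect of a DataNode failure concrete, the following Java sketch finds the blocks that fall below the replication factor when one node is lost. It is a toy model under our own assumptions: ReplicaTracker, the block IDs, and the node names are hypothetical, and the real NameNode bookkeeping is considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Illustrative sketch; ReplicaTracker is hypothetical and not part of Hadoop.
final class ReplicaTracker {
    // Returns, in ascending order, the IDs of blocks that have fewer live replicas
    // than the replication factor once the failed node is excluded.
    static List<String> underReplicated(Map<String, Set<String>> blockToNodes,
                                        String failedNode,
                                        int replicationFactor) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Set<String>> e : new TreeMap<>(blockToNodes).entrySet()) {
            long live = e.getValue().stream()
                    .filter(node -> !node.equals(failedNode))
                    .count();
            if (live < replicationFactor) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

A block whose replicas are all on surviving nodes is left alone; only the blocks returned by underReplicated would need an additional copy on another DataNode.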

Figure 4 A block is replicated on three DataNodes

The HDFS coherency model describes visibility of data on the file system during reads and writes of a file. Data distribution and replication on multiple nodes for large files introduces a lag between writing the data and its visibility to other programs.

When a file is created in HDFS, it becomes visible in the namespace of HDFS. However, it is not guaranteed that the contents of the file will be visible to other programs. Even after the file stream has been flushed, the file might appear to have zero length for some time, until the first block is written.

The first block of file data becomes visible to other programs only once more than one block of data has been written. The current block, which is still being written, is not visible to the other programs. HDFS provides a method to synchronize the data buffers with the DataNodes. After successful execution of the synchronization method, the data visibility is guaranteed up to that point.

Note

The HDFS coherency model has some important implications for application design. If you do not synchronize your buffers with the DataNodes, then you should be prepared to lose buffered data in case of client or system failure. Synchronization comes at the cost of a reduction in throughput. Therefore, synchronization intervals should be tuned by measuring the performance of the application at different sync intervals.
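The trade-off between sync frequency and potential data loss can be illustrated with a toy model in plain Java. This sketch does not use the HDFS client API; the class SyncingWriter and its methods are hypothetical. In the Hadoop 2.x Java client, the corresponding synchronization call is hflush() on the output stream returned when a file is created.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the coherency trade-off; SyncingWriter is hypothetical and does
// not use the HDFS client API.
final class SyncingWriter {
    private final int syncInterval;                       // records written between syncs
    private final List<String> buffered = new ArrayList<>();
    private final List<String> durable = new ArrayList<>();
    private int syncCalls = 0;

    SyncingWriter(int syncInterval) {
        this.syncInterval = syncInterval;
    }

    // Buffers a record and syncs automatically once the interval is reached.
    void write(String record) {
        buffered.add(record);
        if (buffered.size() >= syncInterval) {
            sync();
        }
    }

    // Moves all buffered records to the durable store.
    void sync() {
        durable.addAll(buffered);
        buffered.clear();
        syncCalls++;
    }

    // Records that are guaranteed to be visible and to survive a client crash.
    List<String> durableRecords() {
        return durable;
    }

    // Records that would be lost if the client crashed right now.
    int lostOnCrash() {
        return buffered.size();
    }

    int syncCalls() {
        return syncCalls;
    }
}
```

Writing ten records with a sync interval of four leaves eight records durable after two syncs, with two records still at risk. A smaller interval reduces the records at risk but increases the number of sync calls.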

HDFS file management commands

The basic commands of the file management on HDFS should appear similar to the file management commands on the Unix operating system. We cover a small selection of the commonly used HDFS commands here. In order to try out these commands, you will need a single node Hadoop installation:

  1. Create a directory on HDFS:

    hadoop fs -mkdir /user/hadoop/dir1
    
  2. Copy a local file weblog.txt to HDFS:

    hadoop fs -put /home/anurag/weblog.txt /user/hadoop/dir1/
    
  3. List an HDFS directory contents:

    hadoop fs -ls /user/hadoop/dir1
    
  4. Show the space utilization on HDFS:

    hadoop fs -du /user/hadoop/dir1
    
  5. Copy an HDFS file to a file weblog.txt.1 in the local directory:

    hadoop fs -get /user/hadoop/dir1/weblog.txt /home/anurag/weblog.txt.1
    
  6. Get help on HDFS commands:

    hadoop fs -help

The preceding examples demonstrate that HDFS commands behave similarly to Unix file management commands. A comprehensive list of HDFS commands is available on the Hadoop page of The Apache Software Foundation website at http://hadoop.apache.org/ (The Apache Software Foundation, 2015).

Note

Hadoop Installation

To get started quickly, you can use the Hadoop Sandbox from Hortonworks, which is available from the link http://hortonworks.com/products/sandbox/. Hortonworks Sandbox is a fast way to get started with many tools in the Hadoop ecosystem. To run this sandbox on VirtualBox or VMware with decent performance, you need a PC with 16 GB or more of RAM.

You can also set up Hadoop from scratch on your PC. This, however, requires you to install each tool separately while taking care of the compatibility of those tools with JVM versions and their dependencies on various libraries. With a direct installation of Hadoop on the PC, without virtualization software, you can get better performance with less RAM. You can also pick and choose which tools you install. Installation from scratch is a time-consuming process. Hadoop installation instructions are available under this link: https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-common/SingleCluster.html

In the examples given in this book, we have used both Hadoop Sandbox and the bare metal installation of Hadoop from scratch on a Linux server. In both cases, the system had 8 GB RAM.

NameNode and DataNodes

NameNode and DataNodes are the most important building blocks of Hadoop architecture. They participate in distributed data storage and process coordination on the Hadoop cluster. NameNode acts as the central point that keeps track of the metadata of files and associated blocks. NameNode does not store any of the data of the files stored on HDFS. Data is stored on one or more DataNodes.

In an HDFS cluster, the NameNode has the role of a master that controls multiple DataNodes acting as workers. The main responsibility of the NameNode is to maintain the file system namespace, that is, the tree structure of files and directories. For each file, the NameNode keeps a list of the DataNodes on which the blocks of the file are stored. This information is kept in the RAM of the NameNode, and it is reconstructed from the information sent by the DataNodes when the system starts.

During the operation of a cluster, DataNodes send the information about the addition and deletion of blocks, as a result of file write or delete operations, to the NameNode as shown in Figure 5. NameNode determines which blocks will be stored in which DataNode. DataNodes perform tasks such as block creation, block deletion, and data replication when they are instructed to do so by the NameNode.

Figure 5 DataNodes and NameNode communication

NameNode and DataNode are open source software written in Java, which run on commodity servers. Generally, HDFS clusters are deployed on Linux-based servers. Hadoop software components run in a number of Java Virtual Machine instances. Java Virtual Machines are available for all major operating systems. As a result, HDFS software can run on many operating systems, but Linux is the most common deployment platform for HDFS. It is possible to run several instances of DataNode on a single machine. It is also possible to run NameNode and DataNode on a single machine. However, the production configuration of HDFS deploys each instance of DataNode on a separate machine, and NameNode on a separate machine that does not run DataNode.

A single NameNode simplifies the architecture of HDFS, as it holds the metadata repository and has the role of master. DataNodes are generally identical machines operating under the command of the NameNode.

To access the data stored in an HDFS file, a client program needs to contact the NameNode and the DataNodes where the data is stored. HDFS exposes a POSIX-like filesystem to the client program, so client programs do not have to know about the inner workings of the NameNode and DataNodes in order to read and write data on HDFS. Because HDFS distributes file contents in blocks across several DataNodes in the cluster, these files are not visible on the DataNodes' local file systems. Running ls commands on DataNodes will not reveal useful information about the file or directory tree of HDFS. HDFS uses its own namespace, which is separate from the namespace used by the local file system. The NameNode manages this namespace. This is why we need to use special HDFS commands for file management on HDFS.

Metadata store in NameNode

The HDFS namespace is similar to the namespace in other filesystems, wherein a tree-like structure is used to arrange directories and files. Directories can hold other directories and files. The NameNode keeps the information about files and directories in inode records. Inode records keep track of attributes such as permissions, modification and access times, and namespace and disk space quotas. The metadata, consisting of the inodes and the list of blocks that identify files and directories in HDFS, is called the image. This image is loaded into RAM when the NameNode starts. The persistent record of the image is stored in a local native file of the NameNode system known as the checkpoint. The NameNode uses a write-ahead log called a journal in its local file system to record changes.

When a client sends a request to the NameNode, the request is recorded in the journal, also called the edit log. Before the NameNode sends a confirmation to the client about the successful execution of its request, the journal file is flushed and synced. Running an ls command on the local file system of the NameNode where the checkpoint and journal information is stored shows the following:

$ ls -l
total 28
-rw-r--r-- 1 hduser hadoop 201 Aug 23 12:29 VERSION
-rw-r--r-- 1 hduser hadoop  42 Aug 22 19:26 edits_0000000000000000001-0000000000000000002
-rw-r--r-- 1 hduser hadoop  42 Aug 23 12:29 edits_0000000000000000028-0000000000000000029
-rw-r--r-- 1 hduser hadoop 781 Aug 23 12:29 fsimage_0000000000000000027
-rw-r--r-- 1 hduser hadoop  62 Aug 23 12:29 fsimage_0000000000000000027.md5
-rw-r--r-- 1 hduser hadoop 781 Aug 23 12:29 fsimage_0000000000000000029
-rw-r--r-- 1 hduser hadoop  62 Aug 23 12:29 fsimage_0000000000000000029.md5

The files with the fsimage_ prefix are the image files and the files with the edits_ prefix are the edit log segments of the journal. The files with the .md5 extension contain the hash used to check the integrity of the image file.

The image file format that is used by the NameNode is very efficient to read, but it is not suitable for the small incremental updates produced by individual transactions or operations in HDFS. When new operations are done in HDFS, the changes are recorded in the journal file instead of the image file for persistence. In this way, if the NameNode crashes, it can restore the filesystem to its pre-crash state by reading the image file and then applying all the transactions stored in the journal to it. The journal or edit log comprises a series of files, known as edit log segments, that together represent all the namespace modifications made since the creation of the image file. The HDFS NameNode metadata, such as the image and journals (and all the changes to them), should be safely persisted to stable storage for fault tolerance. This is typically done by storing these files on multiple volumes and on remote NFS servers.

Preventing a single point of failure with Hadoop HA

As Hadoop made inroads into the enterprise, 24/7 availability of Hadoop clusters, with near zero downtime, became a key requirement. Hadoop HA, or Hadoop High Availability, addresses the issue of a single point of failure in Hadoop.

The NameNode in Hadoop forms a single point of failure if it is deployed without a standby NameNode. A NameNode contains metadata about the HDFS and it also acts as the coordinator for DataNodes. If we lose the NameNode in a Hadoop cluster, then even with functioning DataNodes the cluster will fail to function as a whole. Before Hadoop 2.x, every cluster carried this single point of failure. Moreover, it reduced the uptime of the cluster because any planned maintenance activity on the NameNode required it to be taken down during the maintenance window.

The standby NameNode is a second NameNode, which is used in the Hadoop High Availability (HA) setup. It should not be confused with the Secondary NameNode of a non-HA cluster, which only performs checkpointing and cannot take over from a failed NameNode. In the HA setup, two NameNodes are deployed in an active-passive configuration in the Hadoop cluster. The active NameNode handles all the incoming requests to the Hadoop cluster. The passive NameNode does not handle any incoming requests but just keeps track of the state of the active NameNode, so that it can take over when the active NameNode fails. To keep the state of the passive NameNode synchronized with the active NameNode, a shared file system such as NFS is used. Apart from the shared filesystem, Hadoop also offers another mechanism known as Quorum Journal Manager to keep the state of both NameNodes synchronized.

The DataNodes are aware of the location of both the NameNodes in the HA configuration. They send block reports and heartbeats to both of them. This results in a fast failover to the passive NameNode when the active NameNode fails (Apache Software Foundation, 2015).

Checkpointing process

The primary role of an HDFS NameNode is to serve client requests, which can, for example, require the creation of new files or directories on the HDFS. A NameNode can also take on one of two other roles, in which it acts either as a CheckpointNode or as a BackupNode.

A journal log entry can be 10 to 1,000 bytes in size, but these entries accumulate quickly and the journal file can grow very large. In some cases, a journal file can consume all the available storage capacity on a node, and it can slow down the startup of a NameNode because the NameNode applies all the journal entries to the last checkpoint.

The checkpointing process takes a checkpoint image and a journal file and compacts them into a new image. During the next startup of the NameNode, the state of the file system can be recreated by reading the image file and applying a small journal file (Wang, 2014).
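The interplay between the image, the journal, and checkpointing can be sketched as a toy model in plain Java. The class ToyNameNode and its edit format ("MKDIR <path>" and "DELETE <path>") are hypothetical and chosen only for illustration; they are not the on-disk formats used by Hadoop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model of image-plus-journal recovery; ToyNameNode and its edit format
// are hypothetical, not Hadoop's formats.
final class ToyNameNode {
    // Startup: load the checkpoint image, then replay every journal entry on top of it.
    static TreeSet<String> restore(List<String> image, List<String> journal) {
        TreeSet<String> namespace = new TreeSet<>(image);
        for (String edit : journal) {
            String[] parts = edit.split(" ", 2);
            if (parts[0].equals("MKDIR")) {
                namespace.add(parts[1]);
            } else if (parts[0].equals("DELETE")) {
                namespace.remove(parts[1]);
            } else {
                throw new IllegalArgumentException("Unknown edit: " + edit);
            }
        }
        return namespace;
    }

    // Checkpointing: fold the journal into a new image and empty the journal,
    // so that the next startup has fewer entries to replay.
    static List<String> checkpoint(List<String> image, List<String> journal) {
        List<String> newImage = new ArrayList<>(restore(image, journal));
        journal.clear();
        return newImage;
    }
}
```

After a checkpoint, restoring from the new image with an empty journal yields the same namespace as restoring from the old image with the full journal, which is why a recent checkpoint shortens NameNode startup.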

Figure 6 The checkpointing process

Data Store on a DataNode

The DataNode keeps two files on the native filesystem for each block replica. The first file contains the actual block data of the file. The second file records the metadata of the block, including the checksums that ensure the data integrity of the block, and the generation stamp. A sample listing produced by running ls -l on the data directory of a DataNode is shown next:

$ ls -l
total 206496
-rw-r--r-- 1 hduser hadoop     37912 Aug 22 19:35 blk_1073741825
-rw-r--r-- 1 hduser hadoop       307 Aug 22 19:35 blk_1073741825_1001.meta
-rw-r--r-- 1 hduser hadoop     37912 Aug 22 19:36 blk_1073741826
-rw-r--r-- 1 hduser hadoop       307 Aug 22 19:36 blk_1073741826_1002.meta
-rw-r--r-- 1 hduser hadoop 134217728 Aug 22 19:44 blk_1073741827
-rw-r--r-- 1 hduser hadoop   1048583 Aug 22 19:44 blk_1073741827_1003.meta
-rw-r--r-- 1 hduser hadoop  75497472 Aug 22 19:44 blk_1073741828
-rw-r--r-- 1 hduser hadoop    589831 Aug 22 19:44 blk_1073741828_1004.meta

The size of the data file is equal to the actual block length. If a file needs less than a full block, the DataNode does not pad it with extra space to fill the full block length.

Handshakes and heartbeats

Before a DataNode is registered in a Hadoop cluster, it has to perform a handshake with the NameNode by sending its software version and namespace ID to the NameNode. If there is a mismatch in either of these with the NameNode, then the DataNode automatically shuts down and does not become part of the cluster.

After the successful completion of a handshake, the DataNode sends a block report to the NameNode containing information about the data blocks stored on the DataNode. It contains crucial information such as the block ID, the generation stamp and length of block copy that the DataNode has stored.

After DataNode has sent the first block report, it will keep sending block reports to the NameNode every six hours (this interval is configurable) with up to date information about the block copies stored on it.

Once a DataNode is part of a running HDFS cluster, it sends heartbeats to the NameNode to confirm that the DataNode is alive and that the block copies stored on it are available. The heartbeat interval is three seconds by default. If the NameNode does not receive a heartbeat from a DataNode for 10 minutes, then it assumes that the DataNode is not available any more. In that case, it schedules the creation of additional block copies on other available DataNodes.

The NameNode does not send special requests to DataNodes to carry out certain tasks. Instead, it uses the replies to heartbeats to send commands to the DataNodes. These commands can ask the DataNode to shut down, to send a block report immediately, or to remove local block copies.
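The handshake and heartbeat rules described in this section can be sketched as a small toy model in Java. HeartbeatMonitor is a hypothetical class, not the NameNode implementation; the node names, version strings, and timestamps in the usage are assumptions for illustration. Only the 10-minute timeout is taken from the text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of DataNode registration and liveness tracking; HeartbeatMonitor is
// hypothetical and is not the NameNode implementation.
final class HeartbeatMonitor {
    static final long TIMEOUT_MILLIS = 10 * 60 * 1000L; // 10 minutes, as in the text

    private final String softwareVersion;
    private final String namespaceId;
    private final Map<String, Long> lastHeartbeat = new TreeMap<>();

    HeartbeatMonitor(String softwareVersion, String namespaceId) {
        this.softwareVersion = softwareVersion;
        this.namespaceId = namespaceId;
    }

    // Handshake: accept the DataNode only if version and namespace ID both match.
    boolean handshake(String node, String version, String nsId, long nowMillis) {
        if (!softwareVersion.equals(version) || !namespaceId.equals(nsId)) {
            return false; // the DataNode would shut down and not join the cluster
        }
        lastHeartbeat.put(node, nowMillis);
        return true;
    }

    // Records a heartbeat from a registered DataNode; unknown nodes are ignored.
    void heartbeat(String node, long nowMillis) {
        if (lastHeartbeat.containsKey(node)) {
            lastHeartbeat.put(node, nowMillis);
        }
    }

    // DataNodes whose last heartbeat is older than the timeout, in name order.
    List<String> deadNodes(long nowMillis) {
        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastHeartbeat.entrySet()) {
            if (nowMillis - e.getValue() > TIMEOUT_MILLIS) {
                dead.add(e.getKey());
            }
        }
        return dead;
    }
}
```

In this model, a node that passes the handshake but then stays silent for more than 10 minutes is reported by deadNodes, which is the point at which additional block copies would be scheduled.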

Figure 7 Writing a file on HDFS involves a NameNode and DataNodes (Source: http://www.aosabook.org/en/hdfs.html)

Figure 7 shows how the NameNode and several DataNodes work together to serve a client request.

The NameNode and DataNodes play a crucial role in data storage and process coordination on the HDFS cluster. In the next section, we will discuss MapReduce, which is a programming model used on Hadoop clusters to process the data stored on them.

 

MapReduce


MapReduce is a programming model used to process and generate large data sets in parallel across multiple computers in a cluster using a distributed algorithm. In the case of Hadoop, the MapReduce programs are executed on a cluster of commodity servers, which offers a high degree of fault tolerance and linear scalability.

MapReduce libraries are available in several programming languages for various database systems. The open source implementation of MapReduce in Hadoop delivers fast performance, not just because of MapReduce itself, but also because Hadoop minimizes the expensive movement of data on a network by performing data processing close to where the data is stored.

Until the launch of Apache YARN, MapReduce was the dominant programming model on Hadoop. Though MapReduce is simple to understand at a conceptual level, the implementation of MapReduce programs is not very easy. As a result, several higher-level tools, such as Hive and Pig, have been invented, which let users take advantage of Hadoop's large data set processing capabilities without knowing the inner workings of MapReduce. Hive and Pig are open source tools, which internally use MapReduce to run jobs on a Hadoop cluster.

The introduction of Apache YARN (Yet Another Resource Negotiator), gave Hadoop the capability to run jobs on a Hadoop cluster without using the MapReduce paradigm. The introduction of YARN does not alter or enhance the capability of Hadoop to run MapReduce jobs, but MapReduce now turns into one of the application frameworks in the Hadoop ecosystem that uses YARN to run jobs on a Hadoop cluster.

From Apache Hadoop version 2.0, MapReduce has undergone a complete redesign and it is now an application on YARN, and called MapReduce version 2. This book covers MapReduce Version 2. The only exception is the next section, where we discuss MapReduce Version 1 for background information to understand YARN.

The execution model of MapReduce Version 1

In this section, we will discuss the execution model of MapReduce Version 1 so that we can better understand how Apache YARN has improved it.

MapReduce programs in Hadoop essentially take in data as their input and then generate an output. In MapReduce terminology, the unit of work is a job which a client program submits to a Hadoop cluster. A job is broken down into tasks. These tasks perform map and reduce functions.

Hadoop controls the execution of jobs with the help of a JobTracker and a number of TaskTrackers. The JobTracker manages resources and all the jobs scheduled on a Hadoop cluster. The TaskTrackers run tasks and periodically report the progress to the JobTracker, which keeps track of the overall progress of a job. The JobTracker is also responsible for rescheduling a task if it fails.

In Hadoop, data locality optimization is an important consideration when scheduling map tasks on nodes. Map tasks are scheduled on the node where the input data resides in the HDFS. This is done to minimize the data transfer over the network.

Hadoop splits the input to MapReduce jobs into fixed size chunks. For each chunk, Hadoop creates a separate map task that runs the user-defined map function for each record in the chunk. The records in each chunk are specified in the form of key-value pairs.

An overview of the MapReduce processing stages is shown in Figure 8:

Figure 8 MapReduce processing stages
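The stages can also be illustrated with a single-process word count written in plain Java. This is only a conceptual sketch: the class MiniMapReduce is hypothetical, it does not use the Hadoop API, and it runs the map, shuffle, and reduce stages sequentially in memory rather than distributing them over a cluster.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-process illustration of the map, shuffle and reduce stages;
// MiniMapReduce is hypothetical and does not use the Hadoop API.
final class MiniMapReduce {
    // Map: emit a (word, 1) pair for every word in one input record.
    static List<Map.Entry<String, Integer>> map(String record) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : record.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    // Shuffle: group all emitted values by key, with keys in sorted order.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce: sum the values collected for one key.
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Runs the three stages in sequence over all input records.
    static Map<String, Integer> run(List<String> records) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String record : records) {
            pairs.addAll(map(record));
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : shuffle(pairs).entrySet()) {
            result.put(group.getKey(), reduce(group.getValue()));
        }
        return result;
    }
}
```

For the two records "a b a" and "b c", run returns the counts a=2, b=2, and c=1. On a real cluster, each call to map would run as part of a map task near its input chunk, and the grouped values would be sent over the network to the reduce tasks.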

Apache YARN

Apache YARN provides a more scalable and isolated execution model for MRv2. In MRv1, a single JobTracker handled resource management, scheduling, and task monitoring. To maintain backward compatibility, the MRv1 framework has been rewritten so that it can submit jobs on top of YARN.

In YARN, the responsibilities of the JobTracker have been split into two separate components. These components are as follows:

  • ResourceManager

  • ApplicationMaster

ResourceManager allocates the computing resources to various applications running on top of Apache YARN. For each application running on YARN, ApplicationMaster manages the lifecycle of the application. These two components run as two daemons on a cluster.

The YARN architecture also introduces the concept of the NodeManager, which runs on each machine in the cluster and manages the Hadoop processes running on that machine.

The ResourceManager runs two main services. The first service is a pluggable Scheduler service. The Scheduler service manages the resource scheduling policy. The second service is the ApplicationsManager, which manages the ApplicationMasters by starting, monitoring, and restarting them in case they fail.

A container is an abstract notion on the YARN platform representing a collection of physical resources such as CPU cores, disk, and RAM. When an application is about to be submitted to the YARN platform, the client requests a container from the ResourceManager, in which its ApplicationMaster will run.

Figure 9 (The Apache Software Foundation, 2015) explains the execution model of Hadoop with YARN.

Readers who are interested in learning about YARN in detail can find elaborate information on the Cloudera blog (Radwan, 2012).

 

Building a MapReduce Version 2 program


We have done sufficient groundwork in the previous sections to understand the Hadoop data storage and computation model. Now we can write our first MapReduce program to put our knowledge into practice.

Problem statement

In this problem, we will calculate the yearly average stock price of IBM from the daily stock quotes.

Publicly traded companies have fluctuating stock prices. The stock prices are available on various finance portals where you can track day-by-day movement in stock prices. Such datasets are in the public domain. We will download one such dataset that contains the historical daily stock price of IBM (Symbol: IBM). The historical stock price of IBM is available on Yahoo Finance in various formats on this URL: http://finance.yahoo.com/q/hp?s=IBM. The historical price dataset covers the stock prices from 2nd Jan 1962 until today.

Solution workflow

We will divide the solution to the stock averaging problem into several small steps as follows:

  1. Get the dataset

  2. Study the dataset

  3. Cleanse the dataset

  4. Load the dataset on the HDFS

  5. Code and build a MapReduce program

  6. Run the MapReduce program

  7. Examine the result

  8. Further processing of the results

Each small step will bring us closer to the final solution. Note that we are running a single node Hadoop cluster on an Ubuntu machine installed on VirtualBox. VirtualBox itself is running on OS X Yosemite version 10.10.2.

On my Ubuntu machine, I can check the OS version as follows:

hduser@anurag-VirtualBox:~$ uname -a
Linux anurag-VirtualBox 3.19.0-25-generic #26~14.04.1-Ubuntu SMP Fri Jul 24 21:16:20 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux

Getting the dataset

You can see the historical stock price of IBM on Yahoo Finance by visiting its URL at http://finance.yahoo.com/q/hp?s=IBM. You can view the prices on your browser as shown in Figure 10. We are interested in daily stock prices so that we can create a yearly average.

We will first get the dataset from Yahoo's finance website using the wget command, and then save the results in a file called ibmstockquotes.txt:

hduser@anurag-VirtualBox:~$ wget -O ibmstockquotes.txt http://real-chart.finance.yahoo.com/table.csv?s=IBM 
--2015-08-24 19:52:51--  http://real-chart.finance.yahoo.com/table.csv?s=IBM 
Resolving real-chart.finance.yahoo.com (real-chart.finance.yahoo.com)... 188.125.66.140 
Connecting to real-chart.finance.yahoo.com (real-chart.finance.yahoo.com)|188.125.66.140|:80... connected. 
HTTP request sent, awaiting response... 200 OK 
Length: unspecified [text/csv] 
Saving to: 'ibmstockquotes.txt' 
 
    [   <=>                                 ] 861,145     1.67MB/s   in 0.5s 
 
2015-08-24 19:52:52 (1.67 MB/s) - 'ibmstockquotes.txt' saved [861145] 

We have now downloaded the historical stock price data of IBM, going back to January 2, 1962, into the file ibmstockquotes.txt. As of August 24, 2015, this file has 13,504 lines; the number of lines you see will depend on when you download the data:

hduser@anurag-VirtualBox:~$ wc -l ibmstockquotes.txt
13504 ibmstockquotes.txt

Figure 10 The historical stock price of IBM on Yahoo! Finance

Studying the dataset

Let's open the dataset using the head command and examine its contents:

hduser@anurag-VirtualBox:~$ head ibmstockquotes.txt
Date,Open,High,Low,Close,Volume,Adj Close
2015-08-21,151.50,153.190002,148.699997,148.850006,7304900,148.850006
2015-08-20,152.740005,153.910004,152.50,152.660004,3949500,152.660004
2015-08-19,155.149994,155.669998,153.410004,153.940002,4177100,153.940002
2015-08-18,155.509995,156.520004,155.25,156.009995,2013800,156.009995
2015-08-17,155.199997,156.690002,154.699997,156.309998,2242100,156.309998
2015-08-14,155.00,156.210007,154.580002,155.75,3220300,155.75
2015-08-13,156.059998,156.089996,154.320007,155.070007,2505800,155.070007
2015-08-12,154.259995,156.520004,153.949997,156.160004,3560300,156.160004
2015-08-11,155.960007,155.990005,154.860001,155.509995,3143300,155.509995

In this dataset, we have the date, the opening stock quote, the day's high, the day's low, the closing price, the traded volume, and the adjusted closing price. The fields are separated by commas, and the first line in the dataset is the header. We will use the opening stock quote to calculate the average. Apart from the date and the opening quote, none of the fields in this dataset will be used in the solution.
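Before moving to Hadoop, it can help to see the per-record logic in plain Java. The following sketch extracts the year and the opening quote from a line and averages the opening quotes per year in memory. QuoteParser is a hypothetical helper for illustration only, and it assumes that every line is well formed and follows the field order of the header shown above.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the per-record logic; QuoteParser is a hypothetical helper.
final class QuoteParser {
    // Field positions follow the header: Date,Open,High,Low,Close,Volume,Adj Close

    // Year part of the date field, for example "2015" from "2015-08-21".
    static String year(String line) {
        return line.split(",")[0].split("-")[0];
    }

    // Opening quote, which is the second field of the line.
    static double open(String line) {
        return Double.parseDouble(line.split(",")[1]);
    }

    // Average opening quote per year, computed in memory for a handful of lines.
    static Map<String, Double> yearlyAverage(List<String> lines) {
        Map<String, double[]> sumAndCount = new TreeMap<>();
        for (String line : lines) {
            double[] acc = sumAndCount.computeIfAbsent(year(line), y -> new double[2]);
            acc[0] += open(line); // running sum of opening quotes
            acc[1] += 1;          // number of quotes seen for this year
        }
        Map<String, Double> averages = new TreeMap<>();
        for (Map.Entry<String, double[]> e : sumAndCount.entrySet()) {
            averages.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return averages;
    }
}
```

This in-memory version works for a file of this size, but it keeps all the state on one machine. The MapReduce version distributes the same extraction and averaging steps over map and reduce tasks.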

Cleaning the dataset

If the quality of the dataset is not very good, then it should be cleansed before we load it on our single node Hadoop cluster. A good quality dataset is a must for processing. In a bad quality dataset, you might find problems such as missing data fields, data field header mismatches, missing entries and missing delimiters such as commas.

Tip

For very large datasets, it is time-consuming to visually scan the data line by line to check its quality. Therefore, we can cleanse the datasets using common Unix tools such as awk, sed, and grep, or commercial tools such as Talend Open Studio for Data Quality.
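As an alternative to the Unix tools, the same kind of checks can be sketched in Java. The following validator flags rows with a wrong number of fields, a malformed date, or a non-numeric value. RowValidator is a hypothetical class, and the expected layout of seven comma-separated fields is an assumption based on the header of our dataset.

```java
import java.util.regex.Pattern;

// Sketch of basic quality checks for one CSV row; RowValidator is hypothetical.
final class RowValidator {
    // Assumed layout: Date,Open,High,Low,Close,Volume,Adj Close
    private static final int EXPECTED_FIELDS = 7;
    private static final Pattern DATE = Pattern.compile("\\d{4}-\\d{2}-\\d{2}");

    // Returns true if the row has seven fields, a yyyy-mm-dd date, and numeric values.
    static boolean isValid(String line) {
        String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
        if (fields.length != EXPECTED_FIELDS) {
            return false; // missing or extra fields or delimiters
        }
        if (!DATE.matcher(fields[0]).matches()) {
            return false; // header line or malformed date
        }
        for (int i = 1; i < fields.length; i++) {
            try {
                Double.parseDouble(fields[i]);
            } catch (NumberFormatException e) {
                return false; // empty or non-numeric value
            }
        }
        return true;
    }
}
```

Running such a check over every line and counting the rejected rows gives a quick indication of whether a dataset needs cleansing before it is loaded on the cluster. Note that the header line is rejected as well, because its first field is not a date.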

Our dataset ibmstockquotes.txt is a clean and well-structured dataset, which does not require much cleansing. The first row in the dataset is the header, which need not be processed. Using a text editor such as vi, we can remove the first line containing the header from this dataset. We can also remove the first line using the Unix stream editor sed, which writes the cleaned data to a new file, as follows:

$ sed '1d' ibmstockquotes.txt > ibmstockquotes.clean.txt

We also notice that this file contains additional data such as the day's high, the day's low, the closing price, and the volume, which we do not need to process. We can either remove this data from the dataset or leave it as it is, in case we need it for other problems. In this case, we leave the additional data in the dataset.

At the end of this step, we have a copy of the dataset with the header line removed. If you used sed, rename ibmstockquotes.clean.txt to ibmstockquotes.txt so that the file name matches the commands in the next step. The dataset is now ready to be loaded on the Hadoop cluster.

Loading the dataset on the HDFS

We will use the Hadoop filesystem command to put our dataset on the HDFS. We will first create a directory structure on the HDFS to store our dataset. We will then use this directory structure to put the dataset ibmstockquotes.txt from our local filesystem on the HDFS.

Let's list the root directory of our Hadoop cluster:

hduser@anurag-VirtualBox:~$ hadoop fs -ls /
Found 2 items
drwx------   - hduser supergroup          0 2015-08-24 11:53 /tmp
drwxr-xr-x   - hduser supergroup          0 2015-08-24 10:50 /user

We will make a new directory structure for our examples, which will be /hbp/chapt1:

hduser@anurag-VirtualBox:~$ hadoop fs -mkdir /hbp
hduser@anurag-VirtualBox:~$ hadoop fs -mkdir /hbp/chapt1

We will copy our dataset into the new directory /hbp/chapt1:

hduser@anurag-VirtualBox:~$ hadoop fs -put ibmstockquotes.txt /hbp/chapt1

Let's examine the contents of the directory /hbp/chapt1 to see if our file is on the Hadoop filesystem:

hduser@anurag-VirtualBox:~$ hadoop fs -ls /hbp/chapt1
Found 1 items
-rw-r--r--   1 hduser supergroup     861145 2015-08-24 21:00 /hbp/chapt1/ibmstockquotes.txt

We can also check the contents of the directory by using the web interface of the HDFS on the URL http://localhost:50070/explorer.html#/hbp/chapt1 as shown in Figure 11. We are running a single node Hadoop cluster locally on the PC. In a production environment, typically the name localhost will be replaced with the hostname or IP address of the NameNode.

Figure 11 Browsing an HDFS directory using a web interface

Click on the link ibmstockquotes.txt. We can see that the block size for this dataset is 128 MB, and it has occupied exactly one block on the HDFS. If you click the filename link then you will see the additional information such as the block ID and generation stamp, as shown in Figure 12.

Figure 12 Additional block information

Starting with a MapReduce program

In this example, we will write a MapReduce program using the Java programming language. For Java programming, we will make use of the Eclipse IDE to build and package the programs.

Installing Eclipse

You can download Eclipse from https://www.eclipse.org/downloads/.

Note

We have used Eclipse Java EE IDE, 64 bit, for Web Developers Mars Release (4.5.0) in the examples used in this book.

I am using a MacBook Pro to run Eclipse. After installing Eclipse, launch it by clicking on the Eclipse icon.

The Eclipse select workspace dialog should pop up. This indicates that Eclipse has been successfully installed.

Let's create a new workspace in Eclipse in the directory <your directory>/workspace/hbp/chapt1 and go to the Eclipse workbench by clicking the workbench icon.

We will now install the Hadoop Development tools. You can download the Hadoop Development Tools from http://hdt.incubator.apache.org/download.html.

After downloading, unzip and untar the file in your local directory. Now go to Eclipse Help | Install New Software. In the pop-up dialog, as shown in Figure 13, click on the Add button. You will see another dialog box. In this dialog box, specify the local repository in the directory where you have untarred the downloaded file.

Figure 13 Adding Hadoop development tools in Eclipse

We have now set up the environment to start creating our MapReduce project in Eclipse.

Creating a project in Eclipse

We will create a Maven project in Eclipse. Navigate to File | New | Maven Project. We will see the window shown in the following screenshot:

Figure 14 File | New | Maven Project

Check the default workspace location and click on the Next button. In the next window, shown in Figure 15, we choose maven-archetype-quickstart and click on the Next button.

Figure 15 Select an archetype

We will see the window shown in Figure 16. In this window, we will specify the GroupId and ArtifactId as shown in the figure. Now click on the Finish button. This will trigger the creation of a Maven project in Eclipse. Eclipse will create the basic directory structure for the project and the file pom.xml, which contains the build dependencies. It will also create an App.java file. We will not use this file, so you can delete it from your project.

Figure 16 Specify GroupId and ArtifactId

We will need to specify the dependency on the Hadoop libraries in the pom.xml of Maven so that we can build our programs. To do so, you should open the pom.xml file in your newly created project. Add the following lines in the dependencies section of the pom.xml file as shown in Figure 17:

<dependency> 
<groupId>org.apache.hadoop</groupId> 
<artifactId>hadoop-client</artifactId> 
<version>2.7.1</version> 
</dependency> 

Figure 17 Project dependencies in the pom.xml file

Coding and building a MapReduce program

We are building a simple MapReduce program in Java. This program has three Java files:

  • Mapper: StockAverageMapper.java

  • Reducer: StockAverageReducer.java

  • Driver: StockAverageDriver.java

We will first create our Mapper file by navigating to File | New | Other in Eclipse. Locate Hadoop in the dialog that has just popped up, as shown in Figure 18. Click on the Next button.

Figure 18 Add a new Mapper class

On the next screen, specify the name of your Mapper class, which is StockAverageMapper, as shown in Figure 19.

Figure 19 Specify the Mapper class name

Now open the newly created StockAverageMapper.java file in Eclipse, and replace the contents of the file with the listing given here:

package hbp.chapt1; 
 
import java.io.IOException; 
import org.apache.hadoop.io.DoubleWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 
 
public class StockAverageMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {

  // Reusable output objects; their contents are overwritten for every record
  private DoubleWritable quote = new DoubleWritable(1);
  private Text word = new Text();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    // Extract the tokens from the line text
    String line = value.toString();
    String[] tokens = line.split(",");

    // Extract the year value from the date
    String year = tokens[0].split("-")[0];

    // Extract the stock quote and convert it into a number
    String quoteStr = tokens[1];
    double quoteVal = Double.parseDouble(quoteStr);

    // Set the key
    word.set(year);

    // Set the value
    quote.set(quoteVal);

    context.write(word, quote);
  }
}
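The Mapper assumes that every input line has the form date,quote. If the file contains a header row or a malformed line (an assumption; the text does not say whether it does), Double.parseDouble will throw an exception and the task will fail. The following is a minimal sketch of a more defensive parsing step, written in plain Java so that it can be tried without Hadoop. The class name QuoteLineParser and the sample lines are hypothetical and are not part of the book's project.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class QuoteLineParser {

    /**
     * Parses a line such as "1962-01-02,572.00" into a (year, quote) pair.
     * Returns null for any line that does not fit this assumed layout.
     */
    public static Map.Entry<String, Double> parse(String line) {
        if (line == null) {
            return null;
        }
        String[] tokens = line.split(",");
        if (tokens.length < 2) {
            return null; // not enough fields
        }
        // Assumed date layout: yyyy-MM-dd
        String[] dateParts = tokens[0].trim().split("-");
        if (dateParts.length != 3 || !dateParts[0].matches("\\d{4}")) {
            return null; // e.g. a header row such as "Date,Close"
        }
        try {
            double quote = Double.parseDouble(tokens[1].trim());
            return new SimpleEntry<>(dateParts[0], quote);
        } catch (NumberFormatException e) {
            return null; // the quote field is not a number
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("1962-01-02,572.00"));
        System.out.println(parse("Date,Close"));
    }
}
```

In a Mapper, a null result would simply mean that the line is skipped instead of being written to the context.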

Using steps similar to the ones described in the Mapper class creation, you will now create the Reducer class. Replace the contents of the newly created class with the following listing:

package hbp.chapt1; 
 
import java.io.IOException; 
 
import org.apache.hadoop.io.DoubleWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 
 
public class StockAverageReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> { 
 
  @Override
  public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
      throws IOException, InterruptedException { 
    double quoteAverage = 0; 
    double quoteTotal = 0; 
    int quoteCount = 0; 
    for (DoubleWritable value : values) { 
      quoteTotal += value.get(); 
      System.out.println("Reducer: " + key + " "+ quoteTotal); 
      quoteCount++; 
    } 
    quoteAverage = quoteTotal/quoteCount; 
    context.write(key, new DoubleWritable(quoteAverage)); 
  } 
 
} 
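To see what the Mapper and the Reducer compute together, it can help to reproduce the same logic without a Hadoop runtime. The sketch below groups quotes by year (the role of the shuffle phase) and averages each group (the role of the Reducer) in plain Java. The class name YearlyAverageSketch and the three sample lines are made up for illustration; they are not taken from ibm-stock.csv.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class YearlyAverageSketch {

    /** Groups quotes by year and returns the average quote per year. */
    public static Map<String, Double> averageByYear(List<String> lines) {
        // year -> {sum of quotes, number of quotes}
        Map<String, double[]> totals = new TreeMap<>();
        for (String line : lines) {
            String[] tokens = line.split(",");
            String year = tokens[0].split("-")[0];        // same key as the Mapper
            double quote = Double.parseDouble(tokens[1]);  // same value as the Mapper
            double[] acc = totals.computeIfAbsent(year, y -> new double[2]);
            acc[0] += quote;
            acc[1] += 1;
        }
        // Same arithmetic as the Reducer: total divided by count
        Map<String, Double> averages = new TreeMap<>();
        for (Map.Entry<String, double[]> e : totals.entrySet()) {
            averages.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return averages;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "1962-01-02,10.0", "1962-01-03,20.0", "1963-01-02,30.0");
        System.out.println(averageByYear(lines)); // prints {1962=15.0, 1963=30.0}
    }
}
```

Running a check like this on a handful of lines is a quick way to validate the expected output before submitting the job to a cluster.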

Using steps similar to the ones described in the Mapper class creation, you will now create the Driver class. Replace the contents of the newly created class StockAverageDriver with the following listing:

package hbp.chapt1; 
 
import java.io.IOException; 
 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.DoubleWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
 
public class StockAverageDriver { 
 
  public static void main(String[] args) throws IOException,
  InterruptedException, ClassNotFoundException { 
     
    Job job = Job.getInstance(); 
   
    job.setJarByClass(StockAverageMapper.class); 
   
    job.setJobName( "First Job"  ); 
   
    FileInputFormat.setInputPaths(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 
   
    job.setMapperClass(StockAverageMapper.class); 
    job.setReducerClass(StockAverageReducer.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(DoubleWritable.class); 
   
    boolean success = job.waitForCompletion(true); 
    System.exit(success ? 0 : 1); 
  } 
 
} 

Run the MapReduce program locally

We are now ready to run our MapReduce program. We will first run this program locally on our Unix file system before running it on HDFS. In Eclipse, navigate to Run | Run Configurations. We will see the dialog shown in Figure 20. Go to the Arguments tab and specify these two values in the Program arguments field:

  • Input filename: ibm-stock.csv

  • Output directory name: output

Make sure that the file ibm-stock.csv exists in your local project directory. Click on the Run button now. Congratulations. Now you are running your MapReduce program.

Figure 20 Input file and output directory for MapReduce job
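The driver reads args[0] and args[1] without checking them, and Hadoop refuses to start a job whose output directory already exists, so a second local run fails until the output directory is removed. The following sketch shows one way such checks could be made before the job is submitted when running against the local file system. The class DriverArgsCheck and its messages are hypothetical; they are not part of the book's driver.

```java
import java.io.File;

public class DriverArgsCheck {

    /**
     * Validates the two program arguments for a local run.
     * Returns an error message, or null if the arguments look usable.
     */
    public static String validate(String[] args) {
        if (args.length != 2) {
            return "Usage: StockAverageDriver <input path> <output path>";
        }
        if (!new File(args[0]).exists()) {
            return "Input not found: " + args[0];
        }
        if (new File(args[1]).exists()) {
            // The job would fail at submission time if this path already exists
            return "Output directory already exists: " + args[1];
        }
        return null;
    }

    public static void main(String[] args) {
        String error = validate(args);
        System.out.println(error == null ? "Arguments look fine" : error);
    }
}
```

A driver could call a method like this at the top of main and exit with a non-zero status when it returns a message.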

Examine the result

After running the program, go to the project directory using a terminal window. Run an ls -l command in your shell. You will see a new directory output. This directory has been created by running your MapReduce program. Run the directory listing for the output directory using an ls -l output command.

In the output directory, you will see two files. The first file is _SUCCESS which indicates that our program has run successfully. The second file is part-r-00000 which contains the results of our MapReduce execution.

Figure 21 Examine the results of the program execution

Run the MapReduce program on Hadoop

We have successfully run the MapReduce program on our desktop. We used this program to process the files stored on our local file system. Now, let's run this program on our Hadoop server on a file stored on the HDFS.

Create a JAR file for your program. To do so, right-click on the pom.xml file and go to Run As | Maven build. You will see the Edit Configuration window. Type package in the Goals field and click on the Run button. You have now created a JAR file called chapt1-0.0.1-SNAPSHOT.jar in the target directory.

First, let's copy the JAR file onto our Hadoop system, where we have the correct environment and libraries to run this program. We have copied this file to our Hadoop system using the scp command:

$ pwd
/Users/anurag/hdproject/eclipse/chapt1
$ ls
ibm-stock.csv  pom.xml    target
output    src
$ ls target
chapt1-0.0.1-SNAPSHOT.jar  maven-status
classes        surefire-reports
maven-archiver      test-classes
$ scp target/chapt1-0.0.1-SNAPSHOT.jar hduser@192.168.2.120:/home/hduser
hduser@192.168.2.120's password:
chapt1-0.0.1-SNAPSHOT.jar                           100% 5095     5.0KB/s   00:00

You will recall that you have already copied the IBM stock quotes file onto the HDFS, where it is stored as ibmstockquotes.txt in the directory /hbp/chapt1. You can verify this by running the following command:

hduser@anurag-VirtualBox:~$ hadoop fs -ls /hbp/chapt1
Found 1 items
-rw-r--r--   1 hduser supergroup     861145 2015-08-24 21:00 /hbp/chapt1/ibmstockquotes.txt

Now we will run our program on the Hadoop system using the following command:

hduser@anurag-VirtualBox:~$ hadoop jar chapt1-0.0.1-SNAPSHOT.jar hbp.chapt1.StockAverageDriver /hbp/chapt1/ibmstockquotes.txt /hbp/chapt1/output

Let's examine the contents of the output directory:

hduser@anurag-VirtualBox:~$ hadoop fs -ls /hbp/chapt1/output
Found 2 items
-rw-r--r--   1 hduser supergroup          0 2015-09-12 19:16 /hbp/chapt1/output/_SUCCESS
-rw-r--r--   1 hduser supergroup       1273 2015-09-12 19:16 /hbp/chapt1/output/part-r-00000

We can see the part-r-00000 file on the HDFS that contains the output of our MapReduce job.

Further processing of results

We have successfully run our MapReduce job but our results are still stored on the HDFS. Now we can use the HDFS copy command to copy the results file to the local filesystem for further processing using other tools such as Microsoft Excel:

hduser@anurag-VirtualBox:~$ hadoop fs -get  /hbp/chapt1/output/part-r-00000 /home/hduser/results.csv

Let's view the contents of the file:

hduser@anurag-VirtualBox:~$ head results.csv
1962  433.3511795396825
1963  448.48554415139404
1964  489.16551569960467
1965  480.82985311111105
1966  408.9866005873015
1967  499.4905379123505
1968  425.8838481415928
1969  328.94249967600007
1970  297.1336121732284
1971  321.8779635454545

We have averaged the stock prices by year using Hadoop.
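Despite its name, results.csv is not comma-separated: by default, Hadoop's text output writes the key and the value of each record separated by a tab character. Before opening the file in a spreadsheet, it can be converted to true CSV. The following is a minimal sketch, assuming the default tab separator; the class name TabToCsv is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class TabToCsv {

    /** Converts one "key<TAB>value" output line into "key,value". */
    public static String toCsvLine(String line) {
        String[] keyValue = line.split("\t", 2);
        if (keyValue.length != 2) {
            throw new IllegalArgumentException("Expected key<TAB>value but got: " + line);
        }
        return keyValue[0].trim() + "," + keyValue[1].trim();
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.out.println("Usage: TabToCsv <path to results file>");
            return;
        }
        // args[0] is the file fetched from the HDFS, for example results.csv
        for (String line : Files.readAllLines(Paths.get(args[0]))) {
            System.out.println(toCsvLine(line));
        }
    }
}
```

Redirecting the program's output to a new file gives a file that spreadsheet tools will split into two columns, year and average quote.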

 

Hadoop platform tools


We have successfully run our first MapReduce program on Hadoop written in Java. The Hadoop ecosystem offers a rich set of tools to perform various activities on the Hadoop cluster. The rich tool set is a mix of open source tools and commercial tools available from vendors. The Hadoop ecosystem of tools improves continuously through the very active open source community and commercial vendors who actively contribute to improvements.

In this section, we cover some of the popular tools that we will use to build solutions from Chapter 2, A 360-Degree View of the Customer, onwards in this book. A very comprehensive list of tools is available on a website known as Hadoop Illuminated at this URL: http://hadoopilluminated.com/hadoop_illuminated/Bigdata_Ecosystem.html.

Figure 22 Hadoop: The ecosystem of tools

In the following sections, we will give a brief overview of these tools, now that we have learned what HDFS, MapReduce v2, and YARN are capable of. In the upcoming chapters, we will cover the installation and usage of these tools as we start using them.

Data ingestion tools

Data ingestion is the process of reading data from a source outside the HDFS and loading it onto the HDFS for storage and further processing. The data can come from flat files, relational database management systems, and live data feeds. Three common tools to ingest incoming data in Hadoop are as follows:

  • Sqoop: Hadoop usually coexists with other databases in the enterprise. Apache Sqoop is used to transfer the data between Hadoop and relational database systems or mainframe computers that are ubiquitous in enterprises of all sizes. Typically, you can use Sqoop to import the data from Oracle or SQL Server, transform it using MapReduce, or other tools such as Hive and Pig, and then export the data back to other systems. Sqoop internally uses MapReduce to import data into the Hadoop cluster. Relational database management systems are used to store operational data such as point of sale transactions, orders and customer master data. Apache Sqoop offers fast data loading in HDFS by running data loading tasks in parallel.

  • Flume: Often we have to load data coming from streaming data sources into the HDFS. Streaming data sources deliver data in the form of events, which arrive at random or fixed time intervals.

    Flume is used to ingest large volumes of data coming from sources such as web logs, clickstream logs, Twitter feeds, and live stock feeds in the form of events. In Flume, such data is processed one event at a time. Flume allows you to build multi-hop flows through which an event can travel before it is stored in the HDFS. Flume offers reliable event delivery. Events are removed from a channel only once they are stored in the next channel or persisted in a permanent data store such as the HDFS. Flume manages the recoverability of events in case of failure by using a durable channel that makes use of the local filesystem.

  • Kafka: Kafka is a messaging system based on the publish-subscribe (pubsub) model of messaging, which is also commonly used by JMS and ActiveMQ. However, the design of Kafka is akin to a distributed, partitioned, and replicated commit log service. Kafka maintains message feeds, which are classified using categories known as topics. Producers are the processes that publish messages to the topics so that consumers can consume them. In the pubsub model, a published message gets broadcast to all the consumers. Kafka also supports a queuing model in which a published message gets picked up by exactly one consumer within a consumer group.

    Between producers and consumers, Kafka acts as a broker running on a cluster of servers. Typical use cases for Kafka are log aggregation, website activity monitoring and stream processing. Despite its similarity to Flume, Kafka is a general purpose commit-log service suitable for Hadoop, but also for many other systems, while Flume is a solution designed for Hadoop alone. It is not uncommon to see Kafka and Flume working together to build a data ingestion flow for the HDFS.
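The difference between the pubsub model and the queuing model can be illustrated with a small in-memory simulation. The sketch below is not the Kafka client API; the class ConsumerGroupSketch and its methods are hypothetical, and simple round-robin delivery stands in for the partition-based assignment that Kafka actually uses. Each published message reaches every consumer group (pubsub between groups), but only one consumer inside each group (queuing within a group).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConsumerGroupSketch {

    // group name -> names of the consumers in that group
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // group name -> number of messages delivered so far (drives the round-robin)
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    public void subscribe(String group, String consumer) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(consumer);
        delivered.putIfAbsent(group, 0);
    }

    /** Returns the consumers that receive the message: exactly one per group. */
    public List<String> publish(String message) {
        List<String> receivers = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : groups.entrySet()) {
            List<String> members = e.getValue();
            int count = delivered.get(e.getKey());
            receivers.add(members.get(count % members.size()));
            delivered.put(e.getKey(), count + 1);
        }
        return receivers;
    }

    public static void main(String[] args) {
        ConsumerGroupSketch topic = new ConsumerGroupSketch();
        topic.subscribe("billing", "billing-1");
        topic.subscribe("billing", "billing-2");
        topic.subscribe("audit", "audit-1");
        System.out.println(topic.publish("order-1")); // prints [billing-1, audit-1]
        System.out.println(topic.publish("order-2")); // prints [billing-2, audit-1]
    }
}
```

With a single consumer per group, the behavior reduces to a plain broadcast; with all consumers in one group, it reduces to a queue.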

Data access tools

With the help of data access tools, you can analyze the data stored on Hadoop clusters to gain new insights from the data. Data access tools help us with data transformation, interactive querying and advanced analytics. In this section, we will cover commonly used open source data access tools available from the Apache Software Foundation:

Note

In addition to open source tools, several commercial tools are available from vendors such as Datameer, IBM, and Cloudera.

  • Hive: SQL is a widely understood query language. Hive provides an SQL-like query language known as Hive Query Language (HQL). Though HQL is limited in features compared to SQL, it is still very useful to developers who are familiar with running SQL queries on relational database management systems. Using Hive, a bigger group of database programmers can be engaged in the Hadoop ecosystem. The Hive service breaks down HQL statements into MapReduce jobs, which execute on the Hadoop cluster like any other MapReduce job. From the Hive 1.2 release, we can take advantage of Apache Tez. Apache Tez is a new application framework built on top of YARN. Using Apache Tez instead of MapReduce solves some of the inefficiencies associated with the planning and execution of queries. This makes Hive perform faster.

  • Pig: Apache Pig is another tool for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with an infrastructure for evaluating these programs. Pig uses a special language called Pig Latin. This programming language uses high-level constructs to shield the programmers from the complexity of coding MapReduce programs in Java. Pig comes with its own command line shell. You can load HDFS files into Pig and perform various transformations, and then store the results on the HDFS. Pig translates transformation tasks into MapReduce jobs. Once Pig scripts are ready they can be run with the help of a scheduler without any manual intervention to perform routine transformations.

  • HBase: Apache HBase is a NoSQL database that works on top of the HDFS. HBase is a distributed, non-relational, column-oriented data store modeled after Google's Bigtable. It gives you random, real-time read and write access to data, which is not possible in HDFS. It is designed to store billions of rows and millions of columns. HBase uses HDFS to store data but it is not limited to HDFS alone. HBase delivers low access latency for a small amount of data from a very large data set.

  • Storm: Apache Storm is a distributed near real-time computation system. Similar to how Hadoop provides a set of general primitives for doing batch processing, Storm provides a set of general primitives for performing real-time computation. It defines its workflows in Directed Acyclic Graphs (DAGs) called topologies. These topologies run until they are shut down by the user or until they encounter an unrecoverable failure. Storm can read and write data from the HDFS. In its original form, Storm did not natively run on Hadoop clusters but it used Apache Zookeeper and its own master/minion worker processes to coordinate topologies with a master and worker state. Now Storm is also available on YARN, bringing real-time streamed data-processing capabilities to Hadoop.

  • Spark: Apache Spark is the newest kid on the block. It performs in-memory processing of data stored in the HDFS. Hadoop is inherently a batch-oriented data processing system. In order to build a data pipeline, we have to read and write data several times on the HDFS. Spark addresses this issue by storing the data in memory, which makes low latency data processing possible after the initial load of data from the HDFS into the RAM. Spark provides an excellent model for performing iterative machine learning and interactive analytics. However, Spark also excels in some areas similar to Storm's capabilities, such as near real-time analytics and ingestion. Apache Spark does not require Hadoop to operate. However, its data parallel paradigm requires a shared file system for the optimal use of stable data. The stable source can be Amazon S3, NFS, MongoDB or, more typically, HDFS (Ballou, 2014). Spark supports a number of programming languages such as Java, Python, R, and Scala. Spark supports relational queries using Spark SQL, machine learning with MLlib, graph processing with GraphX, and streaming with Spark Streaming.

Monitoring tools

Hadoop monitoring tools monitor its infrastructure and resources. This information is useful to maintain the quality of service as per the service level agreements, and to track the over- and under-utilization of deployed resources for financial charging and capacity management. Ambari is the most popular open source tool for Hadoop monitoring. Hadoop vendor Cloudera has a proprietary tool, called Cloudera Manager, which is also used to monitor Cloudera Hadoop installations.

Note

Cloudera Manager has been positioned as the Enterprise Hadoop Admin System by Cloudera. It helps in the deployment and management of Hadoop clusters. Monitoring is just a subset of many other features offered by it.

In this section, we will cover Ambari.

Apache Ambari is a monitoring tool for Hadoop. It also covers the provisioning and management of Hadoop clusters. Ambari has a REST interface that enables the easy integration of Hadoop provisioning, management, and monitoring capabilities to other applications in the enterprise. You can get an instant insight into the health of a running Hadoop cluster by looking at the web-based dashboard of Ambari (Zacharias, 2014). Ambari can collect the operational metrics of a Hadoop cluster for analysis later. Using the alert framework of Ambari, you can define the rules to generate alerts that you can act upon.

Figure 23 Ambari dashboard

Data governance tools

Data governance (DG) refers to the overall management of the availability, usability, integrity, and security of the data employed in an enterprise. The data governance architecture of Hadoop is evolving, and is still very underdeveloped. Presently, there is no comprehensive data governance within the Hadoop stack, and integration with the external governance frameworks is lacking.

Industry is responding to the need to build a comprehensive data-governance architecture, because Hadoop is becoming the central building block of big data processing systems in enterprises.

Apache Atlas: The aim of the Apache Atlas initiative is to deliver a comprehensive data governance framework. The goals of this initiative are:

  • Support for data classification

  • Support for centralized auditing

  • Providing a search and lineage function

  • Building a security and policy engine

 

Big data use cases


In the previous sections of this chapter, we discussed the design and architecture of Hadoop. Hadoop and its powerful ecosystem of tools provide a strong data platform to build data-driven applications. In this section, you will get an overview of the use cases covered in this book. These use cases have been derived from real business problems in various industry sectors, but they have been simplified to fit a chapter in this book. You can read from Chapter 2, A 360-Degree View of the Customer, onwards in any order because each chapter is complete in itself.

Creating a 360 degree view of a customer

A 360 degree view of a customer combines information about a customer's attitude, behavior, preferences and static data such as date of birth, and presents it as a single integrated view. Call center agents and field sales agents use this information to better understand the customer's needs and to offer better services or sell the right product.

Large financial institutions operating in the retail market have millions of customers. These customers buy one or more financial products from such institutions.

Large enterprises use master data management (MDM) systems to store customer data. The customer data includes key details about the customers such as their name, address, and date of birth. Customer service processes use this information to identify a customer during the processing of service requests. Marketing processes use this information to segment customers for direct mailings. The data stored in the MDM systems remains static for several months, if not for several years, because a change in customer addresses does not happen too often, and the other data, such as the name or date of birth, remains unchanged in the lifetime of the customer. The information stored in the MDM systems is very reliable because it undergoes multiple levels of checks before it is stored in the system. Many times, the information in the MDM system is directly taken from the customer data form filled in by the customers.

Despite the high quality of data in the MDM systems or other enterprise data stores, the data in these systems does not create a complete view of the customers. The views created from the static data lack information about what is happening currently that might define the nature of the product or service required by the customer. For example, if a customer is deeply unsatisfied with a product, then trying to sell them another accessory will be counterproductive. A 360-degree view should attempt to capture not only information about the products in the customer's possession, but also information about the customer's recent experience with those products.

In Chapter 2, A 360-Degree View of the Customer, we will build a 360 degree view of a customer by combining information available in the enterprise data store and information available via social media. We will use Hadoop as the central data warehouse to create the 360-degree view.

Fraud detection systems for banks

According to a report published in Forbes in 2011 (Shaughnessy, 2011), merchants in the United States lose $190 billion per year owing to credit card fraud. Most of this credit card fraud originates from online channels. Banks lose $11 billion to fraud. With the proliferation of digital channels, online fraud is on the rise, and therefore timely fraud detection makes a strong business case for the banks. A solid fraud detection system helps banks in two ways:

  • By reducing financial loss, lowering the risk exposure, and reducing the capital tied to indemnify customers against the fraud

  • By strengthening the safe image of the bank and thereby growing its market share

Spending behavior of bank customers usually follows a pattern that repeats based upon events such as a credit of their salary into the bank account or the payment of utility bills. Any significant deviations from the spending pattern could point to a potential fraudulent activity. Such potential fraudulent activity should trigger an alert so that the bank can take timely action to limit or prevent the financial loss.
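As a rough illustration of what a "significant deviation" from a spending pattern could mean, the sketch below flags a transaction whose amount lies more than a chosen number of standard deviations away from the mean of a customer's earlier transactions. This z-score rule is a stand-in chosen for illustration, not the detection method built later in the book; the class name, the threshold, and the sample amounts are all hypothetical.

```java
public class SpendingDeviationSketch {

    /**
     * Returns true if the amount deviates from the mean of the history by
     * more than the given number of sample standard deviations.
     */
    public static boolean isSuspicious(double[] history, double amount, double threshold) {
        if (history.length < 2) {
            return false; // too little history to define a pattern
        }
        double mean = 0;
        for (double h : history) {
            mean += h;
        }
        mean /= history.length;

        double variance = 0;
        for (double h : history) {
            variance += (h - mean) * (h - mean);
        }
        variance /= (history.length - 1); // sample variance

        double stdDev = Math.sqrt(variance);
        if (stdDev == 0) {
            return amount != mean; // all past amounts identical
        }
        return Math.abs(amount - mean) / stdDev > threshold;
    }

    public static void main(String[] args) {
        double[] history = {40, 50, 60, 50, 45, 55};
        System.out.println(isSuspicious(history, 52, 3.0));  // prints false
        System.out.println(isSuspicious(history, 900, 3.0)); // prints true
    }
}
```

A real system would use far richer features than the amount alone, such as the location and static customer data mentioned in this section.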

In Chapter 3, Building a Fraud Detection System, we will cover a transaction screening system. The goal of this system is to screen every transaction for potential fraud and generate real-time fraud notifications, so that we can block the transaction and alert the victim. A big data based fraud detection system uses transaction data and enriches it with other static and location data to predict a possible fraud. In this use case, we will focus upon real-time fraud detection as opposed to detecting frauds in a historical data set.

A real-time fraud detection system is a very effective way to fight transaction fraud because it can prevent the movement of funds immediately when a fraud is detected. This prevention mechanism can be built into the transaction approval process. Batch-processing based fraud detection also has value because some types of frauds cannot be detected in real time owing to intensive computing power requirements. However, by the time a fraud is detected using the batch-processing mechanism the money might be irrecoverably lost, and the criminal might have fled, which is why a real-time fraud detection system is more useful.

Marketing campaign planning

You will be familiar with promotional folders that get delivered to your mailboxes by post or with newspapers and magazines. These promotional folders are sent as a part of a campaign run by the marketing departments of companies. A campaign is typically part of a project with a well-defined objective. Often these objectives are related to the successful sale of a product, or a customer visiting a store in response to a campaign. The rewards of the employees in the marketing department are linked to the success of the campaign. Promotional campaigns have a lot of waste associated with them because they target the wrong customers, who are unlikely to respond to a promotional folder, but they still get them because there is no way of knowing who the right and wrong customers are.

Note

For example, if you send offers for meat products to a person who is a vegetarian, then it is very unlikely that it will result in a sale of your product.

As a result, the promotional folders are sent to everyone.

In Chapter 4, Marketing Campaign Planning, we will build a system to decide which customers are more likely to respond to a promotional folder by using an example of a fictitious company. We will build a predictive model from the historical campaign response data. We will use the predictive model to create a new target list of customers who are more likely to respond to our promotional campaign. The aim of this exercise is to increase the success of marketing campaigns. We will use a tool called BigML to build a predictive model and Hadoop to process the customer data.

Churn detection in telecom

Customer churn or customer attrition refers to the loss of clients to competitors. This problem is acute among technology service providers such as Internet service providers, mobile operators, and vendors of software as a service. As a result of customer churn, the companies lose a source of revenue. In very competitive markets where vendors outdo each other by slashing prices, the cost of acquiring new customers is much higher than the cost of retaining existing customers. In these saturated markets, with little or no room for growth, customer retention is the only strategy to maintain market share and revenue. A customer churn detection system is a very compelling business case in the telecom sector.

In the telecom business, a customer might defect to another provider at the end of a contract period. If a telecom company knows in advance which customer is likely to move to a new provider, then it can make a suitable offer to the customer that will increase the likelihood that the customer will stay after the end of the existing contract period.

To predict customer churn, we should examine what kind of signal we can derive from the data. Just by examining the static data about the customer, we will not be able to conclude much about an upcoming churn event. Therefore, a churn-detection system should look into customer behavior such as the calling patterns, social interactions and contacts with the call center. All this information, when analyzed properly, can be put to use for building a churn-detection model.

The churn-detection problem is also well suited for large-scale batch analytics, which Hadoop excels at. We can start shortlisting the customers who are likely to churn a few months before the contract end date. Once we have this list, we can target these customers with both inbound and outbound marketing campaigns to increase the chances that the customer will stay with the company after the end of the existing contract period.
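The time-window part of this shortlisting can be sketched in a few lines. The code below only selects customers whose contract ends within a given number of months; it says nothing about who is likely to churn, which is the job of the predictive model. The class name, the customer IDs, and the dates are hypothetical.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ContractWindowSketch {

    /** Returns the IDs of customers whose contract ends in [today, today + months). */
    public static List<String> endingWithin(Map<String, LocalDate> contractEnds,
                                            LocalDate today, int months) {
        LocalDate horizon = today.plusMonths(months);
        List<String> shortlist = new ArrayList<>();
        // Copy into a TreeMap to get a stable, sorted iteration order
        for (Map.Entry<String, LocalDate> e : new TreeMap<>(contractEnds).entrySet()) {
            LocalDate end = e.getValue();
            if (!end.isBefore(today) && end.isBefore(horizon)) {
                shortlist.add(e.getKey());
            }
        }
        return shortlist;
    }

    public static void main(String[] args) {
        Map<String, LocalDate> ends = new TreeMap<>();
        ends.put("C001", LocalDate.of(2016, 3, 31));
        ends.put("C002", LocalDate.of(2016, 12, 31));
        ends.put("C003", LocalDate.of(2015, 11, 30));
        System.out.println(endingWithin(ends, LocalDate.of(2016, 1, 1), 3)); // prints [C001]
    }
}
```

In a batch job, a filter like this would typically run first, so that the churn model only scores customers for whom an offer is still timely.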

In Chapter 5, Churn Detection, we will build a system to predict customer churn. We will use customer master data, and other master data, to build a customer-churn model. We will use this model to predict which customers are likely to churn. We will use batch processing to generate the list that will be used by inbound sales staff and outbound campaign managers to target customers with tailor-made offers.

Analyzing sensor data

Nowadays sensors are everywhere. GPS sensors are fitted in taxis to track their movement and location. Smartphones carry GPS, temperature, and speed sensors. Even large buildings and factory complexes have thousands of sensors that measure the lighting, temperature, and humidity. The sensor data is collected, processed, and analyzed in three distinct steps using a big data system. The first step involves the detection of events that generate data from the sensor. The sensor transports this data using a wired or wireless protocol to a centralized data-storage system. In the second step, the sensor data is stored in a centralized data-storage system after data cleansing, if necessary. The third step involves analyzing and consuming this data by an application. A system capable of processing sensor data is also called an Internet of Things (IoT) system. Sometimes, however, we might need to analyze the sensor data in near real time. For example, the temperature or the humidity in a large factory complex, if not monitored or controlled in real time, might lead to perished goods or loss of human productivity. In such cases, we need a near real-time data analytics solution. Although the HDFS is suitable for batch analytics, other tools in the Hadoop ecosystem can support near real-time analytics very well.

Sensor data usually takes the form of time series, because data sent by a sensor is a measurement at a specific moment in time. This measurement might contain information about temperature, voltage, humidity or some other physical parameter of interest. In the use case covered in Chapter 6, Analyze Sensor Data Using Hadoop, we will use sensor data to build a batch and real-time data analytics system for a factory.
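Because sensor readings form a time series, a typical first analysis step is to smooth them over a window and check the result against a limit. The sketch below computes a simple moving average and finds the first value that exceeds a threshold. It is a plain Java illustration with hypothetical names and made-up temperature readings, not the system built in Chapter 6.

```java
public class SensorWindowSketch {

    /** Simple moving average over a fixed-size window of consecutive readings. */
    public static double[] movingAverage(double[] readings, int window) {
        if (window <= 0 || window > readings.length) {
            throw new IllegalArgumentException("window must be between 1 and readings.length");
        }
        double[] result = new double[readings.length - window + 1];
        double sum = 0;
        for (int i = 0; i < readings.length; i++) {
            sum += readings[i];
            if (i >= window) {
                sum -= readings[i - window]; // drop the reading that left the window
            }
            if (i >= window - 1) {
                result[i - window + 1] = sum / window;
            }
        }
        return result;
    }

    /** Index of the first value above the limit, or -1 if no value exceeds it. */
    public static int firstIndexAbove(double[] values, double limit) {
        for (int i = 0; i < values.length; i++) {
            if (values[i] > limit) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        double[] temperatures = {20, 22, 24, 26};
        double[] smoothed = movingAverage(temperatures, 2); // 21.0, 23.0, 25.0
        System.out.println(firstIndexAbove(smoothed, 24));   // prints 2
    }
}
```

The same calculation can run in batch over historical data or incrementally over a stream, which is why it fits both styles of analytics mentioned above.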

Building a data lake

The term data lake has gained popularity in recent years. The main promise of a data lake is to provide access to large volumes of data in a raw form for analytics of an entire enterprise and to introduce agility into the data-warehousing processes.

Data lakes are challenging the traditional enterprise data-warehousing paradigm. Traditional data warehousing is based upon the Extract-Transform-Load (ETL) paradigm. ETL-based data-warehousing processes have a long cycle time because they require a well-defined data model into which the data is loaded. This process is called transformation because the data extracted from the operational systems is transformed for loading into the enterprise data warehouse. It's only when the data is loaded into the data warehouse that it becomes available for further analysis.

Hadoop supports the Extract-Load-Transform (ELT) paradigm. The data files in their raw format can be loaded into the HDFS. Loading does not require any knowledge of the data model. Once the data has been loaded on the HDFS, we can use a variety of tools for ad-hoc exploration and analytics. The transformation of data to facilitate structured exploration and analytics can continue, but users already get access to the raw data for exploration without waiting for a long time.
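The idea that raw data is loaded first and given a structure only when it is read can be shown with a small sketch. Here a raw comma-separated line is kept unchanged, and a list of column names is applied at read time; a different reader could apply a different schema to the same line. The class name, the column names, and the sample record are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaOnReadSketch {

    /** Applies a schema (an ordered list of column names) to a raw line at read time. */
    public static Map<String, String> read(String rawLine, String[] columns) {
        // A limit of -1 keeps trailing empty fields instead of dropping them
        String[] values = rawLine.split(",", -1);
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < columns.length; i++) {
            // Columns missing from the raw line are reported as null
            record.put(columns[i], i < values.length ? values[i] : null);
        }
        return record;
    }

    public static void main(String[] args) {
        String raw = "42,Alice,1980-05-01"; // stored as-is, no upfront model
        System.out.println(read(raw, new String[] {"id", "name", "dob"}));
        System.out.println(read(raw, new String[] {"id", "name"}));
    }
}
```

In an ETL process, by contrast, the mapping from fields to columns would have to be fixed before any line could be loaded.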

The data lake use case opens up new frontiers for businesses because data lakes give a large group of users access to data. The lower cost of data storage in the HDFS makes this an attractive proposition because data with no immediate use does not have to be discarded to save expensive storage space in the enterprise data warehouse. The data lake stores all the data for an enterprise. It offers an opportunity to break down the data silos in enterprises, which made data analysis using cross-departmental data a very slow process owing to interdepartmental politics and boundaries created by different IT systems.

The data lake use case opens up a new set of questions about data governance and data security. Once all the enterprise data is stored in the data lake, fine-grained access to datasets becomes crucial to ensure that data does not get into the wrong hands. A system containing all the enterprise data becomes a valuable target for hackers too.

In Chapter 7, Build a Data Lake, we will build a basic data lake and see how we can keep it secure by using various tools available in the Hadoop ecosystem.

 

The architecture of Hadoop-based systems


Having introduced the use cases covered in this book, we will now discuss the architecture that forms the basis of the solutions that we will create in the following chapters. Lambda architecture is a good architecture pattern to refer to while designing a big data system. It is a generic architecture into which Hadoop and its ecosystem of tools fit nicely.

Lambda architecture

Nathan Marz is an ex-Twitter engineer who gained significant experience in building real-time distributed data-processing systems. He went on to design Lambda architecture, which is a generic architecture for big data systems. This architecture addresses the requirements of batch and real-time analytics.

The underlying thoughts behind building a generic architecture for big data systems are as follows:

  • The system should be fault tolerant and capable of coping with hardware failures.

  • The system should be able to run a variety of workloads. Some workloads, such as interactive queries, require low latency.

  • It should support linear scalability. This means that adding more servers should increase the processing capacity of the system proportionally.

From a high-level perspective, this system has the following building blocks:

  • A data ingestion block, which dispatches data to the batch and the speed layer for processing.

  • The batch layer, which is an immutable, append-only data store. This computes the batch views.

  • The serving layer, which indexes batch views for consumption by interactive queries and applications.

  • The speed layer, which processes the data in real time and compensates for the delay introduced in the batch layer.

  • Any interactive query can be answered by merging the results from the serving layer and the speed layer.

Figure 24 The Lambda architecture for big data systems

You will quickly recognize how Hadoop and the HDFS fit in the batch layer. Technologies such as MapReduce and Pig are suited for the creation of batch views. In the serving layer, we can use tools such as HBase or MongoDB, which support both interactive queries and full CRUD operations on the datasets. In the speed layer, we can use tools such as Apache Storm and Apache Spark Streaming.
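The interplay of the building blocks can be sketched in a few lines of plain Python. This is a toy, in-memory illustration and not how Hadoop, HBase, or Storm implement these layers. The `LambdaSketch` class, the event shape, and the sensor names are hypothetical. The sketch also assumes that no events arrive while a batch run is in progress, which a real system cannot assume.

```python
from collections import Counter
from typing import Dict, List, Tuple

Event = Tuple[str, int]  # (sensor_id, event count); a hypothetical record shape


class LambdaSketch:
    """Toy model of the Lambda architecture that counts events per sensor."""

    def __init__(self) -> None:
        self.master_dataset: List[Event] = []    # batch layer: append-only raw data
        self.batch_view: Dict[str, int] = {}     # serving layer: indexed batch view
        self.realtime_view: Counter = Counter()  # speed layer: recent events only

    def ingest(self, event: Event) -> None:
        """Data ingestion: dispatch each event to the batch and the speed layer."""
        self.master_dataset.append(event)
        self.realtime_view[event[0]] += event[1]

    def run_batch(self) -> None:
        """Recompute the batch view from all raw data, then reset the speed layer."""
        view: Counter = Counter()
        for sensor_id, count in self.master_dataset:
            view[sensor_id] += count
        self.batch_view = dict(view)
        self.realtime_view.clear()  # these events are now covered by the batch view

    def query(self, sensor_id: str) -> int:
        """Answer a query by merging the batch view with the real-time view."""
        return self.batch_view.get(sensor_id, 0) + self.realtime_view[sensor_id]


system = LambdaSketch()
system.ingest(("s1", 2))
system.ingest(("s2", 1))
system.run_batch()        # the batch view now covers the first two events
system.ingest(("s1", 3))  # arrives after the batch run; only the speed layer sees it
print(system.query("s1"))
print(system.query("s2"))
```

The query for `s1` combines the count from the batch view with the count that only the speed layer has seen so far. This is how the speed layer compensates for the delay of the batch layer.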

Lambda architecture is a reference model. In our use cases, we should make good choices about which components of this reference model are used in our solution architecture. For example, a use case that does not require a near real-time response or interactive queries will not need components to build the speed layer and serving layer.

 

Summary


In this chapter, we started by learning about the origins of big data problems. We learned how Google publications gave rise to the development of Hadoop and its ecosystem of tools and how the engineering teams at Yahoo were the main driving force behind the evolution of Hadoop.

We covered how the industrial-scale use of Hadoop at Yahoo paved the way for the commercial adoption of Hadoop in diverse industry segments.

We learned about the design of the HDFS and about MapReduce as a computing paradigm, followed by an overview of the tools in the Hadoop ecosystem. We developed a MapReduce program and also studied how to run it on Hadoop.

The latter part of this chapter gave a brief overview of the use cases covered in this book, which we will implement as projects in the coming chapters. We also covered Lambda architecture as the reference architecture for building big data systems.

About the Authors
  • Anurag Shrivastava

    Anurag Shrivastava is an entrepreneur, blogger, and manager living in Almere near Amsterdam in the Netherlands. He started his IT journey by writing a small poker program on a mainframe computer 30 years ago, and he fell in love with software technology. In his 24-year career in IT, he has worked for companies of various sizes, ranging from Internet start-ups to large system integrators in Europe. Anurag kick-started the Agile software movement in North India when he set up the Indian business unit for the Dutch software consulting company Xebia. He led the growth of Xebia India as the managing director of the company for over 6 years and made the company a well-known name in the Agile consulting space in India. He also started the Agile NCR Conference, which has become a heavily visited annual event on Agile best practices, in the New Delhi Capital Region. Anurag became active in the big data space when he joined ING Bank in Amsterdam as the manager of the customer intelligence department, where he set up their first Hadoop cluster and implemented several transformative technologies, such as Netezza and R, in his department. He is now active in payment technology and APIs, using technologies such as Node.js and MongoDB. Anurag loves to cycle on the reclaimed island of Flevoland in the Netherlands. He also likes listening to Hindi film music.

  • Tanmay Deshpande

    Tanmay Deshpande is a Hadoop and big data evangelist. He currently works with Schlumberger as a Big Data Architect in Pune, India. He has interest in a wide range of technologies, such as Hadoop, Hive, Pig, NoSQL databases, Mahout, Sqoop, Java, cloud computing, and so on. He has vast experience in application development in various domains, such as oil and gas, finance, telecom, manufacturing, security, and retail. He enjoys solving machine-learning problems and spends his time reading anything that he can get his hands on. He has great interest in open source technologies and has been promoting them through his talks. Before Schlumberger, he worked with Symantec, Lumiata, and Infosys. Through his innovative thinking and dynamic leadership, he has successfully completed various projects. He regularly blogs on his website http://hadooptutorials.co.in. You can connect with him on LinkedIn at https://www.linkedin.com/in/deshpandetanmay/. He has also authored Mastering DynamoDB, published in August 2014, DynamoDB Cookbook, published in September 2015, Hadoop Real World Solutions Cookbook-Second Edition, published in March 2016, Hadoop: Data Processing and Modelling, published in August, 2016, and Hadoop Blueprints, published in September 2016, all by Packt Publishing.
