"We call this the problem of big data."
Arguably, the first time big data was talked about in the context we know it today was in July 1997. Michael Cox and David Ellsworth, researchers at NASA, described the problem they faced when processing humongous amounts of data with the traditional computers of that time. In the early 2000s, Lexis Nexis designed a proprietary system, which later went on to become the High-Performance Computing Cluster (HPCC), to address the growing need to process data on a cluster. It was later open sourced in 2011.
It was the era of dot-coms, and Google was challenging the limits of the internet by crawling and indexing it in its entirety. With the rate at which the internet was expanding, Google knew it would be difficult, if not impossible, to scale vertically to process data of that size. Distributed computing, though still in its infancy, caught Google's attention. They not only developed a distributed fault-tolerant filesystem, the Google File System (GFS), but also a distributed processing engine called MapReduce. It was in 2003-2004 that Google released the white paper titled The Google File System by Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung, and shortly thereafter they released another white paper titled MapReduce: Simplified Data Processing on Large Clusters by Jeffrey Dean and Sanjay Ghemawat.
Around the same time, Doug Cutting, an open source contributor, was looking for ways to build an open source search engine and, like Google, was struggling to process data at internet scale. By 1999, Doug Cutting had developed Lucene, a Java library with text/web searching capabilities, among other things. However, Nutch, an open source web crawler and data indexer built by Doug Cutting along with Mike Cafarella, was not scaling well. As luck would have it, Google's white papers caught Doug Cutting's attention. He began working on similar concepts, calling them the Nutch Distributed File System (NDFS) and Nutch MapReduce. By 2005, using this distributed platform, he was able to scale Nutch from indexing 100 million pages to indexing multiple billions of pages.
However, it wasn't just Doug Cutting; Yahoo! too became interested in the development of the MapReduce computing framework to serve its own processing needs. It is here that Doug Cutting refactored the distributed computing framework out of Nutch and named it after his kid's toy elephant, Hadoop. By 2008, Yahoo! was using Hadoop in its production cluster to build its search index and metadata, called the web map. Despite being a direct competitor to Google, one distinct strategic choice Yahoo! made while co-developing Hadoop was the way the project was to be developed: they open sourced it. And the rest, as we know, is history!
In this chapter, we will cover the following topics:
Big data is best described by its dimensions, which are called the Vs of big data. To categorize a problem as a big data problem, it should lie in one or more of these dimensions.
The big data world started with three dimensions or 3Vs of big data, which are as follows:
Let us now take a look at each one in detail:
Then, with time, our 3D world changed to a 7D world, with the following newer dimensions:
In a classical sense, Hadoop comprises two components: a storage layer called HDFS and a processing layer called MapReduce. Prior to Hadoop 2.x, resource management was also handled by the MapReduce framework itself. However, that changed with the introduction of YARN. In Hadoop 2.0, YARN was introduced as the third component of Hadoop to manage the resources of the Hadoop cluster and to decouple resource management from MapReduce.
The Hadoop Distributed File System (HDFS), as the name suggests, is a distributed filesystem written in Java and modeled along the lines of the Google File System. In practice, HDFS closely resembles any other UNIX filesystem, with support for common file operations such as ls, cp, rm, du, cat, and so on. However, what makes HDFS stand out, despite its simplicity, is its mechanism for handling node failures in the Hadoop cluster without affecting the access time for stored files. An HDFS cluster consists of two major components: DataNodes and a NameNode.
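For readers who prefer to see this from the Java side, the following is a minimal sketch of a client interacting with HDFS through the Hadoop FileSystem API, mirroring shell operations such as ls and cat. It assumes the Hadoop client libraries are on the classpath and that fs.defaultFS points to the cluster; the /user/demo paths are hypothetical.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsClientExample {
    public static void main(String[] args) throws Exception {
        // Picks up fs.defaultFS from core-site.xml on the classpath
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Equivalent of 'ls' on a directory (hypothetical path)
        for (FileStatus status : fs.listStatus(new Path("/user/demo"))) {
            System.out.println(status.getPath() + " " + status.getLen() + " bytes");
        }

        // Equivalent of 'cat' on a file (hypothetical path)
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(new Path("/user/demo/sample.txt"))))) {
            reader.lines().forEach(System.out::println);
        }
        fs.close();
    }
}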
HDFS stores data on the cluster (cheap, networked commodity computers) in a unique way. It splits a regular file into smaller chunks called blocks and then makes copies of each block according to the replication factor configured for that file, distributing those copies across different DataNodes of the cluster. For example, a 384 MB file with a 128 MB block size and a replication factor of 3 is split into three blocks, each stored on three different DataNodes.
The NameNode is responsible for managing the metadata of the HDFS cluster, such as the list of files and folders that exist in the cluster, the number of blocks each file is split into, and their replication and placement on different DataNodes. It also maintains and manages the namespace and file permissions of all the files available in the HDFS cluster. Apart from bookkeeping, the NameNode also has a supervisory role: it keeps watch over the replication factor of all the files and, if some block goes missing, issues commands to replicate the missing block of data. It also generates reports to ascertain cluster health. It is important to note that all communication for supervisory tasks happens from DataNode to NameNode; that is, DataNodes send reports (block reports) to the NameNode, and the NameNode then responds by issuing different commands or instructions as needed.
An HDFS read operation from a client involves the following:
An HDFS write operation from a client involves the following:
The client writes data through an FSDataOutputStream, which points to the data queue from which packets are streamed to the DataNodes.

The simplest way to understand YARN (Yet Another Resource Negotiator) is to think of it as an operating system for the cluster: provisioning resources, scheduling jobs, and maintaining nodes. With Hadoop 2.x, the MapReduce model of processing the data and managing the cluster (JobTracker/TaskTracker) was split. While data processing was still left to MapReduce, the cluster's resource allocation (or rather, scheduling) task was assigned to a new component called YARN. Another objective that YARN met was that it made MapReduce just one of the techniques for processing data rather than the only technology for processing data on HDFS, as was the case in Hadoop 1.x systems. This paradigm shift opened the floodgates for the development of interesting applications around Hadoop, and a new ecosystem beyond the classical MapReduce processing system evolved. It didn't take long after that for Apache Spark to break the hegemony of classical MapReduce and become arguably the most popular processing framework for parallel computing as far as active development and adoption are concerned.
In order to provide multi-tenancy, fault tolerance, and resource isolation, YARN introduced the following components to manage the cluster seamlessly:
The following steps describe the flow of application submission in YARN:
Depending on the priority and availability of resources, the RM grants containers, each represented by a container ID and the hostname of the node on which it can be launched.
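Purely as an illustration of that flow, the following is a minimal sketch of a client asking the RM for an application ID and submitting an ApplicationMaster container through the YARN client API. The application name, the AM command (/bin/date is just a placeholder), and the resource sizes are arbitrary values chosen for this example.

import java.util.Collections;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class YarnSubmitSketch {
    public static void main(String[] args) throws Exception {
        // Connect to the ResourceManager configured in yarn-site.xml
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();

        // Ask the RM for a new application ID
        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
        appContext.setApplicationName("demo-app");

        // Describe the container that will host the ApplicationMaster
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        amContainer.setCommands(Collections.singletonList("/bin/date")); // placeholder AM command
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(Resource.newInstance(512, 1)); // 512 MB, 1 vCore for the AM

        // Submit; the RM schedules a container and launches the AM on a NodeManager
        ApplicationId appId = yarnClient.submitApplication(appContext);
        System.out.println("Submitted application " + appId);
        yarnClient.stop();
    }
}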
Before delving deep into the MapReduce implementation in Hadoop, let's first understand MapReduce as a concept in parallel computing and why it is a preferred way of computing. MapReduce comprises two mutually exclusive but dependent phases, each capable of running on a different machine or node:
Let's begin with John Lennon's song, Imagine.
Sample text:
Imagine there's no heaven
It's easy if you try
No hell below us
Above us only sky
Imagine all the people living for today
After running the Map phase on the sample text and splitting it over <space>, it will get converted to key value pairs as shown here:
[<imagine, 1> <there's, 1> <no, 1> <heaven, 1> <it's, 1> <easy, 1> <if, 1> <you, 1> <try, 1> <no, 1> <hell, 1> <below, 1> <us, 1> <above, 1> <us, 1> <only, 1> <sky, 1> <imagine, 1> <all, 1> <the, 1> <people, 1> <living, 1> <for, 1> <today, 1>]
The key here represents the word and the value represents the count. Also, it should be noted that we have converted all the keys to lowercase to reduce any further complexity arising from matching case-sensitive keys. After the Reduce phase aggregates the counts for each key, the output looks as follows:
[<imagine, 2> <there's, 1> <no, 2> <heaven, 1> <it's, 1> <easy, 1> <if, 1> <you, 1> <try, 1> <hell, 1> <below, 1> <us, 2> <above, 1> <only, 1> <sky, 1> <all, 1> <the, 1> <people, 1> <living, 1> <for, 1> <today, 1>]
As we can see, the Map and Reduce phases can each be run independently and hence can use separate nodes in the cluster to process the data. This approach of separating tasks into smaller units called Map and Reduce has revolutionized general purpose distributed/parallel computing, which we now know as MapReduce.
Apache Hadoop's MapReduce has been implemented in pretty much the same way as discussed, except for extra machinery governing how the data from the Map phase on each node gets transferred to its designated Reduce node.
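To make the word count concrete, here is a minimal sketch of how it could be written against Hadoop's MapReduce Java API. It assumes the Hadoop MapReduce client libraries are available; the input and output paths are passed as program arguments.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Map phase: emit <word, 1> for every lowercased token in the line
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String token : value.toString().toLowerCase().split(" ")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    // Reduce phase: sum the counts for each word
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}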
Hadoop's implementation of MapReduce enriches the Map and Reduce phases by adding a few more concrete steps in between to make it fault tolerant and truly distributed. We can describe MR jobs on YARN in five stages:
If all three conditions are met, the MR job JAR, along with its configuration and input split details, is copied to HDFS in a directory named after the application ID provided by the RM. The job is then submitted to the RM to launch a job-specific ApplicationMaster, MRAppMaster.

To launch MRAppMaster, a call is made to the YARN scheduler to assign a container. As per resource availability, the container is granted and MRAppMaster is launched on the designated node with the provisioned resources. After this, MRAppMaster fetches the input split information from the HDFS path submitted by the client and computes the number of mapper tasks to launch based on the splits. Depending on the number of mappers, it also calculates the required number of reducers as per the configuration. If MRAppMaster now finds the number of mappers, the number of reducers, and the size of the input files small enough to be run in the same JVM, it goes ahead and does so; such tasks are called uber tasks. In other scenarios, MRAppMaster negotiates container resources from the RM for running these tasks, albeit with mapper tasks having higher priority; this is because mapper tasks must finish before the sort phase can start.
Data locality is another concern for containers hosting mappers: data-local nodes are preferred over rack-local ones, with remote nodes given the least preference. For containers in the Reduce phase, no such data locality preference exists. Containers hosting mappers first copy the MapReduce JAR and configuration files locally and then launch a class called YarnChild in a separate JVM. The mapper then starts reading the input split, processing records into key value pairs, and writing them to a circular memory buffer.
A Map task keeps updating its status to the ApplicationMaster throughout its life cycle. The Reduce tasks start only once 5 percent of the Map tasks have completed. A Reduce task asks MRAppMaster which mapper hosts hold its partitioned files, and an auxiliary service running in each NodeManager (a Netty-based web server) serves those map output partitions. All the partitioned files that pertain to a Reducer are copied to its node in this fashion. As data from the various nodes feeding that Reducer gets collected, a background thread merges the sorted map outputs, sorts them again and, if a combiner is configured, combines the results too.
Each task informs MRAppMaster about its completion. Once the Reduce tasks are over, the ApplicationMaster starts the clean-up activity. The submitted job's status is changed from running to successful, all the temporary and intermediate files and folders are deleted, and the application statistics are archived to the job history server.

"MapReduce is on its way to being a legacy. We've got Spark", says the man behind Apache Hadoop, Doug Cutting. MapReduce is an amazingly popular distributed framework, but it comes with its fair share of criticism as well. Since its inception, MapReduce has been built to run jobs in batch mode. Although it supports streaming, it is not very well suited for ad-hoc queries, machine learning, and so on. Apache Spark is a distributed in-memory computing framework that tries to address some of the major concerns surrounding MapReduce:
The very first paper on RDDs, Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing, described them as follows:
Resilient Distributed Datasets (RDDs), a distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner.

As Spark is written in a functional programming paradigm, where one of the key concepts is immutability, a Resilient Distributed Dataset is also an immutable dataset.
Formally, we can define an RDD as an immutable distributed collection of objects. It is the primary data type of Spark. It leverages cluster memory and is partitioned across the cluster.
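As a small illustration in the Java API (the sample data, partition count, and local master are arbitrary choices for this sketch), the following creates a simple RDD and a pair RDD, both partitioned across the cluster:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class RddRepresentationExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("rdd-example").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // A simple RDD of strings, split into 3 partitions across the cluster
        JavaRDD<String> words = sc.parallelize(
                Arrays.asList("imagine", "no", "heaven", "no", "hell"), 3);

        // A pair RDD of (key, value) pairs built from the simple RDD
        JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));

        System.out.println("Partitions: " + words.getNumPartitions());
        System.out.println(pairs.collect());
        sc.close();
    }
}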
The following is the logical representation of an RDD:
RDDs can consist of (key, value) pairs as well. The following is the logical representation of a pair RDD:
Also, as mentioned, an RDD can be partitioned across the cluster. So the following is the logical representation of partitioned RDDs in a cluster:
An RDD supports only two types of operations. One is called transformation and the other is called action. The following are the explanations of both of these:
So, a filter operation on an RDD will return another RDD with all the values that pass the filter condition; a filter is therefore an example of a transformation.
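A minimal sketch in the Java API (with arbitrary sample data) makes the distinction visible: filter hands back another RDD, while count and collect hand back plain values to the driver.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TransformationVsActionExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("transform-vs-action").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));

        // filter is a transformation: it returns another RDD
        JavaRDD<Integer> evens = numbers.filter(n -> n % 2 == 0);

        // count and collect are actions: they return plain values to the driver
        long howMany = evens.count();
        System.out.println(howMany + " even numbers: " + evens.collect());
        sc.close();
    }
}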
So, the rule is: if an operation on an RDD gives you back an RDD, it is a transformation; otherwise, it is an action. We will discuss all the available transformations and actions that can be performed on an RDD, with coding examples, in Chapter 4, Understanding the Spark Programming Model and Chapter 7, Spark Programming Model - Advanced.
Another important thing to understand about RDDs is lazy evaluation. Spark creates a DAG, also called the lineage graph, of all the operations you perform on an RDD. Execution of the graph starts only when an action is performed on an RDD. Let's consider an example of DAG operations on an RDD:
Here, an RDD is first created by reading data from stable storage, two transformations are performed on that RDD, and finally an action is performed to get the result.
Looking at the previous diagram, one might infer that RDD1 is created as soon as the Spark job finds the step to create it from the database, that it then finds the transformation steps and performs those transformations, and that finally it finds the action and runs it to calculate the result. However, this is not true.
In reality, a Spark job keeps adding steps to the DAG until it finds a step that asks it to perform an action on an RDD. Only when the job finds this step does it start executing the DAG from the first vertex.
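The following minimal Java sketch (with arbitrary sample data) illustrates this: the transformations only record the lineage graph, and nothing is computed until count is called.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LazyEvaluationExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("lazy-eval").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

        // Transformations: only the lineage graph (DAG) is recorded here;
        // nothing is computed yet and the print inside map() does not run
        JavaRDD<Integer> doubled = numbers.map(n -> {
            System.out.println("processing " + n);
            return n * 2;
        });
        JavaRDD<Integer> big = doubled.filter(n -> n > 4);
        System.out.println("DAG built, nothing executed so far");

        // Action: triggers execution of the whole DAG from the first vertex
        System.out.println("count = " + big.count());
        sc.close();
    }
}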
The following are the benefits of this approach:
The following are some of the benefits that the Spark RDD model provides over the Hadoop MapReduce model:
Consider running a MapReduce job that reads data from HDFS, performs some aggregation, and writes the output back to HDFS. The mapper tasks read data from HDFS and write their output to the local filesystem on completion; the reducers pull that data, run the reduce process on it, and then write the output to HDFS (leaving aside the spill mechanism of the mapper and reducer).
Now, let's say you want to perform another aggregation on the output data, so you execute another MapReduce job on it, which goes through a similar I/O process. The following is the logical representation of how iterative operations run in MapReduce:
On the other hand, Spark will not perform such I/O in most cases for the job described previously. Data is read from HDFS once, and then Spark performs in-memory transformations on the RDD for every iteration. The output of every step (that is, another RDD) is kept in the distributed cluster memory. The following is the logical representation of the same job in Spark:
Now, here is a catch. What if the size of the intermediate results is more than the distributed memory size? In that case, Spark will spill that RDD to disk.
Let's say you are trying to run some MapReduce jobs (or Hive queries) on the data to do some analysis. If you are running multiple queries on the same input data, MapReduce will read the data from storage, let's say HDFS, every time you run a query. A logical representation of that is as follows:
On the other hand, Spark provides a mechanism to persist an RDD in memory (different mechanisms of persisting RDD will be discussed later in Chapter 4, Understanding the Spark Programming Model). So, you can execute one job and save RDD in memory. Then, other analytics can be executed on the same RDD without reading the data from HDFS again. The following is the logical representation of that:
When a Spark job encounters Spark Action 1, it executes the DAG and calculates the RDD. The RDD is then persisted in memory, and Spark Action 1 is performed on it. Afterwards, Spark Action 2 and Spark Action 3 are performed on the same RDD. So, this model helps to save a lot of I/O against stable storage in the case of interactive processing.
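The following is a minimal sketch of this pattern in the Java API; the HDFS path and the filter conditions are hypothetical and only serve to show one job materializing a cached RDD that subsequent actions reuse.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class InteractiveCachingExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("rdd-caching").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Hypothetical HDFS path; replace with a real dataset
        JavaRDD<String> logs = sc.textFile("hdfs:///data/access-logs");

        // Keep the RDD in distributed memory after the first computation
        JavaRDD<String> errors = logs.filter(line -> line.contains("ERROR")).cache();

        // Spark Action 1: triggers the read from HDFS and materializes the cached RDD
        System.out.println("error lines: " + errors.count());

        // Spark Actions 2 and 3: served from the cached RDD, no second read from HDFS
        System.out.println("timeouts: " + errors.filter(l -> l.contains("timeout")).count());
        System.out.println("sample: " + errors.take(5));
        sc.close();
    }
}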
Apache Spark is considered a general-purpose system in the big data world. It consists of a number of libraries that help you perform various analytics on your data. It provides built-in libraries to perform batch analytics, perform real-time analytics, apply machine learning algorithms to your data, and much more.
The following are the various built-in libraries available in Spark:
SparkR, which can be used to run R scripts on a Spark cluster. Users who are more familiar with R can use tools such as RStudio or the R shell and execute R scripts that will run on the Spark cluster.

Prior to Spark 2.x, sqlContext was the entry point for Spark SQL, and if Hive features were also required then HiveContext was the entry point. With Spark 2.x this ambiguity has been removed, and we now have one single entry point called SparkSession. However, it is to be noted that all the module-specific entry points are still very much around and have not been deprecated yet. The functionalities of sqlContext and HiveContext are available now, as the Catalog API can be accessed by SparkSession.
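As a small illustration (the application name and local master are arbitrary, and Hive support is shown only as an optional line), the following creates a SparkSession in Java as the single entry point and accesses SQL and the Catalog API through it:

import org.apache.spark.sql.SparkSession;

public class SparkSessionExample {
    public static void main(String[] args) {
        // Single unified entry point in Spark 2.x, replacing separate sqlContext/HiveContext usage
        SparkSession spark = SparkSession.builder()
                .appName("spark-session-example")
                .master("local[*]")
                // .enableHiveSupport()  // uncomment only when Hive features (and jars) are needed
                .getOrCreate();

        // Spark SQL and the Catalog API are reachable directly from the session
        spark.sql("SELECT 'hello' AS greeting").show();
        spark.catalog().listTables().show();

        spark.stop();
    }
}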
The spark.ml package, which is based on DataFrames, has been introduced with the objective of deprecating spark.mllib once the newly introduced package matures enough in features to replace the old package.

In this chapter, you learned about the history of big data and its dimensions, and the basic concepts of Hadoop and Spark. After that, you were introduced to RDDs and learned the basic concepts of the Spark RDD model. Then, you learned about the various components of the Spark ecosystem and the newer features of Spark 2.x.
As this book focuses on implementing Spark applications in Java, in the next chapter we will refresh the concepts of core Java and focus on the newer features of Java 8, which will be leveraged while developing Spark applications.