"We call this the problem of big data."
Arguably, the first time big data was being talked about in a context we know now was in July, 1997. MichaelCox and DavidEllsworth, scientists/researchers from NASA, described the problem they faced when processing humongous amounts of data with the traditional computers of that time. In the early 2000s, Lexis Nexis designed a proprietary system, which later went on to become the High-PerformanceComputingCluster (HPCC), to address the growing need of processing data on a cluster. It was later open sourced in 2011.
It was an era of dot coms and Google was challenging the limits of the internet by crawling and indexing the entire internet. With the rate at which the internet was expanding, Google knew it would be difficult if not impossible to scale vertically to process data of that size. Distributed computing, though still in its infancy, caught Google's attention. They not only developed a distributed fault tolerant filesystem, Google File System (GFS), but also a distributed processing engine/system called MapReduce. It was then in 2003-2004 that Google released the white paper titled The Google File System by SanjayGhemawat, HowardGobioff, and Shun-TakLeung, and shortly thereafter they released another white paper titled MapReduce: Simplified Data Processing on Large Clusters by JeffreyDean and SanjayGhemawat.
Doug Cutting, an open source contributor, around the same time was looking for ways to make an open source search engine and like Google was failing to process the data at the internet scale. By 1999, Doug Cutting had developed Lucene, a Java library with the capability of text/web searching among other things. Nutch, an open source web crawler and data indexer built by Doug Cutting along with Mike Cafarella, was not scaling well. As luck would have it, Google's white paper caught Doug Cutting's attention. He began working on similar concepts calling them Nutch Distributed File System (NDFS) and Nutch MapReduce. By 2005, he was able to scale Nutch, which could index from 100 million pages to multi-billion pages using the distributed platform.
However, it wasn't just Doug Cutting but Yahoo! too who became interested in the development of the MapReduce computing framework to serve its processing capabilities. It is here that Doug Cutting refactored the distributed computing framework of Nutch and named it after his kid's elephant toy, Hadoop. By 2008, Yahoo! was using Hadoop in its production cluster to build its search index and metadata called web map. Despite being a direct competitor to Google, one distinct strategic difference that Yahoo! took while co-developing Hadoop was the nature in which the project was to be developed: they open sourced it. And the rest, as we know is history!
In this chapter, we will cover the following topics:
- What is big data?
- Why Apache Spark?
- RDD the first citizen of Spark
- Spark ecosystem -- Spark SQL, Spark Streaming, Milb, Graphx
- What's new in Spark 2.X?
Big data can be best described by using its dimensions. Those dimensions are called the Vs of big data. To categorize a problem as a big data problem, it should lie in one or more of these dimensions.
The big data world started with three dimensions or 3Vs of big data, which are as follows:
Let us now take a look at each one in detail:
- Volume: The amount of data being generated in the world is increasing at an exponential rate. Let's take an example of social community websites such as Facebook or Twitter. They are dealing with billions of customers all around the world. So, to analyze the amount of data being generated, they need to find a solution out of the existing RDBMS world. Moreover, not only such big giants, but also other organizations, such as banks, telecom companies, and so on, are dealing with huge numbers of customers. Performing analytics on such a humongous amount of data is a big data problem. So, according to this dimension, if you are dealing with a high volume of data, which can't be handled by traditional database systems, then it's imperative to move to big data territory.
- Variety: There was a time when only structured data was meant to be processed. But, to keep yourself ahead of your competitor, you need to analyze every sort of data which can increase value. For example, which products on a portal are more popular than others? So, you are analyzing user clicks. Now, data from these various sources that you need to use to keep yourself ahead can be structured or unstructured. It can be XML, JSON, CSV, or even plain text. So, now the data that you may need to deal with can be of different varieties. So, if you have such an issue, realize that this is a big data problem.
- Velocity: Data is not only increasing in size but the rate at which it is arriving is also increasing rapidly. Take the example of Twitter: billions of users are tweeting at a time. Twitter has to handle such a high velocity of data in almost real time. Also, you can think of YouTube. A lot of videos are being uploaded or streamed from YouTube every minute. Even look at online portals of news channels; they are being updated every second or minute to cope up with incoming data of news from all over the world. So, this dimension of big data deals with a high velocity of data and helps to provide persistence or analyze the data in near real time so as to generate real value.
Then, with time, our 3D world changed to a 7D world, with the following newer dimensions:
- Veracity: The truthfulness and completeness of the data are equally important. Take an example of a machine learning algorithm that involves automated decision making based on the data it analyzes. If the data is not accurate, this system can be disastrous. An example of such a system can be predictive analytics based on the online shopping data of end users. Using the analytics, you want to send offers to users. If the data that is fed to such a system is inaccurate or incomplete, analytics will not be meaningful or beneficial for the system. So, as per this dimension, before processing/analyzing, data should be validated. Processing high volume or high velocity data can only be meaningful if the data is accurate and complete, so before processing the data, it should be validated as well.
- Variability: This dimension of big data mainly deals with natural language processing or sentiment analytics. In language, one word can have multiple usages based on the sentiments of the user. So, to find sentiments, you should be able to comprehend the exact meaning. Let's say your favorite football team is not playing well and you posted a sarcastic tweet saying "What a great performance today by our team!!" Now looking at this sentence, it seems you are loving the way your team is performing but in reality it is the opposite. So to analyze the sentiments, the system should be fed with lot of other information such as the statistics of the match, and so on. Another example, the sentence This is too good to be true is negative but it consists of all positive words. Semantic analytics or natural language processing can only be accurate if you can understand sentiments behind the data.
- Value: There is lot of cost involved in performing big data analytics: the cost of getting the data, the cost for arranging hardware on which this data is saved and be analyzed, the cost of employees and time that goes into these analytics. All these costs are justified if the analytics provide value to the organization. Think of a healthcare company performing analytics on e-commerce data. They may be able to perform the analytics by getting data from the internet but it does not have value for them. Also, performing analytics on data which is not accurate or complete is not of any value. On the contrary, it can be harmful, as the analytics performed are misleading. So, value becomes an important dimension of big data because valuable analytics can be useful.
- Visualization: Visualization is another important aspect of the analytics. No work can be useful until it is visualized in a proper manner. Let's say engineers of your company have performed real accurate analytics but the output of them are stored in some JSON files or even in databases. The business analyst of your company, not being hard core technical, is not able to understand the outcome of the analytics thoroughly as the outcome is not visualized in a proper manner. So the analytics, even though they are correct, cannot be of much value to your organization. On the other hand, if you have created proper graphs or charts or effective visualization on the outcome, it can be much easier to understand and can be really valuable. So, visualization is a really important aspect of big data analytics because things can only be highlighted if they are visible.
In a classical sense, if we are to talk of Hadoop then it comprises of two components: a storage layer called HDFS and a processing layer called MapReduce. Resource management task prior to Hadoop 2.X was done using the MapReduce framework of Hadoop itself. However, that changed with the introduction of YARN. In Hadoop 2.0, YARN was introduced as the third component of Hadoop to manage the resources of the Hadoop cluster and make it more MapReduce agnostic.
The Hadoop Distributed File System (HDFS), as the name suggests, is a distributed filesystem based on the lines of the Google File System written in Java. In practice, HDFS resembles closely any other UNIX filesystem with support for common file operations such as
cat, and so on. However what makes HDFS stand out, despite its simplicity, is its mechanism to handle node failure in the Hadoop cluster without effectively changing the search time for accessing stored files. The HDFS cluster consists of two major components: DataNodes and NameNode.
HDFS has a unique way of storing data on HDFS clusters (cheap commodity networked commodity computers). It splits the regular file in smaller chunks called blocks and then makes an exact number of copies of such chunks depending on the replication factor for that file. After that, it copies such chunks to different DataNodes of the cluster.
The NameNode is responsible for managing the metadata of the HDFS cluster, such as lists of files and folders that exist in a cluster, the number of splits each file is divided into, and their replication and storage at different DataNodes. It also maintains and manages the namespace and file permission of all the files available in the HDFS cluster. Apart from bookkeeping, NameNode also has a supervisory role that keeps a watch on the replication factor of all the files and if some block goes missing, then it issue commands to replicate the missing block of data. It also generates reports to ascertain cluster health. It is important to note that all the communication for a supervisory task happens from DataNode to NameNode; that is, DataNode sends reports (block reports) to NameNode and it is then that NameNode responds to them by issuing different commands or instructions as the need may be.
An HDFS read operation from a client involves the following:
- The client requests NameNode to determine where the actual data blocks are stored for a given file.
- NameNode obliges by providing the block IDs and locations of the hosts (DataNode) where the data can be found.
- The client contacts DataNode with the respective block IDs to fetch the data from DataNode while preserving the order of the block files.
An HDFS write operation from a client involves the following:
- The client contacts NameNode to update the namespace with the filename and verify the necessary permissions.
- If the file exists, then NameNode throws an error; otherwise, it returns the client
FSDataOutputStreamwhich points to the data queue.
- The data queue negotiates with the NameNode to allocate new blocks on suitable DataNodes.
- The data is then copied to that DataNode, and, as per the replication strategy, the data is further copied from that DataNode to the rest of the DataNodes.
- It's important to note that the data is never moved through the NameNode as it would caused a performance bottleneck.
The simplest way to understand YARN (YetAnotherResourceManager) is to think of it as an operating system on a cluster; provisioning resources, scheduling jobs and node maintenance. With Hadoop 2.x, the MapReduce model of processing the data and managing the cluster (Job Tracker/Task Tracker) was divided. While data processing was still left to MapReduce, the cluster's resource allocation (or rather, scheduling) task was assigned to a new component called YARN. Another objective that YARN met was that it made MapReduce one of the techniques to process the data rather than being the only technology to process data on HDFS, as was the case in Hadoop 1.x systems. This paradigm shift opened the floodgates for the development of interesting applications around Hadoop and a new ecosystem other than the classical MapReduce processing system evolved. It didn't take much time after that for Apache Spark to break the hegemony of classical MapReduce and become arguably the most popular processing framework for parallel computing as far as active development and adoption is concerned.
In order to serve multi-tenancy, fault tolerance, and resource isolation in YARN, it developed the following components to manage the cluster seamlessly:
- The ResourceManager: This negotiates resources for different compute programs on a Hadoop cluster while guaranteeing the following: resource isolation, data locality, fault tolerance, task prioritization, and effective cluster capacity utilization. A configurable scheduler allows Resource Manager the flexibility to schedule and prioritize different applications as per the requirements.
- Tasks served by the RM while serving clients: A client or APIs user can submit or terminate an application. The user can also gather statistics on submitted applications cluster, and queue information. RM also priorities ADMIN tasks over any other task to perform a clean up or maintenance activities on a cluster, such as refreshing the node-list, the queues' configuration, and so on.
- Tasks served by RM while serving cluster nodes: Provisioning and de-provisioning of new nodes forms an important task of RM. Each node sends a heartbeat at a configured interval, the default being 10 minutes. Any failure of a node in doing so is treated as a dead node. As a clean-up activity, all the supposedly running process, including containers, are marked as dead too.
- Tasks served by the RM while serving the Application Master: The RM registers a new the AM while terminating the successfully executed ones. Just like cluster nodes, if the heartbeat of an AM is not received within a preconfigured duration, the default value being 10 minutes, then the AM is marked dead and all the associated containers are also marked dead. But since YARN is reliable as far as the application execution is concerned, a new AM is rescheduled to try another execution on a new container until it reaches the retry configurable default count of four.
- Scheduling and other miscellaneous tasks served by the RM: RM maintains a list of running, submitted and executed applications along with its statistics such as execution time, status, and so on. The privileges of the user as well as of applications are maintained and compared while serving various requests of the user per application life cycle. The RM scheduler oversees the resource allocation for the application, such as memory allocation. Two common scheduling algorithms used in YARN are fair scheduling and capacity scheduling algorithms.
- NodeManager: An NM exist per node of the cluster on a slightly similar fashion as to what slave nodes are in the master slave architecture. When an NM starts, it sends the information to RM for its availability to share its resources for upcoming jobs. Then NM sends a periodic signal, also called a heartbeat, to RM informing it of its status as being alive in the cluster. Primarily, an NM is responsible for launching containers that have been requested by an AM with certain resource requirements such as memory, disk, and so on. Once the containers are up and running, the NM keeps a watch not on the status of the container's task but on the resource utilization of the container and kills it if the container starts utilizing more resources than it has been provisioned for. Apart from managing the life cycle of the container, the NM also keeps RM informed about the node's health.
- ApplicationMaster: An AM gets launched per submitted application and manages the life cycle of the submitted application. However, the first and foremost task an AM does is to negotiate resources from RM to launch task-specific containers at different nodes. Once containers are launched, the AM keeps track of all the container's task statuses. If any node goes down or the container gets killed because of using excess resources or otherwise, in such cases the AM renegotiates resources from RM and launches those pending tasks again. The AM also keeps reporting the status of the submitted application directly to the user and other such statistics to RM. ApplicationMaster implementation is framework specific and it is because of this reason that application/framework specific code is transferred to the AM and the AM that distributes it further. This important feature also makes YARN technology agnostic, as any framework can implement its ApplicationMaster and then utilize the resources of the YARN cluster seamlessly.
- Containers: A container in an abstract sense is a set of minimal resources such as CPU, RAM, Disk I/O, disk space, and so on, that are required to run a task independently on a node. The first container after submitting the job is launched by RM to host ApplicationMaster. It is the AM which then negotiates resources from RM in the form of containers, which then gets hosted in different nodes across the Hadoop cluster.
The following steps follow the flow of application submission in YARN:
- Using a client or APIs, the user submits the application; let's say a Spark job jar. ResourceManager, whose primary task is to gather and report all the applications running on the entire Hadoop cluster and available resources on respective Hadoop nodes, depending on the privileges of the user submitting the job, accepts the newly submitted task.
- After this RM delegates the task to a scheduler, the scheduler then searches for a container which can host the application-specific Application Master. While the scheduler does take into consideration parameters such as availability of resources, task priority, data locality, and so on, before scheduling or launching an Application Master, it has no role in monitoring or restarting a failed job. It is the responsibility of RM to keep track of an AM and restart it in a new container if it fails.
- Once the ApplicationMaster gets launched it becomes the prerogative of the AM to oversee the resources negotiation with RM for launching task-specific containers. Negotiations with RM are typically over:
- The priority of the tasks at hand.
- The number of containers to be launched to complete the tasks.
- The resources needed to execute the tasks, such as RAM and CPU (since Hadoop 3.x).
- The available nodes where job containers can be launched with the required resources.
Depending on the priority and availability of resources the RM grants containers represented by the container ID and hostname of the node on which it can be launched.
- The AM then requests the NM of the respective hosts to launch the containers with specific IDs and resource configuration. The NM then launches the containers but keeps a watch on the resources usage of the task. If, for example, the container starts utilizing more resources than it has been provisioned then that container is killed by the NM. This greatly improves the job isolation and fair sharing of resources guarantee that YARN provides as, otherwise, it would have impacted the execution of other containers. However, it is important to note that the job status and application status as a whole are managed by the AM. It falls in the domain of the AM to continuously monitor any delay or dead containers, simultaneously negotiating with RM to launch new containers to reassign the task of dead containers.
- The containers executing on different nodes send application-specific statistics to the AM at specific intervals.
- The AM also reports the status of the application directly to the client that submitted the specific application, in our case a Spark job.
- The NM monitors the resources being utilized by all the containers on the respective nodes and keeps sending a periodic update to RM.
- The AM sends periodic statistics such application status, task failure, and log information to RM.
Before delving deep into MapReduce implementation in Hadoop, let's first understand MapReduce as a concept in parallel computing and why it is a preferred way of computing. MapReduce comprises two mutually exclusive but dependent phases, each capable of running on two different machines or nodes:
- Map: In the Map phase, the transformation of the data takes place. It splits data into key value pairs by splitting it on a keyword.
- Suppose we have a text file and we would want to do an analysis such as counting the total number of words or even the frequency with which the word has occurred in the text file. This is the classical word count problem of MapReduce. To address this problem, first we will have to identify the splitting keyword so that the data can be spilt and be converted into a key value pair.
Let's begin with John Lennon's song, Imagine.
Imagine there's no heaven It's easy if you try No hell below us Above us only sky Imagine all the people living for today
After running the Map phase on the sampled text and splitting it over
<space>, it will get converted to a key value pair as shown here:
<imagine, 1> <there's, 1> <no, 1> <heaven, 1> <it's, 1> <easy, 1> <if, 1> <you, 1> <try, 1> <no, 1> <hell, 1> <below, 1> <us, 1> <above, 1> <us, 1> <only, 1> <sky, 1> <imagine, 1> <all, 1> <the, 1> <people, 1> <living, 1> <for, 1> <today, 1>]
The key here represents the word and the value represents the count. Also it should be noted that we have converted all the keys to lowercase to reduce any further complexity arising out of matching case sensitive keys.
- Reduce: The Reduce phase deals with aggregation of the Map phase results and hence all the key value pairs are aggregated over the key.
- So the Map output of the text would get aggregated as follows:
[<imagine, 2> <there's, 1> <no, 2> <heaven, 1> <it's, 1> <easy, 1> <if, 1> <you, 1> <try, 1> <hell, 1> <below, 1> <us, 2> <above, 1> <only, 1> <sky, 1> <all, 1> <the, 1> <people, 1> <living, 1> <for, 1> <today, 1>]
As we can see, both the Map and Reduce phases can be run exclusively and hence can use independent nodes in the cluster to process the data. This approach of separation of tasks into smaller units called Map and Reduce has revolutionized general purpose distributed/parallel computing, which we now know as MapReduce.
Apache Hadoop's MapReduce has been implemented pretty much the same way as discussed, except for adding extra features into how the data from the Map phase of each node gets transferred to their designated Reduce phase node.
Hadoop's implementation of MapReduce enriches the Map and Reduce phases by adding a few more concrete steps in between to make it fault tolerant and truly distributed. We can describe MR jobs on YARN in five stages:
- Job Submission Stage: When a client submits an MR job, the following things happen:
- The RM is requested for an application ID
- The input data location is checked and if present then the file split size is computed
- The job's output location needs to exist as well
If all the three conditions are met, then the MR job jar along with its configuration details of input split are copied to HDFS in a directory named the application ID provided by RM. Then the job is submitted to RM to launch a job-specific Application Master,
- MAP Stage: Once RM receives the client's request for launching
MRAppMaster, a call is made to the YARN scheduler for assigning a container. As per the resource availability, the container is granted and hence the
MRAppMasteris launched at the designated node with provisioned resources. After this,
MRAppMasterfetches input split information from the HDFS path that was submitted by the client and computes the number of mapper tasks that will be launched based on the splits. Depending on the number of mappers, it also calculates the required number of reducers as per the configuration, If
MRAppMasternow finds the number of mapper, reducer and size of input files to be small enough to be run in the same JVM, then it goes ahead in doing so. Such tasks are called Uber tasks. However, in other scenarios,
MRAppMasternegotiates container resources from RM for running these tasks, albeit mapper tasks have a higher order and priority. This is why Mapper tasks must finish before the sorting phase can start.
Data locality is another concern for containers hosting mappers, as local data nodes are preferred over rack locals, with the least preference being given to remote node hosted data. But when it comes to the Reduce phase no such preference of data locality exists for containers. Containers hosting function mappers first copy
mapReduce JAR and configuration files locally and then launch a class called
YarnChild in the JVM. The mapper then starts reading the input file, processes them by making key value pairs, and writes them in a circular buffer.
- Shuffle andSort Phase: Considering that circular buffers have a size constraint, after a certain percentage, the default being 80, a thread gets spawned which spills the data from the buffer. But, before copying the spilled data to disk, it is first partitioned with respect to its reducer and then the background thread also sorts the partitioned data on a key and if the combiner is mentioned it then combines the data too. This process optimizes the data once it is copied to its respective partitioned folder. This process is continued until all the data from circular buffer gets written to disk. A background thread again checks if the number of spilled files in each partition is within the range of the configurable parameter or else the files are merged and the combiner is run over them until it falls within the limit of the parameter.
A Map task keeps updating the status to ApplicationMaster for its entire life cycle. It is only when 5 percent of a Map task has been completed that the Reduce task starts. An auxiliary service in the NodeManager serving the Reduce task starts a Netty web server that makes a request to MRAppMaster for Mapper hosts having specific Mapper partitioned files. All the partitioned files that pertain to the Reducer are copied to their respective nodes in a similar fashion. Since multiple files get copied as data from various nodes representing that Reduce nods gets collected, a background thread merges the sorted map file and again sorts them and if the combiner is configured, then combines the result too.
- Reduce Stage: It is important to note here that at this stage every input file of each reducer should have been sorted by key. This is the presumption with which the reducer starts processing these records and converts the key value pair into an aggregated list. Once the reducer has processed the data, it writes them to the output folder as was mentioned during the job submission.
- Clean-up Stage: Each reducer sends a periodic update to
MRAppMasterabout the task completion. Once the Reduce task is over, the ApplicationMaster starts the clean-up activity. The submitted job status is changed from running to successful, and all the temporary and intermediate files and folders are deleted .The application statistics are archived to a job history server.
MapReduce is on its way to being a legacy. We've got Spark, says the man behind Apache Hadoop, Doug Cutting. MapReduce is an amazingly popular distributed framework but it comes with its fair amount of criticism as well. Since its inception, MapReduce has been built to run jobs in batch mode. Although it supports streaming, it's not very well suited for ad-hoc queries, machine learning, and so on. Apache Spark is a distributed in-memory computing framework, and somehow tries to address some of the major concern that surrounds MapReduce:
- Performance: A major bottleneck in MapReduce jobs are disk I/Os, and it is considerably visible during the shuffle and sort phase of MR, as data is written to disk. The guiding principal that Spark follows is simple: share the memory across the cluster and keep everything in memory as long as possible. This greatly enhances the performance of Spark jobs to the tune of 100X when compared to MR (as claimed by their developers).
- Fault tolerance: Both MR and Spark have different approaches in handling fault tolerance. The AM keeps a track of mappers and reducers while executing MR jobs. As and when these containers stop responding or fail upfront, the AM after requesting the RM, launches a separate JVM to run such tasks. While this approach achieves fault tolerance, it is both time and resource consuming. Apache Spark's approach of handling fault tolerance is different; it uses Resilient Distributed Datasets (RDD), a read only fault-tolerant parallel collection. RDD maintains the lineage graph so that whenever its partition gets lost it recovers the lost data by re-computing from the previous stage and thus making it more resilient.
- DAG: Chaining of MapReduce jobs is a difficult task and along with that it also has to deal with the burden of writing intermediate results on HDFS before the next job starts its execution. Spark is actually a DirectedAcyclicGraph (DAG) engine, so chaining any number of job can easily be achieved. All the intermediate results are shared across memory, avoiding multiple disk I/O. Also these jobs are lazily evaluated and hence only those paths are processed which are explicitly called for computation. In Spark such triggers are called actions.
- Data processing: Spark (aka Spark Core) is not an isolated distributed compute framework. A whole lot of Spark modules have been built around Spark Core to make it more general purpose. RDD forms the main abstract in all these modules. With recent development dataframe and dataset have also been developed, which enriches RDD by providing it a schema and type safety. Nevertheless, the universality of RDD is ubiquitous across all the modules of Spark making it simple to use and it can easily be cross-referenced in different modules. Whether it is streaming, querying capability, machine learning, or graph processing, the same data can be referenced by RDD and can be interchangeably used. This is a unique appeal of RDD, which is lacking in MR, as different concept was required to handle machine learning jobs in ApacheMahout than was required in ApacheGiraph.
- Compatibility: Spark has not been developed to keep only the YARN cluster in mind. It has amazing compatibility to run on Hadoop, Mesos, and even a standalone cluster mode. Similarly, Spark has not been built around HDFS and has a wide variety of acceptability as far as different filesystems are concerned. All Apache Spark does is provide compute capabilities while leaving the choice of choosing the cluster and filesystem to the use case being worked upon.
- Spark APIs: Spark APIs have wide coverage as far as functionality and programming languages are concerned. Spark's APIs have huge similarities with the Scala collection, the language in which Apache Spark has been majorly implemented. It is this richness of functional programming that makes Apache spark avoid much of the boilerplate code that eclipsed MR jobs. Unlike MR, which dealt with low level programming, Spark exposes APIs at a higher abstraction level with a scope of overriding any bare metal code if ever required.
The very first paper on RDD Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing described it as follows:
ResilientDistributedDatasets (RDDs), a distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner. As Spark is written in a functional programming paradigm, one of the key concepts of functional programming is immutable objects. Resilient Distributed Dataset is also an immutable dataset.
Formally, we can define an RDD as an immutable distributed collection of objects. It is the primary data type of Spark. It leverages cluster memory and is partitioned across the cluster.
The following is the logical representation of RDD:
RDDs can consist of (key, value) pairs as well. The following is the logical representation of pair of RDDs:
Also, as mentioned, RDD can be partitioned across the cluster. So the following is the logical representation of partitioned RDDs in a cluster:
An RDD supports only two types of operations. One is called transformation and the other is called action. The following are the explanations of both of these:
- Transformation: If an operation on an RDD gives you another RDD, then it is a transformation. Consider you have an RDD of strings and want to filter out all values that start with H as follows:
So, a filter operation on an RDD will return another RDD with all the values that passes through the filter condition. So, a filter is an example of a transformation
- Action: If an operation on an RDD gives you a result other than an RDD, it is called an action: for example, the sum of all values in an RDD, or the count of all the values or retrieving all values of RDD in form of a list, and so on. The following is the logical representation of an action sum of an RDD:
So, the rule is if after an operation on an RDD, you get an RDD then it is a transformation; otherwise, it is an action. We will discuss all the available transformations and actions that can be performed on an RDD, with the coding examples, in Chapter 4, Understanding the Spark Programming Model and Chapter 7, Spark Programming Model - Advanced.
Another important thing to understand about RDD is Lazy evaluation. Spark creates a DAG, also called the lineage graph, of all the operations you perform on an RDD. Execution of the graph starts only when an action is performed on RDD. Let's consider an example of DAG operations on RDD:
Here, first an RDD is calculated by reading data from a stable storage and two of the transformations are performed on the RDD and then finally an action is performed to get the result.
Look at the previous diagram; one would infer that RDD1 will be created as soon as a Spark job finds the step to create it from the database and then it will find the transformation steps, so it will perform transformations. Then it finds an action and so it will run the given action to calculate the result. However, this is not true.
In reality, a Spark job will start creating DAG steps until it finds a step that asks it to perform action on RDD. When the job finds this step, it starts executing the DAG from the first vertex.
The following are the benefits of this approach:
- Fault tolerance: The lineage graph of the operations on an RDD, makes it fault tolerant. Since Spark is well aware of the steps it needs to perform to create an RDD, it can recalculate the RDD or its partitions in case of failure of the previous step instead of repeating the whole process again. For example, with DAG, if a partition of RDD is lost while processing, it can be calculated from RDD2, instead of repeating the process of calculating it from the database and performing two transformations. This gives a huge benefit of saving time and resources in case of failures.
- Optimizing resource usage: As Spark knows all the steps to be performed to calculate the end result in advance, it can leverage this information to use the cluster resources in a most optimized manner.
Following are some benefits that Spark RDD model provides over Hadoop MapReduce Model:
- Iterative processing: One of the biggest issue, with MapReduce processing is the IO (Input/Output) involved. It really slows down the process of MapReduce if you are running iterative operations where you would basically chain MapReduce jobs to perform multiple aggregations.
Consider running a MapReduce job that reads data from HDFS and performs some aggregation and writes the output back to HDFS. Now, mapper jobs will read data from HDFS and write the output to the local filesystem after completion and Reduce pulls that data and runs the reduce process on it. After which, it writes the output to HDFS (not considering the spill mechanism of mapper and reducer).
Now, let's say you want to perform another aggregation on the output data so you will execute another MapReduce job on the output data which will go through a similar I/O process. So the following is the logical representation of how iterative operations will run in MapReduce.
On the other hand, Spark will not perform such I/O in most of the cases for the job previously described. Data will be read from HDFS once and then Spark will perform in memory transformation on RDD for every iteration. The output of every step (that is, another RDD) will be stored in the distributed cluster memory. The following is the logical representation of the same job in Spark:
Now, here is a catch. What if the size of the intermediate results is more than the distributed memory size? In that case, Spark will spill that RDD to disk.
- Interactive Processing: Another benefit of the data structure of Spark over MapReduce or Hadoop can be seen when the user wants to run some ad-hoc queries on the data placed on some stable storage.
Let's say you are trying to run some MapReduce jobs (or Hive queries) on the data to do some analysis. If you are running multiple queries on same input data, MapReduce will read the data from storage, let's say HDFS, every time you run the query. A logical representation of that can be as follows:
On the other hand, Spark provides a mechanism to persist an RDD in memory (different mechanisms of persisting RDD will be discussed later in Chapter 4, Understanding the Spark Programming Model). So, you can execute one job and save RDD in memory. Then, other analytics can be executed on the same RDD without reading the data from HDFS again. The following is the logical representation of that:
When a Spark job encounters Spark Action 1, it executes the DAG and calculates the RDD. Then the RDD will be persisted in memory and Spark Action 1 will be performed on the RDD. Afterwards, Spark Action 2 and Spark Action 3 will be performed in the same RDD. So, this model helps to save lot of I/O from the stable storage in case of interactive processing.
Apache Spark is considered as the general purpose system in the big data world. It consists of a lot of libraries that help to perform various analytics on your data. It provides built-in libraries to perform batch analytics, perform real-time analytics, apply machine learning algorithms on your data, and much more.
The following are the various built-in libraries available in Spark:
- Spark Core: As its name says, the Spark Core library consists of all the core modules of Spark. It consists of the basics of the Spark model, including RDD and various transformation and actions that can be performed with it. Basically, all the batch analytics that can be performed with the Spark programming model using the MapReduce paradigm is the part of this library. It also helps to analyze different varieties of data.
- Spark Streaming: The Spark Streaming library consists of modules that help users to run near real-time streaming processing on the incoming data. It helps to handle the velocity part of the big data territory. It consists of a lot of modules that help to listen to various streaming sources and perform analytics in near real time on the data received from those sources.
- Spark SQL: The Spark SQL library helps to analyze structured data using the very popular SQL queries. It consists of a dataset library which helps to view the structured data in the form of a table and provides the capabilities of running SQL on top of it. The SQL library consists of a lot of functions which are available in SQL of RDBMS. It also provides an option to write your own function, called the UserDefinedFunction (UDF).
- MLlib: Spark MLlib helps to apply various machine learning techniques on your data, leveraging the distributed and scalable capability of Spark. It consists of a lot of learning algorithms and utilities and provides algorithms for classification, regression, clustering, decomposition, collaborative filtering, and so on.
- GraphX: The Spark GraphX library provides APIs for graph-based computations. With the help of this library, the user can perform parallel computations on graph-based data. GraphX is the one of the fastest ways of performing graph-based computations.
- Spark-R: The Spark R library is used to run R scripts or commands on Spark Cluster. This helps to provide distributed environment for R scripts to execute. Spark comes with a shell called
sparkRwhich can be used to run R scripts on Spark Cluster. Users which are more familiar with R, can use tool such as RStudio or Rshell and can execute R scripts which will run on the Spark cluster.
- Unified DataFrame and Dataset: The Spark 2.X release has unified both the APIs. Now Dataframe is just a row in Dataset without any data type information implicitly attached.
- SparkSession: Prior to Spark 2.X, there were different entry points for different Spark jobs; that is, for Spark SQL we had
sqlContextand if Hive features were also required then
HiveContextwas the entry point. With Spark 2.X this ambiguity has been removed and now we have one single entry point called
SparkSession. However, it is to be noted that all the module-specific entry points are still very much around and have not been deprecated yet.
- Catalog API: Spark 2.X has introduced the Catalog API for accessing metadata information in Spark SQL. It can be seen as parallel to Hcatalog in Hive. It is a great step in unifying the metadata structure around Spark SQL so that the very same metadata can be exposed to non-Spark SQL applications. It is also helpful in debugging the temporary registered table in a Spark SQL session. Metadata of both
HiveContextare available now, as the Catalog API can be accessed by
- Structured streaming: Structured streaming makes Spark SQL available in streaming job by continuously running the Spark SQL job and aggregating the updated results on a streaming datasets. The Dataframe and Dataset are available for operations in structured streaming along with the windowing function.
- Whole-stage code generation: The code generation engine has been modified to generate more performance-oriented code by avoiding virtual function dispatches, transfer of intermediate operation data to memory, and so on.
- Accumulator API: A new simpler and more performant Accumulator API has been added to the Spark 2.X release and the older API has been deprecated.
- A native SQL parser that supports both ANSI-SQL as well as Hive SQL has been introduced in the current Spark build.
- Hive-style bucketing support too has been added to the list of supported SQL functions in Spark SQL.
- Subquery support has been added in Spark SQL and supports other variations of the clause such as NOT IN, IN, EXISTS, and so on.
- Native CSV data source, based on the databricks implementation has been incorporated in Spark.
- The new
spark.mlpackage which is based on Dataframe has been introduced with an objective to deprecate
spark.mllibonce the newly introduced package matures enough in features to replace the old package.
- Machine learning pipelines and models can now be persisted across all languages supported by Spark.
- Mastering Apache Solr: A practical guide to get to grips with Apache Solr by Mr. Mathieu Nayrolles [Page 6]
- Nutch: A Flexible and Scalable Open-Source Web Search Engine, http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.105.5978&rep=rep1&type=pdf
In this chapter, you learned about the history of Bigdata and its dimensions and basic concepts of Hadoop and Spark. After that, you were introduced to RDD, and you learned the basic concepts of the Spark RDD model. Then, you learned about the various components of the Spark Ecosystem and the newer features of Spark 2.x.
As this book focuses on implementing Spark Applications in Java, in the next chapter, we will refresh the concepts of core Java and focus on the newer feature of Java 8, which will be leveraged while developing Spark Applications.