Apache Spark is a distributed and highly scalable in-memory data analytics system, providing you with the ability to develop applications in Java, Scala, and Python, as well as languages such as R. It has one of the highest contribution/involvement rates among the Apache top-level projects at this time. Apache systems, such as Mahout, now use it as a processing engine instead of MapReduce. It is also possible to use a Hive context to have the Spark applications process data directly to and from Apache Hive.
Initially, Apache Spark provided four main submodules--SQL, MLlib, GraphX, and Streaming. They will all be explained in their own chapters, but a simple overview would be useful here. The modules are interoperable, so data can be passed between them. For instance, streamed data can be passed to SQL and a temporary table can be created. Since version 1.6.0, MLlib has a sibling called SparkML with a different API, which we will cover in later chapters.
The following figure explains how this book will address Apache Spark and its modules:
The top two rows show Apache Spark and its submodules. Wherever possible, we will try to illustrate by giving an example of how the functionality may be extended using extra tools.
Spark is an in-memory processing system. It cannot exist alone when used at scale, because the data must reside somewhere. It will probably be used along with the Hadoop tool set and the associated ecosystem.
Luckily, Hadoop stack providers such as IBM and Hortonworks offer an open data platform: a Hadoop stack and cluster manager that integrates Apache Spark, Hadoop, and most of the current stable tool set, and that is fully based on open source.
During this book, we will use Hortonworks Data Platform (HDP®) Sandbox 2.6.
You can use an alternative configuration, but we find that the open data platform provides most of the tools that we need and automates the configuration, leaving us more time for development.
In the following sections, we will cover each of the components mentioned earlier in more detail before we dive into the material starting in the next chapter:
- Spark Machine Learning
- Spark Streaming
- Spark SQL
- Spark Graph Processing
- Extended Ecosystem
- Updates in Apache Spark
- Cluster design
- Cloud-based deployments
- Performance parameters
Machine learning is the real reason for Apache Spark because, at the end of the day, you don't want to just ship and transform data from A to B, a process called ETL (Extract, Transform, Load). You want to run advanced data analysis algorithms on top of your data, and you want to run these algorithms at scale. This is where Apache Spark kicks in.
At its core, Apache Spark provides the runtime for massively parallel data processing, and different parallel machine learning libraries run on top of it. This matters because there is an abundance of machine learning algorithms for popular programming languages such as R and Python, but they are not scalable. As soon as you load more data than fits into the available main memory of the system, they crash.
Apache Spark, in contrast, can make use of multiple computer nodes to form a cluster, and even on a single node it can spill data transparently to disk, thereby avoiding the main memory bottleneck. Two interesting machine learning libraries are shipped with Apache Spark, but in this work we'll also cover third-party machine learning libraries.
The Spark MLlib module, often called classical MLlib, offers a growing but incomplete list of machine learning algorithms. Since the introduction of the DataFrame-based machine learning API called SparkML, the destiny of MLlib is clear: it is kept only for backward compatibility reasons.
This is indeed a very wise decision, as we will discover in the next two chapters that structured data processing and the related optimization frameworks are currently disrupting the whole Apache Spark ecosystem. In SparkML, we have a machine learning library in place that can take advantage of these improvements out of the box, using it as an underlying layer.
SparkML will eventually replace MLlib. Apache SystemML is the first library we introduce that runs on top of Apache Spark but is not shipped with the Apache Spark distribution. SystemML provides you with an execution environment for an R-style syntax with a built-in cost-based optimizer. Massively parallel machine learning is an area of constant and rapid change. It is hard to say where the journey will go, but this is the first time that advanced machine learning at scale is available to everyone through open source and cloud computing.
Deep learning on Apache Spark can be done with H2O, Deeplearning4j, and Apache SystemML, which are further examples of very interesting third-party machine learning libraries that are not shipped with the Apache Spark distribution.
While H2O is somewhat complementary to MLlib, Deeplearning4j focuses only on deep learning algorithms. Both use Apache Spark as a means of parallelizing data processing. You might wonder why we want to tackle different machine learning libraries.
The reality is that every library has advantages and disadvantages in its implementation of different algorithms. Therefore, which implementation gives the best performance often depends on your data and Dataset size.
However, it is nice that there is so much choice and that you are not locked into a single library when using Apache Spark. Open source means openness, and this is just one example of how we are all benefiting from this approach in contrast to a single-vendor, single-product lock-in. Although Apache Spark did integrate GraphX, another Apache Spark library, into its distribution, we don't expect this to happen again any time soon. Therefore, it is most likely that Apache Spark as a central data processing platform and additional third-party libraries will coexist: Apache Spark is the big data operating system, and the third-party libraries are the software you install and run on top of it.
Stream processing is another big and popular topic for Apache Spark. It involves the processing of data in Spark as streams and covers topics such as input and output operations, transformations, persistence, and checkpointing, among others.
The Apache Spark Streaming chapter covers this area of processing, and we will also see practical examples of different types of stream processing. It discusses batch and window stream configuration and provides a practical example of checkpointing. It also covers different examples of stream processing, including Kafka and Flume.
There are many ways in which stream data can be used. Other Spark module functionality (for example, SQL, MLlib, and GraphX) can be used to process the stream. You can use Spark Streaming with systems such as MQTT or ZeroMQ. You can even create custom receivers for your own user-defined data sources.
Since Spark version 1.3, DataFrames have been available in Apache Spark so that Spark data can be processed in tabular form and tabular functions (such as groupBy) can be used to process data. The Spark SQL module integrates with Parquet and JSON formats to allow data to be stored in formats that better represent the data. This also offers more options to integrate with external systems.
Apache Spark can also be integrated with Apache Hive, the Hadoop big data database. Hive context-based Spark applications can be used to manipulate Hive-based table data. This brings Spark's fast in-memory distributed processing to Hive's big data storage capabilities. It effectively lets Hive use Spark as a processing engine.
Additionally, there is an abundance of connectors to access NoSQL databases outside the Hadoop ecosystem directly from Apache Spark. In Chapter 2, Apache Spark SQL, we will see how the Cloudant connector can be used to access a remote Apache CouchDB NoSQL database and issue SQL statements against JSON-based NoSQL document collections.
Graph processing is another very important topic when it comes to data analysis. In fact, many problems can be expressed as a graph.
A graph is basically a network of items and their relationships to each other. Items are called nodes and relationships are called edges. Relationships can be directed or undirected. Relationships, as well as items, can have properties. So a map, for example, can be represented as a graph as well. Each city is a node and the streets between the cities are edges. The distance between the cities can be assigned as properties on the edge.
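As a minimal sketch in plain Python (no Spark required), the map example can be written down as a small property graph. The city names, populations, and distances are invented for illustration.

```python
# Nodes (cities), each with a property; names and values are illustrative only.
nodes = {
    "A": {"population": 100_000},
    "B": {"population": 250_000},
    "C": {"population": 50_000},
}

# Undirected edges (streets); the distance in km is stored as an edge property.
edges = {
    frozenset({"A", "B"}): {"distance_km": 10},
    frozenset({"B", "C"}): {"distance_km": 15},
    frozenset({"A", "C"}): {"distance_km": 20},
}


def neighbors(city):
    """Return the cities directly connected to `city`, sorted by name."""
    return sorted(
        other
        for edge in edges
        if city in edge
        for other in edge
        if other != city
    )


def distance(a, b):
    """Look up the distance property on the edge between two cities."""
    return edges[frozenset({a, b})]["distance_km"]
```

Using a frozenset as the edge key makes the relationship undirected; a directed graph would use an ordered (source, target) pair instead.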
The Apache Spark GraphX module allows Apache Spark to offer fast big data in-memory graph processing. This allows you to run graph algorithms at scale.
One of the most famous graph problems, for example, is the traveling salesman problem. Consider the graph representation of the map mentioned earlier. A salesman has to visit all cities of a region but wants to minimize the distance that he has to travel. As the distances between all the nodes are stored on the edges, a graph algorithm can search for the shortest route, although finding the exact optimum becomes very expensive as the number of cities grows. GraphX is able to create, manipulate, and analyze graphs using a variety of built-in algorithms.
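To make the traveling salesman idea concrete, here is a small plain-Python sketch that tries every possible round trip and keeps the shortest. This is not a GraphX API. The four cities and their distances are invented, and exhaustive search like this is only feasible for a handful of cities because the number of routes grows factorially.

```python
from itertools import permutations

# Symmetric distances between four hypothetical cities (illustrative values).
DIST = {
    ("A", "B"): 10, ("A", "C"): 15, ("A", "D"): 20,
    ("B", "C"): 35, ("B", "D"): 25, ("C", "D"): 30,
}


def dist(a, b):
    """Distance stored on the undirected edge between a and b."""
    return DIST[(a, b)] if (a, b) in DIST else DIST[(b, a)]


def shortest_round_trip(start, cities):
    """Try every ordering of the remaining cities and keep the shortest tour."""
    others = [c for c in cities if c != start]
    best_route, best_length = None, float("inf")
    for order in permutations(others):
        route = (start, *order, start)
        length = sum(dist(a, b) for a, b in zip(route, route[1:]))
        if length < best_length:
            best_route, best_length = route, length
    return best_route, best_length
```

Calling `shortest_round_trip("A", ["A", "B", "C", "D"])` returns a tour of length 80 for these distances.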
It introduces two new data types to support graph processing in Spark--VertexRDD and EdgeRDD--to represent graph nodes and edges. It also introduces graph processing algorithms, such as PageRank and triangle counting. Many of these functions will be examined in Chapter 11, Apache Spark GraphX and Chapter 12, Apache Spark GraphFrames.
When examining big data processing systems, we think it is important to look at not just the system itself, but also how it can be extended and how it integrates with external systems so that greater levels of functionality can be offered. In a book of this size, we cannot cover every option, but by introducing a topic, we can hopefully stimulate the reader's interest so that they can investigate further.
We will use the H2O machine learning library, SystemML, and Deeplearning4j to extend Apache Spark's MLlib machine learning module. We will show that deep learning and highly performant, cost-based optimized machine learning can be introduced to Apache Spark. However, we will only scratch the surface of these frameworks' functionality.
Since Apache Spark V2, many things have changed. This doesn't mean that the API has been broken. On the contrary, most V1.6 Apache Spark applications will run on Apache Spark V2 with few or no changes, but under the hood, there have been a lot of changes.
The first and most interesting thing to mention is the newest functionality of the Catalyst Optimizer, which we will cover in detail in Chapter 3, The Catalyst Optimizer. Catalyst creates a Logical Execution Plan (LEP) from a SQL query and optimizes this LEP to create multiple Physical Execution Plans (PEPs). Based on statistics, Catalyst chooses the best PEP to execute. This is very similar to the cost-based optimizers in relational database management systems (RDBMSs). Catalyst makes heavy use of Project Tungsten, a component that we will cover in Chapter 4, Project Tungsten.
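The idea of choosing among several physical plans by estimated cost can be illustrated with a toy sketch in plain Python. It is not Catalyst's implementation: the plan names, statistics, cost function, and weights are all invented for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PhysicalPlan:
    name: str
    # Hypothetical statistics: rows moved over the network and rows scanned.
    rows_shuffled: int
    rows_scanned: int


def estimated_cost(plan, shuffle_weight=10.0, scan_weight=1.0):
    """Toy cost function; the weights are made up, not Catalyst's."""
    return shuffle_weight * plan.rows_shuffled + scan_weight * plan.rows_scanned


def choose_plan(candidates):
    """Pick the candidate physical plan with the lowest estimated cost."""
    return min(candidates, key=estimated_cost)


# Two candidate join strategies for the same logical plan, with invented
# statistics: a table of 1,000 rows joined to a table of 1,000,000 rows.
candidates = [
    PhysicalPlan("sort_merge_join", rows_shuffled=1_001_000, rows_scanned=1_001_000),
    PhysicalPlan("broadcast_hash_join", rows_shuffled=1_000, rows_scanned=1_001_000),
]
best = choose_plan(candidates)
```

With these numbers the broadcast variant wins because it moves far fewer rows across the network, which is the kind of decision that table statistics make possible.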
Although the Java Virtual Machine (JVM) is a masterpiece on its own, it is a general-purpose byte code execution engine. Therefore, there is a lot of JVM object management and garbage collection (GC) overhead. So, for example, to store a 4-byte string, 48 bytes on the JVM are needed. The GC optimizes on object lifetime estimation, but Apache Spark often knows this better than the JVM. Therefore, Tungsten bypasses the JVM GC for a subset of privately managed data structures and makes them L1/L2/L3 cache-friendly.
In addition, code generation removes the boxing of primitive types and polymorphic function dispatching. Finally, a new first-class citizen called Dataset unified the RDD and DataFrame APIs. Datasets are statically typed and avoid runtime type errors. Therefore, Datasets can be used only with Java and Scala. This means that Python and R users still have to stick to DataFrames, which are kept in Apache Spark V2 for backward compatibility reasons.
As we have already mentioned, Apache Spark is a distributed, in-memory, parallel processing system, which needs an associated storage system. So, when you build a big data cluster, you will probably use a distributed storage system such as Hadoop, as well as tools to move data such as Sqoop, Flume, and Kafka.
We also want to introduce the idea of edge nodes in a big data cluster. These nodes are client-facing and host client-facing components such as the Hadoop NameNode or perhaps the Spark master. The majority of the big data cluster might be behind a firewall. The edge nodes would then reduce the complexity caused by the firewall, as they would be the only points of contact accessible from outside. The following figure shows a simplified big data cluster:
It shows five simplified cluster nodes with executor JVMs, one per CPU core, and the Spark Driver JVM sitting outside the cluster. In addition, you see the disk directly attached to the nodes. This is called the JBOD (just a bunch of disks) approach. Very large files are partitioned over the disks and a virtual filesystem such as HDFS makes these chunks available as one large virtual file. This is, of course, stylized and simplified, but you get the idea.
The following simplified component model shows the driver JVM sitting outside the cluster. It talks to the Cluster Manager in order to obtain permission to schedule tasks on the worker nodes because the Cluster Manager keeps track of resource allocation of all processes running on the cluster.
As we will see later, there is a variety of different cluster managers, some of them also capable of managing other Hadoop workloads or even non-Hadoop applications in parallel to the Spark Executors. Note that the Executor and Driver have bidirectional communication all the time, so network-wise, they should also be sitting close together:
Figure source: https://spark.apache.org/docs/2.0.2/cluster-overview.html
Generally, firewalls, while adding security to the cluster, also increase the complexity. Ports between system components need to be opened up so that they can talk to each other. For instance, Zookeeper is used by many components for configuration. Apache Kafka, the publish/subscribe messaging system, uses Zookeeper to configure its topics, groups, consumers, and producers. So, client ports to Zookeeper, potentially across the firewall, need to be open.
Finally, the allocation of systems to cluster nodes needs to be considered. For instance, if Apache Spark uses Flume or Kafka, then in-memory channels will be used. The size of these channels, and the memory consumed by the data flow, need to be considered. Apache Spark should not be competing with other Apache components for memory usage. Depending upon your data flows and memory usage, it might be necessary to have Spark, Hadoop, Zookeeper, Flume, and other tools on distinct cluster nodes. Alternatively, resource managers such as YARN, Mesos, or Docker can be used to tackle this problem. In standard Hadoop environments, YARN is the most likely choice.
Generally, the edge nodes that act as cluster NameNode servers or Spark master servers will need greater resources than the cluster processing nodes within the firewall. When many Hadoop ecosystem components are deployed on the cluster, all of them will need extra memory on the master server. You should monitor edge nodes for resource usage and adjust in terms of resources and/or application location as necessary. YARN, for instance, takes care of this.
This section has briefly set the scene for the big data cluster in terms of Apache Spark, Hadoop, and other tools. However, how might the Apache Spark cluster itself, within the big data cluster, be configured? For instance, several types of Spark cluster manager are available. The next section examines this and describes each type of Apache Spark cluster manager.
The Spark context, as you will see in many of the examples in this book, can be defined via a Spark configuration object and Spark URL. The Spark context connects to the Spark cluster manager, which then allocates resources across the worker nodes for the application. The cluster manager allocates executors across the cluster worker nodes. It copies the application JAR file to the workers and finally allocates tasks.
The following subsections describe the possible Apache Spark cluster manager options available at this time.
By specifying a Spark configuration local URL, it is possible to have the application run locally. By specifying local[n], it is possible to have Spark use n threads to run the application locally. This is a useful development and test option because you can also test some parallelization scenarios while keeping all log files on a single machine.
Standalone mode uses a basic cluster manager that is supplied with Apache Spark. The Spark master URL takes the form spark://<hostname>:7077, where <hostname> is the name of the host on which the Spark master is running. We have specified 7077 as the port, which is the default value, but this is configurable. This simple cluster manager currently supports only FIFO (first-in, first-out) scheduling. You can allow concurrent application scheduling by setting the resource configuration options for each application; for instance, using spark.cores.max to share cores between applications.
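The effect of capping cores per application under FIFO scheduling can be sketched with a few lines of plain Python. This is a simplified model, not Spark's scheduler: it assumes an application without a spark.cores.max setting takes every core that is still free, and the application names are hypothetical.

```python
def allocate_cores(total_cores, apps):
    """Grant cores to applications in submission (FIFO) order.

    `apps` is a list of (name, cores_max) pairs; cores_max=None models an
    application that did not set spark.cores.max and takes all free cores.
    """
    free = total_cores
    granted = {}
    for name, cores_max in apps:
        want = free if cores_max is None else min(cores_max, free)
        granted[name] = want
        free -= want
    return granted
```

In this model, `allocate_cores(16, [("etl", None), ("report", None)])` leaves the second application with no cores until the first finishes, while giving each application a cap of 8 lets both run side by side.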
At a larger scale, when integrating with Hadoop YARN, the Apache Spark cluster manager can be YARN and the application can run in one of two modes. If the Spark master value is set to yarn-cluster, the application can be submitted to the cluster and the submitting client can then terminate. The cluster will take care of allocating resources and running tasks. However, if the Spark master value is set to yarn-client, the application stays alive during the life cycle of processing and requests resources from YARN.
Apache Mesos is an open source system for resource sharing across a cluster. It allows multiple frameworks to share a cluster by managing and scheduling resources. It is a cluster manager that provides isolation using Linux containers and allows multiple systems such as Hadoop, Spark, Kafka, Storm, and more to share a cluster safely. It is highly scalable to thousands of nodes. It is a master/slave-based system and is fault tolerant, using Zookeeper for configuration management.
For a single master node Mesos cluster, the Spark master URL takes the form mesos://<hostname>:5050, where <hostname> is the hostname of the Mesos master server; the port is defined as 5050, which is the default Mesos master port (this is configurable). If there are multiple Mesos master servers in a large-scale high availability Mesos cluster, the Spark master URL takes the form mesos://zk://<hostname>:2181 instead. The election of the Mesos master server is then controlled by Zookeeper. Here, <hostname> is the name of a host in the Zookeeper quorum, and the port number, 2181, is the default client port for Zookeeper.
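The master URL forms for the cluster managers described in this section can be summarized in a small helper. This is a plain-Python sketch with a hypothetical function name and manager labels; it only builds strings and does not talk to a cluster.

```python
def master_url(manager, host=None, port=None, threads=None):
    """Build a Spark master URL string for the given cluster manager."""
    if manager == "local":
        # "local" runs with one thread; "local[n]" runs with n threads.
        return "local" if threads is None else f"local[{threads}]"
    if manager == "standalone":
        return f"spark://{host}:{port or 7077}"
    if manager == "mesos":
        return f"mesos://{host}:{port or 5050}"
    if manager == "mesos-zk":
        # Multiple Mesos masters: point at a Zookeeper quorum host instead.
        # Real deployments usually list every quorum host and a znode path.
        return f"mesos://zk://{host}:{port or 2181}"
    raise ValueError(f"unknown cluster manager: {manager}")
```

For example, `master_url("standalone", host="node1")` yields `spark://node1:7077`, where `node1` is a made-up hostname.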
There are three different abstraction levels of cloud systems--Infrastructure as a Service (IaaS), Platform as a Service (PaaS), and Software as a Service (SaaS). We will see how to use and install Apache Spark on all of these.
The new way to do IaaS is Docker and Kubernetes as opposed to virtual machines, which basically provides a way to automatically set up an Apache Spark cluster within minutes. This will be covered in Chapter 14, Apache Spark on Kubernetes. The advantage of Kubernetes is that it can be used across multiple cloud providers, as it is an open standard and is based on open source.
You can even use Kubernetes in a local data center and transparently and dynamically move workloads between local, dedicated, and public cloud data centers. PaaS, in contrast, takes away the burden of installing and operating an Apache Spark cluster because this is provided as a service.
There is an ongoing discussion about whether Docker is IaaS or PaaS but, in our opinion, it is just a form of lightweight preinstalled virtual machine. We will cover more on PaaS in Chapter 13, Apache Spark with Jupyter Notebooks on IBM DataScience Experience. This is particularly interesting because the offering is completely based on open source technologies, which enables you to replicate the system on any other data center.
One of the open source components we'll introduce is Jupyter notebooks, a modern way to do data science in a cloud-based collaborative environment. But in addition to Jupyter, there is also Apache Zeppelin, which we'll cover briefly in Chapter 14, Apache Spark on Kubernetes.
Before moving on to the rest of the chapters covering functional areas of Apache Spark and extensions, we will examine the area of performance. What issues and areas need to be considered? What might impact the Spark application performance, starting at the cluster level and finishing with actual Scala code? We don't want to just repeat what the Spark website says, so take a look at https://spark.apache.org/docs/<version>/tuning.html. Here, <version> relates to the version of Spark that you are using; that is, either latest or something like 1.6.1 for a specific version. So, having looked at this page, we will briefly mention some of the topic areas. We will list some general points in this section without implying an order of importance.
The size and structure of your big data cluster is going to affect performance. If you have a cloud-based cluster, your IO and latency will suffer in comparison to an unshared hardware cluster. You will be sharing the underlying hardware with multiple customers and the cluster hardware may be remote. There are some exceptions to this. The IBM cloud, for instance, offers dedicated bare metal high performance cluster nodes with an InfiniBand network connection, which can be rented on an hourly basis.
Additionally, the positioning of cluster components on servers may cause resource contention. For instance, think carefully about locating Hadoop NameNodes, Spark servers, Zookeeper, Flume, and Kafka servers in large clusters. With high workloads, you might consider segregating servers to individual systems. You might also consider using an Apache system such as Mesos that provides better distributions and assignment of resources to the individual processes.
Consider potential parallelism as well. The greater the number of workers in your Spark cluster for large Datasets, the greater the opportunity for parallelism. One rule of thumb is one worker per hyper-thread or virtual core.
You might consider using an alternative to HDFS, depending upon your cluster requirements. For instance, IBM has the GPFS (General Parallel File System) for improved performance.
The reason why GPFS might be a better choice is that, coming from the high performance computing background, this filesystem has a full read write capability, whereas HDFS is designed as a write once, read many filesystem. It offers an improvement in performance over HDFS because it runs at the kernel level as opposed to HDFS, which runs in a Java Virtual Machine (JVM) that in turn runs as an operating system process. It also integrates with Hadoop and the Spark cluster tools. IBM runs setups with several hundred petabytes using GPFS.
Another commercial alternative is the MapR file system that, besides performance improvements, supports mirroring, snapshots, and high availability.
Ceph is an open source alternative to HDFS: a distributed, fault-tolerant, and self-healing filesystem for commodity hard drives. It runs in the Linux kernel as well and addresses many of the performance issues that HDFS has. Other promising candidates in this space are Alluxio (formerly Tachyon), Quantcast, GlusterFS, and Lustre.
Finally, Cassandra is not a filesystem but a NoSQL key-value store. It is tightly integrated with Apache Spark and is therefore regarded as a valid and powerful alternative to HDFS--or even to any other distributed filesystem--especially as it supports predicate push-down using Apache Spark SQL and the Catalyst optimizer, which we will cover in the following chapters.
The key to good data processing performance is the avoidance of network transfers. This was very true a couple of years ago. It is less relevant today for tasks with high demands on CPU and low I/O, but it still holds for data processing algorithms with low CPU demand and high I/O demand.
We can conclude from this that HDFS is one of the best ways to achieve data locality as chunks of files are distributed on the cluster nodes, in most of the cases, using hard drives directly attached to the server systems. This means that those chunks can be processed in parallel using the CPUs on the machines where individual data chunks are located in order to avoid network transfer.
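The scheduling preference behind data locality can be sketched in plain Python. This is a toy greedy placement, not HDFS or Spark code: the block and node names are hypothetical, and it assumes there are at least as many free task slots as blocks.

```python
def place_tasks(block_locations, free_slots):
    """Assign one task per block, preferring a node that already stores it.

    block_locations: {block_id: [nodes holding a replica]}
    free_slots:      {node: number of tasks it can still accept}
    Returns {block_id: (node, is_local)}.
    """
    slots = dict(free_slots)  # work on a copy so the caller's dict is untouched
    placement = {}
    for block, replicas in block_locations.items():
        local = [n for n in replicas if slots.get(n, 0) > 0]
        if local:
            node, is_local = local[0], True
        else:
            # No replica holder has capacity: fall back to the node with the
            # most free slots and accept a read over the network.
            node, is_local = max(slots, key=slots.get), False
        slots[node] -= 1
        placement[block] = (node, is_local)
    return placement
```

Every task placed with `is_local` set to `True` reads its chunk from a directly attached disk, while the remaining tasks pay for a network transfer.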
Another way to achieve data locality is using Apache Spark SQL. Depending on the connector implementation, SparkSQL can make use of the data processing capabilities of the source engine. So, for example, when using MongoDB in conjunction with SparkSQL, parts of the SQL statement are preprocessed by MongoDB before data is sent upstream to Apache Spark.
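The benefit of letting the source engine preprocess part of a query can be shown with a plain-Python stand-in. No real connector is used here: the `scan` function and the sample documents are invented, and the list simply plays the role of a remote collection.

```python
# A stand-in for a remote collection; the documents are invented.
SOURCE = [
    {"id": 1, "country": "DE", "amount": 40},
    {"id": 2, "country": "US", "amount": 75},
    {"id": 3, "country": "DE", "amount": 90},
    {"id": 4, "country": "FR", "amount": 10},
]


def scan(predicate=None):
    """Simulate the source engine returning rows, optionally filtering first."""
    if predicate is None:
        return list(SOURCE)
    return [row for row in SOURCE if predicate(row)]


def is_german(row):
    return row["country"] == "DE"


# Without pushdown: every row is transferred, then filtered by the consumer.
transferred_all = scan()
result_no_pushdown = [row for row in transferred_all if is_german(row)]

# With pushdown: the source applies the filter, so fewer rows are transferred.
transferred_filtered = scan(predicate=is_german)
result_pushdown = transferred_filtered
```

Both variants produce the same result, but the second one moves two rows instead of four, and that difference grows with the size of the collection.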
In order to avoid OOM (Out of Memory) messages for the tasks on your Apache Spark cluster, please consider a number of questions for the tuning:
- Consider the level of physical memory available on your Spark worker nodes. Can it be increased? Check on the memory consumption of operating system processes during high workloads in order to get an idea of free memory. Make sure that the workers have enough memory.
- Consider data partitioning. Can you increase the number of partitions? As a rule of thumb, you should have at least as many partitions as you have available CPU cores on the cluster. Use the repartition function on the RDD API.
- Can you modify the storage fraction and the memory used by the JVM for storage and caching of RDDs? Workers are competing for memory against data storage. Use the Storage page on the Apache Spark user interface to see if this fraction is set to an optimal value. Then update the following properties:
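Assuming the properties in question are those of Spark's unified memory manager, spark.memory.fraction and spark.memory.storageFraction, the following plain-Python sketch estimates how an executor heap is divided. The defaults of 0.6 and 0.5 and the 300 MiB reserve follow the Spark configuration documentation; the function name is hypothetical.

```python
RESERVED_MIB = 300  # memory Spark sets aside before applying the fractions


def unified_memory_mib(heap_mib, memory_fraction=0.6, storage_fraction=0.5):
    """Estimate Spark's unified memory regions for a given executor heap.

    Returns (unified, storage, execution) in MiB, where `storage` is the part
    of the unified region that is protected from eviction by execution.
    """
    unified = (heap_mib - RESERVED_MIB) * memory_fraction
    storage = unified * storage_fraction
    return unified, storage, unified - storage
```

For a 4 GiB heap, `unified_memory_mib(4096)` shows that well under half of the heap is protected for cached data, which is worth comparing against what the Storage page reports.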
In addition, the following two things can be done in order to improve performance:
- Consider using Parquet as a storage format, which is much more storage-efficient than CSV or JSON
- Consider using the DataFrame/Dataset API instead of the RDD API as it might result in more efficient execution (more about this in the next three chapters)
Try to tune your code to improve the Spark application performance. For instance, filter your application-based data early in your ETL cycle. One example is when using raw HTML files, detag them and crop away unneeded parts at an early stage. Tune your degree of parallelism, try to find the resource-expensive parts of your code, and find alternatives.
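As a minimal sketch of filtering early, the following plain-Python helper strips the markup from a raw HTML document and drops script and style bodies, so that later ETL stages only handle visible text. It uses the standard library parser; the class and function names are hypothetical.

```python
from html.parser import HTMLParser


class TextExtractor(HTMLParser):
    """Collect text content while skipping <script> and <style> bodies."""

    SKIP = {"script", "style"}

    def __init__(self):
        super().__init__()
        self._skip_depth = 0
        self.parts = []

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIP:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in self.SKIP and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        # Keep only non-empty text that is outside skipped elements.
        if not self._skip_depth and data.strip():
            self.parts.append(data.strip())


def detag(html):
    """Return only the visible text of an HTML document."""
    parser = TextExtractor()
    parser.feed(html)
    parser.close()
    return " ".join(parser.parts)
```

In a Spark job, a function like this would be applied in the first map step, so that every later transformation and shuffle works on far less data.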
ETL is one of the first things you do in an analytics project. You grab data from third-party systems, either by directly accessing relational or NoSQL databases or by reading exports in various file formats such as CSV, TSV, JSON, or even more exotic ones from local or remote filesystems or from a staging area in HDFS. After some inspections and sanity checks on the files, an ETL process in Apache Spark basically reads in the files and creates RDDs or DataFrames/Datasets out of them.
They are transformed to fit the downstream analytics application running on top of Apache Spark or other applications and then stored back into filesystems as JSON, CSV, or Parquet files, or even back into relational or NoSQL databases.
Finally, we can recommend the following resource for any performance-related problems with Apache Spark: https://spark.apache.org/docs/latest/tuning.html.
Although parts of this book will concentrate on examples of Apache Spark installed on physical server-based clusters, we want to make the point that there are multiple cloud-based options out there that bring many benefits. There are cloud-based systems that use Apache Spark as an integrated component and cloud-based systems that offer Spark as a service. Even though this book cannot cover all of them in depth, it would be useful to mention some of them:
- Chapter 13, Apache Spark with Jupyter Notebooks on IBM DataScience Experience, is an example of a completely open source based offering from IBM
- Chapter 14, Apache Spark on Kubernetes, covers the Kubernetes cloud orchestrator, which is available as an offering on many cloud services (including IBM) and is also rapidly becoming state of the art in enterprise data centers
In closing this chapter, we invite you to work your way through each of the Scala code-based examples in the following chapters. The rate at which Apache Spark has evolved is impressive, and the frequency of the releases is worth noting. So even though, at the time of writing, Spark has reached 2.2, we are sure that you will be using a later version.
If you encounter problems, report them at www.stackoverflow.com and tag them accordingly; you'll receive feedback within minutes--the user community is very active. Another way of getting information and help is subscribing to the Apache Spark mailing list:
By the end of this chapter, you should have a good idea of what's waiting for you in this book. We've dedicated our effort to showing you examples that are, on the one hand, practical recipes for solving day-to-day problems and, on the other hand, help you understand the details of what takes place behind the scenes. This is very important for writing good data products and is a key differentiator.
The next chapter focuses on Apache Spark SQL. We believe that this is one of the hottest topics that has been introduced to Apache Spark, for two reasons.
First, SQL is a very old and established language for data processing. It was invented by IBM in the 1970s and is nearly half a century old. However, what makes SQL different from other programming languages is that, in SQL, you don't declare how something is done but what should be achieved. This gives a lot of room for downstream optimizations.
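The difference between stating how and stating what can be seen in a small plain-Python comparison. It uses the standard library's SQLite engine as a stand-in for any SQL engine, not Spark SQL itself, and the sample rows are invented.

```python
import sqlite3

rows = [("DE", 40), ("US", 75), ("DE", 90), ("FR", 10)]  # invented sample data

# Imperative: we spell out how to compute the totals, step by step.
totals_imperative = {}
for country, amount in rows:
    totals_imperative[country] = totals_imperative.get(country, 0) + amount

# Declarative: we state what we want and let the engine decide how to get it.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (country TEXT, amount INTEGER)")
conn.executemany("INSERT INTO sales VALUES (?, ?)", rows)
totals_declarative = dict(
    conn.execute("SELECT country, SUM(amount) FROM sales GROUP BY country")
)
conn.close()
```

Both variants produce the same totals, but only the SQL version leaves the engine free to pick an execution strategy, such as which index to use or in which order to aggregate.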
This leads us to the second reason. As structured data processing continuously becomes the standard way of data analysis in Apache Spark, optimizers such as Tungsten and Catalyst play an important role; so important that we've dedicated two entire chapters to the topic. So stay tuned and enjoy!