Apache Spark is a distributed and highly scalable in-memory data analytics system, providing the ability to develop applications in Java, Scala, Python, as well as languages like R. It has one of the highest contribution/involvement rates among the Apache top level projects at this time. Apache systems, such as Mahout, now use it as a processing engine instead of MapReduce. Also, as will be shown in Chapter 4, Apache Spark SQL, it is possible to use a Hive context to have the Spark applications process data directly to and from Apache Hive.
Apache Spark provides four main submodules, which are SQL, MLlib, GraphX, and Streaming. They will all be explained in their own chapters, but a simple overview would be useful here. The modules are interoperable, so data can be passed between them. For instance, streamed data can be passed to SQL, and a temporary table can be created.
The following figure explains how this book will address Apache Spark and its modules. The top two rows show Apache Spark and its four submodules described earlier. Wherever possible, I try to show by example how the functionality may be extended using extra tools:
For instance, the data streaming module explained in Chapter 3, Apache Spark Streaming, will have worked examples, showing how data movement is performed using Apache Kafka and Flume. The MLlib or the machine learning module will have its functionality examined in terms of the data processing functions that are available, but it will also be extended using the H2O system and deep learning.
The previous figure is, of course, simplified. It represents the system relationships presented in this book. For instance, there are many more routes between Apache Spark modules and HDFS than the ones shown in the preceding diagram.
The Spark SQL chapter will also show how Spark can use a Hive Context. So, a Spark application can be developed to create Hive-based objects, and run Hive QL against Hive tables, stored in HDFS.
Chapter 5, Apache Spark GraphX, and Chapter 6, Graph-based Storage, will show how the Spark GraphX module can be used to process big data scale graphs, and how they can be stored using the Titan graph database. It will be shown that Titan allows big data scale graphs to be stored, and queried as graphs. It will be shown, by example, that Titan can use both HBase and Cassandra as a storage mechanism. When using HBase, it will be shown that Titan implicitly uses HDFS as a cheap and reliable distributed storage mechanism.
So, this section has explained that Spark is an in-memory processing system. When used at scale, it cannot exist alone; the data must reside somewhere. It will probably be used along with the Hadoop tool set, and the associated ecosystem. Luckily, Hadoop stack providers, such as Cloudera, provide the CDH Hadoop stack and cluster manager, which integrates with Apache Spark, Hadoop, and most of the current stable tool set. During this book, I will use a small CDH 5.3 cluster installed on CentOS 6.5 64-bit servers. You can use an alternative configuration, but I find that CDH provides most of the tools that I need, and automates the configuration, leaving me more time for development.
Having mentioned the Spark modules and the software that will be introduced in this book, the next section will describe the possible design of a big data cluster.
In this section, I wish to provide an overview of the functionality that will be introduced in this book in terms of Apache Spark, and the systems that will be used to extend it. I will also try to examine the future of Apache Spark, as it integrates with cloud storage.
When you examine the documentation on the Apache Spark website (http://spark.apache.org/), you will see that there are topics that cover SparkR and Bagel. Although I will cover the four main Spark modules in this book, I will not cover these two topics. I have limited time and scope in this book so I will leave these topics for reader investigation or for a future date.
The Spark MLlib module offers machine learning functionality over a number of domains. The documentation available at the Spark website introduces the data types used (for example, vectors and the LabeledPoint structure). This module offers functionality that includes:
Frequent Pattern Mining
The Scala-based practical examples of KMeans, Naïve Bayes, and Artificial Neural Networks have been introduced and discussed in Chapter 2, Apache Spark MLlib, of this book.
Stream processing is another big and popular topic for Apache Spark. It involves the processing of data in Spark as streams, and covers topics such as input and output operations, transformations, persistence, and checkpointing, among others.
Chapter 3, Apache Spark Streaming, covers this area of processing, and provides practical examples of different types of stream processing. It discusses batch and window stream configuration, and provides a practical example of checkpointing. It also covers different examples of stream processing, including Kafka and Flume.
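The relationship between batches and windows can be illustrated without a Spark cluster. The following is a minimal plain-Python sketch, not the Spark Streaming API. It assumes that a stream has already been cut into fixed-interval batches, and the function name sliding_windows and its parameters are hypothetical names chosen for illustration. It groups batches into windows using a window length and a slide interval, both counted in batches:

```python
from typing import List, Sequence


def sliding_windows(batches: Sequence[List[int]],
                    window_length: int,
                    slide_interval: int) -> List[List[int]]:
    """Group micro-batches into windows.

    window_length and slide_interval are counted in batches. They are
    hypothetical parameters for illustration, not Spark API names.
    """
    if window_length <= 0 or slide_interval <= 0:
        raise ValueError("window_length and slide_interval must be positive")
    windows = []
    # A window closes every slide_interval batches, once enough batches exist.
    for end in range(window_length, len(batches) + 1, slide_interval):
        window = []
        for batch in batches[end - window_length:end]:
            window.extend(batch)
        windows.append(window)
    return windows


# Five batches of records, for example one batch per second.
batches = [[1], [2, 3], [4], [5, 6], [7]]
windows = sliding_windows(batches, window_length=3, slide_interval=2)
print(windows)
```

Because the slide interval is shorter than the window length here, consecutive windows overlap and the same record can be processed more than once.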
There are many more ways in which stream data can be used. Other Spark module functionality (for example, SQL, MLlib, and GraphX) can be used to process the stream. You can use Spark Streaming with systems such as Kinesis or ZeroMQ. You can even create custom receivers for your own user-defined data sources.
From Spark version 1.3, data frames have been introduced into Apache Spark, so that Spark data can be processed in a tabular form, and tabular functions (like select, filter, and groupBy) can be used to process data. The Spark SQL module integrates with Parquet and JSON formats to allow data to be stored in formats that better represent data. This also offers more options to integrate with external systems.
The idea of integrating Apache Spark into the Hadoop Hive big data database can also be introduced. Hive context-based Spark applications can be used to manipulate Hive-based table data. This brings Spark's fast in-memory distributed processing to Hive's big data storage capabilities. It effectively lets Hive use Spark as a processing engine.
The Apache Spark GraphX module allows Spark to offer fast, in-memory, big data graph processing. A graph is represented by a list of vertices and edges (the lines that connect the vertices). GraphX is able to create and manipulate graphs using the property, structural, join, aggregation, cache, and uncache operators.
It introduces two new data types to support graph processing in Spark: VertexRDD and EdgeRDD, which represent graph vertices and edges. It also introduces graph processing example functions, such as PageRank and triangle counting. Many of these functions will be examined in Chapter 5, Apache Spark GraphX.
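To make the idea of a graph as a list of vertices and edges concrete, here is a minimal plain-Python sketch. It is not the GraphX API: the vertex and edge values are invented sample data, and count_triangles is a hypothetical helper that treats every edge as undirected. It only illustrates the kind of structure and question that a graph processing function works with:

```python
from itertools import combinations

# Vertices as (id, property) and edges as (source id, destination id, property).
# These values are invented sample data.
vertices = [(1, "Mike"), (2, "Sarah"), (3, "John"), (4, "Jim")]
edges = [(1, 2, "friend"), (2, 3, "friend"), (1, 3, "friend"), (3, 4, "friend")]


def count_triangles(vertices, edges):
    """Count triangles, treating every edge as undirected."""
    neighbours = {vertex_id: set() for vertex_id, _ in vertices}
    for src, dst, _ in edges:
        neighbours[src].add(dst)
        neighbours[dst].add(src)
    triangles = 0
    # Check every group of three vertices for three connecting edges.
    for a, b, c in combinations(sorted(neighbours), 3):
        if b in neighbours[a] and c in neighbours[a] and c in neighbours[b]:
            triangles += 1
    return triangles


triangle_total = count_triangles(vertices, edges)
print(triangle_total)
```

Checking every group of three vertices is only practical for tiny graphs; it is shown here because it is easy to follow, not because it scales.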
When examining big data processing systems, I think it is important to look at not just the system itself, but also how it can be extended, and how it integrates with external systems, so that greater levels of functionality can be offered. In a book of this size, I cannot cover every option, but hopefully by introducing a topic, I can stimulate the reader's interest, so that they can investigate further.
I have used the H2O machine learning library system to extend Apache Spark's machine learning module. By using an H2O deep learning Scala-based example, I have shown how neural processing can be introduced to Apache Spark. I am, however, aware that I have just scratched the surface of H2O's functionality. I have only used a small neural cluster and a single type of classification functionality. Also, there is a lot more to H2O than deep learning.
As graph processing becomes more accepted and used in the coming years, so will graph-based storage. I have investigated the use of Spark with the NoSQL database Neo4J, using the Mazerunner prototype application. I have also investigated the use of the Aurelius (Datastax) Titan database for graph-based storage. Again, Titan is a database in its infancy, which needs both community support and further development. But I wanted to examine the future options for Apache Spark integration.
The next section will show that the Apache Spark release contains scripts to allow a Spark cluster to be created on AWS EC2. There is a range of options available that allow the cluster creator to define attributes such as cluster size and storage type. But this type of cluster is difficult to resize, which makes it difficult to manage changing requirements. If the data volume changes or grows over time, a larger cluster may be required with more memory.
Luckily, the people who developed Apache Spark have created a new start-up called Databricks (https://databricks.com/), which offers web console-based Spark cluster management, plus a lot of other functionality. It offers the idea of work organized by notebooks, user access control, security, and a mass of other functionality. It is described at the end of this book.
It is a service in its infancy, currently only offering cloud-based storage on Amazon AWS, but it will probably extend to Google and Microsoft Azure in the future. The other cloud-based providers, that is, Google and Microsoft Azure, are also extending their services, so that they can offer Apache Spark processing in the cloud.
As I already mentioned, Apache Spark is a distributed, in-memory, parallel processing system, which needs an associated storage mechanism. So, when you build a big data cluster, you will probably use a distributed storage system such as Hadoop, as well as tools to move data like Sqoop, Flume, and Kafka.
I wanted to introduce the idea of edge nodes in a big data cluster. These are the client-facing nodes in the cluster, on which reside client-facing components like the Hadoop NameNode or perhaps the Spark master. The majority of the big data cluster might be behind a firewall. The edge nodes would then reduce the complexity caused by the firewall, as they would be the only nodes that would be accessible. The following figure shows a simplified big data cluster:
It shows four simplified cluster racks with switches and edge node computers, facing the client across the firewall. This is, of course, stylized and simplified, but you get the idea. The general processing nodes are hidden behind a firewall (the dotted line), and are available for general processing, in terms of Hadoop, Apache Spark, Zookeeper, Flume, and/or Kafka. The following figure represents a couple of big data cluster edge nodes, and attempts to show what applications might reside on them.
The edge node applications will be master applications such as the Hadoop NameNode, or the Apache Spark master server. They will also be the components that bring data into and out of the cluster, such as Flume, Sqoop, and Kafka. They can be any component that makes a user interface available to the client user, such as Hive:
Generally, firewalls, while adding security to the cluster, also increase the complexity. Ports between system components need to be opened up, so that they can talk to each other. For instance, Zookeeper is used by many components for configuration. Apache Kafka, the publish-subscribe messaging system, uses Zookeeper for configuring its topics, groups, consumers, and producers. So client ports to Zookeeper, potentially across the firewall, need to be open.
Finally, the allocation of systems to cluster nodes needs to be considered. For instance, if Apache Spark uses Flume or Kafka, then in-memory channels will be used. The size of these channels, and the memory used, caused by the data flow, need to be considered. Apache Spark should not be competing with other Apache components for memory usage. Depending upon your data flows and memory usage, it might be necessary to have the Spark, Hadoop, Zookeeper, Flume, and other tools on distinct cluster nodes.
Generally, the edge nodes that act as cluster NameNode servers, or Spark master servers, will need greater resources than the cluster processing nodes within the firewall. For instance, a CDH cluster node manager server will need extra memory, as will the Spark master server. You should monitor edge nodes for resource usage, and adjust in terms of resources and/or application location as necessary.
This section has briefly set the scene for the big data cluster in terms of Apache Spark, Hadoop, and other tools. However, how might the Apache Spark cluster itself, within the big data cluster, be configured? For instance, it is possible to have many types of Spark cluster manager. The next section will examine this, and describe each type of Apache Spark cluster manager.
The following diagram, borrowed from the spark.apache.org website, demonstrates the role of the Apache Spark cluster manager in terms of the master, slave (worker), executor, and Spark client applications:
The Spark context, as you will see from many of the examples in this book, can be defined via a Spark configuration object, and a Spark URL. The Spark context connects to the Spark cluster manager, which then allocates resources across the worker nodes for the application. The cluster manager allocates executors across the cluster worker nodes. It copies the application jar file to the workers, and finally it allocates tasks.
By specifying a Spark configuration local URL, it is possible to have the application run locally. By specifying local[n], it is possible to have Spark use <n> threads to run the application locally. This is a useful development and test option.
In the Spark master URL for the standalone cluster manager, <hostname> is the name of the host on which the Spark master is running. I have specified 7077 as the port, which is the default value, but it is configurable. This simple cluster manager currently only supports FIFO (first in first out) scheduling. You can contrive to allow concurrent application scheduling by setting the resource configuration options for each application; for instance, by using spark.cores.max to share cores between applications.
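As a rough sketch of how cores might be shared, the following plain-Python helper divides a cluster's cores evenly between a number of concurrent applications, and renders the result as property lines. The helper names and the even split are my own assumptions for illustration; only the spark.cores.max and spark.app.name property names come from Spark itself:

```python
from typing import List


def cores_per_application(total_cores: int, applications: int) -> int:
    """Evenly divide cluster cores between concurrent applications.

    An even split is an assumption; real workloads may need unequal shares.
    """
    if applications <= 0:
        raise ValueError("applications must be positive")
    if total_cores < applications:
        raise ValueError("not enough cores for one core per application")
    return total_cores // applications


def spark_conf_lines(app_name: str, max_cores: int) -> List[str]:
    """Render the settings as whitespace-separated property lines."""
    return [
        f"spark.app.name {app_name}",
        f"spark.cores.max {max_cores}",
    ]


# A hypothetical 16-core cluster shared by three applications.
share = cores_per_application(total_cores=16, applications=3)
conf_lines = spark_conf_lines("app1", share)
print("\n".join(conf_lines))
```

With a cap like this, the first application no longer takes every available core, so later applications can be scheduled alongside it.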
At a larger scale, when integrating with Hadoop YARN, the Apache Spark cluster manager can be YARN, and the application can run in one of two modes. If the Spark master value is set as yarn-cluster, then the application can be submitted to the cluster, and the submitting client can then terminate. The cluster will take care of allocating resources and running tasks. However, if the Spark master value is set as yarn-client, then the application stays alive during the life cycle of processing, and requests resources from YARN.
Apache Mesos is an open source system for resource sharing across a cluster. It allows multiple frameworks to share a cluster by managing and scheduling resources. It is a cluster manager, which provides isolation using Linux containers, allowing multiple systems, like Hadoop, Spark, Kafka, Storm, and more to share a cluster safely. It is highly scalable to thousands of nodes. It is a master slave-based system, and is fault tolerant, using Zookeeper for configuration management.
In the Mesos master URL, <hostname> is the host name of the Mesos master server, and the port is defined as 5050, which is the default Mesos master port (this is configurable). If there are multiple Mesos master servers in a large-scale, high-availability Mesos cluster, then the Spark master URL refers to Zookeeper instead, because the election of the Mesos master server will be controlled by Zookeeper. In that case, <hostname> will be the name of a host in the Zookeeper quorum, and the port number 2181 is the default client port for Zookeeper.
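The master values described for each cluster manager can be summarized in a small plain-Python sketch. The function master_url and its manager labels (such as "standalone" and "mesos-zk") are hypothetical names of my own, and the host names are invented. The URL forms are the Spark 1.x ones as I understand them, so check the documentation for your version. A real Zookeeper-based URL may also list several hosts and a path:

```python
from typing import Optional

# Default ports mentioned in the text for each cluster manager.
DEFAULT_PORTS = {"standalone": 7077, "mesos": 5050, "mesos-zk": 2181}
SCHEMES = {
    "standalone": "spark://",
    "mesos": "mesos://",
    "mesos-zk": "mesos://zk://",
}


def master_url(manager: str,
               hostname: Optional[str] = None,
               port: Optional[int] = None,
               threads: Optional[int] = None) -> str:
    """Build a Spark master value for the given cluster manager label."""
    if manager == "local":
        return "local" if threads is None else f"local[{threads}]"
    if manager in ("yarn-client", "yarn-cluster"):
        # For YARN, the mode name itself is the master value.
        return manager
    if manager not in SCHEMES:
        raise ValueError(f"unknown cluster manager: {manager}")
    if not hostname:
        raise ValueError("hostname is required for this cluster manager")
    chosen_port = DEFAULT_PORTS[manager] if port is None else port
    return f"{SCHEMES[manager]}{hostname}:{chosen_port}"


# Invented host names, used only for illustration.
print(master_url("local", threads=4))
print(master_url("standalone", "master1.example.com"))
print(master_url("mesos", "mesos1.example.com"))
print(master_url("mesos-zk", "zk1.example.com"))
print(master_url("yarn-cluster"))
```

The resulting string is what would be passed as the master value when the Spark configuration object is created.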
The Apache Spark release contains scripts for running Spark in the cloud against Amazon AWS EC2-based servers. The following listing, as an example, shows Spark 1.3.1 installed on a Linux CentOS server, under the directory called /usr/local/spark/. The EC2 resources are available in the Spark release EC2 subdirectory:
[[email protected] ec2]$ pwd
/usr/local/spark/ec2
[[email protected] ec2]$ ls
deploy.generic  README  spark-ec2  spark_ec2.py
In order to use Apache Spark on EC2, you will need to set up an Amazon AWS account. You can set up an initial free account to try it out here: http://aws.amazon.com/free/.
If you take a look at Chapter 8, Spark Databricks, you will see that such an account has been set up, and is used to access https://databricks.com/. The next thing that you will need to do is access your AWS IAM Console, and select the Users option. You either create or select a user. Select the User Actions option, and then select Manage Access Keys. Then, select Create Access Key, and then Download Credentials. Make sure that your downloaded key file is secure; assuming that you are on Linux, chmod the file with permissions of 600 for user-only access.
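The same permission change can be scripted. The following minimal Python sketch applies mode 600 using the standard library, and reads the mode back to confirm it. It runs against a temporary stand-in file rather than a real key, and the helper name restrict_to_owner is hypothetical. It assumes a Linux or similar POSIX file system:

```python
import os
import stat
import tempfile


def restrict_to_owner(path: str) -> str:
    """Set permissions to 600 (read and write for the owner only)."""
    os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
    # Read the permission bits back so the change can be confirmed.
    return oct(stat.S_IMODE(os.stat(path).st_mode))


# Use a temporary stand-in file instead of a real downloaded key file.
with tempfile.NamedTemporaryFile(suffix=".pem", delete=False) as handle:
    key_path = handle.name

mode = restrict_to_owner(key_path)
print(mode)
os.remove(key_path)
```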
You will now have your Access Key ID, Secret Access Key, key file, and key pair name. You can now create a Spark EC2 cluster using the spark-ec2 script as follows:
export AWS_ACCESS_KEY_ID="QQpl8Exxx"
export AWS_SECRET_ACCESS_KEY="0HFzqt4xxx"
./spark-ec2 \
  --key-pair=pairname \
  --identity-file=awskey.pem \
  --region=us-west-1 \
  --zone=us-west-1a \
  launch cluster1
<pairname> is the key pair name that you gave when your access details were created; <awskey.pem> is the file that you downloaded. The name of the cluster that you are going to create is <cluster1>. The region chosen here is in the western USA, us-west-1. If you live in the Pacific, as I do, it might be wiser to choose a nearer region like ap-southeast-2. However, if you encounter allowance access issues, then you will need to try another zone. Remember also that using cloud-based Spark clustering like this will have higher latency and poorer I/O in general. You share your cluster hosts with multiple users, and your cluster may be in a remote region.
You can use a series of options to this basic command to configure the cloud-based Spark cluster that you create. The -s option allows you to define how many worker nodes to create in your Spark EC2 cluster, that is, -s 5 for a six node cluster: one master, and five slave workers. You can define the version of Spark that your cluster runs, rather than the default latest version. The following option starts a cluster with Spark version 1.3.1:
The instance type used to create the cluster will define how much memory is used, and how many cores are available. For instance, the following option will set the instance type to be
The current instance types for Amazon AWS can be found at: http://aws.amazon.com/ec2/instance-types/.
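To show how these options combine with the basic launch command, here is a hedged Python sketch that assembles the argument list. The function spark_ec2_launch_command is a hypothetical helper of my own, and it only builds the command; it does not run it. The long option names --slaves, --spark-version, and --instance-type are the ones that I understand the spark-ec2 script to accept, so confirm them against the script's help output before relying on them:

```python
import shlex
from typing import List, Optional


def spark_ec2_launch_command(cluster_name: str,
                             key_pair: str,
                             identity_file: str,
                             region: str,
                             zone: str,
                             slaves: Optional[int] = None,
                             spark_version: Optional[str] = None,
                             instance_type: Optional[str] = None) -> List[str]:
    """Assemble (but do not run) a spark-ec2 launch command."""
    args = [
        "./spark-ec2",
        f"--key-pair={key_pair}",
        f"--identity-file={identity_file}",
        f"--region={region}",
        f"--zone={zone}",
    ]
    # Optional settings are only added when a value is supplied.
    if slaves is not None:
        args.append(f"--slaves={slaves}")
    if spark_version is not None:
        args.append(f"--spark-version={spark_version}")
    if instance_type is not None:
        args.append(f"--instance-type={instance_type}")
    args += ["launch", cluster_name]
    return args


command = spark_ec2_launch_command(
    "cluster1", "pairname", "awskey.pem", "us-west-1", "us-west-1a",
    slaves=5, spark_version="1.3.1",
)
print(" ".join(shlex.quote(argument) for argument in command))
```

Keeping the command as a list makes it easy to review before it is passed to a shell or to a process launcher.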
The following figure shows the current (as of July 2015) AWS M3 instance types, model details, cores, memory, and storage. There are many instance types available at this time; for instance, T2, M4, M3, C4, C3, R3, and more. Examine the current availability and choose appropriately:
Pricing is also very important. The current AWS instance type prices can be found at: http://aws.amazon.com/ec2/pricing/.
The prices are shown by region with a drop-down menu, and a price by hour. Remember that each instance type is defined by cores, memory, and physical storage. The prices are also defined by operating system type, that is, Linux, RHEL, and Windows. Just select the OS via a top-level menu.
The following figure shows an example of pricing at the time of writing (July 2015); it is just provided to give an idea. Prices will differ over time, and by service provider. They will differ by the size of storage that you need, and the length of time that you are willing to commit to.
Be aware also of the costs of moving your data off of any storage platform. Try to think long term. Check whether you will need to move all, or some of your cloud-based data to the next system in, say, five years. Check the process to move data, and include that cost in your planning.
As described, the preceding figure shows the costs of AWS instance types by operating system, region, instance type, and hour. The costs are measured per unit hour, so systems such as https://databricks.com/ do not terminate EC2 instances until a full hour has elapsed. These costs will change with time and need to be monitored via (for AWS) the AWS billing console.
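Because costs are measured per unit hour, a rough estimate is easy to sketch. The following Python example assumes that every started hour is billed as a full hour, and that the master and the workers use the same instance type. The helper name and the price of 0.10 per instance hour are invented for illustration; they are not real AWS prices:

```python
import math


def estimate_cluster_cost(workers: int,
                          hours: float,
                          price_per_instance_hour: float,
                          masters: int = 1) -> float:
    """Estimate cluster cost, billing each started hour as a full hour."""
    if workers < 0 or masters < 0 or hours < 0:
        raise ValueError("workers, masters, and hours must not be negative")
    billed_hours = math.ceil(hours)
    instances = workers + masters
    return round(instances * billed_hours * price_per_instance_hour, 2)


# Five workers plus one master for 2.25 hours, at an invented price.
example_cost = estimate_cluster_cost(
    workers=5, hours=2.25, price_per_instance_hour=0.10
)
print(example_cost)
```

An estimate like this ignores storage and data transfer charges, so treat it as a lower bound when planning.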
You may also have problems when wanting to resize your Spark EC2 cluster, so you will need to be sure of the master slave configuration before you start. Be sure how many workers you are going to require, and how much memory you need. If you feel that your requirements are going to change over time, then you might consider using https://databricks.com/, if you definitely wish to work with Spark in the cloud. Go to Chapter 8, Spark Databricks and see how you can set up, and use https://databricks.com/.
In the next section, I will examine Apache Spark cluster performance, and the issues that might impact it.
Before moving on to the rest of the chapters covering functional areas of Apache Spark and extensions to it, I wanted to examine the area of performance. What issues and areas need to be considered? What might impact Spark application performance starting at the cluster level, and finishing with actual Scala code? I don't want to just repeat what the Spark website says, so have a look at the tuning page in the Spark documentation for the version of Spark that you are using, that is, latest, or 1.3.1 for a specific version.
So, having looked at that page, I will briefly mention some of the topic areas. I am going to list some general points in this section without implying an order of importance.
The size and structure of your big data cluster are going to affect performance. If you have a cloud-based cluster, your I/O and latency will suffer in comparison to an unshared hardware cluster. You will be sharing the underlying hardware with multiple customers, and the cluster hardware may be remote.
Also, the positioning of cluster components on servers may cause resource contention. For instance, if possible, think carefully about locating Hadoop NameNodes, Spark servers, Zookeeper, Flume, and Kafka servers in large clusters. With high workloads, you might consider segregating servers to individual systems. You might also consider using an Apache system such as Mesos in order to share resources.
Also, consider potential parallelism. The greater the number of workers in your Spark cluster for large data sets, the greater the opportunity for parallelism.
You might consider using an alternative to HDFS, depending upon your cluster requirements. For instance, MapR has the MapR-FS NFS-based read write file system for improved performance. This file system has a full read write capability, whereas HDFS is designed as a write once, read many file system. It offers an improvement in performance over HDFS. It also integrates with Hadoop and the Spark cluster tools. Bruce Penn, an architect at MapR, has written an interesting article describing its features at: https://www.mapr.com/blog/author/bruce-penn.
Just look for the blog post entitled Comparing MapR-FS and HDFS NFS and Snapshots. The links in the article describe the MapR architecture, and possible performance gains.
As the previous tuning link mentions, if the data is remote, then functionality and data must be brought together for processing. Spark will try to use the best data locality level possible for task processing.
Consider the level of physical memory available on your Spark worker nodes. Can it be increased?
Consider data partitioning. Can you increase the number of partitions in the data used by your Spark application code?
Can you increase the storage fraction, the memory used by the JVM for storage and caching of RDDs?
Consider tuning data structures used to reduce memory.
Consider serializing your RDD storage to reduce the memory usage.
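Two of these points lend themselves to quick arithmetic. The following Python sketch is a rough planning aid under stated assumptions: it uses a rule of thumb of a few tasks per CPU core when choosing a partition count, and a storage fraction of 0.6, which I understand to be the Spark 1.x default. Both helper names are hypothetical, and the numbers should be checked against the tuning documentation for your version:

```python
def suggested_partitions(workers: int,
                         cores_per_worker: int,
                         tasks_per_core: int = 3) -> int:
    """Suggest a partition count from the total number of cores.

    tasks_per_core is a rule-of-thumb assumption, not a fixed Spark value.
    """
    return workers * cores_per_worker * tasks_per_core


def storage_memory_mb(executor_memory_mb: int,
                      storage_fraction: float = 0.6) -> int:
    """Estimate the executor memory available for caching RDDs."""
    if not 0.0 <= storage_fraction <= 1.0:
        raise ValueError("storage_fraction must be between 0 and 1")
    return int(executor_memory_mb * storage_fraction)


# A hypothetical cluster: five workers with four cores and 4 GB executors.
partitions = suggested_partitions(workers=5, cores_per_worker=4)
cache_mb = storage_memory_mb(executor_memory_mb=4096)
print(partitions, cache_mb)
```

If the cached data does not fit into the estimated storage memory, that is a prompt to look at serialized storage or at more worker memory.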
Although most of this book will concentrate on examples of Apache Spark installed on physical server-based clusters (with the exception of https://databricks.com/), I wanted to make the point that there are multiple cloud-based options out there. There are cloud-based systems that use Apache Spark as an integrated component, and cloud-based systems that offer Spark as a service. Even though this book cannot cover all of them in depth, I thought that it would be useful to mention some of them:
Databricks is covered in two chapters in this book. It offers a Spark cloud-based service, currently using AWS EC2. There are plans to extend the service to other cloud suppliers (https://databricks.com/).
At the time of writing this book (July 2015), Microsoft Azure has been extended to offer Spark support.
Apache Spark and Hadoop can be installed on Google Cloud.
The Oryx system has been built on top of Spark and Kafka for real-time, large-scale machine learning (http://oryx.io/).
The Velox system for serving machine learning predictions is based upon Spark and KeystoneML (https://github.com/amplab/velox-modelserver).
PredictionIO is an open source machine learning service built on Spark, HBase, and Spray (https://prediction.io/).
SeldonIO is an open source predictive analytics platform, based upon Spark, Kafka, and Hadoop (http://www.seldon.io/).
In closing this chapter, I would invite you to work your way through each of the Scala code-based examples in the following chapters. I have been impressed by the rate at which Apache Spark has evolved, and I am also impressed at the frequency of the releases. So, even though at the time of writing, Spark has reached 1.4, I am sure that you will be using a later version. If you encounter problems, tackle them logically. Try approaching the Spark user group for assistance (<[email protected]>), or check the Spark website at http://spark.apache.org/.
I am always interested to hear from people, and connect with people on sites such as LinkedIn. I am keen to hear about the projects that people are involved with and new opportunities. I am interested to hear about Apache Spark, the ways that you use it and the systems that you build being used at scale. I can be contacted on LinkedIn at: linkedin.com/profile/view?id=73219349.