In this chapter, we will set up Spark and configure it. This chapter contains the following recipes:
Apache Spark is a general-purpose cluster computing system to process big data workloads. What sets Spark apart from its predecessors, such as Hadoop MapReduce, is its speed, ease of use, and sophisticated analytics.
It was originally developed at AMPLab, UC Berkeley, in 2009. It was made open source in 2010 under the BSD license and switched to the Apache 2.0 license in 2013. Toward the later part of 2013, the creators of Spark founded Databricks to focus on Spark's development and future releases.
Databricks offers Spark as a service in the Amazon Web Services (AWS) Cloud, called Databricks Cloud. In this book, we are going to maximize the use of AWS as a data storage layer.
Talking about speed, Spark can achieve subsecond latency on big data workloads. To achieve such low latency, Spark makes use of memory for storage. In MapReduce, memory is primarily used for the actual computation. Spark uses memory both to compute and store objects.
Spark also provides a unified runtime connecting to various big data storage sources, such as HDFS, Cassandra, and S3. It also provides a rich set of high-level libraries for different big data compute tasks, such as machine learning, SQL processing, graph processing, and real-time streaming. These libraries make development faster and can be combined in an arbitrary fashion.
Though Spark is written in Scala--and this book focuses only on recipes in Scala--it also supports Java, Python, and R.
Spark is an open source community project, and everyone uses the pure open source Apache distributions for deployments, unlike Hadoop, which has multiple distributions available with vendor enhancements.
The following figure shows the Spark ecosystem:
Spark's runtime runs on top of a variety of cluster managers, including YARN (Hadoop's compute framework), Mesos, and Spark's own cluster manager called Standalone mode. Alluxio is a memory-centric distributed file system that enables reliable file sharing at memory speed across cluster frameworks. In short, it is an off-heap storage layer in memory that helps share data across jobs and users. Mesos is a cluster manager, which is evolving into a data center operating system. YARN is Hadoop's compute framework and has a robust resource management feature that Spark can seamlessly use.
Apache Spark, initially devised as a replacement for MapReduce, started out with a good proportion of workloads running on-premises. Now, most of these workloads have moved to public clouds (AWS, Azure, and GCP). In a public cloud, we see two types of applications:
For outcome-driven applications, where the goal is to derive a predefined signal/outcome from the given data, Databricks Cloud fits the bill perfectly. For traditional data transformation pipelines, Amazon's Elastic MapReduce (EMR) does a great job.
Databricks is the company behind Spark. It has a cloud platform that takes out all of the complexity of deploying Spark and provides you with a ready-to-go environment with notebooks for various languages. Databricks Cloud also has a community edition that provides one node instance with 6 GB of RAM for free. It is a great starting place for developers. The Spark cluster that is created also terminates after 2 hours of sitting idle.
All the recipes in this book can be run on either the InfoObjects Sandbox or the Databricks Cloud community edition. The entire data for the recipes in this book has also been ported to a public bucket called sparkcookbook on S3. Just put these recipes on the Databricks Cloud community edition, and they will work seamlessly.
Perform the following steps to set up Databricks Cloud and access the data:

1. Click Sign Up and choose COMMUNITY EDITION (or the full platform).
2. Click Clusters, then Create Cluster (the community edition option is shown below it).
3. Enter the cluster name, for example, myfirstcluster, and choose an Availability Zone (more about AZs in the next recipe). Then click Create Cluster.
4. Once the cluster is created, go to Home and click Notebook. Choose an appropriate notebook name, for example, config, and choose Scala as the language.
5. Get your AWS credentials. You need two values:
   ACCESS_KEY: This is referred to as fs.s3n.awsAccessKeyId in SparkContext's Hadoop configuration.
   SECRET_KEY: This is referred to as fs.s3n.awsSecretAccessKey in SparkContext's Hadoop configuration.
6. Set ACCESS_KEY in the config notebook:
   sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<replace with your key>")
7. Set SECRET_KEY in the config notebook:
   sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<replace with your secret key>")
8. Load a dataset from the sparkcookbook bucket (all of the data for the recipes in this book is available in this bucket):
   val yelpdata = spark.read.textFile("s3a://sparkcookbook/yelpdata")
DBFS is Databricks Cloud's internal file system. It is a layer above S3, as you can guess. It mounts S3 buckets in a user's workspace as well as caches frequently accessed data on worker nodes.
To mount an S3 bucket through DBFS, perform the following steps:

1. Set the AWS access and secret keys in the Scala notebook:
   val accessKey = "<your access key>"
   val secretKey = "<your secret key>".replace("/","%2F")
   val bucket = "sparkcookbook"
2. Set the mount name in the Scala notebook:
   val mount = "cookbook"
3. Mount the bucket:
   dbutils.fs.mount(s"s3a://$accessKey:$secretKey@$bucket", s"/mnt/$mount")
4. Display the contents of the mounted bucket:
   display(dbutils.fs.ls(s"/mnt/$mount"))
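Once the bucket is mounted, its files can be read like any other path visible to the cluster. The following is a minimal sketch to run in the same Scala notebook; it assumes the mount created in the preceding steps and uses the yelpdata folder from the sparkcookbook bucket:

// Read a folder from the mounted bucket as a Dataset of lines
// (assumes the /mnt/cookbook mount created in the preceding steps)
val yelpFromMount = spark.read.textFile(s"/mnt/$mount/yelpdata")
yelpFromMount.show(5, truncate = false)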
Let's look at the key concepts in Databricks Cloud.
The concept of clusters is self-evident. A cluster contains a master node and one or more slave nodes. These nodes are EC2 nodes, which we are going to learn more about in the next recipe.
The notebook is the most powerful feature of Databricks Cloud. You can write your code in a Scala/Python/R notebook or a simple SQL notebook. These notebooks cover the whole nine yards: you can use them to write code like a programmer, use SQL like an analyst, or do visualization like a Business Intelligence (BI) expert.
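For example, the same notebook can register data as a temporary view in Scala and then query it with SQL. Here is a minimal sketch, assuming the sparkcookbook bucket access set up in the previous recipe (the view name is illustrative):

// Load the data in Scala and expose it to SQL as a temporary view
val yelp = spark.read.textFile("s3a://sparkcookbook/yelpdata")
yelp.createOrReplaceTempView("yelp")
// Query the same data with SQL, the way an analyst would
spark.sql("SELECT count(*) AS lines FROM yelp").show()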
There is a reason why deploying Spark on Amazon EMR is included as one of the first recipes in this edition of the book. The majority of production Spark deployments happen on EMR (in fact, the majority of big data deployments, and increasingly so, happen on EMR). If you understand this recipe well, you may skip the rest of the recipes in this chapter, unless you are doing an on-premises deployment.
Since this topic is of paramount importance in the current context, a lot more theory is provided than a typical cookbook would have. You can skip the theory section and go directly to the How to do it... section, but I encourage you not to do so.
What EMR represents is far more than meets the eye. Most enterprise workloads are migrating to public clouds at an accelerated pace. Once migrated, these workloads get rearchitected to leverage cloud-based services as opposed to simply using the cloud as Infrastructure as a Service (IaaS). EC2 is an IaaS compute service of AWS, while EMR is the leading Platform as a Service (PaaS) offering of AWS, with more big data workloads running on EMR than the alternatives combined.
Hadoop's core feature is data locality, that is, taking compute to where the data is. AWS disrupts this concept by separating storage and compute. AWS has multiple storage options, including the following:
Amazon S3 is the cheapest and most reliable cloud storage available, which makes it the first choice, unless there is a compelling reason not to use it. EMR also supports attaching Elastic Block Store (EBS) volumes to compute instances (EC2) in order to provide a lower latency option.
Which option to choose depends upon what type of cluster is being created. There are two types of clusters:
To create an EMR cluster, perform the following steps:

1. Go to Services and select/search for EMR.
2. Click Create cluster and select the last option in the Applications option box.
3. Click Create Cluster and the cluster will start as follows:
4. The My Cluster status will change to Waiting, as shown in the following screenshot. In this example, the cluster uses the io.playground bucket:

Let's look at the options shown in step 3:
EC2 instances are the most expensive part of a company's AWS bill. So, selecting the right instance type is key to optimizing your bill. The following section is a quick overview of the different instance types. Instance types, both in the cloud and on-premises, are defined by four factors:
To see a quick illustration of how these factors affect each other, visit http://youtube.com/infoobjects.
In the EC2 world, these factors have been modified slightly into the vCPU. A vCPU is a virtualized unit of:
Instance type families are defined by the ratio of these factors, especially vCPU to memory. In a given family, this ratio remains unchanged (T2 excluded). Different instance families serve different purposes, almost like different types of automobiles. In fact, we are going to use the automobile metaphor in this section to illustrate these families.
The T2 instance type is the gateway drug of the AWS world, the reason being that it belongs to the Free Tier: developers who sign up for AWS get this instance type free for up to a year. This tier has six subtypes:
Instance Type | vCPUs | CPU Credit/Hr | Memory (GiB) |
t2.micro | 1 | 6 | 1 |
t2.small | 1 | 12 | 2 |
t2.medium | 2 | 24 | 4 |
t2.large | 2 | 36 | 8 |
t2.xlarge | 4 | 54 | 16 |
t2.2xlarge | 8 | 81 | 32 |
M4 is the general-purpose instance type to use when in doubt. This tier has six subtypes:
Instance Type | vCPUs | Memory (GiB) | Dedicated Bandwidth |
m4.large | 2 | 8 | 450 mbps |
m4.xlarge | 4 | 16 | 750 mbps |
m4.2xlarge | 8 | 32 | 1,000 mbps |
m4.4xlarge | 16 | 64 | 2,000 mbps |
m4.10xlarge | 40 | 160 | 4,000 mbps |
m4.16xlarge | 64 | 256 | 10,000 mbps |
The C4 (compute-optimized) tier has five subtypes:
Instance Type | vCPUs | Memory (GiB) | Dedicated Bandwidth |
c4.large | 2 | 3.75 | 500 mbps |
c4.xlarge | 4 | 7.5 | 750 mbps |
c4.2xlarge | 8 | 15 | 1,000 mbps |
c4.4xlarge | 16 | 30 | 2,000 mbps |
c4.8xlarge | 36 | 60 | 4,000 mbps |
The X1 tier, meant for large-scale, memory-intensive workloads, has two subtypes:
Instance Type | vCPUs | Memory (GiB) | Dedicated Bandwidth |
x1.16xlarge | 64 | 976 | 7,000 mbps |
x1.32xlarge | 128 | 1,952 | 14,000 mbps |
The R4 (memory-optimized) tier has six subtypes:
Instance Type | vCPUs | Memory (GiB) | Dedicated Bandwidth |
r4.large | 2 | 15.25 | 10 gbps |
r4.xlarge | 4 | 30.5 | 10 gbps |
r4.2xlarge | 8 | 61 | 10 gbps |
r4.4xlarge | 16 | 122 | 10 gbps |
r4.8xlarge | 32 | 244 | 10 gbps |
r4.16xlarge | 64 | 488 | 20 gbps |
The P2 (GPU) tier has three subtypes:
Instance Type | vCPUs | Memory (GiB) | GPUs | GPU Memory (GiB) |
p2.xlarge | 4 | 61 | 1 | 12 |
p2.8xlarge | 32 | 488 | 8 | 96 |
p2.16xlarge | 64 | 732 | 16 | 192 |
The I3 (storage-optimized) tier has six subtypes:
Instance Type | vCPUs | Memory (GiB) | Storage (GB) |
i3.large | 2 | 15.25 | 475 NVMe SSD |
i3.xlarge | 4 | 30.5 | 950 NVMe SSD |
i3.2xlarge | 8 | 61 | 1,900 NVMe SSD |
i3.4xlarge | 16 | 122 | 2x1,900 NVMe SSD |
i3.8xlarge | 32 | 244 | 4x1,900 NVMe SSD |
i3.16xlarge | 64 | 488 | 8x1,900 NVMe SSD |
You can build Spark from the source code, or you can download precompiled binaries from http://spark.apache.org. For a standard use case, binaries are good enough, and this recipe will focus on installing Spark using binaries.
At the time of writing, Spark's current version is 2.1. Please check the latest version from Spark's download page at http://spark.apache.org/downloads.html. Binaries are developed with the most recent and stable version of Hadoop. To use a specific version of Hadoop, the recommended approach is that you build it from sources, which we will cover in the next recipe.
All the recipes in this book are developed using Ubuntu Linux, but they should work fine on any POSIX environment. Spark expects Java to be installed and the JAVA_HOME
environment variable set.
In Linux/Unix systems, there are certain standards for the location of files and directories, which we are going to follow in this book. The following is a quick cheat sheet:
Directory | Description |
/bin | This stores essential command binaries |
/etc | This is where host-specific system configurations are located |
/opt | This is where add-on application software packages are located |
/var | This is where variable data is located |
/tmp | This stores the temporary files |
/home | This is where user home directories are located |
Here are the installation steps:
1. Download the Spark binaries:
   $ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
2. Unpack the binaries:
   $ tar -zxf spark-2.1.0-bin-hadoop2.7.tgz
3. Rename the folder containing the binaries by stripping the version information:
   $ sudo mv spark-2.1.0-bin-hadoop2.7 spark
4. Move the configuration to the /etc folder so that it can be turned into a symbolic link later:
   $ sudo mv spark/conf/* /etc/spark
5. Create your company-specific installation directory under /opt. As the recipes in this book are tested on the infoobjects sandbox, use infoobjects as the directory name. Create the /opt/infoobjects directory:
   $ sudo mkdir -p /opt/infoobjects
6. Move the spark directory to /opt/infoobjects, as it's an add-on software package:
   $ sudo mv spark /opt/infoobjects/
7. Change the permissions of the spark home directory, namely 0755 = user:read-write-execute group:read-execute world:read-execute:
   $ sudo chmod -R 755 /opt/infoobjects/spark
8. Move to the spark home directory and create a symbolic link to the configuration:
   $ cd /opt/infoobjects/spark
   $ sudo ln -s /etc/spark conf
9. Append the Spark binaries to PATH in .bashrc:
   $ echo "export PATH=$PATH:/opt/infoobjects/spark/bin" >> /home/hduser/.bashrc
10. Create the log directory in /var:
    $ sudo mkdir -p /var/log/spark
11. Make hduser the owner of Spark's log directory:
    $ sudo chown -R hduser:hduser /var/log/spark
12. Create Spark's tmp directory:
    $ mkdir /tmp/spark
13. Configure Spark with the help of the following command lines:
    $ cd /etc/spark
    $ echo "export HADOOP_CONF_DIR=/opt/infoobjects/hadoop/etc/hadoop" >> spark-env.sh
    $ echo "export YARN_CONF_DIR=/opt/infoobjects/hadoop/etc/hadoop" >> spark-env.sh
    $ echo "export SPARK_LOG_DIR=/var/log/spark" >> spark-env.sh
    $ echo "export SPARK_WORKER_DIR=/tmp/spark" >> spark-env.sh
14. Change the ownership of the spark home directory to root:
    $ sudo chown -R root:root /opt/infoobjects/spark
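As a quick sanity check, you can start spark-shell and run a trivial computation. The following lines are a minimal smoke test to paste into the shell (sc is the SparkContext that the shell creates for you):

// Sum the numbers 1 to 100 on the local Spark instance started by spark-shell
val nums = sc.parallelize(1 to 100)
println(nums.reduce(_ + _))   // should print 5050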
Installing Spark using binaries works fine in most cases. For advanced cases, such as the following (but not limited to), compiling from the source code is a better option:
The following are the prerequisites for this recipe to work:
The following are the steps to build the Spark source code with Maven:
1. Increase MaxPermSize of the heap:
   $ echo 'export _JAVA_OPTIONS="-XX:MaxPermSize=1G"' >> /home/hduser/.bashrc
2. Download the source code of the Spark branch-2.1:
   $ wget https://github.com/apache/spark/archive/branch-2.1.zip
3. Unpack the archive:
   $ unzip branch-2.1.zip
4. Rename the unzipped folder to spark:
   $ mv spark-branch-2.1 spark
5. Move to the spark directory and compile the sources:
   $ cd spark
   $ mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -Phive -DskipTests clean package
6. Move the conf folder to the /etc folder so that it can be turned into a symbolic link:
   $ sudo mv spark/conf /etc/
7. Move the spark directory to /opt as it's an add-on software package:
   $ sudo mv spark /opt/infoobjects/spark
8. Change the ownership of the spark home directory to root:
   $ sudo chown -R root:root /opt/infoobjects/spark
9. Change the permissions of the spark home directory, namely 0755 = user:rwx group:r-x world:r-x:
   $ sudo chmod -R 755 /opt/infoobjects/spark
10. Move to the spark home directory and create a symbolic link to the configuration:
    $ cd /opt/infoobjects/spark
    $ sudo ln -s /etc/spark conf
11. Append the Spark binaries to PATH in .bashrc:
    $ echo "export PATH=$PATH:/opt/infoobjects/spark/bin" >> /home/hduser/.bashrc
12. Create the log directory in /var:
    $ sudo mkdir -p /var/log/spark
13. Make hduser the owner of Spark's log directory:
    $ sudo chown -R hduser:hduser /var/log/spark
14. Create Spark's tmp directory:
    $ mkdir /tmp/spark
15. Configure Spark with the help of the following command lines:
    $ cd /etc/spark
    $ echo "export HADOOP_CONF_DIR=/opt/infoobjects/hadoop/etc/hadoop" >> spark-env.sh
    $ echo "export YARN_CONF_DIR=/opt/infoobjects/hadoop/etc/hadoop" >> spark-env.sh
    $ echo "export SPARK_LOG_DIR=/var/log/spark" >> spark-env.sh
    $ echo "export SPARK_WORKER_DIR=/tmp/spark" >> spark-env.sh
Amazon Elastic Compute Cloud (Amazon EC2) is a web service that provides resizable compute instances in the cloud. Amazon EC2 provides the following features:
EC2 provides different types of instances to meet all your compute needs, such as general-purpose instances, microinstances, memory-optimized instances, storage-optimized instances, and others. They also have a Free Tier of microinstances for trial purposes.
The spark-ec2 script comes bundled with Spark and makes it easy to launch, manage, and shut down clusters on Amazon EC2.
Before you start, do the following things:

1. Log in to the Amazon AWS account via http://aws.amazon.com.
2. Click Security Credentials under your account name in the top-right corner.
3. Click Access Keys and Create New Access Key:
4. Download your key pair and save it (for example, in the /home/hduser/kp folder as spark-kp1.pem).
5. Set the permissions on the key file to 600.
6. Set environment variables to reflect the access key ID and secret access key (replace the sample values with your own values):
   $ echo "export AWS_ACCESS_KEY_ID=\"AKIAOD7M2LOWATFXFKQ\"" >> /home/hduser/.bashrc
   $ echo "export AWS_SECRET_ACCESS_KEY=\"+Xr4UroVYJxiLiY8DLT4DLT4D4sxc3ijZGMx1D3pfZ2q\"" >> /home/hduser/.bashrc
   $ echo "export PATH=$PATH:/opt/infoobjects/spark/ec2" >> /home/hduser/.bashrc
Now you can launch the cluster with the spark-ec2 script:

$ cd /home/hduser
$ spark-ec2 -k <key-pair> -i <key-file> -s <num-slaves> launch <cluster-name>

<key-pair>: name of the EC2 key pair created in AWS
<key-file>: the private key file you downloaded
<num-slaves>: number of slave nodes to launch
<cluster-name>: name of the cluster
Launch the cluster with example values:

$ spark-ec2 -k kp-spark -i /home/hduser/keypairs/kp-spark.pem --hadoop-major-version 2 -s 3 launch spark-cluster

Launch the cluster in a specific availability zone:

$ spark-ec2 -k kp-spark -i /home/hduser/keypairs/kp-spark.pem -z us-east-1b --hadoop-major-version 2 -s 3 launch spark-cluster

Attach EBS volumes (for example, 10 GB per node):

$ spark-ec2 -k kp-spark -i /home/hduser/keypairs/kp-spark.pem --hadoop-major-version 2 --ebs-vol-size 10 -s 3 launch spark-cluster

Use spot instances to lower the cost:

$ spark-ec2 -k kp-spark -i /home/hduser/keypairs/kp-spark.pem --spot-price=0.15 --hadoop-major-version 2 -s 3 launch spark-cluster
Spot instances allow you to name your own price for Amazon EC2's computing capacity. You simply bid for spare Amazon EC2 instances and run them whenever your bid exceeds the current spot price, which varies in real time and is based on supply and demand (source: www.amazon.com).
Connect to the master node:

$ spark-ec2 -k spark-kp1 -i /home/hduser/kp/spark-kp1.pem login spark-cluster

Once logged in, check the following directories on the master node:
Directory | Description |
ephemeral-hdfs | This is the Hadoop instance for which data is ephemeral and gets deleted when you stop or restart the machine. |
persistent-hdfs | Each node has a very small amount of persistent storage (approximately 3 GB). If you use this instance, data will be retained in that space. |
hadoop-native | This refers to the native libraries that support Hadoop, such as snappy compression libraries. |
scala | This refers to the Scala installation. |
shark | This refers to the Shark installation (Shark is no longer supported and has been replaced by Spark SQL). |
spark | This refers to the Spark installation. |
spark-ec2 | This refers to the files that support this cluster deployment. |
tachyon | This refers to the Tachyon installation. |
Check the Hadoop version in the ephemeral instance:

$ ephemeral-hdfs/bin/hadoop version
Hadoop 2.0.0-cdh4.2.0

Check the Hadoop version in the persistent instance:

$ persistent-hdfs/bin/hadoop version
Hadoop 2.0.0-cdh4.2.0
To reduce the logging noise on the console, change the logging level from INFO to ERROR:

1. Move to the Spark configuration directory:
   $ cd spark/conf
2. Create the log4j.properties file by renaming the template:
   $ mv log4j.properties.template log4j.properties
3. Open log4j.properties in vi or your favorite editor:
   $ vi log4j.properties
4. Change log4j.rootCategory=INFO, console to log4j.rootCategory=ERROR, console.
5. Copy the configuration to all the slave nodes after the change:
   $ spark-ec2/copy-dir spark/conf

Destroy the cluster when you no longer need it:

$ spark-ec2 destroy spark-cluster
Compute resources in a distributed environment need to be managed so that resource utilization is efficient and every job gets a fair chance to run. Spark comes with its own cluster manager, which is conveniently called standalone mode. Spark also supports working with YARN and Mesos cluster managers.
The cluster manager you choose should be mostly driven by both legacy concerns and whether other frameworks, such as MapReduce, share the same compute resource pool. If your cluster has legacy MapReduce jobs running and all of them cannot be converted into Spark jobs, it is a good idea to use YARN as the cluster manager. Mesos is emerging as a data center operating system to conveniently manage jobs across frameworks, and it is very compatible with Spark.
If the Spark framework is the only framework in your cluster, then the standalone mode is good enough. As Spark evolves as a technology, you will see more and more use cases of Spark being used as the standalone framework, serving all your big data compute needs. For example, some jobs may use Apache Mahout at present because MLlib does not yet have the specific machine-learning library the job needs. As soon as MLlib gets that library, the job can be moved to Spark.
Let's consider a cluster of six nodes as an example setup--one master and five slaves (replace them with the actual node names in your cluster):
Master
m1.zettabytes.com

Slaves
s1.zettabytes.com
s2.zettabytes.com
s3.zettabytes.com
s4.zettabytes.com
s5.zettabytes.com
1. Put /opt/infoobjects/spark/sbin in the path on every node:
   $ echo "export PATH=$PATH:/opt/infoobjects/spark/sbin" >> /home/hduser/.bashrc
2. Start the standalone master server on the master node:
   hduser@m1.zettabytes.com~] start-master.sh
   Master, by default, starts on port 7077, which slaves use to connect to it. It also has a web UI at port 8080.
3. Connect to each slave node and start a worker, pointing it to the master:
   hduser@s1.zettabytes.com~] spark-class org.apache.spark.deploy.worker.Worker spark://m1.zettabytes.com:7077
Argument | Meaning |
-h <host>, --host <host> | IP address/DNS name for the service to listen on |
-p <port>, --port <port> | Port for the service to listen on |
--webui-port <port> | This is the port for the web UI (by default, 8080 is for the master and 8081 for the worker) |
-c <cores>, --cores <cores> | Total CPU cores Spark applications can use on the machine (worker only) |
-m <memory>, --memory <memory> | Total RAM Spark applications can use on the machine (worker only) |
-d <dir>, --work-dir <dir> | The directory to use for scratch space and job output logs (worker only) |
For fine-grained configuration, the preceding parameters work with both the master and slaves. Rather than manually starting the master and slave daemons on each node, you can also accomplish this using cluster launch scripts. Automating deployment beyond that, with configuration management tools, is outside the scope of this book; please refer to books about Chef or Puppet.
First, create the conf/slaves file on the master node and add one line per slave hostname (using an example of five slave nodes, replace the following slave DNS with the DNS of the slave nodes in your cluster):

hduser@m1.zettabytes.com~] echo "s1.zettabytes.com" >> conf/slaves
hduser@m1.zettabytes.com~] echo "s2.zettabytes.com" >> conf/slaves
hduser@m1.zettabytes.com~] echo "s3.zettabytes.com" >> conf/slaves
hduser@m1.zettabytes.com~] echo "s4.zettabytes.com" >> conf/slaves
hduser@m1.zettabytes.com~] echo "s5.zettabytes.com" >> conf/slaves
Once the slave machine is set up, you can call the following scripts to start/stop the cluster:
Script name | Purpose |
start-master.sh | Starts a master instance on the host machine |
start-slaves.sh | Starts a slave instance on each node of the slaves file |
start-all.sh | Starts both the master and slaves |
stop-master.sh | Stops the master instance on the host machine |
stop-slaves.sh | Stops the slave instance on all the nodes of the slaves file |
stop-all.sh | Stops both the master and slaves |
Connect an application to the cluster through the Scala code:

val sparkContext = new SparkContext(new SparkConf().setMaster("spark://m1.zettabytes.com:7077"))

Connect to the cluster through Spark shell by setting the master URL:

$ spark-shell --master spark://master:7077
In standalone mode, Spark follows the master-slave architecture, very much like Hadoop MapReduce and YARN. The compute master daemon is called Spark master and runs on one master node. Spark master can be made highly available using ZooKeeper. You can also add more standby masters on the fly if needed.
The compute slave daemon is called a worker, and it exists on each slave node. The worker daemon does the following:
There is, at most, one executor per application, per slave machine.
Both the Spark master and the worker are very lightweight. Typically, a memory allocation between 500 MB and 1 GB is sufficient. This value can be set in conf/spark-env.sh by setting the SPARK_DAEMON_MEMORY parameter. For example, the following configuration will set the memory to 1 gigabyte for both the master and the worker daemon. Make sure you run it with sudo superuser privileges:
$ echo "export SPARK_DAEMON_MEMORY=1g" >> /opt/infoobjects/spark/conf/spark-env.sh
By default, each slave node has one worker instance running on it. Sometimes, you may have a few machines that are more powerful than others. In that case, you can spawn more than one worker on that machine with the following configuration (only on those machines):
$ echo "export SPARK_WORKER_INSTANCES=2" >> /opt/infoobjects/spark/conf/spark-env.sh
The Spark worker, by default, uses all the cores on the slave machine for its executors. If you would like to limit the number of cores the worker could use, you can set it to the number of your choice (for example, 12), using the following configuration:
$ echo "export SPARK_WORKER_CORES=12" >> /opt/infoobjects/spark/conf/spark-env.sh
The Spark worker, by default, uses all of the available RAM on the machine (minus 1 GB) for its executors. Note that you cannot control how much memory each specific executor uses with this setting (you control that from the driver configuration). To assign another value for the total memory (for example, 24 GB) to be used by all the executors combined, execute the following setting:
$ echo "export SPARK_WORKER_MEMORY=24g" >> /opt/infoobjects/spark/conf/spark-env.sh
There are some settings you can configure at the driver level:

To specify the maximum number of CPU cores across the cluster that an application can use, set the spark.cores.max configuration in spark-submit or spark-shell as follows:

$ spark-submit --conf spark.cores.max=12

To specify the amount of memory each executor should be allocated, set the spark.executor.memory configuration in spark-submit or spark-shell as follows:

$ spark-submit --conf spark.executor.memory=8g
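The same driver-level settings can also be set programmatically while building the SparkConf, which is handy when you cannot pass flags on the command line. A minimal sketch (the application name and values are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Cap the cores the application can grab and size each executor's memory
val conf = new SparkConf()
  .setAppName("my app")
  .setMaster("spark://m1.zettabytes.com:7077")
  .set("spark.cores.max", "12")
  .set("spark.executor.memory", "8g")
val sc = new SparkContext(conf)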
The following diagram depicts the high-level architecture of a Spark cluster:
To find more configuration options, refer to the following URL:
Mesos is slowly emerging as a data center operating system for managing all the compute resources across a data center. Mesos runs on any computer running the Linux operating system. It is built using the same principles as the Linux kernel. Let's see how we can install Mesos.
Mesosphere provides a binary distribution of Mesos. The most recent package of the Mesos distribution can be installed from the Mesosphere repositories by performing the following steps:
1. Add the Mesosphere repository on Ubuntu (the trusty release is used here):
   $ sudo apt-key adv --keyserver keyserver.ubuntu.com --recv E56151BF
   $ DISTRO=$(lsb_release -is | tr '[:upper:]' '[:lower:]')
   $ CODENAME=$(lsb_release -cs)
   $ sudo vi /etc/apt/sources.list.d/mesosphere.list

   deb http://repos.mesosphere.io/Ubuntu trusty main
2. Update the repositories:
   $ sudo apt-get -y update
3. Install Mesos:
   $ sudo apt-get -y install mesos
4. To connect Spark to Mesos, make the Spark binaries available to Mesos by uploading them to HDFS:
   $ hdfs dfs -put spark-2.1.0-bin-hadoop2.7.tgz spark-2.1.0-bin-hadoop2.7.tgz
5. The master URL of a single-master Mesos is mesos://host:5050; the master URL of a ZooKeeper-managed Mesos cluster is mesos://zk://host:2181.
6. Set the following variables in spark-env.sh:
   $ sudo vi spark-env.sh
   export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
   export SPARK_EXECUTOR_URI=hdfs://localhost:9000/user/hduser/spark-2.1.0-bin-hadoop2.7.tgz
7. Run Spark on Mesos from a Scala program:
   val conf = new SparkConf().setMaster("mesos://host:5050")
   val sparkContext = new SparkContext(conf)
8. Run the Spark shell on Mesos:
   $ spark-shell --master mesos://host:5050
Mesos has two run modes:

Fine-grained: In the fine-grained (default) mode, every Spark task runs as a separate Mesos task.
Coarse-grained: This mode launches only one long-running Spark task on each Mesos machine.

To run Spark in the coarse-grained mode, set the spark.mesos.coarse property:

conf.set("spark.mesos.coarse","true")
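Putting the pieces together, a driver that connects to Mesos in the coarse-grained mode could look like the following sketch; host and the HDFS path are the placeholders used earlier in this recipe:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("my app")
  .setMaster("mesos://host:5050")   // single-master Mesos URL
  .set("spark.executor.uri",
    "hdfs://localhost:9000/user/hduser/spark-2.1.0-bin-hadoop2.7.tgz")
  .set("spark.mesos.coarse", "true")   // run in the coarse-grained mode
val sc = new SparkContext(conf)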
Yet Another Resource Negotiator (YARN) is Hadoop's compute framework that runs on top of HDFS, which is Hadoop's storage layer.
YARN follows the master-slave architecture. The master daemon is called ResourceManager and the slave daemon is called NodeManager. Besides this, application life cycle management is done by ApplicationMaster, which can be spawned on any slave node and stays alive for the lifetime of an application.
When Spark is run on YARN, ResourceManager performs the role of the Spark master, and NodeManagers work as executor nodes.
While running Spark with YARN, each Spark executor is run as a YARN container.
Running Spark on YARN requires a binary distribution of Spark that has YARN support. In both the Spark installation recipes, we have taken care of this.
Set the following environment variables so that Spark can find the Hadoop and YARN configuration:

HADOOP_CONF_DIR: to write to HDFS
YARN_CONF_DIR: to connect to the YARN ResourceManager

$ cd /opt/infoobjects/spark/conf (or /etc/spark)
$ sudo vi spark-env.sh
export HADOOP_CONF_DIR=/opt/infoobjects/hadoop/etc/hadoop
export YARN_CONF_DIR=/opt/infoobjects/hadoop/etc/hadoop
The following command launches a Spark application in the yarn-client mode:

$ spark-submit --class path.to.your.Class --master yarn --deploy-mode client [options] <app jar> [app options]

Here's an example:

$ spark-submit --class com.infoobjects.TwitterFireHose --master yarn --deploy-mode client --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 target/sparkio.jar 10

The following command starts Spark shell in the yarn-client mode:

$ spark-shell --master yarn --deploy-mode client

The command to launch a Spark application in the yarn-cluster mode is as follows:

$ spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]

Here's an example:

$ spark-submit --class com.infoobjects.TwitterFireHose --master yarn --deploy-mode cluster --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 target/sparkio.jar 10
Spark applications on YARN run in two modes:
yarn-client: The Spark driver runs in the client process outside of the YARN cluster, and ApplicationMaster is only used to negotiate resources from ResourceManager.
yarn-cluster: The Spark driver runs in ApplicationMaster, which is spawned by NodeManager on a slave node.

The yarn-cluster mode is recommended for production deployments, while the yarn-client mode is good for development and debugging, where you would like to see the immediate output. There is no need to specify the Spark master in either mode as it's picked from the Hadoop configuration; the master parameter is yarn, with the deploy mode set to either client or cluster.
The following figure shows how Spark is run with YARN in the client mode:
The following figure shows how Spark is run with YARN in the cluster mode:
In the YARN mode, the following configuration parameters can be set:
--num-executors: To configure how many executors will be allocated
--executor-memory: RAM per executor
--executor-cores: CPU cores per executor

SparkContext and SparkSession are the entry points into the world of Spark, so it is important that you understand both well.
SparkContext is the first object that a Spark program must create to access the cluster. In spark-shell
, it is directly accessible via spark.sparkContext
. Here's how you can programmatically create SparkContext in your Scala code:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("my app").setMaster("master url")
new SparkContext(conf)
SparkContext, though still supported, was more relevant in the case of RDD (covered in the next recipe). As you will see in the rest of the book, different libraries have different wrappers around SparkContext, for example, HiveContext/SQLContext for Spark SQL, StreamingContext for Streaming, and so on. As all the libraries are moving toward DataSet/DataFrame, it makes sense to have a unified entry point for all these libraries as well, and that is SparkSession. SparkSession is available as spark in the spark-shell. Here's how you do it:
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder.master("master url").appName("my app").getOrCreate()
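Once created, the session is the single entry point for DataFrame/Dataset work, and the underlying SparkContext is still reachable from it when RDD-based code is needed. A small usage sketch (the S3 path is the sparkcookbook bucket used earlier; counts will depend on the data):

// Read structured data through the session
val lines = sparkSession.read.textFile("s3a://sparkcookbook/yelpdata")
println(lines.count())
// The older SparkContext is still available for RDD-based APIs
val rdd = sparkSession.sparkContext.parallelize(1 to 10)
println(rdd.sum())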
Though RDD is getting replaced with DataFrame/DataSet-based APIs, there are still a lot of APIs that have not been migrated yet. In this recipe, we will look at how the concept of lineage works in RDD.
Externally, RDD is a distributed, immutable collection of objects. Internally, it consists of the following five parts:
A set of partitions (rdd.getPartitions)
A list of dependencies on parent RDDs (rdd.dependencies)
A function to compute a partition, given its parents
A partitioner, which is optional (rdd.partitioner)
The preferred location of each partition, which is optional (rdd.preferredLocations)

The first three are needed for an RDD to be recomputed in case data is lost. When combined, they are called lineage. The last two parts are optimizations.
A set of partitions is how data is divided across nodes. In the case of HDFS, it means InputSplits, which are mostly the same as blocks (except when a record crosses block boundaries; in that case, the split will be slightly bigger than a block).
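You can inspect these parts directly from the Spark shell using the public accessors on any RDD; a minimal sketch, using the same words folder that the following example loads:

val words = sc.textFile("hdfs://localhost:9000/user/hduser/words")
println(words.partitions.length)                         // one partition per HDFS input split/block
println(words.dependencies)                              // dependencies on the parent RDD
println(words.partitioner)                               // None for a plain text file RDD
println(words.preferredLocations(words.partitions(0)))   // hosts holding the first block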
Let's revisit our word count example to understand these five parts. This is how an RDD graph looks for wordCount at the dataset level view:
Basically, this is how the flow goes:
Load the words folder as an RDD:

scala> val words = sc.textFile("hdfs://localhost:9000/user/hduser/words")
The following are the five parts of the words RDD:
Part | Description |
Partitions | One partition per HDFS input split/block (org.apache.spark.rdd.HadoopPartition) |
Dependencies | None |
Compute function | To read the block |
Preferred location | The HDFS block's location |
Partitioner | None |
Tokenize the contents of the words RDD, with each word on a separate line:

scala> val wordsFlatMap = words.flatMap(_.split("\\W+"))
The following are the five parts of the wordsFlatMap RDD:
Part | Description |
Partitions | Same as the parent RDD, that is, words (org.apache.spark.rdd.HadoopPartition) |
Dependencies | Same as the parent RDD, that is, words (org.apache.spark.OneToOneDependency) |
Compute function | To compute the parent and split each element, which flattens the results |
Preferred location | Ask parent RDD |
Partitioner | None |
Transform the wordsFlatMap RDD into the (word,1) tuple:

scala> val wordsMap = wordsFlatMap.map( w => (w,1))
The following are the five parts of the wordsMap RDD:
Part | Description |
Partitions | Same as the parent RDD, that is, wordsFlatMap (org.apache.spark.rdd.HadoopPartition) |
Dependencies | Same as the parent RDD, that is, wordsFlatMap (org.apache.spark.OneToOneDependency) |
Compute function | To compute the parent and map it to PairRDD |
Preferred Location | Ask parent RDD |
Partitioner | None |
Reduce all the values for a key and sum them up:

scala> val wordCount = wordsMap.reduceByKey(_+_)
The following are the five parts of the wordCount RDD:
Part | Description |
Partitions | One per reduce task (org.apache.spark.rdd.ShuffledRDDPartition) |
Dependencies | Shuffle dependency on each parent (org.apache.spark.ShuffleDependency) |
Compute function | To perform additions on shuffled data |
Preferred location | None |
Partitioner | HashPartitioner (org.apache.spark.HashPartitioner) |
This is how an RDD graph of wordCount looks at the partition level view:
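The whole chain, and the lineage Spark records for it, can also be seen in one place with toDebugString; a minimal sketch of the same word count run from the shell:

scala> val words = sc.textFile("hdfs://localhost:9000/user/hduser/words")
scala> val wordCount = words.flatMap(_.split("\\W+")).map(w => (w, 1)).reduceByKey(_ + _)
scala> println(wordCount.toDebugString)   // prints the lineage from the shuffled RDD back to the HadoopRDD
scala> wordCount.collect().foreach(println)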