Fast Data Processing with Spark

By Holden Karau

About this book

Spark is a framework for writing fast, distributed programs. Spark solves problems similar to those Hadoop MapReduce addresses, but with a fast in-memory approach and a clean, functional-style API. With its ability to integrate with Hadoop and its built-in tools for interactive query analysis (Shark), large-scale graph processing and analysis (Bagel), and real-time analysis (Spark Streaming), it can be used interactively to quickly process and query big data sets.

Fast Data Processing with Spark covers how to write distributed MapReduce-style programs with Spark. The book will guide you through every step required to write effective distributed programs, from setting up your cluster and interactively exploring the API to deploying your job to the cluster and tuning it for your purposes.

Fast Data Processing with Spark covers everything from setting up your Spark cluster in a variety of situations (stand-alone, EC2, and so on) to using the interactive shell to write distributed code. From there, we move on to cover how to write and deploy distributed jobs in Java, Scala, and Python.

We then examine how to use the interactive shell to quickly prototype distributed programs and explore the Spark API. We also look at how to use Hive with Spark through Shark's SQL-like query syntax, as well as how to manipulate resilient distributed datasets (RDDs).

Publication date:
October 2013


Chapter 1. Installing Spark and Setting Up Your Cluster

This chapter details some common methods for setting up Spark. Spark on a single machine is excellent for testing, but you will also learn to use Spark's built-in deployment scripts to deploy to a dedicated cluster via SSH (Secure Shell). This chapter also covers using Mesos, YARN, Puppet, or Chef to deploy Spark. For cloud deployments of Spark, this chapter looks at EC2 (both traditional EC2 and Elastic MapReduce). Feel free to skip this chapter if you already have your local Spark instance installed and want to get straight to programming.

Regardless of how you are going to deploy Spark, you will want to get the latest version of Spark (Version 0.7 as of this writing). Both the source code and pre-built binaries are available. Coders who live dangerously can try cloning the code directly from the project's Git repository.

To interact with Hadoop Distributed File System (HDFS), you need to use a Spark version that is built against the same version of Hadoop as your cluster. For Version 0.7 of Spark, the pre-built package is built against Hadoop 1.0.4. If you are up for the challenge, building from source is recommended, since it gives you the flexibility to choose which HDFS version you want to support and to apply patches.

You will need the appropriate version of Scala installed and the matching JDK. For Version 0.7.1 of Spark, you require Scala 2.9.2 or a later 2.9 series release (2.9.3 works well). At the time of this writing, Ubuntu's LTS release (Precise) has Scala Version 2.9.1. Additionally, the current stable version has 2.9.2 and Fedora 18 has 2.9.2. Check your distribution's package listings for up-to-date package information; the latest version of Scala is available from the Scala project. It is important to choose the version of Scala that matches the version requested by Spark, as Scala is a fast-evolving language.

The tarball file contains a bin directory that needs to be added to your path, and SCALA_HOME should be set to the path where the tarball file is extracted. Scala can be installed from the tarball by running:

wget && tar -xvf scala-2.9.3.tgz && cd scala-2.9.3 && export PATH=`pwd`/bin:$PATH && export SCALA_HOME=`pwd`

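You will probably want to add the equivalent lines to your .bashrc file or equivalent, with the output of pwd replaced by the absolute path where you extracted Scala (shown here as the placeholder [scalapath]):

export SCALA_HOME=[scalapath]
export PATH=$SCALA_HOME/bin:$PATH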
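
Before building, it can help to confirm that a given Scala version falls in the range described above (2.9.2 or a later 2.9 series release). The following is a minimal sketch; the function name scala_version_ok is hypothetical and is not part of Spark or Scala.

```shell
# Returns 0 when the given version string is Scala 2.9.2 or a later
# 2.9.x release, and 1 otherwise (for example 2.9.1 or 2.10.0).
# scala_version_ok is an illustrative helper, not a Spark or Scala tool.
scala_version_ok() {
  case "$1" in
    2.9.*) ;;            # only the 2.9 series is accepted
    *) return 1 ;;
  esac
  patch="${1#2.9.}"            # text after the "2.9." prefix
  patch="${patch%%[!0-9]*}"    # keep only the leading digits
  [ -n "$patch" ] && [ "$patch" -ge 2 ]
}
```

For example, scala_version_ok 2.9.3 succeeds, while scala_version_ok 2.10.0 fails.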

Spark is built with sbt (simple build tool, which is no longer very simple), and build times can be quite long when compiling Scala source code. Don't worry if you don't have sbt installed; the build script will download the correct version for you.

On an admittedly under-powered Core 2 laptop with an SSD, building a fresh copy of Spark took about seven minutes. If you decide to build Version 0.7 from source, you would run:

wget && tar -xvf download-spark-0.7.0-sources-tgz && cd spark-0.7.0 && sbt/sbt package

If you are going to use a version of HDFS that doesn't match the default version for your Spark instance, you will need to edit project/SparkBuild.scala and set HADOOP_VERSION to the corresponding version and recompile it with:

sbt/sbt clean compile
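
The edit to project/SparkBuild.scala can also be scripted. The following is a minimal sketch that assumes the build file defines the version on a line of the form val HADOOP_VERSION = "1.0.4"; check your copy of the file first, because that line format is an assumption here. The function name set_hadoop_version is hypothetical.

```shell
# Rewrites the quoted value on the HADOOP_VERSION line of a build file.
# $1 = path to the build file, $2 = new Hadoop version string.
# Assumes a line shaped like: val HADOOP_VERSION = "1.0.4"
# A backup of the original file is kept next to it with a .bak suffix.
set_hadoop_version() {
  sed -i.bak "s/\(val HADOOP_VERSION = \)\"[^\"]*\"/\1\"$2\"/" "$1"
}
```

For example, set_hadoop_version project/SparkBuild.scala 1.2.1 would replace the version string, after which you would rerun the clean compile shown above. The version 1.2.1 is only an illustration.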


The sbt tool has made great progress with dependency resolution, but it is still strongly recommended that developers do a clean build rather than an incremental build, because incremental builds still don't get it quite right all the time.

Once you have started the build, it's probably a good time for a break, such as getting a cup of coffee. If you find it stuck on a single line that says "Resolving [XYZ]...." for a long time (say five minutes), stop it and rerun sbt/sbt package.

If you can live with the restrictions (such as the fixed HDFS version), using the pre-built binary will get you up and running far quicker. To run the pre-built version, use the following command:

wget && tar -xvf download-spark-0.7.0-prebuilt-tgz && cd spark-0.7.0


Spark has recently become a part of the Apache Incubator. For an application developer who uses Spark, the most visible change will likely be the eventual renaming of the package to fall under the org.apache namespace.



Running Spark on a single machine

A single machine is the simplest use case for Spark. It is also a great way to sanity check your build. In the Spark directory, there is a shell script called run that can be used to launch a Spark job. The run script takes the name of a Spark class and some arguments. There is a collection of sample Spark jobs in ./examples/src/main/scala/spark/examples/.

All the sample programs take the parameter master, which can be the URL of a distributed cluster or local[N], where N is the number of threads. To run GroupByTest locally with four threads, try the following command:

./run spark.examples.GroupByTest local[4]

If you get an error saying that SCALA_HOME is not set, make sure your SCALA_HOME is set correctly. In bash, you can do this using export SCALA_HOME=[pathyouextractedscalato].

If you get the following error, it is likely you are using Scala 2.10, which is not supported by Spark 0.7:

Exception in thread "main" java.lang.NoClassDefFoundError: scala/reflect/ClassManifest

The Scala developers decided to rearrange some classes between 2.9 and 2.10 versions. You can either downgrade your version of Scala or see if the development build of Spark is ready to be built along with Scala 2.10.


Running Spark on EC2

There are many handy scripts to run Spark on EC2 in the ec2 directory. These scripts can be used to run multiple Spark clusters, and even to run on spot instances. Spark can also be run on Elastic MapReduce (EMR). This is Amazon's solution for MapReduce cluster management, which gives you more flexibility around scaling instances.

Running Spark on EC2 with the scripts

To get started, make sure that you have EC2 enabled on your account by signing up for it. It is a good idea to generate a separate access key pair for your Spark cluster. You will also need to create an EC2 key pair so that the Spark script can SSH to the launched machines; this can be done in the EC2 console by selecting Key Pairs under Network & Security.

Remember that key pairs are created per region, so you need to create your key pair in the same region in which you intend to run your Spark instances. Give it a name that you can remember (this chapter uses spark-keypair as the example key pair name), as you will need it for the scripts. You can also choose to upload your public SSH key instead of generating a new key. These keys are sensitive, so make sure that you keep them private. You also need to set AWS_ACCESS_KEY and AWS_SECRET_KEY as environment variables for the Amazon EC2 scripts:

chmod 400 spark-keypair.pem
export AWS_ACCESS_KEY="..."
export AWS_SECRET_KEY="..."
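
Because the EC2 scripts depend on both variables, a quick check before launching can save a failed run. The following is a minimal sketch; the function name missing_aws_vars is hypothetical and is not part of the Spark or Amazon scripts.

```shell
# Prints the name of each required AWS variable that is unset or empty,
# one per line. Prints nothing when both variables are set.
# missing_aws_vars is an illustrative helper, not part of any AWS tool.
missing_aws_vars() {
  for name in AWS_ACCESS_KEY AWS_SECRET_KEY; do
    eval "value=\${$name:-}"          # read the variable named by $name
    [ -n "$value" ] || echo "$name"
  done
}
```

If missing_aws_vars prints anything, set the listed variables before running the launch script.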

You will find it useful to download the EC2 scripts provided by Amazon. Once you unzip the resulting ZIP file, you can add the bin folder to your PATH variable in a similar manner to what you did with the Scala bin folder:

cd ec2-api-tools-*
export EC2_HOME=`pwd`
export PATH=$PATH:`pwd`/bin

The Spark EC2 script automatically creates a separate security group and firewall rules for the running Spark cluster. By default, your Spark cluster will be universally accessible on port 8080, which is somewhat poor form. Sadly, the script does not currently provide an easy way to restrict access to just your host. If you have a static IP address, I strongly recommend limiting the access in the script; simply replace all instances of the open address range with [yourip]/32. This will not affect intra-cluster communication, as all machines within a security group can talk to one another by default.

Next, try to launch a cluster on EC2:

./ec2/spark-ec2 -k spark-keypair -i pk-[....].pem -s 1 launch myfirstcluster


If you get an error message such as "The requested Availability Zone is currently constrained and....", you can specify a different zone by passing in the --zone flag.

If you get an error about not being able to SSH to the master, make sure that only you have permission to read the private key, otherwise SSH will refuse to use it.

You may also encounter this error due to a race condition in which the hosts report themselves as alive but the spark-ec2 script cannot yet SSH to them. A fix for this issue is pending. Until the fix is available in the version of Spark you are using, a temporary workaround is to simply sleep an extra 120 seconds at the start of setup_cluster.

If you do get a transient error when launching a cluster, you can finish the launch process using the resume feature by running:

./ec2/spark-ec2 -i ~/spark-keypair.pem launch myfirstcluster --resume
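
Since transient launch errors may recur, the resume command can be wrapped in a small retry loop. The following is a minimal sketch; the function name retry and its argument order are hypothetical, and it simply reruns whatever command it is given.

```shell
# Runs a command until it succeeds or the attempt limit is reached.
# $1 = maximum number of attempts, $2 = seconds to wait between attempts,
# remaining arguments = the command to run.
# retry is an illustrative helper, not part of the spark-ec2 script.
retry() {
  attempts="$1"
  delay="$2"
  shift 2
  n=1
  while :; do
    "$@" && return 0                       # success: stop retrying
    [ "$n" -ge "$attempts" ] && return 1   # out of attempts: give up
    n=$((n + 1))
    sleep "$delay"
  done
}
```

For example, retry 3 120 followed by the resume command above would attempt the resume up to three times, waiting 120 seconds between attempts.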

If everything goes ok, you should see something like the following screenshot:

This will give you a bare-bones cluster with one master and one worker, with all the defaults on the default machine instance size. Next, verify that it has started up and that your firewall rules were applied by going to the master on port 8080. You can see in the preceding screenshot that the name of the master is output at the end of the script.

Try running one of the example jobs on your new cluster to make sure everything is OK:

$ ssh -i ~/spark-keypair.pem [user]@[hostname]
Last login: Sun Apr  7 03:00:20 2013 from
       __|  __|_  )
       _|  (     /   Amazon Linux AMI
There are 32 security update(s) out of 272 total update(s) available
Run "sudo yum update" to apply all updates.
Amazon Linux version 2013.03 is available.
# ls
ephemeral-hdfs  hive-0.9.0-bin  mesos  mesos-ec2  persistent-hdfs  scala-2.9.2  shark-0.2  spark  spark-ec2
# cd spark
# ./run spark.examples.GroupByTest spark://`hostname`:7077
13/04/07 03:11:38 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
13/04/07 03:11:39 INFO storage.BlockManagerMaster: Registered BlockManagerMaster Actor
13/04/07 03:11:50 INFO spark.SparkContext: Job finished: count at GroupByTest.scala:35, took 1.100294766 s

Now that you've run a simple job on your EC2 cluster, it's time to configure the cluster for your Spark jobs. The spark-ec2 script offers a number of configuration options.

First, consider what instance types you may need. EC2 offers an ever-growing collection of instance types, and you can choose a different instance type for the master and the workers. The instance type has the most obvious impact on the performance of your Spark cluster. If your work needs a lot of RAM, you should choose an instance with more RAM. You can specify the instance type with --instance-type=(name of instance type). By default, the same instance type will be used for both the master and the workers. This can be wasteful if your computations are particularly intensive and the master isn't being heavily utilized. You can specify a different master instance type with --master-instance-type=(name of instance).

EC2 also has GPU instance types that can be useful for workers, but would be completely wasted on the master. This text will cover working with Spark and GPUs later on; however, it is important to note that EC2 GPU performance may be lower than what you get while testing locally, due to the higher I/O overhead imposed by the hypervisor.

Spark's EC2 scripts use AMIs (Amazon Machine Images) provided by the Spark team. These AMIs may not always be up-to-date with the latest version of Spark, and if you have custom patches (such as using a different version of HDFS) for Spark, they will not be included in the machine image. At present, the AMIs are also only available in the U.S. East region, so if you want to run in a different region, you will need to copy the AMIs or make your own AMIs in that region.

To use Spark's EC2 scripts, you need to have an AMI available in your region. To copy the default Spark EC2 AMI to a new region, figure out what the latest Spark AMI is by looking at the script and seeing what URL the LATEST_AMI_URL points to and fetch it. For Spark 0.7, run the following command to get the latest AMI:


There is an ec2-copy-image script that you would hope provides the ability to copy the image, but sadly it doesn't work on images that you don't own. So you will need to launch an instance of the preceding AMI and snapshot it. You can describe the current image by running:

ec2-describe-images ami-a60193cf

This should show you that it is an EBS-based (Elastic Block Store) image, so you will need to follow EC2's instructions for creating EBS-based instances. Since you already have a script to launch the instance, you can just start an instance on an EC2 cluster and then snapshot it. You can find the instances you are running with:

ec2-describe-instances -H

You can copy the i-[string] instance name and save it for later use.

If you want to use a custom version of Spark or install any other tools or dependencies and have them available as part of your AMI, you should do that (or at least update the instance) before snapshotting:

ssh -i ~/spark-keypair.pem [user]@[hostname] "yum update"

Once you have your updates installed and any other customizations you want, you can go ahead and snapshot your instance with:

ec2-create-image -n "My customized Spark Instance" i-[instancename]

With the AMI name from the preceding code, you can launch your customized version of Spark by specifying the --ami command-line argument. You can also copy this image to another region for use there:

ec2-copy-image -r [source-region] -s [ami] --region [target region]

This will give you a new AMI name, which you can use for launching your EC2 tasks. If you want to use a different AMI name, simply specify --ami [aminame].


As of this writing, there was an issue with the default AMI and HDFS. You may need to update the version of Hadoop on the AMI, as it does not match the version that Spark was compiled for.


Deploying Spark on Elastic MapReduce

In addition to Amazon's basic EC2 machine offering, Amazon offers a hosted MapReduce solution called Elastic MapReduce. Amazon provides a bootstrap script that simplifies the process of getting started using Spark on EMR. You can install the EMR tools from Amazon using the following command:

mkdir emr && cd emr && wget && unzip *.zip

So that the EMR scripts can access your AWS account, you will want to create a credentials.json file:

{
    "access-id": "<Your AWS access id here>",
    "private-key": "<Your AWS secret access key here>",
    "key-pair": "<The name of your ec2 key-pair here>",
    "key-pair-file": "<path to the .pem file for your ec2 key pair here>",
    "region": "<The region where you wish to launch your job flows (e.g us-east-1)>"
}

Once you have the EMR tools installed, you can launch a Spark cluster by running:

elastic-mapreduce --create --alive --name "Spark/Shark Cluster" \
--bootstrap-action s3://elasticmapreduce/samples/spark/ \
--bootstrap-name "install Mesos/Spark/Shark" \
--ami-version 2.0 \
--instance-type m1.large --instance-count 2

This will give you a running EMR cluster after about five to ten minutes. You can list the status of the cluster by running elastic-mapreduce --list. Once it outputs j-[jobid], it is ready.


Deploying Spark with Chef (opscode)

Chef is an open source automation platform that has become increasingly popular for deploying and managing both small and large clusters of machines. Chef can be used to control a traditional static fleet of machines, but can also be used with EC2 and other cloud providers. Chef uses cookbooks as the basic building blocks of configuration; these can be either generic or site-specific. If you have not used Chef before, it is worth working through a getting-started tutorial first. You can use a generic Spark cookbook as the basis for setting up your cluster.

To get Spark working, you need to create a role for both the master and the workers, as well as configure the workers to connect to the master. Start by getting the cookbook. The bare minimum is setting the master hostname (as master, so the worker nodes can connect) and the username (so that Chef can install in the correct place). You will also need to either accept Sun's Java license or switch to an alternative JDK. Most of the settings that are available in Spark's own configuration are also exposed through the cookbook's settings. You can see an explanation of the settings on configuring multiple hosts over SSH in the Deploying set of machines over SSH section. The settings can be set per role, or you can modify the global defaults.

Create a role for the master with knife role create spark_master_role -e [editor]. This will bring up a template role file that you can edit. For a simple master, set it to:

{
  "name": "spark_master_role",
  "description": "",
  "json_class": "Chef::Role",
  "default_attributes": {
  },
  "override_attributes": {
  },
  "chef_type": "role",
  "run_list": [
    "recipe[spark::server]"
  ],
  "env_run_lists": {
  }
}

Then create a role for the workers in the same manner, except use the spark::client recipe instead of spark::server. Deploy the roles to the different hosts:

knife node run_list add master role[spark_master_role]
knife node run_list add worker role[spark_worker_role]

Then run chef-client on your nodes to update. Congrats, you now have a Spark cluster running!


Deploying Spark on Mesos

Mesos is a cluster management platform for running multiple distributed applications or frameworks on a cluster. Mesos can intelligently schedule and run Spark, Hadoop, and other frameworks concurrently on the same cluster. Spark can be run on Mesos either by scheduling individual jobs as separate Mesos tasks or by running all of Spark as a single Mesos task. Mesos can quickly scale up to handle large clusters, beyond the size you would want to manage with plain old SSH scripts. It was originally created at UC Berkeley as a research project; it is currently undergoing Apache incubation and is actively used by Twitter.

To get started with Mesos, download the latest version and unpack the ZIP files. Mesos has a number of different configuration scripts you can use; for an Ubuntu installation use configure.ubuntu-lucid-64, and for other cases the Mesos README file will point you at which configuration file to use. In addition to the requirements of Spark, you will need to ensure that you have the Python C header files installed (python-dev on Debian systems) or pass --disable-python to the configure script. Since Mesos needs to be installed on all the machines, you may find it easier to configure Mesos to install somewhere other than the root, most easily alongside your Spark installation, as follows:

./configure --prefix=/home/sparkuser/mesos && make && make check && make install

Much like the configuration of Spark in standalone mode, with Mesos you need to make sure the different Mesos nodes can find one another. Start by adding the hostname of the master to mesosprefix/var/mesos/deploy/masters, and then add each worker hostname to mesosprefix/var/mesos/deploy/slaves. Then point the workers at the master (and possibly set some other values) in mesosprefix/var/mesos/conf/mesos.conf.

Once you have Mesos built, it's time to configure Spark to work with Mesos. This is as simple as copying the template in conf/ to create your own configuration file in conf/, and updating MESOS_NATIVE_LIBRARY to point to the path where Mesos is installed. You can find more information about the different settings in the table shown in the Deploying set of machines over SSH section.

You will need to install both Mesos and Spark on all the machines in your cluster. Once both Mesos and Spark are configured, you can copy the build to all the machines using pscp, as shown in the following command:

pscp -v -r -h  -l sparkuser ./mesos /home/sparkuser/mesos

You can then start your Mesos cluster by using the start script in mesosprefix/sbin/, and schedule your Spark jobs on Mesos by using mesos://[host]:5050 as the master.


Deploying Spark on YARN

YARN is Apache Hadoop's NextGen MapReduce. The Spark project provides an easy way to schedule jobs on YARN once you have a Spark assembly built. It is important that the Spark job you create uses a standalone master URL. The example Spark applications all read the master URL from the command-line arguments, so specify --args standalone.

To run the same example as in the SSH section, do the following:

sbt/sbt assembly #Build the assembly
SPARK_JAR=./core/target/spark-core-assembly-0.7.0.jar ./run spark.deploy.yarn.Client --jar examples/target/scala-2.9.2/spark-examples_2.9.2-0.7.0.jar --class spark.examples.GroupByTest --args standalone --num-workers 2 --worker-memory 1g --worker-cores 1

Deploying set of machines over SSH

If you have a set of machines without any existing cluster management software, you can deploy Spark over SSH with some handy scripts. This method is known as "standalone mode" in the Spark documentation. An individual master and worker can be started by ./run spark.deploy.master.Master and ./run spark.deploy.worker.Worker spark://MASTERIP:PORT respectively. The default port for the master is 7077 (its web UI listens on port 8080). It's likely that you don't want to go to each of your machines and run these commands by hand; there are a number of helper scripts in bin/ to help you run your servers.

A prerequisite for using any of the scripts is having password-less SSH access set up from the master to all the worker machines. You probably want to create a new user for running Spark on the machines and lock it down. This book uses the username sparkuser. On your master machine, you can run ssh-keygen to generate the SSH key; make sure that you do not set a password. Once you have generated the key, add the public one (if you generated an RSA key, it would be stored in ~/.ssh/ by default) to ~/.ssh/authorized_keys2 on each of the hosts.


The Spark administration scripts require that your username is the same on all the machines. If this isn't the case, you can configure an alternative username in your ~/.ssh/config.

Now that you have SSH access to the machines set up, it is time to configure Spark. There is a simple template in conf/ that you should copy to create your own configuration file in conf/. You will need to set the SCALA_HOME variable to the path where you extracted Scala. You may also find it useful to set some (or all) of the following environment variables:





  • MESOS_NATIVE_LIBRARY: points to where Mesos is located.
  • SCALA_HOME: points to where you extracted Scala. Default: none, must be set.
  • The IP address for the master to listen on and for the workers to connect to. Default: the result of running hostname.
  • The port number for the Spark master to listen on.
  • The port number of the web UI on the master.
  • The number of cores to use. Default: all of them.
  • The amount of memory to use. Default: the maximum system memory minus 1 GB (or 512 MB).
  • The port number on which the worker runs.
  • The port number on which the worker web UI runs.
  • SPARK_WORKER_DIR: the location in which to store files from the worker.


Once you have your configuration all done, it's time to get your cluster up and running. You will want to copy the version of Spark and the configurations you have built to all of your machines. You may find it useful to install PSSH, a set of parallel SSH tools that includes PSCP. The PSCP application makes it easy to SCP (securely copy files) to a number of target hosts, although it will take a while. For example:

pscp -v -r -h conf/slaves -l sparkuser ../spark-0.7.0 ~/
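
If PSSH is not available, the same copy can be done host by host with plain scp. The following is a minimal sketch; the function names list_hosts and copy_to_hosts are hypothetical, and it assumes a hosts file with one hostname per line in which blank lines and lines starting with # can be ignored.

```shell
# Prints the hosts listed in a hosts file, skipping blank lines and
# lines whose first non-space character is #.
# Assumes one hostname per line; this format is an assumption here.
list_hosts() {
  grep -v -e '^[[:space:]]*$' -e '^[[:space:]]*#' "$1"
}

# Copies a directory to every listed host in turn with plain scp.
# $1 = hosts file, $2 = remote user, $3 = local directory,
# $4 = remote destination path.
copy_to_hosts() {
  for host in $(list_hosts "$1"); do
    scp -r "$3" "$2@$host:$4" || echo "copy to $host failed" >&2
  done
}
```

For example, copy_to_hosts conf/slaves sparkuser ../spark-0.7.0 '~/' mirrors the pscp command above, one host at a time rather than in parallel.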

If you end up changing the configuration, you need to distribute the configuration to all the workers, such as:

pscp -v -r -h conf/slaves -l sparkuser conf/ ~/spark-0.7.0/conf/


If you use a shared NFS on your cluster, you should configure a separate worker directory for each host; by default, Spark gives logfiles and similar files the same names on every host, so the workers would otherwise write to the same place. If you want to have your worker directories on the shared NFS, consider adding `hostname`, for example, SPARK_WORKER_DIR=~/work-`hostname`.

You should also consider having your logfiles go to a scratch directory for better performance.

If you don't have Scala installed on the remote machines yet, you can also use pssh to set it up:

pssh -P -i -h conf/slaves -l sparkuser 'wget && tar -xvf scala-2.9.3.tgz && cd scala-2.9.3 && echo "export SCALA_HOME=`pwd`" >> ~/.bashrc && echo "export PATH=`pwd`/bin:\$PATH" >> ~/.bashrc'

Now you are ready to start the cluster. It is important to note that start-all and start-master both assume they are being run on the node that is the master for the cluster. The start scripts all daemonize, so you don't have to worry about running them in a screen.

ssh master bin/

If you get a class not found error, such as java.lang.NoClassDefFoundError: scala/ScalaObject, check to make sure that you have Scala installed on that worker host and that the SCALA_HOME is set correctly.


The Spark scripts assume that your master has Spark installed in the same directory as your workers. If this is not the case, you should edit bin/ and set it to the appropriate directories.

The commands provided by Spark to help you administer your cluster are as follows:

  • bin/ <command>: runs the provided command on all the worker hosts. For example, bin/ uptime will show how long each of the worker hosts has been up.
  • start-all: starts the master and all the worker hosts. It must be run on the master.
  • start-master: starts the master host. It must be run on the master.
  • Starts the worker hosts.
  • Starts a specific worker.
  • Stops the master and the workers.
  • Stops the master.
  • Stops all the workers.

You now have a running Spark cluster, as shown in the following screenshot. There is a handy web UI on the master running on port 8080; you should visit it, and also check each worker's web UI on port 8081. The web UI contains helpful information such as the current workers and the current and past jobs.

Now that you have a cluster up and running let's actually do something with it. As with the single host example, you can use the provided run script to run Spark commands. All the examples listed in examples/src/main/scala/spark/examples/ take a parameter, master, which points them to the master machine. Assuming you are on the master host you could run them like this:

./run spark.examples.GroupByTest spark://`hostname`:7077
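
The master URL forms used in this chapter (local[N], spark://host:port, and mesos://host:port) can be assembled by a small helper. The following is a minimal sketch; the function name master_url and its argument order are hypothetical and not part of Spark, and the default ports are simply the ones used in this chapter's examples.

```shell
# Builds a master URL string from a mode and optional host and port.
# $1 = mode (local, spark, or mesos)
# local: $2 = number of threads (defaults to 1)
# spark: $2 = host, $3 = port (defaults to 7077)
# mesos: $2 = host, $3 = port (defaults to 5050)
# master_url is an illustrative helper, not part of Spark.
master_url() {
  case "$1" in
    local) echo "local[${2:-1}]" ;;
    spark) echo "spark://$2:${3:-7077}" ;;
    mesos) echo "mesos://$2:${3:-5050}" ;;
    *) echo "unknown mode: $1" >&2; return 1 ;;
  esac
}
```

For example, ./run spark.examples.GroupByTest "$(master_url spark `hostname`)" is equivalent to the command above.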


If you run into an issue with java.lang.UnsupportedClassVersionError, you may need to update your JDK or recompile Spark if you grabbed the binary version. Version 0.7 was compiled with JDK 1.7 as the target. You can check the version of the JRE targeted by Spark with:

javap -verbose -classpath ./core/target/scala-2.9.2/classes/ spark.SparkFiles | head -n 20

Version 49 is JDK 1.5, Version 50 is JDK 1.6, and Version 51 is JDK 1.7.
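
The class file version can also be read directly from a compiled class, since bytes 6 and 7 of every class file hold the major version as a big-endian integer. The following is a minimal sketch using od; the function names class_major_version and jdk_for_class_version are hypothetical.

```shell
# Prints the major version stored in bytes 6-7 of a Java class file.
# $1 = path to a .class file.
class_major_version() {
  # od prints the two bytes as unsigned decimals; combine them big-endian.
  set -- $(od -An -t u1 -j 6 -N 2 "$1")
  echo $(( $1 * 256 + $2 ))
}

# Maps a class file major version to the JDK release that targets it.
# Only the versions discussed in this chapter are covered.
jdk_for_class_version() {
  case "$1" in
    49) echo "JDK 1.5" ;;
    50) echo "JDK 1.6" ;;
    51) echo "JDK 1.7" ;;
    *) echo "unknown"; return 1 ;;
  esac
}
```

For example, jdk_for_class_version "$(class_major_version [classfile])" reports which JDK a given class file was compiled for, where [classfile] is a placeholder for the path to a compiled class.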

If you can't connect to localhost, make sure that you've configured your master to listen on all the IP addresses (or, if you don't want to do that, replace localhost with the IP address it is configured to listen on).

If everything has worked correctly, you will see a lot of log messages on stdout, along the lines of:

13/03/28 06:35:31 INFO spark.SparkContext: Job finished: count at GroupByTest.scala:35, took 2.482816756 s


In this chapter, we have installed Spark on our machine for local development and also set it up on our cluster, so we are ready to run the applications that we write. In the next chapter, we will learn to use the Spark shell.

About the Author

  • Holden Karau

    Holden Karau is a software development engineer and is active in open source. She has worked on a variety of search, classification, and distributed systems problems at IBM, Alpine, Databricks, Google, Foursquare, and Amazon. She graduated from the University of Waterloo with a bachelor's of mathematics degree in computer science. Other than software, she enjoys playing with fire and hula hoops, and welding.
