Machine Learning with Spark - Second Edition

By Rajdeep Dua, Manpreet Singh Ghotra
About this book
This book will teach you about popular machine learning algorithms and their implementation. You will learn how various machine learning concepts are implemented in the context of Spark ML. You will start by installing Spark in single-node and multinode clusters. Next, you'll see how to execute Scala- and Python-based programs for Spark ML. Then, we will take a few datasets and go deeper into clustering, classification, and regression. Toward the end, we will also cover text processing using Spark ML. Once you have learned the concepts, you can apply them to green-field implementations or use them to migrate existing systems to this new platform; for example, you can migrate from Mahout or scikit-learn to Spark ML. By the end of this book, you will have acquired the skills to leverage Spark's features to create your own scalable machine learning applications and power a modern data-driven business.
Publication date: April 2017
Publisher: Packt
Pages: 532
ISBN: 9781785889936

 

Getting Up and Running with Spark

Apache Spark is a framework for distributed computing; this framework aims to make it simpler to write programs that run in parallel across many nodes in a cluster of computers or virtual machines. It tries to abstract the tasks of resource scheduling, job submission, execution, tracking, and communication between nodes as well as the low-level operations that are inherent in parallel data processing. It also provides a higher level API to work with distributed data. In this way, it is similar to other distributed processing frameworks such as Apache Hadoop; however, the underlying architecture is somewhat different.

Spark began as a research project at the AMPLab at the University of California, Berkeley (https://amplab.cs.berkeley.edu/projects/spark-lightning-fast-cluster-computing/). The project focused on the use case of distributed machine learning algorithms; hence, Spark is designed from the ground up for high performance in applications of an iterative nature, where the same data is accessed multiple times. This performance is achieved primarily through caching datasets in memory, combined with low latency and low overhead when launching parallel computation tasks. Together with other features such as fault tolerance, flexible distributed-memory data structures, and a powerful functional API, Spark has proved to be broadly useful for a wide range of large-scale data processing tasks, over and above machine learning and iterative analytics.

Performance-wise, Spark is much faster than Hadoop for related workloads. Refer to the following graph:

Source: https://amplab.cs.berkeley.edu/wp-content/uploads/2011/11/spark-lr.png

Spark runs in four modes:

  • The standalone local mode, where all Spark processes are run within the same Java Virtual Machine (JVM) process
  • The standalone cluster mode, using Spark's own built-in, job-scheduling framework
  • Using Mesos, a popular open source cluster-computing framework
  • Using YARN (commonly referred to as NextGen MapReduce), Hadoop's cluster resource manager; the master URL settings corresponding to each of these modes are sketched after this list
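A minimal sketch of how each mode is selected through the master URL passed to SparkConf (the host names and ports here are placeholders, not values from this book):

import org.apache.spark.SparkConf

// Placeholder master URLs for each mode; substitute your own hosts and ports.
val localConf      = new SparkConf().setMaster("local[*]")            // standalone local mode
val standaloneConf = new SparkConf().setMaster("spark://master:7077") // Spark standalone cluster
val mesosConf      = new SparkConf().setMaster("mesos://master:5050") // Mesos cluster
val yarnConf       = new SparkConf().setMaster("yarn")                // YARN (Hadoop)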

In this chapter, we will do the following:

  • Download the Spark binaries and set up a development environment that runs in Spark's standalone local mode. This environment will be used throughout the book to run the example code.
  • Explore Spark's programming model and API using Spark's interactive console.
  • Write our first Spark program in Scala, Java, R, and Python.
  • Set up a Spark cluster using Amazon's Elastic Compute Cloud (EC2) platform, which can be used for large-sized data and heavier computational requirements, rather than running in the local mode.
  • Set up a Spark cluster using Amazon Elastic MapReduce (EMR).

If you have previous experience in setting up Spark and are familiar with the basics of writing a Spark program, feel free to skip this chapter.

 

Installing and setting up Spark locally

Spark can be run using the built-in standalone cluster scheduler in the local mode. This means that all the Spark processes are run within the same JVM, effectively a single, multithreaded instance of Spark. The local mode is very useful for prototyping, development, debugging, and testing. However, this mode can also be useful in real-world scenarios to perform parallel computation across multiple cores on a single computer.

As Spark's local mode is fully compatible with the cluster mode, programs written and tested locally can be run on a cluster with just a few additional steps.

The first step in setting up Spark locally is to download the latest version from http://spark.apache.org/downloads.html, which contains links to download various versions of Spark as well as to obtain the latest source code via GitHub.

The documentation available at http://spark.apache.org/docs/latest/ is a comprehensive resource to learn more about Spark. We highly recommend that you explore it!

Spark needs to be built against a specific version of Hadoop in order to access the Hadoop Distributed File System (HDFS) as well as standard and custom Hadoop input sources; prebuilt packages are available for Cloudera's Hadoop distribution, MapR's Hadoop distribution, and Hadoop 2 (YARN). Unless you wish to build Spark against a specific Hadoop version, we recommend that you download the prebuilt Hadoop 2.7 package from an Apache mirror at http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz.

Spark requires the Scala programming language (version 2.10.x or 2.11.x at the time of writing this book) in order to run. Fortunately, the prebuilt binary package comes with the Scala runtime packages included, so you don't need to install Scala separately in order to get started. However, you will need to have a Java Runtime Environment (JRE) or Java Development Kit (JDK).

Refer to the software and hardware list in this book's code bundle for installation instructions. R 3.1+ is needed.

Once you have downloaded the Spark binary package, unpack the contents of the package and change into the newly created directory by running the following commands:

  $ tar xfvz spark-2.0.0-bin-hadoop2.7.tgz
$ cd spark-2.0.0-bin-hadoop2.7

Spark places user scripts to run Spark in the bin directory. You can test whether everything is working correctly by running one of the example programs included in Spark. Run the following command:

  $ bin/run-example SparkPi 100

This will run the example in Spark's local standalone mode. In this mode, all the Spark processes are run within the same JVM, and Spark uses multiple threads for parallel processing. By default, the preceding example uses a number of threads equal to the number of cores available on your system. Once the program is executed, you should see something similar to the following lines toward the end of the output:

...
16/11/24 14:41:58 INFO Executor: Finished task 99.0 in stage 0.0
(TID 99). 872 bytes result sent to driver

16/11/24 14:41:58 INFO TaskSetManager: Finished task 99.0 in stage
0.0 (TID 99) in 59 ms on localhost (100/100)

16/11/24 14:41:58 INFO DAGScheduler: ResultStage 0 (reduce at
SparkPi.scala:38) finished in 1.988 s

16/11/24 14:41:58 INFO TaskSchedulerImpl: Removed TaskSet 0.0,
whose tasks have all completed, from pool

16/11/24 14:41:58 INFO DAGScheduler: Job 0 finished: reduce at
SparkPi.scala:38, took 2.235920 s

Pi is roughly 3.1409527140952713

The preceding command calls the org.apache.spark.examples.SparkPi class.

The master parameter can be given in the local[N] form, where N is the number of threads to use. Giving local[*] will use all of the cores on the local machine, which is a common usage.

To use only two threads, run the following command instead:

  $ ./bin/spark-submit  --class org.apache.spark.examples.SparkPi 
--master local[2] ./examples/jars/spark-examples_2.11-2.0.0.jar 100
 

Spark clusters

A Spark cluster is made up of two types of processes: a driver program and multiple executors. In the local mode, all these processes are run within the same JVM. In a cluster, these processes are usually run on separate nodes.

For example, a typical cluster that runs in Spark's standalone mode (that is, using Spark's built-in cluster management modules) will have the following:

  • A master node that runs the Spark standalone master process as well as the driver program
  • A number of worker nodes, each running an executor process

While we will be using Spark's local standalone mode throughout this book to illustrate concepts and examples, the same Spark code that we write can be run on a Spark cluster. In the preceding example, if we run the code on a Spark standalone cluster, we could simply pass in the URL for the master node, as follows:

  $ ./bin/spark-submit --class org.apache.spark.examples.SparkPi 
--master spark://IP:PORT ./examples/jars/spark-examples_2.11-2.0.0.jar 100

Here, IP is the IP address and PORT is the port of the Spark master. This tells Spark to run the program on the cluster where the Spark master process is running.

A full treatment of Spark's cluster management and deployment is beyond the scope of this book. However, we will briefly teach you how to set up and use an Amazon EC2 cluster later in this chapter.

For an overview of Spark cluster deployment and application submission, take a look at the following links:

  • http://spark.apache.org/docs/latest/cluster-overview.html
  • http://spark.apache.org/docs/latest/submitting-applications.html

 

The Spark programming model

Before we delve into a high-level overview of Spark's design, we will introduce the SparkContext object as well as the Spark shell, which we will use to interactively explore the basics of the Spark programming model.

While this section provides a brief overview and examples of using Spark, we recommend that you read the following documentation to get a detailed understanding:

  • Spark Quick Start: http://spark.apache.org/docs/latest/quick-start.html
  • Spark Programming Guide: http://spark.apache.org/docs/latest/programming-guide.html

SparkContext and SparkConf

The starting point of writing any Spark program is SparkContext (or JavaSparkContext in Java). SparkContext is initialized with an instance of a SparkConf object, which contains various Spark cluster-configuration settings (for example, the URL of the master node).

It is the main entry point for Spark functionality: a SparkContext represents a connection to a Spark cluster and can be used to create RDDs, accumulators, and broadcast variables on that cluster.

Only one SparkContext can be active per JVM. You must call stop() on the active SparkContext before creating a new one.
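For example, a minimal sketch in the Scala shell (assuming the sc value provided by the shell; the application name used here is arbitrary):

import org.apache.spark.{SparkConf, SparkContext}

// stop the currently active context first
sc.stop()

// only now is it safe to construct a replacement context
val replacementSc = new SparkContext(
  new SparkConf().setAppName("Replacement App").setMaster("local[2]"))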

Once initialized, we will use the various methods found in the SparkContext object to create and manipulate distributed datasets and shared variables. The Spark shell (available in both Scala and Python, but unfortunately not in Java) takes care of this context initialization for us, but the following lines of code show an example of creating a context running in the local mode in Scala:

val conf = new SparkConf()
  .setAppName("Test Spark App")
  .setMaster("local[4]")
val sc = new SparkContext(conf)

This creates a context running in the local mode with four threads, with the name of the application set to Test Spark App. If we wish to use the default configuration values, we could also call the following simple constructor for our SparkContext object, which works in the exact same way:

val sc = new SparkContext("local[4]", "Test Spark App")
Downloading the example code
You can download the example code files for all Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book from any other source, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.

SparkSession

SparkSession allows programming with the DataFrame and Dataset APIs. It is a single point of entry for these APIs.

First, we need to create an instance of the SparkConf class and use it to create the SparkSession instance. Consider the following example:

val spConfig = (new SparkConf).setMaster("local").setAppName("SparkApp")
val spark = SparkSession
.builder()
.appName("SparkUserData").config(spConfig)
.getOrCreate()

Next, we can use the spark object to create a DataFrame:

val user_df = spark.read.format("com.databricks.spark.csv")
.option("delimiter", "|").schema(customSchema)
.load("/home/ubuntu/work/ml-resources/spark-ml/data/ml-100k/u.user")
val first = user_df.first()

The Spark shell

Spark supports writing programs interactively using the Scala, Python, or R REPL (that is, the Read-Eval-Print-Loop, or interactive shell). The shell provides instant feedback as we enter code, as this code is immediately evaluated. In the Scala shell, the return result and type is also displayed after a piece of code is run.

To use the Spark shell with Scala, simply run ./bin/spark-shell from the Spark base directory. This will launch the Scala shell and initialize SparkContext, which is available to us as the Scala value sc. With Spark 2.0, a SparkSession instance is also available in the console as the spark variable.

Your console output should look similar to the following:

$ ~/work/spark-2.0.0-bin-hadoop2.7/bin/spark-shell 
Using Spark's default log4j profile: org/apache/spark/log4j-
defaults.properties

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/08/06 22:14:25 WARN NativeCodeLoader: Unable to load native-
hadoop library for your platform... using builtin-java classes
where applicable

16/08/06 22:14:25 WARN Utils: Your hostname, ubuntu resolves to a
loopback address: 127.0.1.1; using 192.168.22.180 instead (on
interface eth1)

16/08/06 22:14:25 WARN Utils: Set SPARK_LOCAL_IP if you need to
bind to another address

16/08/06 22:14:26 WARN Utils: Service 'SparkUI' could not bind on
port 4040. Attempting port 4041.

16/08/06 22:14:27 WARN SparkContext: Use an existing SparkContext,
some configuration may not take effect.

Spark context Web UI available at http://192.168.22.180:4041
Spark context available as 'sc' (master = local[*], app id = local-
1470546866779).

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM,
Java 1.7.0_60)

Type in expressions to have them evaluated.
Type :help for more information.

scala>

To use the Python shell with Spark, simply run the ./bin/pyspark command. Like the Scala shell, the Python SparkContext object should be available as the Python variable, sc. Your output should be similar to this:

~/work/spark-2.0.0-bin-hadoop2.7/bin/pyspark 
Python 2.7.6 (default, Jun 22 2015, 17:58:13)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more
information.

Using Spark's default log4j profile: org/apache/spark/log4j-
defaults.properties

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/08/06 22:16:15 WARN NativeCodeLoader: Unable to load native-
hadoop library for your platform... using builtin-java classes
where applicable

16/08/06 22:16:15 WARN Utils: Your hostname, ubuntu resolves to a
loopback address: 127.0.1.1; using 192.168.22.180 instead (on
interface eth1)

16/08/06 22:16:15 WARN Utils: Set SPARK_LOCAL_IP if you need to
bind to another address

16/08/06 22:16:16 WARN Utils: Service 'SparkUI' could not bind on
port 4040. Attempting port 4041.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
SparkSession available as 'spark'.
>>>

R is a language and runtime environment for statistical computing and graphics. It is a GNU project and a different implementation of S (a language developed at Bell Labs).

R provides statistical (linear and nonlinear modeling, classical statistical tests, time-series analysis, classification, and clustering) and graphical techniques. It is considered to be highly extensible.

To use Spark from R, run the following command to open the SparkR shell:

$ ~/work/spark-2.0.0-bin-hadoop2.7/bin/sparkR
R version 3.0.2 (2013-09-25) -- "Frisbee Sailing"
Copyright (C) 2013 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

Launching java with spark-submit command /home/ubuntu/work/spark-
2.0.0-bin-hadoop2.7/bin/spark-submit "sparkr-shell"
/tmp/RtmppzWD8S/backend_porta6366144af4f

Using Spark's default log4j profile: org/apache/spark/log4j-
defaults.properties

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/08/06 22:26:22 WARN NativeCodeLoader: Unable to load native-
hadoop library for your platform... using builtin-java classes
where applicable

16/08/06 22:26:22 WARN Utils: Your hostname, ubuntu resolves to a
loopback address: 127.0.1.1; using 192.168.22.186 instead (on
interface eth1)

16/08/06 22:26:22 WARN Utils: Set SPARK_LOCAL_IP if you need to
bind to another address

16/08/06 22:26:22 WARN Utils: Service 'SparkUI' could not bind on
port 4040. Attempting port 4041.


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/
SparkSession available as 'spark'.
During startup - Warning message:
package 'SparkR' was built under R version 3.1.1
>

Resilient Distributed Datasets

The core of Spark is a concept called the Resilient Distributed Dataset (RDD). An RDD is a collection of records (strictly speaking, objects of some type) that are distributed or partitioned across many nodes in a cluster (for the purposes of the Spark local mode, the single multithreaded process can be thought of in the same way). An RDD in Spark is fault-tolerant; this means that if a given node or task fails (for some reason other than erroneous user code, such as hardware failure, loss of communication, and so on), the RDD can be reconstructed automatically on the remaining nodes and the job will still be completed.

Creating RDDs

RDDs can be created in a number of ways; the simplest is to parallelize an existing Scala collection in one of the Scala Spark shells that you launched earlier:

val collection = List("a", "b", "c", "d", "e") 
val rddFromCollection = sc.parallelize(collection)
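The number of partitions an RDD is split across can be inspected, and set explicitly when parallelizing a collection; a minimal sketch (the partition count of 4 is just an illustrative choice):

// explicitly split the collection across 4 partitions
val partitionedRDD = sc.parallelize(collection, 4)

// report how many partitions the RDD is divided into
println(partitionedRDD.getNumPartitions)  // 4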

RDDs can also be created from Hadoop-based input sources, including the local filesystem, HDFS, and Amazon S3. A Hadoop-based RDD can utilize any input format that implements the Hadoop InputFormat interface, including text files, other standard Hadoop formats, HBase, Cassandra, Tachyon, and many more.

The following code is an example of creating an RDD from a text file located on the local filesystem:

val rddFromTextFile = sc.textFile("LICENSE")

The preceding textFile method returns an RDD where each record is a String object that represents one line of the text file. The output of the preceding command is as follows:

rddFromTextFile: org.apache.spark.rdd.RDD[String] = LICENSE   
MapPartitionsRDD[1] at textFile at <console>:24

The following code is an example of how to create an RDD from a text file located on HDFS using the hdfs:// protocol:

val rddFromTextFileHDFS = sc.textFile("hdfs://input/LICENSE")

The following code is an example of how to create an RDD from a text file located on Amazon S3 using the s3n:// protocol:

val rddFromTextFileS3 = sc.textFile("s3n://input/LICENSE")

Spark operations

Once we have created an RDD, we have a distributed collection of records that we can manipulate. In Spark's programming model, operations are split into transformations and actions. Generally speaking, a transformation operation applies some function to all the records in the dataset, changing the records in some way. An action typically runs some computation or aggregation operation and returns the result to the driver program where SparkContext is running.

Spark operations are functional in style. For programmers familiar with functional programming in Scala, Python, or Lambda expressions in Java 8, these operations should seem natural. For those without experience in functional programming, don't worry; the Spark API is relatively easy to learn.

One of the most common transformations that you will use in Spark programs is the map operator. This applies a function to each record of an RDD, thus mapping the input to some new output. For example, the following code fragment takes the RDD we created from a local text file and applies the size function to each record in the RDD. Remember that we created an RDD of Strings. Using map, we can transform each string to an integer, thus returning an RDD of Ints:

val intsFromStringsRDD = rddFromTextFile.map(line => line.size)

You should see output similar to the following line in your shell; this indicates the type of the RDD:

intsFromStringsRDD: org.apache.spark.rdd.RDD[Int] = 
MapPartitionsRDD[2] at map at <console>:26

In the preceding code, we saw the use of the => syntax. This is the Scala syntax for an anonymous function, which is a function that is not a named method (that is, one defined using the def keyword in Scala or Python, for example).

While a detailed treatment of anonymous functions is beyond the scope of this book, they are used extensively in Spark code in Scala and Python, as well as in Java 8 (both in examples and real-world applications), so it is useful to cover a few practicalities.
The line => line.size syntax means that we are applying a function where => is the operator, and the output is the result of the code to the right of the => operator. In this case, the input is line, and the output is the result of calling line.size. In Scala, this function that maps a string to an integer is expressed as String => Int.
This syntax saves us from having to separately define functions every time we use methods such as map; this is useful when the function is simple and will only be used once, as in this example.
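To make the equivalence concrete, here is a minimal sketch showing the same mapping written with a named method and with an anonymous function (both reuse the rddFromTextFile created earlier):

// a named method defined with the def keyword
def lineLength(line: String): Int = line.size

val lengthsViaDef = rddFromTextFile.map(lineLength)

// the same logic written as an anonymous function of type String => Int
val lengthsViaAnon = rddFromTextFile.map(line => line.size)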

Now, we can apply a common action operation, count, to return the number of records in our RDD:

intsFromStringsRDD.count

The result should look something like the following console output:

res0: Long = 299

Perhaps we want to find the average length of each line in this text file. We can first use the sum function to add up all the lengths of all the records and then divide the sum by the number of records:

val sumOfRecords = intsFromStringsRDD.sum 
val numRecords = intsFromStringsRDD.count
val aveLengthOfRecord = sumOfRecords / numRecords

The result will be as follows:

scala> intsFromStringsRDD.count
res0: Long = 299

scala> val sumOfRecords = intsFromStringsRDD.sum
sumOfRecords: Double = 17512.0

scala> val numRecords = intsFromStringsRDD.count
numRecords: Long = 299

scala> val aveLengthOfRecord = sumOfRecords / numRecords
aveLengthOfRecord: Double = 58.5685618729097

Spark operations, in most cases, return a new RDD, with the exception of most actions, which return the result of a computation (such as Long for count and Double for sum in the preceding example). This means that we can naturally chain together operations to make our program flow more concise and expressive. For example, the same result as the one in the preceding line of code can be achieved using the following code:

val aveLengthOfRecordChained = rddFromTextFile.map(line => line.size).sum / rddFromTextFile.count

An important point to note is that Spark transformations are lazy. That is, invoking a transformation on an RDD does not immediately trigger a computation. Instead, transformations are chained together and are effectively only computed when an action is called. This allows Spark to be more efficient by only returning results to the driver when necessary so that the majority of operations are performed in parallel on the cluster.

This means that if your Spark program never uses an action operation, it will never trigger an actual computation, and you will not get any results. For example, the following code will simply return a new RDD that represents the chain of transformations:

val transformedRDD = rddFromTextFile.map(line => line.size).filter(size => size > 10).map(size => size * 2)

This returns the following result in the console:

transformedRDD: org.apache.spark.rdd.RDD[Int] = 
MapPartitionsRDD[6] at map at <console>:26

Notice that no actual computation happens and no result is returned. If we now call an action, such as sum, on the resulting RDD, the computation will be triggered:

val computation = transformedRDD.sum

You will now see that a Spark job is run, and it results in the following console output:

computation: Double = 35006.0
The complete list of transformations and actions possible on RDDs, as well as a set of more detailed examples, is available in the Spark programming guide at http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations, and the Scala API documentation is located at http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD.

Caching RDDs

One of the most powerful features of Spark is the ability to cache data in memory across a cluster. This is achieved through the use of the cache method on an RDD:

rddFromTextFile.cache
res0: rddFromTextFile.type = MapPartitionsRDD[1] at textFile at
<console>:27

Calling cache on an RDD tells Spark that the RDD should be kept in memory. The first time an action is called on the RDD that initiates a computation, the data is read from its source and put into memory. Hence, the first time such an operation is called, the time it takes to run the task is partly dependent on the time it takes to read the data from the input source. However, when the data is accessed the next time (for example, in subsequent queries in analytics or iterations in a machine learning model), the data can be read directly from memory, thus avoiding expensive I/O operations and speeding up the computation, in many cases, by a significant factor.

If we now call the count or sum function on our cached RDD, the RDD is loaded into memory:

val aveLengthOfRecordChained = rddFromTextFile.map(line => 
line.size).sum / rddFromTextFile.count
Spark also allows more fine-grained control over caching behavior. You can use the persist method to specify what approach Spark uses to cache data. More information on RDD caching can be found here:
http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
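As a minimal sketch of persist with an explicit storage level (using a freshly created RDD, since an RDD's storage level cannot be changed once it has been assigned):

import org.apache.spark.storage.StorageLevel

// create a fresh RDD; the already-cached rddFromTextFile cannot be re-leveled
val licenseRDD = sc.textFile("LICENSE")

// keep the RDD in memory, spilling partitions to disk if they do not fit
licenseRDD.persist(StorageLevel.MEMORY_AND_DISK)

// remove it from the cache once it is no longer needed
licenseRDD.unpersist()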

Broadcast variables and accumulators

Another core feature of Spark is the ability to create two special types of variables--broadcast variables and accumulators.

A broadcast variable is a read-only variable that is created from the driver program object and made available to the nodes that will execute the computation. This is very useful in applications that need to make the same data available to the worker nodes in an efficient manner. Spark makes creating broadcast variables as simple as calling a method on SparkContext, as follows:

val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))

A broadcast variable can be accessed from nodes other than the driver program that created it (that is, the worker nodes) by calling value on the variable:

sc.parallelize(List("1", "2", "3")).map(x => broadcastAList.value ++  
x).collect

This code creates a new RDD with three records from a collection (in this case, a Scala List) of ("1", "2", "3"). In the map function, it returns a new collection with the relevant record from our new RDD appended to the broadcastAList that is our broadcast variable:

...
res1: Array[List[Any]] = Array(List(a, b, c, d, e, 1), List(a, b,
c, d, e, 2), List(a, b, c, d, e, 3))

Notice the collect method in the preceding code. This is a Spark action that returns the entire RDD to the driver as a Scala (or Python or Java) collection.

We will often use collect when we wish to apply further processing to our results locally within the driver program.

Note that collect should generally only be used in cases where we really want to return the full result set to the driver and perform further processing. If we try to call collect on a very large dataset, we might run out of memory on the driver and crash our program.
It is preferable to perform as much heavy-duty processing on our Spark cluster as possible, preventing the driver from becoming a bottleneck. In many cases, however, such as during iterations in many machine learning models, collecting results to the driver is necessary.
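When only a sample of the results is needed on the driver, the take action is a lighter-weight alternative to collect; a minimal sketch, reusing the rddFromTextFile created earlier:

// take returns only the first n records to the driver,
// unlike collect, which brings back the entire RDD
val firstFiveLines = rddFromTextFile.take(5)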

On inspecting the preceding output, we can see that for each of the three records in our new RDD, we now have a record that is our original broadcasted List, with the new element appended to it (that is, there is now "1", "2", or "3" at the end).

An accumulator is also a variable that is broadcasted to the worker nodes. The key difference between a broadcast variable and an accumulator is that while the broadcast variable is read-only, the accumulator can be added to. There are limitations to this; in particular, the addition must be an associative operation so that the global accumulated value can be correctly computed in parallel and returned to the driver program. Each worker node can only access and add to its own local accumulator value, and only the driver program can access the global value. Accumulators are also accessed within the Spark code using the value method.
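A minimal sketch of an accumulator in the Scala shell (assuming the sc value provided by the shell; longAccumulator is the Spark 2.0 API):

// create a named long accumulator on the driver
val blankLines = sc.longAccumulator("blank lines")

// worker tasks add to their local copy of the accumulator inside an action
sc.textFile("LICENSE").foreach { line =>
  if (line.trim.isEmpty) blankLines.add(1L)
}

// only the driver can read the merged global value
println(blankLines.value)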

For more details on broadcast variables and accumulators, refer to the Shared Variables section of the Spark Programming Guide at http://spark.apache.org/docs/latest/programming-guide.html#shared-variables.
 

SchemaRDD

SchemaRDD is a combination of an RDD and schema information. It also offers many rich and easy-to-use APIs (that is, the Dataset API). SchemaRDD is not exposed directly as of Spark 2.0; it is used internally by the DataFrame and Dataset APIs.

A schema is used to describe how structured data is logically organized. After obtaining the schema information, the SQL engine is able to provide the structured query capability for the corresponding data. The Dataset API is an alternative to writing Spark SQL queries by hand: calls to the API build up the same logical plan tree that the SQL parser would produce, and subsequent processing steps reuse Spark SQL's core logic. We can therefore safely consider the Dataset API's processing functions as equivalent to SQL queries.

SchemaRDD is an RDD subclass. When a program calls the DataSet API, a new SchemaRDD object is created, and a logic plan attribute of the new object is created by adding a new logic operation node on the original logic plan tree. Operations of the DataSet API (like RDD) are of two types--Transformation and Action.

APIs related to the relational operations are attributed to the Transformation type.

Operations associated with data output sources are of Action type. Like RDD, a Spark job is triggered and delivered for cluster execution, only when an Action type operation is called.

 

Spark data frame

In Apache Spark, a Dataset is a distributed collection of data. The Dataset is a new interface, added in Spark 1.6, that provides the benefits of RDDs combined with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, and so on). The Dataset API is available only in Scala and Java; it is not available for Python or R.

A DataFrame is a Dataset organized into named columns. It is equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations. A DataFrame can be constructed from structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R.

A Spark DataFrame needs the Spark session instantiated first:

import org.apache.spark.sql.SparkSession 
val spark = SparkSession.builder().appName("Spark SQL").config("spark.some.config.option", "").getOrCreate()
import spark.implicits._

Next, we create a DataFrame from a JSON file using the spark.read.json function:

scala> val df = spark.read.json("/home/ubuntu/work/ml-resources
/spark-ml/Chapter_01/data/example_one.json")

Note that Spark implicits (imported earlier via import spark.implicits._) are being used to implicitly convert an RDD to a DataFrame. As the API documentation describes it, the implicits$ object enclosed in org.apache.spark.sql.SparkSession (an instance of org.apache.spark.sql.SQLImplicits) provides the implicit methods available in Scala for converting common Scala objects into DataFrames.

Output will be similar to the following listing:

df: org.apache.spark.sql.DataFrame = [address: struct<city: 
string, state: string>, name: string]

Now we want to see how this is actually loaded in the DataFrame:

scala> df.show
+-----------------+-------+
| address| name|
+-----------------+-------+
| [Columbus,Ohio]| Yin|
|[null,California]|Michael|
+-----------------+-------+
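The Dataset API described at the start of this section can be exercised in the same session. The following is a minimal sketch, assuming the spark session and import spark.implicits._ from earlier, with a hypothetical Person case class and made-up ages used purely for illustration:

// a hypothetical case class used only for this illustration
case class Person(name: String, age: Int)

// build a typed Dataset from JVM objects and apply functional transformations
val people = Seq(Person("Yin", 30), Person("Michael", 25)).toDS()
val adultNames = people.filter(p => p.age >= 18).map(p => p.name)
adultNames.show()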
 

The first step to a Spark program in Scala

We will now use the ideas we introduced in the previous section to write a basic Spark program to manipulate a dataset. We will start with Scala and then write the same program in Java and Python. Our program will be based on exploring some data from an online store, describing which users have purchased which products. The data is contained in a comma-separated values (CSV) file called UserPurchaseHistory.csv. This file is expected to be in the data directory.

The contents are shown in the following snippet. The first column of the CSV is the username, the second column is the product name, and the final column is the price:

John,iPhone Cover,9.99
John,Headphones,5.49
Jack,iPhone Cover,9.99
Jill,Samsung Galaxy Cover,8.95
Bob,iPad Cover,5.49

For our Scala program, we need to create two files: our Scala code and our project build configuration file, using the Scala Build Tool (SBT). For ease of use, we recommend that you use the scala-spark-app directory from the sample code for this chapter. This code also contains the CSV file under the data directory. You will need SBT installed on your system in order to run this example program (we used version 0.13.8 at the time of writing this book).

Setting up SBT is beyond the scope of this book; however, you can find more information at http://www.scala-sbt.org/release/docs/Getting-Started/Setup.html.

Our SBT configuration file, build.sbt, looks like this (note that the empty lines between each line of code are required):

name := "scala-spark-app" 
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0"

The last line adds the dependency on Spark to our project.

Our Scala program is contained in the ScalaApp.scala file. We will walk through the program piece by piece. First, we need to import the required Spark classes:

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._

/**
* A simple Spark app in Scala
*/
object ScalaApp {

In our main method, we need to initialize our SparkContext object and use this to access our CSV data file with the textFile method. We will then map the raw text by splitting the string on the delimiter character (a comma in this case) and extracting the relevant records for username, product, and price:

def main(args: Array[String]) {
  val sc = new SparkContext("local[2]", "First Spark App")
  // we take the raw data in CSV format and convert it into a
  // set of records of the form (user, product, price)
  val data = sc.textFile("data/UserPurchaseHistory.csv")
    .map(line => line.split(","))
    .map(purchaseRecord => (purchaseRecord(0),
      purchaseRecord(1), purchaseRecord(2)))

Now that we have an RDD, where each record is made up of (user, product, price), we can compute various interesting metrics for our store, such as the following ones:

  • The total number of purchases
  • The number of unique users who purchased
  • Our total revenue
  • Our most popular product

Let's compute the preceding metrics:

// let's count the number of purchases 
val numPurchases = data.count()
// let's count how many unique users made purchases
val uniqueUsers = data.map{ case (user, product, price) => user
}.distinct().count()
// let's sum up our total revenue
val totalRevenue = data.map{ case (user, product, price) =>
price.toDouble }.sum()
// let's find our most popular product
val productsByPopularity = data
.map{ case (user, product, price) => (product, 1) }
.reduceByKey(_ + _)
.collect()
.sortBy(-_._2)
val mostPopular = productsByPopularity(0)

This last piece of code to compute the most popular product is an example of the Map/Reduce pattern made popular by Hadoop. First, we mapped our records of (user, product, price) to the records of (product, 1). Then, we performed a reduceByKey operation, where we summed up the 1s for each unique product.

Once we have this transformed RDD, which contains the number of purchases for each product, we will call collect, which returns the results of the computation to the driver program as a local Scala collection. We will then sort these counts locally (note that in practice, if the amount of data is large, we will perform the sorting in parallel, usually with a Spark operation such as sortByKey).
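As a sketch of how this sorting could instead be performed in parallel on the cluster (reusing the data RDD from this program), the counts can be swapped into (count, product) pairs and sorted with sortByKey:

// perform the sort on the cluster rather than collecting and sorting locally
val mostPopularDistributed = data
  .map { case (user, product, price) => (product, 1) }
  .reduceByKey(_ + _)
  .map { case (product, count) => (count, product) } // make the count the key
  .sortByKey(ascending = false)                      // sort by count, descending
  .first()                                           // (count, product) of the top seller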

Finally, we will print out the results of our computations to the console:

    println("Total purchases: " + numPurchases) 
println("Unique users: " + uniqueUsers)
println("Total revenue: " + totalRevenue)
println("Most popular product: %s with %d
purchases".format(mostPopular._1, mostPopular._2))
}
}

We can run this program by running sbt run in the project's base directory or by running the program in your Scala IDE if you are using one. The output should look similar to the following:

...
[info] Compiling 1 Scala source to ...
[info] Running ScalaApp
...
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases

We can see that we have 5 purchases from four different users with total revenue of 39.91. Our most popular product is an iPhone cover with 2 purchases.

 

The first step to a Spark program in Java

The Java API is very similar in principle to the Scala API. However, while Scala can call the Java code quite easily, in some cases, it is not possible to call the Scala code from Java. This is particularly the case when Scala code makes use of Scala features such as implicit conversions, default parameters, and the Scala reflection API.

Spark makes heavy use of these features in general, so it is necessary to have a separate API specifically for Java that includes Java versions of the common classes. Hence, SparkContext becomes JavaSparkContext and RDD becomes JavaRDD.

Java versions prior to version 8 do not support anonymous functions and do not have succinct syntax for functional-style programming, so functions in the Spark Java API must implement one of the function interfaces in the org.apache.spark.api.java.function package (such as Function or PairFunction), each of which exposes a call method. While it is significantly more verbose, we can create one-off anonymous classes to pass to our Spark operations, which implement these interfaces and the call method to achieve much the same effect as anonymous functions in Scala.

Spark provides support for Java 8's anonymous function (or lambda) syntax. Using this syntax makes a Spark program written in Java 8 look very close to the equivalent Scala program.

In Scala, an RDD of key/value pairs provides special operators (such as reduceByKey and saveAsSequenceFile, for example) that are accessed automatically via implicit conversions. In Java, special types of JavaRDD classes are required in order to access similar functions. These include JavaPairRDD to work with key/value pairs and JavaDoubleRDD to work with numerical records.

In this section, we covered the standard Java API syntax. For more details and examples related to working with RDDs in Java, as well as the Java 8 lambda syntax, refer to the Java sections of the Spark Programming Guide found at http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations.

We will see examples of most of these differences in the following Java program, which is included in the example code of this chapter in the directory named java-spark-app. The code directory also contains the CSV data file under the data subdirectory.

We will build and run this project with the Maven build tool, which we assume you have installed on your system.

Installing and setting up Maven is beyond the scope of this book. Usually, Maven can easily be installed using the package manager on your Linux system, or using Homebrew or MacPorts on Mac OS X.
Detailed installation instructions can be found at http://maven.apache.org/download.cgi.

The project contains a Java file called JavaApp.java, which contains our program code:

import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.*;
import java.util.stream.Collectors;


/**
* A simple Spark app in Java
*/
public class JavaApp {
public static void main(String[] args) {

As in our Scala example, we first need to initialize our context. Note that we will use the JavaSparkContext class here instead of the SparkContext class that we used earlier. We will use the JavaSparkContext class in the same way to access our data using textFile and then split each row into the required fields. Note how we use a lambda expression to define a split function that performs the string processing in the following code:

JavaSparkContext sc = new JavaSparkContext("local[2]", 
"First Spark App");
// we take the raw data in CSV format and convert it into a
// set of records of the form (user, product, price)
JavaRDD<String[]> data = sc.textFile("data/UserPurchaseHistory.csv").map(s -> s.split(","));

Now, we can compute the same metrics as we did in our Scala example. Note how some methods are the same (for example, distinct and count) for the Java and Scala APIs. Also note the use of the lambda expressions that we pass to the map function. This code is shown here:

// let's count the number of purchases 
long numPurchases = data.count();
// let's count how many unique users made purchases
long uniqueUsers = data.map(strings ->
strings[0]).distinct().count();
// let's sum up our total revenue
Double totalRevenue = data.map(strings ->
Double.parseDouble(strings[2])).reduce((Double v1,
Double v2) -> new Double(v1.doubleValue() + v2.doubleValue()));

In the following lines of code, we can see that the approach to compute the most popular product is the same as that in the Scala example. The extra code might seem complex, but it is mostly related to the Java code required to sort the (product, count) pairs locally. The actual functionality is the same:

// let's find our most popular product 
List<Tuple2<String, Integer>> pairs = data.mapToPair(strings -> new Tuple2<String, Integer>(strings[1], 1)).reduceByKey((Integer i1, Integer i2) -> i1 + i2).collect();

Map<String, Integer> sortedData = new HashMap<>();
Iterator it = pairs.iterator();
while (it.hasNext()) {
Tuple2<String, Integer> o = (Tuple2<String, Integer>) it.next();
sortedData.put(o._1, o._2);
}
List<String> sorted = sortedData.entrySet()
.stream()
.sorted(Comparator.comparing((Map.Entry<String, Integer>
entry) -> entry.getValue()).reversed())
.map(Map.Entry::getKey)
.collect(Collectors.toList());
String mostPopular = sorted.get(0);
int purchases = sortedData.get(mostPopular);
System.out.println("Total purchases: " + numPurchases);
System.out.println("Unique users: " + uniqueUsers);
System.out.println("Total revenue: " + totalRevenue);
System.out.println(String.format("Most popular product:
%s with %d purchases", mostPopular, purchases));
}
}

As can be seen, the general structure is similar to the Scala version, apart from the extra boilerplate code required to declare variables and to sort the results locally. It is a good exercise to work through both examples and compare lines of Scala code to those in Java to understand how the same result is achieved in each language.

This program can be run with the following command executed from the project's base directory:

  $ mvn exec:java -Dexec.mainClass="JavaApp"

You will see output that looks very similar to the Scala version with identical results of the computation:

...
14/01/30 17:02:43 INFO spark.SparkContext: Job finished: collect
at JavaApp.java:46, took 0.039167 s

Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases
 

The first step to a Spark program in Python

Spark's Python API exposes virtually all the functionalities of Spark's Scala API in the Python language. There are some features that are not yet supported (for example, graph processing with GraphX and a few API methods here and there). Refer to the Python section of Spark Programming Guide (http://spark.apache.org/docs/latest/programming-guide.html) for more details.

PySpark is built on Spark's Java API. Data is processed in native Python, while it is cached and shuffled in the JVM. The Python driver program's SparkContext uses Py4J to launch a JVM and create a JavaSparkContext. The driver uses Py4J for local communication between the Python and Java SparkContext objects. RDD transformations in Python map to transformations on PythonRDD objects in Java. The PythonRDD object launches Python subprocesses on remote worker machines and communicates with them using pipes; these subprocesses are used to send the user's code and to process data.

Following on from the preceding examples, we will now write a Python version. We assume that you have Python version 2.6 or higher installed on your system (for example, most Linux and Mac OS X systems come with Python preinstalled).

The example program is included in the sample code for this chapter, in the directory named python-spark-app, which also contains the CSV data file under the data subdirectory. The project contains a script, pythonapp.py, provided here.

A simple Spark app in Python:

from pyspark import SparkContext

sc = SparkContext("local[2]", "First Spark App")
# we take the raw data in CSV format and convert it into a set of
# records of the form (user, product, price)
data = sc.textFile("data/UserPurchaseHistory.csv").map(lambda
line: line.split(",")).map(lambda record: (record[0], record[1],
record[2]))
# let's count the number of purchases
numPurchases = data.count()
# let's count how many unique users made purchases
uniqueUsers = data.map(lambda record: record[0]).distinct().count()
# let's sum up our total revenue
totalRevenue = data.map(lambda record: float(record[2])).sum()
# let's find our most popular product
products = data.map(lambda record: (record[1],
1.0)).reduceByKey(lambda a, b: a + b).collect()
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]

print "Total purchases: %d" % numPurchases
print "Unique users: %d" % uniqueUsers
print "Total revenue: %2.2f" % totalRevenue
print "Most popular product: %s with %d purchases" %
(mostPopular[0], mostPopular[1])

If you compare the Scala and Python versions of our program, you will see that generally, the syntax looks very similar. One key difference is how we express anonymous functions (also called lambda functions; hence, the use of the lambda keyword in the Python syntax). In Scala, we've seen that an anonymous function mapping an input x to an output y is expressed as x => y, while in Python, it is lambda x: y. In the reduceByKey call in the preceding code, we are applying an anonymous function that maps two inputs, a and b, generally of the same type, to an output. In this case, the function that we apply is the plus function; hence, lambda a, b: a + b.

The best way to run the script is to run the following command from the base directory of the sample project:

 $SPARK_HOME/bin/spark-submit pythonapp.py

Here, the SPARK_HOME variable should be replaced with the path of the directory in which you originally unpacked the Spark prebuilt binary package at the start of this chapter.

Upon running the script, you should see output similar to that of the Scala and Java examples, with the results of our computation being the same:

...
14/01/30 11:43:47 INFO SparkContext: Job finished: collect at
pythonapp.py:14, took 0.050251 s

Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases
 

The first step to a Spark program in R

SparkR is an R package that provides a frontend to use Apache Spark from R. Since Spark 1.6.0, SparkR has provided a distributed data frame implementation for large datasets. SparkR also supports distributed machine learning using MLlib. This is something you should try out while reading the machine learning chapters.

SparkR DataFrames

A DataFrame is a distributed collection of data organized into named columns. This concept is very similar to a table in a relational database or a data frame in R, but with much better optimizations. These data frames can be created from sources such as CSV or TSV files, Hive tables, local R data frames, and so on.

The SparkR shell can be launched from the Spark distribution using ./bin/sparkR.

Following on from the preceding examples, we will now write an R version. We assume that you have R (we used R version 3.0.2 (2013-09-25), "Frisbee Sailing") and, optionally, RStudio installed on your system.

The example program is included in the sample code for this chapter, in the directory named r-spark-app, which also contains the CSV data file under the data subdirectory. The project contains a script, r-script-01.R, which is provided in the following listing. Make sure you change PATH to the appropriate value for your environment.

Sys.setenv(SPARK_HOME = "/PATH/spark-2.0.0-bin-hadoop2.7") 
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"),
.libPaths()))
#load the Sparkr library
library(SparkR)
sc <- sparkR.init(master = "local", sparkPackages="com.databricks:spark-csv_2.10:1.3.0")
sqlContext <- sparkRSQL.init(sc)

user.purchase.history <- "/PATH/ml-resources/spark-ml/Chapter_01/r-spark-app/data/UserPurchaseHistory.csv"
data <- read.df(sqlContext, user.purchase.history, "com.databricks.spark.csv", header="false")
head(data)
count(data)

parseFields <- function(record) {
  Sys.setlocale("LC_ALL", "C") # necessary for strsplit() to work correctly
  parts <- strsplit(as.character(record), ",")
  list(name=parts[1], product=parts[2], price=parts[3])
}

parsedRDD <- SparkR:::lapply(data, parseFields)
cache(parsedRDD)
numPurchases <- count(parsedRDD)

sprintf("Number of Purchases : %d", numPurchases)
getName <- function(record){
  record[1]
}

getPrice <- function(record){
  record[3]
}

nameRDD <- SparkR:::lapply(parsedRDD, getName)
nameRDD = collect(nameRDD)
head(nameRDD)

uniqueUsers <- unique(nameRDD)
head(uniqueUsers)

priceRDD <- SparkR:::lapply(parsedRDD, function(x) { as.numeric(x$price[1])})
take(priceRDD,3)

totalRevenue <- SparkR:::reduce(priceRDD, "+")

sprintf("Total Revenue : %.2f", totalRevenue)

products <- SparkR:::lapply(parsedRDD, function(x) { list( toString(x$product[1]), 1) })
take(products, 5)
productCount <- SparkR:::reduceByKey(products, "+", 2L)
productsCountAsKey <- SparkR:::lapply(productCount, function(x) { list( as.integer(x[2][1]), x[1][1])})

productCount <- count(productsCountAsKey)
mostPopular <- toString(collect(productsCountAsKey)[[productCount]][[2]])
sprintf("Most Popular Product : %s", mostPopular)

Run the script with the following command on the bash terminal:

  $ Rscript r-script-01.R 

Your output will be similar to the following listing:

> sprintf("Number of Purchases : %d", numPurchases)
[1] "Number of Purchases : 5"

> uniqueUsers <- unique(nameRDD)
> head(uniqueUsers)
[[1]]
[[1]]$name
[[1]]$name[[1]]
[1] "John"
[[2]]
[[2]]$name
[[2]]$name[[1]]
[1] "Jack"
[[3]]
[[3]]$name
[[3]]$name[[1]]
[1] "Jill"
[[4]]
[[4]]$name
[[4]]$name[[1]]
[1] "Bob"

> sprintf("Total Revenue : %.2f", totalRevenue)
[1] "Total Revenue : 39.91"

> sprintf("Most Popular Product : %s", mostPopular)
[1] "Most Popular Product : iPad Cover"
 

Getting Spark running on Amazon EC2

The Spark project provides scripts to run a Spark cluster in the cloud on Amazon's EC2 service. These scripts are located in the ec2 directory. You can run the spark-ec2 script contained in this directory with the following command:

  $ ./ec2/spark-ec2

Running it in this way without an argument will show the help output:

Usage: spark-ec2 [options] <action> <cluster_name>
<action> can be: launch, destroy, login, stop, start, get-master

Options:
...

Before creating a Spark EC2 cluster, you will need to ensure that you have an
Amazon account.

If you don't have an Amazon Web Services account, you can sign up at http://aws.amazon.com/.
The AWS console is available at http://aws.amazon.com/console/.

You will also need to create an Amazon EC2 key pair and retrieve the relevant security credentials. The Spark documentation for EC2 (available at http://spark.apache.org/docs/latest/ec2-scripts.html) explains the requirements:

Create an Amazon EC2 key pair for yourself. This can be done by logging into your Amazon Web Services account through the AWS console, clicking on Key Pairs on the left sidebar, and creating and downloading a key. Make sure that you set the permissions for the private key file to 600 (that is, only you can read and write it) so that ssh will work.
Whenever you want to use the spark-ec2 script, set the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to your Amazon EC2 access key ID and secret access key, respectively. These can be obtained from the AWS homepage by clicking Account | Security Credentials | Access Credentials.

When creating a key pair, choose a name that is easy to remember. We will simply use the name spark for the key pair. The key pair file itself will be called spark.pem. As mentioned earlier, ensure that the key pair file permissions are set appropriately and that the environment variables for the AWS credentials are exported using the following commands:

  $ chmod 600 spark.pem
$ export AWS_ACCESS_KEY_ID="..."
$ export AWS_SECRET_ACCESS_KEY="..."

You should also be careful to keep your downloaded key pair file safe and not lose it, as it can only be downloaded once when it is created!

Note that launching an Amazon EC2 cluster in the following section will incur costs to your AWS account.

Launching an EC2 Spark cluster

We're now ready to launch a small Spark cluster by changing into the ec2 directory and then running the cluster launch command:

 $  cd ec2
$ ./spark-ec2 --key-pair=rd_spark-user1 --identity-file=spark.pem
--region=us-east-1 --zone=us-east-1a launch my-spark-cluster

This will launch a new Spark cluster called my-spark-cluster with one master and one slave node of the default instance type. This cluster will be launched with a Spark version built for Hadoop 2. The key pair used here is named rd_spark-user1, with the identity file spark.pem (if you created your key pair and file with different names, such as the spark/spark.pem suggested earlier, use those instead).

It might take quite a while for the cluster to fully launch and initialize. You should see something like the following immediately after running the launch command:

Setting up security groups...
Creating security group my-spark-cluster-master
Creating security group my-spark-cluster-slaves
Searching for existing cluster my-spark-cluster in region
us-east-1...

Spark AMI: ami-5bb18832
Launching instances...
Launched 1 slave in us-east-1a, regid = r-5a893af2
Launched master in us-east-1a, regid = r-39883b91
Waiting for AWS to propagate instance metadata...
Waiting for cluster to enter 'ssh-ready' state...........
Warning: SSH connection error. (This could be temporary.)
Host: ec2-52-90-110-128.compute-1.amazonaws.com
SSH return code: 255
SSH output: ssh: connect to host ec2-52-90-110-128.compute-
1.amazonaws.com port 22: Connection refused

Warning: SSH connection error. (This could be temporary.)
Host: ec2-52-90-110-128.compute-1.amazonaws.com
SSH return code: 255
SSH output: ssh: connect to host ec2-52-90-110-128.compute-
1.amazonaws.com port 22: Connection refused

Warning: SSH connection error. (This could be temporary.)
Host: ec2-52-90-110-128.compute-1.amazonaws.com
SSH return code: 255
SSH output: ssh: connect to host ec2-52-90-110-128.compute-
1.amazonaws.com port 22: Connection refused

Cluster is now in 'ssh-ready' state. Waited 510 seconds.

If the cluster has launched successfully, you should eventually see a console output similar to the following listing:

./tachyon/setup.sh: line 5: /root/tachyon/bin/tachyon: 
No such file or directory

./tachyon/setup.sh: line 9: /root/tachyon/bin/tachyon-start.sh:
No such file or directory

[timing] tachyon setup: 00h 00m 01s
Setting up rstudio
spark-ec2/setup.sh: line 110: ./rstudio/setup.sh:
No such file or directory

[timing] rstudio setup: 00h 00m 00s
Setting up ganglia
RSYNC'ing /etc/ganglia to slaves...
ec2-52-91-214-206.compute-1.amazonaws.com
Shutting down GANGLIA gmond: [FAILED]
Starting GANGLIA gmond: [ OK ]
Shutting down GANGLIA gmond: [FAILED]
Starting GANGLIA gmond: [ OK ]
Connection to ec2-52-91-214-206.compute-1.amazonaws.com closed.
Shutting down GANGLIA gmetad: [FAILED]
Starting GANGLIA gmetad: [ OK ]
Stopping httpd: [FAILED]
Starting httpd: httpd: Syntax error on line 154 of /etc/httpd
/conf/httpd.conf: Cannot load /etc/httpd/modules/mod_authz_core.so
into server: /etc/httpd/modules/mod_authz_core.so: cannot open
shared object file: No such file or directory
[FAILED]
[timing] ganglia setup: 00h 00m 03s
Connection to ec2-52-90-110-128.compute-1.amazonaws.com closed.
Spark standalone cluster started at
http://ec2-52-90-110-128.compute-1.amazonaws.com:8080

Ganglia started at http://ec2-52-90-110-128.compute-
1.amazonaws.com:5080/ganglia

Done!
ubuntu@ubuntu:~/work/spark-1.6.0-bin-hadoop2.6/ec2$

This will create two VMs, a Spark master and a Spark slave of type m1.large, as shown in the following screenshot:

To test whether we can connect to our new cluster, we can run the following command:

  $ ssh -i spark.pem root@ec2-52-90-110-128.compute-1.amazonaws.com

Remember to replace the public domain name of the master node (the address after root@ in the preceding command) with the correct Amazon EC2 public domain name that will be shown in your console output after launching the cluster.

You can also retrieve your cluster's master public domain name by running the following command:

  $ ./spark-ec2 -i spark.pem get-master my-spark-cluster

After successfully running the ssh command, you will be connected to your Spark master node in EC2, and your terminal output should match the following screenshot:

We can test whether our cluster is correctly set up with Spark by changing into the Spark directory and running an example in the local mode:

  $ cd spark
$ MASTER=local[2] ./bin/run-example SparkPi

You should see output similar to what you would get on running the same command on your local computer:

...
14/01/30 20:20:21 INFO SparkContext: Job finished: reduce at
SparkPi.scala:35, took 0.864044012 s

Pi is roughly 3.14032
...

Now that we have an actual cluster with multiple nodes, we can test Spark in the cluster mode. We can run the same example on the cluster, using our one slave node by passing in the master URL instead of the local version:

    $ MASTER=spark://ec2-52-90-110-128.compute-1.amazonaws.com:7077 ./bin/run-example SparkPi

Note that you will need to substitute the preceding master domain name with the correct domain name for your specific cluster.

Again, the output should be similar to running the example locally; however, the log messages will show that your driver program has connected to the Spark master:

...
14/01/30 20:26:17 INFO client.Client$ClientActor: Connecting to
master spark://ec2-54-220-189-136.eu-
west-1.compute.amazonaws.com:7077

14/01/30 20:26:17 INFO cluster.SparkDeploySchedulerBackend:
Connected to Spark cluster with app ID app-20140130202617-0001

14/01/30 20:26:17 INFO client.Client$ClientActor: Executor added:
app-20140130202617-0001/0 on worker-20140130201049-
ip-10-34-137-45.eu-west-1.compute.internal-57119
(ip-10-34-137-45.eu-west-1.compute.internal:57119) with 1 cores

14/01/30 20:26:17 INFO cluster.SparkDeploySchedulerBackend:
Granted executor ID app-20140130202617-0001/0 on hostPort
ip-10-34-137-45.eu-west-1.compute.internal:57119 with 1 cores,
2.4 GB RAM

14/01/30 20:26:17 INFO client.Client$ClientActor:
Executor updated: app-20140130202617-0001/0 is now RUNNING

14/01/30 20:26:18 INFO spark.SparkContext: Starting job: reduce at
SparkPi.scala:39

...

Feel free to experiment with your cluster. Try out the interactive console in Scala, for example:

  $ ./bin/spark-shell --master spark://ec2-52-90-110-128.compute-1.amazonaws.com:7077

Once you've finished, type exit to leave the console. You can also try the PySpark console by running the following command:

  $ ./bin/pyspark --master spark://ec2-52-90-110-128.compute-1.amazonaws.com:7077

You can use the Spark Master web interface to see the applications registered with the master. To load the Master Web UI, navigate to ec2-52-90-110-128.compute-1.amazonaws.com:8080 (again, remember to replace this domain name with your own master domain name).

Remember that you will be charged by Amazon for usage of the cluster. Don't forget to stop or terminate this test cluster once you're done with it. To do this, you can first exit the ssh session by typing exit to return to your own local system and then run the following command:

  $ ./ec2/spark-ec2 -k spark -i spark.pem destroy my-spark-cluster

You should see the following output:

Are you sure you want to destroy the cluster my-spark-cluster?
The following instances will be terminated:
Searching for existing cluster my-spark-cluster...
Found 1 master(s), 1 slaves
> ec2-54-227-127-14.compute-1.amazonaws.com
> ec2-54-91-61-225.compute-1.amazonaws.com
ALL DATA ON ALL NODES WILL BE LOST!!
Destroy cluster my-spark-cluster (y/N): y
Terminating master...
Terminating slaves...

Hit Y and then Enter to destroy the cluster.

Congratulations! You've just set up a Spark cluster in the cloud, run a fully parallel example program on this cluster, and terminated it. If you would like to try out any of the example code in the subsequent chapters (or your own Spark programs) on a cluster, feel free to experiment with the Spark EC2 scripts and launch a cluster of your chosen size and instance profile. (Just be mindful of the costs and remember to shut it down when you're done!)

 

Configuring and running Spark on Amazon Elastic Map Reduce

Amazon Elastic MapReduce (EMR) lets you launch a Hadoop cluster with Spark installed. Perform the following steps to create an EMR cluster with Spark installed:

  1. Launch an Amazon EMR cluster.
  2. Open the Amazon EMR UI console at https://console.aws.amazon.com/elasticmapreduce/.
  3. Choose Create cluster:
  4. Choose an appropriate Amazon AMI version, 3.9.0 or later, as shown in the following screenshot:
  5. For the applications to be installed field, choose Spark 1.5.2 or later from the list shown on the user interface and click on Add.
  6. Select other hardware options as necessary:
    • The Instance Type
    • The keypair to be used with SSH
    • Permissions
    • IAM roles (Default or Custom)

Refer to the following screenshot:

  7. Click on Create cluster. The cluster will start instantiating, as shown in the following screenshot:
  8. Log in to the master node. Once the EMR cluster is ready, you can SSH into the master:
   $ ssh -i rd_spark-user1.pem hadoop@ec2-52-3-242-138.compute-1.amazonaws.com

The output will be similar to the following listing:
     Last login: Wed Jan 13 10:46:26 2016

       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2015.09-release-notes/
23 package(s) needed for security, out of 49 available
Run "sudo yum update" to apply all updates.
[hadoop@ip-172-31-2-31 ~]$
  9. Start the Spark Shell:
      [hadoop@ip-172-31-2-31 ~]$ spark-shell
16/01/13 10:49:36 INFO SecurityManager: Changing view acls to:
hadoop

16/01/13 10:49:36 INFO SecurityManager: Changing modify acls to:
hadoop

16/01/13 10:49:36 INFO SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view
permissions: Set(hadoop); users with modify permissions:
Set(hadoop)

16/01/13 10:49:36 INFO HttpServer: Starting HTTP Server
16/01/13 10:49:36 INFO Utils: Successfully started service 'HTTP
class server' on port 60523.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.2
      /_/
scala> sc
  10. Run a basic Spark sample from the EMR cluster:
    scala> val textFile = sc.textFile("s3://elasticmapreduce/samples/hive-ads/tables/impressions/dt=2009-04-13-08-05/ec2-0-51-75-39.amazon.com-2009-04-13-08-05.log")

    scala> val linesWithCartoonNetwork = textFile.filter(line => line.contains("cartoonnetwork.com")).count()

Your output will be as follows:
     linesWithCartoonNetwork: Long = 9
 

UI in Spark

Spark provides a web interface which can be used to monitor jobs, see the environment, and run SQL commands.

SparkContext launches a web UI on port 4040 that displays useful information about the application. This includes the following:

  • A list of scheduler stages and tasks
  • A summary of RDD sizes and memory usage
  • Environmental information
  • Information about the running executors

This interface can be accessed by going to http://<driver-node>:4040 in a web browser. If multiple SparkContexts are running on the same host, they will bind to successive ports beginning with 4040 (4041, 4042, and so on).
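
As a small illustration, the UI port can be pinned explicitly through the spark.ui.port configuration property when the context is created, and in Spark 2.0 and later the bound UI address can be read back from the SparkContext. The following is a minimal sketch; the application name and the port value are arbitrary choices:

import org.apache.spark.{SparkConf, SparkContext}

// pin the web UI to port 4050 instead of the default 4040
val conf = new SparkConf()
  .setAppName("UIDemo")
  .setMaster("local[2]")
  .set("spark.ui.port", "4050")
val sc = new SparkContext(conf)

// available from Spark 2.0 onwards; returns None if the UI is disabled
println(sc.uiWebUrl.getOrElse("UI disabled"))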

The following screenshots display some of the information provided by the Web UI:

UI showing the environment of the SparkContext
UI table showing the executors available
 

Supported machine learning algorithms by Spark

The following algorithms are supported by Spark ML:

  • Collaborative filtering
    • Alternating Least Squares (ALS): Collaborative filtering is often used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix. The spark.mllib package currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. The spark.mllib package uses the ALS algorithm to learn these latent factors (a minimal ALS code sketch is given after this list).
  • Clustering: This is an unsupervised learning problem where the aim is to group subsets of entities with one another based on the notion of similarity. Clustering is used for exploratory analysis and as a component of a hierarchical supervised learning pipeline. When used in a learning pipeline, distinct classifiers or regression models are trained for each cluster. The following clustering techniques are implemented in Spark:
    • k-means: This is one of the most commonly used clustering algorithms; it clusters the data points into a predefined number of clusters, and it is up to the user to choose the number of clusters. The spark.mllib implementation includes a parallelized variant of the k-means++ method (http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf). A minimal k-means sketch is given after this list.
    • Gaussian mixture: A Gaussian Mixture Model (GMM) represents a composite distribution where points are taken from one of the k Gaussian sub-distributions. Each of these distributions has its own probability. The spark.mllib implementation uses the expectation-maximization algorithm to induce the maximum-likelihood model given a set of samples.
    • Power Iteration Clustering (PIC): This is a scalable algorithm for clustering vertices of a graph given pairwise similarities as edge properties. It computes a pseudo-eigenvector of the normalized affinity matrix of the graph using power iteration.

Power iteration is an eigenvalue algorithm. Given a matrix X, the algorithm produces a number λ (the eigenvalue) and a non-zero vector v (the eigenvector) such that Xv = λv. Concretely, starting from a random vector b0, repeatedly computing b_{k+1} = Xb_k / ||Xb_k|| converges, under mild conditions, to the eigenvector associated with the dominant eigenvalue.

Pseudo-eigenvectors of a matrix can be thought of as the eigenvectors of a nearby matrix. More specifically, they are defined as follows: let A be an n by n matrix and let E be any matrix such that ||E|| = ε; then the eigenvectors of A + E are defined to be pseudo-eigenvectors of A. PIC uses such a pseudo-eigenvector to cluster the vertices of the graph.

The spark.mllib package includes an implementation of PIC using GraphX. It takes an RDD of (srcId, dstId, similarity) tuples and outputs a model with the clustering assignments. The similarities must be non-negative, and PIC assumes that the similarity measure is symmetric.

(In statistics, a similarity measure or similarity function is a real-valued function that quantifies the similarity between two objects. Such measures are, in a sense, the inverse of distance metrics; an example is cosine similarity.)

A pair (srcId, dstId), regardless of ordering, should appear at most once in the input data.

    • Latent Dirichlet Allocation (LDA): This is a form of topic model that infers topics from a collection of text documents. LDA can be thought of as a clustering algorithm, as the following points explain:

Topics correspond to cluster centers, and documents correspond to examples in a dataset.
Topics and documents both exist in a feature space, where feature vectors are vectors of word counts (also known as a bag of words).
Instead of estimating a clustering using a traditional distance approach, LDA uses a function based on a model of how text documents are generated.

    • Bisecting k-means: This is a type of hierarchical clustering. Hierarchical Cluster Analysis (HCA) is a method of cluster analysis that builds a hierarchy of clusters top-down: all observations start in one cluster, and splits are performed recursively as one moves down the hierarchy.

Hierarchical clustering is one of the commonly used methods of cluster analysis that seek to build a hierarchy of clusters.

    • Streaming k-means: When data arrives in a stream, we want to estimate clusters dynamically and update them as new data arrives. The spark.mllib supports streaming k-means clustering, with parameters to control the decay of the estimates. The algorithm uses a generalization of the mini-batch k-means update rule.
  • Classification
    • Decision Trees: Decision trees and their ensembles are popular methods for classification and regression. Decision trees are popular as they are easy to interpret, handle categorical features, and extend to the multiclass classification setting. They do not require feature scaling and are also able to capture non-linearities and feature interactions. Tree ensemble algorithms, such as random forests and boosting, are among the top performers for classification and regression scenarios.

The spark.mllib implements decision trees for binary and multiclass classification and regression. It supports both continuous and categorical features. The implementation partitions data by rows, which allows distributed training with millions of instances.

    • Naive Bayes: Naive Bayes classifiers are a family of simple probabilistic classifiers based on applying Bayes' theorem (https://en.wikipedia.org/wiki/Bayes%27_theorem) with strong (naive) independence assumptions between the features.

Naive Bayes is a multiclass classification algorithm with the assumption of independence between every pair of features. In a single pass of training data, the algorithm computes the conditional probability distribution of each feature given the label, and then it applies Bayes' theorem to compute the conditional probability distribution of a label given an observation, which is then used for prediction. The spark.mllib supports multinomial naive Bayes and Bernoulli Naive Bayes. These models are generally used for document classification.

    • Probability Classifier: In machine learning, a probabilistic classifier is a classifier that can predict, given an input, a probability distribution over a set of classes, rather than outputting the most likely class that the sample should belong to. Probabilistic classifiers provide classification with some certainty, which can be useful on its own or when combining classifiers into ensembles.
    • Logistic Regression: This is a method used to predict a binary response. Logistic regression measures the relationship between the categorical dependent variable and the independent variables by estimating probabilities using a logistic function, which is the cumulative logistic distribution (a minimal training sketch is given after this list).

It is a special case of Generalized Linear Models (GLM) that predicts the probability of the outcome. For more background and details about the implementation, refer to the documentation on logistic regression in spark.mllib.

GLM is considered a generalization of linear regression that allows for response variables that have an error distribution other than a normal distribution.

    • Random Forest: This algorithm uses ensembles of decision trees to decide the decision boundaries. Random forests combine many decision trees, which reduces the risk of overfitting the result.

Spark ML supports random forests for binary and multiclass classification as well as regression. They can be used with continuous or categorical features.

  • Dimensionality reduction: This is the process of reducing the number of variables on which machine learning will be done. It can be used to extract latent features from raw features or to compress data while maintaining the overall structure. MLlib provides support for dimensionality reduction on top of the RowMatrix class.
    • Singular value decomposition (SVD): The singular value decomposition of an m x n (real or complex) matrix M is a factorization of the form M = UΣV*, where U is an m x r matrix, Σ is an r x r diagonal matrix with non-negative real numbers on the diagonal, and V is an n x r unitary matrix; r is equal to the rank of the matrix M.
    • Principal component analysis (PCA): This is a statistical method used to find a rotation such that the first coordinate has the largest possible variance, and each succeeding coordinate, in turn, has the largest variance possible. The columns of the rotation matrix are called principal components. PCA is used widely in dimensionality reduction.

MLlib supports PCA for tall-and-skinny matrices stored in row-oriented format using RowMatrix.
Spark also supports feature extraction and transformation using TF-IDF, ChiSqSelector, Normalizer, and Word2Vec.

  • Frequent pattern mining:
    • FP-growth: FP stands for frequent pattern. The algorithm first counts item occurrences (attribute and value pairs) in the dataset and stores them in a header table (see the FP-growth sketch after this list).

In the second pass, the algorithm builds the FP-tree structure by inserting instances (made of items). Items in each instance are sorted by descending order of their frequency in the dataset; this ensures that the tree can be processed quickly. Items in each instance that do not meet minimum coverage threshold are discarded. For a use case where many instances share most frequent items, the FP-tree provides high compression close to the tree root.

    • Association rules: Association rule learning is a mechanism for discovering interesting relations between variables in large databases.

The spark.mllib implementation provides a parallel rule-generation algorithm for constructing rules that have a single item as the consequent.

  • PrefixSpan: This is a sequential pattern mining algorithm.
  • Evaluation metrics: The spark.mllib comes with a suite of metrics for evaluating the algorithms.
  • PMML model export: The Predictive Model Markup Language (PMML) is an XML-based predictive model interchange format. PMML provides a mechanism for analytic applications to describe and exchange predictive models produced by machine learning algorithms.

The spark.mllib package allows the export of several of its machine learning models to their equivalent PMML representations (the k-means sketch after this list shows a model being exported with toPMML).

  • Optimization (Developer)
    • Stochastic Gradient Descent (SGD): This is a gradient descent method used to minimize an objective function that is written as a sum of differentiable functions.

Gradient descent methods and stochastic subgradient descent (SGD) are included as low-level primitives in MLlib, on top of which various ML algorithms are developed.

    • Limited-Memory BFGS (L-BFGS): This is an optimization algorithm belonging to the family of quasi-Newton methods that approximate the Broyden-Fletcher-Goldfarb-Shanno (BFGS) algorithm using a limited amount of computer memory. It is used for parameter estimation in machine learning.

The BFGS method approximates Newton's method, which is a class of hill-climbing optimization techniques that seek a stationary point of a function. For such problems, a necessary condition for optimality is that the gradient be zero.
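
To make the collaborative filtering entry concrete, the following is a minimal spark.mllib ALS sketch in Scala. The input path data/ratings.csv and the rank, iteration, and regularization values are illustrative assumptions, and sc is an existing SparkContext:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// hypothetical input file with "userId,productId,rating" lines
val ratings = sc.textFile("data/ratings.csv").map { line =>
  val Array(user, product, rating) = line.split(',')
  Rating(user.toInt, product.toInt, rating.toDouble)
}

// learn 10 latent factors with 10 iterations and regularization 0.01
val model = ALS.train(ratings, 10, 10, 0.01)

// predicted rating of product 2 for user 1
println(model.predict(1, 2))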
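
Similarly, here is a minimal k-means sketch; the input path, the number of clusters, and the iteration count are assumptions. It also demonstrates the PMML export described under the PMML model export entry:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// hypothetical input file with space-separated numeric features per line
val points = sc.textFile("data/points.txt")
  .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
  .cache()

// cluster the points into 3 clusters with at most 20 iterations
val clusters = KMeans.train(points, 3, 20)
clusters.clusterCenters.foreach(println)

// KMeansModel can be exported to PMML
clusters.toPMML("/tmp/kmeans.pmml")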
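
For the classification entries, this is a minimal logistic regression sketch trained with the L-BFGS optimizer; the LIBSVM input path is an assumption:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.util.MLUtils

// hypothetical LIBSVM-formatted training data
val training = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")

// train a binary logistic regression model
val lrModel = new LogisticRegressionWithLBFGS()
  .setNumClasses(2)
  .run(training)

// score the first training example
val first = training.first()
println(s"label=${first.label} prediction=${lrModel.predict(first.features)}")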
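
Finally, a minimal FP-growth sketch for the frequent pattern mining entry; the input path, minimum support, and partition count are assumptions:

import org.apache.spark.mllib.fpm.FPGrowth

// hypothetical transactions file: each line is a space-separated list of items
val transactions = sc.textFile("data/transactions.txt").map(_.trim.split(' '))

val fpModel = new FPGrowth()
  .setMinSupport(0.2)     // keep itemsets present in at least 20% of transactions
  .setNumPartitions(4)
  .run(transactions)

fpModel.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + " -> " + itemset.freq)
}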

 

Benefits of using Spark ML as compared to existing libraries

The AMPLab at Berkeley evaluated Spark, and RDDs were evaluated through a series of experiments on Amazon EC2 as well as benchmarks of user applications.

  • Algorithms used: logistic regression and k-means
  • Use cases: first iteration, multiple iterations

All the tests used m1.xlarge EC2 nodes with 4 cores and 15 GB of RAM. HDFS was used for storage, with 256 MB blocks. Refer to the following graph:

The preceding graph shows the comparison between the performance of Hadoop and Spark for the first and subsequent iterations of logistic regression:

The preceding graph shows the comparison between the performance of Hadoop and Spark for the first and subsequent iterations of the k-means clustering algorithm.

The overall results show the following:

  • Spark outperforms Hadoop by up to 20 times in iterative machine learning and graph applications. The speedup comes from avoiding I/O and deserialization costs by storing data in memory as Java objects.
  • Applications written on Spark perform and scale well. Spark can speed up an analytics report that was running on Hadoop by 40 times.
  • When nodes fail, Spark can recover quickly by rebuilding only the lost RDD partitions.
  • Spark was used to query a 1 TB dataset interactively with latencies of 5-7 seconds.

Spark versus Hadoop for a sort benchmark: in 2014, the Databricks team participated in the Sort Benchmark test (http://sortbenchmark.org/). This was done on a 100 TB dataset. Hadoop was run in a dedicated data center, while a Spark cluster of over 200 nodes was run on EC2, with HDFS used for distributed storage.

Spark was 3 times faster than Hadoop and used 10 times fewer machines. Refer to the following graph:

 

Spark Cluster on Google Compute Engine - DataProc

Cloud Dataproc is a Spark and Hadoop service running on Google Compute Engine. It is a managed service. Cloud Dataproc automation helps create clusters quickly, manage them easily, and save money by turning clusters off when you don't need them.

In this section, we will learn how to create a Spark cluster using Dataproc and run a sample app on it.

Make sure that you have created a Google Compute Engine account and installed Google Cloud SDK (https://cloud.google.com/sdk/gcloud/).

Hadoop and Spark Versions

DataProc supports the following Hadoop and Spark versions. Note that this will change with time as new versions come out:

For more information, go to http://cloud.google.com/dataproc-versions.

In the following steps, we will use Google Cloud Console (the user interface used to create a Spark Cluster and submit a job).

Creating a Cluster

You can create a Spark cluster by going to the Cloud Platform Console. Select the project, and then click Continue to open the Clusters page. You will see the Cloud Dataproc clusters that belong to your project, if you have created any.

Click on the Create a cluster button to open the Create a Cloud Dataproc cluster page. Refer to the following screenshot:

Once you click on Create a cluster, a detailed form, which is as shown in the following screenshot, shows up:

The preceding screenshot shows the Create a Cloud Dataproc cluster page with the default fields automatically filled in for a new cluster named cluster-1. Take a look at the following screenshot:

You can expand the workers, bucket, network, version, initialization, and access options panel to specify one or more worker nodes, a staging bucket, network, initialization, the Cloud Dataproc image version, actions, and project-level access for your cluster. Providing these values is optional.

The default cluster is created with no worker nodes, an auto-created staging bucket, and a default network. It also uses the latest released Cloud Dataproc image version. You can change these default settings:

Once you have configured all the fields on the page, click on the Create button to create the cluster. The new cluster's name appears on the Clusters page, and its status is updated to Running once the Spark cluster is created.

Click on the cluster name created earlier to open the cluster details page. It opens with the Overview tab and the CPU utilization graph selected.

You can examine jobs, instances, and so on for the cluster from the other tabs.

Submitting a Job

To submit a job from the Cloud Platform Console to the cluster, go to the Cloud Platform UI. Select the appropriate project and then click on Continue. The first time you submit a job, the following dialog appears:

Click on Submit a job:

To submit a Spark sample job, fill the fields on the Submit a job page, as follows:

  1. Select a cluster name from the cluster list on the screen.
  2. Set Job type to Spark.
  3. Add file:///usr/lib/spark/lib/spark-examples.jar to Jar files. Here, file:/// denotes a Hadoop LocalFileSystem scheme; Cloud Dataproc installs /usr/lib/spark/lib/spark-examples.jar on the cluster's master node when it creates the cluster. Alternatively, you can specify a Cloud Storage path (gs://my-bucket/my-jarfile.jar) or an HDFS path (hdfs://examples/myexample.jar) to one of the custom jars.
  4. Set Main class or jar to org.apache.spark.examples.SparkPi.
  5. Set Arguments to the single argument 1000.

Click on Submit to start the job.

Once the job starts, it is added to the Jobs list. Refer to the following screenshot:

Once the job is complete, its status changes:

Take a look at the job output as listed here.

Execute the command from the terminal with the appropriate Job ID.

In our case, the Job ID was 1ed4d07f-55fc-45fe-a565-290dcd1978f7 and project-ID was rd-spark-1; hence, the command looks like this:

  $ gcloud beta dataproc --project=rd-spark-1 jobs wait 1ed4d07f-
55fc-45fe-a565-290dcd1978f7

The (abridged) output is shown here:

Waiting for job output...
16/01/28 10:04:29 INFO akka.event.slf4j.Slf4jLogger: Slf4jLogger
started

16/01/28 10:04:29 INFO Remoting: Starting remoting
...
Submitted application application_1453975062220_0001
Pi is roughly 3.14157732

You can also SSH into the cluster's master instance and run spark-shell in interactive mode.

 

Summary

In this chapter, we covered how to set up Spark locally on our own computer as well as in the cloud as a cluster running on Amazon EC2. You learned how to run Spark on top of Amazon's Elastic Map Reduce (EMR). You also learned how to use Google's Cloud Dataproc service to create a cluster and run a simple job. We discussed the basics of Spark's programming model and API using the interactive Scala console, and we wrote the same basic Spark program in Scala, Java, R, and Python. We also compared the performance of Hadoop and Spark for different machine learning algorithms, as well as the sort benchmark test.

In the next chapter, we will consider how to go about using Spark to create a machine learning system.

About the Authors
  • Rajdeep Dua

    Rajdeep Dua has over 18 years experience in the cloud and big data space. He has taught Spark and big data at some of the most prestigious tech schools in India: IIIT Hyderabad, ISB, IIIT Delhi, and Pune College of Engineering. He currently leads the developer relations team at Salesforce India. He has also presented BigQuery and Google App Engine at the W3C conference in Hyderabad. He led the developer relations teams at Google, VMware, and Microsoft, and has spoken at hundreds of other conferences on the cloud. Some of the other references to his work can be seen at Your Story and on ACM digital library. His contributions to the open source community relate to Docker, Kubernetes, Android, OpenStack, and Cloud Foundry.

  • Manpreet Singh Ghotra

    Manpreet Singh Ghotra has more than 15 years experience in software development for both enterprise and big data software. He is currently working at Salesforce on developing a machine learning platform/APIs using open source libraries and frameworks such as Keras, Apache Spark, and TensorFlow. He has worked on various machine learning systems, including sentiment analysis, spam detection, and anomaly detection. He was part of the machine learning group at one of the largest online retailers in the world, working on transit time calculations using Apache Mahout, and the R recommendation system, again using Apache Mahout. With a master's and postgraduate degree in machine learning, he has contributed to, and worked for, the machine learning community.
