Hands-On Deep Learning with Apache Spark

By Guglielmo Iozzia

About this book

Deep learning is a subset of machine learning where datasets with several layers of complexity can be processed. Hands-On Deep Learning with Apache Spark addresses the sheer complexity of technical and analytical parts and the speed at which deep learning solutions can be implemented on Apache Spark.

The book starts with the fundamentals of Apache Spark and deep learning. You will set up Spark for deep learning, learn principles of distributed modeling, and understand different types of neural nets. You will then implement deep learning models, such as convolutional neural networks (CNNs), recurrent neural networks (RNNs), and long short-term memory (LSTM) on Spark.

As you progress through the book, you will gain hands-on experience of what it takes to understand the complex datasets you are dealing with. During the course of this book, you will use popular deep learning frameworks, such as TensorFlow, Deeplearning4j, and Keras to train your distributed models.

By the end of this book, you'll have gained experience with the implementation of your models on a variety of use cases.

Publication date: January 2019
Publisher: Packt
Pages: 322
ISBN: 9781788994613

 

The Apache Spark Ecosystem

Apache Spark (http://spark.apache.org/) is an open source, fast cluster-computing platform. It was originally created by AMPLab at the University of California, Berkeley, and its source code was later donated to the Apache Software Foundation (https://www.apache.org/). Spark owes much of its computation speed to the fact that data is loaded into distributed memory (RAM) across a cluster of machines. Data can not only be transformed quickly, but also cached on demand for a variety of use cases. Compared to Hadoop MapReduce, it runs programs up to 100 times faster when the data fits in memory, or up to 10 times faster on disk. Spark provides support for four programming languages: Java, Scala, Python, and R. This book covers the Spark APIs (and deep learning frameworks) for Scala (https://www.scala-lang.org/) and Python (https://www.python.org/) only.

This chapter will cover the following topics:

  • Apache Spark fundamentals
  • Getting Spark
  • Resilient Distributed Dataset (RDD) programming
  • Spark SQL, Datasets, and DataFrames
  • Spark Streaming
  • Cluster mode using different managers
 

Apache Spark fundamentals

This section covers the Apache Spark fundamentals. It is important to become very familiar with the concepts that are presented here before moving on to the next chapters, where we'll be exploring the available APIs.

As mentioned in the introduction to this chapter, the Spark engine processes data in distributed memory across the nodes of a cluster. The following diagram shows the logical structure of how a typical Spark job processes information:

Figure 1.1

Spark executes a job in the following way:

Figure 1.2

The Master controls how data is partitioned and takes advantage of data locality while keeping track of all the distributed data computation on the Slave machines. If a certain Slave machine becomes unavailable, the data on that machine is reconstructed on other available machines. In standalone mode, the Master is a single point of failure. The Cluster mode using different managers section of this chapter covers the possible running modes and explains fault tolerance in Spark.

Spark comes with five major components:

Figure 1.3

These components are as follows:

  • The core engine.
  • Spark SQL: A module for structured data processing.
  • Spark Streaming: This extends the core Spark API. It allows live data stream processing. Its strengths include scalability, high throughput, and fault tolerance.
  • MLlib: The Spark machine learning library.
  • GraphX: Graphs and graph-parallel computation algorithms.

Spark can access data that's stored in different systems, such as HDFS, Cassandra, MongoDB, relational databases, and also cloud storage services such as Amazon S3 and Azure Data Lake Storage.

 

Getting Spark

Now, let's get hands-on with Spark so that we can go deeper into the core APIs and libraries. In all of the chapters of this book, I will be referring to the 2.2.1 release of Spark; however, several of the examples presented here should work with release 2.0 or later. I will add a note when an example applies to 2.2+ releases only.

First of all, you need to download Spark from its official website (https://spark.apache.org/downloads.html). The download page should look like this:

Figure 1.4

You need to have JDK 1.8+ and, only if you intend to develop in Python, Python 2.7+ or 3.4+. Spark 2.2.1 supports Scala 2.11. The JDK needs to be on your user PATH system variable; alternatively, you can point your user JAVA_HOME environment variable to a JDK installation.

Extract the content of the downloaded archive to any local directory. Move to the $SPARK_HOME/bin directory. There, among the other executables, you will find the interactive Spark shells for Scala and Python. They are the best way to get familiar with this framework. In this chapter, I am going to present examples that you can run through these shells.

You can run a Scala shell using the following command:

$SPARK_HOME/bin/spark-shell

If you don't specify a master, Spark runs in local mode, on a single machine and with as many worker threads as there are logical cores (master = local[*] in the output below). Here's the expected output to the console:

Spark context Web UI available at http://10.72.0.2:4040
Spark context available as 'sc' (master = local[*], app id = local-1518131682342).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

The web UI is available at the following URL: http://<host>:4040.

It will give you the following output:

Figure 1.5

From there, you can check the status of your jobs and executors.

From the output of the console startup, you will notice that two built-in variables, sc and spark, are available. sc represents the SparkContext (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext), which in Spark < 2.0 was the entry point for each application. Through the Spark context (and its specializations), you can get input data from data sources and create and manipulate RDDs (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD), which were the primary Spark abstraction before 2.0. The RDD programming section will cover this topic and other operations in more detail. Starting from release 2.0, a new entry point, SparkSession (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SparkSession), and a new main data abstraction, the Dataset (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset), were introduced. More details on them are presented in the following sections. The SparkContext is still part of the Spark API, to ensure compatibility with existing frameworks that don't support Spark sessions, but the direction the project has taken is to move development to the SparkSession.

Here's an example of how to read and manipulate a text file and put it into a Dataset using the Spark shell (the file used in this example is part of the resources for the examples that are bundled with the Spark distribution):

scala> spark.read.textFile("/usr/spark-2.2.1/examples/src/main/resources/people.txt")
res5: org.apache.spark.sql.Dataset[String] = [value: string]

The result is a Dataset instance that contains the file lines. You can then perform several operations on this Dataset, such as counting the number of lines:

scala> res5.count()
res6: Long = 3

You can also get the first line of the Dataset:

scala> res5.first()
res7: String = Michael, 29

In this example, we used a path on the local filesystem. In these cases, the file should be accessible from the same path by all of the workers, so you will need to copy the file across all workers or use a network-mounted shared filesystem.

To close a shell, you can type the following:

:quit

To see the list of all of the available shell commands, type the following:

scala> :help

All commands can be abbreviated, for example, :he instead of :help.

The following is the list of commands:

Command                    Purpose
:edit <id>|<line>          Edit history
:help [command]            Prints summary or command-specific help
:history [num]             Shows history (optional num is commands to show)
:h? <string>               Search history
:imports [name name ...]   Show import history, identifying the sources of names
:implicits [-v]            Show the implicits in scope
:javap <path|class>        Disassemble a file or class name
:line <id>|<line>          Place line(s) at the end of history
:load <path>               Interpret lines in a file
:paste [-raw] [path]       Enter paste mode or paste a file
:power                     Enable power user mode
:quit                      Exit the interpreter
:replay [options]          Reset the repl and replay on all previous commands
:require <path>            Add a jar to the classpath
:reset [options]           Reset the repl to its initial state, forgetting all session entries
:save <path>               Save the replayable session to a file
:sh <command line>         Run a shell command (the result is implicitly => List[String])
:settings <options>        Update compiler options, if possible; see reset
:silent                    Disable or enable the automatic printing of results
:type [-v] <expr>          Display the type of expression without evaluating it
:kind [-v] <expr>          Display the kind of expression
:warnings                  Show the suppressed warnings from the most recent line that had any

As for Scala, an interactive shell is available for Python. You can run it using the following command:

$SPARK_HOME/bin/pyspark

A built-in variable named spark representing the SparkSession is available. You can do the same things as for the Scala shell:

>>> textFileDf = spark.read.text("/usr/spark-2.2.1/examples/src/main/resources/people.txt")
>>> textFileDf.count()
3
>>> textFileDf.first()
Row(value='Michael, 29')

Unlike Java and Scala, Python is dynamically typed, so it cannot benefit from the compile-time type safety of a typed Dataset. Therefore, a Dataset in Python is always a Dataset[Row], which is called a DataFrame, consistent with the DataFrame concept of the pandas library (https://pandas.pydata.org/).

To close a Python shell, you can type the following:

quit()

Interactive shells aren't the only choice for running code in Spark. It is also possible to implement self-contained applications. Here's an example of reading and manipulating a file in Scala:

import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    // Should be some file on your system
    val logFile = "/usr/spark-2.2.1/examples/src/main/resources/people.txt"
    // Create (or reuse) a session that runs locally
    val spark = SparkSession.builder.master("local").appName("Simple Application").getOrCreate()
    // Read the file as a Dataset[String] and cache it, since it is scanned twice
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    // Release the resources held by the session
    spark.stop()
  }
}

Applications should define a main() method instead of extending scala.App. Note the code to create SparkSession:

val spark = SparkSession.builder.master("local").appName("Simple Application").getOrCreate()

It follows the builder design pattern.

Always explicitly close the session before ending the program execution:

spark.stop()

To build the application, you can use a build tool of your choice (Maven, sbt, or Gradle), adding the dependencies from Spark 2.2.1 and Scala 2.11. Once a JAR file has been generated, you can use the $SPARK_HOME/bin/spark-submit command to execute it, specifying the JAR filename, the Spark master URL, and a list of optional parameters, including the job name, the main class, the maximum memory to be used by each executor, and many others.

The same self-contained application could have been implemented in Python as well:

from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
spark = SparkSession.builder.appName("Simple Application").master("local").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

This can be saved in a .py file and submitted through the same $SPARK_HOME/bin/spark-submit command for execution.

 

RDD programming

In general, every Spark application is a driver program that runs the logic that has been implemented for it and executes parallel operations on a cluster. In accordance with the previous definition, the main abstraction provided by the core Spark framework is the RDD. It is an immutable distributed collection of data that is partitioned across machines in a cluster. Operations on RDDs can happen in parallel.

Two types of operations are available on an RDD:

  • Transformations
  • Actions

A transformation is an operation on an RDD that produces another RDD, while an action is an operation that triggers some computation and then returns a value to the driver program or persists the result to a storage system. Transformations are lazy: they aren't executed until an action is invoked. This is one of the strengths of Spark: it remembers the transformations that have been applied to an RDD (its lineage), so if a partition is lost (for example, a slave goes down), it can be easily rebuilt on some other node of the cluster.
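
The laziness and lineage just described can be illustrated without a cluster. The following is a minimal plain-Python sketch, not Spark code: the LazyCollection class, its methods, and the read_source function are hypothetical stand-ins that only mimic how transformations are recorded and then replayed when an action is invoked.

```python
class LazyCollection:
    """Toy stand-in for an RDD: it records transformations instead of running them."""

    def __init__(self, source, lineage=()):
        self._source = source      # zero-argument function that (re)reads the input data
        self._lineage = lineage    # tuple of (kind, function) steps applied so far

    def map(self, func):
        # Transformation: returns a new object, computes nothing yet
        return LazyCollection(self._source, self._lineage + (("map", func),))

    def filter(self, func):
        # Transformation: returns a new object, computes nothing yet
        return LazyCollection(self._source, self._lineage + (("filter", func),))

    def collect(self):
        # Action: replays the whole lineage on the source data
        data = list(self._source())
        for kind, func in self._lineage:
            if kind == "map":
                data = [func(x) for x in data]
            else:
                data = [x for x in data if func(x)]
        return data


reads = []  # one entry is appended every time the source is actually read


def read_source():
    reads.append(1)
    return [1, 2, 3, 4, 5]


numbers = LazyCollection(read_source)
evens_squared = numbers.filter(lambda x: x % 2 == 0).map(lambda x: x * x)
reads_before_action = len(reads)          # nothing has been executed so far

first_result = evens_squared.collect()    # the action triggers the computation
rebuilt_result = evens_squared.collect()  # a lost result is rebuilt from the lineage
print(reads_before_action, first_result, rebuilt_result)
```

Because the object keeps only the source and the list of steps, a result can always be recomputed from scratch, which is the idea behind rebuilding a lost partition on another node.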

The following table lists some of the common transformations supported by Spark:

Transformation Purpose
map(func) Returns a new RDD by applying the func function on each data element of the source RDD.
filter(func) Returns a new RDD by selecting those data elements for which the applied func function returns true.
flatMap(func) This transformation is similar to map: the difference is that each input item can be mapped to zero or multiple output items (the applied func function should return a Seq).
union(otherRdd) Returns a new RDD that contains the union of the elements in the source RDD and the otherRdd argument.
distinct([numPartitions]) Returns a new RDD that contains only the distinct elements of the source RDD.
groupByKey([numPartitions]) When called on an RDD of (K, V) pairs, it returns an RDD of (K, Iterable<V>) pairs. By default, the level of parallelism in the output RDD depends on the number of partitions of the source RDD. You can pass an optional numPartitions argument to set a different number of partitions.
reduceByKey(func, [numPartitions]) When called on an RDD of (K, V) pairs, it returns an RDD of (K, V) pairs, where the values for each key are aggregated using the given reduce func function, which must be of type (V,V) => V. As for the groupByKey transformation, the number of reduce partitions is configurable through an optional numPartitions second argument.
sortByKey([ascending], [numPartitions]) When called on an RDD of (K, V) pairs, it returns an RDD of (K, V) pairs sorted by keys in ascending or descending order, as specified in the Boolean ascending argument. The number of partitions for the output RDD is configurable through an optional numPartitions second argument.
join(otherRdd, [numPartitions]) When called on RDDs of type (K, V) and (K, W), it returns an RDD of (K, (V, W)) pairs with all pairs of elements for each key. It supports left outer join, right outer join, and full outer join. The number of partitions for the output RDD is configurable through an optional numPartitions second argument.

The following table lists some of the common actions supported by Spark:

Action Purpose
reduce(func) Aggregates the elements of an RDD using a given function, func (this takes two arguments and returns one). To ensure the correct parallelism at compute time, the reduce function, func, has to be commutative and associative.
collect() Returns all the elements of an RDD as an array to the driver.
count() Returns the total number of elements in an RDD.
first() Returns the first element of an RDD.
take(n) Returns an array containing the first n elements of an RDD.
foreach(func) Executes the func function on each element of an RDD.
saveAsTextFile(path) Writes the elements of an RDD as a text file (or a set of text files) in a given directory (with the absolute location specified through the path argument) in the local filesystem, HDFS, or any other Hadoop-supported filesystem.
countByKey() This action is only available on RDDs of type (K, V). It returns a hashmap of (K, Int) pairs, where K is a key of the source RDD and its value is the count for that given key, K.
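
The requirement that the function passed to reduce be commutative and associative can be checked with a small plain-Python sketch. It is not Spark code: partitioned_reduce is a hypothetical helper that reduces each partition locally and then combines the partial results, which is roughly what happens when the data is spread over several nodes.

```python
from functools import reduce
from operator import add, sub


def partitioned_reduce(data, num_partitions, func):
    """Reduce each partition locally, then reduce the partial results."""
    size = -(-len(data) // num_partitions)  # ceiling division
    partitions = [data[i:i + size] for i in range(0, len(data), size)]
    partials = [reduce(func, part) for part in partitions]
    return reduce(func, partials)


data = [1, 2, 3, 4, 5, 6]

# Addition is commutative and associative: the partitioning doesn't matter
sums = [partitioned_reduce(data, n, add) for n in (1, 2, 3)]

# Subtraction is neither: the result changes with the number of partitions
differences = [partitioned_reduce(data, n, sub) for n in (1, 2, 3)]

print(sums)
print(differences)
```

With addition, every partitioning gives the same total, while subtraction yields a different value for each number of partitions, so its result on a cluster would depend on how the data happens to be distributed.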

Now, let's understand the concepts of transformation and action through an example that could be executed in the Scala shell—this finds the N most commonly used words in an input text file. The following diagram depicts a potential implementation for this problem:

Figure 1.6

Let's translate this into code.

First of all, let's load the content of a text file into an RDD of strings:

scala> val spiderman = sc.textFile("/usr/spark-2.2.1/tests/spiderman.txt")
spiderman: org.apache.spark.rdd.RDD[String] = /usr/spark-2.2.1/tests/spiderman.txt MapPartitionsRDD[1] at textFile at <console>:24

Then, we will apply the necessary transformations and actions:

scala> val topWordCount = spiderman.flatMap(str=>str.split(" ")).filter(!_.isEmpty).map(word=>(word,1)).reduceByKey(_+_).map{case(word, count) => (count, word)}.sortByKey(false)
topWordCount: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[9] at sortByKey at <console>:26

Here, we have the following:

  • flatMap(str=>str.split(" ")): Splits each line into single words
  • filter(!_.isEmpty): Removes empty strings
  • map(word=>(word,1)): Maps each word into a key-value pair
  • reduceByKey(_+_): Aggregates the count
  • map{case(word, count) => (count, word)}: Reverses the (word, count) pairs to (count, word)
  • sortByKey(false): Sorts by descending order

Finally, print the five most used words in the input content to the console:

scala> topWordCount.take(5).foreach(x=>println(x))
(34,the)
(28,and)
(19,of)
(19,in)
(16,Spider-Man)

The same could be achieved in Python in the following way:

from operator import add

spiderman = spark.read.text("/usr/spark-2.2.1/tests/spiderman.txt")
# Extract the single string column from each Row
lines = spiderman.rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
    .filter(lambda x: len(x) > 0) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add) \
    .map(lambda x: (x[1], x[0])) \
    .sortByKey(False)

The result matches that of the Scala example (entries with the same count may appear in a different order):

>>> counts.take(5)
[(34, 'the'), (28, 'and'), (19, 'in'), (19, 'of'), (16, 'Spider-Man')]
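
To check the logic of this pipeline on a small sample without starting Spark, the same steps can be mirrored in plain Python. This is only a sketch: the two sample lines are made up for illustration, and the standard library Counter plays the role of the map and reduceByKey steps.

```python
from collections import Counter

# Hypothetical sample input; each string plays the role of one line of the text file
lines = [
    "the spider and the fly",
    "the fly  and the web",   # the double space yields an empty token
]

# flatMap: split every line into words and flatten the result
words = [word for line in lines for word in line.split(" ")]
# filter: drop the empty strings produced by consecutive spaces
words = [word for word in words if word]
# map + reduceByKey: count the occurrences of each word
counts = Counter(words)
# swap to (count, word) pairs and sort by count in descending order
top_words = sorted(((n, w) for w, n in counts.items()),
                   key=lambda pair: pair[0], reverse=True)

print(top_words[:3])
```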

Spark can persist RDDs (and Datasets as well) in memory while executing operations on them. Persisting and caching are closely related in Spark: cache() is shorthand for persist() with the default storage level. When persisting an RDD, each node of the cluster stores in memory the RDD partitions that it computes and reuses them in further actions on the same RDD (or on RDDs that have been derived from it through some transformations). This is why future actions execute much faster.

You can mark an RDD to be persisted using its persist() method. The first time an action is executed on it, it will be kept in memory on the cluster's nodes. The Spark cache is fault-tolerant: if, for any reason, any partition of an RDD is lost, it will be automatically recalculated using the transformations that created it.

A persisted RDD can be stored using different storage levels. Levels can be set by passing a StorageLevel object to the persist() method of the RDD. The following table lists the available storage levels and their meanings:

Storage Level Purpose
MEMORY_ONLY This is the default storage level. It stores RDDs as deserialized Java objects in memory. If an RDD doesn't fit in memory, some of its partitions won't be cached and will be recalculated on the fly when needed.
MEMORY_AND_DISK It stores RDDs as deserialized Java objects in memory first, but, if an RDD doesn't fit in memory, it stores the partitions that don't fit on disk (this is the main difference from MEMORY_ONLY) and reads them from there when needed.
MEMORY_ONLY_SER It stores RDDs as serialized Java objects. Compared to MEMORY_ONLY, this is more space-efficient, but more CPU-intensive in read operations. This is available for JVM languages only.
MEMORY_AND_DISK_SER It is similar to MEMORY_ONLY_SER (it stores RDDs as serialized Java objects), with the main difference being that it stores partitions that don't fit in memory to disk. This is available only for JVM languages.
DISK_ONLY It stores the RDD partitions on disk only.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, and so on The same as the corresponding levels above (MEMORY_ONLY, MEMORY_AND_DISK, and so on), but each partition is replicated on two cluster nodes.
OFF_HEAP Similar to MEMORY_ONLY_SER, but it stores data in off-heap memory (assuming off-heap memory is enabled). Please be careful when using this storage level as it is still experimental.

When a function is passed to a Spark operation, it is executed on a remote cluster node and works on separate copies of all the variables that are used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient.

However, Spark provides two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.

One of the most common operations in Spark programming is to perform joins on RDDs to consolidate data by a given key. In these cases, large datasets may well be sent around to the slave nodes that host the partitions to be joined. This can become a huge performance bottleneck, as network I/O is around 100 times slower than RAM access. To mitigate this issue, Spark provides broadcast variables, which are broadcast to slave nodes. RDD operations on the nodes can quickly access the broadcast variable value. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication costs.

Broadcast variables are created from a variable, v, by calling the SparkContext.broadcast(v) method. The broadcast variable is a wrapper around v, and its value can be obtained by calling the value method. Here's an example in Scala that you can run through the Spark shell:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

After its creation, the broadcast variable, broadcastVar, should be used instead of the initial value, v, in any function that's executed on the cluster, so that v isn't shipped to the nodes more than once. To ensure that all the nodes get the same value of the broadcast variable, v must not be modified after broadcastVar has been broadcast.

Here's the code for the same example in Python:

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value
[1, 2, 3]
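
The join scenario that motivates broadcast variables can be sketched in plain Python. This is an illustration under stated assumptions rather than Spark code: the lookup table, the partitions, and the join_partition function are all hypothetical, and a read-only mapping stands in for the broadcast value that, in Spark, would be created with sc.broadcast and read through its value attribute.

```python
from types import MappingProxyType

# Small lookup table: the kind of data worth broadcasting (hypothetical values)
country_names = MappingProxyType({"IE": "Ireland", "IT": "Italy", "FR": "France"})

# Large dataset, already split into partitions hosted on different nodes (hypothetical)
partitions = [
    [("alice", "IE"), ("bob", "IT")],
    [("carol", "FR"), ("dave", "XX")],
]


def join_partition(records, lookup):
    """Task run on one node: joins its local records against the read-only lookup."""
    return [(name, lookup.get(code, "unknown")) for name, code in records]


# Every task reads the same read-only table; no records move between partitions
joined = [row for part in partitions for row in join_partition(part, country_names)]
print(joined)
```

Only the small table travels to the nodes, while the records of the large dataset stay in the partitions where they already are.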

To aggregate information across executors in a Spark cluster, accumulator variables should be used. The fact that they are added through an associative and commutative operation ensures their efficient support in parallel computation. Spark natively provides support for the accumulators of numeric types—they can be created by calling SparkContext.longAccumulator() (to accumulate values of type Long) or SparkContext.doubleAccumulator() (to accumulate values of type Double) methods.

However, it is possible to programmatically add support for other types. Any task running on a cluster can add to an accumulator using the add method, but tasks cannot read its value. This operation is only allowed for the driver program, which uses the accumulator's value method. Here's a code example in Scala:

scala> val accum = sc.longAccumulator("First Long Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(First Long Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
[Stage 0:> (0 + 0) / 8]


scala> accum.value
res1: Long = 10

In this case, an accumulator has been created and a name has been assigned to it. It is possible to create unnamed accumulators, but a named accumulator will display in the web UI for the stage that modifies that accumulator:

Figure 1.7

This can be helpful for understanding the progress of running stages.

The same example in Python is as follows:

>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
>>> accum.value
10

Tracking accumulators in the web UI isn't supported for Python.

Please be aware that Spark guarantees exactly-once accumulator updates inside actions only: if a task is restarted, its updates are still applied only once. The same isn't true for transformations, where an update may be applied more than once if a task or job stage is re-executed.
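
The effect of a re-executed task on an accumulator can be simulated in plain Python. The sketch below is not Spark code and makes simplifying assumptions: ToyAccumulator and run_task are hypothetical, and the retry is simulated by simply running the first task twice without any de-duplication.

```python
class ToyAccumulator:
    """Toy stand-in for an accumulator: tasks add to it, the driver reads its value."""

    def __init__(self):
        self.value = 0

    def add(self, amount):
        self.value += amount


def run_task(partition, accumulator):
    """One task: adds every element of its partition to the accumulator."""
    for x in partition:
        accumulator.add(x)


partitions = [[1, 2], [3, 4]]

# Every task is applied exactly once: the total is the expected one
exactly_once = ToyAccumulator()
for part in partitions:
    run_task(part, exactly_once)

# The first task is executed again and its updates are not de-duplicated,
# as can happen when the update is made inside a transformation
with_retry = ToyAccumulator()
for part in [partitions[0]] + partitions:
    run_task(part, with_retry)

print(exactly_once.value, with_retry.value)
```

For this reason, accumulator values updated inside transformations are best treated as indicative, for example for debugging, rather than as exact results.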

 

Spark SQL, Datasets, and DataFrames

Spark SQL is the Spark module for structured data processing. The main difference between this API and the RDD API is that the provided Spark SQL interfaces give more information about the structure of both the data and the performed computation. This extra information is used by Spark internally to add extra optimizations through the Catalyst optimization engine, which is the same execution engine that's used regardless of whatever API or programming language is involved.

Spark SQL is commonly used to execute SQL queries (even if this isn't the only way to use it). Regardless of which of the programming languages supported by Spark embeds the SQL code to be executed, the results of a query are returned as a Dataset. A Dataset is a distributed collection of data, and was added as an interface in Spark 1.6. It combines the benefits of RDDs (such as strong typing and the ability to apply useful lambda functions) with the benefits of Spark SQL's optimized execution engine (Catalyst, https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html). You can construct a Dataset by starting with Java/Scala objects and then manipulating it through the usual functional transformations. The Dataset API is available in Scala and Java, while Python doesn't have support for it. However, due to the dynamic nature of this programming language, many of the benefits of the Dataset API are already available for it.

Starting from Spark 2.0, the DataFrame and Dataset APIs have been merged into the Dataset API, so a DataFrame is just a Dataset that's been organized into named columns and is conceptually equivalent to a table in an RDBMS, but with better optimizations under the hood (being part of the Dataset API, the Catalyst optimization engine works behind the scenes for DataFrames, too). You can construct a DataFrame from diverse sources, such as structured data files, Hive tables, database tables, and RDDs, to name a few. Unlike the Dataset API, the DataFrame API is available in any of the programming languages that are supported by Spark.

Let's start and get hands-on so that we can better understand the concepts behind Spark SQL. The first full example I am going to show is Scala-based. Start a Scala Spark shell to run the following code interactively.

Let's use people.json as a data source. This file ships with the Spark distribution among the resources for the examples and can be used to create a DataFrame, that is, a Dataset of Rows (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row):

val df = spark.read.json("/opt/spark/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json")

You can print the content of the DataFrame to the console to check that it is what you expected:

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

Before you perform DataFrame operations, you need to import the implicit conversions (such as converting RDDs to DataFrames), which also enable the $ notation:

import spark.implicits._

Now, you can print the DataFrame schema in a tree format:

scala> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

Select a single column (let's say name):

scala> df.select("name").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

Filter the data:

scala> df.filter($"age" > 27).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

Then add a groupBy clause:

scala> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

Select all rows and increment a numeric field:

scala> df.select($"name", $"age" + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

It is possible to run SQL queries programmatically through the sql function of SparkSession. This function returns the results of the query in a DataFrame, which, for Scala, is a Dataset[Row]. Let's consider the same DataFrame as for the previous example:

val df = spark.read.json("/opt/spark/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json")

You can register it as an SQL temporary view:

df.createOrReplaceTempView("people")

Then, you can execute an SQL query against it:

scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> sqlDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

The same things can be done in Python as well:

>>> df = spark.read.json("/opt/spark/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json")

Resulting in the following:

>>> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

>>> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

>>> df.select("name").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

>>> df.filter(df['age'] > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

>>> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

>>> df.select(df['name'], df['age'] + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

>>> df.createOrReplaceTempView("people")
>>> sqlDF = spark.sql("SELECT * FROM people")
>>> sqlDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

Other features of Spark SQL and Datasets (data sources, aggregations, self-contained applications, and so on) will be covered in Chapter 3, Extract, Transform, Load.

 

Spark Streaming

Spark Streaming is another Spark module that extends the core Spark API and provides a scalable, fault-tolerant, and efficient way of processing live streaming data. By converting streaming data into micro batches, Spark's simple batch programming model can be applied in streaming use cases too. This unified programming model makes it easy to combine batch and interactive data processing with streaming. Data can be ingested from diverse sources (Kafka, Kinesis, TCP sockets, S3, or HDFS, just to mention a few of the popular ones) and processed using any of the high-level functions available in Spark. Finally, the processed data can be persisted to RDBMS, NoSQL databases, HDFS, object storage systems, and so on, or consumed through live dashboards. Nothing prevents other advanced Spark components, such as MLlib or GraphX, from being applied to data streams:

Figure 1.8

The following diagram shows how Spark Streaming works internally—it receives live input data streams and divides them into batches; these are processed by the Spark engine to generate the final batches of results:

Figure 1.9

The higher-level abstraction of Spark Streaming is the DStream (short for Discretized Stream), which is a wrapper around a continuous flow of data. Internally, a DStream is represented as a sequence of RDDs. A DStream contains a list of other DStreams that it depends on, a function to convert its input RDDs into output ones, and a time interval at which to invoke the function. DStreams are created by either manipulating existing ones, for example, applying a map or filter function (which internally creates MappedDStreams and FilteredDStreams, respectively), or by reading from an external source (the base class in these cases is InputDStream).
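
To make the idea of a discretized stream more concrete, the following is a minimal pure-Python sketch (it does not use the Spark API) of how timestamped records could be grouped into micro batches. The discretize function and the sample records are hypothetical names introduced only for this illustration; the resulting list of per-interval batches stands in for the sequence of RDDs that backs a DStream.

```python
from collections import defaultdict


def discretize(records, batch_interval):
    """Group (timestamp_seconds, value) records into consecutive micro batches.

    Index i of the result holds the values whose timestamp falls in
    [i * batch_interval, (i + 1) * batch_interval). Intervals without data
    produce empty batches.
    """
    if not records:
        return []
    buckets = defaultdict(list)
    for timestamp, value in records:
        buckets[int(timestamp // batch_interval)].append(value)
    last = max(buckets)
    return [buckets[i] for i in range(last + 1)]


# Hypothetical input: text lines arriving at the given times (in seconds)
records = [(0.2, "a b"), (0.7, "b"), (1.1, "c"), (3.5, "a")]
batches = discretize(records, batch_interval=1)
# batches == [["a b", "b"], ["c"], [], ["a"]]
```

In this model, a transformation on the stream is simply a function applied to each inner list in turn.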

Let's implement a simple Scala example: a self-contained streaming word count application. The code used for this class can be found among the examples that are bundled with the Spark distribution. To compile and package it, you need to add the dependency on Spark Streaming to your Maven, Gradle, or sbt project descriptor, along with the dependencies on Spark Core and Scala.

First, we have to create the SparkConf and a StreamingContext (which is the main entry point for any streaming functionality) from it:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(1))

The batch interval has been set to 1 second. A DStream representing streaming data from a TCP source can be created using the ssc streaming context; we just need to specify the source hostname and port, as well as the desired storage level:

val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

The returned lines DStream is the stream of data that is going to be received from the server. Each record will be a single line of text that we want to split into single words, thus specifying the space character as a separator:

val words = lines.flatMap(_.split(" "))

Then, we will count those words:

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

The words DStream is mapped (a one-to-one transformation) to a DStream of (word, 1) pairs, which is then reduced to get the frequency of words in each batch of data. The last command will print a few of the counts that are generated every second. Each RDD in a DStream contains data from a certain interval, and any operation applied on a DStream translates to operations on the underlying RDDs:

Figure 1.10
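
The per-batch logic can be illustrated without a cluster. The following pure-Python sketch (an illustration only, not Spark code) applies the same flatMap, map, and reduceByKey steps to each batch independently, which mirrors what happens to the RDD behind every batch interval. The word_count_batch helper and the sample batches are assumptions made for this example.

```python
from collections import Counter


def word_count_batch(lines):
    """Count the words of one batch: split lines, pair with 1, sum by key."""
    # flatMap: every line yields zero or more words
    words = [word for line in lines for word in line.split(" ")]
    # map: one (word, 1) pair per word
    pairs = [(word, 1) for word in words]
    # reduceByKey: add up the values that share the same key
    counts = Counter()
    for word, one in pairs:
        counts[word] += one
    return dict(counts)


# Two hypothetical batches of received lines
batches = [["hello world", "hello spark"], ["spark streaming"]]
per_batch_counts = [word_count_batch(batch) for batch in batches]
```

Note that the counts of the second batch do not include anything from the first one, because each batch is reduced on its own.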

To start the processing after all the transformations have been set up, use the following code:

ssc.start()
ssc.awaitTermination()

Before running this example, first you will need to run netcat (a small utility found in most Unix-like systems) as a data server:

nc -lk 9999

Then, in a different Terminal, you can start the example by passing the following as arguments:

localhost 9999

Any line that's typed into the Terminal running the netcat server will be counted and printed on the application screen every second.

If nc isn't available on the system where you run this example, you can implement your own data server in Scala:

import java.io.PrintWriter
import java.net.ServerSocket
import java.util.Scanner

object SocketWriter {
  def main(args: Array[String]): Unit = {
    val listener = new ServerSocket(9999)
    // Block until the streaming application connects
    val socket = listener.accept()

    // The socket text stream expects newline-delimited text,
    // so write whole lines and flush after each one
    val out = new PrintWriter(socket.getOutputStream, true)
    println("Start writing data. Enter close when finished")
    val sc = new Scanner(System.in)
    // Read lines from the console and write them to the socket
    // until "close" is entered
    var str = sc.nextLine()
    while (str != "close") {
      out.println(str)
      str = sc.nextLine()
    }
    // Close the connection now
    out.close()
    socket.close()
    listener.close()
  }
}
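
For readers who prefer Python, a comparable data server could be sketched with the standard socket module. This is an alternative written for illustration rather than part of the bundled Spark examples: the open_listener and serve_lines names are hypothetical, and each line is terminated with a newline because the socket text stream reads newline-delimited UTF-8 text.

```python
import socket


def open_listener(host="localhost", port=9999):
    """Create a TCP server socket that waits for a single client."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((host, port))
    listener.listen(1)
    return listener


def serve_lines(listener, lines):
    """Accept one connection and send every line, newline-terminated, as UTF-8."""
    connection, _ = listener.accept()
    with connection:
        for line in lines:
            connection.sendall((line + "\n").encode("utf-8"))
    listener.close()
```

For interactive use, calling serve_lines(open_listener(), iter(input, "close")) would forward every typed line to the connected application until close is entered.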

The same self-contained application in Python could be as follows:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)
    # Create the streaming context with a batch interval of 1 second
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    # Split each received line into words and count them per batch
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

DStreams support most of the transformations that are available for RDDs. This means that data from input DStreams can be modified in the same way as the data in RDDs. The following table lists some of the common transformations supported by Spark DStreams:

Transformation Purpose
map(func) Returns a new DStream. The func map function is applied to each element of the source DStream.
flatMap(func) The same as for map. The only difference is that each input item in the new DStream can be mapped to 0 or more output items.
filter(func) Returns a new DStream containing only the elements of the source DStream for which the func filter function returned true.
repartition(numPartitions) This is used to set the level of parallelism by creating a different number of partitions.
union(otherStream) Returns a new DStream. It contains the union of the elements in the source DStream and the input otherStream DStream.
count() Returns a new DStream. It contains single element RDDs that are obtained by counting the number of elements contained in each RDD arriving from the source.
reduce(func) Returns a new DStream. It contains single element RDDs that are obtained by aggregating those in each RDD of the source by applying the func function (which should be associative and commutative to allow for correct parallel computation).
countByValue() Returns a new DStream of (K, Long) pairs, where K is the type of the elements of the source. The value of each key represents its frequency in each RDD of the source.
reduceByKey(func, [numTasks]) Returns a new DStream of (K, V) pairs (for a source DStream of (K, V) pairs). The values for each key are aggregated by applying the reduce func function. To do the grouping, this transformation uses Spark's default number of parallel tasks (which is two in local mode, while it is determined by the config property spark.default.parallelism in cluster mode), but this can be changed by passing an optional numTasks argument.
join(otherStream, [numTasks]) Returns a new DStream of (K, (V, W)) pairs when called on two DStreams of (K, V) and (K, W) pairs, respectively.
cogroup(otherStream, [numTasks]) Returns a new DStream of (K, Seq[V], Seq[W]) tuples when called on two DStreams of (K, V) and (K, W) pairs, respectively.
transform(func) Returns a new DStream. It applies an RDD-to-RDD func function to every RDD of the source.
updateStateByKey(func) Returns a new state DStream. The state for each key in the new DStream is updated by applying the func input function to the previous state and the new values for the key.
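
Most of these transformations act on one batch at a time; updateStateByKey is different because it carries state from one batch to the next. The following pure-Python sketch (a model of the semantics, not the Spark implementation) keeps a running word count across batches. The update_state_by_key and running_count names, as well as the sample batches, are hypothetical and exist only for this illustration.

```python
def update_state_by_key(batches, update_func):
    """Fold batches of (key, value) pairs into per-key state, batch after batch.

    update_func(new_values, previous_state) returns the new state for a key;
    previous_state is None for a key that has not been seen before.
    Yields a snapshot of the whole state after each batch.
    """
    state = {}
    for batch in batches:
        # Group the new values of this batch by key
        new_values = {}
        for key, value in batch:
            new_values.setdefault(key, []).append(value)
        # Update every known key, including those without new values
        for key in set(state) | set(new_values):
            state[key] = update_func(new_values.get(key, []), state.get(key))
        yield dict(state)


def running_count(new_values, previous_count):
    """Add the new occurrences to the count accumulated so far."""
    return sum(new_values) + (previous_count or 0)


batches = [[("spark", 1), ("spark", 1), ("hadoop", 1)], [("spark", 1)]]
snapshots = list(update_state_by_key(batches, running_count))
```

After the second batch, the count for spark reflects both batches, while the count for hadoop is carried over unchanged.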

Windowed computations are provided by Spark Streaming. As shown in the following diagram, they allow you to apply transformations over sliding windows of data:

Figure 1.11

When a window slides over a source DStream, all its RDDs that fall within that window are taken into account and transformed to produce the RDDs of the returned windowed DStream. Looking at the specific example that's shown in the preceding diagram, the window-based operation is applied over three time units of data and it slides by two. Two parameters need to be specified by any window operation that's used:

  • Window length: The duration of the window
  • Sliding interval: The interval at which the window operation is performed

These two parameters must be multiples of the batch interval of the source DStream.
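
The interplay between the two parameters can be sketched in plain Python. In this illustration (not Spark code), both values are expressed as a number of batches, which is one simple way of keeping them multiples of the batch interval. The sliding_windows helper is a hypothetical name, and for simplicity it only returns windows that are completely filled.

```python
def sliding_windows(batches, window_length, slide_interval):
    """Return the groups of batches covered by each full window.

    window_length and slide_interval are expressed in numbers of batches.
    """
    if window_length <= 0 or slide_interval <= 0:
        raise ValueError("window_length and slide_interval must be positive")
    windows = []
    # 'end' is the index just past the last batch of each window
    for end in range(window_length, len(batches) + 1, slide_interval):
        windows.append(batches[end - window_length:end])
    return windows


# Five batches, a window of three batches that slides by two
batches = [["a"], ["b"], ["c"], ["d"], ["e"]]
windows = sliding_windows(batches, window_length=3, slide_interval=2)
# windows == [[["a"], ["b"], ["c"]], [["c"], ["d"], ["e"]]]
```

The batch containing c belongs to both windows, which shows how consecutive windows overlap whenever the sliding interval is shorter than the window length.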

Let's see how this could be applied to the application that was presented at the beginning of this section. Suppose you want to generate a word count every 10 seconds over the last 60 seconds of data. The reduceByKey operation needs to be applied on the (word, 1) pairs of the DStream over the last 60 seconds of data. This can be achieved with the reduceByKeyAndWindow operation. Assuming that pairs is the DStream of (word, 1) pairs (obtained, for example, through words.map(x => (x, 1))), the Scala code is as follows:

val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(60), Seconds(10))

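For Python, it is as follows (this variant also passes an inverse reduce function, so that each window is calculated incrementally; it requires checkpointing to be enabled on the streaming context):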

windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 60, 10)

The following table lists some of the common window operations supported by Spark for DStreams:

Transformation Purpose
window(windowLength, slideInterval) Returns a new DStream. It is based on windowed batches of the source.
countByWindow(windowLength, slideInterval) Returns a sliding window count (based on the windowLength and slideInterval parameters) of elements in the source DStream.
reduceByWindow(func, windowLength, slideInterval) Returns a new single element DStream. It is created by aggregating elements in the source DStream over a sliding interval by applying the func reduce function (which should be associative and commutative to allow for correct parallel computation).
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) Returns a new DStream of (K, V) pairs (the same K and V as for the source DStream). The values for each key are aggregated using the func input function over batches (defined by the windowLength and slideInterval arguments) in a sliding window. The number of parallel tasks to do the grouping is two (default) in local mode, while in cluster mode this is given by the Spark configuration property spark.default.parallelism. numTasks is an optional argument to specify a custom number of tasks.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) This is a more efficient version of the reduceByKeyAndWindow transformation. This time, the reduce value of the current window is calculated incrementally using the reduce values of the previous one. This happens by reducing the new data that enters a window while inverse reducing the old data that leaves the same one. Please note that this mechanism only works if the func reduce function has a corresponding inverse reduce function, invFunc.
countByValueAndWindow(windowLength, slideInterval, [numTasks]) Returns a DStream of (K, Long) pairs when called on a source DStream of (K, V) pairs. The value of each key in the returned DStream is its frequency within a given sliding window (defined by the windowLength and slideInterval arguments). numTasks is an optional argument to specify a custom number of tasks.
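
The difference between the two reduceByKeyAndWindow variants can be illustrated with a small pure-Python model (not the Spark implementation). It computes per-word counts over a sliding window twice: once by reducing every batch in the window again, and once incrementally by adding the batch that enters the window and subtracting the batch that leaves it. The function names and the sample data are assumptions made for this illustration, and the sliding interval is fixed at one batch to keep the sketch short.

```python
from collections import Counter


def window_counts_full(batches, window_length):
    """Recompute the counts of every window from scratch."""
    results = []
    for end in range(1, len(batches) + 1):
        window = batches[max(0, end - window_length):end]
        total = Counter()
        for batch in window:
            total.update(batch)
        results.append(dict(total))
    return results


def window_counts_incremental(batches, window_length):
    """Reuse the previous window: add the new batch, subtract the expired one."""
    results = []
    current = Counter()
    for index, batch in enumerate(batches):
        # Reduce step: account for the data entering the window
        current.update(batch)
        expired = index - window_length
        if expired >= 0:
            # Inverse reduce step: remove the data leaving the window
            current.subtract(batches[expired])
        # Drop the keys whose count went back to zero
        current = Counter({k: v for k, v in current.items() if v != 0})
        results.append(dict(current))
    return results


# Each batch is a list of words
batches = [["a", "b"], ["a"], ["b", "b"], ["c"]]
full = window_counts_full(batches, window_length=2)
incremental = window_counts_incremental(batches, window_length=2)
```

Both functions return the same sequence of results; the incremental one touches only two batches per step regardless of the window length, which is where the efficiency gain comes from. Addition works here because subtraction is its inverse; a reduce function such as max has no inverse and could not be used this way.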
 

Cluster mode using different managers

The following diagram shows how Spark applications run on a cluster. They are independent sets of processes that are coordinated by the SparkContext object in the Driver Program. SparkContext connects to a Cluster Manager, which is responsible for allocating resources across applications. Once the SparkContext is connected, Spark gets executors across cluster nodes.

Executors are processes that execute computations and store data for a given Spark application. SparkContext sends the application code (which could be a JAR file for Scala or .py files for Python) to the executors. Finally, it sends the tasks to run to the executors:

Figure 1.12

To isolate applications from each other, every Spark application receives its own executor processes. They stay alive for the duration of the whole application and run tasks in multithreading mode. The downside to this is that it isn't possible to share data across different Spark applications directly. To share it, data needs to be persisted to an external storage system.

Spark supports different cluster managers, but it is agnostic to the underlying type.

The driver program, at execution time, must be network addressable from the worker nodes because it has to listen for and accept incoming connections from its executors. Because it schedules tasks on the cluster, it should be executed close to the worker nodes, on the same local area network (if possible).

The following are the cluster managers that are currently supported in Spark:

  • Standalone: A simple cluster manager that makes it easy to set up a cluster. It is included with Spark.
  • Apache Mesos: An open source project that's used to manage computer clusters, and was developed at the University of California, Berkeley.
  • Hadoop YARN: The resource manager available in Hadoop starting from release 2.
  • Kubernetes: An open source platform for providing a container-centric infrastructure. Kubernetes support in Spark is still experimental, so it's probably not ready for production yet.

Standalone mode

For standalone mode, you only need to place a compiled version of Spark on each node of the cluster. All the cluster nodes need to be able to resolve the hostnames of the other cluster members and must be routable to one another. The Spark master URL can be configured in the $SPARK_HOME/conf/spark-defaults.conf file on all of the nodes:

spark.master                     spark://<master_hostname_or_IP>:7077

Then, the hostname or IP address of the Spark master node needs to be specified in the $SPARK_HOME/conf/spark-env.sh file on all of the nodes, as follows:

SPARK_MASTER_HOST=<master_hostname_or_IP>

It is now possible to start a standalone master server by executing the following script:

$SPARK_HOME/sbin/start-master.sh

Once the master has started, a web UI will be available at the http://<master_hostname_or_IP>:8080 URL. From there, it is possible to obtain the master URL that's to be used when starting the workers. One or more workers can now be started by executing the following script:

$SPARK_HOME/sbin/start-slave.sh <master-spark-URL>

Each worker, after the start, comes with its own web UI, whose URL is http://<worker_hostname_or_IP>:8081.

The list of workers, along with other information about their number of CPUs and memory, can be found in the master's web UI.

The preceding steps start a standalone cluster manually. It is also possible to use the provided launch scripts. A $SPARK_HOME/conf/slaves file needs to be created as a preliminary step. It must contain the hostnames, one per line, of all of the machines where the Spark workers should start. Passwordless SSH (short for Secure Shell) from the Spark master to the Spark slaves needs to be enabled to allow remote login for the slave daemon startup and shutdown actions. A cluster can then be launched or stopped using the following shell scripts, which are available in the $SPARK_HOME/sbin directory:

  • start-master.sh: Starts a master instance
  • start-slaves.sh: Starts a slave instance on each machine specified in the conf/slaves file
  • start-slave.sh: Starts a single slave instance
  • start-all.sh: Starts both a master and a number of slaves
  • stop-master.sh: Stops a master that has been started via the sbin/start-master.sh script
  • stop-slaves.sh: Stops all slave instances on the nodes specified in the conf/slaves file
  • stop-all.sh: Stops both a master and its slaves

These scripts must be executed on the machine the Spark master will run on.

It is possible to run an interactive Spark shell against a cluster in the following way:

$SPARK_HOME/bin/spark-shell --master <master-spark-URL>

The $SPARK_HOME/bin/spark-submit script can be used to submit a compiled Spark application to the cluster. Spark currently supports two deploy modes for standalone clusters: client and cluster. In client mode, the driver and the client that submits the application are launched in the same process, while in cluster mode, the driver is launched from one of the worker processes and the client process exits as soon as it completes submitting the application (it doesn't have to wait for the application to finish).
When an application is launched through spark-submit, its JAR file is automatically distributed to all the worker nodes. Any additional JAR that an application depends on should be specified through the --jars flag using a comma as a delimiter (for example, --jars jar1,jar2).
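
As a hedged illustration of how these options fit together, the following Python sketch assembles a spark-submit argument list. The build_spark_submit helper, the master URL, the class name, and the JAR names are hypothetical; only the --master, --deploy-mode, --class, and --jars flags come from spark-submit itself.

```python
def build_spark_submit(master, app_jar, main_class, deploy_mode="client",
                       jars=(), app_args=()):
    """Build the argument list for a spark-submit invocation."""
    if deploy_mode not in ("client", "cluster"):
        raise ValueError("deploy_mode must be 'client' or 'cluster'")
    command = ["spark-submit", "--master", master,
               "--deploy-mode", deploy_mode, "--class", main_class]
    if jars:
        # Additional JARs are passed as a single comma-delimited value
        command += ["--jars", ",".join(jars)]
    command.append(app_jar)
    command.extend(app_args)
    return command


command = build_spark_submit(
    master="spark://master-host:7077",          # hypothetical master URL
    app_jar="wordcount.jar",                    # hypothetical application JAR
    main_class="com.example.NetworkWordCount",  # hypothetical main class
    deploy_mode="cluster",
    jars=["dep1.jar", "dep2.jar"],
    app_args=["localhost", "9999"],
)
```

The resulting list could then be handed to subprocess.run on a machine where spark-submit is on the PATH.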

As mentioned in the Apache Spark fundamentals section, in standalone mode, the Spark master is a single point of failure. This means that if the Spark master node should go down, the Spark cluster would stop functioning and all currently submitted or running applications would fail, and it wouldn't be possible to submit new applications.

High availability can be configured using Apache ZooKeeper (https://zookeeper.apache.org/), an open source and highly reliable distributed coordination service. Alternatively, Spark can be deployed on a cluster that's managed by Mesos or YARN, which we will talk about in the following two sections.

Mesos cluster mode

Spark can run on clusters that are managed by Apache Mesos (http://mesos.apache.org/). Mesos is a cross-platform, cloud provider-agnostic, centralized, and fault-tolerant cluster manager, designed for distributed computing environments. Among its main features, it provides resource management and isolation, and the scheduling of CPU and memory across the cluster. It can join multiple physical resources into a single virtual one, and in doing so is different from classic virtualization, where a single physical resource is split into multiple virtual resources. With Mesos, it is possible to build or schedule cluster frameworks such as Apache Spark (though it is not restricted to just this). The following diagram shows the Mesos architecture:

Figure 1.13

Mesos consists of a master daemon and frameworks. The master daemon manages agent daemons running on each cluster node, while the Mesos frameworks run tasks on the agents. The master enables fine-grained sharing of resources (including CPU and RAM) across frameworks by making them resource offers. It decides how much of the available resources to offer to each framework, depending on given organizational policies. To support diverse sets of policies, the master uses a modular architecture that makes it easy to add new allocation modules through a plugin mechanism. A Mesos framework consists of two components: a scheduler, which registers itself with the master to be offered resources, and an executor, a process that is launched on agent nodes to execute the framework's tasks. While it is the master that determines how many resources are offered to each framework, the frameworks' schedulers are responsible for selecting which of the offered resources to use. The moment a framework accepts offered resources, it passes a description of the tasks it wants to execute on them to Mesos. Mesos, in turn, launches the tasks on the corresponding agents.
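
The division of responsibilities in the resource offer mechanism can be sketched with a toy model in Python. This is a deliberately simplified illustration and not the Mesos API: the make_offers and schedule functions, the cap on CPUs per offer, and the agent names are all assumptions. The first function plays the role of the master (deciding how much to offer), while the second plays the role of a framework's scheduler (deciding which offers to use and for which tasks).

```python
def make_offers(free_resources, max_cpus_per_offer):
    """Master side: decide how much of each agent's free capacity to offer."""
    return [
        {"agent": agent,
         "cpus": min(resources["cpus"], max_cpus_per_offer),
         "mem_mb": resources["mem_mb"]}
        for agent, resources in sorted(free_resources.items())
    ]


def schedule(offers, task_cpus, task_mem_mb, pending_tasks):
    """Scheduler side: pick the offers to use and describe the tasks to launch."""
    launches = []
    remaining = list(pending_tasks)
    for offer in offers:
        if not remaining:
            break
        # Use an offer only if it is large enough for one task
        if offer["cpus"] >= task_cpus and offer["mem_mb"] >= task_mem_mb:
            launches.append({"agent": offer["agent"], "task": remaining.pop(0)})
    return launches


free_resources = {
    "agent-1": {"cpus": 4, "mem_mb": 8192},
    "agent-2": {"cpus": 1, "mem_mb": 2048},
    "agent-3": {"cpus": 8, "mem_mb": 16384},
}
offers = make_offers(free_resources, max_cpus_per_offer=4)
launches = schedule(offers, task_cpus=2, task_mem_mb=4096,
                    pending_tasks=["task-a", "task-b"])
```

In this run, the offer from agent-2 is too small and is left unused, so the two tasks are described for agent-1 and agent-3.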

The advantages of deploying a Spark cluster using Mesos in place of the standalone cluster manager include the following:

  • Dynamic partitioning between Spark and other frameworks
  • Scalable partitioning between multiple instances of Spark

Spark 2.2.1 is designed to be used with Mesos 1.0.0+. In this section, I won't describe the steps to deploy a Mesos cluster; I am assuming that a Mesos cluster is already available and running. No particular procedure or patch is required in terms of Mesos installation to run Spark on it. To verify that the Mesos cluster is ready for Spark, navigate to the Mesos master web UI at port 5050:

Figure 1.14

Check that all of the expected machines are present in the Agents tab.

To use Mesos from Spark, a Spark binary package needs to be available in a place that's accessible by Mesos itself, and a Spark driver program needs to be configured to connect to Mesos. Alternatively, it is possible to install Spark in the same location across all the Mesos slaves and then configure the spark.mesos.executor.home property (the default value is $SPARK_HOME) to point to that location.

The Mesos master URLs have the form mesos://host:5050 for a single-master Mesos cluster, or mesos://zk://host1:2181,host2:2181,host3:2181/mesos for a multi-master Mesos cluster when using ZooKeeper.
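
The two URL forms can be captured in a small Python helper. This is only a sketch: the mesos_master_url function and its parameter names are hypothetical, and the default ports and the /mesos path are simply the values used in the forms shown above.

```python
def mesos_master_url(hosts, zookeeper=False, mesos_port=5050,
                     zk_port=2181, zk_path="/mesos"):
    """Return a Spark master URL for a Mesos cluster.

    With zookeeper=False, hosts must contain the single Mesos master.
    With zookeeper=True, hosts are the ZooKeeper nodes of a multi-master setup.
    """
    if not hosts:
        raise ValueError("at least one host is required")
    if zookeeper:
        quorum = ",".join(f"{host}:{zk_port}" for host in hosts)
        return f"mesos://zk://{quorum}{zk_path}"
    if len(hosts) != 1:
        raise ValueError("a single-master URL takes exactly one host")
    return f"mesos://{hosts[0]}:{mesos_port}"


single = mesos_master_url(["host"])
multi = mesos_master_url(["host1", "host2", "host3"], zookeeper=True)
```

Either value could then be used as the argument of the --master option.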

The following is an example of how to start a Spark shell on a Mesos cluster:

$SPARK_HOME/bin/spark-shell --master mesos://127.0.0.1:5050 -c spark.mesos.executor.home=`pwd`

A Spark application can be submitted to a Mesos managed Spark cluster as follows:

$SPARK_HOME/bin/spark-submit --master mesos://127.0.0.1:5050 --total-executor-cores 2 --executor-memory 3G  $SPARK_HOME/examples/src/main/python/pi.py 100

YARN cluster mode

YARN (http://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html), which was introduced in Apache Hadoop 2.0, brought significant improvements in terms of scalability, high availability, and support for different paradigms. In the Hadoop version 1 MapReduce framework, job execution was controlled by two types of processes: a single master process called JobTracker coordinated all the jobs running on the cluster and assigned map and reduce tasks to run on the TaskTrackers, which were a number of subordinate processes running assigned tasks and periodically reporting the progress to the JobTracker. Having a single JobTracker was a scalability bottleneck. The maximum cluster size was a little more than 4,000 nodes, with the number of concurrent tasks limited to 40,000. Furthermore, the JobTracker was a single point of failure and the only available programming model was MapReduce.

The fundamental idea of YARN is to split up the functionalities of resource management and job scheduling or monitoring into separate daemons. The idea is to have a global ResourceManager and per-application ApplicationMaster (App Mstr). An application is either a single job or a DAG of jobs. The following is a diagram of YARN's architecture:

Figure 1.15

The ResourceManager and the NodeManager form the YARN framework. The ResourceManager decides on resource usage across all the running applications, while the NodeManager is an agent running on any machine in the cluster and is responsible for the containers by monitoring their resource usage (including CPU and memory) and reporting to the ResourceManager. The ResourceManager consists of two components: the scheduler and the ApplicationsManager. The scheduler is the component that's responsible for allocating resources to the various running applications. It doesn't perform any monitoring of applications' statuses, nor does it offer guarantees about restarting any failed tasks. It performs scheduling based on an application's resource requirements.

The ApplicationsManager accepts job submissions and provides a service to restart the App Mstr container on any failure. The per-application App Mstr is responsible for negotiating the appropriate resource containers from the scheduler and monitoring their status and progress. YARN, by its nature, is a general scheduler, so support for non-MapReduce jobs (such as Spark jobs) is available for Hadoop clusters.

Submitting Spark applications on YARN

To launch Spark applications on YARN, the HADOOP_CONF_DIR or YARN_CONF_DIR environment variable must be set and must point to the directory that contains the client-side configuration files for the Hadoop cluster. These configurations are needed to connect to the YARN ResourceManager and to write to HDFS. This configuration is distributed to the YARN cluster so that all the containers used by the Spark application have the same configuration. To launch Spark applications on YARN, two deployment modes are available:

  • Cluster mode: In this case, the Spark driver runs inside an application master process that's managed by YARN on the cluster. The client can finish its execution after initiating the application.
  • Client mode: In this case, the driver and the client run in the same process. The application master is used for the sole purpose of requesting resources from YARN.

Unlike the other modes, in which the master's address is specified in the master parameter, in YARN mode, the ResourceManager's address is retrieved from the Hadoop configuration. Therefore, the master parameter value is always yarn.

You can use the following command to launch a Spark application in cluster mode:

$SPARK_HOME/bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]

In cluster mode, since the driver runs on a different machine than the client, the SparkContext.addJar method doesn't work with the files that are local to the client. The only choice is to include them using the --jars option in the launch command.

Launching a Spark application in client mode happens the same way; only the --deploy-mode option value needs to change from cluster to client.
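
The YARN-specific rules described in this section (a configuration directory variable must be set, the master value is always yarn, and only the deploy mode changes between the two modes) can be summarized in a short Python sketch. The yarn_submit_command helper, the class name, and the paths are hypothetical and serve only as an illustration.

```python
def yarn_submit_command(env, app_jar, main_class, deploy_mode="cluster", jars=()):
    """Build a spark-submit command for YARN after checking the environment.

    env is a mapping of environment variables, for example os.environ.
    """
    # The ResourceManager address is read from the Hadoop client configuration
    conf_dir = env.get("HADOOP_CONF_DIR") or env.get("YARN_CONF_DIR")
    if not conf_dir:
        raise EnvironmentError("HADOOP_CONF_DIR or YARN_CONF_DIR must be set")
    if deploy_mode not in ("client", "cluster"):
        raise ValueError("deploy_mode must be 'client' or 'cluster'")
    command = ["spark-submit", "--class", main_class,
               "--master", "yarn", "--deploy-mode", deploy_mode]
    if jars:
        # Files local to the client have to be shipped through --jars
        command += ["--jars", ",".join(jars)]
    command.append(app_jar)
    return command


env = {"HADOOP_CONF_DIR": "/etc/hadoop/conf"}  # hypothetical location
cluster_cmd = yarn_submit_command(env, "app.jar", "com.example.App")
client_cmd = yarn_submit_command(env, "app.jar", "com.example.App",
                                 deploy_mode="client")
```

The two commands differ only in the value that follows --deploy-mode.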

Kubernetes cluster mode

Kubernetes (https://kubernetes.io/) is an open source system that's used to automate the deployment, scaling, and management of containerized applications. It was originally implemented at Google and then open sourced in 2014. The following are the main concepts of Kubernetes:

  • Pod: This is the smallest deployable unit of computing that can be created and managed. A pod can be seen as a group of one or more containers that share network and storage space, which also contains a specification for how to run those containers.
  • Deployment: This is a layer of abstraction whose primary purpose is to declare how many replicas of a pod should be running at a time.
  • Ingress: This is an open channel for communication with a service running in a pod.
  • Node: This is a representation of a single machine in a cluster.
  • Persistent volume: This provides a filesystem that can be mounted to a cluster, not to be associated with any particular node. This is the way Kubernetes persists information (data, files, and so on).

The following diagram (source: https://d33wubrfki0l68.cloudfront.net/518e18713c865fe67a5f23fc64260806d72b38f5/61d75/images/docs/post-ccm-arch.png) shows the Kubernetes architecture:

Figure 1.16

The main components of the Kubernetes architecture are as follows:

  • Cloud controller manager: It runs the Kubernetes controllers
  • Controllers: There are four of them: node, route, service, and PersistentVolumeLabels
  • Kubelets: The primary agents that run on nodes

The submission of Spark jobs to a Kubernetes cluster can be done directly through spark-submit. Kubernetes requires that we supply Docker (https://www.docker.com/) images that can be deployed into containers within pods. Starting from the 2.3 release, Spark provides a Dockerfile ($SPARK_HOME/kubernetes/dockerfiles/Dockerfile, which can also be customized to match specific applications' needs) and a script ($SPARK_HOME/bin/docker-image-tool.sh) that can be used to build and publish Docker images that are to be used within a Kubernetes backend. The following is the syntax that's used to build a Docker image through the provided script:

$SPARK_HOME/bin/docker-image-tool.sh -r <repo> -t my-tag build

The following is the syntax to push an image to a Docker repository while using the same script:

$SPARK_HOME/bin/docker-image-tool.sh -r <repo> -t my-tag push

A job can be submitted in the following way:

$SPARK_HOME/bin/spark-submit \
--master k8s://https://<k8s_hostname>:<k8s_port> \
--deploy-mode cluster \
--name <application-name> \
--class <package>.<ClassName> \
--conf spark.executor.instances=<instance_count> \
--conf spark.kubernetes.container.image=<spark-image> \
local:///path/to/<sparkjob>.jar

Kubernetes requires application names to contain only lowercase alphanumeric characters, hyphens, and dots, and to start and end with an alphanumeric character.
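
This naming rule is easy to check before submitting a job. The following Python sketch encodes exactly the rule stated above as a regular expression; the is_valid_app_name helper is a hypothetical name, and the check doesn't cover any other constraint (such as length limits) that a cluster might impose.

```python
import re

# Lowercase alphanumerics, hyphens, and dots only;
# the first and the last character must be alphanumeric
_APP_NAME = re.compile(r"[a-z0-9]([a-z0-9.-]*[a-z0-9])?")


def is_valid_app_name(name):
    """Return True if the name satisfies the rule described in the text."""
    return _APP_NAME.fullmatch(name) is not None


checks = {name: is_valid_app_name(name)
          for name in ["spark-pi", "my.app-1", "Spark-Pi", "-spark", "spark-"]}
```

Here, the first two names are accepted, while the other three are rejected because of an uppercase letter, a leading hyphen, and a trailing hyphen, respectively.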

The following diagram shows the way the submission mechanism works:

Figure 1.17

Here's what happens:

  • Spark creates a driver that's running within a Kubernetes pod
  • The driver creates the executors, which also run within Kubernetes pods, and then connects to them and executes application code
  • At the end of the execution, the executor pods terminate and are cleaned up, while the driver pod still persists logs and remains in a completed state (which means that it doesn't use cluster computation or memory resources) in the Kubernetes API (until it's eventually garbage collected or manually deleted)

 

Summary

In this chapter, we became familiar with Apache Spark and most of its main modules. We started to use the available Spark shells and wrote our first self-contained application using the Scala and Python programming languages. Finally, we explored different ways of deploying and running Spark in cluster mode. Everything we have learned about so far is necessary for understanding the topics that are presented from Chapter 3, Extract, Transform, Load, onward. If you have any doubts about any of the presented topics, I suggest that you go back and read this chapter again before moving on.

In the next chapter, we are going to explore the basics of DL, with an emphasis on some particular implementations of multi-layer neural networks.

About the Author

  • Guglielmo Iozzia

    Guglielmo Iozzia is currently a big data delivery manager at Optum in Dublin. He completed his master's degree in biomedical engineering at the University of Bologna. After graduation, he joined a start-up IT company in Bologna that had implemented a new system to manage online payments. There, he worked on complex Java projects for different customers in different areas. He has also worked at the IT department of FAO, an agency of the United Nations. In 2013, he had the chance to join IBM in Dublin. There, he improved his DevOps skills, working mostly on cloud-based applications. He is a golden member, writes articles at DZone, and maintains a personal blog to share his findings and thoughts about various tech topics.
