In this chapter, we will cover the components of Spark. You will learn them through the following recipes:
Initializing SparkContext
Working with Spark's Python and Scala shells
Building standalone applications
Working with the Spark programming model
Working with pair RDDs
Persisting RDDs
Loading and saving data
Creating broadcast variables and accumulators
Submitting applications to a cluster
Working with DataFrames
Working with Spark Streaming
Apache Spark is a general-purpose distributed computing engine for large-scale data processing. It started as an open source initiative at AMPLab and was later donated to the Apache Software Foundation, where it is now one of the top-level projects. Apache Spark offers a data abstraction called Resilient Distributed Datasets (RDDs) to analyze data in parallel on top of a cluster of resources. The Apache Spark framework is an alternative to Hadoop MapReduce: it is up to 100X faster than MapReduce and offers rich APIs for iterative and expressive data processing. The project is written in Scala and offers client APIs in Scala, Java, Python, and R.
This recipe shows how to initialize the SparkContext object as a part of many Spark applications. SparkContext is the object which allows us to create the base RDDs. Every Spark application must contain this object to interact with Spark. It is also used to initialize StreamingContext, SQLContext, and HiveContext.
To step through this recipe, you will need a running Spark cluster in any one of the modes, that is, local, standalone, YARN, or Mesos. For installing Spark on a standalone cluster, please refer to http://spark.apache.org/docs/latest/spark-standalone.html. Install Hadoop (optional), Scala, and Java. Please download the data from the following location:
https://github.com/ChitturiPadma/datasets/blob/master/stocks.txt
Let's see how to initialize SparkContext:
Invoke spark-shell:
$SPARK_HOME/bin/spark-shell --master <master type>
Spark context available as sc.
Invoke PySpark:
$SPARK_HOME/bin/pyspark --master <master type>
SparkContext available as sc
Invoke SparkR:
$SPARK_HOME/bin/sparkR --master <master type>
Spark context is available as sc
Now, let's initialize SparkContext in standalone applications written in Scala, Java, and Python:
Scala:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkContextExample {
  def main(args: Array[String]) {
    val stocksPath = "hdfs://namenode:9000/stocks.txt"
    val conf = new SparkConf().setAppName("Counting Lines")
                              .setMaster("spark://master:7077")
    val sc = new SparkContext(conf)
    val data = sc.textFile(stocksPath, 2)
    val totalLines = data.count()
    println("Total number of Lines: %s".format(totalLines))
  }
}
Java:
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;

public class SparkContextExample {
  public static void main(String[] args) {
    String stocks = "hdfs://namenode:9000/stocks.txt";
    SparkConf conf = new SparkConf().setAppName("Counting Lines")
                                    .setMaster("spark://master:7077");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logData = sc.textFile(stocks);
    long totalLines = logData.count();
    System.out.println("Total number of Lines " + totalLines);
  }
}
Python:
from pyspark import SparkContext

stocks = "hdfs://namenode:9000/stocks.txt"
sc = SparkContext("<master URI>", "ApplicationName")
data = sc.textFile(stocks)
totalLines = data.count()
print("Total Lines are: %i" % (totalLines))
In the preceding code snippets, new SparkContext(conf), new JavaSparkContext(conf), and SparkContext("<master URI>", "ApplicationName") initialize SparkContext in three different languages: Scala, Java, and Python. SparkContext is the starting point for Spark functionality. It represents the connection to a Spark cluster, and can be used to create RDDs, accumulators, and broadcast variables on that cluster.
SparkContext is created on the driver program and represents its connection to the cluster; the base RDDs are created using it. SparkContext is not serializable, so it cannot be shipped to the workers, and only one SparkContext is available per application. In the case of Streaming applications and the Spark SQL module, StreamingContext and SQLContext are created on top of SparkContext.
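As a quick illustration, here is a minimal sketch of how those contexts are derived from an existing SparkContext, assuming it is named sc as in the shells above; the 10-second batch interval is just an example value:

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

// A minimal sketch, assuming sc is an already initialized SparkContext
val sqlContext = new SQLContext(sc)               // entry point for Spark SQL
val ssc = new StreamingContext(sc, Seconds(10))   // entry point for Spark Streaming, 10-second batches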
To understand more about the SparkContext object and its methods, please refer to this documentation page: https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.SparkContext.
This recipe explains the spark-shell and PySpark command-line interface tools from the Apache Spark project. Spark-shell is the Scala-based command-line interface and PySpark is the Python-based command-line tool used to develop Spark applications interactively. Both come with SparkContext, SQLContext, and HiveContext already initialized.
Both spark-shell and PySpark are available in the bin directory of SPARK_HOME, that is, SPARK_HOME/bin:
Invoke spark-shell as follows:
$SPARK_HOME/bin/spark-shell [Options]
$SPARK_HOME/bin/spark-shell --master <master type> i.e., local, spark, yarn, mesos
$SPARK_HOME/bin/spark-shell --master spark://<sparkmasterHostName>:7077

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79)
Type in expressions to have them evaluated.
Type :help for more information.
16/01/17 20:05:38 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.1.6 instead (on interface en0)
SQL context available as sqlContext.

scala> val data = sc.textFile("hdfs://namenode:9000/stocks.txt")
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] textFile at <console>:27

scala> data.count()
res0: Long = 57391

scala> data.first()
res1: String = NYSE CLI 2009-12-31 35.39 35.70 34.50 34.5 890100 34.12

scala> data.top(2)
res5: Array[String] = Array(NYSE CZZ 2009-12-31 8.77 8.77 8.67 8.70 694200 8.70, NYSE CZZ 2009-12-30 8.71 8.80 8.46 8.68 1588200 8.68)

scala> val mydata = data.map(line => line.toLowerCase())
mydata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at map at <console>:29

scala> mydata.collect()
res6: Array[String] = Array(nyse cli 2009-12-31 35.39 35.70 34.50 34.57 890100 34.12, nyse cli 2009-12-30 35.22 35.46 34.96 35.40 516900 34.94, nyse cli 2009-12-29 35.69 35.95 35.21 35.34 556500 34.88, nyse cli 2009-12-28 35.67 36.23 35.49 35.69 565000 35.23, nyse cli 2009-12-24 35.38 35.60 35.19 35.47 230200 35.01, nyse cli 2009-12-23 35.13 35.51 35.07 35.21 520200 34.75, nyse cli 2009-12-22 34.76 35.04 34.71 35.04 564600 34.58, nyse cli 2009-12-21 34.65 34.74 34.41 34.73 428400 34.28, nyse cli 2009-12-18 34.11 34.38 33.73 34.22 1152600 33.77, nyse cli 2009-12-17 34.18 34.53 33.84 34.21 1082600 33.76, nyse cli 2009-12-16 34.79 35.10 34.48 34.66 1007900 34.21, nyse cli 2009-12-15 34.60 34.91 34.39 34.84 813200 34.39, nyse cli 2009-12-14 34.21 34.90 33.86 34.82 987700 34.37, nyse cli 200...)
Invoke PySpark as follows:
$SPARK_HOME/bin/pyspark [options]
$SPARK_HOME/bin/pyspark --master <master type> i.e., local, spark, yarn, mesos
$SPARK_HOME/bin/pyspark --master spark://sparkmasterHostName:7077

Python 2.7.6 (default, Sep 9 2014, 15:04:36)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/01/17 20:25:48 INFO SparkContext: Running Spark version 1.6.0
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Python version 2.7.6 (default, Sep 9 2014 15:04:36)
SparkContext available as sc, HiveContext available as sqlContext.

>>> data = sc.textFile("hdfs://namenode:9000/stocks.txt")
>>> data.count()
57391
>>> data.first()
NYSE CLI 2009-12-31 35.39 35.70 34.50 34.57 890100 34.12
>>> data.top(2)
['NYSE CZZ 2009-12-31 8.77 8.77 8.67 8.70 694200 8.70', 'NYSE CZZ 2009-12-30 8.71 8.80 8.46 8.68 1588200 8.68']
>>> data.collect()
['NYSE CLI 2009-12-31 35.39 35.70 34.50 34.57 890100 34.12', 'NYSE CLI 2009-12-30 35.22 35.46 34.96 35.40 516900 34.94', 'NYSE CLI 2009-12-29 35.69 35.95 35.21 35.34 556500 34.88', 'NYSE CLI 2009-12-28 35.67 36.23 35.49 35.69 565000 35.23', 'NYSE CLI 2009-12-24 35.38 35.60 35.19 35.47 230200 35.01', 'NYSE CLI 2009-12-23 35.13 35.51 35.07 35.21 520200 34.75', 'NYSE CLI 2009-12-22 34.76 35.04 34.71 35.04 564600 34.58', 'NYSE CLI 2009-12-21 34.65 34.74 34.41 34.73 428400 34.28', 'NYSE CLI 2009-12-18 34.11 34.38 33.73 34.22 1152600 33.77', 'NYSE CLI 2009-12-17 34.18 34.53 33.84 34.21 1082600 33.76', 'NYSE CLI 2009-12-16 34.79 35.10 34.48 34.66 1007900 34.21', 'NYSE CLI 2009-12-15 34.60 34.91 34.39 34.84 813200 34.39', 'NYSE CLI 2009-12-14 34.21 34.90 33.86 34.82 987700 34.37', 'NYSE CLI 200...
In the preceding code snippets, Spark RDD transformations and actions are executed interactively in both spark-shell and PySpark. Both work in Read Eval Print Loop (REPL) style, that is, an interactive environment such as a Windows console or Unix/Linux shell where a command is entered and the system responds with the output.
Both spark-shell and PySpark are convenient command-line interfaces for developing Spark applications interactively. They are well suited to application prototyping and quick development, and offer numerous options for customization.
The Apache Spark documentation offers plenty of examples using these two command-line interfaces; please refer to this documentation page: http://spark.apache.org/docs/latest/quick-start.html#interactive-analysis-with-the-spark-shell.
This recipe explains how to develop and build Spark standalone applications using programming languages such as Scala, Java, Python, and R. The sample application under this recipe is written in Scala.
Install any IDE tool for application development (the preferred one is Eclipse) and install the SBT build tool; SBT is a build tool like Maven, but for Scala projects. Create the Scala project, add all the necessary libraries to the build.sbt file, and add this project to Eclipse.
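For reference, a minimal build.sbt for such a project might look as follows; the project name and version numbers are illustrative assumptions matching the Spark 1.6.0/Scala 2.10 setup used in this chapter:

name := "SparkContextExample"

version := "1.0"

scalaVersion := "2.10.5"

// Spark core is marked "provided" because the cluster supplies it at runtime;
// the sbt-assembly plugin used by 'sbt assembly' is added separately under the project/ directory
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"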
Develop a Spark standalone application using the Eclipse IDE as follows:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkContextExample {
  def main(args: Array[String]) {
    val file = "hdfs://namenode:9000/stocks.txt"
    val conf = new SparkConf().setAppName("Counting Lines")
                              .setMaster("spark://master:7077")
    val sc = new SparkContext(conf)
    val data = sc.textFile(file, 2)
    val totalLines = data.count()
    println("Total number of Lines: %s".format(totalLines))
  }
}
Now go to the project directory and build the project using sbt assembly and sbt package manually, or build it using Eclipse:

~/SparkProject/SparkContextExample$ sbt assembly
~/SparkProject/SparkContextExample$ sbt package
sbt assembly compiles the program and generates the JAR as SparkContextExample-assembly-<version>.jar, while sbt package generates the JAR as SparkContextExample_2.10-1.0.jar. Both JARs are generated in the path ~/SparkProject/SparkContextExample/target/scala-2.10. Submit SparkContextExample-assembly-<version>.jar to the Spark cluster using the spark-submit shell script under the bin directory of SPARK_HOME.
We can develop a variety of complex Spark standalone applications to analyze the data in various ways. When working with any third-party libraries, include the corresponding dependency JARs in the build.sbt file. Invoking sbt update will download the respective dependencies and include them in the project classpath.
The Apache Spark documentation covers how to build standalone Spark applications. Please refer to this documentation page: https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications.
This recipe explains the fundamentals of the Spark programming model. It covers the RDD basics, that is, Spark provides a Resilient Distributed Dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. It also covers how to create RDDs and perform transformations and actions on them.
Let's create RDDs and apply a few transformations such as map and filter, and a few actions such as count, take, top, and so on, in spark-shell:

scala> val data = Array(1, 2, 3, 4, 5)
scala> val rddData = sc.parallelize(data)
scala> val filteredData = rddData.filter(ele => ele%2 == 0)
filteredData: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:29
scala> val mappedData = rddData.map(ele => ele+2)
mappedData: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:30
scala> rddData.count()
res1: Long = 5
scala> rddData.take(2)
res2: Array[Int] = Array(1, 2)
scala> rddData.top(1)
res3: Array[Int] = Array(5)
Now let's work with the transformations and actions in a Spark standalone application:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object SparkTransformations {
  def main(args: Array[String]) {
    val conf = new SparkConf
    conf.setAppName("SparkTransformations").setMaster("spark://master:7077")
    val sc = new SparkContext(conf)
    val baseRdd1 = sc.parallelize(Array("hello","hi","priya","big","data","hub","hub","hi"), 1)
    val baseRdd2 = sc.parallelize(Array("hey","ram","krishna","priya"), 1)
    val baseRdd3 = sc.parallelize(Array(1,2,3,4), 2)
    val sampledRdd = baseRdd1.sample(false, 0.5)
    val unionRdd = baseRdd1.union(baseRdd2).repartition(1)
    val intersectionRdd = baseRdd1.intersection(baseRdd2)
    val distinctRdd = baseRdd1.distinct.repartition(1)
    val subtractRdd = baseRdd1.subtract(baseRdd2)
    val cartesianRdd = sampledRdd.cartesian(baseRdd2)
    val reducedValue = baseRdd3.reduce((a,b) => a+b)
    val collectedRdd = distinctRdd.collect
    collectedRdd.foreach(println)
    val count = distinctRdd.count
    val first = distinctRdd.first
    println("Count is..." + count)
    println("First Element is..." + first)
    val takeValues = distinctRdd.take(3)
    val takeSample = distinctRdd.takeSample(false, 2)
    val takeOrdered = distinctRdd.takeOrdered(2)
    takeValues.foreach(println)
    println("Take Sample Values..")
    takeSample.foreach(println)
    val foldResult = distinctRdd.fold("<>")((a,b) => a+b)
    println(foldResult)
  }
}
Spark offers an abstraction called an RDD as part of its programming model. The preceding code snippets show RDD creation, transformations, and actions. Transformations such as union, subtract, intersection, sample, cartesian, map, filter, and flatMap, when applied on an RDD, result in a new RDD, whereas actions such as count, first, take(3), takeSample(false, 2), and takeOrdered(2) compute a result on the RDD and return it to the driver program or save it to external storage. Although we can define RDDs at any point, Spark computes them in a lazy fashion, that is, only the first time an RDD is used in an action.
There are a few transformations, such as reduceByKey, groupByKey, repartition, distinct, intersection, subtract, and so on, which result in a shuffle operation. The shuffle is very expensive as it involves disk I/O, data serialization, and network I/O. The shuffle can be tuned using certain configuration parameters.
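As an illustration, here is a minimal sketch of setting a few shuffle-related properties on a SparkConf; the property names are standard Spark configuration keys, and the values shown are illustrative rather than recommendations:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("ShuffleTuning")
  .set("spark.shuffle.compress", "true")        // compress map output files
  .set("spark.shuffle.file.buffer", "64k")      // in-memory buffer per shuffle file output stream
  .set("spark.reducer.maxSizeInFlight", "96m")  // map output fetched concurrently per reduce task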
The Apache Spark documentation offers a detailed explanation about the Spark programming model. Please refer to this documentation page: http://spark.apache.org/docs/latest/programming-guide.html.
This recipe shows how to work with RDDs of key/value pairs. Key/value RDDs are widely used to perform aggregations; these key/value RDDs are called pair RDDs. We'll do some initial ETL to get the data into a key/value format and see how to apply transformations on a single pair RDD and on two pair RDDs.
To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the other distributed modes, that is, standalone, YARN, or Mesos. It could be run in local mode as well.
We can create a pair RDD from a collection of strings in the following way:
val baseRdd = sc.parallelize(Array("this,is,a,ball", "it,is,a,cat", "john,is,in,town,hall"))
val inputRdd = sc.makeRDD(List(("is",2), ("it",2), ("cat",8), ("this",6), ("john",5), ("a",1)))
val wordsRdd = baseRdd.flatMap(record => record.split(","))
val wordPairs = wordsRdd.map(word => (word, word.length))
val filteredWordPairs = wordPairs.filter{case(word, length) => length >= 2}
Also, pair RDDs can be created from HDFS input files. Let's take a text file which contains stocks data as follows:

IBM,20160113,133.5,134.279999,131.100006,131.169998,4672300
GOOG,20160113,730.849976,734.73999,698.609985,700.559998,2468300
MSFT,20160113,53.799999,54.07,51.299999,51.639999,66119000
MSFT,20160112,52.759998,53.099998,52.060001,52.779999,35650700
YHOO,20160113,30.889999,31.17,29.33,29.440001,16593700
Now, creating pair RDDs for the preceding data looks like this:
val textFile = sc.textFile("hdfs://namenodeHostName:8020/data/stocks.txt")
val stocksPairRdd = textFile.map{record =>
  val colData = record.split(",")
  (colData(0), colData(6))
}
Let's apply transformations on pair RDDs as follows:
val stocksGroupedRdd = stocksPairRdd.groupByKey
val stocksReducedRdd = stocksPairRdd.reduceByKey((x,y) => x+y)
val subtractedRdd = wordPairs.subtractByKey(inputRdd)
val cogroupedRdd = wordPairs.cogroup(inputRdd)
val joinedRdd = filteredWordPairs.join(inputRdd)
val sortedRdd = wordPairs.sortByKey
val leftOuterJoinRdd = inputRdd.leftOuterJoin(filteredWordPairs)
val rightOuterJoinRdd = wordPairs.rightOuterJoin(inputRdd)
val flatMapValuesRdd = filteredWordPairs.flatMapValues(length => 1 to 5)
val mapValuesRdd = wordPairs.mapValues(length => length*2)
val keys = wordPairs.keys
val values = filteredWordPairs.values
The usage of the various pair RDD transformations is given as follows:
groupByKey groups the values of the RDD by key.
reduceByKey performs aggregation on the grouped values corresponding to a key.
subtractByKey removes tuples in the first RDD whose key matches a key in the other RDD.
join groups all the values pertaining to a particular key in both the RDDs.
cogroup does the same job as join, but in addition it first groups the values in the first RDD and then in the other RDD.
leftOuterJoin and rightOuterJoin work similarly to join, with a slight variation: leftOuterJoin includes all the records from the left RDD and, if there is no matching record in the right RDD, the corresponding values are represented as None, and vice versa for rightOuterJoin.
mapValues applies a function to each of the values of the pair RDD without changing the key.
flatMapValues applies a function which returns an iterator to each value of a pair RDD, and for each element returned, a key/value entry is produced with the old key.
keys and values return, respectively, all keys and all values of a pair RDD.
There are other pair RDD transformations, such as foldByKey, combineByKey, and aggregateByKey, and actions such as countByKey and countByValue, along with the regular actions such as count, first, take, and so on. Pair RDD transformations that regroup data by key involve a shuffle operation, which moves data across the partitions. To know more about the working of the shuffle operation and its performance impact, please refer to http://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs.
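As a short illustration, here is a minimal sketch of foldByKey, aggregateByKey, and countByKey applied to the stocksPairRdd created earlier in this recipe; it assumes the volume column parses cleanly as an integer:

// Convert the volume values from String to Int before aggregating
val volumePairs = stocksPairRdd.mapValues(_.toInt)

val totalVolumes = volumePairs.foldByKey(0)(_ + _)          // total volume per symbol
val avgVolumes = volumePairs.aggregateByKey((0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),                   // merge a value into (sum, count)
    (a, b) => (a._1 + b._1, a._2 + b._2))                   // merge two (sum, count) pairs
  .mapValues { case (sum, count) => sum.toDouble / count }

val recordsPerSymbol = volumePairs.countByKey()             // action: returns a Map[String, Long]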
This recipe shows how to persist an RDD. RDDs are lazily evaluated, and sometimes we need to reuse the same RDD multiple times. In such cases, Spark will recompute the RDD and all of its dependencies each time we call an action on it. This is expensive for iterative algorithms which need the computed dataset multiple times. To avoid computing an RDD multiple times, Spark provides a mechanism for persisting the data in an RDD.
After the first time an action computes the RDD's contents, they can be stored in memory or disk across the cluster. The next time an action depends on the RDD, it need not be recomputed from its dependencies.
To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos.
Let's see how to persist RDDs using the following code:
import org.apache.spark.storage.StorageLevel

val inputRdd = sc.parallelize(Array("this,is,a,ball", "it,is,a,cat", "julie,is,in,the,church"))
val wordsRdd = inputRdd.flatMap(record => record.split(","))
val wordLengthPairs = wordsRdd.map(word => (word, word.length))
val wordPairs = wordsRdd.map(word => (word, 1))
val reducedWordCountRdd = wordPairs.reduceByKey((x,y) => x+y)
val filteredWordLengthPairs = wordLengthPairs.filter{case(word, length) => length >= 3}
reducedWordCountRdd.cache()
val joinedRdd = reducedWordCountRdd.join(filteredWordLengthPairs)
joinedRdd.persist(StorageLevel.MEMORY_AND_DISK)
val wordPairsCount = reducedWordCountRdd.count
val wordPairsCollection = reducedWordCountRdd.take(10)
val joinedRddCount = joinedRdd.count
val joinedPairs = joinedRdd.collect()
reducedWordCountRdd.unpersist()
joinedRdd.unpersist()
The call to cache() on reducedWordCountRdd indicates that the RDD should be stored in memory the next time it is computed. The count action computes it initially. When the take action is invoked, it accesses the cached elements of the RDD instead of recomputing them from the dependencies.
Spark defines levels of persistence, or StorageLevel values, for persisting RDDs. rdd.cache() is shorthand for rdd.persist(StorageLevel.MEMORY_ONLY). In the preceding example, joinedRdd is persisted with the storage level MEMORY_AND_DISK, which indicates persisting the RDD in memory as well as on disk. It is good practice to unpersist the RDD at the end, which lets us manually remove it from the cache.
Spark defines various levels of persistence, such as MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_2, and so on. Deciding when to cache/persist the data can be an art. The decision typically involves trade-offs between space and speed. If you attempt to cache more data than fits in memory, Spark will use the LRU policy to evict old partitions. In general, RDDs should be persisted when they are likely to be referenced by multiple actions and are expensive to regenerate.
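As an illustration, a minimal sketch of persisting with a serialized storage level, which trades extra CPU for a smaller memory footprint, looks like this:

import org.apache.spark.storage.StorageLevel

val stocksRdd = sc.textFile("hdfs://namenode:9000/stocks.txt")
stocksRdd.persist(StorageLevel.MEMORY_ONLY_SER)  // store partitions as serialized bytes in memory
stocksRdd.count()                                // the first action materializes the cached partitions
stocksRdd.count()                                // subsequent actions read from the cache
stocksRdd.unpersist()                            // manually evict when no longer needed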
Please refer to http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence to gain a detailed understanding of persistence in Spark.
This recipe shows how Spark supports a wide range of input and output sources. Spark makes it very simple to load and save data in a large number of file formats, ranging from unstructured, such as text, to semi-structured, such as JSON, to structured, such as SequenceFiles.
To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos. Also, the reader is expected to have an understanding of text files, JSON, CSV, SequenceFiles, and object files.
Load and save a text file as follows:
val input = sc.textFile("hdfs://namenodeHostName:8020/repos/spark/README.md")
val wholeInput = sc.wholeTextFiles("file:///home/padma/salesFiles")
val result = wholeInput.mapValues{value =>
  val nums = value.split(" ").map(x => x.toDouble)
  nums.sum / nums.size.toDouble
}
result.saveAsTextFile("/home/Padma/outputFile.txt")
For loading a JSON file, the people.json input file is taken from the SPARK_HOME folder, whose location is /spark-1.6.0/examples/src/main/resources/people.json. Now, loading and saving a JSON file looks like this:

// Loading JSON file
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
...
case class Person(name: String, age: Int)
...
val jsonInput = sc.textFile("hdfs://namenode:9000/data/people.json")
val result = jsonInput.flatMap(record => {
  try {
    Some(mapper.readValue(record, classOf[Person]))
  } catch {
    case e: Exception => None
  }
})
result.filter(person => person.age > 15)
      .map(mapper.writeValueAsString(_))
      .saveAsTextFile(outputFile)
To load and save a CSV file, let's take the stocks data:
IBM,20160113,133.5,134.279999,131.100006,131.169998,4672300
GOOG,20160113,730.849976,734.73999,698.609985,700.559998,2468300
MSFT,20160113,53.799999,54.07,51.299999,51.639999,66119000
MSFT,20160112,52.759998,53.099998,52.060001,52.779999,35650700
YHOO,20160113,30.889999,31.17,29.33,29.440001,16593700

import java.io.{StringReader, StringWriter}
import au.com.bytecode.opencsv.{CSVReader, CSVWriter}
import scala.collection.JavaConversions._
...
case class Stocks(name: String, totalPrice: Long)
...
val input = sc.textFile("hdfs://namenodeHostName:8020/data/stocks.txt")
val result = input.map{line =>
  val reader = new CSVReader(new StringReader(line))
  val cols = reader.readNext()
  Stocks(cols(0), cols(6).toLong)
}
result.map(stock => Array(stock.name, stock.totalPrice.toString)).mapPartitions{stocks =>
  val stringWriter = new StringWriter
  val csvWriter = new CSVWriter(stringWriter)
  csvWriter.writeAll(stocks.toList)
  Iterator(stringWriter.toString)
}.saveAsTextFile("hdfs://namenode:9000/CSVOutputFile")
Now, let's see the way a sequence file is loaded and saved:

import org.apache.hadoop.io.{Text, IntWritable}

val data = sc.sequenceFile(inputFile, classOf[Text], classOf[IntWritable])
             .map{case(x, y) => (x.toString, y.get())}
val input = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))
input.saveAsSequenceFile("hdfs://namenode:9000/sequenceOutputFile")
The call to textFile() on the SparkContext with the path to the file loads the text file as an RDD. If the input consists of multiple parts in the form of a directory, we can use SparkContext.wholeTextFiles(), which returns a pair RDD with the key being the name of the input file. For handling JSON files, the data is loaded as a text file and then parsed using a JSON parser. There are a number of JSON libraries available, but in the example we used the Jackson (http://bit.ly/17k6vli) library as it is relatively simple to implement.
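The mapper used in the preceding snippet is set up in the elided lines; a minimal sketch of that setup with Jackson's Scala module might look as follows (the JSON literal here is just for illustration):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

case class Person(name: String, age: Int)

// Register the Scala module so Jackson can bind JSON to Scala case classes
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)

val person = mapper.readValue[Person]("""{"name":"Andy","age":32}""")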
Tip
Please refer to other JSON libraries, such as this one: http://bit.ly/1xP8JFK
Loading CSV/TSV data is similar to JSON data, that is, first the data is loaded as text and then processed. As with JSON, there are various CSV libraries, but for Scala we used opencsv (http://opencsv.sourceforge.net). Using CSVReader, the records are parsed and mapped to the case class structure. While saving the file, CSVWriter is used to write the output.
Coming to SequenceFile, it is a popular Hadoop format composed of a flat file with key/value pairs, whose keys and values implement Hadoop's Writable interface. SparkContext.sequenceFile() is the API to load a sequence file, in which the parameters classOf[Text] and classOf[IntWritable] indicate the keyClass and the valueClass.
As Spark is built on the Hadoop ecosystem, it can access data through the InputFormat and OutputFormat interfaces used by Hadoop MapReduce, which are available for many common file formats and storage systems (for example, S3, HDFS, Cassandra, HBase, and so on).
Tip
For more information, please refer to Hadoop InputFormat (http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html) and SequenceFiles (http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html).
Spark can also interact with any Hadoop-supported format (for both the old and new Hadoop file APIs) using newAPIHadoopFile, which takes a path and three classes: the first class represents the input format, the next class is for the key, and the final class is the class of the value. The Spark SQL module provides a more efficient API for structured data sources, which includes JSON and Hive.
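For instance, a minimal sketch of reading a text file through the new Hadoop API with newAPIHadoopFile could look like this; the path is just a placeholder:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Arguments: path, input format class, key class, value class
val hadoopRdd = sc.newAPIHadoopFile(
  "hdfs://namenode:9000/data/stocks.txt",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text])
val lines = hadoopRdd.map { case (_, text) => text.toString }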
For more details on Hadoop input and output formats and the SequenceFile input format, please refer to the links in the preceding tip.
This recipe shows how to use accumulators and broadcast variables. Accumulators are used to aggregate values from worker nodes back to the driver program. One of the most common uses of accumulators is to count events that occur during job execution for debugging purposes. The other type of shared variable is the broadcast variable, which allows the program to efficiently send a large, read-only value to all the worker nodes for use in one or more Spark operations. Such variables are used in cases where the application needs to send a large, read-only lookup table to all the nodes.
To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos.
The log file of a worker from the Spark logs directory, $SPARK_HOME/logs, is taken; its filename looks like this: spark-padma-org.apache.spark.deploy.worker.Worker-1-blrrndtipdl19. Place this file in HDFS. This log file contains Spark log information with different trace levels, such as DEBUG, INFO, WARN, and ERROR. Let's work with an accumulator now:
val sc = new SparkContext
val logFile = sc.textFile("hdfs://namenodeHostName:8020/data/spark-worker-Worker1.out")
val errorLines = sc.accumulator(0)
val debugLines = logFile.map{line =>
  if(line.contains("ERROR")) errorLines += 1
  line
}.filter(line => line.contains("DEBUG"))
debugLines.saveAsTextFile("hdfs://namenodeHostName:8020/data/out/debugLines.txt")
println("ERROR Lines: " + errorLines.value)
Now create a broadcast variable and use it in the workers as follows:
val sc = new SparkContext
val broadCastedTemperatures = sc.broadcast(Map("KOCHI" -> 22, "BNGLR" -> 22, "HYD" -> 24,
  "MUMBAI" -> 21, "DELHI" -> 17, "NOIDA" -> 19, "SIMLA" -> 9))
val inputRdd = sc.parallelize(Array(("BNGLR",20), ("BNGLR",16), ("KOCHI",-999), ("SIMLA",-999),
  ("DELHI",19), ("DELHI",-999), ("MUMBAI",27), ("MUMBAI",-999), ("HYD",19), ("HYD",25), ("NOIDA",-999)))
val replacedRdd = inputRdd.map{case(location, temperature) =>
  val standardTemperatures = broadCastedTemperatures.value
  if(temperature == -999 && standardTemperatures.get(location) != None)
    (location, standardTemperatures.get(location).get)
  else
    (location, temperature)
}
val locationsWithMaxTemperatures = replacedRdd.reduceByKey{(temp1, temp2) =>
  if (temp1 > temp2) temp1 else temp2}
Initially, when working with accumulators, we created an Accumulator[Int], called errorLines, and added 1 to it whenever we saw a line that contained ERROR. We will see the correct count for errorLines only after the saveAsTextFile() action runs, because the transformation map() is lazy, so the side effect of incrementing the accumulator happens only when map() is forced to occur by saveAsTextFile(). The return type of the accumulator is the org.apache.spark.Accumulator[T] object, where T is the type of the value.
Coming to broadcast variables, SparkContext.broadcast creates a broadcast variable of type Broadcast[T]. T can be of any type, and it should be serializable. The value of the broadcast variable is accessed using the value property. The variable is sent to each node only once and is read-only.
Spark has support for custom accumulator types. They need to extend AccumulatorParam.
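A minimal sketch of a custom accumulator, assuming we want to concatenate strings, might look like this:

import org.apache.spark.AccumulatorParam

// A hypothetical custom accumulator that concatenates String values
object StringAccumulatorParam extends AccumulatorParam[String] {
  def zero(initialValue: String): String = ""
  def addInPlace(s1: String, s2: String): String = s1 + s2
}

val messages = sc.accumulator("")(StringAccumulatorParam)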
Tip
For additional information on this, please visit: http://spark.apache.org/docs/latest/programming-guide.html#accumulators-a-nameaccumlinka.
Also, when working with broadcast variables, it is essential to choose a serialization format which is fast and compact.
Tip
For more information on broadcast variables, please refer: http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables.
This recipe shows how to run an application on distributed clusters. An application is launched on a set of machines using an external service called a cluster manager. There is a wide variety of cluster managers such as Hadoop YARN, Apache Mesos, and Spark's own built-in standalone cluster manager. Spark provides a single tool for submitting jobs across all cluster managers, called spark-submit. Through various options, spark-submit can connect to different cluster managers and control how many resources your application gets.
To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos.
Let's create a word count application:
package org.apache.spark.programs

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object WordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf
    conf.setAppName("WordCount")
    val sc = new SparkContext(conf)
    val input = sc.parallelize(Array("this,is,a,ball", "it,is,a,cat", "john,is,in,town,hall"))
    val words = input.flatMap{record => record.split(",")}
    val wordPairs = words.map(word => (word, 1))
    val wordCounts = wordPairs.reduceByKey{(a,b) => a+b}
    val result = wordCounts.collect
    println("Displaying the WordCounts:")
    result.foreach(println)
  }
}
Submit the application to Spark's standalone cluster manager:
spark-submit --class org.apache.spark.programs.WordCount --master spark://master:7077 WordCount.jar
Submit the application to YARN:
spark-submit --class org.apache.spark.programs.WordCount --master yarn WordCount.jar
Submit the application to Mesos:
spark-submit --class org.apache.spark.programs.WordCount --master mesos://mesos-master:5050 WordCount.jar
When spark-submit is called with the --master flag as spark://master:7077, it submits the application to Spark's standalone cluster. Invoking it with the --master flag as yarn runs the application on a YARN cluster, whereas specifying the --master flag as mesos://mesos-master:5050 runs the application on a Mesos cluster.
Whenever spark-submit is invoked, it launches the driver program. This driver program contacts the cluster manager and requests resources to launch executors. Once the executors are launched by the cluster manager, the driver runs through the user application and delegates the work to the executors in the form of tasks. When the driver's main() method exits, it terminates the executors and releases resources from the cluster manager. spark-submit also provides various options to control specific details.
For more information on submitting applications to a cluster and the various options provided by spark-submit, please visit: http://spark.apache.org/docs/latest/submitting-applications.html. Also, for detailed information about the different cluster managers, please refer to the cluster manager-specific pages (standalone mode, YARN, and Mesos) in the Spark documentation.
Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrame (in earlier versions of Spark it was called SchemaRDD) and also acts as a distributed SQL query engine. The capabilities it provides are as follows:
It loads data from a variety of structured sources (for example, JSON, Hive, and Parquet)
It lets you query data using SQL, both inside a Spark program and from external tools that connect to Spark SQL through standard database connectors (JDBC/ODBC), such as BI tools like Tableau.
It provides rich integration between SQL and regular Python/Java/Scala code, including the ability to join RDDs and SQL tables, expose custom functions in SQL, and more.
A DataFrame is an RDD of Row objects, each representing a record, along with a schema describing those records. DataFrames can be created from external data sources, from the results of queries, or from regular RDDs. A created DataFrame can be registered as a temporary table, and SQLContext.sql or HiveContext.sql can be applied to query the table. This recipe shows how to work with DataFrames.
To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos.
Let's see how to create a DataFrame from a JSON file:
import org.apache.spark.sql._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object JSONDataFrame {
  def main(args: Array[String]) {
    val conf = new SparkConf
    conf.setMaster("spark://master:7077")
    conf.setAppName("sql_Sample")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("/home/padma/Sparkdev/spark-1.6.0/examples/src/main/resources/people.json")
    df.show
    df.printSchema
    df.select("name").show
    df.select("name", "age").show
    df.select(df("name"), df("age") + 4).show
    df.groupBy("age").count.show
    df.describe("age").show
  }
}
Now create a DataFrame from a text file and query on the DataFrame:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object DataFrames {
  case class Person(name: String, age: Int)

  def main(args: Array[String]) {
    val conf = new SparkConf
    conf.setMaster("spark://master:7077")
    conf.setAppName("DataFramesApp")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val peopleDf = sc.textFile("/home/padma/Sparkdev/spark-1.6.0/examples/src/main/resources/people.txt")
                     .map(line => line.split(","))
                     .map(p => Person(p(0), p(1).trim.toInt)).toDF
    peopleDf.registerTempTable("people")
    val teenagers = sqlContext.sql("select name, age from people where age >= 13 AND name in (select name from people where age = 30)")
    teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
  }
}
Here is the code snippet to show how to create a DataFrame from a parquet file:
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df1 = sc.makeRDD(1 to 5).map(i => (i, i*2)).toDF("single", "double")
df1.write.parquet("/home/padma/Sparkdev/SparkApp/test_table/key=1")
val df2 = sc.makeRDD(6 to 10).map(i => (i, i*4)).toDF("single", "triple")
df2.write.parquet("/home/padma/Sparkdev/SparkApp/test_table/key=2")
val df3 = sqlContext.read.parquet("/home/padma/Sparkdev/SparkApp/test_table")
df3.show
Initially, the JSON file is read into a DataFrame, and API methods such as show(), printSchema(), select(), or groupBy() can be invoked on it. In the second code snippet, an RDD is created from the text file, the fields are mapped to the case class structure Person, and the RDD is converted to a DataFrame using toDF. This DataFrame, peopleDf, is registered as a table using registerTempTable(), whose table name is people. This people table can then be queried using SQLContext.sql.
The final code snippet shows how to write a DataFrame as a Parquet file using df1.write.parquet() and how the Parquet file is read back using sqlContext.read.parquet().
In addition, Spark SQL provides HiveContext, using which we can access Hive tables, UDFs, SerDes, and also HiveQL. There are ways to create DataFrames by converting an RDD to a DataFrame or by creating them programmatically. Different data sources, such as JSON, Parquet, and Avro, can be handled, and there is provision to run SQL queries directly on files. Also, data from other databases can be read using JDBC. In Spark 1.6.0, a new feature known as Dataset was introduced, which provides the benefits of Spark SQL's optimized execution engine over RDDs.
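As a brief illustration, a minimal sketch of the Dataset API in Spark 1.6 might look as follows, assuming an existing sqlContext; the Stock case class and its values are just examples:

import sqlContext.implicits._

case class Stock(symbol: String, volume: Long)

val ds = Seq(Stock("IBM", 4672300L), Stock("MSFT", 66119000L)).toDS()
ds.filter(_.volume > 5000000L).show()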
For more information on Spark SQL, please visit: http://spark.apache.org/docs/latest/sql-programming-guide.html. The earlier Working with the Spark programming model, Working with Spark's Python and Scala shells, and Working with pair RDDs recipes covered the initial steps in Spark and the basics of RDDs.
Spark Streaming is a library in the Spark ecosystem which addresses real-time processing needs. Spark's batch processing executes a job over a large dataset at once, whereas Spark Streaming aims for low latency (in the hundreds of milliseconds): as data becomes available, it is transformed and processed in near real time.
Spark Streaming works by running jobs over the small batches of data that accumulate over small intervals. It is used for rapid alerting, for supplying dashboards with up-to-date information, and for scenarios that need more complex analytics. For example, a common use case in anomaly detection is to run K-means clustering on small batches of data and trigger an alert if the cluster centers deviate from what is normal.
Tip
For more information, please visit: http://spark.apache.org/docs/latest/streaming-programming-guide.html.
This recipe shows how to work with Spark Streaming and apply stateless transformations and Windowed transformations.
To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos.
There are two types of transformations supported in Spark Streaming: stateless and stateful (windowed). Let's apply the stateless transformations:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream._

object StatelessTransformations {
  def main(args: Array[String]) {
    val conf = new SparkConf
    conf.setMaster("spark://master:7077").setAppName("StreamingApp")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val spamInfoRDD = ssc.sparkContext.textFile("/path/fileName", 2)
    val lines = ssc.socketTextStream("172.22.225.174", 7777)
    val mapLines = lines.map(ele => ele + "<<<>>>")
    val mapLines2 = lines.map(ele => (ele, 1))
    val errorLines = lines.filter(line => line.contains("Padma"))
    val flatMapLines = lines.flatMap(ele => ele.split(","))
    val reduceByKeyLines = mapLines2.reduceByKey((a,b) => a+b)
    val groupByKeyLines = mapLines2.groupByKey().mapValues(names => names.toSet.size)
    val unionedDstreams = mapLines.union(flatMapLines)
    val joinedDstream = reduceByKeyLines.join(groupByKeyLines)
    val cogroupedDStream = reduceByKeyLines.cogroup(groupByKeyLines)
    val transformLines = lines.transform(rdd => {rdd.union(spamInfoRDD).filter(_.contains("Padma"))})
    errorLines.print
    mapLines.print
    flatMapLines.print
    reduceByKeyLines.print
    groupByKeyLines.print
    joinedDstream.print
    cogroupedDStream.print
    unionedDstreams.print
    ssc.start
    ssc.awaitTermination
    ssc.stop
  }
}
Now let's apply windowed/stateful transformations:
object StatefulTransformations {
  def updateRunningSum(values: Seq[Long], state: Option[Long]): Option[Long] =
    Some(state.getOrElse(0L) + values.size)

  def main(args: Array[String]) {
    val conf = new SparkConf
    conf.setMaster("spark://master:7077").setAppName("Stateful_transformations")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))
    val lines = ssc.socketTextStream("172.25.41.66", 7777)
    val windowedLines = lines.window(Seconds(4), Seconds(2))
    val mapLines = lines.map(ele => (ele, 1L))
    val windowCounts = windowedLines.count
    val countByWindowLines = lines.countByWindow(Seconds(4), Seconds(2))
    val reducedLines = lines.reduce(_+_)
    val updateDStream = mapLines.updateStateByKey(updateRunningSum _)
    val reducedKeyWindow = mapLines.reduceByKeyAndWindow({(x,y) => x+y}, {(x,y) => x-y}, Seconds(4), Seconds(2))
    windowedLines.print
    windowCounts.print
    reducedKeyWindow.print
    countByWindowLines.print
    updateDStream.print
    reducedLines.print
    ssc.checkpoint("/home/padma/StreamingCheckPoint/")
    ssc.start
    ssc.awaitTermination
  }
}
It's also possible to apply DataFrames, that is, SQL operations on streaming data. The following code snippet shows SQL operations over streams of data:
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.sql._
import org.apache.spark.sql.SQLContext
...
object StreamingSQL {
  case class Words(wordName: String, count: Int)

  def main(args: Array[String]) {
    val conf = new SparkConf
    conf.setAppName("StreamingSQL").setMaster("spark://master:7077")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(4))
    val kafkaParams = Map("test-consumer-group" -> 1)
    val topicMap = Map("test" -> 1)
    val kafkaLines = KafkaUtils.createStream(ssc, "blrovh:2181", "test-consumer-group", topicMap)
    val words = kafkaLines.map{tuple => tuple._2}
    val wordPairs = words.map(word => (word, 1))
    val reduceWords = wordPairs.reduceByKey((a,b) => a+b)
    reduceWords.foreachRDD{rdd =>
      val sqlContext = new SQLContext(rdd.sparkContext)
      import sqlContext.implicits._
      val df = rdd.map(record => Words(record._1, record._2))
      val dfNew = sqlContext.createDataFrame(df)
      dfNew.registerTempTable("Words")
      val data = sqlContext.sql("select wordName from Words")
      data.foreach(row => println(row.toString))
    }
    ssc.start
    ssc.awaitTermination
  }
}
The new StreamingContext(sc, Seconds(1)) line instantiates the StreamingContext, which takes the SparkContext and the batch interval as parameters. StreamingContext.socketTextStream(<ip>, <port-number>) initializes a socket stream which listens on the specified port for messages. This creates a DStream (discretized stream), which is a sequence of RDDs, one generated for each batch.
When working with windowed/stateful transformations, lines.window(Seconds(4), Seconds(2)) creates a window of 4 seconds with a sliding duration of 2 seconds on the incoming DStream lines. Window-based transformations such as reduceByKeyAndWindow and countByWindow use data or intermediate results from previous batches to compute the results of the current batch. The updateStateByKey transformation constructs a DStream of (key, state) pairs using the specified function, which indicates how to update the state for each key given new values.
In the case of applying SQL operations on streaming data, KafkaUtils.createStream initializes a DStream from Kafka. It takes the StreamingContext, the ZooKeeper quorum (blrovh:2181), the consumer group name (test-consumer-group), and the topic map (Map("test" -> 1)) as parameters. The new SQLContext line creates the SQLContext, and SQLContext.createDataFrame creates a DataFrame for each RDD of the DStream. Using the DataFrame, the SQL queries are executed.
Spark Streaming uses a micro-batch architecture, where the streaming computation is a continuous series of batch computations on small batches of data. For each input source, Spark Streaming launches receivers, which are long-running tasks within an application executor, that collect the data, replicate it to another executor for fault tolerance, and save it as RDDs. Using the Kafka direct streaming approach, the DStream is created as KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet). To address fault tolerance, checkpointing is done, which periodically saves the state to a reliable filesystem.
Also, when running SQL queries on streaming data, there is a provision to retain the data for a specific duration so that a query can complete. As in Spark batch processing, streams of data can be persisted either in memory or on disk. In the case of stateful operations, the data is, by default, persisted in memory.
For more details on Spark Streaming, its internal architecture, checkpointing, performance tuning, and receiver parallelism, please refer to the Spark Streaming programming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html.