Apache Spark for Data Science Cookbook

By Padma Priya Chitturi
About this book
Spark has emerged as the most promising big data analytics engine for data science professionals. The true power and value of Apache Spark lies in its ability to execute data science tasks with speed and accuracy. Spark's selling point is that it combines ETL, batch analytics, real-time stream analysis, machine learning, graph processing, and visualizations. It lets you tackle the complexities that come with raw unstructured data sets with ease. This guide will get you comfortable and confident performing data science tasks with Spark. You will learn about implementations including distributed deep learning, numerical computing, and scalable machine learning. You will be shown effective solutions to data science problems using Spark's MLlib together with libraries such as pandas, NumPy, and SciPy. These simple and efficient recipes will show you how to implement algorithms and optimize your work.
Publication date:
December 2016
Publisher
Packt
Pages
392
ISBN
9781785880100

 

Chapter 1. Big Data Analytics with Spark

In this chapter, we will cover the components of Spark. You will learn them through the following recipes:

  • Initializing SparkContext

  • Working with Spark's Python and Scala shells

  • Building standalone applications

  • Working with the Spark programming model

  • Working with pair RDDs

  • Persisting RDDs

  • Loading and saving data

  • Creating broadcast variables and accumulators

  • Submitting applications to a cluster

  • Working with DataFrames

  • Working with Spark Streaming

 

Introduction


Apache Spark is a general-purpose distributed computing engine for large-scale data processing. It started as an open source initiative at UC Berkeley's AMPLab and was later donated to the Apache Software Foundation, where it is now a top-level project. Apache Spark offers a data abstraction called Resilient Distributed Datasets (RDDs) to analyze data in parallel on top of a cluster of resources. The Apache Spark framework is an alternative to Hadoop MapReduce: it is up to 100X faster than MapReduce and offers rich APIs for iterative and expressive data processing. The project is written in Scala and offers client APIs in Scala, Java, Python, and R.

 

Initializing SparkContext


This recipe shows how to initialize the SparkContext object, which is part of every Spark application. SparkContext is the object that allows us to create the base RDDs; every Spark application must contain it to interact with Spark. It is also used to initialize StreamingContext, SQLContext, and HiveContext.

Getting ready

To step through this recipe, you will need a running Spark cluster in any one of the modes, that is, local, standalone, YARN, or Mesos. For installing Spark on a standalone cluster, please refer to http://spark.apache.org/docs/latest/spark-standalone.html. Install Hadoop (optional), Scala, and Java. Please download the data from the following location:

https://github.com/ChitturiPadma/datasets/blob/master/stocks.txt

How to do it…

Let's see how to initialize SparkContext:

  1. Invoke spark-shell:

         $SPARK_HOME/bin/spark-shell --master <master type> 
         Spark context available as sc.
    
  2. Invoke PySpark:

         $SPARK_HOME/bin/pyspark --master <master type> 
         SparkContext available as sc
    
  3. Invoke SparkR:

         $SPARK_HOME/bin/sparkR --master <master type> 
         Spark context is available as sc
    
  4. Now, let's initiate SparkContext in different standalone applications, such as Scala, Java, and Python:

Scala:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkContextExample {
  def main(args: Array[String]) {
    val stocksPath = "hdfs://namenode:9000/stocks.txt"
    val conf = new SparkConf().setAppName("Counting Lines")
                              .setMaster("spark://master:7077")
    val sc = new SparkContext(conf)
    val data = sc.textFile(stocksPath, 2)
    val totalLines = data.count()
    println("Total number of Lines: %s".format(totalLines))
  }
}

Java:

import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

public class SparkContextExample {
   public static void main(String[] args) {
     String stocks = "hdfs://namenode:9000/stocks.txt";
     SparkConf conf = new SparkConf().setAppName("Counting Lines")
                                     .setMaster("spark://master:7077");
     JavaSparkContext sc = new JavaSparkContext(conf);
     JavaRDD<String> logData = sc.textFile(stocks);

     long totalLines = logData.count();
     System.out.println("Total number of Lines " + totalLines);
   }
}

Python:

from pyspark import SparkContext

stocks = "hdfs://namenode:9000/stocks.txt"

sc = SparkContext("<master URI>", "ApplicationName")
data = sc.textFile(stocks)

totalLines = data.count()
print("Total Lines are: %i" % (totalLines))

How it works…

In the preceding code snippets, new SparkContext(conf), new JavaSparkContext(conf), and SparkContext("<master URI>", "ApplicationName") initialize SparkContext in three different languages: Scala, Java, and Python. SparkContext is the starting point for Spark functionality. It represents the connection to a Spark Cluster, and can be used to create RDDs, accumulators, and broadcast variables on that cluster.

There's more…

SparkContext is created on the driver program and represents the connection to the cluster. RDDs are initially created using SparkContext. It is not serializable, hence it cannot be shipped to workers, and only one SparkContext is available per application. In the case of Streaming applications and the Spark SQL module, StreamingContext and SQLContext are created on top of SparkContext, as shown in the sketch below.
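
The following is a minimal sketch of this layering, assuming Spark 1.6-style APIs; the application name and the local[2] master are illustrative placeholders, not values from the recipe:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object ContextsExample {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("ContextsExample").setMaster("local[2]")
        val sc = new SparkContext(conf)                 // one SparkContext per application

        val sqlContext = new SQLContext(sc)             // SQL layer built on top of sc
        val ssc = new StreamingContext(sc, Seconds(5))  // streaming layer built on top of sc

        // ... define DataFrames / DStreams here ...

        sc.stop()
      }
    }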

See also

To understand more about the SparkContext object and its methods, please refer to this documentation page: https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.SparkContext.

 

Working with Spark's Python and Scala shells


This recipe explains the spark-shell and PySpark command-line interface tools from the Apache Spark project. Spark-shell is the Scala-based command-line interface and PySpark is the Python-based command-line tool used to develop Spark applications interactively. Both come with SparkContext, SQLContext, and HiveContext already initialized.

How to do it…

Both spark-shell and PySpark are available in the bin directory of SPARK_HOME, that is, SPARK_HOME/bin:

  1. Invoke spark-shell as follows:

        $SPARK_HOME/bin/spark-shell [Options] 
     
        $SPARK_HOME/bin/spark-shell --master <master type> i.e., local, 
        spark, yarn, mesos. 
        $SPARK_HOME/bin/spark-shell --master
        spark://<sparkmasterHostName>:7077 
     
        Welcome to 
             ____              __ 
            / __/__  ___ _____/ /__ 
           _\ \/ _ \/ _ `/ __/  '_/ 
          /__ / .__/\_,_/_/ /_/\_\   version 1.6.0 
             /_/ 
     
        Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java  
        1.7.0_79) 
        Type in expressions to have them evaluated. 
        Type :help for more information. 
        16/01/17 20:05:38 WARN Utils: Your hostname, localhost resolves to
        a loopback address: 127.0.0.1; using 192.168.1.6 instead (on 
        interface en0)
      SQL context available as sqlContext. 
     
        scala> val data = sc.textFile("hdfs://namenode:9000/stocks.txt"); 
        data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1]
        textFile at <console>:27 
     
        scala> data.count() 
        res0: Long = 57391  
     
        scala> data.first() 
        res1: String = NYSE  CLI   2009-12-31  35.39 35.70 34.50 34.57 890100  34.12 
     
        scala> data.top(2) 
        res5: Array[String] = Array(NYSE CZZ   2009-12-31  8.77  8.77  8.67
             8.70  694200    8.70, NYSE  CZZ   2009- 12-30  8.71  8.80 
             8.46    8.68  1588200     8.68) 
     
        scala> val mydata = data.map(line => line.toLowerCase()) 
        mydata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at
        map at <console>:29 
     
        scala> mydata.collect() 
        res6: Array[String] = Array(nyse cli 2009-12-31 35.39 35.70
        34.50 34.57 890100 34.12, nyse cli 2009-12-30 35.22 35.46
        34.96 35.40 516900 34.94, nyse cli 2009-12-29 35.69 35.95 
        35.21 35.34 556500 34.88, nyse cli 2009-12-28 35.67 36.23 
        35.49 35.69 565000 35.23, nyse cli 2009-12-24 35.38 35.60 
        35.19 35.47 230200 35.01, nyse cli 2009-12-23 35.13 35.51 
        35.07 35.21 520200 34.75, nyse cli 2009-12-22 34.76 35.04 
        34.71 35.04 564600 34.58, nyse cli 2009-12-21 34.65 34.74
     34.41 34.73 428400 34.28, nyse cli 2009-12-18 34.11 34.38 
        33.73 34.22 1152600 33.77, nyse cli 2009-12-17 34.18 34.53 
        33.84 34.21 1082600 33.76, nyse cli 2009-12-16 34.79 35.10 
        34.48 34.66 1007900 34.21, nyse cli 2009-12-15 34.60 34.91 
        34.39 34.84 813200 34.39, nyse cli 2009-12-14 34.21 34.90 
        33.86 34.82 987700 34.37, nyse cli 200...)
    
  2. Invoke PySpark as follows:

        $SPARK_HOME/bin/pyspark [options] 
        $SPARK_HOME/bin/pyspark --master <master type> i.e., local, 
        spark, yarn, mesos 
        $SPARK_HOME/bin/pyspark --master spark://
        sparkmasterHostName:7077 
     
        Python 2.7.6 (default, Sep  9 2014, 15:04:36)  
        [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin 
        Type "help", "copyright", "credits" or "license" for more
        information. 
        Using Spark's default log4j profile: org/apache/spark/log4j-
        defaults.properties 
        16/01/17 20:25:48 INFO SparkContext: Running Spark version 1.6.0 
        ... 
     
        Welcome to 
             ____              __ 
            / __/__  ___ _____/ /__ 
           _\ \/ _ \/ _ `/ __/  '_/ 
          /__ / .__/\_,_/_/ /_/\_\   version 1.6.0 
             /_/
    Using Python version 2.7.6 (default, Sep  9 2014 15:04:36) 
        SparkContext available as sc, HiveContext available as sqlContext. 
     
        >>> data = sc.textFile("hdfs://namenode:9000/stocks.txt") 
     
        >>> data.count() 
        57391  
        >>> data.first() 
        NYSE     CLI   2009-12-31  35.39 35.70 34.50 34.57 890100
        34.12 
        >>> data.top(2) 
        ['NYSE   CZZ   2009-12-31  8.77  8.77  8.67  8.70  694200   8.70',
         'NYSE   CZZ   2009-12-30  8.71  8.80  8.46  8.68  1588200  8.68' ] 
     
        >>> data.collect()
         ['NYSE CLI 2009-12-31 35.39 35.70 34.50 34.57 890100 34.12', 
          'NYSE CLI 2009-12-30 35.22 35.46 34.96 35.40 516900 34.94', 
         'NYSE CLI 2009-12-29 35.69 35.95 35.21 35.34 556500 34.88', 
         'NYSE CLI 2009-12-28 35.67 36.23 35.49 35.69 565000 35.23', 
         'NYSE CLI 2009-12-24 35.38 35.60 35.19 35.47 230200 35.01', 
         'NYSE CLI 2009-12-23 35.13 35.51 35.07 35.21 520200 34.75', 
         'NYSE CLI 2009-12-22 34.76 35.04 34.71 35.04 564600 34.58', 
         'NYSE CLI 2009-12-21 34.65 34.74 34.41 34.73 428400 34.28', 
         'NYSE CLI 2009-12-18 34.11 34.38 33.73 34.22 1152600 33.77', 
         'NYSE CLI 2009-12-17 34.18 34.53 33.84 34.21 1082600 33.76', 
         'NYSE CLI 2009-12-16 34.79 35.10 34.48 34.66 1007900 34.21', 
         'NYSE CLI 2009-12-15 34.60 34.91 34.39 34.84 813200 34.39', 
         'NYSE CLI 2009-12-14 34.21 34.90 33.86 34.82 987700 34.37', 
         'NYSE CLI 200...
    

How it works…

In the preceding code snippets, Spark RDD transformations and actions are executed interactively in both spark-shell and PySpark. They work in Read Eval Print Loop (REPL) style, that is, an interactive computing environment such as a Windows console or a Unix/Linux shell where a command is entered and the system responds with the output.

There's more…

Both spark-shell and PySpark are convenient command-line interfaces for developing Spark applications interactively. They support application prototyping and quick development, and they offer numerous options for customization.
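
For instance, both shells accept the standard submission options; the memory sizes, JAR path, and UI port below are illustrative assumptions only:

    $SPARK_HOME/bin/spark-shell --master local[4] \
        --driver-memory 2g \
        --jars /path/to/extra-library.jar \
        --conf spark.ui.port=4055

    $SPARK_HOME/bin/pyspark --master yarn --executor-memory 4g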

See also

The Apache Spark documentation offers plenty of examples using these two command-line interfaces; please refer to this documentation page: http://spark.apache.org/docs/latest/quick-start.html#interactive-analysis-with-the-spark-shell.

 

Building standalone applications


This recipe explains how to develop and build Spark standalone applications using programming languages such as Scala, Java, Python, and R. The sample application under this recipe is written in Scala.

Getting ready

Install any IDE tool for application development (the preferred one is Eclipse). Install SBT, a build tool for Scala projects similar to Maven, to build the project. Create the Scala project, add all the necessary libraries to the build.sbt file, and add the project to Eclipse.

How to do it…

  1. Develop a Spark standalone application using the Eclipse IDE as follows:

           import org.apache.spark.SparkContext 
           import org.apache.spark.SparkContext._ 
           import org.apache.spark.SparkConf 
    
           object SparkContextExample {  
             def main(args: Array[String]) {
               val file = "hdfs://namenode:9000/stocks.txt"     
               val conf = new SparkConf().setAppName("Counting Lines")
                                         .setMaster("spark://master:7077")
               val sc = new SparkContext(conf)
               val data = sc.textFile(file, 2)
               val totalLines = data.count()
    
               println("Total number of Lines: %s".format(totalLines))
             }
           } 
    
  2. Now go to the project directory and build the project using sbt assembly and sbt package manually, or build it using Eclipse:

            ~/SparkProject/ SparkContextExample/sbt assembly 
            ~/SparkProject/ SparkContextExample/sbt package 
    

How it works…

sbt assembly compiles the program and generates the JAR as SparkContextExample-assembly-<version>.jar, whereas sbt package generates it as SparkContextExample_2.10-1.0.jar. Both JARs are generated under ~/SparkProject/SparkContextExample/target/scala-2.10. Submit SparkContextExample-assembly-<version>.jar to the Spark cluster using the spark-submit shell script in the bin directory of SPARK_HOME.

There's more…

We can develop a variety of complex Spark standalone applications to analyze the data in various ways. When working with any third-party libraries, include the corresponding dependency JARs in the build.sbt file; a sample build.sbt is sketched below. Invoking sbt update will download the respective dependencies and include them in the project classpath.
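
Here is a hedged sketch of such a build.sbt; the project name, version numbers, and the example third-party dependency (spark-csv) are illustrative assumptions, not requirements of this recipe:

    name := "SparkContextExample"

    version := "1.0"

    scalaVersion := "2.10.5"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.6.0" % "provided",
      // example third-party library; replace with whatever your application needs
      "com.databricks"   %% "spark-csv"  % "1.3.0"
    )

Note that sbt assembly additionally requires the sbt-assembly plugin to be declared in project/plugins.sbt.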

See also

The Apache Spark documentation covers how to build standalone Spark applications. Please refer to this documentation page: https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications.

 

Working with the Spark programming model


This recipe explains the fundamentals of the Spark programming model. It covers the RDD basics, that is, Spark provides a Resilient Distributed Dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. It also covers how to create RDDs and perform transformations and actions on them.

How to do it…

  1. Let's create RDDs and apply a few transformations such as map and filter, and a few actions such as count, take, top, and so on, in Spark-shell:

            scala> val data = Array(1, 2, 3, 4, 5) 
            scala> val rddData = sc.parallelize(data) 
            scala> val filteredData = rddData.filter(ele => ele%2 == 0) 
            filteredData: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at 
            filter at <console>:29 
            scala> val mydata = rddData.map(ele => ele+2) 
            mydata: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at 
            map at <console>:30 
            scala> mydata.count() 
            res1: Long = 5 
            scala> mydata.take(2) 
            res2: Array[Int] = Array(3, 4) 
            scala> mydata.top(1) 
            res3: Array[Int] = Array(7)
    
  2. Now let's work with the transformations and actions in a Spark standalone application:

              import org.apache.spark.{SparkConf, SparkContext} 

              object SparkTransformations { 
                def main(args: Array[String]) { 
                  val conf = new SparkConf 
                  conf.setMaster("spark://master:7077") 
                  conf.setAppName("SparkTransformations") 
                  val sc = new SparkContext(conf) 
                  val baseRdd1 = sc.parallelize(Array("hello","hi","priya","big","data","hub", 
                    "hub","hi"), 1) 
                  val baseRdd2 = sc.parallelize(Array("hey","ram","krishna","priya"), 1) 
                  val baseRdd3 = sc.parallelize(Array(1,2,3,4), 2) 
                  val sampledRdd = baseRdd1.sample(false, 0.5) 
                  val unionRdd = baseRdd1.union(baseRdd2).repartition(1) 
                  val intersectionRdd = baseRdd1.intersection(baseRdd2) 
                  val distinctRdd = baseRdd1.distinct.repartition(1) 
                  val subtractRdd = baseRdd1.subtract(baseRdd2) 
                  val cartesianRdd = sampledRdd.cartesian(baseRdd2) 
                  val reducedValue = baseRdd3.reduce((a,b) => a+b) 
           
                  val collectedRdd = distinctRdd.collect 
                  collectedRdd.foreach(println) 
                  val count = distinctRdd.count 
                  val first = distinctRdd.first 
                  println("Count is..." + count) 
                  println("First Element is..." + first) 
                  val takeValues = distinctRdd.take(3) 
                  val takeSample = distinctRdd.takeSample(false, 2) 
                  val takeOrdered = distinctRdd.takeOrdered(2) 
                  takeValues.foreach(println) 
                  println("Take Sample Values..") 
                  takeSample.foreach(println) 
                  val foldResult = distinctRdd.fold("<>")((a,b) => a+b) 
                  println(foldResult) 
                } 
              } 
    

How it works…

Spark offers an abstraction called the RDD as part of its programming model. The preceding code snippets show RDD creation, transformations, and actions. Transformations such as union, subtract, intersection, sample, cartesian, map, filter, and flatMap, when applied to an RDD, result in a new RDD, whereas actions such as count, first, take(3), takeSample(false, 2), and takeOrdered(2) compute a result on the RDD and return it to the driver program or save it to external storage. Although we can define RDDs at any point, Spark computes them lazily, that is, the first time they are used in an action.

There's more…

A few transformations, such as reduceByKey, groupByKey, repartition, distinct, intersection, and subtract, result in a shuffle operation. The shuffle is expensive as it involves disk I/O, data serialization, and network I/O. It can be tuned using certain configuration parameters.
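
The following is a small sketch of setting a few shuffle-related properties on the SparkConf; the property names are standard Spark 1.x settings, while the values shown are purely illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("ShuffleTuning")
      .set("spark.shuffle.compress", "true")        // compress map output files
      .set("spark.shuffle.spill.compress", "true")  // compress data spilled during shuffles
      .set("spark.reducer.maxSizeInFlight", "48m")  // per-reduce-task fetch buffer
      .set("spark.shuffle.file.buffer", "32k")      // in-memory buffer per shuffle file writer
    val sc = new SparkContext(conf)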

See also

The Apache Spark documentation offers a detailed explanation about the Spark programming model. Please refer to this documentation page: http://spark.apache.org/docs/latest/programming-guide.html.

 

Working with pair RDDs


This recipe shows how to work with RDDs of key/value pairs, called pair RDDs, which are widely used to perform aggregations. We'll do some initial ETL to get the data into a key/value format and see how to apply transformations on single pair RDDs and on two pair RDDs.

Getting ready

To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the other distributed modes, that is, standalone, YARN, or Mesos. It could be run in local mode as well.

How to do it…

  1. We can create a pair RDD from a collection of strings in the following way:

            val baseRdd = sc.parallelize(Array("this,is,a,ball", "it,is,a,cat", 
            "john,is,in,town,hall")) 
            val inputRdd = sc.makeRDD(List(("is",2), ("it",2), ("cat",8), 
            ("this",6), ("john",5), ("a",1))) 
            val wordsRdd = baseRdd.flatMap(record => record.split(",")) 
            val wordPairs = wordsRdd.map(word => (word, word.length)) 
            val filteredWordPairs = wordPairs.filter{ case (word, length) => 
            length >= 2 } 
    
  2. Also, pair RDDs can be created from the hdfs input files. Let's take a text file which contains stocks data as follows:

             IBM,20160113,133.5,134.279999,131.100006,131.169998,4672300 
             GOOG,20160113,730.849976,734.73999,698.609985,700.559998,
             2468300 
             MSFT,20160113,53.799999,54.07,51.299999,51.639999,66119000 
             MSFT,20160112,52.759998,53.099998,52.060001,52.779999,35650700 
             YHOO,20160113,30.889999,31.17,29.33,29.440001,16593700 
    
  3. Now, creating pair RDDs for the preceding data looks like this:

            val textFile = sc.textFile("hdfs://namenodeHostName:8020/data/stocks.txt") 
            val stocksPairRdd = textFile.map{ record =>  
              val colData = record.split(",") 
              (colData(0), colData(6).toLong) 
            } 
    
  4. Let's apply transformations on pair RDDs as follows:

          val stocksGroupedRdd = stocksPairRdd.groupByKey 
          val stocksReducedRdd = stocksPairRdd.reduceByKey((x,y)=>x+y) 
          val subtractedRdd = wordPairs.subtractByKey(inputRdd) 
          val cogroupedRdd = wordPairs.cogroup(inputRdd) 
          val joinedRdd = filteredWordPairs.join(inputRdd) 
          val sortedRdd = wordPairs.sortByKey 
          val leftOuterJoinRdd = inputRdd.leftOuterJoin(filteredWordPairs) 
          val rightOuterJoinRdd = wordPairs.rightOuterJoin(inputRdd) 
          val flatMapValuesRdd = filteredWordPairs.flatMapValues(length =>   
          1 to 5) 
          val mapValuesRdd = wordPairs.mapValues(length => length*2) 
          val keys = wordPairs.keys 
          val values = filteredWordPairs.values 
    

How it works…

The usage of various pair RDD transformations is given as follows:

  • groupByKey groups the values of the RDD by key.

  • reduceByKey performs aggregation on the grouped values corresponding to a key.

  • subtractByKey removes tuples in the first RDD whose key matches a key in the other RDD.

  • join combines the two RDDs by key, producing, for each key present in both, all the pairs of values from the two RDDs.

  • cogroup also combines the two RDDs by key, but it first groups the values within each RDD and then pairs the grouped collections by key.

  • leftOuterJoin and rightOuterJoin work similarly to join with a slight variation, that is, leftOuterJoin includes all the records from the left RDD, and if no matching record is found in the right RDD, the corresponding value is represented as None; rightOuterJoin does the same with the sides reversed.

  • The mapValues transformation applies a function to each value of the pair RDD without changing the key.

  • flatMapValues applies a function that returns an iterator to each value of a pair RDD, and for each element returned, a key/value entry is produced with the old key.

  • The keys and values transformations return, respectively, all the keys and all the values of a pair RDD.

There's more…

There are other pair RDD transformations, such as foldByKey, combineByKey, and aggregateByKey, and actions such as countByKey and countByValue, along with the regular actions such as count, first, take, and so on; a small sketch follows. Most pair RDD transformations involve a shuffle operation, which moves data across partitions. To know more about the working of the shuffle operation and its performance impact, please refer to http://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs.
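
As a brief sketch, here is how aggregateByKey and countByKey might look on a small pair RDD; the stock symbols and prices are made-up values for illustration:

    val sales = sc.parallelize(Seq(("IBM", 10.0), ("MSFT", 5.0), ("IBM", 2.5), ("GOOG", 7.0)))

    // aggregateByKey: build a (sum, count) accumulator per key, then derive the average
    val sumCounts = sales.aggregateByKey((0.0, 0))(
      (acc, price) => (acc._1 + price, acc._2 + 1),   // merge a value into the accumulator
      (a, b) => (a._1 + b._1, a._2 + b._2))           // merge accumulators across partitions
    val avgByKey = sumCounts.mapValues { case (sum, count) => sum / count }

    // countByKey is an action: it returns a Map of key -> number of records to the driver
    val counts: scala.collection.Map[String, Long] = sales.countByKey()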

See also

The Working with the Spark programming model and Working with Spark's Python and Scala shells recipes explain how to work with RDDs and how to make use of Spark shell for testing the application logic.

 

Persisting RDDs


This recipe shows how to persist an RDD. RDDs are lazily evaluated, and sometimes we need to reuse the same RDD multiple times. In such cases, Spark will re-compute the RDD and all of its dependencies each time we call an action on it. This is expensive for iterative algorithms, which need the computed dataset multiple times. To avoid computing an RDD multiple times, Spark provides a mechanism for persisting the data in an RDD.

After the first time an action computes the RDD's contents, they can be stored in memory or disk across the cluster. The next time an action depends on the RDD, it need not be recomputed from its dependencies.

Getting ready

To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos.

How to do it…

Let's see how to persist RDDs using the following code:

import org.apache.spark.storage.StorageLevel

val inputRdd = sc.parallelize(Array("this,is,a,ball","it,is,a,cat","julie,is,in,the,church")) 
val wordsRdd = inputRdd.flatMap(record => record.split(",")) 
val wordLengthPairs = wordsRdd.map(word => (word, word.length)) 
val wordPairs = wordsRdd.map(word => (word,1)) 
val reducedWordCountRdd = wordPairs.reduceByKey((x,y) => x+y) 
val filteredWordLengthPairs = wordLengthPairs.filter{case(word,length) => length >=3} 
reducedWordCountRdd.cache() 
val joinedRdd = reducedWordCountRdd.join(filteredWordLengthPairs) 
joinedRdd.persist(StorageLevel.MEMORY_AND_DISK) 
val wordPairsCount =  reducedWordCountRdd.count 
val wordPairsCollection = reducedWordCountRdd.take(10)  
val joinedRddCount = joinedRdd.count 
val joinedPairs = joinedRdd.collect() 
reducedWordCountRdd.unpersist() 
joinedRdd.unpersist() 

How it works…

The call to cache() on reducedWordCountRdd indicates that the RDD should be stored in memory for the next time it's computed. The count action computes it initially. When the take action is invoked, it accesses the cached elements of the RDD instead of re-computing them from the dependencies.

Spark defines levels of persistence or StorageLevel values for persisting RDDs. rdd.cache() is shorthand for rdd.persist(StorageLevel.MEMORY). In the preceding example, joinedRdd is persisted with storage level as MEMORY_AND_DISK which indicates persisting the RDD in memory as well as in disk. It is good practice to un-persist the RDD at the end, which lets us manually remove it from the cache.

There's more…

Spark defines various levels of persistence, such as MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_2, and so on. Deciding when to cache/persist the data can be an art. The decision typically involves trade-offs between space and speed. If you attempt to cache more data than fits in memory, Spark will use the LRU cache policy to evict old partitions. In general, RDDs should be persisted when they are likely to be referenced by multiple actions and are expensive to regenerate; a brief sketch of choosing storage levels follows.
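
A minimal sketch of choosing storage levels explicitly is shown below; MEMORY_ONLY_SER and MEMORY_AND_DISK_2 are standard levels, and the file path simply reuses the stocks data from earlier recipes:

    import org.apache.spark.storage.StorageLevel

    val stocks = sc.textFile("hdfs://namenode:9000/stocks.txt")
    val parsed = stocks.map(line => line.split(","))

    parsed.persist(StorageLevel.MEMORY_ONLY_SER)      // serialized in memory: saves space, costs CPU
    // parsed.persist(StorageLevel.MEMORY_AND_DISK_2) // alternative: replicate partitions on two nodes

    parsed.count()      // the first action materializes and caches the RDD
    parsed.take(5)      // served from the cache, not recomputed
    parsed.unpersist()  // remove it from the cache when no longer needed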

See also

Please refer to http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence to gain a detailed understanding of persistence in Spark.

 

Loading and saving data


This recipe shows how Spark supports a wide range of input and output sources. Spark makes it very simple to load and save data in a large number of file formats. Formats range from unstructured, such as text, to semi-structured, such as JSON, to structured, such as SequenceFiles.

Getting ready

To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos. Also, the reader is expected to have an understanding of text files, JSON, CSV, SequenceFiles, and object files.

How to do it…

  1. Load and save a text file as follows:

          val input = 
          sc.textFile("hdfs://namenodeHostName:8020/repos/spark/README.md") 
          val wholeInput = 
          sc.wholeTextFiles("file:///home/padma/salesFiles") 
          val result = wholeInput.mapValues{ value => 
            val nums = value.split(" ").map(x => x.toDouble) 
            nums.sum / nums.size.toDouble 
          } 
          result.saveAsTextFile("/home/padma/outputFile.txt") 
    
  2. For loading a JSON file, the people.json input file is taken from the SPARK_HOME folder, whose location is spark-1.6.0/examples/src/main/resources/people.json. Now, loading and saving a JSON file looks like this:

            // Loading JSON file 
            import com.fasterxml.jackson.module.scala.DefaultScalaModule 
            import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper 
            import com.fasterxml.jackson.databind.ObjectMapper 
            import com.fasterxml.jackson.databind.DeserializationFeature 
            ... 
            case class Person(name:String, age:Int) 
            ... 
            val jsonInput = 
            sc.textFile("hdfs://namenode:9000/data/people.json") 
            val result = jsonInput.flatMap(record => { 
              try{ Some(mapper.readValue(record, classOf[Person])) } 
              catch{ 
                case e:Exception => None 
              } 
            }) 
            result.filter(person => person.age > 15) 
                  .map(mapper.writeValueAsString(_)) 
                  .saveAsTextFile(outputFile) 
    
  3. To load and save a CSV file, let's take the stocks data:

           IBM,20160113,133.5,134.279999,131.100006,131.169998,4672300 
           GOOG,20160113,730.849976,734.73999,698.609985,700.559998,2468300 
           MSFT,20160113,53.799999,54.07,51.299999,51.639999,66119000 
           MSFT,20160112,52.759998,53.099998,52.060001,52.779999,35650700 
           YHOO,20160113,30.889999,31.17,29.33,29.440001,16593700 
           . 
           . 
           import java.io.{StringReader, StringWriter} 
           import au.com.bytecode.opencsv.{CSVReader, CSVWriter} 
           import scala.collection.JavaConversions._ 
           ... 
           case class Stocks(name:String, totalPrice:Long) 
           ... 
           val input = sc.textFile("hdfs://namenodeHostName:8020/data/stocks.txt") 
           val result = input.flatMap{ line => 
             val reader = new CSVReader(new StringReader(line)) 
             reader.readAll().map(x => Stocks(x(0), x(6).toLong)) 
           } 
           result.map(stock => Array(stock.name, stock.totalPrice.toString)).mapPartitions{ rows =>  
             val stringWriter = new StringWriter 
             val csvWriter = new CSVWriter(stringWriter) 
             csvWriter.writeAll(rows.toList) 
             Iterator(stringWriter.toString) 
           }.saveAsTextFile("hdfs://namenode:9000/CSVOutputFile") 
    
  4. Now, let's see the way sequenceFile is loaded and saved:

           import org.apache.hadoop.io.{Text, IntWritable} 

           val data = sc.sequenceFile(inputFile, classOf[Text],  
           classOf[IntWritable]).map{case(x,y) => (x.toString, y.get())} 
           val input = sc.parallelize(List(("Panda",3), ("Kay",6), ("Snail",2))) 
           input.saveAsSequenceFile("hdfs://namenode:9000/sequenceOutputFile") 
    

How it works…

The call to textFile() on the SparkContext with the path to the file loads the text file as an RDD. If the input consists of multiple parts in the form of a directory, we can use SparkContext.wholeTextFiles(), which returns a pair RDD whose key is the name of each input file. For handling JSON files, the data is loaded as a text file and then parsed using a JSON parser. There are a number of JSON libraries available, but in the example we used the Jackson (http://bit.ly/17k6vli) library as it is relatively simple to use.

Tip

Please refer to other JSON libraries, such as this one: http://bit.ly/1xP8JFK

Loading CSV/TSV data is similar to loading JSON data, that is, first the data is loaded as text and then processed. As with JSON, there are various CSV libraries, but for Scala we used opencsv (http://opencsv.sourceforge.net). Using CSVReader, the records are parsed and mapped to the case class structure. While saving the file, CSVWriter is used to produce the output.

Coming to SequenceFile, it is a popular Hadoop format composed of a flat file with key/value pairs, where the keys and values implement Hadoop's Writable interface. SparkContext.sequenceFile() is the API to load a sequence file, in which the parameters classOf[Text] and classOf[IntWritable] indicate the keyClass and the valueClass.

There's more…

As Spark is built on the ecosystem of Hadoop, it can access data through the InputFormat and OutputFormat interfaces used by Hadoop MapReduce, which are available for many common file formats and storage systems (for example, S3, HDFS, Cassandra, HBase, and so on).

Spark can also interact with any Hadoop-supported format (for both the old and new Hadoop file APIs) using newAPIHadoopFile, which takes a path and three classes: the first class represents the input format, the next is the class of our key, and the final one is the class of our value, as sketched below. The Spark SQL module provides a more efficient API for structured data sources, which include JSON and Hive.
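
As a hedged illustration of the newAPIHadoopFile call described above, the following reads a plain text file through the new Hadoop API; the HDFS path simply reuses the stocks data from earlier recipes:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    val hadoopRdd = sc.newAPIHadoopFile(
      "hdfs://namenode:9000/data/stocks.txt",
      classOf[TextInputFormat],   // input format
      classOf[LongWritable],      // key class (byte offset of the line)
      classOf[Text])              // value class (the line itself)

    val lines = hadoopRdd.map { case (_, text) => text.toString }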

 

Creating broadcast variables and accumulators


This recipe shows how to use accumulators and broadcast variables. Accumulators are used to aggregate values from worker nodes back to the driver program. One of the most common uses of accumulators is to count events that occur during job execution for debugging purposes. The other type of shared variable is the broadcast variable, which allows the program to efficiently send a large, read-only value to all the worker nodes for use in one or more Spark operations. Such variables are used in cases where the application needs to send a large, read-only lookup table to all the nodes.

Getting ready

To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos.

How to do it…

  1. Take a worker log file from the Spark log directory, $SPARK_HOME/logs, whose filename looks like this: spark-padma-org.apache.spark.deploy.worker.Worker-1-blrrndtipdl19, and place this file in HDFS. The log file contains Spark log information with different trace levels, such as DEBUG, INFO, WARN, and ERROR.

  2. Let's work with an accumulator now:

            val sc = new SparkContext 
            val logFile =   
            sc.textFile("hdfs://namenodeHostName:8020/data/spark-worker-Worker1.out") 
            val errorLines = sc.accumulator(0) 
            val debugLines = logFile.map{ line =>  
              if(line.contains("ERROR")) errorLines += 1 
              if(line.contains("DEBUG")) line 
            } 
            debugLines.saveAsTextFile("hdfs://namenodeHostName:8020/data/out/debugLines.txt") 
            println("ERROR Lines: " + errorLines.value) 
    
  3. Now create a broadcast variable and use it in the workers as follows:

          val sc = new SparkContext 
          val broadCastedTemperatures = sc.broadcast(Map("KOCHI" -> 22, 
          "BNGLR" -> 22, "HYD" -> 24, "MUMBAI" -> 21, "DELHI" -> 17,   
          "NOIDA" -> 19, "SIMLA" -> 9)) 
          val inputRdd = sc.parallelize(Array(("BNGLR",20), ("BNGLR",16),  
          ("KOCHI",-999), ("SIMLA",-999), ("DELHI",19), ("DELHI",-999), 
          ("MUMBAI",27), ("MUMBAI",-999), ("HYD",19), ("HYD",25), 
          ("NOIDA",-999)))  
          val replacedRdd = inputRdd.map{ case(location, temperature) => 
            val standardTemperatures = broadCastedTemperatures.value 
            if(temperature == -999 && standardTemperatures.get(location) != None) 
              (location, standardTemperatures(location)) 
            else 
              (location, temperature) 
          } 
          val locationsWithMaxTemperatures = replacedRdd.reduceByKey{ (temp1, temp2) => 
            if (temp1 > temp2) temp1 else temp2 } 
    

How it works…

Initially, when working with accumulators, we created an Accumulator[Int], called errorLines, and added 1 to it whenever we saw a line that contained ERROR. We will see the correct count for errorLines only after the saveAsTextFile() action runs, because the transformation map() is lazy, so the side effect of incrementing the accumulator happens only when map() is forced to occur by saveAsTextFile(). The return type of the accumulator is org.apache.spark.Accumulator[T], where T is the type of the value.

Coming to broadcast variables, SparkContext.broadcast creates a broadcast variable of type Broadcast[T]. T can be of any type, provided it is serializable. The value of the broadcast variable is accessed using the value property. The variable is sent to each node only once and is read-only.

There's more…

Spark has support for custom accumulator types; they need to extend AccumulatorParam, as in the following sketch.
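
Here is a brief sketch of a custom accumulator using the Spark 1.x AccumulatorParam API; accumulating a set of error codes is an invented example for illustration:

    import org.apache.spark.AccumulatorParam

    object StringSetAccumulatorParam extends AccumulatorParam[Set[String]] {
      def zero(initialValue: Set[String]): Set[String] = Set.empty[String]
      def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = s1 ++ s2
    }

    val errorCodes = sc.accumulator(Set.empty[String])(StringSetAccumulatorParam)
    sc.parallelize(Seq("E1", "OK", "E2", "OK")).foreach { code =>
      if (code.startsWith("E")) errorCodes += Set(code)  // updated on the workers as a side effect
    }
    println(errorCodes.value)                            // read the aggregated value on the driver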

Also, when working with broadcast variables, it is essential to choose a serialization format which is fast and compact.

Tip

For more information on broadcast variables, please refer: http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables.

See also

Please visit the earlier Working with the Spark programming model, Working with Spark's Python and Scala shells, and Working with pair RDDs recipes to get familiar with Spark.

 

Submitting applications to a cluster


This recipe shows how to run an application on distributed clusters. An application is launched on a set of machines using an external service called a cluster manager. There is a wide variety of cluster managers such as Hadoop YARN, Apache Mesos, and Spark's own built-in standalone cluster manager. Spark provides a single tool for submitting jobs across all cluster managers, called spark-submit. Through various options, spark-submit can connect to different cluster managers and control how many resources your application gets.

Getting ready

To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos.

How to do it…

  1. Let's create a word count application:

            package org.apache.spark.programs 

            import org.apache.spark.{SparkConf, SparkContext} 

            object WordCount { 
              def main(args: Array[String]) { 
                val conf = new SparkConf 
                conf.setAppName("WordCount") 
                val sc = new SparkContext(conf) 
                val input = sc.parallelize(Array("this,is,a,ball", 
                  "it,is,a,cat", "john,is,in,town,hall")) 
                val words = input.flatMap{ record => record.split(",") } 
                val wordPairs = words.map(word => (word, 1)) 
                val wordCounts = wordPairs.reduceByKey{ (a, b) => a + b } 
                val result = wordCounts.collect 
                println("Displaying the WordCounts:") 
                result.foreach(println) 
              } 
            } 
    
  2. Submit the application to Spark's standalone cluster manager:

          spark-submit --class org.apache.spark.programs.WordCount --master 
          spark://master:7077 WordCount.jar 
    
    
  3. Submit the application to YARN:

          spark-submit --class org.apache.spark.programs.WordCount --master 
          yarn WordCount.jar
    
  4. Submit the application to Mesos:

          spark-submit --class org.apache.spark.programs.WordCount --master       
          mesos://mesos-master:5050 WordCount.jar
    

How it works…

When spark-submit is called with the --master flag set to spark://master:7077, it submits the application to Spark's standalone cluster. Invoking it with the --master flag set to yarn runs the application on a YARN cluster, whereas specifying the --master flag as mesos://mesos-master:5050 runs the application on a Mesos cluster.

There's more…

Whenever spark-submit is invoked, it launches the driver program. This driver program contacts the cluster manager and requests resources to launch executors. Once the executors are launched by the cluster manager, the driver runs through the user application and delegates the work to executors in the form of tasks. When the driver's main() method exits, it terminates the executors and releases resources from the cluster manager. spark-submit also provides various options to control specific details.
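
For instance, resource-related flags can be passed along with the application JAR; the memory sizes, core count, and executor count below are illustrative values only:

    spark-submit --class org.apache.spark.programs.WordCount \
      --master yarn \
      --deploy-mode cluster \
      --driver-memory 2g \
      --executor-memory 4g \
      --executor-cores 2 \
      --num-executors 10 \
      WordCount.jar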

See also

For more information on submitting applications to a cluster and the various options provided by spark-submit, please visit http://spark.apache.org/docs/latest/submitting-applications.html. The Spark documentation also covers the different cluster managers (standalone, YARN, and Mesos) in detail.

 

Working with DataFrames


Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrame (in earlier versions of Spark, it was called SchemaRDD) and also acts as a distributed SQL query engine. The capabilities it provides are as follows:

  • It loads data from a variety of structured sources (for example, JSON, Hive, and Parquet)

  • It lets you query data using SQL, both inside a Spark program and from external tools that connect to Spark SQL through standard database connectors (JDBC/ODBC), such as BI tools like Tableau.

  • Spark SQL provides rich integration between SQL and regular Python/Java/Scala code, including the ability to join RDDs and SQL tables, expose custom functions in SQL, and more.

A DataFrame is an RDD of Row objects, each representing a record, together with a schema describing the fields of those records. DataFrames can be created from external data sources, from the results of queries, or from regular RDDs. A created DataFrame can be registered as a temporary table and queried with SQLContext.sql or HiveContext.sql. This recipe shows how to work with DataFrames.

Getting ready

To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos.

How to do it…

  1. Let's see how to create a DataFrame from a JSON file:

             import org.apache.spark.sql._ 
             import org.apache.spark.sql.SQLContext 
             import org.apache.spark.SparkConf 
             import org.apache.spark.SparkContext 
     
             object JSONDataFrame { 
         
               def main(args: Array[String]) { 
                 val conf = new SparkConf 
                 conf.setMaster("spark://master:7077") 
                 conf.setAppName("sql_Sample") 
                 val sc = new SparkContext(conf) 
                 val sqlContext = new SQLContext(sc) 
                 val df = sqlContext.read.json("/home/padma/Sparkdev/spark-1.6.0/examples/src/main/resources/people.json") 
                 df.show 
                 df.printSchema 
                 df.select("name").show 
                 df.select("name", "age").show 
                 df.select(df("name"), df("age") + 4).show 
                 df.groupBy("age").count.show 
                 df.describe("age").show 
               } 
             } 
    
  2. Now create a DataFrame from a text file and query on the DataFrame:

            object DataFrames { 
            case class Person(name:String, age:Int) 
            def main(args:Array[String]) 
            { 
            val conf = new SparkConf 
            conf.setMaster("spark://master:7077") 
            conf.setAppName("DataFramesApp") 
            val sc = new SparkContext(conf) 
            val sqlContext = new SQLContext(sc) 
         
            import sqlContext.implicits._ 
            val peopleDf = sc.textFile("/home/padma/Sparkdev/spark-
            1.6.0/examples/src/main/resources/people.txt"). 
            map(line => line.split(",")).map(p => 
            Person(p(0),p(1).trim.toInt)).toDF 
            peopleDf.registerTempTable("people") 
            val teenagers = sqlContext.sql("select name, age from people 
            where age >=13 AND name in(select name from people where age=
            30)") 
            teenagers.map(t => "Name: " + t(0)).collect().foreach(println) 
              }
            }  
    
  3. Here is the code snippet to show how to create a DataFrame from a parquet file:

            val sc = new SparkContext(conf) 
            val sqlContext = new SQLContext(sc) 
            import sqlContext.implicits._ 
         
            val df1 = sc.makeRDD(1 to 5).map(i =>  
            (i,i*2)).toDF("single","double") 
            df1.write.parquet("/home/padma/Sparkdev/SparkApp/
            test_table/key=1") 
         
            val df2 = sc.makeRDD(6 to 10).map(i => 
            (i,i*4)).toDF("single","triple") 
            df2.write.parquet("/home/padma/Sparkdev/SparkApp/
            test_table/key=2") 
         
            val df3 = sqlContext.read.parquet("/home/padma/Sparkdev/
            SparkApp/test_table") 
            df3.show 
    

How it works…

Initially, the JSON file is read into a DataFrame, and APIs such as show(), printSchema(), select(), and groupBy() can be invoked on it. In the second code snippet, an RDD is created from the text file, the fields are mapped to the case class structure Person, and the RDD is converted to a DataFrame using toDF. This DataFrame, peopleDf, is registered as a temporary table named people using registerTempTable(), and the table can then be queried using SQLContext.sql.

The final code snippet shows how to write a DataFrame as a Parquet file using df1.write.parquet() and how the Parquet files are read back using sqlContext.read.parquet().

There's more…

In addition, Spark SQL provides HiveContext, using which we can access Hive tables, UDFs, SerDes, and HiveQL. There are ways to create DataFrames by converting an RDD to a DataFrame or by creating them programmatically. Different data sources, such as JSON, Parquet, and Avro, can be handled, and there is provision to run SQL queries directly on files. Also, data from other databases can be read using JDBC. In Spark 1.6.0, a new feature known as Dataset was introduced, which provides the benefits of Spark SQL's optimized execution engine over RDDs, as sketched below.
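
The following is a brief, hedged sketch of the Dataset API in Spark 1.6, assuming an existing sqlContext; the Stock case class and its values are invented for illustration:

    import sqlContext.implicits._

    case class Stock(symbol: String, close: Double)

    val ds = Seq(Stock("IBM", 131.17), Stock("MSFT", 51.64)).toDS()
    val closes = ds.map(_.close)                // typed lambda instead of working with Row objects
    val expensive = ds.filter(_.close > 100.0)
    expensive.show()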

See also

For more information on Spark SQL, please visit: http://spark.apache.org/docs/latest/sql-programming-guide.html. The earlier Working with the Spark programming model, Working with Spark's Python and Scala shells, and Working with pair RDDs recipes covered the initial steps in Spark and the basics of RDDs.

 

Working with Spark Streaming


Spark Streaming is a library in the Spark ecosystem that addresses real-time processing needs. Spark's batch processing executes a job over a large dataset at once, whereas Spark Streaming aims for low latency (in hundreds of milliseconds): as data becomes available, it is transformed and processed in near real time.

Spark Streaming works by running jobs over small batches of data that accumulate in small intervals. It is used for rapid alerting, for supplying dashboards with up-to-date information, and for scenarios that need more complex analytics. For example, a common use case in anomaly detection is to run K-means clustering on small batches of data and trigger an alert if the cluster center deviates from what is normal.

This recipe shows how to work with Spark Streaming and apply stateless and windowed (stateful) transformations.

Getting ready

To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos.

How to do it…

  1. There are two types of transformations supported in Spark Streaming: stateless and stateful (windowed). Let's apply the stateless transformations:

           import org.apache.spark.{SparkConf, SparkContext} 
           import org.apache.spark.SparkContext._ 
           import org.apache.spark.streaming._ 
           import org.apache.spark.streaming.StreamingContext._ 
           import org.apache.spark.streaming.dstream._ 
     
           object StatelessTransformations {   
           def main(args:Array[String]) { 
           val conf= new SparkConf 
           conf.setMaster("spark://master:7077").setAppName("StreamingApp") 
           val sc = new SparkContext(conf) 
           val ssc = new StreamingContext(sc, Seconds(5)) 
           val spamInfoRDD = ssc.sparkContext.textFile("/path/fileName", 2) 
           val lines = ssc.socketTextStream("172.22.225.174", 7777) 
           val mapLines = lines.map(ele => ele+"<<<>>>") 
           val mapLines2 = lines.map(ele => (ele,1)) 
           val errorLines = lines.filter(line =>line.contains("Padma")) 
           val flatMapLines = lines.flatMap(ele => ele.split(",")) 
           val reduceByKeyLines = mapLines2.reduceByKey((a,b) => a+b) 
           val groupByKeyLines = mapLines2.groupByKey().mapValues(names =>        
           names.toSet.size) 
           val unionedDstreams = mapLines.union(flatMapLines) 
           val joinedDstream = reduceByKeyLines.join(groupByKeyLines) 
           val cogroupedDStream = reduceByKeyLines.cogroup(groupByKeyLines) 
           val transformLines = lines.transform(rdd =>  
           {rdd.union(spamInfoRDD).filter(_.contains("Padma"))}) 
           errorLines.print 
           mapLines.print 
           flatMapLines.print 
           reduceByKeyLines.print 
           groupByKeyLines.print 
           joinedDstream.print 
           cogroupedDStream.print 
           unionedDstreams.print 
           ssc.start 
           ssc.awaitTermination 
           ssc.stop     
            } 
           } 
    
  2. Now let's apply windowed/stateful transformations:

           import org.apache.spark.{SparkConf, SparkContext} 
           import org.apache.spark.streaming.{Seconds, StreamingContext} 
        
           object StatefulTransformations { 
             def updateRunningSum(values: Seq[Long], state: Option[Long]) = 
               Some(state.getOrElse(0L) + values.size) 
        
             def main(args: Array[String]) {   
               val conf = new SparkConf 
               conf.setMaster("spark://master:7077").setAppName("Stateful_transformations") 
               val sc = new SparkContext(conf) 
               val ssc = new StreamingContext(sc, Seconds(1)) 
               val lines = ssc.socketTextStream("172.25.41.66", 7777) 
               val windowedLines = lines.window(Seconds(4), Seconds(2)) 
               val mapLines = lines.map(ele => (ele, 1L))    
               val windowCounts = windowedLines.count    
               val countByWindowLines = lines.countByWindow(Seconds(4), Seconds(2)) 
               val reducedLines = lines.reduce(_ + _) 
               val updateDStream = mapLines.updateStateByKey(updateRunningSum _)  
               val reducedKeyWindow = mapLines.reduceByKeyAndWindow({ (x, y) => x + y }, 
                 { (x, y) => x - y }, Seconds(4), Seconds(2)) 
         
               windowedLines.print 
               windowCounts.print 
               reducedKeyWindow.print 
               countByWindowLines.print 
               updateDStream.print 
               reducedLines.print 
               ssc.checkpoint("/home/padma/StreamingCheckPoint/") 
               ssc.start 
               ssc.awaitTermination 
             } 
           } 
    
  3. It's also possible to apply DataFrames, that is, SQL operations on streaming data. The following code snippet shows SQL operations over streams of data:

            import org.apache.spark.streaming.StreamingContext._ 
            import org.apache.spark.streaming.kafka._ 
            import org.apache.spark.sql._ 
            import org.apache.spark.sql.SQLContext 
            ... 
            object StreamingSQL {  
              case class Words(wordName:String, count:Int) 
       
             def main(args:Array[String]) 
             { 
             val conf = new SparkConf 
             conf.setAppName("StreamingSQL").setMaster
             ("spark://master:7077") 
                val sc = new SparkContext(conf) 
                val ssc = new StreamingContext(sc,Seconds(4)) 
                val kafkaParams = Map("test-consumer-group" -> 1) 
                val topicMap = Map("test" -> 1) 
                val kafkaLines =                     
                KafkaUtils.createStream(ssc,"blrovh:2181",
                "test-consumer-group",topicMap) 
                val words = kafkaLines.map{tuple => tuple._2} 
                val wordPairs = words.map(word => (word,1)) 
                val reduceWords = wordPairs.reduceByKey((a,b) => a+b) 
                reduceWords.foreachRDD{ 
                rdd => 
                { 
                  val sqlContext = new SQLContext(rdd.sparkContext) 
                  import sqlContext.implicits._ 
                  val df = rdd.map(record => Words(record._1, record._2)) 
                  val dfNew = sqlContext.createDataFrame(df) 
                  dfNew.registerTempTable("Words") 
                  val data = sqlContext.sql("select wordName from Words") 
                  data.foreach(row => println(row.toString)) 
                  } 
                } 
            ssc.start 
            ssc.awaitTermination  } } 
    

How it works…

The new StreamingContext(SparkContext, Seconds(1)) line instantiates the StreamingContext, which takes SparkContext and batch interval as parameters. StreamingContext.socketTextStream(<ip>,<port-number>) initializes a socket stream, which listens on the specified port for messages. This creates a DStream (discretized stream) which is a sequence of RDDs, being generated for each batch.

When working with windowed/stateful transformations, lines.window(Seconds(4),Seconds(2)) creates a window of 4 seconds and a sliding duration of 2 seconds on the incoming DStream lines. The window-based transformations such as reduceByKeyAndWindow and countByWindow use data or intermediate results from previous batches to compute the results of the current batch. The updateStateByKey transformation constructs DStream (key, state) pairs by the specified function, where this function indicates how to update the state for each key given new values.

In the case of applying SQL operations on streaming data, KafkaUtils.createStream initializes a DStream from Kafka. It takes StreamingContext, zookeeperhostname (blrovh), port-number of zookeeper (2181), consumer-group name (test-consumer-group) and topic map (Map("test" -> 1)) as parameters. The new SQLContext line creates SQLContext and SQLContext.createDataFrame creates a data frame for each RDD of the DStream. Using the DataFrame, the SQL queries are executed.

There's more…

Spark Streaming uses a micro-batch architecture, where the streaming computation is a continuous series of batch computations on small batches of data. For each input source, Spark Streaming launches receivers, which are long-running tasks within the application's executors; they collect the data, replicate it to another executor for fault tolerance, and save it as RDDs. Using the Kafka direct streaming approach, the DStream is created as KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet), as sketched below. To address fault tolerance, checkpointing is done, which periodically saves the state to a reliable filesystem.
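
The following is a hedged sketch of the direct approach mentioned above, assuming an existing StreamingContext ssc and the spark-streaming-kafka artifact on the classpath; the broker addresses, topic name, and checkpoint path are placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val topicSet = Set("test")

    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicSet)

    ssc.checkpoint("hdfs://namenode:9000/streaming/checkpoints")  // enables recovery of offsets/state

    val counts = directStream.map(_._2).flatMap(_.split(",")).map(word => (word, 1L)).reduceByKey(_ + _)
    counts.print()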

Also, when running SQL queries on streaming data, the stream can be set to retain its data for a specific duration so that a query can complete. As in Spark batch processing, streams of data can be persisted either in memory or on disk. In the case of stateful operations, the data is, by default, persisted in memory.

See also

For more details on Spark Streaming, its internal architecture, check-pointing, performance tuning, and receiver parallelism, please refer to the Spark Streaming programming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html.

About the Author
  • Padma Priya Chitturi

    Padma Priya Chitturi is Analytics Lead at Fractal Analytics Pvt Ltd and has over five years of experience in big data processing. Currently, she is part of capability development at Fractal and responsible for solution development for analytical problems across multiple business domains at large scale. Prior to this, she worked on an airlines product on a real-time processing platform serving one million user requests/sec at Amadeus Software Labs. She has worked on realizing large-scale deep networks (Jeffrey Dean's work in Google Brain) for image classification on the big data platform Spark. She works closely with big data technologies such as Spark, Storm, Cassandra, and Hadoop. She was an open source contributor to Apache Storm.
