Working with Spark Streaming


Spark Streaming is a library in the Spark ecosystem that addresses real-time processing needs. While Spark's batch processing executes a job over a large dataset at once, Spark Streaming aims for low latency (in the hundreds of milliseconds): as data becomes available, it is transformed and processed in near real time.

Spark Streaming works by running jobs over the small batches of data that accumulate over short intervals. It is used for rapid alerting, for supplying dashboards with up-to-date information, and for scenarios that need more complex analytics. For example, a common use case in anomaly detection is to run K-means clustering on each small batch of data and trigger an alert if the cluster centers deviate from what is normal.
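As a rough sketch of that pattern (not part of this recipe's code), MLlib's StreamingKMeans can be re-trained on every micro-batch; the host, port, feature dimension, and alerting hook below are illustrative assumptions:

           import org.apache.spark.SparkConf
           import org.apache.spark.mllib.clustering.StreamingKMeans
           import org.apache.spark.mllib.linalg.Vectors
           import org.apache.spark.streaming.{Seconds, StreamingContext}

           object StreamingAnomalySketch {
             def main(args: Array[String]) {
               val conf = new SparkConf().setAppName("StreamingKMeansSketch")
               val ssc = new StreamingContext(conf, Seconds(5))
               // Hypothetical source: comma-separated numeric features per line
               val points = ssc.socketTextStream("localhost", 9999)
                 .map(line => Vectors.dense(line.split(",").map(_.toDouble)))
               val model = new StreamingKMeans()
                 .setK(3)
                 .setDecayFactor(1.0)
                 .setRandomCenters(2, 0.0)   // 2-dimensional features, zero initial weight
               model.trainOn(points)          // update the cluster centers on each batch
               // Inspect the latest centers per batch; alerting logic would go here
               points.foreachRDD { _ =>
                 model.latestModel().clusterCenters.foreach(println)
               }
               ssc.start()
               ssc.awaitTermination()
             }
           }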

This recipe shows how to work with Spark Streaming and apply both stateless and windowed (stateful) transformations.

Getting ready

To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos.

How to do it…

  1. There are two types of transformations supported in Spark Streaming: stateless and stateful (windowed). Let's apply the stateless transformations:

           import org.apache.spark.{SparkConf, SparkContext}
           import org.apache.spark.streaming._
           import org.apache.spark.streaming.dstream._

           object StatelessTransformations {
             def main(args: Array[String]) {
               val conf = new SparkConf()
               conf.setMaster("spark://master:7077").setAppName("StreamingApp")
               val sc = new SparkContext(conf)
               // Batch interval of 5 seconds
               val ssc = new StreamingContext(sc, Seconds(5))
               // Static RDD used later inside transform()
               val spamInfoRDD = ssc.sparkContext.textFile("/path/fileName", 2)
               // DStream of text lines read from a TCP socket
               val lines = ssc.socketTextStream("172.22.225.174", 7777)
               val mapLines = lines.map(ele => ele + "<<<>>>")
               val mapLines2 = lines.map(ele => (ele, 1))
               val errorLines = lines.filter(line => line.contains("Padma"))
               val flatMapLines = lines.flatMap(ele => ele.split(","))
               val reduceByKeyLines = mapLines2.reduceByKey((a, b) => a + b)
               val groupByKeyLines = mapLines2.groupByKey().mapValues(names =>
                 names.toSet.size)
               val unionedDstreams = mapLines.union(flatMapLines)
               val joinedDstream = reduceByKeyLines.join(groupByKeyLines)
               val cogroupedDStream = reduceByKeyLines.cogroup(groupByKeyLines)
               // transform() applies an arbitrary RDD-to-RDD function to each batch
               val transformLines = lines.transform(rdd =>
                 rdd.union(spamInfoRDD).filter(_.contains("Padma")))
               errorLines.print()
               mapLines.print()
               flatMapLines.print()
               reduceByKeyLines.print()
               groupByKeyLines.print()
               joinedDstream.print()
               cogroupedDStream.print()
               unionedDstreams.print()
               ssc.start()
               ssc.awaitTermination()
               ssc.stop()
             }
           }
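
     To try this out, start a plain TCP source on the configured host and port before submitting the job, for example with netcat (nc -lk 7777), and type some lines into it; every 5-second batch of received lines then flows through the transformations above.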
    
  2. Now let's apply windowed/stateful transformations:

           import org.apache.spark.{SparkConf, SparkContext}
           import org.apache.spark.streaming._

           object StatefulTransformations {
             // Update function for updateStateByKey: adds the number of new
             // values seen in this batch to the previous running count for the key
             def updateRunningSum(values: Seq[Long], state: Option[Long]): Option[Long] =
               Some(state.getOrElse(0L) + values.size)

             def main(args: Array[String]) {
               val conf = new SparkConf()
               conf.setMaster("spark://master:7077")
                 .setAppName("Stateful_transformations")
               val sc = new SparkContext(conf)
               val ssc = new StreamingContext(sc, Seconds(1))
               val lines = ssc.socketTextStream("172.25.41.66", 7777)
               // Window of 4 seconds, sliding every 2 seconds
               val windowedLines = lines.window(Seconds(4), Seconds(2))
               val mapLines = lines.map(ele => (ele, 1L))
               val windowCounts = windowedLines.count()
               val countByWindowLines = lines.countByWindow(Seconds(4),
                 Seconds(2))
               val reducedLines = lines.reduce(_ + _)
               val updateDStream = mapLines.updateStateByKey(updateRunningSum _)
               // Incremental window reduction: add the batches entering the
               // window, subtract the batches leaving it
               val reducedKeyWindow = mapLines.reduceByKeyAndWindow({ (x, y) =>
                 x + y }, { (x, y) => x - y }, Seconds(4), Seconds(2))

               windowedLines.print()
               windowCounts.print()
               reducedKeyWindow.print()
               countByWindowLines.print()
               updateDStream.print()
               reducedLines.print()
               // Stateful and windowed operations require a checkpoint directory
               ssc.checkpoint("/home/padma/StreamingCheckPoint/")
               ssc.start()
               ssc.awaitTermination()
             }
           }
    
  3. It's also possible to apply DataFrame and SQL operations on streaming data. The following code snippet runs SQL queries over a stream of data coming from Kafka:

            import org.apache.spark.streaming.StreamingContext._
            import org.apache.spark.streaming.kafka._
            import org.apache.spark.sql._
            import org.apache.spark.sql.SQLContext
            ...

            object StreamingSQL {
              // Case class describing the schema of the DataFrame
              case class Words(wordName: String, count: Int)

              def main(args: Array[String]) {
                val conf = new SparkConf()
                conf.setAppName("StreamingSQL")
                  .setMaster("spark://master:7077")
                val sc = new SparkContext(conf)
                val ssc = new StreamingContext(sc, Seconds(4))
                // Topic map: topic name -> number of receiver threads
                val topicMap = Map("test" -> 1)
                // Receiver-based Kafka stream: (ssc, ZooKeeper quorum,
                // consumer group, topic map)
                val kafkaLines = KafkaUtils.createStream(ssc, "blrovh:2181",
                  "test-consumer-group", topicMap)
                val words = kafkaLines.map(tuple => tuple._2)
                val wordPairs = words.map(word => (word, 1))
                val reduceWords = wordPairs.reduceByKey((a, b) => a + b)
                reduceWords.foreachRDD { rdd =>
                  val sqlContext = new SQLContext(rdd.sparkContext)
                  import sqlContext.implicits._
                  // Convert each record of the RDD into a Words object
                  val wordsRDD = rdd.map(record => Words(record._1, record._2))
                  val wordsDF = sqlContext.createDataFrame(wordsRDD)
                  wordsDF.registerTempTable("Words")
                  val data = sqlContext.sql("select wordName from Words")
                  data.foreach(row => println(row.toString))
                }
                ssc.start()
                ssc.awaitTermination()
              }
            }
    

How it works…

The new StreamingContext(SparkContext, Seconds(1)) line instantiates the StreamingContext, which takes the SparkContext and the batch interval as parameters. StreamingContext.socketTextStream(<ip>, <port-number>) initializes a socket stream, which connects to the specified host and port and receives text data. This creates a DStream (discretized stream), which is a continuous sequence of RDDs, one generated per batch interval.
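A minimal sketch of this batching model, assuming ssc is the StreamingContext created in the steps above: foreachRDD exposes the RDD produced for each batch together with its batch time, so ordinary RDD operations can be applied batch by batch.

            // Each batch interval yields one RDD of the lines received in that interval
            val lines = ssc.socketTextStream("172.22.225.174", 7777)
            lines.foreachRDD { (rdd, time) =>
              println("Batch at " + time + " contains " + rdd.count() + " lines")
            }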

When working with windowed/stateful transformations, lines.window(Seconds(4), Seconds(2)) creates a window of 4 seconds with a sliding duration of 2 seconds over the incoming DStream lines. Window-based transformations such as reduceByKeyAndWindow and countByWindow use data or intermediate results from previous batches to compute the results of the current batch. The updateStateByKey transformation constructs a DStream of (key, state) pairs from the specified update function, which indicates how to update the state for each key given the new values that arrived in the batch.
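As a hedged sketch of the update function's contract (a variant of the recipe's updateRunningSum, not the book's code), the following keeps a running sum of the values themselves rather than a count of occurrences:

            // values: the new values for a key in the current batch
            // state: the previously accumulated total, None the first time a key is seen
            def sumOfValues(values: Seq[Long], state: Option[Long]): Option[Long] =
              Some(state.getOrElse(0L) + values.sum)

            // mapLines is a DStream[(String, Long)]; checkpointing must be enabled
            val runningTotals = mapLines.updateStateByKey(sumOfValues _)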

In the case of applying SQL operations on streaming data, KafkaUtils.createStream initializes a DStream from Kafka. It takes the StreamingContext, the ZooKeeper connection string (blrovh:2181), the consumer group name (test-consumer-group), and the topic map (Map("test" -> 1)) as parameters. Inside foreachRDD, a SQLContext is created and SQLContext.createDataFrame builds a DataFrame from each RDD of the DStream; the SQL queries are then executed against the registered temporary table.

There's more…

Spark Streaming uses a micro-batch architecture, where the streaming computation is a continuous series of batch computations on small batches of data. For each input source, Spark Streaming launches receivers, which are long-running tasks within the application's executors; they collect the data, replicate it to another executor for fault tolerance, and save it as RDDs. With the Kafka direct streaming approach there are no receivers, and the DStream is created as KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet). To address fault tolerance, checkpointing is done, which periodically saves the state to a reliable filesystem.
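A minimal sketch of the direct approach with the Kafka 0.8 integration (spark-streaming-kafka); the broker address and topic set are placeholder assumptions:

            import kafka.serializer.StringDecoder
            import org.apache.spark.streaming.kafka.KafkaUtils

            // No receivers: each batch reads a range of offsets straight from the brokers
            val kafkaParams = Map("metadata.broker.list" -> "broker-host:9092")
            val topicSet = Set("test")
            val directLines = KafkaUtils.createDirectStream[String, String,
              StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
              .map(_._2)   // keep only the message value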

Also, when running SQL queries on streaming data, the generated RDDs can be retained for a longer duration by calling StreamingContext.remember, so that a query has time to complete before the data is cleaned up. As in Spark batch processing, streams of data can be persisted either in memory or on disk; in the case of stateful operations, the generated data is persisted in memory by default.
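For instance, assuming ssc and the reduceWords DStream from the SQL example above, the retention window and persistence level can be set explicitly; the five-minute duration is an arbitrary choice:

            import org.apache.spark.storage.StorageLevel
            import org.apache.spark.streaming.Minutes

            // Keep each batch's RDDs around for 5 minutes so queries issued later
            // (for example from the SQL code above) can still reach that data
            ssc.remember(Minutes(5))
            // Explicitly persist a DStream's RDDs in memory, spilling to disk if needed
            reduceWords.persist(StorageLevel.MEMORY_AND_DISK)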

See also

For more details on Spark Streaming, its internal architecture, checkpointing, performance tuning, and receiver parallelism, please refer to the Spark Streaming programming guide at https://spark.apache.org/docs/latest/streaming-programming-guide.html.
