Chapter 11. Spark SQL and DataFrames

In the previous chapter, we learned how to build a simple distributed application using Spark. The data that we used took the form of a set of e-mails stored as text files.

We learned that Spark was built around the concept of resilient distributed datasets (RDDs). We explored several types of RDDs: simple RDDs of strings, key-value RDDs, and RDDs of doubles. In the case of key-value RDDs and RDDs of doubles, Spark added functionality beyond that of the simple RDDs through implicit conversions. There is one important type of RDD that we have not explored yet: DataFrames (previously called SchemaRDD). DataFrames allow the manipulation of objects significantly more complex than those we have explored to date.

A DataFrame is a distributed tabular data structure, and is therefore very useful for representing and manipulating structured data. In this chapter, we will first investigate DataFrames through the Spark shell, and then use the Ling-spam e-mail dataset...

DataFrames – a whirlwind introduction


Let's start by opening a Spark shell:

$ spark-shell

Let's imagine that we are interested in running analytics on a set of patients to estimate their overall health level. We have measured, for each patient, their height, weight, age, and whether they smoke.

We might represent the readings for each patient as a case class (you might wish to write some of this in a text editor and paste it into the Scala shell using :paste):

scala> case class PatientReadings(
  val patientId: Int,
  val heightCm: Int,
  val weightKg: Int,
  val age: Int,
  val isSmoker: Boolean
)
defined class PatientReadings

We would, typically, have many thousands of patients, possibly stored in a database or a CSV file. We will worry about how to interact with external sources later in this chapter. For now, let's just hard-code a few readings directly in the shell:

scala> val readings = List(
  PatientReadings(1, 175, 72, 43, false),
  PatientReadings(2, 182, 78, 28, true...
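
The remaining readings follow the same pattern. Once the list is complete, a minimal sketch of the next step (assuming the shell's automatic import of sqlContext.implicits._, or importing it manually) distributes the list and converts it to a DataFrame:

// Distribute the list as an RDD and convert it to a DataFrame;
// column names and types are inferred from the case class fields.
val readingsDF = sc.parallelize(readings).toDF
readingsDF.show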

Aggregation operations


We have seen how to apply an operation to every row in a DataFrame to create a new column, and we have seen how to use filters to build new DataFrames with a subset of rows from the original DataFrame. The last set of operations on DataFrames is grouping operations, equivalent to the GROUP BY statement in SQL. Let's calculate the average BMI for smokers and non-smokers. We must first tell Spark to group the DataFrame by a column (the isSmoker column, in this case), and then apply an aggregation operation (averaging, in this case) to reduce each group:

scala> val smokingDF = readingsWithBmiDF.groupBy(
  "isSmoker").agg(avg("BMI"))
smokingDF: org.apache.spark.sql.DataFrame = [isSmoker: boolean, AVG(BMI): double]

This has created a new DataFrame with two columns: the grouping column and the column over which we aggregated. Let's show this DataFrame:

scala> smokingDF.show
+--------+------------------+
|isSmoker|          AVG(BMI)|
+--------+------------------+
...
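
Aggregations are not limited to a single function. A minimal sketch, assuming readingsWithBmiDF keeps the original columns (such as age) alongside the BMI column, computes several summary statistics per group in one pass:

import org.apache.spark.sql.functions._

// Group by smoking status and compute several aggregates at once.
val summaryDF = readingsWithBmiDF.groupBy("isSmoker").agg(
  count("BMI"),  // number of patients in each group
  avg("BMI"),    // mean BMI per group
  max("age")     // oldest patient in each group
)
summaryDF.show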

Joining DataFrames together


So far, we have only considered operations on a single DataFrame. Spark also offers SQL-like joins to combine DataFrames. Let's assume that we have another DataFrame mapping patient IDs to (systolic) blood pressure measurements. We will start from the data as a list of pairs of patient IDs and blood pressures:

scala> val bloodPressures = List((1 -> 110), (3 -> 100), (4 -> 125))
bloodPressures: List[(Int, Int)] = List((1,110), (3,100), (4,125))

scala> val bloodPressureRDD = sc.parallelize(bloodPressures)
bloodPressureRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[74] at parallelize at <console>:24

We can construct a DataFrame from this RDD of tuples. However, unlike when constructing DataFrames from RDDs of case classes, Spark cannot infer column names. We must therefore pass these explicitly to .toDF:

scala> val bloodPressureDF = bloodPressureRDD.toDF(
  "patientId", "bloodPressure")
bloodPressureDF: DataFrame = [patientId: int, bloodPressure...
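
A minimal sketch of the join itself, assuming we want to combine readingsWithBmiDF with the blood pressure readings on the shared patientId column:

// Inner join on the shared "patientId" column; patients missing from
// either DataFrame are dropped from the result.
val joinedDF = readingsWithBmiDF.join(bloodPressureDF, "patientId")
joinedDF.show

// Other join types can be requested explicitly, for instance a left outer
// join that keeps every patient from the readings, with nulls for missing
// blood pressures.
val leftJoinedDF = readingsWithBmiDF.join(
  bloodPressureDF,
  readingsWithBmiDF("patientId") === bloodPressureDF("patientId"),
  "left_outer")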

Custom functions on DataFrames


So far, we have only used built-in functions to operate on DataFrame columns. While these are often sufficient, we sometimes need greater flexibility. Spark lets us apply custom transformations to every row through user-defined functions (UDFs). Let's assume that we want to use the equation that we derived in Chapter 2, Manipulating Data with Breeze, for the probability of a person being male, given their height and weight. We calculated that the decision boundary was given by:
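
Assuming the linear decision function fitted in Chapter 2, with the fitted coefficients written here symbolically as \theta_0, \theta_1, and \theta_2 (the numerical values come from that fit), the boundary takes the form:

f = \theta_0 + \theta_1 \cdot \mathrm{rescaledHeight} + \theta_2 \cdot \mathrm{rescaledWeight}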

Any person with f > 0 is more likely to be male than female, given their height and weight and the training set used in Chapter 2, Manipulating Data with Breeze (which was based on students, so is unlikely to be representative of the population as a whole). To convert a height in centimeters to the normalized height, rescaledHeight, we can use this formula:
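
Assuming the standard normalization used in Chapter 2 (subtract the training-set mean height and divide by its standard deviation):

\mathrm{rescaledHeight} = \frac{\mathrm{heightCm} - \overline{\mathrm{height}}}{\sigma_{\mathrm{height}}}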

Similarly, to convert a weight (in kilograms) to the normalized weight, rescaledWeight, we can use:
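
Again assuming standard normalization, this time with the training-set mean and standard deviation of the weight:

\mathrm{rescaledWeight} = \frac{\mathrm{weightKg} - \overline{\mathrm{weight}}}{\sigma_{\mathrm{weight}}}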

The average and standard...
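
A minimal sketch of such a UDF, using hypothetical placeholder values for the means, standard deviations, and fitted coefficients (the real values come from the Chapter 2 analysis):

import org.apache.spark.sql.functions.udf

// Hypothetical constants standing in for the Chapter 2 fit:
// training-set means and standard deviations, and fitted coefficients.
val meanHeight = 170.0 ; val stdHeight = 10.0
val meanWeight = 70.0  ; val stdWeight = 13.0
val (theta0, theta1, theta2) = (-0.1, 1.2, 2.3)

// UDF returning the decision function f for a given height and weight.
val likelyMaleUdf = udf { (heightCm: Int, weightKg: Int) =>
  val rescaledHeight = (heightCm - meanHeight) / stdHeight
  val rescaledWeight = (weightKg - meanWeight) / stdWeight
  theta0 + theta1 * rescaledHeight + theta2 * rescaledWeight
}

// Apply the UDF column-wise to create a new column.
val genderDF = readingsDF.withColumn(
  "likelyMaleScore",
  likelyMaleUdf(readingsDF("heightCm"), readingsDF("weightKg")))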

DataFrame immutability and persistence


DataFrames, like RDDs, are immutable. When you define a transformation on a DataFrame, this always creates a new DataFrame. The original DataFrame cannot be modified in place (this is notably different to pandas DataFrames, for instance).

Operations on DataFrames can be grouped into two categories: transformations, which result in the creation of a new DataFrame, and actions, which usually return a Scala type or have a side effect. Methods like filter or withColumn are transformations, while methods like show or head are actions.

Transformations are lazy, much like transformations on RDDs. When you generate a new DataFrame by transforming an existing DataFrame, this results in the elaboration of an execution plan for creating the new DataFrame, but the data itself is not transformed immediately. You can access the execution plan with the queryExecution method.
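
A brief sketch of the distinction, assuming the readingsDF DataFrame from earlier in the chapter:

// A transformation: no data is processed yet, Spark only builds a plan.
val tallDF = readingsDF.filter(readingsDF("heightCm") > 180)

// Inspect the execution plan.
tallDF.queryExecution

// An action: this is what actually triggers the computation.
tallDF.show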

When you call an action on a DataFrame, Spark processes the action as if it were a regular RDD: it implicitly...

SQL statements on DataFrames


By now, you will have noticed that many operations on DataFrames are inspired by SQL operations. Additionally, Spark allows us to register DataFrames as tables and query them with SQL statements directly. We can therefore build a temporary database as part of the program flow.

Let's register readingsDF as a temporary table:

scala> readingsDF.registerTempTable("readings")

This registers a temporary table that can be used in SQL queries. Registering a temporary table relies on the presence of a SQL context. The temporary tables are destroyed when the SQL context is destroyed (when we close the shell, for instance).
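
A minimal sketch of a query against the temporary table, assuming the columns defined earlier in this chapter:

// Query the temporary table with ordinary SQL; the result is a DataFrame.
val smokersDF = sqlContext.sql(
  "SELECT patientId, heightCm, weightKg FROM readings WHERE isSmoker = true")
smokersDF.show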

Let's explore what we can do with our temporary tables and the SQL context. We can first get a list of all the tables currently registered with the context:

scala> sqlContext.tables
DataFrame = [tableName: string, isTemporary: boolean]

This returns a DataFrame. In general, all operations on a SQL context that return data return DataFrames:

scala...

Complex data types – arrays, maps, and structs


So far, all the elements in our DataFrames have been simple types. DataFrames support three additional collection types: arrays, maps, and structs.

Structs

The first compound type that we will look at is the struct. A struct is similar to a case class: it stores a set of key-value pairs, with a fixed set of keys. If we convert an RDD of a case class containing nested case classes to a DataFrame, Spark will convert the nested objects to a struct.

Let's imagine that we want to serialize Lord of the Rings characters. We might use the following object model:

case class Weapon(name: String, weaponType: String)
case class LotrCharacter(name: String, weapon: Weapon)

We want to create a DataFrame of LotrCharacter instances. Let's create some dummy data:

scala> val characters = List(
  LotrCharacter("Gandalf", Weapon("Glamdring", "sword")),
  LotrCharacter("Frodo", Weapon("Sting", "dagger")),
  LotrCharacter("Aragorn", Weapon("Anduril", "sword"))
)
characters...
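
A minimal sketch of the remaining steps, assuming the characters list above: distribute it, convert it to a DataFrame, and access the nested struct fields using dot notation:

// The nested Weapon case class becomes a struct column in the DataFrame.
val charactersDF = sc.parallelize(characters).toDF
charactersDF.printSchema

// Fields inside a struct can be selected with dot notation.
charactersDF.select("name", "weapon.name", "weapon.weaponType").show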

Interacting with data sources


A major challenge in data science or engineering is dealing with the wealth of input and output formats for persisting data. We might receive or send data as CSV files, JSON files, or through a SQL database, to name a few.

Spark provides a unified API for serializing and de-serializing DataFrames to and from different data sources.

JSON files

Spark supports loading data from JSON files, provided that each line in the JSON file corresponds to a single JSON object. Each object will be mapped to a DataFrame row. JSON arrays are mapped to arrays, and embedded objects are mapped to structs.

This section would be a little dry without some data, so let's generate some from the GitHub API. Unfortunately, the GitHub API does not return JSON formatted as a single object per line. The code repository for this chapter contains a script, FetchData.scala, which will download and format JSON entries for Martin Odersky's repositories, saving the objects to a file named odersky_repos...
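
Once the objects are formatted one per line, loading them is straightforward; a minimal sketch, assuming the output file is named odersky_repos.json (the exact name used by the script may differ):

// Read a file with one JSON object per line; the schema, including
// arrays and nested structs, is inferred from the data.
val reposDF = sqlContext.read.json("odersky_repos.json")
reposDF.printSchema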

Standalone programs


So far, we have been using Spark SQL and DataFrames through the Spark shell. To use them in a standalone program, you will need to create a SQL context explicitly from a Spark context:

val conf = new SparkConf().setAppName("applicationName")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Additionally, importing the implicits object nested in sqlContext allows the conversion of RDDs to DataFrames:

import sqlContext.implicits._
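
Putting these pieces together, a minimal sketch of a standalone program (names and sample values are illustrative only):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object DataFrameApp extends App {
  val conf = new SparkConf().setAppName("applicationName")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  // .toDF is available on RDDs thanks to the implicits import above.
  val df = sc.parallelize(List(("Alice", 42), ("Bob", 35))).toDF("name", "age")
  df.show()

  sc.stop()
}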

We will use DataFrames extensively in the next chapter to manipulate data to get it ready for use with MLlib.

Summary


In this chapter, we explored Spark SQL and DataFrames. DataFrames add a rich layer of abstraction on top of Spark's core engine, greatly facilitating the manipulation of tabular data. Additionally, the data sources API allows the serialization and de-serialization of DataFrames to and from a rich variety of data sources.

In the next chapter, we will build on our knowledge of Spark and DataFrames to build a spam filter using MLlib.

References


DataFrames are a relatively recent addition to Spark. There is thus still a dearth of literature and documentation. The first port of call should be the Scala docs, available at: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame.

The Scaladocs for operations available on the DataFrame Column type can be found at: http://spark.apache.org/docs/latest/api/scala/#org.apache.spark.sql.Column.

There is also extensive documentation on the Parquet file format: https://parquet.apache.org.
