
Chapter 6. Batch Analytics with Apache Spark

In this chapter, you will learn about Apache Spark and how to use it for big data analytics based on a batch processing model. Spark SQL is a component on top of Spark Core that can be used to query structured data. It is becoming the de facto tool, replacing Hive as the choice for batch analytics on Hadoop.

Moreover, you will learn how to use Spark for the analysis of structured data (unstructured data, such as a document containing arbitrary text or some other format, has to be transformed into a structured form first). We will see how DataFrames/Datasets are the cornerstone here, and how Spark SQL's APIs make querying structured data simple yet robust.

We will also introduce datasets and see the difference between datasets, DataFrames, and RDDs. In a nutshell, the following topics will be covered in this chapter:

  • Spark SQL and DataFrames
  • DataFrames and the SQL API
  • DataFrame schema
  • Datasets and encoders
  • Loading and saving data
  • Aggregations
  • Joins

Spark SQL and DataFrames


Before Apache Spark, Apache Hive was the go-to technology whenever anyone wanted to run an SQL-like query on large amounts of data. Apache Hive essentially translated SQL queries into MapReduce-like logic automatically, making it very easy to perform many kinds of analytics on big data without actually having to learn to write complex code in Java or Scala.

With the advent of Apache Spark, there was a paradigm shift in how we could perform analysis at big data scale. Spark SQL provides an SQL-like layer on top of Apache Spark's distributed computation abilities that is rather simple to use. In fact, Spark SQL can be used as an online analytical processing database. Spark SQL works by parsing an SQL-like statement into an abstract syntax tree (AST), subsequently converting the AST into a logical plan, and then optimizing the logical plan into a physical plan that can be executed, as shown in the following diagram:

The final execution uses the underlying DataFrame API, making...
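
A quick way to see these plans for yourself is to call explain(true) on a DataFrame, which prints the parsed and analyzed logical plans, the optimized logical plan, and the physical plan. The following is a minimal sketch using a small in-memory DataFrame (the data and column names here are illustrative, not from the book):

scala> import spark.implicits._
scala> val df = Seq(("Alabama", 2010, 4785492), ("Alaska", 2010, 714031)).toDF("State", "Year", "Population")
scala> // prints the parsed/analyzed/optimized logical plans and the generated physical plan
scala> df.filter($"Year" === 2010).groupBy($"State").count().explain(true)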

DataFrame APIs and the SQL API


A DataFrame can be created in several ways; some of them are as follows:

  • Executing SQL queries, or loading external data such as Parquet, JSON, CSV, text, Hive, JDBC, and so on
  • Converting RDDs to DataFrames
  • Loading a CSV file

We will take a look at statesPopulation.csv here, which we will then load as a DataFrame.

The CSV contains the population of US states for the years 2010 to 2016, in the following format:

State        Year   Population
Alabama      2010   4,785,492
Alaska       2010   714,031
Arizona      2010   6,408,312
Arkansas     2010   2,921,995
California   2010   37,332,685

Since this CSV has a header, we can use it to quickly load into a DataFrame with an implicit schema detection:

scala> val statesDF = spark.read.option("header", "true").option("inferSchema", "true").option("sep", ",").csv("statesPopulation.csv")
statesDF: org.apache.spark.sql.DataFrame = [State: string, Year: int ... 1 more field]

Once the DataFrame is loaded, we can examine its schema:

scala> statesDF.printSchema
root
 |-- State: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Population: integer (nullable = true)
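
Since this section covers the DataFrame API as well as the SQL API, the same DataFrame can also be queried with SQL once it is registered as a temporary view. The following is a minimal sketch; the view name statesView and the query are illustrative:

scala> statesDF.createOrReplaceTempView("statesView")   // expose the DataFrame to the SQL engine under a view name
scala> val top2010DF = spark.sql("SELECT State, Population FROM statesView WHERE Year = 2010 ORDER BY Population DESC")
scala> top2010DF.show(5)   // the equivalent DataFrame API call would be statesDF.filter($"Year" === 2010).orderBy($"Population".desc)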

Schema – structure of data


A schema is the description of the structure of your data and can be either implicit or explicit. Since DataFrames are internally based on RDDs, there are two main ways to convert an existing RDD into a Dataset (both approaches are sketched right after this list):

  • Using reflection to infer the schema of the RDD
  • Using a programmatic interface that lets you take an existing RDD and apply a schema to it, converting the RDD into a Dataset with a schema
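
The following is a minimal sketch of both approaches; the case class State, the sample rows, and the column names are illustrative, not taken from the book:

scala> import spark.implicits._
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row

scala> // Approach 1 - reflection: the schema is inferred from the fields of a case class
scala> case class State(name: String, year: Int, population: Long)
scala> val statesRDD = spark.sparkContext.parallelize(Seq(State("Alabama", 2010, 4785492L), State("Alaska", 2010, 714031L)))
scala> val dfByReflection = statesRDD.toDF()

scala> // Approach 2 - programmatic: build a StructType explicitly and apply it to an RDD of Rows
scala> val schema = StructType(Seq(
     |   StructField("name", StringType, nullable = true),
     |   StructField("year", IntegerType, nullable = true),
     |   StructField("population", LongType, nullable = true)))
scala> val rowRDD = spark.sparkContext.parallelize(Seq(Row("Alabama", 2010, 4785492L), Row("Alaska", 2010, 714031L)))
scala> val dfBySchema = spark.createDataFrame(rowRDD, schema)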

Implicit schema

Let's look at an example of loading a comma-separated values (CSV) file into a DataFrame. Whenever a text file contains a header, the read API can infer the schema by reading the header line. We also have the option to specify the separator to be used to split the text file lines.

We read the CSV, inferring the schema from the header line, and use the comma (,) as the separator. We also show the use of the schema and printSchema commands to verify the schema of the input file:

scala> val statesDF = spark.read.option("header", "true").option("inferSchema", "true").option("sep", ",").csv("statesPopulation.csv")

Loading datasets


Spark SQL can read data from external storage systems such as files, Hive tables, and JDBC databases through the DataFrameReader interface.

The format of the API call is spark.read.inputtype:

  • Parquet
  • CSV
  • Hive table
  • JDBC
  • ORC
  • Text
  • JSON

Let's look at a couple of simple examples of reading CSV files into DataFrames:

scala> val statesPopulationDF = spark.read.option("header", "true").option("inferSchema", "true").option("sep", ",").csv("statesPopulation.csv")
statesPopulationDF: org.apache.spark.sql.DataFrame = [State: string, Year: int ... 1 more field]

scala> val statesTaxRatesDF = spark.read.option("header", "true").option("inferSchema", "true").option("sep", ",").csv("statesTaxRates.csv")
statesTaxRatesDF: org.apache.spark.sql.DataFrame = [State: string, TaxRate: double]
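
The other input types follow the same pattern through spark.read. The following is a minimal sketch; the Parquet/JSON file names and the JDBC connection details are illustrative placeholders, not files provided with the book:

scala> val statesParquetDF = spark.read.parquet("statesPopulation.parquet")   // Parquet files carry their own schema, so no inference options are needed
scala> val statesJsonDF = spark.read.json("statesPopulation.json")            // the JSON schema is inferred by sampling the records
scala> val statesJdbcDF = spark.read.format("jdbc").option("url", "jdbc:postgresql://dbhost:5432/mydb").option("dbtable", "states").option("user", "spark").option("password", "secret").load()   // requires the JDBC driver on the classpath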

Saving datasets


Spark SQL can save data to external storage systems such as files, Hive tables, and JDBC databases through the DataFrameWriter interface.

The format of the API call is dataframe.write.outputtype:

  • Parquet
  • ORC
  • Text
  • Hive table
  • JSON
  • CSV
  • JDBC

Let's look at a couple of examples of writing or saving a DataFrame to a CSV file:

scala> statesPopulationDF.write.option("header", "true").csv("statesPopulation_dup.csv")
scala> statesTaxRatesDF.write.option("header", "true").csv("statesTaxRates_dup.csv")

Aggregations


Aggregation is the process of grouping data together based on a condition and performing analytics on the groups. Aggregation is very important for making sense of data of all sizes, as raw records alone are not that useful for most use cases.

Note

Imagine a table containing one temperature measurement per day for every city in the world for five years.

For example, if you look at the following table and then at the aggregated view of the same data, it becomes obvious that raw records alone do not help you understand the data. Shown below is the raw data in the form of a table:

City           Date         Temperature
Boston         12/23/2016   32
New York       12/24/2016   36
Boston         12/24/2016   30
Philadelphia   12/25/2016   34
Boston         12/25/2016   28

 

Shown below is the average temperature per city:

City           AverageTemperature
Boston         30 = (32 + 30 + 28)/3
New York       36
Philadelphia   34

Aggregate functions

Aggregations can be performed with the help of the functions found in the org.apache.spark.sql.functions package. In addition to this, custom aggregate functions can also be created.
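
As a minimal sketch of the temperature example above (the temperaturesDF DataFrame is built in-line here purely for illustration):

scala> import spark.implicits._
scala> import org.apache.spark.sql.functions._
scala> val temperaturesDF = Seq(
     |   ("Boston", "12/23/2016", 32), ("New York", "12/24/2016", 36), ("Boston", "12/24/2016", 30),
     |   ("Philadelphia", "12/25/2016", 34), ("Boston", "12/25/2016", 28)).toDF("City", "Date", "Temperature")
scala> temperaturesDF.groupBy("City").agg(avg("Temperature").alias("AverageTemperature")).show()   // one averaged row per city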

Joins


In traditional databases, joins are used to join one transaction table with another lookup table to generate a more complete view. For example, if you have a table of online transactions sorted by customer ID and another table containing the customer city and customer ID, you can use join to generate reports on the transactions sorted by city.

Transactions table: This table has three columns: the CustomerID, the item purchased, and the price the customer paid for the item:

CustomerID   Purchased Item   Price Paid
1            Headphones       25.00
2            Watch            20.00
3            Keyboard         20.00
1            Mouse            10.00
4            Cable            10.00
3            Headphones       30.00

Customer Info table: This table has two columns: the CustomerID and the City the customer lives in:

CustomerID   City
1            Boston
2            New York
3            Philadelphia
4            Boston

Joining the transactions table with the customer info table will generate a view as follows:

CustomerID   Purchased Item   Price Paid   City
1            Headphones       25.00        Boston
2            Watch            20.00        New York
3            Keyboard         20.00        Philadelphia
1            Mouse            10.00        Boston
4            Cable            10.00        Boston
3            Headphones       30.00        Philadelphia
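
The following is a minimal sketch of how this join could be expressed with the DataFrame API; the two DataFrames are built in-line here purely for illustration:

scala> import spark.implicits._
scala> val transactionsDF = Seq(
     |   (1, "Headphones", 25.00), (2, "Watch", 20.00), (3, "Keyboard", 20.00),
     |   (1, "Mouse", 10.00), (4, "Cable", 10.00), (3, "Headphones", 30.00)).toDF("CustomerID", "PurchasedItem", "PricePaid")
scala> val customersDF = Seq((1, "Boston"), (2, "New York"), (3, "Philadelphia"), (4, "Boston")).toDF("CustomerID", "City")
scala> val joinedDF = transactionsDF.join(customersDF, "CustomerID")   // inner join on the common CustomerID column
scala> joinedDF.show()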

Summary


In this chapter, we discussed the origin of DataFrames and how Spark SQL provides an SQL interface on top of the DataFrame API. The power of DataFrames is such that execution times have decreased compared with the original RDD-based computations, and having such a powerful layer behind a simple SQL-like interface makes it all the more useful. We also looked at the various APIs for creating and manipulating DataFrames, and dug deeper into the sophisticated features of aggregations, including groupBy, Window, rollup, and cube. Finally, we looked at the concept of joining datasets and the various types of joins possible, such as inner, outer, cross, and so on.

We will explore the exciting world of real-time data processing and analytics in Chapter 7, Real-Time Analytics with Apache Spark.

