Apache Spark 2.x Cookbook

5 (1 reviews total)
By Rishi Yadav
  • Instant online access to over 7,500+ books and videos
  • Constantly updated with 100+ new titles each month
  • Breadth and depth in over 1,000+ technologies
  1. Getting Started with Apache Spark

About this book

While Apache Spark 1.x gained a lot of traction and adoption in the early years, Spark 2.x delivers notable improvements in the areas of API, schema awareness, Performance, Structured Streaming, and simplifying building blocks to build better, faster, smarter, and more accessible big data applications. This book uncovers all these features in the form of structured recipes to analyze and mature large and complex sets of data.

Starting with installing and configuring Apache Spark with various cluster managers, you will learn to set up development environments. Further on, you will be introduced to working with RDDs, DataFrames and Datasets to operate on schema aware data, and real-time streaming with various sources such as Twitter Stream and Apache Kafka. You will also work through recipes on machine learning, including supervised learning, unsupervised learning & recommendation engines in Spark.

Last but not least, the final few chapters delve deeper into the concepts of graph processing using GraphX, securing your implementations, cluster optimization, and troubleshooting.

Publication date:
May 2017
Publisher
Packt
Pages
294
ISBN
9781787127265

 

Chapter 1. Getting Started with Apache Spark

In this chapter, we will set up Spark and configure it. This chapter contains the following recipes:

  • Leveraging Databricks Cloud
  • Deploying Spark using Amazon EMR
  • Installing Spark from binaries
  • Building the Spark source code with Maven
  • Launching Spark on Amazon EC2
  • Deploying Spark on a cluster in standalone mode
  • Deploying Spark on a cluster with Mesos
  • Deploying Spark on a cluster with YARN
  • Understanding SparkContext and SparkSession
  • Understanding Resilient Distributed Datasets (RDD)
 

Introduction


Apache Spark is a general-purpose cluster computing system to process big data workloads. What sets Spark apart from its predecessors, such as Hadoop MapReduce, is its speed, ease of use, and sophisticated analytics.

It was originally developed at AMPLab, UC Berkeley, in 2009. It was made open source in 2010 under the BSD license and switched to the Apache 2.0 license in 2013. Toward the later part of 2013, the creators of Spark founded Databricks to focus on Spark's development and future releases.

Databricks offers Spark as a service in the Amazon Web Services(AWS) Cloud, called Databricks Cloud. In this book, we are going to maximize the use of AWS as a data storage layer.

Talking about speed, Spark can achieve subsecond latency on big data workloads. To achieve such low latency, Spark makes use of memory for storage. In MapReduce, memory is primarily used for the actual computation. Spark uses memory both to compute and store objects.

Spark also provides a unified runtime connecting to various big data storage sources, such as HDFS, Cassandra, and S3. It also provides a rich set of high-level libraries for different big data compute tasks, such as machine learning, SQL processing, graph processing, and real-time streaming. These libraries make development faster and can be combined in an arbitrary fashion.

Though Spark is written in Scala--and this book only focuses on recipes on Scala--it also supports Java, Python, and R.

Spark is an open source community project, and everyone uses the pure open source Apache distributions for deployments, unlike Hadoop, which has multiple distributions available with vendor enhancements.

The following figure shows the Spark ecosystem:

Spark's runtime runs on top of a variety of cluster managers, including YARN (Hadoop's compute framework), Mesos, and Spark's own cluster manager called Standalone mode. Alluxio is a memory-centric distributed file system that enables reliable file sharing at memory speed across cluster frameworks. In short, it is an off-heap storage layer in memory that helps share data across jobs and users. Mesos is a cluster manager, which is evolving into a data center operating system. YARN is Hadoop's compute framework and has a robust resource management feature that Spark can seamlessly use.

Apache Spark, initially devised as a replacement of MapReduce, had a good proportion of workloads running in an on-premises manner. Now, most of the workloads have been moved to public clouds (AWS, Azure, and GCP). In a public cloud, we see two types of applications:

  • Outcome-driven applications   
  • Data transformation pipelines

For outcome-driven applications, where the goal is to derive a predefined signal/outcome from the given data, Databricks Cloud fits the bill perfectly. For traditional data transformation pipelines, Amazon's Elastic MapReduce (EMR) does a great job. 

 

Leveraging Databricks Cloud


Databricks is the company behind Spark. It has a cloud platform that takes out all of the complexity of deploying Spark and provides you with a ready-to-go environment with notebooks for various languages. Databricks Cloud also has a community edition that provides one node instance with 6 GB of RAM for free. It is a great starting place for developers. The Spark cluster that is created also terminates after 2 hours of sitting idle. 

Note

All the recipes in this book can be run on either the InfoObjects Sandbox or Databricks Cloud community edition. The entire data for the recipes in this book has also been ported to a public bucket called sparkcookbook on S3. Just put these recipes on the Databricks Cloud community edition, and they will work seamlessly. 

 

How to do it...

  1. Click on Sign Up :
  1. Choose COMMUNITY EDITION (or full platform): 
  1.  Fill in the details and you'll be presented with a landing page, as follows:
  1. Click on Clusters, then Create Cluster (showing community edition below it):
  1. Enter the cluster name, for example, myfirstcluster, and choose Availability Zone (more about AZs in the next recipe). Then click on Create Cluster:
  1. Once the cluster is created, the blinking green signal will become solid green, as follows:
  1. Now go to Home and click on Notebook. Choose an appropriate notebook name, for example, config, and choose Scala as the language:
  1. Then set the AWS access parameters. There are two access parameters:
    • ACCESS_KEY: This is referred to as fs.s3n.awsAccessKeyId in SparkContext's Hadoop configuration.
    • SECRET_KEY: This is referred to as  fs.s3n.awsSecretAccessKey in SparkContext's Hadoop configuration.
  2. Set ACCESS_KEY in the config notebook:
        sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<replace  
          with your key>")
  1. Set SECRET_KEY in the config notebook:
        sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey","
          <replace with your secret key>")
  1. Load a folder from the sparkcookbook bucket (all of the data for the recipes in this book are available in this bucket:
        val yelpdata = 
          spark.read.textFile("s3a://sparkcookbook/yelpdata")
  1. The problem with the previous approach was that if you were to publish your notebook, your keys would be visible. To avoid the use of this approach, use Databricks File System (DBFS).

Note

DBFS is Databricks Cloud's internal file system. It is a layer above S3, as you can guess. It mounts S3 buckets in a user's workspace as well as caches frequently accessed data on worker nodes. 

  1. Set the access key in the Scala notebook:
        val accessKey = "<your access key>"
  1. Set the secret key in the Scala notebook:
        val secretKey = "<your secret key>".replace("/","%2F")
  1. Set the bucket name in the Scala notebook:
        val bucket = "sparkcookbook"
  1. Set the mount name in the Scala notebook:
        val mount = "cookbook"
  1. Mount the bucket:
        dbutils.fs.mount(s"s3a://$accessKey:[email protected]$bucket",
          s"/mnt/$mount")
  1. Display the contents of the bucket:
        display(dbutils.fs.ls(s"/mnt/$mount"))

Note

The rest of the recipes will assume that you would have set up AWS credentials.

How it works...

Let's look at the key concepts in Databricks Cloud.

Cluster

The concept of clusters is self-evident. A cluster contains a master node and one or more slave nodes. These nodes are EC2 nodes, which we are going to learn more about in the next recipe. 

Notebook

Notebook is the most powerful feature of Databricks Cloud. You can write your code in Scala/Python/R or a simple SQL notebook. These notebooks cover the whole 9 yards. You can use notebooks to write code like a programmer, use SQL like an analyst, or do visualization like a Business Intelligence (BI) expert. 

Table

Tables enable Spark to run SQL queries.

Library

Library is the section where you upload the libraries you would like to attach to your notebooks. The beauty is that you do not have to upload libraries manually; you can simply provide the Maven parameters and it would find the library for you and attach it.

 

Deploying Spark using Amazon EMR


There is a reason why deploying Spark on Amazon EMR is added as one of the first recipes in this edition of the book. The majority of the production deployments of Spark happen on EMR (in fact, the majority, and increasingly so, big data deployments happen on EMR). If you understand this recipe well, you may skip the rest of the recipes in this chapter, unless you are doing an on-premises deployment. 

Note

Since this topic is of paramount importance in the current context, a lot more theory is being provided than what a typical cookbook would have. You can skip the theory section and directly go to the How to do it.. section, but I encourage you not to do so. 

What it represents is much bigger than what it looks

What EMR represents is far more than meets the eye. Most of the enterprise workloads are migrating to public clouds at an accelerated pace. Once migrated, these workloads get rearchitected to leverage cloud-based services as opposed to simply using it as Infrastructure as a Service (IaaS). EC2 is an IaaS compute service of AWS, while EMR is the leading Platform as a Service (PaaS) service of AWS, with more big data workloads running on EMR than the alternatives combined. 

EMR's architecture

Hadoop's core feature is data locality, that is, taking compute to where the data is. AWS disrupts this concept by separating storage and compute. AWS has multiple storage options, including the following:

  • Amazon S3: S3 is general-purpose object storage.
  • Amazon Redshift: This is a distributed cloud data warehouse.
  • Amazon DynamoDB: This is a NoSQL database.
  • Amazon Aurora: This is a cloud-based relational database.

Amazon S3 is the cheapest and most reliable cloud storage available, and this makes it the first choice, unless there is a compelling reason not to do so. EMR also supports attaching elastic block storage (EBS) volumes to compute instances (EC2) in order to provide a lower latency option.

Which option to choose depends upon what type of cluster is being created. There are two types of clusters:

  • Persistent cluster: It runs 24 x 7. Here, there is a continuous analysis of data for use cases such as fraud detection in the financial industry or clickstream analytics in ad tech. For these purposes, HDFS mounted on EBS is a good choice. 
  • Transient cluster: Here, workloads are run inconsistently, for example, genome sequencing or holiday surge in retail. In this case, the cluster is only spawned when needed, making Elastic Map Reduce File System (EMRFS) based on S3 a better choice.  

How to do it...

  1. Log in to https://aws.amazon.com with your credentials.
  2. Click on Services and select/search for EMR: 
  1. Click on Create cluster and select the last option in the Applications option box:
  1.  Click on Create Cluster and the cluster will start as follows:
  1. Once the cluster is created with the given configuration, the My Cluster status will change to Waiting, as shown in the following screenshot:
  1. Now add a step to select the JAR file; it takes the input file from the S3 location and produces the output file and stores it in the desired S3 bucket:
  1. The wordcount step's status will change to completed, indicating a successful completion of the step, as shown in the following screenshot:
  1. The output will be created and stored in the given S3 location. Here, it is in the output folder under the io.playground bucket:

How it works...

Let's look at the options shown in step 3:

  • Cluster name: This is where you provide an appropriate name for the cluster.
  • S3 folder: This is the folder location where the S3 bucket's logs for this cluster will go to.
  • Launch mode: 
    • Cluster: The cluster will continue to run until you terminate it.
    • Step execution: This is to add steps after the application is launched.
  • Software configuration:
    • Vendor: This is Amazon EMI with the open source Hadoop versus MapR's version.
    • Release: This is self-evident.
    • Applications: 
      • Core Hadoop: This is focused on the SQL interface.
      • HBase: This is focused on partial no-SQL-oriented workloads.
      • Presto: This is focused on ad-hoc query processing.
      • Spark: This is focused on Spark.
  • Hardware configuration: 
    • Instance type: This topic will be covered in detail in the next section.
    • Number of instances: This refers to the number of nodes in the cluster. One of them will be the master node and the rest slave nodes.
  • Security and access:
    • EC2 key pair: You can associate an EC2 key pair with the cluster that you can use to connect to it via SSH.
    • Permissions: You can allow other users besides the default Hadoop user to submit jobs.
    • EMR role: This allows EMR to call other AWS services, such as EC2, on your behalf.
    • EC2 instance profile: This provides access to other AWS services, such as S3 and DynamoDB, via the EC2 instances that are launched by EMR.

EC2 instance types

EC2 instances are the most expensive part of a company's AWS bill. So, selecting the right instance type is the key through which you can optimize your bill. The following section is a quick overview of the different instance types. Instance types, both in the cloud and on premises, are defined by four factors:

  • Number of cores
  • Memory
  • Storage (size and type)
  • Network performance

To see a quick illustration of how these factors affect each other, visit http://youtube.com/infoobjects.

In the EC2 world, these factors have been modified slightly to vCPU. vCPU is a virtualized unit of:

  • Memory  
  • Storage (size and type)
  • Network performance

Instance type families are defined by the ratio of these factors, especially vCPU to memory. In a given family, this ratio remains unchanged (T2 excluded). Different instance families serve different purposes, almost like different types of automobiles. In fact, we are going to use the automobile metaphor in this section to illustrate these families.

T2 - Free Tier Burstable (EBS only)

The T2 instance type is a gateway drug in the AWS world, the reason being it belongs to Free Tier. Developers who sign up for AWS get this instance type for up to a year. This tier has six subtypes:

Instance Type

vCPUs

CPU Credit/Hr

Memory (GiB)

t2.micro

1

6

1

t2.small

1

12

2

t2.medium

2

24

4

t2.large

2

36

6

t2.xlarge

4

54

6

t2.2xlarge

8

81

32

M4 - General purpose (EBS only)

M4 is the instance type you use when in doubt. Developers who sign up for AWS get this instance type for up to a year. This tier has six subtypes:

Instance Type

vCPUs

Memory (GiB)

Dedicated Bandwidth

m4.large

2

8

450 mbps

m4.xlarge

4

16

750 mbps

m4.2xlarge

8

32

1,000 mbps

m4.4xlarge

16

64

2,000 mbps

m4.10xlarge

40

160

4,000 mbps

m4.16xlarge

64

256

10,000 mbps

C4 - Compute optimized

This tier has five subtypes:

Instance Type

vCPUs

Memory (GiB)

Dedicated Bandwidth

c4.large

2

3.75

500 mbps

c4.xlarge

4

7.5

750 mbps

c4.2xlarge

8

15

1,000 mbps

c4.4xlarge

16

30

2,000 mbps

c4.8xlarge

36

60

4,000 mbps

X1 - Memory optimized

This tier has two subtypes:

Instance Type

vCPUs

Memory (GiB)

Dedicated Bandwidth

x1.16xlarge

2

8

450 mbps

x1.32xlarge

4

16

750 mbps

R4 - Memory optimized

This tier has six subtypes:

Instance Type

vCPUs

Memory (GiB)

Dedicated Bandwidth

r4.large

2

15.25

10 gbps

r4.xlarge

4

30.5

10 gbps

r4.2xlarge

8

61

10 gbps

r4.4xlarge

16

122

10 gbps

r4.8xlarge

32

244

10 gbps

r4.16xlarge

64

488

20 gbps

P2 - General purpose GPU

This tier has three subtypes:

Instance Type

vCPUs

Memory (GiB)

GPUs

GPU Memory (GiB)

p2.xlarge

4

61

1

12

p2.8xlarge

32

488

8

96

p2.16xlarge

64

732

16

192

I3 - Storage optimized

This tier has six subtypes:

Instance Type

vCPUs

Memory (GiB)

Storage (GB)

i3.large

2

15.25

475 NVMe SSD

i3.xlarge

4

30.5

950 NVMe SSD

i3.2xlarge

8

61

1,900 NVMe SSD

i3.4xlarge

16

122

2x1,900 NVMe SSD

i3.8xlarge

32

244

4x1,900 NVMe SSD

i3.16xlarge

64

488

8x1,900 NVMe SSD

D2 - Storage optimized

This tier is for massively parallel processing (MPP), data warehouse, and so on type usage. This tier has four subtypes:

Instance Type

vCPUs

Memory (GiB)

Storage (GB)

d2.xlarge

4

30.5

3x2000 HDD

d2.2xlarge

8

61

6x2000 HDD

d2.4xlarge

16

122

12x2000 HDD

d2.8xlarge

32

244

24x2000 HDD

 

Installing Spark from binaries


You can build Spark from the source code, or you can download precompiled binaries from http://spark.apache.org. For a standard use case, binaries are good enough, and this recipe will focus on installing Spark using binaries.

Getting ready

At the time of writing, Spark's current version is 2.1. Please check the latest version from Spark's download page at http://spark.apache.org/downloads.html. Binaries are developed with the most recent and stable version of Hadoop. To use a specific version of Hadoop, the recommended approach is that you build it from sources, which we will cover in the next recipe.

All the recipes in this book are developed using Ubuntu Linux, but they should work fine on any POSIX environment. Spark expects Java to be installed and the JAVA_HOME environment variable set.

In Linux/Unix systems, there are certain standards for the location of files and directories, which we are going to follow in this book. The following is a quick cheat sheet:

Directory

Description

/bin

This stores essential command binaries

/etc

This is where host-specific system configurations are located

/opt

This is where add-on application software packages are located

/var

This is where variable data is located

/tmp

This stores the temporary files

/home

This is where user home directories are located

How to do it…

Here are the installation steps:

  1. Open the terminal and download the binaries using the following command:
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
  1. Unpack the binaries:
$ tar -zxf spark-2.1.0-bin-hadoop2.7.tgz
  1. Rename the folder containing the binaries by stripping the version information:
$ sudo mv spark-2.1.0-bin-hadoop2.7 spark
  1. Move the configuration folder to the /etc folder so that it can be turned into a symbolic link later:
$ sudo mv spark/conf/* /etc/spark
  1. Create your company-specific installation directory under /opt. As the recipes in this book are tested on the infoobjects sandbox, use infoobjects as the directory name. Create the /opt/infoobjects directory:
$ sudo mkdir -p /opt/infoobjects
  1. Move the spark directory to /opt/infoobjects, as it's an add-on software package:
$ sudo mv spark /opt/infoobjects/
  1. Change the permissions of the spark home directory, namely 0755 = user:read-write-execute group:read-execute world:read-execute:
$ sudo chmod -R 755 /opt/infoobjects/spark
  1. Move to the spark home directory:
$ cd /opt/infoobjects/spark
  1. Create the symbolic link:
$ sudo ln -s /etc/spark conf
  1. Append Spark binaries path to PATH in .bashrc:
$ echo "export PATH=$PATH:/opt/infoobjects/spark/bin" >> /home/hduser/.bashrc
  1. Open a new terminal.
  2. Create the log directory in /var:
$ sudo mkdir -p /var/log/spark
  1. Make hduser the owner of Spark's log directory:
$ sudo chown -R hduser:hduser /var/log/spark
  1. Create Spark's tmp directory:
$ mkdir /tmp/spark
  1. Configure Spark with the help of the following command lines:
     $ cd /etc/spark
     $ echo "export HADOOP_CONF_DIR=/opt/infoobjects/hadoop/etc/hadoop" >> spark-env.sh
     $ echo "export YARN_CONF_DIR=/opt/infoobjects/hadoop/etc/Hadoop" >> spark-env.sh
     $ echo "export SPARK_LOG_DIR=/var/log/spark" >> spark-env.sh
     $ echo "export SPARK_WORKER_DIR=/tmp/spark" >> spark-env.sh
  1. Change the ownership of the spark home directory to root:
$ sudo chown -R root:root /opt/infoobjects/spark
 

Building the Spark source code with Maven


Installing Spark using binaries works fine in most cases. For advanced cases, such as the following (but not limited to), compiling from the source code is a better option:

  • Compiling for a specific Hadoop version
  • Adding the Hive integration
  • Adding the YARN integration

Getting ready

The following are the prerequisites for this recipe to work:

  • Java 1.8 or a later version
  • Maven 3.x

How to do it...

The following are the steps to build the Spark source code with Maven:

  1. Increase MaxPermSize of the heap:
$ echo "export _JAVA_OPTIONS="-XX:MaxPermSize=1G""  >> 
         /home/hduser/.bashrc
  1. Open a new terminal window and download the Spark source code from GitHub:
$ wget https://github.com/apache/spark/archive/branch-2.1.zip
  1. Unpack the archive:
$ unzip branch-2.1.zip
  1. Rename unzipped folder to spark:
        $ mv spark-branch-2.1 spark
  1. Move to the spark directory:
$ cd spark
  1. Compile the sources with the YARN-enabled, Hadoop version 2.7, and Hive-enabled flags and skip the tests for faster compilation:
$ mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -Phive -
        DskipTests clean package
  1. Move the conf folder to the etc folder so that it can be turned into a symbolic link:
$ sudo mv spark/conf /etc/
  1. Move the spark directory to /opt as it's an add-on software package:
$ sudo mv spark /opt/infoobjects/spark
  1. Change the ownership of the spark home directory to root:
$ sudo chown -R root:root /opt/infoobjects/spark
  1. Change the permissions of the spark home directory, namely 0755 = user:rwx group:r-x world:r-x:
$ sudo chmod -R 755 /opt/infoobjects/spark
  1. Move to the spark home directory:
$ cd /opt/infoobjects/spark
  1. Create a symbolic link:
$ sudo ln -s /etc/spark conf
  1. Put the Spark executable in the path by editing .bashrc:
$ echo "export PATH=$PATH:/opt/infoobjects/spark/bin" >> 
          /home/hduser/.bashrc
  1. Create the log directory in /var:
$ sudo mkdir -p /var/log/spark
  1. Make hduser the owner of Spark's log directory:
$ sudo chown -R hduser:hduser /var/log/spark
  1. Create Spark's tmp directory:
$ mkdir /tmp/spark
  1. Configure Spark with the help of the following command lines:
$ cd /etc/spark
$ echo "export HADOOP_CONF_DIR=/opt/infoobjects/hadoop/etc/hadoop" 
       >> spark-env.sh
$ echo "export YARN_CONF_DIR=/opt/infoobjects/hadoop/etc/Hadoop" 
       >> spark-env.sh
$ echo "export SPARK_LOG_DIR=/var/log/spark" >> spark-env.sh
$ echo "export SPARK_WORKER_DIR=/tmp/spark" >> spark-env.sh
 

Launching Spark on Amazon EC2


Amazon Elastic Compute Cloud (Amazon EC2) is a web service that provides resizable compute instances in the cloud. Amazon EC2 provides the following features:

  • On-demand delivery of IT resources via the Internet
  • Provisioning of as many instances as you like
  • Payment for the hours during which you use instances, such as your utility bill
  • No setup cost, no installation, and no overhead at all
  • Shutting down or terminating instances when you no longer need them
  • Making such instances available on familiar operating systems

EC2 provides different types of instances to meet all your compute needs, such as general-purpose instances, microinstances, memory-optimized instances, storage-optimized instances, and others. They also have a Free Tier of microinstances for trial purposes.

Getting ready

The spark-ec2 script comes bundled with Spark and makes it easy to launch, manage, and shut down clusters on Amazon EC2.

Before you start, do the following things: log in to the Amazon AWS account via http://aws.amazon.com.

  1. Click on Security Credentials under your account name in the top-right corner.
  2. Click on Access Keys and Create New Access Key:
  1. Download the key file (let's save it in the /home/hduser/kp folder as spark-kp1.pem).
  2. Set permissions on the key file to 600.
  3. Set environment variables to reflect access key ID and secret access key (replace the sample values with your own values):
$ echo "export AWS_ACCESS_KEY_ID="AKIAOD7M2LOWATFXFKQ"" >> /home/hduser/.bashrc
$ echo "export 
       AWS_SECRET_ACCESS_KEY="+Xr4UroVYJxiLiY8DLT4DLT4D4sxc3ijZGMx1D3pfZ2q"" >> 
         /home/hduser/.bashrc
$ echo "export PATH=$PATH:/opt/infoobjects/spark/ec2" >> /home/hduser/.bashrc

How to do it…

  1. Spark comes bundled with scripts to launch the Spark cluster on Amazon EC2. Let's launch the cluster using the following command:
$ cd /home/hduser
$ spark-ec2 -k <key-pair> -i <key-file> -s <num-slaves> launch <cluster-name>
<key-pair> - name of EC2 keypair created in AWS
<key-file> the private key file you downloaded
<num-slaves> number of slave nodes to launch
<cluster-name> name of the cluster
  1. Launch the cluster with the example value:
$ spark-ec2 -k kp-spark -i /home/hduser/keypairs/kp-spark.pem --hadoop-major-
          version 2  -s 3 launch spark-cluster
  1. Sometimes, the default availability zones are not available; in that case, retry sending the request by specifying the specific availability zone you are requesting:
$ spark-ec2 -k kp-spark -i /home/hduser/keypairs/kp-spark.pem -z us-east-1b --
          hadoop-major-version 2  -s 3 launch spark-cluster
  1. If your application needs to retain data after the instance shuts down, attach EBS volume to it (for example, 10 GB space):
$ spark-ec2 -k kp-spark -i /home/hduser/keypairs/kp-spark.pem --hadoop-major-
          version 2 -ebs-vol-size 10 -s 3 launch spark-cluster
  1. If you use Amazon's spot instances, here is the way to do it:
$ spark-ec2 -k kp-spark -i /home/hduser/keypairs/kp-spark.pem -spot-price=0.15 
          --hadoop-major-version 2  -s 3 launch spark-cluster

Note

Spot instances allow you to name your own price for Amazon EC2's computing capacity. You simply bid for spare Amazon EC2 instances and run them whenever your bid exceeds the current spot price, which varies in real time and is based on supply and demand (source: www.amazon.com).

  1. After completing the preceding launch process, check the status of the cluster by going to the webUI URL that will be printed at the end:
  1. Check the status of the cluster:
  1. Now, to access the Spark cluster on EC2, connect to the master node using secure shell protocol (SSH):
$ spark-ec2 -k spark-kp1 -i /home/hduser/kp/spark-kp1.pem  login spark-cluster
  1. The following image illustrates the result you'll get:
  1. Check the directories in the master node and see what they do:

Directory

Description

ephemeral-hdfs

This is the Hadoop instance for which data is ephemeral and gets deleted when you stop or restart the machine.

persistent-hdfs

Each node has a very small amount of persistent storage (approximately 3 GB). If you use this instance, data will be retained in that space.

hadoop-native

This refers to the native libraries that support Hadoop, such as snappy compression libraries.

scala

This refers to the Scala installation.

shark

This refers to the Shark installation (Shark is no longer supported and is replaced by Spark SQL).

spark

This refers to the Spark installation.

spark-ec2

This refers to the files that support this cluster deployment.

tachyon

This refers to the Tachyon installation.

 

  1. Check the HDFS version in an ephemeral instance:
$ ephemeral-hdfs/bin/hadoop version
Hadoop 2.0.0-chd4.2.0
  1. Check the HDFS version in a persistent instance with the following command:
$ persistent-hdfs/bin/hadoop version
Hadoop 2.0.0-chd4.2.0
  1. Change the configuration level of the logs:
$ cd spark/conf
  1. The default log level information is too verbose, so let's change it to Error:
  •  Create the log4.properties file by renaming the template:
$ mv log4j.properties.template log4j.properties
  • Open log4j.properties in vi or your favorite editor:
$ vi log4j.properties
  • Change the second line from | log4j.rootCategory=INFO, console to | log4j.rootCategory=ERROR, console.
  1. Copy the configuration to all the slave nodes after the change:
$ spark-ec2/copydir spark/conf
  1. You should get something like this:
  1. Destroy the Spark cluster:
$ spark-ec2 destroy spark-cluster
 

Deploying Spark on a cluster in standalone mode


Compute resources in a distributed environment need to be managed so that resource utilization is efficient and every job gets a fair chance to run. Spark comes with its own cluster manager, which is conveniently called standalone mode. Spark also supports working with YARN and Mesos cluster managers.

The cluster manager you choose should be mostly driven by both legacy concerns and whether other frameworks, such as MapReduce, share the same compute resource pool. If your cluster has legacy MapReduce jobs running and all of them cannot be converted into Spark jobs, it is a good idea to use YARN as the cluster manager. Mesos is emerging as a data center operating system to conveniently manage jobs across frameworks, and it is very compatible with Spark.

If the Spark framework is the only framework in your cluster, then the standalone mode is good enough. As Spark is evolving as a technology, you will see more and more use cases of Spark being used as the standalone framework, serving all your big data compute needs. For example, some jobs may use Apache Mahout at present because MLlib does not have a specific machine-learning library, which the job needs. As soon as MLlib gets its library, this particular job can be moved to Spark.

Getting ready

Let's consider a cluster of six nodes as an example setup--one master and five slaves (replace them with the actual node names in your cluster):

Master
m1.zettabytes.com
Slaves
s1.zettabytes.com
s2.zettabytes.com
s3.zettabytes.com
s4.zettabytes.com
s5.zettabytes.com

How to do it…

  1. Since Spark's standalone mode is the default, all you need to do is have Spark binaries installed on both master and slave machines. Put /opt/infoobjects/spark/sbin in the path on every node:
$ echo "export PATH=$PATH:/opt/infoobjects/spark/sbin" >> /home/hduser/.bashrc
  1. Start the standalone master server (SSH to master first):
[email protected]~] start-master.sh

Note

Master, by default, starts on port 7077, which slaves use to connect to it. It also has a web UI at port 8088.

 

  1. Connect to the master node using a Secure Shell (SSH) connection and then start the slaves:
[email protected]~] spark-class org.apache.spark.deploy.worker.Worker 
          spark://m1.zettabytes.com:7077

Argument 

Meaning

-h <ipaddress/HOST> and--host <ipaddress/HOST>

IP address/DNS service to listen on

-p <port> and --port <port>

Port for the service to listen on

--webui-port <port>

This is the port for the web UI (by default, 8080 is for the master and 8081 for the worker)

-c <cores> and --cores <cores>

These refer to the total CPU core Spark applications that can be used on a machine (worker only)

-m <memory> and --memory <memory>

These refer to the total RAM Spark applications that can be used on a machine (worker only)

-d <dir> and --work-dir <dir>

These refer to the directory to use for scratch space and job output logs

Note

For fine-grained configuration, the above parameters work with both master and slaves. Rather than manually starting master and slave daemons on each node, it can also be accomplished using cluster launch scripts. Cluster launch scripts are outside the scope of this book. Please refer to books about Chef or Puppet.

  1. First, create the conf/slaves file on a master node and add one line per slave hostname (using an example of five slave nodes, replace the following slave DNS with the DNS of the slave nodes in your cluster):
[email protected]~] echo "s1.zettabytes.com" >> conf/slaves
[email protected]~] echo "s2.zettabytes.com" >> conf/slaves
[email protected]~] echo "s3.zettabytes.com" >> conf/slaves
[email protected]~] echo "s4.zettabytes.com" >> conf/slaves
[email protected]~] echo "s5.zettabytes.com" >> conf/slaves

Once the slave machine is set up, you can call the following scripts to start/stop the cluster:

Script name

Purpose

start-master.sh

Starts a master instance on the host machine

start-slaves.sh

Starts a slave instance on each node of the slaves file

start-all.sh

Starts both the master and slaves

stop-master.sh

Stops the master instance on the host machine

stop-slaves.sh

Stops the slave instance on all the nodes of the slaves file

stop-all.sh

Stops both the master and slaves

  1. Connect an application to the cluster through Scala code:
        val sparkContext = new SparkContext(new 
          SparkConf().setMaster("spark://m1.zettabytes.com:7077")Setting master URL for 
            spark-shell
  1. Connect to the cluster through Spark shell:
$ spark-shell --master spark://master:7077

How it works…

In standalone mode, Spark follows the master-slave architecture, very much like Hadoop, MapReduce, and YARN. The compute master daemon is called Spark master and runs on one master node. Spark master can be made highly available using ZooKeeper. You can also add more standby masters on the fly if needed.

The compute slave daemon is called a worker, and it exists on each slave node. The worker daemon does the following:

  • Reports the availability of the compute resources on a slave node, such as the number of cores, memory, and others, to the Spark master
  • Spawns the executor when asked to do so by the Spark master
  • Restarts the executor if it dies

There is, at most, one executor per application, per slave machine.

Both Spark master and the worker are very lightweight. Typically, memory allocation between 500 MB to 1 GB is sufficient. This value can be set in conf/spark-env.sh by setting the SPARK_DAEMON_MEMORY parameter. For example, the following configuration will set the memory to 1 gigabits for both the master and worker daemon. Make sure you have sudo as the super user before running it:

$ echo "export SPARK_DAEMON_MEMORY=1g" >> /opt/infoobjects/spark/conf/spark-env.sh

By default, each slave node has one worker instance running on it. Sometimes, you may have a few machines that are more powerful than others. In that case, you can spawn more than one worker on that machine with the following configuration (only on those machines):

$ echo "export SPARK_WORKER_INSTANCES=2" >> /opt/infoobjects/spark/conf/spark-env.sh

The Spark worker, by default, uses all the cores on the slave machine for its executors. If you would like to limit the number of cores the worker could use, you can set it to the number of your choice (for example, 12), using the following configuration:

$ echo "export SPARK_WORKER_CORES=12" >> /opt/infoobjects/spark/conf/spark-env.sh

The Spark worker, by default, uses all of the available RAM (1 GB for executors). Note that you cannot allocate how much memory each specific executor will use (you can control this from the driver configuration). To assign another value to the total memory (for example, 24 GB) to be used by all the executors combined, execute the following setting:

$ echo "export SPARK_WORKER_MEMORY=24g" >> /opt/infoobjects/spark/conf/spark-env.sh

There are some settings you can do at the driver level:

  • To specify the maximum number of CPU cores to be used by a given application across the cluster, you can set the spark.cores.max configuration in Spark submit  or  Spark shellas follows:
$ spark-submit --conf spark.cores.max=12
  • To specify the amount of memory that each executor should be allocated (the minimum recommendation is 8 GB), you can set the spark.executor.memory configuration in Spark submit or Spark shell as follows:
$ spark-submit --conf spark.executor.memory=8g

The following diagram depicts the high-level architecture of a Spark cluster:

See also

To find more configuration options, refer to the following URL:

 

Deploying Spark on a cluster with Mesos


Mesos is slowly emerging as a data center operating system for managing all the compute resources across a data center. Mesos runs on any computer running the Linux operating system. It is built using the same principles as the Linux kernel. Let's see how we can install Mesos.

How to do it…

Mesosphere provides a binary distribution of Mesos. The most recent package of the Mesos distribution can be installed from the Mesosphere repositories by performing the following steps:

  1. Execute Mesos on a Ubuntu OS with the trusty version:
$ sudo apt-key adv --keyserver keyserver.ubuntu.com --recv E56151BF 
          DISTRO=$(lsb_release -is | tr '[:upper:]' '[:lower:]') CODENAME=$(lsb_release 
            -cs)
$ sudo vi /etc/apt/sources.list.d/mesosphere.list
deb http://repos.mesosphere.io/Ubuntu trusty main
  1. Update the repositories:
$ sudo apt-get -y update
  1. Install Mesos:
$ sudo apt-get -y install mesos
  1. To connect Spark to Mesos and to integrate Spark with Mesos, make Spark binaries available to Mesos and configure the Spark driver to connect to Mesos.
  2. Use the Spark binaries from the first recipe and upload them to HDFS:
$ hdfs dfs -put spark-2.1.0-bin-hadoop2.7.tgz spark-2.1.0-bin-hadoop2.7.tgz
  1. The master URL of a single master Mesos is mesos://host:5050; the master URL of a ZooKeeper-managed Mesos cluster is mesos://zk://host:2181.
  2. Set the following variables in spark-env.sh:
$ sudo vi spark-env.sh
export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
export SPARK_EXECUTOR_URI= hdfs://localhost:9000/user/hduser/spark-2.1.0-bin-
          hadoop2.7.tgz
  1. Run the following commands from the Scala program:
Val conf = new SparkConf().setMaster("mesos://host:5050")
Val sparkContext = new SparkContext(conf)
  1. Run the following command from the Spark shell:
$ spark-shell --master mesos://host:5050

Note

Mesos has two run modes:

  • Fine-grained: In the fine-grained (default) mode, every Spark task runs as a separate Mesos task. 
  • Coarse-grained: This mode will launch only one long-running Spark task on each Mesos machine
  1. To run in the coarse-grained mode, set the spark.mesos.coarse property:
Conf.set("spark.mesos.coarse","true")
 

Deploying Spark on a cluster with YARN


Yet Another Resource Negotiator (YARN) is Hadoop's compute framework that runs on top of HDFS, which is Hadoop's storage layer.

YARN follows the master-slave architecture. The master daemon is called ResourceManager and the slave daemon is called NodeManager. Besides this application, life cycle management is done by ApplicationMaster, which can be spawned on any slave node and would be alive during the lifetime of an application.

When Spark is run on YARN, ResourceManager performs the role of the Spark master and NodeManagers works as executor nodes.

While running Spark with YARN, each Spark executor is run as a YARN container.

Getting ready

Running Spark on YARN requires a binary distribution of Spark that has YARN support. In both the Spark installation recipes, we have taken care of this.

How to do it…

  1. To run Spark on YARN, the first step is to set the configuration:
HADOOP_CONF_DIR: to write to HDFS
YARN_CONF_DIR: to connect to YARN ResourceManager
$ cd /opt/infoobjects/spark/conf (or /etc/spark)
$ sudo vi spark-env.sh
export HADOOP_CONF_DIR=/opt/infoobjects/hadoop/etc/Hadoop
export YARN_CONF_DIR=/opt/infoobjects/hadoop/etc/hadoop
  • You can see this in the following screenshot:
  1. The following command launches YARN Spark in the yarn-client mode:
$ spark-submit --class path.to.your.Class --master yarn --deploy-mode client 
          [options] <app jar> [app options]

Here's an example:

$ spark-submit --class com.infoobjects.TwitterFireHose --master yarn --deploy-
          mode client --num-executors 3 --driver-memory 4g --executor-memory 2g --
            executor-cores 1 target/sparkio.jar 10
  1. The following command launches Spark shell in the yarn-client mode:
$ spark-shell --master yarn --deploy-mode client 
  1. The command to launch the spark application in the yarn-cluster mode is as follows:
$ spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster 
          [options] <app jar> [app options]

Here's an example:

$ spark-submit --class com.infoobjects.TwitterFireHose --master yarn --deploy-
          mode cluster --num-executors 3 --driver-memory 4g --executor-memory 2g --
            executor-cores 1 target/sparkio.jar 10

How it works…

Spark applications on YARN run in two modes:

  • yarn-client: Spark Driver runs in the client process outside of the YARN cluster, and ApplicationMaster is only used to negotiate the resources from ResourceManager.
  • yarn-cluster: Spark Driver runs in ApplicationMaster, spawned by NodeManager on a slave node.

The yarn-cluster mode is recommended for production deployments, while the yarn-client mode is good for development and debugging, where you would like to see the immediate output. There is no need to specify the Spark master in either mode as it's picked from the Hadoop configuration, and the master parameter is either yarn-client or yarn-cluster.

The following figure shows how Spark is run with YARN in the client mode:

The following figure shows how Spark is run with YARN in the cluster mode:

In the YARN mode, the following configuration parameters can be set:

  • --num-executors: To configure how many executors will be allocated
  • --executor-memory: RAM per executor
  • --executor-cores: CPU cores per executor
 

Understanding SparkContext and SparkSession


SparkContext and SparkSession are the entry points into the world of Spark, so it is important you understand both well. 

SparkContext

SparkContext is the first object that a Spark program must create to access the cluster. In spark-shell, it is directly accessible via spark.sparkContext. Here's how you can programmatically create SparkContext in your Scala code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("my app").setMaster("master url")
new SparkContext(conf)

SparkSession

SparkContext, though still supported, was more relevant in the case of RDD (covered in the next recipe). As you will see in the rest of the book, different libraries have different wrappers around SparkContext, for example, HiveContext/SQLContext for Spark SQL, StreamingContext for Streaming, and so on. As all the libraries are moving toward DataSet/DataFrame, it makes sense to have a unified entry point for all these libraries as well, and that is SparkSession. SparkSession is available as spark in the spark-shell. Here's how you do it:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val sparkSession = SparkSession.builder.master("master url").appName("my app").getOrCreate()
 

Understanding resilient distributed dataset - RDD


Though RDD is getting replaced with DataFrame/DataSet-based APIs, there are still a lot of APIs that have not been migrated yet. In this recipe, we will look at how the concept of lineage works in RDD.

Externally, RDD is a distributed, immutable collection of objects. Internally, it consists of the following five parts:

  • Set of partitions (rdd.getPartitions)
  • List of dependencies on parent RDDs (rdd.dependencies)
  • Function to compute a partition, given its parents
  • Partitioner, which is optional (rdd.partitioner)
  • Preferred location of each partition, which is optional (rdd.preferredLocations)

The first three are needed for an RDD to be recomputed in case data is lost. When combined, it is called lineage. The last two parts are optimizations.

A set of partitions is how data is divided into nodes. In the case of HDFS, it means InputSplits, which are mostly the same as the block (except when a record crosses block boundaries; in that case, it will be slightly bigger than a block).

How to do it...

Let's revisit our word count example to understand these five parts. This is how an RDD graph looks for wordCount at the dataset level view:

Basically, this is how the flow goes:

  1. Load the words folder as an RDD:
scala> val words = sc.textFile("hdfs://localhost:9000/user/hduser/words")

The following are the five parts of the words RDD:

Part

Description

Partitions

One partition per HDFS inputsplit/block (org.apache.spark.rdd.HadoopPartition)

Dependencies

None

Compute function

To read the block

Preferred location

The HDFS block's location

Partitioner

None

  1. Tokenize the words of the words RDD with each word on a separate line:
scala> val wordsFlatMap = words.flatMap(_.split("W+"))

The following are the five parts of the wordsFlatMap RDD:

Part

Description

Partitions

Same as the parent RDD, that is, words (org.apache.spark.rdd.HadoopPartition)

Dependencies

Same as the parent RDD, that is, words (org.apache.spark.OneToOneDependency)

Compute function

To compute the parent and split each element, which flattens the results

Preferred location

Ask parent RDD

Partitioner

None

  1. Transform each word in the wordsFlatMap RDD into the (word,1) tuple:
scala> val wordsMap = wordsFlatMap.map( w => (w,1))

The following are the five parts of the wordsMap RDD:

Part

Description

Partitions

Same as the parent RDD, that is, wordsFlatMap (org.apache.spark.rdd.HadoopPartition)

Dependencies

Same as the parent RDD, that is, wordsFlatMap (org.apache.spark.OneToOneDependency)

Compute function

To compute the parent and map it to PairRDD

Preferred Location

Ask parent RDD

Partitioner

None

  1. Reduce all the values of a given key and sum them up:
scala> val wordCount = wordsMap.reduceByKey(_+_)

The following are the five parts of the wordCount RDD:

Part

Description

Partitions

One per reduce task (org.apache.spark.rdd.ShuffledRDDPartition)

Dependencies

Shuffle dependency on each parent (org.apache.spark.ShuffleDependency)

Compute function

To perform additions on shuffled data

Preferred location

None

Partitioner

HashPartitioner (org.apache.spark.HashPartitioner)

This is how an RDD graph of wordcount looks at the partition level view:

About the Author

  • Rishi Yadav

    Rishi Yadav has 19 years of experience in designing and developing enterprise applications. He is an open source software expert and advises American companies on big data and public cloud trends. Rishi was honored as one of Silicon Valley's 40 under 40 in 2014. He earned his bachelor's degree from the prestigious Indian Institute of Technology, Delhi, in 1998.

    About 12 years ago, Rishi started InfoObjects, a company that helps data-driven businesses gain new insights into data. InfoObjects combines the power of open source and big data to solve business challenges for its clients and has a special focus on Apache Spark. The company has been on the Inc. 5000 list of the fastest growing companies for 6 years in a row. InfoObjects has also been named the best place to work in the Bay Area in 2014 and 2015.

    Rishi is an open source contributor and active blogger.

    This book is dedicated to my parents, Ganesh and Bhagwati Yadav; I would not be where I am without their unconditional support, trust, and providing me the freedom to choose a path of my own.

    Special thanks go to my life partner, Anjali, for providing immense support and putting up with my long, arduous hours (yet again).
    Our 9-year-old son, Vedant, and niece, Kashmira, were the unrelenting force behind keeping me and the book on track.

    Big thanks to InfoObjects' CTO and my business partner, Sudhir Jangir, for providing valuable feedback and also contributing with recipes on enterprise security, a topic he is passionate about; to our SVP, Bart Hickenlooper, for taking the charge in leading the company to the next level; to Tanmoy Chowdhury and Neeraj Gupta for their valuable advice; to Yogesh Chandani, Animesh Chauhan, and Katie Nelson for running operations skillfully so that I could focus on this book; and to our internal review team (especially Rakesh Chandran) for ironing out the kinks. I would also like to thank Marcel Izumi for, as always, providing creative visuals. I cannot miss thanking our dog, Sparky, for giving me company on my long nights out. Last but not least, special thanks to our valuable clients, partners, and employees, who have made InfoObjects the best place to work at and, needless to say, an immensely successful organization.

    Browse publications by this author

Latest Reviews

(1 reviews total)
Highly recommended. Includes everything I was looking for and even more.

Recommended For You

Book Title
Unlock this full book FREE 10 day trial
Start Free Trial