Large Scale Machine Learning with Spark

4 (1 reviews total)
By Md. Rezaul Karim , Md. Mahedi Kaysar
  • Instant online access to over 7,500+ books and videos
  • Constantly updated with 100+ new titles each month
  • Breadth and depth in over 1,000+ technologies
  1. Introduction to Data Analytics with Spark

About this book

Data processing, implementing related algorithms, tuning, scaling up and finally deploying are some crucial steps in the process of optimising any application.

Spark is capable of handling large-scale batch and streaming data to figure out when to cache data in memory and processing them up to 100 times faster than Hadoop-based MapReduce.This means predictive analytics can be applied to streaming and batch to develop complete machine learning (ML) applications a lot quicker, making Spark an ideal candidate for large data-intensive applications.

This book focuses on design engineering and scalable solutions using ML with Spark. First, you will learn how to install Spark with all new features from the latest Spark 2.0 release. Moving on, you’ll explore important concepts such as advanced feature engineering with RDD and Datasets. After studying developing and deploying applications, you will see how to use external libraries with Spark.

In summary, you will be able to develop complete and personalised ML applications from data collections,model building, tuning, and scaling up to deploying on a cluster or the cloud.

Publication date:
October 2016
Publisher
Packt
Pages
476
ISBN
9781785888748

 

Chapter 1. Introduction to Data Analytics with Spark

This chapter covers an overview of Apache Spark, its computing paradigm, and installation to getting started. It will briefly describe the main components of Spark and focus on its new computing advancements. A description of the Resilient Distributed Datasets (RDD) and Dataset will be discussed as a base knowledge for the rest of this book. It will then focus on the Spark machine learning libraries. Installing and packaging a simple machine learning application with Spark and Maven will be demonstrated then before getting on board. In a nutshell, the following topics will be covered in this chapter:

  • Spark overview

  • New computing paradigm with Spark

  • Spark ecosystem

  • Spark machine learning libraries

  • Installing and getting started with Spark

  • Packaging your application with dependencies

  • Running a simple machine learning application

 

Spark overview


This section describes Spark (https://spark.apache.org/) basics followed by the issues with the traditional parallel and distributed computing, then how Spark was evolved, and it then brings a new computing paradigm across the big data processing and analytics on top of that. In addition, we also presented some exciting features of Spark that easily attract the big data engineers, data scientists, and researchers, including:

  • Simplicity of data processing and computation

  • Speed of computation

  • Scalability and throughput across large-scale datasets

  • Sophistication across diverse data types

  • Ease of cluster computing and deployment with different cluster managers

  • Working capabilities and supports with various big data storage and sources

  • Diverse APIs are written in widely used and emerging programming languages

Spark basics

Before praising Spark and its many virtues, a short overview is in the mandate. Apache Spark is a fast, in-memory, big data processing, and general-purpose cluster computing framework with a bunch of sophisticated APIs for advanced data analytics. Unlike the Hadoop-based MapReduce, which is only suited for batch jobs in speed and ease of use, Spark could be considered as a general execution engine that is suitable for applying advanced analytics on both static (batch) as well as real-time data:

  • Spark was originally developed at the University of California, Berkeley's AMPLab based on Resilient Distributed Datasets (RDDs), which provides a fault-tolerant abstraction for in-memory cluster computing facilities. However, later on Spark's code base was bequeathed to the Apache Software Foundation making it open source, since then open source communities are taking care of it. Spark provides an interface to perform data analytics on entire clusters at scale with implicit data parallelism and fault-tolerance through its high-level APIs written in Java, Scala, Python, and R.

In Spark 2.0.0, elevated libraries (most widely used data analysis algorithms) are implemented, including:

  • Spark SQL for querying and processing large-scale structured data

  • SparkR for statistical computing that provides distributed computing using programming language R at scale

  • MLlib for machine learning (ML) applications, which is internally divided into two parts; MLlib for RDD-based machine learning application development and Spark ML for a high-level abstraction to develop complete computational data science and machine learning workflows

  • GraphX for large-scale graph data processing

  • Spark Streaming for handling large-scale real-time streaming data to provide a dynamic working environment to static machine learning

Since its first stable release, Spark has already experienced dramatic and rapid development as well as wide adoptions through active initiatives from a wide range of IT solution providers, open source communities, and researchers around the world. Recently it has emerged as one of the most active, and the largest open source project in the area of big data processing and cluster computing, not only for its comprehensive adoptions, but also deployments and surveys by IT peoples, data scientists, and big data engineers worldwide. As quoted by Matei Zaharia, founder of Spark and the CTO of Databricks on the Big Data analytics news website at: http://bigdataanalyticsnews.com/apache-spark-3-real-world-use-cases/:

It's an interesting thing. There hasn't been as much noise about it commercially, but the actual developer community votes with its feet and people are actually getting things done and working with the project.

Even though many Tech Giants such as Yahoo, Baidu, Conviva, ClearStory, Hortonworks, Gartner, and Tencent are already using Spark in production - on the other hand, IBM, DataStax, Cloudera, and BlueData provide the commercialized Spark distribution for the enterprise. These companies have enthusiastically deployed Spark applications at a massive scale collectively for processing multiple petabytes of data on clusters of 8,000 nodes, which is the largest known cluster of Spark.

Beauties of Spark

Are you planning to develop a machine learning (ML) application? If so, you probably already have some data to perform preprocessing before you train a model on that data, and finally, you will be using the trained model to make predictions on new data to see the adaptability. That's all you need? We guess no, since you have to consider other parameters as well. Obviously, you will desire your ML models to be working perfectly in terms of accuracy, execution time, memory usage, throughput, tuning, and adaptability. Wait! Still not done yet; what happens if you would like to make your application handle large training and new datasets at scale? Or as a data scientist, what if you could build your ML models to overcome these issues as a multi-step journey from data incorporation through train and error to production by running the same machine learning code on the big cluster and the personal computer without breaking down further? You can simply rely on Spark and close your eyes.

Spark has several advantages over other big data technologies such as MapReduce (you can refer to https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html for MapReduce tutorials and the research paper MapReduce: Simplified Data Processing on Large Clusters, Jeffrey Dean et al, In proc of OSDI, 2004 to get to know more) and Storm, which is a free and open source distributed real-time computation system (please refer to http://storm.apache.org/ for more on Storm-based distributed computing). First of all, Spark gives a comprehensive, unified engine to manage big data processing requirements with a variety of datasets such as text and tabular to graph data as well as the source of data (batch and real-time streaming data) that are diverse in nature. As a user (data science engineers, academicians, or developers), you can be likely benefited from Spark's rapid application development through simple and easy-to-understand APIs across batches, interactive, and real-time streaming applications.

Working and programming with Spark is easy and simple. Let us show you an example of that. Yahoo is one of the contributors and an early adopter of Spark, who implemented an ML algorithm with 120 lines of Scala code. With just 30 minutes of training on a large dataset with a hundred million records, the Scala ML algorithm was ready for business. Surprisingly, the same algorithm was written using C++ in 15,000 lines of code previously (please refer to the following URL for more at: https://www.datanami.com/2014/03/06/apache_spark_3_real-world_use_cases/). You can develop your applications using Java, Scala, R, or Python with a built-in set of over 100 high-level operators (mostly supported after Spark release 1.6.1) for transforming datasets and getting the familiarity with the data frame APIs for manipulating semi-structured, structured, and streaming data. In addition to the Map and Reduce operations, it supports SQL queries, streaming data, machine learning, and graph data processing. Moreover, Spark also provides an interactive shell written in Scala and Python for executing your codes sequentially (such as SQL or R style).

The main reason Spark adopts so quickly is because of two main factors: speed and sophistication. Spark provides order-of-magnitude performance for many applications using coarse-grained, immutable, and sophisticated data called Resilient Distributed Datasets that are spread across the cluster and that can be stored in memory or disks. An RDD offers fault-tolerance, which is resilient in a sense that it cannot be changed once created. Moreover, Spark's RDD has the property of recreating from its lineage if it is lost in the middle of computation. Furthermore, the RDD can be distributed automatically across the clusters by means of partitions and it holds your data. You can also keep it on your data on memory by the caching mechanism of Spark, and this mechanism enables big data applications in Hadoop-based MapReduce clusters to execute up to 100 times faster for in-memory if executed iteratively and even 10 times faster for disk-based operation.

Let's look at a surprising statistic about Spark and its computation powers. Recently, Spark took over Hadoop-based MapReduce by completing the 2014 Gray Sort Benchmark in the 100 TB category, which is an industry benchmark on how fast a system can sort 100 TB of data (1 trillion records) (please refer to http://spark.apache.org/news/spark-wins-daytona-gray-sort-100tb-benchmark.html and http://sortbenchmark.org/). Finally, it becomes the open source engine (please refer to the following URL for more information https://databricks.com/blog/2014/11/05/spark-officially-sets-a-new-record-in-large-scale-sorting.html) for sorting at petabyte scale. In comparison, the previous world record set by Hadoop MapReduce had to use 2100 machines, taking 72 minutes of execution time, which implies Spark sorted the same data three times faster using 10 times fewer machines. Moreover, you can combine multiple libraries seamlessly to develop large-scale machine learning and data analytics pipelines to execute the job on various cluster managers such as Hadoop YARN, Mesos, or in the cloud by accessing data storage and sources such as HDFS, Cassandra, HBase, Amazon S3, or even RDBMs. Moreover, the job can be executed as a standalone mode on a local PC or cluster, or even on AWS EC2. Therefore, deployment of a Spark application on the cluster is easy (we will show more on how to deploy a Spark application on the cluster later in this chapter).

The other beauties of Spark are: it is open source and platform independent. These two are also its greatest advantage, which is it's free to use, distribute, and modify and develop an application on any platform. An open source project is also more secure as the code is accessible to everyone and anyone can fix bugs as they are found. Consequently, Spark has evolved so rapidly that it has become the largest open source project concerning big data solutions with 750+ contributors from 200+ organizations.

 

New computing paradigm with Spark


In this section, we will show a chronology of Spark that will provide a concept of how it was evolved and emerged as a revolution for big data processing and cluster computing. In addition to this, we will also describe the Spark ecosystem in brief to understand the features and facilities of Spark in more details.

Traditional distributed computing

The traditional data processing paradigm is commonly referred to as a client-server model, which people used to move data to the code. The database server (or simply the server) was mainly responsible for performing data operations and then returning the results to the client-server (or simply the client) program. However, when the number of task to be computed is increased, a variety of operations and client devices also started to increase exponentially. As a result, a progressively complex array of computing endpoint in servers also started in the background. So to keep this type of computing model we need to increase the application (client) servers and database server in balance for storing and processing the increased number of operations. Consequently, the data propagation between nodes and data transfer back and forth across this network also increases drastically. Therefore, the network itself becomes a performance bottleneck. As a result, the performance (in terms of both the scalability and throughput) in this kind of computing paradigm also decreases undoubtedly. It is shown in the following diagram:

Figure 1: Traditional distributed processing in action.

After the successful completion of human genome projects in life sciences, real-time IOT data, sensor data, data from mobile devices, and data from the Web are creating the data-deluge and contributing for the big data, which has mostly evolved the data-intensive computing. The data-intensive computing nowadays is now flattering increasingly in an emerging way, which requires an integrated infrastructure or computing paradigm, so that the computational resources and data could be brought in a common platform and perform the analytics on top of it. The reasons are diverse because big data is really huge in terms of complexity (volume, variety, and velocity), and from the operational perspective four ms (that is, move, manage, merge, and munge).

In addition, since we will be talking about large-scale machine learning applications in this book, we also need to consider some addition and critical assessing parameters such as validity, veracity, value, and visibility to grow the business. Visibility is important, because suppose you have a big dataset with a size of 1 PB; however, if there is no visibility, everything is a black hole. We will explain more on big data values in upcoming chapters.

It may not be feasible to store and process these large-scale and complex big datasets in a single system; therefore, they need to be partitioned and stored across multiple physical machines. Well, big datasets are partitioned or distributed, but to process and analyze these rigorously complex datasets, both the database servers as well as application servers might need to be increased to intensify the processing power at a large-scale. Again, the same performance bottleneck issues arrive at worst in multi-dimension that requires a new and more data-intensive big data processing and related computing paradigm.

Moving code to the data

To overcome the issues mentioned previously, a new computing paradigm is desperately needed so that instead of moving data to the code/application, we could move the code or application to the data and perform the data manipulation, processing, and associated computing at home (that is, where the data is stored). As you understand the motivation and objective, now the reverts programming model can be called move code to data and do parallel processing on distributed system, which can be visualized in the following diagram:

Figure 2: New computing (move code to data and do parallel processing on distributed system).

To understand the workflows illustrated in Figure 2, we can envisage a new programming model described as follows:

  • Execution of a big data processing using your application initiated at your personal computer (let's name it Driver Program), which coordinates the execution in action remotely across multiple computing nodes within a cluster or grid, or a more openly speaking cloud.

  • Now what you need is to transfer your developed application/algorithm/code segments (could be invoked or revoked using command-line or shell scripting as a simple programming language notation) to the computing/worker nodes (having large storage, main memory, and processing capability). We can simply imagine that the data to be computed or manipulated is already stored in those computing nodes as partitions or blocks.

  • It is also understandable that the bulk data no longer needs to be transferred (upload/download) to your driver program because of the network or computing bottleneck, but it only holds the data reference in its variable instead, which is basically an address (hostname/IP address with a port) to locate the physical data stored in the computing nodes in a cluster, for example (of course bulk-upload could be performed using other solutions, such as scalable provisioning that is to be discussed in later chapters).

  • So what do the remote computing nodes have? They have the data as well as code to perform the data computations and necessary processing to materialize the output or modified data without leaving their home (more technically, the computing nodes).

  • Finally, upon your request, only the results could be transferred across the network to your driver program for validation or other analytics since there are many subsets of the original datasets.

It's worth noticing that by moving the code to the data, the computing structure has been changed drastically. Most interestingly, the volume of data transfer across the network has significantly reduced. The justification here is that you will be transferring only a small chunk of software code to the computing nodes and receiving a small subset of the original data as results in return. This was the most important paradigm shifting for big data processing that Spark brought to us with the concept of RDD, datasets, DataFrame, and other lucrative features that imply great revolution in the history of big data engineering and cluster computing. However, for brevity, in the next section we will only discuss the concepts of RDD and the other computing features will be discussed in upcoming sections

RDD – a new computing paradigm

To understand the new computing paradigm, we need to understand the concept of Resilient Distributed Datasets (RDDs), by which and how Spark has implemented the data reference concept. As a result, it has been able to shift the data processing easily to take it at scale. The basic thing about RDD is that it helps you to treat your input datasets almost like any other data objects. In other words, it brings the diversity of input data types, which you greatly missed in the Hadoop-based MapReduce framework.

An RDD provides the fault-tolerance capability in a resilient way in a sense that it cannot be changed once created and the Spark engine will try to iterate the operation once failed. It is distributed because once it has created performed partition operations, RDDs are automatically distributed across the clusters by means of partitions. RDDs let you play more with your input datasets since RDDs can also be transformed into other forms rapidly and robustly. In parallel, RDDs can also be dumped through an action and shared across your applications that are logically co-related or computationally homogeneous. This is achievable because it is a part of Spark's general-purpose execution engine to gain massive parallelism, so it can virtually be applied in any type of datasets.

However, to make the RDD and related operation on your inputs, Spark engines require you to make a distinguishing borderline between the data pointer (that is, the reference) and the input data itself. Basically, your driver program will not hold data, but only the reference of the data where the data is actually located on the remote computing nodes in a cluster.

To make the data processing faster and easier, Spark supports two types of operations, which can be performed on RDDs: transformations and actions (please refer to Figure 3). A transformation operation basically creates a new dataset from an existing one. An action, on the other hand, materializes a value to the driver program after a successful computation on input datasets on the remote server (computing nodes to be more exact).

The style of data execution initiated by the driver program builds up a graph as a Directed Acyclic Graph (DAG) style; where nodes represent RDDs and the transformation operations are represented by the edges. However, the execution itself does not start in the computing nodes in a Spark cluster until an action operation is performed. Nevertheless, before starting the operation, the driver program sends the execution graph (that signifies the style of operation for the data computation pipelining or workflows) and the code block (as a domain-specific script or file) to the cluster and each worker/computing node receives a copy from the cluster manager node:

Figure 3: RDD in action (transformation and action operation).

Before proceeding to the next section, we argue you to learn about the action and transformation operation in more detail. Although we will discuss these two operations in Chapter 3, Understanding the Problem by Understanding the Data in detail. There are two types of transformation operations currently supported by Spark. The first one is the narrow transformation, where data mingle is unnecessary. Typical Spark narrow transformation operations are performed using the filter(), sample(), map(), flatMap(), mapPartitions() , and other methods. The wide transformation is essential to make a wider change to your input datasets so that the data could be brought in a common node out of multiple partitions of data shuffling. Wide transformation operations include groupByKey(), reduceByKey(), union(), intersection(), join(), and so on.

An action operation returns the final results of RDD computations from the transformation by triggering execution as a Directed Acyclic Graph (DAG) style to the Driver Program. But the materialized results are actually written on the storage, including the intermediate transformation results of the data objects and return the final results. Common actions include: first(), take(), reduce(), collect(), count(), saveAsTextFile(), saveAsSequenceFile(), and so on. At this point we believe that you have gained the basic operation on top of RDDs, so we can now define an RDD and related programs in a natural way. A typical RDD programming model that Spark provides can be described as follows:

  • From an environment variable, Spark Context (Spark shell or Python Pyspark provides you with a Spark Context or you can make your own, this will be described later in this chapter) creates an initial data reference RDD object.

  • Transform the initial RDD to create more RDDs objects following the functional programming style (to be discussed later on).

  • Send the code/algorithms/applications from the driver program to the cluster manager nodes. Then the cluster manager provides a copy to each computing node.

  • Computing nodes hold a reference of the RDDs in its partition (again, the driver program also holds a data reference). However, computing nodes could have the input dataset provided by the cluster manager as well.

  • After a transformation (via either narrow or wider transformation), the result to be generated is a brand new RDD, since the original one will not be mutated. Finally, the RDD object or more (specifically data reference) is materialized through an action to dump the RDD into the storage.

  • The Driver Program can request the computing nodes for a chunk of results for the analysis or visualization of a program.

Wait! So far we have moved smoothly. We suppose you will ship your application code to the computing nodes in the cluster. Still you will have to upload or send the input datasets to the cluster to be distributed among the computing nodes. Even during the bulk-upload you will have to transfer the data across the network. We also argue that the size of the application code and results are negligible or trivial. Another obstacle is if you/we want Spark to process the data at scale computation, it might require data objects to be merged from multiple partitions first. That means we will need to shuffle data among the worker/computing nodes that are usually done by partition(), intersection(), and join() transformation operations.

So frankly speaking, the data transfer has not been eliminated completely. As we and you understand the overheads being contributed especially for the bulk upload/download of these operations, their corresponding outcomes are as follows:

Figure 4: RDD in action (the caching mechanism).

Well, it's true that we have been compromised with these burdens. However, situations could be tackled or reduced significantly using the caching mechanism of Spark. Imagine you are going to perform an action multiple times on the same RDD objects, which have a long lineage; this will cause an increase in execution time as well as data movement inside a computing node. You can remove (or at least reduce) this redundancy with the caching mechanism of Spark (Figure 4) that stores the computed result of the RDD in the memory. This eliminates the recurrent computation every time. Because, when you cache on an RDD, its partitions are loaded into the main memory instead of a disk (however, if there is not enough space in the memory, the disk will be used instead) of the nodes that hold it. This technique enables big data applications on Spark clusters to outperform MapReduce significantly for each round of parallel processing. We will discuss more on Spark data manipulations and other techniques in Chapter 3, Understanding the Problem by Understanding the Data in detail.

 

Spark ecosystem


To provide more enhancements and additional big data processing capabilities, Spark can be configured and run on top of existing Hadoop-based clusters. As already stated, although Hadoop provides the Hadoop Distributed File System (HDFS) for efficient and operational storing of large-scale data cheaply; however, MapReduce provides the computation fully disk-based. Another limitation of MapReduce is that; only simple computations can be executed with a high-latency batch model, or static data to be more specific. The core APIs in Spark, on the other hand, are written in Java, Scala, Python, and R. Compared to MapReduce, with the more general and powerful programming model, Spark also provides several libraries that are part of the Spark ecosystems for redundant capabilities in big data analytics, processing, and machine learning areas. The Spark ecosystem consists of the following components, as shown in Figure 5:

Figure 5: Spark ecosystem (till date up to Spark 1.6.1).

As we have already stated, it is very much possible to combine these APIs seamlessly to develop large-scale machine learning and data analytics applications. Moreover, the job can be executed on various cluster managers such as Hadoop YARN, Mesos, standalone, or in the cloud by accessing data storage and sources such as HDFS, Cassandra, HBase, Amazon S3, or even RDBMs.

Nevertheless, Spark is enriched with other features and APIs. For example, recently Cisco has announced to invest $150M in the Spark ecosystem towards Cisco Spark Hybrid Services (http://www.cisco.com/c/en/us/solutions/collaboration/cloud-collaboration/index.html). So Cisco Spark open APIs could boost its popularity with developers in higher cardinality (highly secure collaboration and connecting smartphone systems to the cloud). Beyond this, Spark has recently integrated Tachyon (http://ampcamp.berkeley.edu/5/exercises/tachyon.html), a distributed in-memory storage system that economically fits in memory to further improve Spark's performance.

Spark core engine

Spark itself is written in Scala, which is functional, as well as Object Oriented Programming Language (OOPL) which runs on top of JVM. Moreover, as mentioned in Figure 5, Spark's ecosystem is built on top of the general and core execution engine, which has some extensible API's implemented in different languages. The lower level layer or upper level layer also uses the Spark core engine as a general execution job performing engine and it provides all other functionality on top. The Spark Core is written in Scala as already mentioned, and it runs on Java Virtual Machine (JVM) and the high-level APIs (that is, Spark MLlib, SparkR, Spark SQL, Dataset, DataFrame, Spark Streaming, and GraphX) that use the core in the execution time.

Spark has brought the in-memory computing mode to a great visibility. This concept (in-memory computing) enables the Spark core engine to leverage speed through a generalized execution model to develop diverse applications.

The low-level implementation of general purpose data computing and machine learning algorithms written in Java, Scala, R, and Python are easy to use for big data application development. The Spark framework is built on Scala, so developing ML applications in Scala can provide access to the latest features that might not be available in other Spark languages initially. However, that is not a big problem, open source communities also take care of the necessity of developers around the globe. Therefore, if you do need a particular machine learning algorithm to be developed, and you want to add it to the Spark library, you can contribute it to the Spark community. The source code of Spark is openly available on GitHub at https://github.com/apache/spark as Apache Spark mirror. You can do a pull out request and the open source community will review your changes or algorithm before adding it to the master branch. For more information, please check the Spark Jira confluence site at https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark.

Python was a great arsenal for data scientists previously, and the contribution of Python in Spark is also not different. That means Python also has some excellent libraries for data analysis and processing; however, it is comparatively slower than Scala. R on the other hand, has a rich environment for data manipulation, data pre-processing, graphical analysis, machine learning, and statistical analysis, which can help to increase the developer's productivity. Java is definitely a good choice for developers who are coming from the Java and Hadoop background. However, Java also has the similar problem as Python, since Java is also slower than Scala.

A recent survey presented on the Databricks website at http://go.databricks.com/2015-spark-survey on Spark users (66% users evaluated the Spark languages where 41% were data engineers and 22% were data scientists) shows that 58% are using Python, 71% are using Scala, 31% are using Java, and 18% are using R for developing their Spark applications. However, in this book, we will try to provide the examples mostly in Java and a few in Scala if needed for the simplicity. The reason for this is that many of the readers are very familiar with Java-based MapReduce. Nevertheless, we will provide some hints of using the same examples in Python or R in the appendix at the end.

Spark SQL

Spark SQL is a Spark component for querying and structured data processing. The demand was obvious since many data science engineers and business intelligence analysts also rely on interactive SQL queries for exploring data from RDBMS. Previously, MS SQL server, Oracle, and DB2 were used frequently by the enterprise. However, these tools were not scalable or interactive. Therefore, to make it easier, Spark SQL provides a programming abstraction called DataFrames and datasets that work as distributed SQL query engines, which support unmodified Hadoop Hive queries to execute 100 times faster on existing deployments and data. Spark SQL is a powerful integration with the rest of the Spark ecosystem.

Recently, Spark has offered a new experimental interface, commonly referred to as datasets (to be discussed in more details in the next section), which provide the same benefits of RDDs to use the lambda functions strongly. Lambda evolves from the Lambda Calculus (http://en.wikipedia.org/wiki/Lambda_calculus) that refers to anonymous functions in computer programming. It is a flexible concept in modern programming language that allows you to write any function quickly without naming them. In addition, it also provides a nice way to write closures. For example, in Python:

def adder(x): 
    return lambda y: x + y 
add6 = adder(6) 
add4(4) 

It returns the result as 10. On the other hand, in Java, it can be similarly written if an integer is odd or even:

Subject<Integer> sub = x -> x % 2 = 0; // Tests if the parameter is even. 
boolean result = sub.test(8); 

The previous lambda function checks if the parameter is even and returns either true or false. For example, the preceding snippet would return true since 8 is divisible by 2.

Please note that in Spark 2.0.0, the Spark SQL has substantially been improved with the SQL functionalities with SQL 2003 support. Therefore, Spark SQL can now be executed with all the 99 TPC-DS queries. More importantly, now a native SQL parser supports ANSI_SQL and Hive QL. Native DDL is a command that can also be executed, it also now supports sub querying of SQL and the view of canonicalization support.

DataFrames and datasets unification

In the latest Spark release 2.0.0, in Scala and Java, the DataFrame and dataset have been unified. In other words, the DataFrame is just a type alias for a dataset of rows. However, in Python and R, given the lack of type safety, DataFrame is the main programming interface. And for Java, DataFrame is no longer supported, but only the Dataset and RDD-based computations are supported, and DataFrame has become obsolete (note that it has become obsolete - not depreciated). Although SQLContext and HiveContext are kept for backward compatibility; however, in Spark 2.0.0 release, the new entry point that replaces the old SQLContext and HiveContext for DataFrame and dataset APIs is SparkSession.

Spark streaming

You might want your applications to have the ability to process and analyze not only static datasets, but also real-time streams data. To make your wish easier, Spark Streaming provides the facility to integrate your application with popular batch and streaming data sources. The most commonly used data sources include HDFS, Flume, Kafka, and Twitter, and they can be used through their public APIs. This integration allows users to develop powerful interactive and analytical applications on both streaming and historical data. In addition to this, the fault tolerance characteristics are achieved through Spark streaming.

Graph computation – GraphX

GraphX is a resilient distributed graph computation engine built on top of Spark. GraphX brought a revolution to the users who want to interactively build, transform, and reason graph structured data with millions of nodes and vertices at scale. As a developer you will enjoy the simplicity so that a large-scale graph (social network graph, normal network graph, or astrophysics) could be represented using a small chunk of code written in Scala, Java, or Python. GraphX enables the developers to take the advantages of both data-parallel and graph-parallel systems by efficiently expressing graph computation with ease and speed. Another beauty added in the cabinet of GraphX is that it can be used to build an end-to-end graph analytical pipeline on real-time streaming data, where the graph-space partitioning is used to handle large-scale directed multigraph with properties associated with each vertex and edge. Well, some fundamental graph operators are used to make this happen such as subgraph, joinVertices and aggregateMessages as well as an optimized variant of the Pregel API in particular.

Machine learning and Spark ML pipelines

Traditional machine learning applications were used to build using R or Matlab that lacks scalability issues. Spark has brought two emerging APIs, Spark MLlib and Spark ML. These APIs make the machine learning as an actionable insight for engineering big data to remove the scalability constraint. Built on top of Spark, MLlib is a scalable machine learning library that is enriched with numerous high-quality algorithms with a high-accuracy performance that mainly works for RDDs. Spark provides many language options for the developers that are functioning in Java, Scala, R, and Python to develop complete workflows. Spark ML, on the other hand, is an ALPHA component that enhances a new set of machine learning algorithms to let data scientists quickly assemble and configure practical machine learning pipelines on top of DataFrames.

Statistical computation – SparkR

SparkR is an R package specially designed for the data scientists who are familiar with R language and want to analyze large datasets and interactively run jobs from the R shell, which supports all the major Spark DataFrame operations such as aggregation, filtering, grouping, summary statistics, and much more. Similarly, users also can create SparkR DataFrames from local R data frames, or from any Spark supported data sources such as Hive, HDFS, Parquet, or JSON. Technically speaking, the concept of Spark DataFrame is a tabular data object akin to R's native DataFrame (https://cran.r-project.org/web/packages/dplyr/vignettes/data_frames.html), which on the other hand, is syntactically similar to dplyr (an R package, refer to https://cran.rstudio.com/web/packages/dplyr/vignettes/introduction.html), but is stored in the cluster setting instead.

 

Spark machine learning libraries


In this section, we will describe two main machine learning libraries (Spark MLib and Spark ML) and the most widely used implemented algorithms. The ultimate target is to provide you with some familiarization about the machine learning treasures of Spark since many people still think that Spark is only a general-purpose in-memory big data processing or cluster computing framework. However, this is not like that, rather this information would help you to understand what could be done with the Spark machine learning APIs. In addition, this information would help you to explore and will increase the usability while deploying real-life machine learning pipelines using Spark MLlib and Spark ML.

Machine learning with Spark

In the pre-Spark era, big data modelers typically used to build their ML models. Where a model is prepared through a training process where it is required to make predictions and is corrected when those predictions are wrong. In short, an ML model is an object that takes an input, does some processing, and finally produces the output. Those models were commonly constructed using statistical languages such as R and SAS. Then the data engineers used to re-implement the same model in Java to deploy on Hadoop. However, this kind of workflow lacks efficiency, scalability, throughput, and accuracy with extended execution time. Using Spark, the same ML model can be built, adopted, and deployed, making the whole workflow much more efficient, robust, and faster and that allows you to provide hands-on insight to increase the performance. The main goal of Spark machine learning libraries is to make practical machine learning applications scalable, faster, and easy. It consists of common and widely used machine learning algorithms and their utilities, including classification, regression, clustering, collaborative filtering, and dimensionality reduction. It is divided into two packages: Spark MLlib (spark.mllib) and Spark ML (spark.ml).

Spark MLlib

MLlib is the machine learning library of Spark. It is a distributed, low-level library written with Scala, Java, and Python against Spark core runtime. MLlib mainly focus on learning algorithms and their proper utilities to not only provide machine learning analytical capabilities. The major learning utilities include classification, regression, clustering, recommender system, and dimensionality reduction. In addition, it also aids to optimize the general purpose primitives for developing large-scale machine learning pipelines. As stated earlier, MLlib comes with some exciting APIs written in Java, Scala, R, and Python. The main components of Spark MLlib are described in the following sections.

Data types

Spark provides support of local vectors and matrix data types stored on a single machine, as well as distributed matrices backed by one or multiple RDDs. Local vectors and matrices are simple data models that serve as public interfaces. The vector and matrix operations are heavily dependent on the linear algebra operation, and you are recommended to gain some background knowledge before using these data types.

Basic statistics

Spark not only provides a column summary and basic statistics to be performed on RDDs, but it also supports calculating the correlation between two series of data or more complex correlation operations, such as pairwise correlations among many series of data, which are a common operation in statistics. However, currently Pearson's and Spearman's correlations are only supported and more are to be added in future Spark releases. Unlike the other statistical function, stratified sampling is also supported by Spark and can be performed on RDD's as key-value pairs; however, some functionalities are yet to be added to Python developers.

Spark provides only the Pearson's chi-squared test for hypothesis testing for its goodness of fit and independence of a claim hypothesis, which is a powerful technique in statistics that determines whether a result is statistically significant to satisfy the claim. Spark also provides online implementations of some tests to support use cases such as A/B testing as streaming significance testing typically performed on real-time streaming data. Another exciting feature of Spark is the factory methods to generate random double RDDs or vector RDDs that are useful for randomized algorithms, prototyping, performance, and hypothesis testing. Other functionality in the current Spark MLib provides computation facilities of kernel density estimation from sample RDDs, which is a useful technique for visualizing empirical probability distributions.

Classification and regression

Classification is a typical process that helps new data objects and components to be organized, differentiated, and understood or belong in a certain way on the basis of training data. In statistical computing, two types of classification exist, binary classification (also commonly referred to as binomial classification) and multiclass classification. Binary classification is the task of classifying data objects of a given observation into two groups. Support Vector Machines (SVMs), logistic regression, decision trees, random forests, gradient-boosted trees, and Naive Bayes have been implemented up to the latest release of Spark.

Multiclass classification, on the other hand, is the task of classifying data objects of a given observation into more than two groups. The logistic regression, decision trees, random forests, and naive Bayes are implemented as multiclass classification. However, more complex classification algorithms such as multi-level classification and multiclass perceptron have not been implemented yet. The regression analysis is also a statistical process that estimates relationships among variables or observation. Other than the classification process, regression analysis involves several techniques for modeling and analyzing data objects. Currently, the following algorithms are supported by Spark MLlib library:

  • Linear least squares

  • Lasso

  • Ridge regression

  • Decision trees

  • Random forests

  • Gradient-boosted trees

  • Isotonic regression

Recommender system development

An intelligent and scalable recommender system is an emerging application that is currently being developed by many enterprises to expand their business and cost towards automating recommendation for customers. The collaborative filtering approach is the most widely used algorithm in the recommender system, aiming to fill in the missing entries of a user-item association matrix. For example, Netflix is an example who could manage to reduce their movie recommendation by several million dollars. However, the current implementation of Spark MLlib provides only the model-based collaborative filtering technique.

The pros of a model-based collaborative filtering algorithm are users and products that can be described by a small set of latent factors to predict missing entries using the Alternating Least Squares (ALS) algorithm. The con is that user rating or feedback cannot be taken into consideration for predicting an interest. Interestingly, open source developers are also working to develop a memory-based collaborative filtering technique to be incorporated into Spark MLib in which user rating data could be used to compute the similarity between users or items making the ML model more versatile.

Clustering

Clustering is an unsupervised machine learning problem/technique. The aims are to group subsets of entities with one another based on some notion of similarity that is often used for exploratory analysis and for developing hierarchical supervised learning pipelines. Spark MLib provides support for various clustering models such as K-means, Gaussian matrix, Power Iteration Clustering (PIC), Latent Dirichlet Allocation (LDA), Bisecting K-means, and Streaming K-means from real time streaming data. We will discuss more on supervised/unsupervised and reinforcement learning in upcoming chapters.

Dimensionality reduction

Working with high-dimensional data is cool and demanding to meet the big data related complexities. However, one of the problems with high-dimensional data is unwanted features or variables. Since all of the measured variables might not be important for building the model, to answer the questions of interest you might need to reduce the search space. Therefore, based on certain considerations or requirements, we need to reduce the dimension of the original data before creating any model without sacrificing the original structure.

The current implementation of MLib API supports two types of dimensionality reduction techniques: Singular Value Decomposition (SVD) and Principal Component Analysis (PCA) for tall-and-skinny matrices that are stored in row-oriented formats and for any vectors. The SVD technique has some performance issues; however, PCA is the most widely used technique in dimensionality reduction. These two techniques are very useful in large scale ML applications, but they require strong background knowledge of linear algebra.

Feature extraction and transformation

Spark provides different techniques for making the feature engineering easy to use through the Term frequency-inverse document frequency (TF-IDF), Word2Vec, StandardScaler, ChiSqSelector, and so on. If you are working or planning to work in the area of mining towards building a text mining ML application, TF-IDF would be an interesting option from Spark MLlib. TF-IDF provides a feature vectorization method to reflect the importance of a term to a document in the corpus that is very helpful to develop a text analytical pipeline.

In addition, you might be interested in using the Word2Vec computers distributed vector representation of the words or corpus on your ML application for text analysis. This feature of Word2Vec will eventually make your generalization and model estimation more robust in the area of the novel patterns. You also have the StandardScaler to normalize the extracted features by scaling to the unit variance or by removing the mean based on column summary statistics. It is needed in the pre-processing step while building a scalable ML application typically performed on the samples in the training dataset. Well, suppose you have extracted features through this method, now you will need to select the features to be incorporated into your ML model. Therefore, you might also be fascinated in the ChiSqSelector algorithm of Spark MLlib for feature selection. ChiSqSelector tries to identify relevant features during the ML model building. The reason is obviously to reduce the size of the feature space as well as the search space in a tree-based approach and to improve both speed and statistical learning behavior in the reinforcement learning algorithms.

Frequent pattern mining

Mining frequent items, maximal frequent patterns/itemsets, contiguous frequent patterns or subsequences, or other substructures is usually among the first steps to analyze a large-scale dataset before starting to build your ML models. The current implementation of Spark MLib provides a parallel implementation of FP-growth for mining frequent patterns and the association rules. It also provides the implementation of another popular algorithm, PrefixSpan, for mining sequence patterns. However, you will have to customize the algorithm for mining maximal frequent patterns accordingly. We will provide a scalable ML application for mining privacy, and preserving maximal frequent patterns in upcoming chapters.

Spark ML

Spark ML is an ALPHA component that adds a new set of machine learning APIs to let users quickly assemble and configure practical machine learning pipelines on top of DataFrames. Before praising the features and advantages of Spark ML, we should know about the DataFrames machine learning techniques that can be applied and developed to a wide variety of data types, such as vectors, unstructured (that is, raw texts), images, and structured data. In order to support a variety of data types to make the application development easier, recently, Spark ML has adopted the DataFrame and Dataset from Spark SQL.

A DataFrame or Dataset can be created either implicitly or explicitly from an RDD of objects that supports the basic and structured types. The goal of Spark ML is to provide a uniform set of high-level APIs built on top of DataFrames and datasets rather than RDDs. It helps the users to create and tune practical machine learning pipelines. The Spark ML also provides for the feature estimators and transformers for developing scalable ML pipelines. Spark ML systematizes many ML algorithms and APIs to make it even easier to combine multiple algorithms into a single pipeline, or data workflow that uses the concept of DataFrame and datasets.

The three basic steps in feature engineering are feature extraction, feature transformation, and selection. Spark ML provides implementation of several algorithms to make these steps easier. Extraction provides the facility for extracting features from raw data, whereas transformation provides the facility of scaling, converting, or modifying features that are found from the extraction step and the selection helps to select a subset from a larger set of features from the second step. Spark ML also provides several classifications (logistic regression, decision tree classifier, random forest classifier, and more), regression (liner regression, decision tree regression, random forest regression, survival regression, and gradient-boosted tree regression), decision tree and tree ensembles (random forest and gradient-boosted trees), as well as clustering (K-means and LDA) algorithms implemented for developing ML pipelines on top of DataFrames. We will discuss more on RDDs and DataFrames and their underlying operations in Chapter 3, Understanding the Problem by Understanding the Data.

 

Installing and getting started with Spark


Spark is Apache Hadoop's successor. Therefore, it would be better to install and work Spark into a Linux-based system even though you can also try on Windows and Mac OS. It is also very possible to configure your Eclipse environment to work with Spark as a Maven project on any OS and bundle your applications as a jar file with all the dependencies. Secondly, you can try running an application from the Spark shell (Scala shell to be more specific) following the same fashion as SQL or R programming:

The third way is from the command line (Windows)/Terminal (Linux/Mac OS). At first you need to write your ML application using Scala or Java and prepare the jar file with the required dependencies. Then the jar file can be submitted to a cluster to compute a Spark job.

We will show how to develop and deploy a Spark ML application in three ways. However, the very first perquisite is to prepare your Spark application development environment. You can install and configure Spark on a number of operating systems, including:

  • Windows (XP/7/8/10)

  • Mac OS X (10.4.7+)

  • Linux distribution (including Debian, Ubuntu, Fedora, RHEL, CentOS, and so on)

Note

Please check, the Spark website at https://spark.apache.org/documentation.html for the Spark version and OS related documentation. The following steps show you how to install and configure Spark on Ubuntu 14.04 (64-bit). Please note that Spark 2.0.0 runs on Java 7+, Python 2.6+/3.4+, and R 3.1+. For the Scala API, Spark 2.0.0 uses Scala 2.11. Therefore, you will need to use a compatible Scala version (2.11.x).

Step 1: Java installation

Java installation should be considered as one of the mandatory requirements in installing Spark since Java and Scala-based APIs require having a Java virtual machine installed on the system. Try the following command to verify the Java version:

$ java -version 

If Java is already installed on your system, you should see the following message:

java version "1.7.0_80"
Java(TM) SE Runtime Environment (build 1.7.0_80-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode)

In case you do not have Java installed on your system, make sure you install Java before proceeding to the next step. Please note that to avail and enjoy the lambda expression support it is recommended to install Java 8 on your system, preferably JDK and JRE both. Although for Spark 1.6.2 and prior releases Java 7 should be enough:

$ sudo apt-add-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java8-installer

After installing, don't forget to set JAVA_HOME. Just apply the following commands (we assume Java is installed at /usr/lib/jvm/java-8-oracle):

$ echo "export JAVA_HOME=/usr/lib/jvm/java-8-oracle" >> ~/.bashrc  
$ echo "export PATH=$PATH:$JAVA_HOME/bin" >> ~/.bashrc

You can add these environmental variables manually in the.bashrc file located in the home directory. If you cannot find the file, probably it is hidden so it needs to be explored. Just go to the view tab and enable the Show hidden file.

Step 2: Scala installation

Spark is written in Scala itself; therefore, you should have Scala installed on your system. Checking this is so straight forward by using the following command:

$ scala -version

If Scala is already installed on your system, you should get the following message on the terminal:

Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

Note that during the writing of this installation, we used the latest version of Scala, that is 2.11.8. In case you do not have Scala installed on your system, make sure you install it, so before proceeding to the next step, you can download the latest version of Scala from the Scala website at http://www.scala-lang.org/download/. After the download has finished, you should find the Scala tar file in the download folder:

  1. Extract the Scala tar file by extracting, from its location or type the following command for extracting the Scala tar file from the terminal:

        $ tar -xvzf scala-2.11.8.tgz 
    
  2. Now move the Scala distribution to the user’s perspective (for example, /usr/local/scala) by the following command or do it manually:

        $ cd /home/Downloads/ 
        $ mv scala-2.11.8 /usr/local/scala 
    
  3. Set the Scala home:

    $ echo "export SCALA_HOME=/usr/local/scala/scala-2.11.8" >> 
            ~/.bashrc  
    $ echo "export PATH=$PATH:$SCALA_HOME/bin" >> ~/.bashrc
    
  4. After installation has been completed, you should verify it using the following command:

        $ scala -version
    
  5. If Scala has successfully been configured on your system, you should get the following message on your terminal:

    Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL
    

Step 3: Installing Spark

Download the latest version of Spark from the Apace Spark website at https://spark.apache.org/downloads.html. For this installation step, we used the latest Spark stable release 2.0.0 version pre-built for Hadoop 2.7 and later. After the download has finished, you will find the Spark tar file in the download folder:

  1. Extract the Scala tar file by extracting it from its location or type the following command for extracting the Scala tar file from the terminal:

        $ tar -xvzf spark-2.0.0-bin-hadoop2.7.tgz  
    
  2. Now move the Scala distribution to the user's perspective (for example, /usr/local/spark) by the following command or do it manually:

        $ cd /home/Downloads/ 
        $ mv spark-2.0.0-bin-hadoop2.7 /usr/local/spark 
    
  3. To set after Spark installing, just apply the following commands:

    $ echo "export SPARK_HOME=/usr/local/spark/spark-2.0.0-bin-hadoop2.7" >>
          ~/.bashrc  
    $ echo "export PATH=$PATH:$SPARK_HOME/bin" >> ~/.bashrc
    

Step 4: Making all the changes permanent

Source the ~/.bashrc file using the following command to make the changes permanent:

$ source ~/.bashrc

If you execute the $ vi ~/. bashrc command, you will see the following entry in your bashrc file as follows:

export JAVA_HOME=/usr/lib/jvm/java-8-oracle
export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/usr/lib/jvm/java-8-oracle/bin:/usr/lib/jvm/java-8-oracle/db/bin:/usr/lib/jvm/java-8-oracle/jre/bin: /usr/lib/jvm/java-8-oracle/bin
export SCALA_HOME=/usr/local/scala/scala-2.11.8
export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/usr/lib/jvm/java-8-oracle/bin:/usr/lib/jvm/java-8-oracle/db/bin:/usr/lib/jvm/java-8-oracle/jre/bin: /bin
export SPARK_HOME=/usr/local/spark/spark-2.0.0-bin-hadoop2.7
export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/usr/lib/jvm/java-8-oracle/bin:/usr/lib/jvm/java-8-oracle/db/bin:/usr/lib/jvm/java-8-oracle/jre/bin: /bin

Step 5: Verifying the Spark installation

The verification of the Spark installation is shown in the following screenshot:

Figure 6: The Spark shell confirms the successful Spark installation.

Write the following command to open the Spark shell to verify if Spark has been configured successfully:

$ spark-shell

If Spark is installed successfully, you should see the following message (Figure 6).

The Spark server will start on localhost at port 4040, more precisely at http://localhost:4040/ (Figure 7). Just move there to make sure if it's really running:

Figure 7: Spark is running as a local web server.

Well done! Now you are ready to start writing the Scala code on the Spark shell.

 

Packaging your application with dependencies


Now we will show you how to package the applications as a Java archive (JAR) file with all the required dependencies on Eclipse, which is an Integrated Development Environment (IDE) and an open source tool for Java development as an Apache Maven project (https://maven.apache.org/). Maven is a software project management and comprehension tool like Eclipse. Based on the concept of a Project Object Model (POM), Maven can manage a project's build, reporting and documenting from a central piece of information.

Note that it is possible to export an ML application written in Java or Scala as an archive/executable jar file using Command Prompt. However, for the simplicity and faster application development we will use the same as the Maven project using Eclipse so that readers can enjoy the same facility to submit the application to the master node for computation. Now let's move to the discussion of exporting frequent pattern mining applications as a jar file with all the dependencies.

Step 1: Creating a Maven project in Eclipse

On successful creation of a sample Maven project, you will see the following project structure in Eclipse shown in Figure 8:

Figure 8: Maven project structure in Eclipse.

Step 2: Application development

Create a Java class and copy the following source code to under the src/main/java directory for the mining frequent pattern. Here, inputting the filename has been specified by the filename string to be provided through the command line argument or by specifying the source manually. For the time being, we have just provided line comments, however, you will get to know details from Chapter 3, Understanding the Problem by Understanding the Data and onwards:

import java.util.Arrays; 
import java.util.List; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.mllib.fpm.FPGrowth; 
import org.apache.spark.mllib.fpm.FPGrowthModel; 
import org.apache.spark.rdd.RDD; 
import org.apache.spark.sql.SparkSession; 
    
public class JavaFPGrowthExample { 
  public static void main(String[] args) { 
   //Specify the input transactional as command line argument  
   String fileName = "input/input.txt";  
   //Configure a SparkSession as spark by specifying the application name, master URL, Spark config, and Spark warehouse directory 
  SparkSession spark = SparkSession 
                  .builder() 
                  .appName("JavaFPGrowthExample") 
                  .master("local[*]") 
                  .config("spark.sql.warehouse.dir", "E:/Exp/") 
                  .getOrCreate(); 
   
   //Create an initial RDD by reading the input database  
   RDD<String> data = spark.sparkContext().textFile(fileName, 1); 
   
   //Read the transactions by tab delimiter & mapping RDD(data) 
   JavaRDD<List<String>> transactions = data.toJavaRDD().map( 
                   new Function<String, List<String>>(){ 
                   public List<String> call(String line) { 
                          String[] parts = line.split(" "); 
                          return Arrays.asList(parts); 
                                 } 
                             }); 
 
  //Create FPGrowth object by min. support & no. of partition     
  FPGrowth fpg = new  FPGrowth() 
                       .setMinSupport(0.2) 
                       .setNumPartitions(10); 
 
  //Train and run your FPGrowth model using the transactions 
  FPGrowthModel<String> model = fpg.run(transactions); 
        
  //Convert and then collect frequent patterns as Java RDD. After that print the frequent patterns along with their support 
    for (FPGrowth.FreqItemset<String> itemset :      
          model.freqItemsets().toJavaRDD().collect()) {   
       System.out.println(itemset.javaItems()  
                             + "==> " + itemset.freq()); 
      } 
    }   
  }  

Step 3: Maven configuration

Now you need to configure Maven specifying related dependencies and configurations. First, edit your existing pom.xml file to copy each XML source code snippets inside the <dependencies> tag. Please note that your dependencies might be different based on Spark release so change the version accordingly:

  1. Spark core dependency for Spark context and configuration:

          <dependency> 
          <groupId>org.apache.spark</groupId> 
          <artifactId>spark-core_2.11</artifactId> 
          <version>2.0.0</version> 
         </dependency> 
    
  2. Spark MLib dependency for the FPGrowth:

        <dependency> 
          <groupId>org.apache.spark</groupId> 
          <artifactId>spark-mllib_2.11</artifactId> 
          <version>2.0.0</version> 
         </dependency> 
    

Now you need to add the build requirements. Copy the following code snippets immediately after the </dependencies> tag. Here we are specifying the <groupId> as maven plugins, <artifactId> as maven shade plugins, and specifying the jar file naming convention using the <finalName> tag. Make sure that you have specified the source code download plugin, set the compiler level, and set the assembly plugin for the Maven, described as follows:

  1. Specify the source code download plugin with Maven:

           <plugin> 
            <groupId>org.apache.maven.plugins</groupId> 
            <artifactId>maven-eclipse-plugin</artifactId> 
            <version>2.9</version> 
            <configuration> 
              <downloadSources>true</downloadSources> 
              <downloadJavadocs>false</downloadJavadocs> 
            </configuration> 
          </plugin>  
    
  2. Set the compiler level for Maven:

          <plugin> 
            <groupId>org.apache.maven.plugins</groupId> 
            <artifactId>maven-compiler-plugin</artifactId> 
            <version>2.3.2</version>         
          </plugin> 
          <plugin> 
            <groupId>org.apache.maven.plugins</groupId> 
            <artifactId>maven-shade-plugin</artifactId> 
            <configuration> 
              <shadeTestJar>true</shadeTestJar> 
            </configuration> 
          </plugin> 
    
  3. Set the Maven assembly plugin:

          <plugin> 
            <groupId>org.apache.maven.plugins</groupId> 
            <artifactId>maven-assembly-plugin</artifactId> 
            <version>2.4.1</version> 
            <configuration> 
              <!-- get all project dependencies --> 
              <descriptorRefs> 
                <descriptorRef>jar-with-dependencies</descriptorRef> 
              </descriptorRefs> 
              <!-- MainClass in mainfest make a executable jar --> 
              <archive> 
                <manifest>              <mainClass>com.example.SparkFPGrowth.JavaFPGrowthExample</mainClass>            </manifest> 
              </archive> 
              <property> 
                <name>oozie.launcher.mapreduce.job.user.classpath.first</name> 
                <value>true</value> 
              </property> 
              <finalName>FPGrowth-${project.version}</finalName> 
            </configuration> 
            <executions> 
              <execution> 
                <id>make-assembly</id> 
                <!-- bind to the packaging phase --> 
                <phase>package</phase> 
                <goals> 
                  <goal>single</goal> 
                </goals> 
              </execution> 
            </executions> 
          </plugin> 
    

The complete pom.xml file, input data, and Java source file can be downloaded from our GitHub repositories at https://github.com/rezacsedu/PacktMLwithSpark. Please note that we used Eclipse Mars Eclipse IDE for Java Developers, and the version was Mars Release (4.5.0). You can go for this version or another distribution such as Eclipse Luna.

Step 4: The Maven build

In this section, we will describe how to create a Maven friendly project on Eclipse. After you have followed all the steps, you will be able to run a Maven project successfully. The steps should be in a chronological order as follows:

  1. Run your project as Maven install.

  2. If your code and maven configuration file are okay, the maven build will be successful.

  3. Build the maven project.

  4. Right-click on your project and run the maven project as Maven build... and write clean package in the Goals option.

  5. Check the Maven dependencies.

  6. Expand the Maven dependencies tree and check if the required jar files have been installed.

  7. Check if the jar file is generated with dependencies.

  8. As we specified, you should find two jar files under the /target directory tree (refer to Figure 9). The packaging file should contain exactly the same name as specified in the <finalName> tag. Now move your code (jar file) to a directory that aligns our experiment (that is, /user/local/code) and your data (that is, /usr/local/data/). We will use this jar file to execute the Spark job on an AWS EC2 cluster in a later stage. We will discuss the input dataset in the next step.

    Figure 9: Maven project with the jar generated with all the required dependencies on Eclipse.

 

Running a sample machine learning application


In this section, we will describe how to run a sample machine learning application from the Spark shell, on the local machine as stand-alone mode, and finally we will show you how to deploy and run the application on the Spark cluster using Amazon EC2 (https://aws.amazon.com/ec2/).

Running a Spark application from the Spark shell

Please note that this is just an exercise that checks the installation and running of a sample code. Details on machine learning application development will be covered from Chapter 3, Understanding the Problem by Understanding the Data to Chapter 9, Advanced Machine Learning with Streaming and Graph Data.

Now we will further proceed with one of the popular machine learning problem also called frequent pattern mining using the Frequent Pattern-growth or FP-growth. Suppose we have a transactional database as shown in the following table. Each line indicates a transaction done by a particular customer. Our target is to find the frequent patterns from the database, which is the prerequisite for calculating association rules (https://en.wikipedia.org/wiki/Association_rule_learning) from customer purchase rules. Save this database as input.txt in the /usr/local/data directory without transaction IDs:

Transaction ID

Transaction

1

2

3

4

5

6

7

8

9

10

A B C D F

A B C E

B C D E F

A C D E

C D F

D E F

D E

C D F

C F

A C D E

Table 1: A transactional database.

Now let's move to the Spark shell by specifying the master and number of the computational core to use as standalone mode (here are four cores, for example):

$ spark-shell --master "local[4]" 

Step 1: Loading packages

Load the required FPGrowth package and other dependent packages:

scala>import org.apache.spark.mllib.fpm.FPGrowth
scala>import org.apache.spark.{SparkConf, SparkContext}

Step 2: Creating Spark context

To create a Spark context, at first you need to configure the Spark session by mentioning the application name and master URL. Then you can use the Spark configuration instance variable to create a Spark context as follows:

val conf = new SparkConf().setAppName(s"FPGrowthExample with $params")
val sc = new SparkContext(conf)

Step 3: Reading the transactions

Let's read the transactions as RDDs on the created Spark Context (sc) (see Figure 6):

scala> val transactions = sc.textFile(params.input).map(_.split(" ")).cache()

Step 4: Checking the number of transactions

Here is the code for checking the number of transactions:

Scala>println(s"Number of transactions: ${transactions.count()}")
Number of transactions: 22
Scala>

Step 5: Creating an FPGrowth model

Create the model by specifying the minimum support threshold (see also https://en.wikipedia.org/wiki/Association_rule_learning) and the number of partitions:

scala>val model = new FPGrowth()
                   .setMinSupport(0.2)
                   .setNumPartitions(2)
                   .run(transactions)

Step 6: Checking the number of frequent patterns

The following code explains how to check the number of frequent patterns:

scala> println(s"Number of frequent itemsets:
    ${model.freqItemsets.count()}")
Number of frequent itemsets: 18
Scala>

Step 7: Printing patterns and support

Print the frequent pattern and their corresponding support/frequency counts (see Figure 10). Spark job will be running on localhost (refer to Figure 11):

scala> model.freqItemsets.collect().foreach { itemset => println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)}

Figure 10: Frequent patterns.

Running a Spark application on the local cluster

Once a user application is bundled as either a jar file (written in Scala or Java) or a Python file, it can be launched using the spark-submit script located under the bin directory in the Spark distribution.

As per the API documentation provided by the Spark website at http://spark.apache.org/docs/2.0.0-preview/submitting-applications.html, this script takes care of setting up the class path with Spark and its dependencies, and can support different cluster managers and deploys models that Spark supports. In a nutshell, Spark job submission syntax is as follows:

$spark-submit [options] <app-jar | python-file> [app arguments]

Here, [options] can be: --class <main-class>--master <master-url>--deploy-mode <deploy-mode>, and a number of other options.

To be more specific, <main-class> is the name of the main class name, which is the entry point for our application. <master-url> specifies the master URL for the cluster (for example, spark://HOST:PORT for connecting to the given Spark standalone cluster master, local for running Spark locally with one worker thread with no parallelism at all, local [k] for running a Spark job locally with K worker threads, which is the number of cores on your machine, local[*] for running a Spark job locally with as many worker threads as logical cores on your machine have, and mesos://IP:PORT for connecting to the available Mesos cluster, and even you could submit your job to the Yarn cluster - for more, see http://spark.apache.org/docs/latest/submitting-applications.html#master-urls).

<deploy-mode> is used to deploy our driver on the worker nodes (cluster) or locally as an external client (client). <app-jar> is the jar file we just built, including all the dependencies. <python-file> is the application main source code written using Python. [app-arguments] could be an input or output argument specified by an application developer:

Figure 11: Spark job running on localhost

Therefore, for our case, the job submit syntax would be as follows:

$./bin/spark-submit --class com.example.SparkFPGrowth.JavaFPGrowthExample --master local[4] FPGrowth-0.0.1-SNAPSHOT-jar-with-dependencies.jar input.txt

Here, JavaFPGrowthExample is the main class file written in Java; local is the master URL; FPGrowth-0.0.1-SNAPSHOT-jar-with-dependencies.jar is the application jar file we just generated by maven project; input.txt is the transactional database as the text file, and output is the directory where the output to be generated (in our case, the output will be shown on the console). Now let's submit this job to be executed locally.

If it is executed successfully, you will find the following message including the output in Figure 12 (abridged):

Figure 12: Spark job output on the terminal.

Running a Spark application on the EC2 cluster

In the previous section, we illustrated how to submit spark jobs in local or standalone mode. Here, we are going to describe how to run a spark application in cluster mode. To make our application run on the spark cluster mode, we consider the Amazon Elastic Compute Cloud (EC2) services, as Infrastructure as a Service (IaaS) or Platform as a Service (PaaS). For pricing and related information, please refer to this URL https://aws.amazon.com/ec2/pricing/.

Step 1: Key pair and access key configuration

We assume you have EC2 accounts already created. The first requirement is to create EC2 key pairs and AWS access keys. The EC2 key pair is the private key that you need when you will make a secure connection through SSH to your EC2 server or instances. To make the key, you have to go through the AWS console at http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#having-ec2-create-your-key-pair. Please refer to Figure 13, which shows the key pair creation page for an EC2 account:

Figure 13: AWS key-pair generation window.

Name it my-key-pair.pem once you have downloaded it and saved it on your local machine. Then ensure the permission by executing the following command (you should store this file in a secure location for security purposes, say /usr/local/key):

$ sudo chmod  400  /usr/local/key/my-key-pair.pem

Now what you need is the AWS access keys, the credentials of your account, which are needed if you want to submit your Spark job to compute nodes from your local machine using spark-ec2 script. To generate and download the keys, login to your AWS IAM services at http://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html#Using_CreateAccessKey. Upon the download completion (that is, /usr/local/key), you need to set two environment variables in your local machine. Just execute the following commands:

$ echo "export AWS_ACCESS_KEY_ID=<access_key_id>" >> ~/.bashrc  
$ echo " export AWS_SECRET_ACCESS_KEY=<secret_access_key_id>" >> ~/.bashrc  

Step 2: Configuring the Spark cluster on EC2

Spark distribution (that is, /usr/local/spark/ec2) provides a script called spark-ec2 for launching Spark Clusters in EC2 instances from your local machine (driver program), which helps in launching, managing, and shutting down the Spark Cluster.

Note

Please note that starting a cluster on AWS will cost money. Therefore, it is always a good practice to stop or destroy a cluster when the computation is done. Otherwise, it will incur additional costs. For more about AWS pricing, please refer to this URL https://aws.amazon.com/ec2/pricing/.

Once you execute the following command to launch a new instance, it sets up Spark, HDFS, and other dependencies on the cluster automatically:

$./spark-ec2 --key-pair=<name_of_the_key_pair> --identity-file=<path_of_the key_pair>  --instance-type=<AWS_instance_type > --region=<region> zone=<zone> --slaves=<number_of_slaves> --hadoop-major-version=<Hadoop_version> --spark-version=<spark_version> launch <cluster-name>

We believe that these parameters are self-explanatory, or alternatively, please see details at http://spark.apache.org/docs/latest/ec2-scripts.html. For our case, it would be something like this:

$./spark-ec2 --key-pair=my-key-pair --identity-file=/usr/local/key/my-key-pair.pem  --instance-type=m3.2xlarge --region=eu-west-1 --zone=eu-west-1a --slaves=2 --hadoop-major-version=yarn --spark-version=1.6.0 launch ec2-spark-cluster-1

It is shown in the following screenshot:

Figure 14: Cluster home.

After the successful completion, spark cluster will be instantiated with two workers (slave) nodes on your EC2 account. This task; however, sometimes might take half an hour approximately depending on your Internet speed and hardware configuration. Therefore, you'd love to have a coffee break. Upon successful competition of the cluster setup, you will get the URL of the Spark cluster on the terminal.

To check to make sure if the cluster is really running, check this URL https://<master-hostname>:8080 on your browser, where the master hostname is the URL you receive on the terminal. If everything was okay, you will find your cluster is running, see cluster home in Figure 14.

Step 3: Running and deploying Spark job on Spark Cluster

Execute the following command to the SSH remote Spark cluster:

$./spark-ec2 --key-pair=<name_of_the_key_pair> --identity-file=<path_of_the _key_pair> --region=<region> login <cluster-name>   

For our case, it should be something like this:

$./spark-ec2 --key-pair=my-key-pair --identity-file=/usr/local/key/my-key-pair.pem --region=eu-west-1 login ec2-spark-cluster-1 

Now copy your application (the jar we generated as the Maven project on Eclipse) to a remote instance (that is, ec2-52-48-119-121.eu-west-1.compute.amazonaws.com in our case) by executing the following command (in a new terminal):

$ scp -i /usr/local/key/my-key-pair.pem  /usr/local/code/FPGrowth-0.0.1-SNAPSHOT-jar-with-dependencies.jar [email protected]:/home/ec2-user/

Then you need to copy your data (/usr/local/data/input.txt in our case) to the same remote instance by executing the following command:

$ scp -i /usr/local/key/my-key-pair.pem /usr/local/data/input.txt [email protected]:/home/ec2-user/ 

Figure 15: Job running status at Spark cluster.

Well done! You are almost done! Now, finally you will have to submit your Spark job to be computed by the slaves or worker nodes. To do so, just execute the following commands:

$./bin/spark-submit --class com.example.SparkFPGrowth.JavaFPGrowthExample --master spark://ec2-52-48-119-121.eu-west-1.compute.amazonaws.com:7077 /home/ec2-user/FPGrowth-0.0.1-SNAPSHOT-jar-with-dependencies.jar /home/ec2-user/input.txt

Upon successful completion of the job computation, you are supposed to see the status of your job at port 8080 like Figure 15 (the output will be shown on the terminal).

Step 4: Pausing and restarting spark cluster

To stop your clusters, execute the following command from your local machine:

$./ec2/spark-ec2 --region=<ec2-region> stop <cluster-name>

For our case, it would be:

$./ec2/spark-ec2 --region=eu-west-1 stop ec2-spark-cluster-1

To restart the cluster later on, execute the following command:

$./ec2/spark-ec2 -i <key-file> --region=<ec2-region> start <cluster-name>

For our case, it will be something as follows:

$./ec2/spark-ec2 --identity-file=/usr/local/key/my-key-pair.pem --region=eu-west-1 start ec2-spark-cluster-1

Tip

To terminate your spark cluster:$./spark-ec2 destroy <cluster-name>

In our case, it would be: $./spark-ec2 --region=eu-west-1 destroy ec2-spark-cluster-1

If you want your application to scale up for large-scale datasets, the fastest way is to load them from Amazon S3 or an Amazon EBS device into an instance of the Hadoop Distributed File System (HDFS) on your nodes. We will discuss this technique in later chapters throughout practical machine learning examples.

 

References


  • Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing, Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, Ion Stoica. NSDI 2012 . April 2012.

  • Spark: Cluster Computing with Working Sets, Matei Zaharia, Mosharaf Chowdhury, Michael J. Franklin, Scott Shenker, Ion Stoica, HotCloud 2010 . June 2010.

  • Spark SQL: Relational Data Processing in Spark, Michael Armbrust, Reynold S. Xin, Cheng Lian, Yin Huai, Davies Liu, Joseph K. Bradley, Xiangrui Meng, Tomer Kaftan, Michael J. Franklin, Ali Ghodsi, Matei Zaharia, SIGMOD  2015. June 2015.

  • Discretized Streams: Fault-Tolerant Streaming Computation at Scale, Matei Zaharia, Tathagata Das, Haoyuan Li, Timothy Hunter, Scott Shenker, Ion Stoica. SOSP 2013 . November 2013.

  • Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters. Matei Zaharia, Tathagata Das, Haoyuan Li, Scott Shenker, Ion Stoica. HotCloud 2012 . June 2012.

  • GraphX: Unifying Data-Parallel and Graph-Parallel Analytics. Reynold S. Xin, Daniel Crankshaw, Ankur Dave, Joseph E. Gonzalez, Michael J. Franklin, Ion Stoica. OSDI 2014 . October 2014.

  • MLlib: Machine Learning in Apache Spark, Meng et al. arXiv:1505.06807v1, [cs.LG], 26 May 2015.

  • Recommender: An Analysis of Collaborative Filtering Techniques, Christopher R. Aberger, Stanford publication, 2014.

 

Summary


This ends our rather quick tour of Spark. We have tried to cover some of the most basic features of Spark, its computing paradigm, and getting started with Spark by installing and configuring. Use of Spark ML is recommended if they fit the ML pipeline concept well (for example, feature extraction, transformation, and selection) since it is more versatile and flexible with DataFrames and Datasets. However, according to Apache's documentations, they will keep supporting and contributing Spark MLib along with the active development of Spark ML.

On the other hand, data science developers should be comfortable with using Spark MLlib's features and should expect more features in the future. However, some algorithms are not available or are yet to be added to Spark ML, most notably, dimensionality reduction. Nevertheless, developers can seamlessly combine the implementation of these techniques found in Spark MLib with the rest of the algorithms found in Spark ML as hybrid or interoperable ML applications. We also showed some basic techniques to deploy ML applications on clusters and cloud services, though you can also try other deployment options available.

Tip

For more updates, interested readers should refer to the Spark website at http://spark.apache.org/docs/latest/mllib-guide.html for release dates, APIs, and specifications. As Spark's open source community and developers from all over the globe are continually enriching and updating the implementation, therefore, it is better to be updated.

In the next chapter (Chapter 2, Machine Learning Best Practices), we will discuss some best practices while developing advanced machine learning with Spark, including machine learning tasks and classes, some practical machine learning problems and their related discussion, some best practices in machine learning application development, choosing the right algorithm for the ML application, and so on.

About the Authors

  • Md. Rezaul Karim

    Md. Rezaul Karim is a researcher, author, and data science enthusiast with a strong computer science background, coupled with 10 years of research and development experience in machine learning, deep learning, and data mining algorithms to solve emerging bioinformatics research problems by making them explainable. He is passionate about applied machine learning, knowledge graphs, and explainable artificial intelligence (XAI). Currently, he is working as a research scientist at Fraunhofer FIT, Germany. He is also a PhD candidate at RWTH Aachen University, Germany. Before joining FIT, he worked as a researcher at the Insight Centre for Data Analytics, Ireland. Previously, he worked as a lead software engineer at Samsung Electronics, Korea.

    Browse publications by this author
  • Md. Mahedi Kaysar

    Md. Mahedi Kaysar is a Software Engineer and Researcher at the Insight Center for Data Analytics (the largest data analytics center across the Ireland and the largest semantic web research institute in the world), Dublin City University (DCU), Ireland. Before joining the Insight Center at DCU, he worked as a Software Engineer at the Insight Center for Data Analytics, National University of Ireland, Galway and Samsung Electronics, Bangladesh.

    He has more than 5 years of experience in research and development with a strong background in algorithms and data structures concentrating on C, Java, Scala, and Python. He has lots of experience in enterprise application development and big data analytics.

    He obtained a BSc in Computer Science and Engineering from the Chittagong University of Engineering and Technology, Bangladesh. Now, he has started his postgraduate research in Distributed and Parallel Computing at the Dublin City University, Ireland.

    His research interests include Distributed Computing, Semantic Web, Linked Data, big data, Internet of Everything, and machine learning. Moreover, he was involved in a research project in collaboration with CISCO Systems Inc. in the area of Internet of Everything and Semantic Web Technologies. His duties were to develop an IoT-enabled meeting management system, a scalable system for stream processing, designing, and showcasing the use cases of a project.

    Browse publications by this author

Latest Reviews

(1 reviews total)
Ottimo investimento, penso sia la cosa giusta da fare.