In this chapter, we are going to introduce Spark and learn its core concepts, such as the SparkContext, and Spark tools such as SparkConf and the Spark shell. The only prerequisites are a knowledge of basic Python concepts and the desire to seek insight from big data. We will learn how to analyze and discover patterns with Spark SQL to improve our business intelligence, and you will be able to iterate through your solutions quickly by setting up PySpark on your own computer. By the end of the book, you will be able to work with real-life messy datasets using PySpark to gain practical big data experience.
In this chapter, we will cover the following topics:
- An overview of PySpark
- Setting up Spark on Windows and PySpark
- Core concepts in Spark and PySpark
Before we start installing PySpark, which is the Python interface for Spark, let's go through some core concepts in Spark and PySpark. Spark is the latest big data tool from Apache, which can be found at http://spark.apache.org/. It is a unified analytics engine for large-scale data processing. This means that, if you have a lot of data, you can feed it into Spark to produce analytics at a good speed. For some workloads, Spark can run more than a hundred times faster than Hadoop MapReduce. It is also very easy to use, because there are very good APIs available for Spark.
The core concept in Spark is an RDD, which is similar to a pandas DataFrame, or a Python dictionary or list. It is a way for Spark to store large amounts of data on its infrastructure for us. The key difference between an RDD and something that lives in your local memory, such as a pandas DataFrame, is that an RDD is distributed across many machines, yet it appears like one unified dataset. This means that, if you have large amounts of data that you want to operate on in parallel, you can put it in an RDD and Spark will handle the parallelization and the clustering of the data for you.
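To build some intuition for "distributed but appearing as one dataset", here is a plain-Python sketch (not Spark code, and not how Spark is implemented) of splitting data into partitions, processing each partition independently, and presenting the results back as a single collection:

```python
# Plain-Python sketch (not Spark code): illustrate how a dataset can be
# split into partitions, processed independently, and still behave as one
# logical collection -- which is conceptually what Spark does across machines.

def make_partitions(data, num_partitions):
    """Split a list into roughly equal chunks, one per 'worker'."""
    size = (len(data) + num_partitions - 1) // num_partitions
    return [data[i:i + size] for i in range(0, len(data), size)]

data = list(range(10))
partitions = make_partitions(data, 3)

# Each partition could be processed on a different machine...
processed = [[x * 2 for x in part] for part in partitions]

# ...but the results come back as one unified dataset.
result = [x for part in processed for x in part]
print(result)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

With an RDD, Spark performs this partitioning, scheduling, and recombination for you, across machines rather than within one process.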
PySpark provides this kind of Python integration with Spark, which we will cover soon. For now, we will import some libraries from the PySpark package to help us work with Spark. The best way for us to understand Spark is to look at an example, as shown in the following code snippet:
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
In the preceding code, we have created a new variable called lines by calling sc.textFile("data.txt"). sc is a Python object that represents our Spark cluster, which is a series of instances or cloud computers that run our Spark processes. By calling the textFile method and feeding in data.txt, we have potentially fed in a large text file and created an RDD using just this one line. In other words, we feed a large text file into a distributed cluster and Spark, and Spark handles the clustering for us.
In line two and line three, we have a MapReduce pattern. In line two, we have mapped the length function, using a lambda function, over each line of data.txt. In line three, we have called a reduce function to add all the lineLengths together and produce the total length of the document. While, in our Python code, lines is a variable that appears to contain all the lines of data.txt, under the hood Spark is actually handling the distribution of fragments of data.txt across the instances of the Spark cluster, and is handling the MapReduce computation over all of those instances.
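To make the map and reduce steps concrete, the same computation can be written in plain Python (no cluster involved), using a small in-memory list in place of the text file:

```python
from functools import reduce

# Plain-Python equivalent of the Spark snippet above: map each line to its
# length, then reduce the lengths to a single total.
lines = ["hello spark", "this is a line", "short"]

line_lengths = list(map(lambda s: len(s), lines))         # the "map" step
total_length = reduce(lambda a, b: a + b, line_lengths)   # the "reduce" step

print(line_lengths)  # [11, 14, 5]
print(total_length)  # 30
```

The difference in Spark is that the map step runs in parallel on the machines holding each fragment of the data, and the reduce step combines their partial results.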
Spark SQL is one of the four components on top of the Spark platform, as we saw earlier in the chapter. It can be used to execute SQL queries or read data from an existing Hive installation, where Hive is a data warehouse implementation, also from Apache. Spark SQL looks very similar to MySQL or Postgres. The following code snippet is a good example:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Jackson|
# |  30| Martin|
# |  19| Melvin|
# +----+-------+
You'll need to select all the columns from a certain table, such as people, and, using the spark object, you'll feed in a very standard-looking SQL statement, which will produce an SQL result much like what you would expect from a normal SQL implementation.
Let's now look at datasets and DataFrames. A dataset is a distributed collection of data. It is an interface added in Spark 1.6 that provides benefits on top of RDDs. A DataFrame, on the other hand, is very familiar to those who have used pandas or R. A DataFrame is simply a dataset organized into named columns, which is similar to a relational database or a DataFrame in Python. The main difference between a dataset and a DataFrame is that DataFrames have column names. As you can imagine, this would be very convenient for machine learning work and feeding into things such as scikit-learn.
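As a plain-Python illustration of why named columns matter (this is not Spark code, just the underlying idea), compare positional rows with rows keyed by column name:

```python
# Plain-Python illustration of named columns. With positional rows (tuples),
# you must remember which index holds which field; with named columns
# (dicts), the code documents itself -- this is the convenience a DataFrame
# adds over an unnamed collection of rows.

rows_positional = [(None, "Jackson"), (30, "Martin"), (19, "Melvin")]
ages_by_index = [row[0] for row in rows_positional]  # fragile: what is index 0?

rows_named = [
    {"age": None, "name": "Jackson"},
    {"age": 30, "name": "Martin"},
    {"age": 19, "name": "Melvin"},
]
ages_by_name = [row["age"] for row in rows_named]    # self-describing

print(ages_by_index)  # [None, 30, 19]
print(ages_by_name)   # [None, 30, 19]
```

Named columns are also what let tools such as Spark SQL, pandas, and scikit-learn refer to features by name rather than by position.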
Let's look at how DataFrames can be used. The following code snippet is a quick example of a DataFrame:
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Jackson|
# |  30| Martin|
# |  19| Melvin|
# +----+-------+
In the same way as pandas or R would, spark.read.json allows us to feed in some data from a JSON file, and df.show() shows us the contents of the DataFrame in a similar way to pandas.
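One detail worth knowing is that, by default, Spark's JSON reader expects newline-delimited JSON, that is, one JSON object per line rather than one big JSON array. The following plain-Python sketch (using an inline string as a stand-in for people.json) shows that format being parsed:

```python
import json

# Plain-Python sketch of the data format spark.read.json consumes by
# default: newline-delimited JSON, one object per line (not a JSON array).
people_json = "\n".join([
    '{"name": "Jackson"}',
    '{"name": "Martin", "age": 30}',
    '{"name": "Melvin", "age": 19}',
])

records = [json.loads(line) for line in people_json.splitlines()]
names = [r["name"] for r in records]
print(names)  # ['Jackson', 'Martin', 'Melvin']
```

Note how a field can be missing from some records (Jackson has no age); Spark fills such gaps with null, which is why the age column shows null in the preceding output.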
MLlib, as we know, is used to make machine learning scalable and easy. MLlib allows you to perform common machine learning tasks, such as featurization; creating pipelines; and saving and loading algorithms, models, and pipelines. It also provides some utilities for linear algebra, statistics, and data handling. One other thing to note is that, while Spark and the RDD are almost inseparable concepts, if your main use case for Spark is machine learning, Spark now actually encourages you to use the DataFrame-based API for MLlib. This is quite beneficial to us, as we are already familiar with pandas, which means a smooth transition into Spark.
In the next section, we will see how we can set up Spark on Windows, and set up PySpark as the interface.
- Download Gnu on Windows (GOW) from https://github.com/bmatzelle/gow/releases/download/v0.8.0/Gow-0.8.0.exe.
- GOW allows the use of Linux commands on Windows. We can use the following command to see the basic Linux commands that installing GOW makes available:
This gives the following output:
- Download and install Anaconda. If you need help, you can go through the following tutorial: https://medium.com/@GalarnykMichael/install-python-on-windows-anaconda-c63c7c3d1444.
- Close the previous command line and open a new command line.
- Go to the Apache Spark website (https://spark.apache.org/).
- To download Spark, choose the following from the drop-down menu:
The following screenshot shows the download page of Apache Spark:
- Then, download Spark. Once it is downloaded, move the file to the folder where you want to unzip it.
- You can either unzip it manually or use the following commands:
gzip -d spark-2.1.0-bin-hadoop2.7.tgz
tar xvf spark-2.1.0-bin-hadoop2.7.tar
- Now, download winutils.exe into your spark-2.1.0-bin-hadoop2.7\bin folder using the following command:
curl -k -L -o winutils.exe https://github.com/steveloughran/winutils/blob/master/hadoop-2.6.0/bin/winutils.exe?raw=true
This gives the following output:
- Next, set your environment variables using the following commands:
setx SPARK_HOME C:\opt\spark\spark-2.1.0-bin-hadoop2.7
setx HADOOP_HOME C:\opt\spark\spark-2.1.0-bin-hadoop2.7
setx PYSPARK_DRIVER_PYTHON ipython
setx PYSPARK_DRIVER_PYTHON_OPTS notebook
- Add C:\opt\spark\spark-2.1.0-bin-hadoop2.7\bin to your path.
- Close the Terminal, open a new one, and type the following command:
The PYSPARK_DRIVER_PYTHON and PYSPARK_DRIVER_PYTHON_OPTS parameters are used to launch the PySpark shell in Jupyter Notebook. The --master parameter is used for setting the master node address.
Spark shell
The power of Spark can be seen when you have a large amount of data that doesn't fit into your local machine or your laptop, so you need two or more computers to process it. You also need to maintain the speed of processing this data while working on it. Not only do we want the data to be split among a few computers for computation; we also want the computation to be parallel. Lastly, we want this computation to look like one single computation.
Let's consider an example where we have a large contact database with 50 million names, from which we might want to extract the first name of each contact. Obviously, it is difficult to fit 50 million names into your local memory, especially if each name is embedded within a larger contact object. This is where Spark comes into the picture. Spark allows you to give it a big data file, and it will help in handling and uploading this data file, while handling all the operations carried out on this data for you. This power is managed by Spark's cluster manager, as shown in the following diagram:
The cluster manager manages multiple workers; there could be 2, 3, or even 100. The main point is that Spark's technology helps to manage this cluster of workers, and you need a way to control how the cluster is behaving, and also to pass data back and forth from the cluster.
A SparkContext lets you use the power of Spark's cluster manager through Python objects. With a SparkContext, you can pass jobs and resources, and schedule tasks; the SparkContext sends work downstream to the cluster manager, and then takes the results back once the computation has completed.
Let's see what this looks like in practice and see how to set up a SparkContext:
- First, we need to import SparkContext.
- Create a new object in the sc variable, standing for the SparkContext, using the SparkContext constructor.
- In the SparkContext constructor, pass a local context. We are looking at hands on PySpark in this context, as follows:
from pyspark import SparkContext
sc = SparkContext('local', 'hands on PySpark')
- After we've established this, all we need to do is use sc as an entry point to our Spark operations, as demonstrated in the following code snippet:
visitors = [10, 3, 35, 25, 41, 9, 29]
df_visitors = sc.parallelize(visitors)
df_visitors_yearly = df_visitors.map(lambda x: x*365).collect()
print(df_visitors_yearly)
Let's take an example: if we were to analyze a synthetic dataset of visitor counts to our clothing store, we might have a list, visitors, denoting the daily visitors to our store. We can then create a parallelized version of the data by calling sc.parallelize(visitors); df_visitors then gives us an RDD of visitors. We can then map a function over it; for example, taking the daily numbers and extrapolating them into yearly numbers, by mapping a lambda function that multiplies each daily number (x) by 365, the number of days in a year. Then, we call the collect() function to make sure that Spark executes this lambda call. Lastly, we print out df_visitors_yearly. Now, we have Spark working on this computation on our synthetic data behind the scenes, while it looks like simply a Python operation.
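We can sanity-check what the Spark job computes with a plain-Python equivalent of the same extrapolation (no cluster needed for a list this small, which is exactly the point: Spark pays off when the data does not fit on one machine):

```python
# Plain-Python check of the Spark computation above: multiply each daily
# visitor count by 365 to extrapolate a yearly figure.
visitors = [10, 3, 35, 25, 41, 9, 29]
visitors_yearly = [x * 365 for x in visitors]
print(visitors_yearly)  # [3650, 1095, 12775, 9125, 14965, 3285, 10585]
```

The Spark version produces the same list, but the map runs across the partitions of the RDD, and collect() brings the distributed results back to the driver.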
We can see that we've started a shell session with Spark in the following screenshot:
Spark is now available to us as the spark variable. Let's try a simple thing in Spark. The first thing to do is to load a random file. In each Spark installation, there is a README.md markdown file, so let's load it into memory, as follows:
text_file = spark.read.text("README.md")
If we use spark.read.text and then feed in README.md, we get a few warnings, but we shouldn't be too concerned about them at the moment, as we will see later how we are going to fix these things. The main thing here is that we can use Python syntax to access Spark.
What we have done here is load README.md as text data read by spark into Spark, and we can then use text_file.count() to get Spark to count how many lines are in our text file, as follows:
From this, we get the following output:
We can also see what the first line is with the following:
We will get the following output:
Row(value='# Apache Spark')
lines_with_spark = text_file.filter(text_file.value.contains("Spark"))
Here, we have filtered for lines using the filter() function, and within the filter() function, we have specified that text_file.value.contains should include the word "Spark", and we have put those results into the lines_with_spark variable.
We can modify the preceding command by simply adding .count(), as follows:
We will now get the following output:
We can see that 20 lines in the text file contain the word Spark. This is just a simple example of how we can use the Spark shell.
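The filter-then-count pattern is easy to verify in plain Python on a small stand-in for README.md (the text below is an invented sample, not the real file, so its count differs from the 20 lines above):

```python
# Plain-Python analogue of text_file.filter(...).count(): count the lines
# of a small invented stand-in for README.md that contain the word "Spark".
readme = """# Apache Spark
Spark is a unified analytics engine.
It has high-level APIs.
This README gives only basic setup instructions.
Run ./bin/pyspark to start the Spark shell."""

lines_with_spark = [line for line in readme.splitlines() if "Spark" in line]
print(len(lines_with_spark))  # 3
```

In Spark, the filter is evaluated lazily across the cluster, and only the final count travels back to the driver.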
SparkConf allows us to configure a Spark application. It sets various Spark parameters as key-value pairs, so you will usually create a SparkConf object with the SparkConf() constructor, which then loads values from the spark.* Java system properties.
There are a few useful functions; for example, we can use the set() function to set a configuration property, the setMaster() function to set the master URL to connect to, the setAppName() function to set the application name, and setSparkHome() to set the path where Spark is installed on worker nodes.
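These setter methods are convenience wrappers around key-value pairs (spark.master and spark.app.name are real Spark configuration keys). The following is an illustrative plain-Python sketch of that mapping only; the MiniConf class is invented for this example and is not the real pyspark.SparkConf implementation:

```python
# Illustrative sketch only -- NOT the real pyspark.SparkConf implementation.
# It shows how named setters typically map onto key-value configuration pairs.
class MiniConf:
    def __init__(self):
        self._settings = {}

    def set(self, key, value):
        self._settings[key] = value
        return self  # returning self allows chained calls

    def setMaster(self, master):
        return self.set("spark.master", master)

    def setAppName(self, name):
        return self.set("spark.app.name", name)

    def get(self, key):
        return self._settings.get(key)

conf = MiniConf().setMaster("local[2]").setAppName("hands on PySpark")
print(conf.get("spark.master"))    # local[2]
print(conf.get("spark.app.name"))  # hands on PySpark
```

The real SparkConf chains in the same way, so a typical application builds its configuration in one expression and passes it to the SparkContext.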
You can learn more about SparkConf at https://spark.apache.org/docs/0.9.0/api/pyspark/pyspark.conf.SparkConf-class.html.
In this chapter, we learned about the core concepts in Spark and PySpark. We learned about setting up Spark and using PySpark on Windows. We also went through the three main pillars of Spark, which are SparkContext, Spark shell, and SparkConf.
In the next chapter, we're going to look at getting your big data into Spark environments using RDDs.