The reader of this post should be familiar with basic concepts of Spark, such as the shell and RDDs.
Data sizes have increased, but our exploration tools and techniques have not evolved as fast. Traditional Hadoop MapReduce jobs are cumbersome and time consuming to develop, and Pig isn't quite as fully featured or as easy to work with as one would like. Exploration can mean parsing and analyzing raw text documents, analyzing log files, processing tabular data in various formats, and exploring data that may or may not be correctly formatted.
This is where a tool like Spark excels. It provides an interactive shell for quick processing, prototyping, exploring, and slicing and dicing data. Spark works with R, Scala, and Python. In conjunction with Jupyter notebooks, we get a clean web interface for writing Python, R, or Scala code backed by a Spark cluster. Jupyter notebook is also a great tool for presenting our findings, since we can do inline visualizations and easily share them as a PDF, on GitHub, or through a web viewer. The power of this setup is that Spark does the heavy lifting while we keep the flexibility to test code on a small subset of data in the interactive notebooks.
Another powerful capability of Spark is its DataFrame API. After we have cleaned our data (dealt with badly formatted rows that can't be loaded correctly), we can load it as a DataFrame. Once the data is loaded as a DataFrame, we can use Spark SQL to explore it. Since notebooks can be shared, this is also a great way to let developers do the work of cleaning the data and loading it as a DataFrame. Analysts, data scientists, and the like can then use this data for their tasks. DataFrames can also be exported as Hive tables, which are commonly used in Hadoop-based warehouses.
For this section, we will be using examples that I have uploaded on GitHub. These examples can be found here.
In addition to the examples, a Docker container is provided for running them. The container runs Spark in pseudo-distributed mode and has Jupyter notebook configured to run Python/PySpark.
To set this up in your own environment, you need a running Spark cluster with Jupyter notebook installed. Jupyter notebook, by default, only has the Python kernel configured. You can download additional kernels for Jupyter notebook to run R and Scala.
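To run Jupyter notebook with PySpark, use the following command on your cluster (this form applies to Spark 1.x; Spark 2.0 and later dropped IPYTHON_OPTS in favor of the PYSPARK_DRIVER_PYTHON and PYSPARK_DRIVER_PYTHON_OPTS environment variables):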
IPYTHON_OPTS="notebook --pylab inline --notebook-dir=<directory to store notebooks>" MASTER=local ./bin/pyspark
When you start Jupyter notebook this way, it initializes a few critical variables. One of them is the Spark Context (sc), which is the entry point for all Spark-related tasks. The other is sqlContext, the Spark SQL context, which is used to interact with Spark SQL (create DataFrames, run queries, and so on).
The examples below cover the following:
- Log Analysis
In this example, we use a log file from an Apache server. The code for this example can be found here.
We load the log file using:
log_file = sc.textFile("../data/log_file.txt")
Spark can natively load files from HDFS, the local filesystem, and S3. Libraries for other storage formats are freely available online, or you could write your own (a blog post for another time). The previous command loads the log file as an RDD of lines.
We then use Python’s native shlex library to split each line into fields, and Spark’s map transformation to load them as Rows. An RDD consisting of Rows can easily be converted to a DataFrame.
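As a rough sketch of the parsing step, the snippet below uses shlex to split one log line and wraps the fields in a named tuple. It runs in plain Python so it can be tried without a cluster. The sample line, the field names, and the function name parse_log_line are assumptions made for illustration and are not taken from the repository; in the Spark version, a pyspark.sql.Row would take the place of the named tuple and the function would be passed to map.

```python
import shlex
from collections import namedtuple

# Hypothetical field names for a line in Apache "combined" log format.
LogRow = namedtuple(
    "LogRow",
    ["ip", "identity", "user", "timestamp", "request", "status", "size",
     "referrer", "user_agent"],
)


def parse_log_line(line):
    """Split one log line into fields; double-quoted sections stay together."""
    # shlex honours double quotes, so the request, referrer, and user agent
    # each come back as a single token. Square brackets are not quotes, so
    # the timestamp arrives as two tokens and is joined back together here.
    parts = shlex.split(line)
    timestamp = (parts[3] + " " + parts[4]).strip("[]")
    return LogRow(parts[0], parts[1], parts[2], timestamp,
                  parts[5], int(parts[6]), parts[7], parts[8], parts[9])


# Illustrative sample line, used only to exercise the parser.
sample_line = (
    '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] '
    '"GET /apache_pb.gif HTTP/1.0" 200 2326 '
    '"http://www.example.com/start.html" "Mozilla/4.08 [en] (Win98; I ;Nav)"'
)

row = parse_log_line(sample_line)
print(row.ip, row.status, row.request)
```

Real log files usually contain lines that do not fit the expected layout, so a production parser would also need to handle lines with missing or extra fields.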
How we arrived at this solution is where data exploration comes in. We use Spark’s takeSample method to draw five random rows from the file, for example with log_file.takeSample(False, 5).
These sample rows are helpful in determining how to parse and load the file.
Once we have written our parsing code, we can apply it to the dataset using map to create a new RDD of Rows, and then test it on a subset of the data using the take or takeSample methods. The take method returns the first rows of the file, so although it is faster, the result may not be representative of the dataset. The takeSample method, on the other hand, picks rows at random from the file, which gives a more representative sample.
To create the new RDD and convert it to a DataFrame, we use the following code:
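The difference between the two sampling approaches can be illustrated without Spark. The sketch below is a plain-Python analogy, not the Spark API: first_n stands in for take, random_n stands in for takeSample without replacement, and the generated "log" is made up so that its lines are ordered by day.

```python
import random
from itertools import islice

# Stand-in for a log file ordered by day: 100 lines for each of 5 days.
lines = ["day%d request%d" % (day, i)
         for day in range(1, 6) for i in range(100)]


def first_n(records, n):
    """Analogy for take: return the first n records in order."""
    return list(islice(records, n))


def random_n(records, n, seed=None):
    """Analogy for takeSample without replacement: n records at random."""
    return random.Random(seed).sample(records, n)


head = first_n(lines, 5)
sample = random_n(lines, 5, seed=42)

# Every record in head comes from day1, while the random sample is drawn
# from the whole file and can include any of the five days.
print({line.split()[0] for line in head})
print({line.split()[0] for line in sample})
```

If the parsing code only ever sees the head of an ordered file, format changes that appear later in the file can go unnoticed, which is why a random sample is the safer check.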
schema_DF = splits.map(create_schema).toDF()
Once we have created the DataFrame and tested it using take/takeSample to make sure that our loading code is working, we can register it as a temporary table, here named logs, for example with schema_DF.registerTempTable('logs') in Spark 1.x.
Once it is registered as a table, we can run SQL queries on the log file:
sample = sqlContext.sql('SELECT * FROM logs LIMIT 10').collect()
Note that the collect() method brings the entire result into the driver’s memory, so this may not be feasible for large datasets. Use take/takeSample instead to sample data if your dataset is large.
The beauty of using Spark with Jupyter is that all this exploration work takes only a few lines of code. It can be written interactively with all the trial and error we needed, the processed data can be easily shared, and running interactive queries on this data is easy. Last but not least, this can easily scale to massive (GB, TB) data sets.
- k-means on the Iris dataset
In this example, we use data from the Iris dataset, which contains measurements of sepal and petal length and width. This is a popular open source dataset used to showcase classification algorithms.
In this case, we use the k-means algorithm from MLlib, Spark’s machine learning library.
The code and the output can be found here.
In this example, we are not going to get into too much detail since some of the concepts are outside the scope of this blog post.
This example showcases how we load the Iris dataset and create a DataFrame with it. We then fit a k-means clustering model on this dataset and visualize the resulting clusters. Note that k-means is an unsupervised clustering algorithm: it groups similar rows together without using the species labels.
The power of this is that we did a somewhat complex task of parsing a dataset, creating a DataFrame, training a machine learning model, and visualizing the data in an interactive and scalable manner.
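For readers who want a feel for what k-means does, here is a minimal plain-Python sketch of the basic assign-and-update loop (Lloyd's algorithm). It is not MLlib's implementation and not the code from the repository. The six rows are a small set of measurements in the Iris column layout used only for illustration, and the function names and the choice of starting centroids are assumptions.

```python
def squared_distance(a, b):
    """Squared Euclidean distance between two equal-length tuples."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def closest(point, centroids):
    """Index of the centroid nearest to point."""
    distances = [squared_distance(point, c) for c in centroids]
    return distances.index(min(distances))


def kmeans(points, centroids, iterations=10):
    """Repeat: assign each point to its nearest centroid, then recompute
    each centroid as the mean of the points assigned to it."""
    for _ in range(iterations):
        labels = [closest(p, centroids) for p in points]
        updated = []
        for k, old in enumerate(centroids):
            members = [p for p, label in zip(points, labels) if label == k]
            if members:
                updated.append(tuple(sum(dim) / len(members)
                                     for dim in zip(*members)))
            else:
                updated.append(old)  # leave an empty cluster's centroid alone
        centroids = updated
    return centroids, [closest(p, centroids) for p in points]


# Columns: sepal length, sepal width, petal length, petal width (cm).
points = [
    (5.1, 3.5, 1.4, 0.2),
    (4.9, 3.0, 1.4, 0.2),
    (7.0, 3.2, 4.7, 1.4),
    (6.4, 3.2, 4.5, 1.5),
    (6.3, 3.3, 6.0, 2.5),
    (5.8, 2.7, 5.1, 1.9),
]

# Start from the first and last rows as initial centroids (an arbitrary choice).
centroids, labels = kmeans(points, [points[0], points[-1]])
print(labels)
```

With two clusters, the two short-petal rows end up together and the remaining four rows form the other cluster. On a full dataset, a distributed implementation such as MLlib's performs the same kind of iteration across the cluster.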
The repository contains several more examples. Feel free to reach out to me if you have any questions. If you would like to see more posts with practical examples, please let us know.
About the Author
Anant Asthana is a data scientist and principal architect at Pythian, and he can be found on GitHub at anantasty.