Before I dive into the more complex methods of analyzing your data later in the book, I would like to stop at the basic data exploration tasks on which almost all data scientists spend at least 80-90% of their productive time. Data preparation, cleansing, transforming, and joining alone is estimated to be a $44 billion/year industry (Data Preparation in the Big Data Era by Federico Castanedo; Best Practices for Data Integration, O'Reilly Media, 2015). Given this fact, it is surprising that people have only recently started spending more time on the science of developing best practices and establishing good habits, documentation, and teaching materials for the whole process of data preparation (Beautiful Data: The Stories Behind Elegant Data Solutions, edited by Toby Segaran and Jeff Hammerbacher, O'Reilly Media, 2009 and Advanced Analytics with Spark: Patterns for Learning from Data at Scale by Sandy Ryza et al., O'Reilly Media, 2015).
Few data scientists agree on specific tools and techniques, and there are multiple ways to perform exploratory data analysis, ranging from the Unix command line to very popular open source and commercial ETL and visualization tools. The focus of this chapter is on how to use Scala and a laptop-based environment to benefit from techniques that are commonly referred to as the functional paradigm of programming. As I will discuss, these techniques can be transferred to exploratory analysis over distributed systems of machines using Hadoop/Spark.
What does functional programming have to do with it? Spark was developed in Scala for a good reason. Many of the basic principles that lie at the foundation of functional programming, such as lazy evaluation, immutability, absence of side effects, list comprehensions, and monads, go really well with processing data in distributed environments, specifically when performing data preparation and transformation tasks on big data. Thanks to abstractions, these techniques work equally well on a local workstation or a laptop. As mentioned earlier, this does not preclude us from processing very large datasets, up to dozens of TBs, on modern laptops connected to distributed clusters of storage/processing nodes. We can do it one topic or focus area at a time, and often, with proper partitioning, we do not even have to sample or filter the dataset. We will use Scala as our primary tool, but will resort to other tools if required.
While Scala is complete in the sense that everything that can be implemented in other languages can be implemented in Scala, Scala is fundamentally a high-level, or even a scripting, language. One does not have to deal with the low-level details of data structures and algorithm implementations, most of which have already been tested by a plethora of applications and time in, say, Java or C++, even though Scala has its own collections and even some basic algorithm implementations today. Specifically, in this chapter, I'll be focusing on using Scala/Spark only for high-level tasks.
In this chapter, we will cover the following topics:
Installing Scala
Learning simple techniques for initial data exploration
Learning how to downsample the original dataset for faster turnover
Discussing the implementation of basic data transformation and aggregations in Scala
Getting familiar with big data processing tools such as Spark and Spark Notebook
Getting code for some basic visualization of datasets
If you have already installed Scala, you can skip this section. You can get the latest Scala download from http://www.scala-lang.org/download/. I used Scala version 2.11.7 on Mac OS X El Capitan 10.11.5. You can use any other version you like, but you might face compatibility problems with other packages such as Spark, a common problem in open source software, as technology adoption usually lags by a few released versions.
Tip
In most cases, you should try to maintain a precise match with the recommended versions, as differences in versions can lead to obscure errors and a lengthy debugging process.
If you installed Scala correctly, after typing scala, you should see something similar to the following:
[akozlov@Alexanders-MacBook-Pro ~]$ scala
Welcome to Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40).
Type in expressions to have them evaluated.
Type :help for more information.

scala>
This is the Scala read-evaluate-print-loop (REPL) prompt. Although Scala programs can be compiled, the content of this chapter will use the REPL, as we are focusing on interactivity, with maybe a few exceptions. The :help command provides a list of the utility commands available in the REPL (note the colon at the start):

Now, you have a dataset and a computer. For convenience, I have provided a small anonymized and obfuscated sample of clickstream data with the book repository, which you can get at https://github.com/alexvk/ml-in-scala.git. The file in the chapter01/data/clickstream directory contains lines with a timestamp, a session ID, and some additional event information, such as the URL, category information, and so on, at the time of the call. The first thing one would do is apply transformations to find out the distribution of values for the different columns in the dataset.
Figure 01-1 shows the output of the gzcat chapter01/data/clickstream/clickstream_sample.tsv.gz | less -U command in a terminal window. The columns are tab (^I) separated. One can notice that, as in many real-world big data datasets, many values are missing. The first column of the dataset is recognizable as a timestamp. The file also contains complex data such as arrays, structs, and maps, another common feature of big data datasets.
Unix provides a few tools to dissect datasets. Probably, less, cut, sort, and uniq are the most frequently used tools for text file manipulations; awk, sed, perl, and tr can do more complex transformations and substitutions.

Figure 01-1. The clickstream file as an output of the less -U Unix command
Fortunately, Scala allows you to transparently use command-line tools from within Scala REPL:
[akozlov@Alexanders-MacBook-Pro]$ scala
…
scala> import scala.sys.process._
import scala.sys.process._

scala> val histogram = ( "gzcat chapter01/data/clickstream/clickstream_sample.tsv.gz" #| "cut -f 10" #| "sort" #| "uniq -c" #| "sort -k1nr" ).lineStream
histogram: Stream[String] = Stream(7731 http://www.mycompany.com/us/en_us/, ?)

scala> histogram take(10) foreach println
7731 http://www.mycompany.com/us/en_us/
3843 http://mycompanyplus.mycompany.com/plus/
2734 http://store.mycompany.com/us/en_us/?l=shop,men_shoes
2400 http://m.mycompany.com/us/en_us/
1750 http://store.mycompany.com/us/en_us/?l=shop,men_mycompanyid
1556 http://www.mycompany.com/us/en_us/c/mycompanyid?sitesrc=id_redir
1530 http://store.mycompany.com/us/en_us/
1393 http://www.mycompany.com/us/en_us/?cp=USNS_KW_0611081618
1379 http://m.mycompany.com/us/en_us/?ref=http%3A%2F%2Fwww.mycompany.com%2F
1230 http://www.mycompany.com/us/en_us/c/running
I used the scala.sys.process package to call familiar Unix commands from the Scala REPL. From the output, we can immediately see that the customers of our web shop are mostly interested in men's shoes and running, and that many visitors arrive with the referral code KW_0611081618.
Tip
One may wonder when we will start using complex Scala types and algorithms. Just wait; a lot of highly optimized tools were created before Scala and are much more efficient for exploratory data analysis. In the initial stage, the biggest bottleneck is usually just disk I/O and slow interactivity. Later, we will discuss more iterative algorithms, which are usually more memory intensive. Also note that UNIX pipeline operations can be implicitly parallelized on modern multi-core computer architectures, as they are in Spark (we will show this in later chapters).
It has been shown that using compression, implicit or explicit, on input data files can actually save you I/O time. This is particularly true for (most) modern semi-structured datasets with repetitive values and sparse content. Decompression can also be implicitly parallelized on modern fast multi-core architectures, removing the computational bottleneck, except, maybe, in cases where compression is implemented implicitly in hardware (SSDs, where we don't need to compress the files explicitly). We also recommend using directories rather than files as the paradigm for a dataset, where the insert operation is reduced to dropping a data file into a directory. This is how datasets are presented in big data Hadoop tools such as Hive and Impala.
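As a quick aside, even from plain Scala you can read the compressed file directly; here is a minimal sketch, assuming the repository layout used above, that never writes an uncompressed copy to disk:

import java.io.FileInputStream
import java.util.zip.GZIPInputStream
import scala.io.Source

// Read the gzipped clickstream sample directly; decompression happens on the fly.
val gzStream = new GZIPInputStream(
  new FileInputStream("chapter01/data/clickstream/clickstream_sample.tsv.gz"))
val gzLines = Source.fromInputStream(gzStream).getLines
gzLines.take(3).foreach(println)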
Let's look at the numeric data, even though most of the columns in the dataset are either categorical or complex. The traditional way to summarize numeric data is the five-number summary, which consists of the minimum, the first quartile, the median, the third quartile, and the maximum (the middle three are often reported as the median and the interquartile range). I'll leave the computation of the median and the interquartile range until the Spark DataFrame is introduced, as it makes these computations extremely easy; but we can compute the mean, min, and max in Scala by just applying the corresponding operations:
scala> import scala.sys.process._
import scala.sys.process._

scala> val nums = ( "gzcat chapter01/data/clickstream/clickstream_sample.tsv.gz" #| "cut -f 6" ).lineStream
nums: Stream[String] = Stream(0, ?)

scala> val m = nums.map(_.toDouble).min
m: Double = 0.0

scala> val m = nums.map(_.toDouble).sum/nums.size
m: Double = 3.6883642764024662

scala> val m = nums.map(_.toDouble).max
m: Double = 33.0
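Since nums is a memoized Stream, the three separate traversals above are fine for this small sample; for larger inputs, a single pass may be preferable. The following is a minimal sketch, not part of the original REPL session, that accumulates the min, max, sum, and count in one foldLeft:

// One pass over the data: accumulate (min, max, sum, count) in a single foldLeft.
val (mn, mx, sum, cnt) = nums.map(_.toDouble)
  .foldLeft((Double.MaxValue, Double.MinValue, 0.0, 0L)) {
    case ((mn, mx, sum, cnt), x) =>
      (math.min(mn, x), math.max(mx, x), sum + x, cnt + 1)
  }
println(s"min = $mn, max = $mx, mean = ${sum / cnt}")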
Sometimes one needs to get an idea of how a certain type of value looks across multiple fields; the most common are IP/MAC addresses, dates, and formatted messages. For example, if I want to see all the IP addresses mentioned throughout a file or a document, I need to replace the cut command in the previous example with grep -o -E [1-9][0-9]{0,2}(?:\\.[1-9][0-9]{0,2}){3}, where the -o option instructs grep to print only the matching parts. A more precise regex for an IP address would be grep -o -E (?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?), but it is about 50% slower on my laptop and the original one works in most practical cases. I'll leave it as an exercise to run this command on the sample file provided with the book.
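As a hint for that exercise, a minimal sketch of running such a grep from the Scala REPL could look as follows; it uses the simpler regex, and the result will of course be empty if the sample file contains no IP addresses:

import scala.sys.process._

// Extract IP-like tokens and count them, analogous to the URL histogram above.
// Passing the grep command as a Seq avoids shell quoting issues around the regex.
val ipHistogram = (
  "gzcat chapter01/data/clickstream/clickstream_sample.tsv.gz" #|
  Seq("grep", "-o", "-E", """[1-9][0-9]{0,2}(\.[1-9][0-9]{0,2}){3}""") #|
  "sort" #| "uniq -c" #| "sort -k1nr"
).lineStream
ipHistogram take 10 foreach println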
I've met quite a few data practitioners who scorn sampling. Ideally, if one can process the whole dataset, the model can only improve. In practice, the tradeoff is much more complex. First, one can build more complex models on a sampled set, particularly if the time complexity of model building is non-linear, which in most situations means at least O(N log N). A faster model-building cycle allows you to iterate over models and converge on the best approach faster. In many situations, time to action outweighs the potential improvement in prediction accuracy from a model built on the complete dataset.
Sampling may be combined with appropriate filtering; in many practical situations, focusing on one subproblem at a time leads to a better understanding of the whole problem domain. In many cases, this partitioning is at the foundation of the algorithm, as in decision trees, which are considered later. Often the nature of the problem requires you to focus on a subset of the original data. For example, a cyber security analysis is often focused on a specific set of IPs rather than the whole network, as this allows you to iterate over hypotheses faster. Including the set of all IPs in the network may complicate things initially, if not throw the modeling off the right track.
When dealing with rare events, such as clickthroughs in ad tech, sampling the positive and negative cases with different probabilities, which is also sometimes called oversampling, often leads to better predictions in a short amount of time.
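A minimal sketch of this idea is shown below; the data.tsv file name and the convention that the last tab-separated field is a 0/1 label are hypothetical, purely for illustration:

import scala.util.Random

// Hypothetical label convention: the last tab-separated field is "1" for positives.
def isPositive(line: String): Boolean = line.split("\t").last == "1"

val lines = scala.io.Source.fromFile("data.tsv").getLines
// Keep every (rare) positive example, and roughly 1% of the negatives.
val oversampled = lines.filter { line =>
  isPositive(line) || Random.nextDouble() < 0.01
}
oversampled.foreach(println)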
Fundamentally, sampling is equivalent to tossing a coin, or calling a random number generator, for each data row. Thus, it is very much like a stream filter operation, where the filtering is performed on an augmented column of random numbers. Let's consider the following example:
import scala.util.Random
import util.Properties

val threshold = 0.05
val lines = scala.io.Source.fromFile("chapter01/data/iris/in.txt").getLines
val newLines = lines.filter(_ => Random.nextDouble() <= threshold)

val w = new java.io.FileWriter(new java.io.File("out.txt"))
newLines.foreach { s => w.write(s + Properties.lineSeparator) }
w.close
This is all good, but it has the following disadvantages:
The number of lines in the resulting file is not known in advance and is only approximately the threshold fraction of the original
The result of the sampling is not consistent or reproducible across runs and across related datasets
To fix the first point, we will need to pass a more complex object to the function, as we need to maintain state during the traversal of the original list, which makes the algorithm less functional and parallelizable (this will be discussed later). The following implementation of reservoir sampling retains exactly k randomly chosen lines:
import scala.reflect.ClassTag
import scala.util.Random
import util.Properties

def reservoirSample[T: ClassTag](input: Iterator[T], k: Int): Array[T] = {
  val reservoir = new Array[T](k)
  // Put the first k elements in the reservoir.
  var i = 0
  while (i < k && input.hasNext) {
    val item = input.next()
    reservoir(i) = item
    i += 1
  }
  if (i < k) {
    // If the input size < k, trim the array to the actual size.
    reservoir.take(i)
  } else {
    // If the input size > k, continue the sampling process: the i-th element
    // seen so far replaces a random slot with probability k/i.
    while (input.hasNext) {
      val item = input.next()
      i += 1
      val replacementIndex = Random.nextInt(i)
      if (replacementIndex < k) {
        reservoir(replacementIndex) = item
      }
    }
    reservoir
  }
}

val numLines = 15
val w = new java.io.FileWriter(new java.io.File("out.txt"))
val lines = io.Source.fromFile("chapter01/data/iris/in.txt").getLines
reservoirSample(lines, numLines).foreach { s =>
  w.write(s + scala.util.Properties.lineSeparator)
}
w.close
This will output numLines lines. Similar to reservoir sampling, stratified sampling is guaranteed to provide the same ratio of input to output rows for all strata defined by the levels of another attribute. We can achieve this by splitting the original dataset into N subsets corresponding to the levels, performing reservoir sampling on each, and merging the results afterwards. However, the MLlib library, which will be covered in Chapter 3, Working with Spark and MLlib, already has a stratified sampling implementation:
val origLinesRdd = sc.textFile("file://...")
val keyedRdd = origLinesRdd.keyBy(r => r.split(",")(0))
val fractions = keyedRdd.countByKey.keys.map(r => (r, 0.1)).toMap
val sampledWithKey = keyedRdd.sampleByKeyExact(fractions)
val sampled = sampledWithKey.map(_._2).collect
The other bullet point is more subtle; sometimes we want a consistent subset of values across multiple datasets, either for reproducibility or to join with another sampled dataset. In general, if we sample two datasets independently, the results will contain random subsets of IDs that might have very little or no intersection. Hashing functions come to the rescue here. The result of applying a cryptographic hash function such as MD5 or SHA1 is a sequence of bits that is, at least in theory, statistically uncorrelated with the input. We will use the MurmurHash function from the scala.util.hashing package; it is not a cryptographic hash, but it is fast and well distributed, which is sufficient for our purposes:
import scala.util.hashing.MurmurHash3._
import util.Properties

val markLow = 0
val markHigh = 4096
val seed = 12345

def consistentFilter(s: String): Boolean = {
  // Hash the first field and keep the upper 16 bits, giving a value in [0, 65535].
  val hash = stringHash(s.split(" ")(0), seed) >>> 16
  hash >= markLow && hash < markHigh
}

val w = new java.io.FileWriter(new java.io.File("out.txt"))
val lines = io.Source.fromFile("chapter01/data/iris/in.txt").getLines
lines.filter(consistentFilter).foreach { s =>
  w.write(s + Properties.lineSeparator)
}
w.close
This function is guaranteed to return exactly the same subset of records based on the value of the first field (either all the records where the first field equals a certain value, or none of them) and will come up with approximately one-sixteenth of the original dataset, as the range of hash is 0 to 65,535 and we keep the values below 4,096.
Often the most frequent values or the five-number summary are not sufficient to get a first understanding of the data. The term descriptive statistics is very generic and may refer to very complex ways of describing the data. Quantiles, a Pareto chart or, when more than one attribute is analyzed, correlations are also examples of descriptive statistics. When sharing all these ways of looking at data aggregates, in many cases it is also important to share the specific computations used to get to them.
The Scala Notebook (https://github.com/Bridgewater/scala-notebook) and the Spark Notebook (https://github.com/andypetrella/spark-notebook) record the whole transformation path, and the results can be shared as a JSON-based *.snb file. The Spark Notebook project can be downloaded from http://spark-notebook.io, and I provide a sample Chapter01.snb file with the book. I will use Spark, which I will cover in more detail in Chapter 3, Working with Spark and MLlib.
For this particular example, Spark will run in local mode. Even in local mode, Spark can utilize parallelism on your workstation, but it is limited to the number of cores and hyperthreads available on your laptop or workstation. With a simple configuration change, however, Spark can be pointed at a distributed set of machines and use resources across a distributed set of nodes.
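Outside the notebook, that configuration change essentially amounts to the master URL. Here is a minimal sketch, assuming Spark 1.6.x on the classpath; the cluster URL in the comment is purely illustrative:

import org.apache.spark.{SparkConf, SparkContext}

// Local mode: use all the available cores on this machine.
// Pointing at a cluster would only change the master URL, e.g. "spark://master:7077".
val conf = new SparkConf()
  .setAppName("chapter01")
  .setMaster("local[*]")
val sc = new SparkContext(conf)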
Here is the set of commands to download the Spark Notebook and copy the necessary files from the code repository:
[akozlov@Alexanders-MacBook-Pro]$ wget http://s3.eu-central-1.amazonaws.com/spark-notebook/zip/spark-notebook-0.6.3-scala-2.11.7-spark-1.6.1-hadoop-2.6.4-with-hive-with-parquet.zip
...
[akozlov@Alexanders-MacBook-Pro]$ unzip -d ~/ spark-notebook-0.6.3-scala-2.11.7-spark-1.6.1-hadoop-2.6.4-with-hive-with-parquet.zip
...
[akozlov@Alexanders-MacBook-Pro]$ ln -sf ~/spark-notebook-0.6.3-scala-2.11.7-spark-1.6.1-hadoop-2.6.4-with-hive-with-parquet ~/spark-notebook
[akozlov@Alexanders-MacBook-Pro]$ cp chapter01/notebook/Chapter01.snb ~/spark-notebook/notebooks
[akozlov@Alexanders-MacBook-Pro]$ cp chapter01/data/kddcup/kddcup.parquet ~/spark-notebook
[akozlov@Alexanders-MacBook-Pro]$ cd ~/spark-notebook
[akozlov@Alexanders-MacBook-Pro]$ bin/spark-notebook
Play server process ID is 2703
16/04/14 10:43:35 INFO play: Application started (Prod)
16/04/14 10:43:35 INFO play: Listening for HTTP on /0:0:0:0:0:0:0:0:9000
...
Now you can open the notebook at http://localhost:9000
in your browser, as shown in the following screenshot:

Figure 01-2. The first page of the Spark Notebook with the list of notebooks
Open the Chapter01
notebook by clicking on it. The statements are organized into cells and can be executed by clicking on the small right arrow at the top, as shown in the following screenshot, or you can run all the cells at once by navigating to Cell | Run All:

Figure 01-3. Executing the first few cells in the notebook
First, we will look at the discrete variables. For example, to get the distribution of the labels, issue the following code:
val labelCount = df.groupBy("lbl").count().collect
labelCount.toList.map(row => (row.getString(0), row.getLong(1)))
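The df variable here is the DataFrame defined in the Chapter01.snb notebook. If you are following along outside the notebook, a minimal sketch of that setup, using the sqlContext that the Spark Notebook (or spark-shell) provides and the kddcup.parquet file copied earlier, would be:

// Load the KDD Cup 99 sample into a DataFrame (Spark 1.6.x API).
val df = sqlContext.read.parquet("kddcup.parquet")
df.printSchema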
The first time I read the dataset, it took about a minute on my MacBook Pro, but Spark caches the data in memory and the subsequent aggregation runs take only about a second. The Spark Notebook provides you with the distribution of the values, as shown in the following screenshot:

Figure 01-4. Computing the distribution of values for a categorical field
I can also look at crosstab counts for pairs of discrete variables, which gives me an idea of the interdependencies between the variables, using org.apache.spark.sql.DataFrameStatFunctions (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameStatFunctions); the class does not support computing correlation measures such as chi-square yet.
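A minimal sketch of such a crosstab call is shown below; the service and flag column names are assumed from the KDD Cup 99 schema rather than copied from the notebook. The resulting contingency table is shown in the following screenshot:

// A contingency table of two categorical columns.
val ct = df.stat.crosstab("service", "flag")
ct.show(10)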

Figure 01-5. Contingency table or crosstab
However, we can see that the most popular service is private and that it correlates well with the SF flag. Another way to analyze dependencies is to look for 0 entries. For example, the S2 and S3 flags are clearly related to SMTP and FTP traffic, since all the other entries are 0.
Of course, the most interesting correlations are with the target variable, but these are better discovered by supervised learning algorithms that I will cover in Chapter 3, Working with Spark and MLlib, and Chapter 5, Regression and Classification.

Figure 01-6. Computing simple aggregations using org.apache.spark.sql.DataFrameStatFunctions.
Analogously, we can compute correlations for numerical variables with the dataFrame.stat.corr() and dataFrame.stat.cov() functions (refer to Figure 01-6); in this case, the class supports the Pearson correlation coefficient. A small sketch of these calls is shown below, followed by an alternative that uses the standard SQL syntax on the parquet file directly.
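Here is the sketch; the duration and src_bytes column names are assumptions based on the KDD Cup 99 schema rather than values taken from the notebook:

// Pearson correlation and covariance between two numeric columns (Spark 1.6.x API).
val corrDurBytes = df.stat.corr("duration", "src_bytes")
val covDurBytes = df.stat.cov("duration", "src_bytes")
println(s"corr = $corrDurBytes, cov = $covDurBytes")

The standard SQL syntax on the parquet file, mentioned above, looks like this: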
sqlContext.sql("SELECT lbl, protocol_type, min(duration), avg(duration), stddev(duration), max(duration) FROM parquet.`kddcup.parquet` group by lbl, protocol_type")
Finally, I promised to show you how to compute percentiles. Computing percentiles usually involves sorting the whole dataset, which is expensive; however, if the percentile is one of the first or the last ones, it is usually possible to optimize the computation:
val pct = sqlContext.sql("SELECT duration FROM parquet.`kddcup.parquet` where protocol_type = 'udp'").rdd.map(_.getLong(0)).cache
pct.top((0.05*pct.count).toInt).last
Computing the exact percentiles for a more generic case is more computationally expensive and is provided as a part of the Spark Notebook example code.
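For reference, one deliberately naive sketch of an exact percentile, computed by a full sort, is given below; it is meant only to illustrate why the generic case is expensive and is not the notebook's implementation:

// Exact percentile by a full sort: O(N log N) and a shuffle across the cluster.
def percentile(rdd: org.apache.spark.rdd.RDD[Long], p: Double): Long = {
  val n = rdd.count
  val rank = math.max(0L, (p * (n - 1)).toLong)  // zero-based rank of the requested tile
  rdd.sortBy(identity).zipWithIndex.filter(_._2 == rank).first()._1
}
percentile(pct, 0.95)  // the pct RDD was defined and cached above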
You probably noticed that detecting correlations from contingency tables is hard. Detecting patterns takes practice, but many people are much better at recognizing patterns visually. Detecting actionable patterns is one of the primary goals of machine learning. While advanced supervised machine learning techniques exist, and will be covered in Chapter 4, Supervised and Unsupervised Learning, and Chapter 5, Regression and Classification, an initial analysis of the interdependencies between variables can help with the right transformation of variables or the selection of the best inference technique.
Multiple well-established visualization tools exist, and there are multiple sites, such as http://www.kdnuggets.com, that specialize in ranking and providing recommendations on data analysis, data exploration, and visualization software. I am not going to question the validity and accuracy of such rankings in this book, but very few sites actually mention Scala as a specific way to visualize data, even though this is possible with, say, the D3.js package. A good visualization is a great way to deliver your findings to a larger audience; one look is worth a thousand words.
For the purposes of this chapter, I will use Grapher, which is present on every Mac OS X installation. To open Grapher, go to Utilities (shift + command + U in Finder) and click on the Grapher icon (or search for it by name by pressing command + space). Grapher presents many options, including Log-Log and Polar coordinates, as shown in the following screenshot:

Figure 01-7. The Grapher window
Fundamentally, the amount of information that can be delivered through visualization is limited by the number of pixels on the screen, which for most modern computers is in the millions, and by the number of color variations, which arguably can also be in the millions (Judd, Deane B.; Wyszecki, Günter (1975). Color in Business, Science and Industry. Wiley Series in Pure and Applied Optics (3rd ed.). New York). If I am working on a multidimensional TB dataset, the dataset first needs to be summarized, processed, and reduced to a size that can be viewed on a computer screen.
For the purpose of illustration, I will use the Iris UCI dataset that can be found at https://archive.ics.uci.edu/ml/datasets/Iris. To bring the dataset into the tool, type the following code (on Mac OS):
[akozlov@Alexanders-MacBook-Pro]$ pbcopy < chapter01/data/iris/in.txt
Open a new Point Set in Grapher (command + alt + P), press Edit Points…, and paste the data by pressing command + V. The tool has curve-fitting capabilities with basic linear, polynomial, and exponential families, and provides the popular chi-squared metric to estimate the goodness of the fit with respect to the number of free parameters:

Figure 01-8. Fitting the Iris dataset using Grapher on Mac OS X
We will cover how to estimate the goodness of model fit in the following chapters.
In this chapter, I have tried to establish a common ground for performing more complex data science later in the book. Don't expect this to be a complete set of exploratory techniques, as exploratory analysis can extend to running very complex models. However, we covered simple aggregations, sampling, file operations such as read and write, and working with tools such as notebooks and Spark DataFrames, which bring familiar SQL constructs into the arsenal of an analyst working with Spark/Scala.
The next chapter will take a completely different turn by looking at data pipelines as a part of a data-driven enterprise and covering the data discovery process from the business perspective: what are the ultimate goals we are trying to accomplish by doing data analysis? I will cover a few traditional ML topics, such as supervised and unsupervised learning, before delving into more complex representations of the data, where Scala really shows its advantage over SQL.