Using the Spark Shell

Spark offers a streamlined way to write distributed programs. This tutorial shows software developers how to use the Spark shell to load a dataset and run a simple machine learning job interactively.

Loading a simple text file

When you run a Spark shell and connect to an existing cluster, you should see a line that includes the app ID, such as Connected to Spark cluster with app ID app-20130330015119-0001. The app ID will match the application entry shown in the web UI under running applications (by default, the web UI is viewable on port 8080).

You can start by downloading a dataset to use for some experimentation. A number of datasets were put together for The Elements of Statistical Learning, and they are in a very convenient form for use. Grab the spam dataset using the following command:

wget http://www-stat.stanford.edu/~tibs/ElemStatLearn/datasets/spam.data

Now load it as a text file into Spark with the following command inside your Spark shell:

scala> val inFile = sc.textFile("./spam.data")

This loads the spam.data file into Spark, with each line becoming a separate entry in the RDD (Resilient Distributed Dataset).

Note that if you've connected to a Spark master, it's possible that it will attempt to load the file on any of the machines in the cluster, so make sure the file is available on all the cluster machines. In general, you will want to put your data in HDFS, S3, or a similar file system to avoid this problem. In local mode, you can just load the file directly, for example, sc.textFile([filepath]). To make a file available across all the machines, you can also use the addFile function on the SparkContext by writing the following code:

scala> import spark.SparkFiles;
scala> val file = sc.addFile("spam.data")
scala> val inFile = sc.textFile(SparkFiles.get("spam.data"))

Just like most shells, the Spark shell has a command history. You can press the up arrow key to get to the previous commands. Getting tired of typing, or not sure what method you want to call on an object? Press Tab, and the Spark shell will autocomplete the line of code as best as it can.

For this example, the RDD with each line as an individual string isn't very useful, as our data input is actually represented as space-separated numerical information. Map over the RDD, and quickly convert it to a usable format (note that _.toDouble is the same as x => x.toDouble):

scala> val nums = inFile.map(x => x.split(' ').map(_.toDouble))

Verify that this is what we want by inspecting some elements in the nums RDD and comparing them against the original string RDD. Take a look at the first element of each RDD by calling .first() on the RDDs:

scala> inFile.first()
[...]
res2: String = 0 0.64 0.64 0 0.32 0 0 0 0 0 0 0.64 0 0 0 0.32 0 1.29 1.93
0 0.96 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0.778 0 0 3.756 61 278 1
scala> nums.first()
[...]
res3: Array[Double] = Array(0.0, 0.64, 0.64, 0.0, 0.32, 0.0, 0.0, 0.0, 0.0,
0.0, 0.0, 0.64, 0.0, 0.0, 0.0, 0.32, 0.0, 1.29, 1.93, 0.0, 0.96,
0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.778, 0.0, 0.0, 3.756, 61.0, 278.0, 1.0)
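To see what the map step does to a single record, the same conversion can be tried on a plain Scala string, with no cluster involved. The following is a minimal sketch: the object name ParseDemo, the helper parseLine, and the shortened sample line are made up for illustration and are not part of the session above.

```scala
object ParseDemo {
  // Split one space-separated record and convert every field to a Double.
  // _.toDouble is shorthand for x => x.toDouble.
  def parseLine(line: String): Array[Double] =
    line.split(' ').map(_.toDouble)

  def main(args: Array[String]): Unit = {
    // Shortened, made-up sample in the same format as a spam.data line.
    val sample = "0 0.64 0.64 0 0.32 61 278 1"
    println(parseLine(sample).mkString("Array(", ", ", ")"))
  }
}
```

A field that is not a valid number makes toDouble throw a NumberFormatException, so messier input than spam.data may need cleaning before this kind of map is applied.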

Using the Spark shell to run logistic regression

When you run a command and have not specified a left-hand side (that is, leaving out the val x of val x = y), the Spark shell will print the value along with res[number]. The res[number] name can then be used as if you had written val res[number] = y.

Now that you have the data in a more usable format, try to do something cool with it! Use Spark to run logistic regression over the dataset as follows:

scala> import spark.util.Vector
import spark.util.Vector
scala> case class DataPoint(x: Vector, y: Double)
defined class DataPoint
scala> def parsePoint(x: Array[Double]): DataPoint = {
  DataPoint(new Vector(x.slice(0,x.size-2)) , x(x.size-1))
}
parsePoint: (x: Array[Double])this.DataPoint
scala> val points = nums.map(parsePoint(_))
points: spark.RDD[this.DataPoint] = MappedRDD[3] at map at
<console>:24
scala> import java.util.Random
import java.util.Random
scala> val rand = new Random(53)
rand: java.util.Random = java.util.Random@3f4c24
scala> var w = Vector(nums.first.size-2, _ => rand.nextDouble)
13/03/31 00:57:30 INFO spark.SparkContext: Starting job: first at
<console>:20
...
13/03/31 00:57:30 INFO spark.SparkContext: Job finished: first at
<console>:20, took 0.01272858 s
w: spark.util.Vector = (0.7290865701603526, 0.8009687428076777,
0.6136632797111822, 0.9783178194773176, 0.3719683631485643,
0.46409291255379836, 0.5340172959927323, 0.04034252433669905,
0.3074428389716637, 0.8537414030626244, 0.8415816118493813,
0.719935849109521, 0.2431646830671812, 0.17139348575456848,
0.5005137792223062, 0.8915164469396641, 0.7679331873447098,
0.7887571495335223, 0.7263187438977023, 0.40877063468941244,
0.7794519914671199, 0.1651264689613885, 0.1807006937030201,
0.3227972103818231, 0.2777324549716147, 0.20466985600105037,
0.5823059390134582, 0.4489508737465665, 0.44030858771499415,
0.6419366305419459, 0.5191533842209496, 0.43170678028084863,
0.9237523536173182, 0.5175019655845213, 0.47999523211827544,
0.25862648071479444, 0.020548000101787922, 0.18555332739714137, 0....
scala> val iterations = 100
iterations: Int = 100
scala> import scala.math._
scala> for (i <- 1 to iterations) {
  val gradient = points.map(p =>
    (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
  ).reduce(_ + _)
  w -= gradient
}
[....]
scala> w
res27: spark.util.Vector = (0.2912515190246098, 1.05257972144256,
1.1620192443948825, 0.764385365541841, 1.3340446477767611,
0.6142105091995632, 0.8561985593740342, 0.7221556020229336,
0.40692442223198366, 0.8025693176035453, 0.7013618380649754,
0.943828424041885, 0.4009868306348856, 0.6287356973527756,
0.3675755379524898, 1.2488466496117185, 0.8557220216380228,
0.7633511642942988, 6.389181646047163, 1.43344096405385,
1.729216408954399, 0.4079709812689015, 0.3706358251228279,
0.8683036382227542, 0.36992902312625897, 0.3918455398419239,
0.2840295056632881, 0.7757126171768894, 0.4564171647415838,
0.6960856181900357, 0.6556402580635656, 0.060307680034745986,
0.31278587054264356, 0.9273189009376189, 0.0538302050535121,
0.545536066902774, 0.9298009485403773, 0.922750704590723,
0.072339496591

If things went well, you just used Spark to run logistic regression. Awesome! We have just done a number of things: we have defined a class, we have created an RDD, and we have also created a function. As you can see, the Spark shell is quite powerful. Much of that power comes from it being based on the Scala REPL (Read-Evaluate-Print Loop, the Scala interactive shell), so it inherits everything the Scala REPL can do. That being said, most of the time you will probably want to work with more traditionally compiled code rather than in the REPL environment.
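To make the update inside the for loop more concrete, here is a minimal local sketch of the same arithmetic using plain Scala arrays in place of an RDD and spark.util.Vector. The names LocalLogisticSketch, Point, dot, pointGradient, and step are hypothetical, and the two data points are made up. This form of the gradient is usually written for labels of -1 and +1, so the sketch assumes such labels.

```scala
import scala.math.exp

object LocalLogisticSketch {
  // Hypothetical stand-in for the DataPoint case class in the session,
  // with Array[Double] in place of spark.util.Vector.
  case class Point(x: Array[Double], y: Double)

  // Dot product of two equally sized arrays.
  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (u, v) => u * v }.sum

  // Gradient contribution of one point: the same expression that the
  // session maps over the RDD, written out element by element.
  def pointGradient(w: Array[Double], p: Point): Array[Double] = {
    val scale = (1 / (1 + exp(-p.y * dot(w, p.x))) - 1) * p.y
    p.x.map(_ * scale)
  }

  // One iteration: sum the per-point gradients (the reduce step),
  // then subtract the total from the weights.
  def step(w: Array[Double], points: Seq[Point]): Array[Double] = {
    val gradient = points
      .map(pointGradient(w, _))
      .reduce((a, b) => a.zip(b).map { case (u, v) => u + v })
    w.zip(gradient).map { case (wi, gi) => wi - gi }
  }

  def main(args: Array[String]): Unit = {
    // Tiny made-up dataset with labels in {-1, +1} (an assumption).
    val points = Seq(
      Point(Array(1.0, 2.0), 1.0),
      Point(Array(-1.0, -1.5), -1.0)
    )
    var w = Array(0.0, 0.0)
    for (_ <- 1 to 10) w = step(w, points)
    println(w.mkString("(", ", ", ")"))
  }
}
```

In the Spark version, the map and reduce run across the cluster while w stays on the driver. In this sketch, everything runs in a single JVM.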

Summary

In this article, you have learned how to load your data into the Spark shell and how to use Spark to run logistic regression.
