Parallel Collections

Pascal Bugnion

December 2015

In this article by Pascal Bugnion, the author of Scala for Data Science, we will look at ways of parallelizing computation and data processing on a single computer. Virtually all new computers have more than one processing unit, and distributing a calculation over these cores can be an effective way of speeding up medium-sized calculations.


Data science often involves processing medium or large amounts of data. The once exponential growth in CPU speed has stalled while the amount of data continues to increase, so using computers effectively must entail parallel computation.

Parallelizing calculations over a single chip is suitable for calculations involving gigabytes or a few terabytes of data. For larger data flows, we must resort to distributing the computation over several computers in parallel.

Parallel collections

Parallel collections offer an extremely easy way to parallelize independent tasks. Readers familiar with Scala will know that many tasks can be phrased as operations on collections, such as map, reduce, filter, or groupBy. Parallel collections are an implementation of the Scala collections that performs these operations in parallel over several threads.

Let's start with an example. We want to calculate how often each letter occurs in a sentence:

scala> val sentence = "The quick brown fox jumped over the lazy dog"

scala> val characters = sentence.toCharArray.toVector

Vector[Char] = Vector(T, h, e, , q, u, i, c, k, ...)

Note that we converted characters to a Scala Vector rather than keeping it as an array, to guarantee immutability. All the examples in this section would work equally well with an array, but using Vector is good practice when we do not explicitly need a mutable sequence.

Let's convert the characters to a parallel vector, ParVector. To do this, we use the par method:

scala> val charactersPar = characters.par

ParVector[Char] = ParVector(T, h, e, , q, u, i, c, k, , ...)

The ParVector objects support the same operations as a regular vector but they perform the operations in parallel over several threads.

Let's start by filtering out the spaces in charactersPar:

scala> val lettersPar = charactersPar.filter { _ != ' ' }

ParVector[Char] = ParVector(T, h, e, q, u, i, c, k, ...)

Notice how Scala hides the execution details. The interface and behavior of a parallel vector are identical to those of its serial counterpart, save for a few details that we will explore in the next section.

Let's use the toLower method to make all the letters in our sentence lowercase:

scala> val lowerLettersPar = lettersPar.map { _.toLower }

ParVector[Char] = ParVector(t, h, e, q, u, i, c, k, ...)

To find how often each letter occurs, we use the groupBy method to group the characters into vectors containing all the occurrences of each character:

scala> val intermediateMap = lowerLettersPar.groupBy(identity)

ParMap[Char,ParVector[Char]] = ParMap(e -> ParVector(e, e, e, e), ...)

Note how the groupBy method has created a ParMap, the parallel equivalent of an immutable map. To get the number of occurrences of each letter, we call mapValues on intermediateMap, replacing each vector with its length:

scala> val occurenceNumber = intermediateMap.mapValues { _.length }

ParMap[Char,Int] = ParMap(e -> 4, x -> 1, n -> 1, j -> 1, ...)

Parallel collections make it very easy to parallelize some operation pipelines: all we had to do was call .par on the characters vector. All the subsequent operations were parallelized. This makes switching from a serial to a parallel implementation very easy.
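The pipeline above can be gathered into one small program that runs the serial and parallel versions side by side and checks that they agree. This is a minimal sketch, assuming Scala 2.13 with the separate scala-parallel-collections module on the classpath; on Scala 2.11 or 2.12, which this article targets, .par is built in and the CollectionConverters import should be dropped. The object and method names are illustrative, and the sketch uses map rather than mapValues, since mapValues is deprecated on standard maps from Scala 2.13.

```scala
// Assumes Scala 2.13 + the scala-parallel-collections module.
// On Scala 2.11/2.12, remove this import: .par is part of the standard library there.
import scala.collection.parallel.CollectionConverters._

object LetterFrequency {
  // Serial pipeline on an ordinary Vector.
  def serial(sentence: String): Map[Char, Int] =
    sentence.toCharArray.toVector
      .filter(_ != ' ')
      .map(_.toLower)
      .groupBy(identity)
      .map { case (c, occurrences) => c -> occurrences.length }

  // Parallel pipeline: identical apart from the call to .par.
  // .seq.toMap converts the parallel result back into an ordinary immutable Map.
  def parallel(sentence: String): Map[Char, Int] =
    sentence.toCharArray.toVector.par
      .filter(_ != ' ')
      .map(_.toLower)
      .groupBy(identity)
      .map { case (c, occurrences) => c -> occurrences.length }
      .seq
      .toMap

  def main(args: Array[String]): Unit = {
    val sentence = "The quick brown fox jumped over the lazy dog"
    println(parallel(sentence)('e'))                  // 4
    println(parallel(sentence) == serial(sentence))   // true
  }
}
```

Because every step is a pure transformation, switching between the two versions cannot change the result, only the way the work is scheduled.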

Limitations of parallel collections

A part of the power and appeal of parallel collections is that they present the same interface as their serial counterparts: they have a map method, a foreach method, a filter method, and so on. By and large, these methods work in the same way in parallel collections as they do in serial ones. There are, however, some notable caveats. The most important one has to do with side effects. If an operation in a parallel collection has a side effect, this may result in a race condition: a situation where the final result depends on the order in which the threads perform their operations.

Side effects in collections arise most commonly when we update a variable defined outside of the collection. To give a trivial example of unexpected behavior, let's define a count variable and increment it a thousand times using a range:

scala> var count = 0
scala> (0 until 1000).par.foreach { i => count += 1 }
scala> count
Int = 874 // not 1000! (the exact value varies from run to run)

What happened here? The function passed to foreach has a side effect: it increments count, a variable outside the scope of the function. This is a problem because the += operator is a sequence of the following two operations:

  1. Retrieve the value of count and add 1 to it.
  2. Assign the result back to count.

Let's imagine that the foreach loop has been parallelized over two threads. Thread A might read a count of 832 and add 1 to it to give 833. Before it has time to reassign 833 to count, thread B reads count, still at 832, and adds 1 to give 833. Thread A then assigns 833 to count, and thread B then assigns 833 to count. We have run through two updates but only incremented the count by 1. The problem arises because += can be separated into two instructions, which leaves room for the threads to interleave their operations. We say that += is nonatomic. The interleaving is shown in the following table:

Time | Thread A | Thread B | Count in memory
1 | Reads count from memory and increments it; the value in the thread's memory is 833. | | 832
2 | | Reads count from memory and increments it; the value in the thread's memory is 833. | 832
3 | Writes its count back to the shared memory. | | 833
4 | | Writes its count back to the shared memory. | 833

This is the anatomy of a race condition: threads A and B both try to update count concurrently, and one of the updates is overwritten. The final value of count is 833 instead of 834.
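One way to remove this particular race is to make the increment atomic. The following is a minimal sketch, assuming Scala 2.13 with the scala-parallel-collections module (drop the CollectionConverters import on 2.11/2.12). It swaps the var for java.util.concurrent.atomic.AtomicInteger, whose incrementAndGet method performs the read, increment, and write as one indivisible operation. It also shows the side-effect-free alternative, which is usually preferable: map each element to 1 and sum. The object and method names are illustrative.

```scala
// Assumes Scala 2.13 + the scala-parallel-collections module (not needed on 2.11/2.12).
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.parallel.CollectionConverters._

object AtomicCounter {
  // Shared state, but updated atomically: no increment can be lost between threads.
  def countInParallel(n: Int): Int = {
    val count = new AtomicInteger(0)
    (0 until n).par.foreach { _ => count.incrementAndGet() }
    count.get
  }

  // No shared state at all: each element contributes 1 and the partial sums are combined.
  def countBySum(n: Int): Int = (0 until n).par.map(_ => 1).sum

  def main(args: Array[String]): Unit = {
    println(countInParallel(1000)) // 1000
    println(countBySum(1000))      // 1000
  }
}
```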

To give a somewhat more realistic example, let's look back at the example described in the previous section. To count the number of the occurrences of each letter, we could, conceivably, define a mutable Char -> Int hash map outside of the loop and increment the values as they arise, as follows:

scala> val occurenceNumber = scala.collection.mutable.Map.empty[Char, Int]

scala> lowerLettersPar.foreach {
     |   c => occurenceNumber(c) = occurenceNumber.getOrElse(c, 0) + 1
     | }
scala> occurenceNumber('e') // Should be 4
Int = 2

Again, the discrepancy occurs because of the nonatomicity of the operations in the foreach loop.

In general, it is good practice to avoid side effects when using collections. They make the code harder to understand and preclude switching from serial to parallel collections.
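For comparison, the mutable-map counter above can be rewritten without side effects. In this minimal sketch, assuming Scala 2.13 with the scala-parallel-collections module (drop the CollectionConverters import on 2.11/2.12), each letter becomes its own one-entry count map, and the partial maps are combined with a merge function. Merging count maps by summing per key is associative and has Map.empty as its neutral element, which is what a parallel fold requires. The names merge and count are illustrative.

```scala
// Assumes Scala 2.13 + the scala-parallel-collections module (not needed on 2.11/2.12).
import scala.collection.parallel.CollectionConverters._

object PureLetterCount {
  // Combine two partial count maps by adding the counts key by key.
  def merge(a: Map[Char, Int], b: Map[Char, Int]): Map[Char, Int] =
    b.foldLeft(a) { case (acc, (c, n)) => acc.updated(c, acc.getOrElse(c, 0) + n) }

  // No shared mutable state: threads only build and merge their own immutable maps.
  def count(letters: Vector[Char]): Map[Char, Int] =
    letters.par
      .map(c => Map(c -> 1))
      .fold(Map.empty[Char, Int])(merge)

  def main(args: Array[String]): Unit = {
    val letters = "hello world".filter(_ != ' ').toVector
    println(count(letters)('l')) // 3
  }
}
```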

Another limitation occurs in reduction (or folding) operations: the function used to combine the items must be associative. Subtraction, for instance, is not associative, so a parallel reduction with it gives an unpredictable result:

scala> (0 until 1000).par.reduce(_ - _) // should be -499500

Int = 63620

As this seems like a rare use case, we will not dwell on it.
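A short sketch of the distinction, assuming Scala 2.13 with the scala-parallel-collections module (drop the CollectionConverters import on 2.11/2.12): an associative operation such as addition gives the same answer however the parallel reduction groups the elements, while a non-associative one such as subtraction should be kept in a sequential foldLeft, which fixes a left-to-right evaluation order. The object and method names are illustrative.

```scala
// Assumes Scala 2.13 + the scala-parallel-collections module (not needed on 2.11/2.12).
import scala.collection.parallel.CollectionConverters._

object AssociativityDemo {
  // Addition is associative, so any grouping of the partial results gives the same sum.
  def parallelSum(n: Int): Int = (0 until n).par.reduce(_ + _)

  // Subtraction is not associative: evaluate it strictly left to right, sequentially.
  def sequentialDifference(n: Int): Int = (0 until n).foldLeft(0)(_ - _)

  def main(args: Array[String]): Unit = {
    println(parallelSum(1000))          // 499500
    println(sequentialDifference(1000)) // -499500
  }
}
```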

Error handling

In single-threaded programs, exception handling is relatively straightforward: if an exception occurs, the function can either handle it or escalate it. This is not nearly as obvious when parallelism is introduced; a single thread might fail, but the others might return successfully.

A parallel collection will throw an exception if any of its threads fail, as follows:

scala> Vector(2, 1, 3).par.map {
     |   case 1 => throw new Exception("error")
     |   case x => x
     | }

java.lang.Exception: error

...

There are cases when this isn't the behavior that we want. For instance, we might be using a parallel collection to retrieve a large number of web pages in parallel. We might not mind if a few of the pages cannot be fetched.

Scala's Try type was designed for this purpose. It is similar to Option in that it is a one-element container:

scala> scala.util.Try(2)

Try[Int] = Success(2)

Unlike the Option type, which indicates whether an expression has a useful value, the Try type indicates whether an expression can execute without throwing an exception. Try(expression) has the value Success(value), where value is the result of the expression, if the expression evaluates without throwing an exception. If an exception occurs, it has the value Failure(exception).
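Before turning to web pages, here is a self-contained sketch that needs no network access. It uses integer division, which throws an ArithmeticException when dividing by zero, to show what Success and Failure carry and how pattern matching and getOrElse extract it. The divide and describe helpers are illustrative.

```scala
import scala.util.{Failure, Success, Try}

object TryBasics {
  // Integer division by zero throws ArithmeticException; Try captures it as a Failure.
  def divide(a: Int, b: Int): Try[Int] = Try(a / b)

  // Pattern matching extracts either the computed value or the exception.
  def describe(t: Try[Int]): String = t match {
    case Success(value) => s"succeeded with $value"
    case Failure(e)     => s"failed with ${e.getClass.getSimpleName}"
  }

  def main(args: Array[String]): Unit = {
    println(describe(divide(10, 2)))    // succeeded with 5
    println(describe(divide(10, 0)))    // failed with ArithmeticException
    println(divide(10, 0).getOrElse(0)) // 0: getOrElse falls back to a default on Failure
  }
}
```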

This will make more sense with an example. Let's start by importing scala.util to avoid unnecessary typing:

scala> import scala.util._

To see the Try type in action, let's use it to wrap a call to Source.fromURL. Source.fromURL fetches a web page, opening a stream to the page's content if it executes successfully. If it fails, it throws an exception:

scala> import scala.io.{Source, BufferedSource}

scala> val html = Source.fromURL("http://www.google.com")

scala.io.BufferedSource = non-empty iterator

scala> val html = Source.fromURL("garbage")

java.net.MalformedURLException: no protocol: garbage

...

Instead of letting the exception propagate and crash the rest of our code, we will wrap the call to Source.fromURL in Try:

scala> def fetchURL(url: String): Try[BufferedSource] =
     |   Try(Source.fromURL(url))

scala> fetchURL("http://www.google.com")

Try[BufferedSource] = Success(non-empty iterator)

scala> fetchURL("garbage")

Try[BufferedSource] = Failure(java.net.MalformedURLException: no protocol: garbage)

All we need to do to retrieve URLs in a fault-tolerant manner is to map fetchURL over a vector of URLs. If this vector is parallel, the URLs will be fetched concurrently:

scala> val URLs = Vector("http://www.google.com",
     |   "http://www.bbc.co.uk",
     |   "not-a-url"
     | )

scala> val pages = URLs.par.map(fetchURL)

ParVector[Try[BufferedSource]] = ParVector(
   Success(non-empty iterator),
   Success(non-empty iterator),
   Failure(java.net.MalformedURLException: no protocol: not-a-url)
)

We can then use the collect method to act on the pages that we fetched successfully:

scala> pages.collect { case(Success(it)) => it.size }

ParVector[Int] = ParVector(17880, 102968)

By making good use of Scala's built-in Try type and parallel collections, we have built a fault-tolerant, multithreaded URL retriever in a few lines of code.
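The same pattern can be tried offline. In this sketch, a hypothetical parse function (string to Int, which throws NumberFormatException on bad input) stands in for fetchURL. Besides collecting the successes, as in the session above, it also keeps the failures so that they can be logged or retried instead of being silently dropped. The sequence is serial so that the block needs only the standard library; calling .par on it (with the parallel collections available) would process the inputs concurrently in the same way.

```scala
import scala.util.{Failure, Success, Try}

object FaultTolerantBatch {
  // Hypothetical stand-in for fetchURL: toInt throws NumberFormatException on bad input.
  def parse(s: String): Try[Int] = Try(s.trim.toInt)

  // Keep the values that were processed successfully.
  def successes(inputs: Seq[String]): Seq[Int] =
    inputs.map(parse).collect { case Success(v) => v }

  // Keep the exceptions too, for logging or a later retry.
  def failures(inputs: Seq[String]): Seq[Throwable] =
    inputs.map(parse).collect { case Failure(e) => e }

  def main(args: Array[String]): Unit = {
    val inputs = Vector("8080", "443", "not-a-number")
    println(successes(inputs)) // Vector(8080, 443)
    failures(inputs).foreach(e => println(s"skipped: ${e.getMessage}"))
  }
}
```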

The Try type versus try/catch statements

Programmers with imperative or object-oriented backgrounds will be more familiar with try/catch blocks for handling exceptions. We could have achieved similar functionality here by wrapping the code that fetches URLs in a try block and returning null if the call raises an exception.

However, besides being more verbose, returning null is less satisfactory: we lose all the information about the exception, and null is less expressive than Failure(exception). Furthermore, returning a Try[T] type forces the caller to consider the possibility that the function might fail. By contrast, returning T and encoding failure as a null value allows the caller to ignore failure, raising the possibility of a confusing NullPointerException being thrown at a completely different point in the program.
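The contrast can be seen in a small side-by-side sketch. The two hypothetical parsers below differ only in how they report failure. Note that the null-returning version cannot even return a plain Int: it has to fall back to the boxed java.lang.Integer so that null is a legal value, and the NumberFormatException is discarded.

```scala
import scala.util.Try

object NullVersusTry {
  // try/catch style: failure is encoded as null and the exception is thrown away.
  // The return type is the boxed java.lang.Integer because Int cannot be null.
  def parseOrNull(s: String): java.lang.Integer =
    try Integer.valueOf(s.trim)
    catch { case _: NumberFormatException => null }

  // Try style: the return type states that the call can fail,
  // and Failure keeps the original exception for the caller to inspect.
  def parseTry(s: String): Try[Int] = Try(s.trim.toInt)

  def main(args: Array[String]): Unit = {
    println(parseOrNull("abc")) // null
    println(parseTry("42"))     // Success(42)
    println(parseTry("abc").isFailure) // true
  }
}
```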

Setting the parallelism level

So far, we have considered parallel collections as black boxes: call par on a normal collection and all the operations are performed in parallel. Often, we will want more control over how the tasks are executed.

Internally, parallel collections work by distributing an operation over multiple threads. As the threads share memory, parallel collections do not need to copy any data. Changing the number of threads available to the parallel collection changes the number of CPU cores that can be used to perform the tasks.

Parallel collections have a tasksupport attribute that controls the task execution, as follows:

scala> val parRange = (0 to 100).par

scala> parRange.tasksupport

TaskSupport = scala.collection.parallel.ExecutionContextTaskSupport@311a0b3e

scala> parRange.tasksupport.parallelismLevel

Int = 8 // Number of threads to be used (depends on the machine)

The task support object of a collection is backed by an execution context, an abstraction capable of executing Scala expressions in parallel.

By default, the execution context in Scala 2.11 is a work-stealing thread pool. When a parallel collection submits tasks, the context allocates the tasks to its threads. If a thread finds that it has finished its queued tasks, it will try to steal outstanding tasks from the other threads. The default execution context maintains a thread pool with as many threads as there are CPU cores.

The number of threads over which the parallel collection distributes the work can be changed by changing the task support. For instance, parallelizing the operations performed by a range over four threads can be done in the following way:

scala> import scala.collection.parallel.ForkJoinTaskSupport

scala> val parRange = (0 to 1000).par

scala> parRange.tasksupport = new ForkJoinTaskSupport(
     |   new scala.concurrent.forkjoin.ForkJoinPool(4)
     | )

parRange.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ForkJoinTaskSupport@6e1134e1

scala> parRange.tasksupport.parallelismLevel

Int = 4
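On newer Scala versions, scala.concurrent.forkjoin.ForkJoinPool is deprecated (since 2.12) in favor of java.util.concurrent.ForkJoinPool. The following is a minimal sketch of the same configuration, assuming Scala 2.13 with the scala-parallel-collections module. It also shuts the pool down when the work is done, since a dedicated pool's threads are not released automatically. The run method and its parameters are illustrative.

```scala
// Assumes Scala 2.13 + the scala-parallel-collections module.
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.CollectionConverters._

object CustomParallelism {
  // Runs a simple parallel computation on a dedicated pool of `threads` workers and
  // returns the parallelism level reported by the collection together with the result.
  def run(threads: Int, n: Int): (Int, Int) = {
    val pool = new ForkJoinPool(threads)
    try {
      val parRange = (0 to n).par
      parRange.tasksupport = new ForkJoinTaskSupport(pool)
      (parRange.tasksupport.parallelismLevel, parRange.map(_ * 2).sum)
    } finally pool.shutdown() // release the pool's threads once the work is done
  }

  def main(args: Array[String]): Unit =
    println(run(4, 1000)) // (4,1001000)
}
```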

Futures

Parallel collections offer a simple yet powerful framework for parallel operations. However, they are limited in one respect: the total amount of work must be known in advance. This limitation is prohibitive for some problems.

Imagine that we want to write, for instance, a web crawler. The crawler is given an initial list of pages to crawl. It reads each page that it fetches, looking for additional URLs, and adds these URLs to the set of pages to crawl. We could not use parallel collections for this problem, because we build the list of URLs to crawl on the fly.

We can, instead, use futures. A future represents the result of a unit of work that is being executed in a non-blocking manner. For instance, let's create a function that simulates a long calculation:

scala> def calculation(x:Int):Int = { Thread.sleep(10000) ; 2*x }

If we run calculation in the shell, execution is blocked for ten seconds while the calculation gets completed. Instead, let's run it in a different thread using a future so that we can carry on working while the calculation runs. We start by importing the concurrent package:

scala> import scala.concurrent._

To use futures, we need an execution context that will manage the tasks submitted to the threads. We have already come across execution contexts when discussing parallel collections: execution contexts are an abstraction to control the concurrent execution of programming tasks. Futures expect an implicit execution context to be defined. For now, let's just use the default execution context. Let's bring it into the namespace:

scala> import scala.concurrent.ExecutionContext.Implicits.global

We are now ready to define the future:

scala> val f = Future { calculation(10) }

Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@156b88f5

Note that the shell doesn't block: it returns instantly. The calculation runs in a background thread. We can check whether the calculation has finished through the future's isCompleted attribute. After waiting ten seconds, we get:

scala> f.isCompleted

Boolean = true

Let's see what the function returned:

scala> f.value

Option[scala.util.Try[Int]] = Some(Success(20))

The value attribute of a future has an Option[Try[T]] type. We have already seen how to use the Try type to handle exceptions gracefully in the context of parallel collections. It is used in much the same way here. A future's value is None until the future is completed, then it is set to Some(Success(value)) if the future is completed successfully or Some(Failure(error)) if an exception is thrown.
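Outside the shell, the usual way to wait for a future in a short script or a test is scala.concurrent.Await, which blocks the current thread until the future completes or a timeout expires. A minimal sketch follows. The one-second sleep and five-second timeout are arbitrary assumptions (shorter than the text's ten seconds). The first value printed is not guaranteed, because it depends on timing.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureValue {
  // Same shape as the text's calculation, with a shorter (assumed) delay.
  def calculation(x: Int): Int = { Thread.sleep(1000); 2 * x }

  // Start the calculation and block, for at most five seconds, until it completes.
  def runCalculation(x: Int): Int =
    Await.result(Future { calculation(x) }, 5.seconds)

  def main(args: Array[String]): Unit = {
    val f = Future { calculation(10) }
    println(f.value)                    // normally None: the calculation is still running
    println(Await.result(f, 5.seconds)) // 20
    println(f.value)                    // Some(Success(20))
  }
}
```

Blocking with Await is convenient in scripts and tests, but it defeats the purpose of futures in application code, where callbacks or transformations are preferable.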

Repeatedly calling f.value until the future is completed works well in the shell, but it does not generalize to more complex programs. Instead, we want to tell the computer to do something once the future is complete. In the context of our web crawler, we might wrap the function that fetches a web page in a future. Once the web page has been fetched, the program should run a function to read the source code and extract a list of the links that it finds.

We can do this by registering a callback with the future's onComplete method:

scala> val f = Future { calculation(10) }

scala> f.onComplete { result => println(result) }

scala> // wait 10 seconds

Success(20)

The function passed to onComplete is run when the future is finished. It takes a single argument of the Try[T] type, containing the result of the future.
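Because the callback receives a Try, it is natural to pattern match on Success and Failure inside it. In this minimal sketch, the callback completes a Promise with a message, which gives the caller a future it can wait on. Without that, a short program could exit before the callback runs. The describeWhenDone and describeBlocking helpers and the half-second delay are illustrative assumptions.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object OnCompleteDemo {
  def calculation(x: Int): Int = { Thread.sleep(500); 2 * x }

  // Registers a callback that handles both outcomes and reports through a Promise.
  def describeWhenDone(f: Future[Int]): Future[String] = {
    val done = Promise[String]()
    f.onComplete {
      case Success(value) => done.success(s"Result: $value")
      case Failure(e)     => done.success(s"Failed: ${e.getMessage}")
    }
    done.future
  }

  // Convenience for scripts and tests: wait (at most five seconds) for the message.
  def describeBlocking(f: Future[Int]): String =
    Await.result(describeWhenDone(f), 5.seconds)

  def main(args: Array[String]): Unit =
    println(describeBlocking(Future { calculation(10) })) // Result: 20
}
```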

Futures let us very easily wrap a functionality to make it execute asynchronously, abstracting away much of the complexity of multithreading. They provide a very versatile alternative to parallel collections when the total amount of work is not known in advance or when we want to access the intermediate results as soon as they appear.
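Futures can also be combined without blocking. The sketch below starts one future per input, gathers them with Future.sequence (which preserves the input order), and transforms the combined result with map. The inputs and the short delay are illustrative assumptions, and Await is used only so that the program can print a final answer.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ComposeFutures {
  def calculation(x: Int): Int = { Thread.sleep(200); 2 * x }

  // One future per input; they run concurrently on the global execution context.
  def doubledAll(xs: Seq[Int]): Future[Seq[Int]] =
    Future.sequence(xs.map(x => Future(calculation(x))))

  // map transforms the eventual result without blocking the caller.
  def total(xs: Seq[Int]): Future[Int] = doubledAll(xs).map(_.sum)

  def totalBlocking(xs: Seq[Int]): Int = Await.result(total(xs), 5.seconds)

  def main(args: Array[String]): Unit =
    println(totalBlocking(Seq(1, 2, 3))) // 12
}
```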

Failure is normal: how to build resilient applications

By wrapping the result of the computation in a Try type, futures force the client code to consider the possibility that the code might fail. The client can isolate the effect of the failure to avoid crashing the whole application. It might, for instance, log the exception. In the case of the web crawler example, it might also re-add the offending website to be scraped again at a later date. In the case of a database failure, it might roll back the transaction.
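One way to isolate a failure is recover, which turns a failed future into a successful one carrying a fallback value. The sketch below uses a hypothetical fetchPage function (no real network access) that fails for URLs without a protocol. Each fetch is recovered individually, so one bad URL does not sink the whole batch. A real crawler might instead log the exception or schedule the URL for a retry at that point.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ResilientFetch {
  // Hypothetical stand-in for fetching a page: fails for URLs without a protocol.
  def fetchPage(url: String): Future[String] = Future {
    if (!url.startsWith("http")) throw new IllegalArgumentException(s"no protocol: $url")
    s"<html>contents of $url</html>"
  }

  // recover replaces the failure with a fallback value, isolating its effect.
  def fetchOrSkip(url: String): Future[String] =
    fetchPage(url).recover { case e: IllegalArgumentException => s"skipped: ${e.getMessage}" }

  // Fetch every URL concurrently and wait (at most five seconds) for all of them.
  def fetchAll(urls: Seq[String]): Seq[String] =
    Await.result(Future.sequence(urls.map(fetchOrSkip)), 5.seconds)

  def main(args: Array[String]): Unit =
    fetchAll(Seq("http://www.example.com", "garbage")).foreach(println)
}
```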

By treating the failure as a first-class citizen, rather than through an exceptional control flow bolted on at the end, we can build applications that are much more resilient.

Summary

By providing very high-level abstractions, Scala makes writing parallel code intuitive and straightforward. Parallel collections and futures form an invaluable part of a data scientist's toolbox, allowing them to parallelize their code with minimal effort. However, while these high-level abstractions obviate the need to deal directly with threads, an understanding of the internals of Scala's concurrency model is necessary to avoid race conditions.
