You're reading from  Scala for Data Science

Product type: Book
Published in: Jan 2016
Reading level: Intermediate
ISBN-13: 9781785281372
Edition: 1st Edition
Author: Pascal Bugnion

Pascal Bugnion is a data engineer at the ASI, a consultancy offering bespoke data science services. Previously, he was the head of data engineering at SCL Elections. He holds a PhD in computational physics from Cambridge University. Besides Scala, Pascal is a keen Python developer. He has contributed to NumPy, matplotlib and IPython. He also maintains scikit-monaco, an open source library for Monte Carlo integration. He currently lives in London, UK.

Chapter 9. Concurrency with Akka

Much of this book focusses on taking advantage of multicore and distributed architectures. In Chapter 4, Parallel Collections and Futures, you learned how to use parallel collections to distribute batch processing problems over several threads and how to perform asynchronous computations using futures. In Chapter 7, Web APIs, we applied this knowledge to query the GitHub API with several concurrent threads.

Concurrency abstractions such as futures and parallel collections simplify the enormous complexity of concurrent programming by limiting what you can do. Parallel collections, for instance, force you to phrase your parallelization problem as a sequence of pure functions on collections.

Actors offer a different way of thinking about concurrency. Actors are very good at encapsulating state. Managing state shared between different threads of execution is probably the most challenging part of developing concurrent applications, and, as we will discover in this...

GitHub follower graph


In the previous two chapters, we explored the GitHub API, learning how to query the API and parse the results using json-4s.

Let's imagine that we want to extract the GitHub follower graph: we want a program that will start from a particular user, extract this user's followers, and then extract their followers until we tell it to stop. The catch is that we don't know ahead of time what URLs we need to fetch: when we download the login names of a particular user's followers, we need to verify whether we have fetched these users previously. If not, we add them to a queue of users whose followers we need to fetch. Algorithm aficionados might recognize this as breadth-first search.

Let's outline how we might write this in a single-threaded way. The central components are a set of visited users and a queue of future users to visit:

val seedUser = "odersky" // the origin of the network

// Users whose URLs need to be fetched 
val queue = mutable.Queue(seedUser) 

// set of users...
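
To make the breadth-first traversal concrete, here is a minimal single-threaded sketch. It is not the book's listing: the in-memory followerGraph, the fetchFollowers helper, and the maxUsers cut-off are assumptions introduced for illustration, standing in for real calls to the GitHub API.

```scala
import scala.collection.mutable

object FollowerBfs {
  // Hypothetical stand-in for the GitHub API: a tiny in-memory follower graph.
  val followerGraph: Map[String, List[String]] = Map(
    "odersky" -> List("alice", "bob"),
    "alice"   -> List("bob", "carol"),
    "bob"     -> List("odersky"),
    "carol"   -> Nil
  )

  // Hypothetical fetch function; a real crawler would query the API here.
  def fetchFollowers(login: String): List[String] =
    followerGraph.getOrElse(login, Nil)

  // Breadth-first traversal: returns users in the order they were visited.
  def crawl(seedUser: String, maxUsers: Int): List[String] = {
    val queue = mutable.Queue(seedUser)   // users whose followers need fetching
    val seen = mutable.Set(seedUser)      // users already queued or visited
    val visited = mutable.ListBuffer.empty[String]

    while (queue.nonEmpty && visited.size < maxUsers) {
      val user = queue.dequeue()
      visited += user
      // Only enqueue followers that we have not come across before.
      for (follower <- fetchFollowers(user) if !seen(follower)) {
        seen += follower
        queue.enqueue(follower)
      }
    }
    visited.toList
  }
}
```

Calling FollowerBfs.crawl("odersky", 10) visits the seed user first, then that user's followers, then their followers. Both the queue and the set are mutable state, which is what makes a naive multi-threaded version of this loop hard to get right.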

Actors as people


In the previous section, you learned that an actor encapsulates state, interacting with the outside world through messages. Actors make concurrent programming more intuitive because they behave a little bit like an ideal workforce.

Let's think of an actor system representing a start-up with five people. There's Chris, the CEO, and Mark, who's in charge of marketing. Then there's Sally, who heads the engineering team. Sally has two minions, Bob and Kevin. As every good organization needs an organizational chart, here is ours: Chris sits at the top, Mark and Sally report to Chris, and Bob and Kevin report to Sally.

Let's say that Chris receives an order. He will look at the order, decide whether it is something that he can process himself, and if not, he will forward it to Mark or Sally. Let's assume that the order asks for a small program, so Chris forwards the order to Sally. Sally is very busy working on a backlog of orders so she cannot process the order message straightaway, and it will just sit in her mailbox for a short while. When she finally...

Hello world with Akka


Let's install Akka. We add it as a dependency to our build.sbt file:

scalaVersion := "2.11.7"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.0"

We can now import Akka as follows:

import akka.actor._

For our first foray into the world of actors, we will build an actor that echoes every message it receives. The code examples for this section are in a directory called chap09/hello_akka in the sample code provided with this book (https://github.com/pbugnion/s4ds):

// EchoActor.scala
import akka.actor._

class EchoActor extends Actor with ActorLogging {
  def receive = {
    case msg:String => 
      Thread.sleep(500)
      log.info(s"Received '$msg'") 
  }
}

Let's pick this example apart, starting with the constructor. Our actor class must extend Actor. We also add ActorLogging, a utility trait that adds the log attribute.

The Echo actor exposes a single method, receive. This is the actor's only way of communicating with the external world. To be useful...
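
The block passed to receive is a partial function from Any to Unit, which is why it can simply omit the message types it does not care about. The following plain-Scala sketch, which does not depend on Akka, mimics that shape. The logged buffer and the deliver helper are hypothetical names used for illustration only; they are not part of Akka.

```scala
import scala.collection.mutable

object ReceiveSketch {
  // Records what the handler "logged", so the behaviour can be inspected.
  val logged = mutable.ListBuffer.empty[String]

  // Same shape as EchoActor.receive: only String messages are handled.
  val receive: PartialFunction[Any, Unit] = {
    case msg: String =>
      logged += s"Received '$msg'"
      ()
  }

  // Apply the handler if it matches; report whether the message was handled.
  def deliver(message: Any): Boolean =
    if (receive.isDefinedAt(message)) {
      receive(message)
      true
    } else false
}
```

Here, ReceiveSketch.deliver("hello") is handled, whereas ReceiveSketch.deliver(42) is not, because no case matches an Int.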

Case classes as messages


In our "hello world" example, we constructed an actor that expects to receive a string as a message. Any object can be passed as a message, provided it is immutable. It is very common to use case classes to represent messages. This is better than using strings because of the additional type safety: the compiler will catch a typo in a case class name but not in the contents of a string.

Let's rewrite our EchoActor to accept instances of case classes as messages. We will make it accept two different messages: EchoMessage(message) and EchoHello, which just echoes a default message. The examples for this section and the next are in the chap09/hello_akka_case_classes directory in the sample code provided with this book (https://github.com/pbugnion/s4ds).

A common Akka pattern is to define the messages that an actor can receive in the actor's companion object:

// EchoActor.scala

object EchoActor { 
  case object EchoHello
  case class EchoMessage(msg:String)
}

Let's change the actor definition...
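
As a rough illustration of the type-safety argument, the sketch below pattern matches on the two messages outside of any actor. The describe helper and the strings it returns are assumptions made for this example; only the EchoHello and EchoMessage definitions come from the text.

```scala
object EchoProtocol {
  // Messages, mirroring the companion object in the text.
  case object EchoHello
  case class EchoMessage(msg: String)

  // Hypothetical pure helper: the text an echo actor might log for each message.
  def describe(message: Any): String = message match {
    case EchoHello        => "hello"
    case EchoMessage(msg) => msg
    case other            => s"unhandled: $other"
  }
}
```

Misspelling the identifier EchoHello when sending or matching fails to compile, whereas the misspelt string "EchoHelo" compiles happily and only falls through to the last case at runtime.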

Actor construction


Actor construction is a common source of difficulty for people new to Akka. Unlike (most) ordinary objects, you never instantiate actors explicitly. You would never write, for instance, val echo = new EchoActor. In fact, if you try this, Akka raises an exception.

Creating actors in Akka is a two-step process: you first create a Props object, which encapsulates the properties needed to construct an actor. The way to construct a Props object differs depending on whether the actor takes constructor arguments. If the constructor takes no arguments, we simply pass the actor class as a type parameter to Props:

val echoProps = Props[EchoActor]

If we have an actor whose constructor does take arguments, we must pass these as additional arguments when defining the Props object. Let's consider the following actor, for instance:

class TestActor(a:String, b:Int) extends Actor { ... }

We pass the constructor arguments to the Props object as follows:

val testProps = Props(classOf[TestActor...

Anatomy of an actor


Before diving into a full-blown application, let's look at the different components of the actor framework and how they fit together:

  • Mailbox: A mailbox is basically a queue. Each actor has its own mailbox. When you send a message to an actor, the message lands in its mailbox and does nothing until the actor takes it off the queue and passes it through its receive method.

  • Messages: Messages make synchronization between actors possible. A message can have any type with the sole requirement that it should be immutable. In general, it is better to use case classes or case objects to gain the compiler's help in checking message types.

  • Actor reference: When we create an actor using val echo1 = system.actorOf(Props[EchoActor]), echo1 has type ActorRef. An ActorRef is a proxy for an actor and is what the rest of the world interacts with: when you send a message, you send it to the ActorRef, not to the actor directly. In fact, you can never obtain a handle to an actor directly...

Follower network crawler


The end game for this chapter is to build a crawler to explore GitHub's follower graph. We have already outlined how we can do this in a single-threaded manner earlier in this chapter. Let's design an actor system to do this concurrently.

The moving parts in the code are the data structures managing which users have been fetched or are being fetched. These need to be encapsulated in an actor to avoid race conditions arising from multiple actors trying to change them concurrently. We will therefore create a fetcher manager actor whose job is to keep track of which users have been fetched and which users we are going to fetch next.

The part of the code that is likely to be a bottleneck is querying the GitHub API. We therefore want to be able to scale the number of workers doing this concurrently. We will create a pool of fetchers, actors responsible for querying the API for the followers of a particular user. Finally, we will create an actor whose responsibility is to...

Fetcher actors


The workhorse of our application is the fetcher, the actor responsible for fetching the follower details from GitHub. In the first instance, our actor will accept a single message, Fetch(user). It will fetch the followers corresponding to user and log the response to screen. We will use the recipes developed in Chapter 7, Web APIs, to query the GitHub API with an OAuth token. We will inject the token through the actor constructor.

Let's start with the companion object. This will contain the definition of the Fetch(user) message and two factory methods to create the Props instances. You can find the code examples for this section in the chap09/fetchers_alone directory in the sample code provided with this book (https://github.com/pbugnion/s4ds):

// Fetcher.scala
import akka.actor._
import scalaj.http._
import scala.concurrent.Future

object Fetcher {
  // message definitions
  case class Fetch(login:String)

  // Props factory definitions
  def props(token:Option[String]):Props...

Routing


In the previous example, we created four fetchers and dispatched messages to them, one after the other. We have a pool of identical actors among which we distribute tasks. Manually routing the messages to the right actor to maximize the utilization of our pool is painful and error-prone. Fortunately, Akka provides us with several routing strategies that we can use to distribute work among our pool of actors. Let's rewrite the previous example with automatic routing. You can find the code examples for this section in the chap09/fetchers_routing directory in the sample code provided with this book (https://github.com/pbugnion/s4ds). We will reuse the same definition of the Fetcher actor and its companion object as we did in the previous section.

Let's start by importing the routing package:

// FetcherDemo.scala
import akka.routing._

A router is an actor that forwards the messages that it receives to its children. The easiest way to define a pool of actors is to tell Akka to create a router and...
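
To give a feel for what a routing strategy does, here is a plain-Scala model of round-robin distribution, one of the simplest strategies. It does not use Akka's routing package: the RoundRobin class and the assign helper are hypothetical and only illustrate the idea of cycling through a fixed pool.

```scala
object RoundRobinSketch {
  // Hypothetical model of a router: cycles through its routees in order.
  final class RoundRobin[A](routees: Vector[A]) {
    require(routees.nonEmpty, "a router needs at least one routee")
    private var next = 0

    // Pick the routee that should receive the next message.
    def route(): A = {
      val routee = routees(next)
      next = (next + 1) % routees.size
      routee
    }
  }

  // Pair each login with the fetcher that would receive it.
  def assign(logins: List[String], fetchers: Vector[String]): List[(String, String)] = {
    val router = new RoundRobin(fetchers)
    logins.map(login => (login, router.route()))
  }
}
```

With two fetchers and three logins, the first and third logins go to the first fetcher and the second login goes to the second fetcher. Note that round-robin ignores how busy each routee is, which matters when tasks take uneven amounts of time.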

Message passing between actors


Merely logging the API response is not very useful. To traverse the follower graph, we must perform the following:

  • Check the return code of the response to make sure that the GitHub API was happy with our request

  • Parse the response as JSON

  • Extract the login names of the followers and, if we have not fetched them already, push them into the queue

You learned how to do all these things in Chapter 7, Web APIs, but not in the context of actors.

We could just add the additional processing steps to the receive method of our Fetcher actor: we could add further transformations to the API response by future composition. However, having actors do several different things, and possibly failing in several different ways, is an anti-pattern: when we learn about managing the actor life cycle, we will see that it becomes much more difficult to reason about our actor systems if the actors contain several bits of logic.

We will therefore use a pipeline of three different actors:

  • The...

Queue control and the pull pattern


We have now defined the three worker actors in our crawler application. The next step is to define the manager. The fetcher manager is responsible for keeping a queue of logins to fetch as well as a set of login names that we have already seen in order to avoid fetching the same logins more than once.

A first attempt might involve building an actor that keeps a set of users that we have already seen and just dispatches it to a round-robin router for fetchers when it is given a new user to fetch. The problem with this approach is that the number of messages in the fetchers' mailboxes would accumulate quickly: for each API query, we are likely to get tens of followers, each of which is likely to make it back to a fetcher's inbox. This gives us very little control over the amount of work piling up.

The first problem that this is likely to cause involves the GitHub API rate limit: even with authentication, we are limited to 5,000 requests per hour. It would be...

Accessing the sender of a message


When our fetcher manager receives a GiveMeWork request, we will need to send work back to the correct fetcher. We can access the actor who sent a message using the sender method, which is a method of Actor that returns the ActorRef corresponding to the actor who sent the message currently being processed. The case statement corresponding to GiveMeWork in the fetcher manager is therefore:

def receive = {
  case GiveMeWork =>
    val login: String = ??? // placeholder: get next login to fetch
    sender ! Fetcher.Fetch(login)
  ...
}

As sender is a method, its return value will change for every new incoming message. It should therefore only be used synchronously with the receive method. In particular, using it in a future is dangerous:

def receive = {
  case DoSomeWork =>
    val work = Future { Thread.sleep(20000) ; 5 }
    work.onComplete { result => 
      sender ! Complete(result) // NO!
    }
}

The problem is that when the future is completed 20 seconds after the message is...
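
The hazard can be reproduced without Akka. In the sketch below, a mutable variable stands in for the actor's sender and a Promise stands in for the slow work. All the names (currentSender, handle, demo) are hypothetical. Copying the value into a local val while the message is being handled is one common way to stay safe.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SenderCaptureSketch {
  // Stand-in for an actor's `sender`: it changes with every incoming message.
  @volatile private var currentSender: String = "nobody"
  def sender: String = currentSender

  // Simulates handling one message whose work completes some time later.
  // Yields (address read when the work completes, address captured up front).
  def handle(from: String, work: Future[Int]): Future[(String, String)] = {
    currentSender = from
    val captured = sender              // safe: read while handling the message
    work.map(_ => (sender, captured))  // unsafe: `sender` is read on completion
  }

  def demo(): (String, String) = {
    val work = Promise[Int]()
    val result = handle("fetcher-1", work.future)
    handle("fetcher-2", Future.successful(0)) // another message arrives meanwhile
    work.success(5)                           // the slow work finishes only now
    Await.result(result, 5.seconds)
  }
}
```

In SenderCaptureSketch.demo(), the late read sees "fetcher-2" even though the work was requested by "fetcher-1", while the captured value is still "fetcher-1".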

Stateful actors


The behavior of the fetcher manager depends on whether it has work to give out to the fetchers:

  • If it has work to give, it needs to respond to GiveMeWork messages with a Fetcher.Fetch message

  • If it does not have work, it must ignore the GiveMeWork messages and, if work gets added, it must send a WorkAvailable message to the fetchers

Encoding the notion of state is straightforward in Akka. We specify different receive methods and switch from one to the other depending on the state. We will define the following receive methods for our fetcher manager, corresponding to each of the states:

// receive method when the queue is empty
def receiveWhileEmpty: Receive = { 
    ... 
}

// receive method when the queue is not empty
def receiveWhileNotEmpty: Receive = {
    ...
}

Note that we must define the return type of the receive methods as Receive. To switch the actor from one method to the other, we can use context.become(methodName). Thus, for instance, when the last login name is popped...
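
The mechanics of switching between receive methods can be modelled in plain Scala by keeping the current handler in a variable. This is only a sketch of the idea and not Akka's implementation: ManagerModel, its become method, the send method, and the strings recorded in sent are all assumptions made for illustration.

```scala
import scala.collection.mutable

object BecomeSketch {
  type Receive = PartialFunction[Any, Unit]

  case class AddToQueue(login: String)
  case object GiveMeWork

  // Hypothetical model of the fetcher manager; `become` is a variable swap.
  final class ManagerModel {
    private val queue = mutable.Queue.empty[String]
    val sent = mutable.ListBuffer.empty[String] // messages "sent" to fetchers

    private var current: Receive = receiveWhileEmpty
    private def become(next: Receive): Unit = { current = next }

    // Behaviour when the queue is empty.
    private def receiveWhileEmpty: Receive = {
      case AddToQueue(login) =>
        queue.enqueue(login)
        sent += "WorkAvailable"
        become(receiveWhileNotEmpty)
      case GiveMeWork => () // no work to give: ignore the request
    }

    // Behaviour when the queue is not empty.
    private def receiveWhileNotEmpty: Receive = {
      case AddToQueue(login) =>
        queue.enqueue(login)
        ()
      case GiveMeWork =>
        sent += s"Fetch(${queue.dequeue()})"
        if (queue.isEmpty) become(receiveWhileEmpty)
    }

    // Deliver a message to whichever behaviour is currently active.
    def send(message: Any): Unit =
      if (current.isDefinedAt(message)) current(message)
  }
}
```

Sending GiveMeWork to a fresh ManagerModel does nothing. After an AddToQueue("odersky"), the model records WorkAvailable, and the next GiveMeWork yields Fetch(odersky) and switches the model back to the empty behaviour.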

Follower network crawler


We are now ready to code up the remaining pieces of our network crawler. The largest missing piece is the fetcher manager. Let's start with the companion object. As with the worker actors, this just contains the definitions of the messages that the actor can receive and a factory to create the Props instance:

// FetcherManager.scala
import scala.collection.mutable
import akka.actor._

object FetcherManager {
  case class AddToQueue(login:String)
  case object GiveMeWork

  def props(token:Option[String], nFetchers:Int) = 
    Props(classOf[FetcherManager], token, nFetchers)
}

The manager can receive two messages: AddToQueue, which tells it to add a username to the queue of users whose followers need to be fetched, and GiveMeWork, emitted by the fetchers when they are unemployed.

The manager will be responsible for launching the fetchers, response interpreter, and follower extractor, as well as maintaining an internal queue of usernames and a set of usernames that we...

Fault tolerance


Real programs fail, and they fail in unpredictable ways. Akka, and the Scala community in general, favors planning explicitly for failure rather than trying to write infallible applications. A fault tolerant system is a system that can continue to operate when one or more of its components fails. The failure of an individual subsystem does not necessarily mean the failure of the application. How does this apply to Akka?

The actor model provides a natural unit to encapsulate failure: the actor. When an actor throws an exception while processing a message, the default behavior is for the actor to restart, but the exception does not leak out and affect the rest of the system. For instance, let's introduce an arbitrary failure in the response interpreter. We will modify the receive method to throw an exception when it is asked to interpret the response for misto, one of Martin Odersky's followers:

// ResponseInterpreter.scala
def receive = {
  case InterpretResponse("misto", r...

Custom supervisor strategies


The default strategy of restarting an actor on failure is not always what we want. In particular, for actors that carry a lot of data, we might want to resume processing after an exception rather than restarting the actor. Akka lets us customize this behavior by setting a supervisor strategy in the actor's supervisor.

Recall that all actors have parents, including the top-level actors, which are children of a special actor called the user guardian. By default, an actor's supervisor is its parent, and it is the supervisor that decides what happens to the actor on failure.

Thus, to change how an actor reacts to failure, you must set its parent's supervisor strategy. You do this by setting the supervisorStrategy attribute. The default strategy is equivalent to the following:

val supervisorStrategy = OneForOneStrategy() {
  case _:ActorInitializationException => Stop
  case _:ActorKilledException => Stop
  case _:DeathPactException => Stop
  case _:Exception ...
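
The body of a supervisor strategy is a partial function from the exception thrown by the child to a directive. The plain-Scala model below illustrates that idea without depending on Akka. The Directive objects defined here are stand-ins for Akka's directives, and the custom policy is an arbitrary example rather than the default strategy or anything recommended by the text.

```scala
object DeciderSketch {
  // Hypothetical stand-ins for Akka's supervision directives.
  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive
  case object Escalate extends Directive

  // A decider maps the exception thrown by a child to a directive.
  type Decider = PartialFunction[Throwable, Directive]

  // Illustrative custom policy: keep the actor's state on arithmetic errors,
  // stop on bad arguments, and restart on any other exception.
  val custom: Decider = {
    case _: ArithmeticException      => Resume
    case _: IllegalArgumentException => Stop
    case _: Exception                => Restart
  }

  // In this model, anything the decider does not cover is escalated.
  def decide(decider: Decider, failure: Throwable): Directive =
    decider.applyOrElse(failure, (_: Throwable) => Escalate)
}
```

The order of the cases matters: ArithmeticException and IllegalArgumentException are both subclasses of Exception, so they must come before the catch-all case.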

Life-cycle hooks


Akka lets us specify code that runs in response to specific events in an actor's life, through life-cycle hooks. Akka defines the following hooks:

  • preStart(): This runs after the actor's constructor has finished but before it starts processing messages. This is useful to run initialization code that depends on the actor being fully constructed.

  • postStop(): This runs when the actor dies after it has stopped processing messages. This is useful to run cleanup code before terminating the actor.

  • preRestart(reason: Throwable, message: Option[Any]): This is called just after an actor receives an order to restart. The preRestart method has access to the exception that was thrown and to the offending message, allowing for corrective action. The default behavior of preRestart is to stop each child and then call postStop.

  • postRestart(reason:Throwable): This is called after an actor has restarted. The default behavior is to call preStart().

Let's use system hooks to persist the state...
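
The order in which the hooks fire during a restart can be hard to picture, so here is a plain-Scala model of the default wiring described in the list above. It is a sketch only: ModelActor, the events buffer, and the restart function are hypothetical, and the model leaves out children entirely.

```scala
import scala.collection.mutable

object LifecycleSketch {
  // Records the hooks in the order in which they run.
  val events = mutable.ListBuffer.empty[String]
  private def record(event: String): Unit = { events += event; () }

  // Hypothetical model of an actor instance with the default hook wiring.
  class ModelActor(name: String) {
    def preStart(): Unit = record(s"$name.preStart")
    def postStop(): Unit = record(s"$name.postStop")
    // Default: call postStop on the failing instance (children are omitted).
    def preRestart(reason: Throwable, message: Option[Any]): Unit = postStop()
    // Default: call preStart on the fresh instance.
    def postRestart(reason: Throwable): Unit = preStart()
  }

  // Simulate a restart: hooks run on the old instance, then on its replacement.
  def restart(old: ModelActor, fresh: => ModelActor, reason: Throwable): ModelActor = {
    old.preRestart(reason, None)
    val replacement = fresh // the constructor of the new instance runs here
    replacement.postRestart(reason)
    replacement
  }
}
```

Starting an instance named "old" and then restarting it into an instance named "new" records old.preStart, old.postStop, and new.preStart, in that order. Overriding preRestart or postRestart in a subclass changes what happens on either side of the restart.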

What we have not talked about


Akka is a very rich ecosystem, far too rich to do it justice in a single chapter. There are some important parts of the toolkit that you will need, but we have not covered them here. We will give brief descriptions, but you can refer to the Akka documentation for more details:

Summary


In this chapter, you learned how to weave actors together to tackle a difficult concurrent problem. More importantly, we saw how Akka's actor framework encourages us to think about concurrent problems in terms of many separate chunks of encapsulated mutable data, synchronized through message passing. Akka makes concurrent programming easier to reason about and more fun.

References


Derek Wyatt's book, Akka Concurrency, is a fantastic introduction to Akka. It should definitely be the first stop for anyone wanting to do serious Akka programming.

The LET IT CRASH blog (http://letitcrash.com) is the official Akka blog, and contains many examples of idioms and patterns to solve common issues.
