Introduction to Akka

In this article by Prasanna Kumar Sathyanarayanan and Suraj Atreya, the authors of Reactive Programming with Scala and Akka, we will look at the Akka framework in detail.

Akka is a framework for writing distributed, asynchronous, concurrent, and scalable applications. Akka actors react to the messages that are sent to them. An actor can be viewed as a passive component that does nothing until a message arrives. Actors are a higher-level abstraction and provide a neater way to handle concurrency than traditional multithreaded code.


One example of where Akka actors fit is request-response handling in a web server. A busy web server may have to handle a very large number of requests concurrently. One way to do this is to have a pool of threads that accept the requests and hand them off to the actual worker threads. In this case, the thread pool has to be managed by the application developer, including error handling, thread locking, and synchronization. Most often, this thread management logic ends up intertwined with the business logic. With actors, by contrast, the thread pool is managed by the Akka engine and actors receive messages asynchronously. Each request can be thought of as a message to an actor, and the actor reacts to the message.

Actors are very lightweight, event-driven processes. Several million actors can exist within a GB of heap memory.

Actor mailbox

Every actor has a mailbox. Since actors communicate exclusively using messages, every actor maintains a queue of incoming messages called its mailbox. By default, an actor reads messages in the order in which they were enqueued, and messages from any one sender arrive in the order in which that sender sent them.

Actor systems

An ActorSystem is a heavyweight structure, and creating one is the first step before creating actors. When it is initialized, an ActorSystem allocates anywhere from one to N threads, so an application typically creates only one. Creating the actor system also sets up a hierarchical structure of actors. Since an actor can create other actors, each parent supervises its children, and failure handling is a vital part of the Akka engine. This design makes it possible to take action if an actor dies or throws an unexpected exception.

When an actor system is first created, three actors are created as shown in the figure:

  • Root guardian (/): The root guardian is the grandparent of all top-level actors and supervises the user and system guardians. Because every real actor has a supervisor, the root guardian's own supervisor cannot be a real actor; it is a synthetic reference that stops the root guardian, and with it the actor system, at the first sign of trouble. The root guardian stops a child guardian on any Exception and escalates all other throwables.
  • User guardian (/user): The user guardian supervises all the actors that user code creates with system.actorOf(...). If the user guardian terminates, all its children terminate too.
  • System guardian (/system): This special actor allows an orderly shutdown in which logging remains active while the user actors terminate. The system guardian watches the user guardian and initiates its own shutdown when it receives the Terminated message.

Message passing

The figure at the end of this section shows the different steps that are involved when a message is passed to an actor. For example, let's assume there is a pizza website and a customer wants to order some pizzas. For simplicity, let's remove the non-essential details, such as billing and other information, and instead focus on just the order of a pizza. If a customer is some kind of an application (pizza customer) and the one who receives orders is a chef (pizza chef), then each request for a pizza can be illustrated as an asynchronous request to the chef.

The figure shows how the different components, such as the mailbox and the dispatcher, do their jobs when a message is passed to an actor.

Broadly, passing a message to an actor involves the following six steps:

  1. A PizzaCustomer creates and uses the ActorSystem. This is the first step before sending a message to an actor.
  2. The PizzaCustomer acquires a PizzaChef. In Akka, an actor is created using the actorOf(...) function call. For safety, Akka doesn't return the actual actor; it returns an ActorRef, a reference to the PizzaChef actor. The PizzaRequest is sent to this reference.
  3. The PizzaChef ActorRef hands the message to the Dispatcher.
  4. The Dispatcher enqueues the message in the PizzaChef's mailbox.
  5. The Dispatcher then schedules the mailbox to run on a thread.
  6. Finally, the mailbox dequeues and sends the message to the PizzaChef receive method.

Creating an actor system

An actor system is the first thing that should be created before creating actors. The actor system is created using the following API:

val system = ActorSystem("Pizza")

The string "Pizza" is just a name given to the actor system.

Creating an ActorRef

The following snippet shows the creation of an actor inside the previously created actor system:

val pizzaChef: ActorRef = system.actorOf(Props[PizzaChef])

An actor is created using the actorOf(...) function call. The actorOf(...) call doesn't return the actor itself; it returns a reference to the actor.

Once an actor's reference is obtained, clients can send messages using the ActorRef. This is a safe way of communicating between actors, since callers can only send messages and cannot touch the actor's internal state directly.

Sending a PizzaRequest to the ActorRef

Now that we have an actor system and a reference to the actor, we would like to send requests to the pizzaChef actor reference. We send a message to an actor using the ! method, also called Tell.

Here in the code snippet, we Tell the message MarinaraRequest to the pizza ActorRef:

pizzaChef ! MarinaraRequest

Tell is also known as fire-and-forget. No acknowledgement is returned from a Tell.

When the message is sent to an actor, the actor's receive method receives the message and processes it. receive is a partial function and has the following signature:

def receive: PartialFunction[Any, Unit]

The result type of receive is Unit, so the function returns nothing useful and does its work purely through side effects.
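To make the partial function idea concrete, here is a minimal sketch in plain Scala with no Akka dependency. The ReceiveSketch object is a hypothetical stand-in for an actor, not Akka's implementation; it only shows that a PartialFunction[Any, Unit] is defined for the listed cases and for nothing else.

```scala
// Plain Scala, no Akka needed: a stand-in for an actor's receive block.
object ReceiveSketch {
  sealed trait PizzaRequest
  case object MarinaraRequest extends PizzaRequest
  case object MargheritaRequest extends PizzaRequest

  // Same shape as receive: defined only for the cases listed here.
  val receive: PartialFunction[Any, Unit] = {
    case MarinaraRequest   => println("I have a Marinara request!")
    case MargheritaRequest => println("I have a Margherita request!")
  }

  def main(args: Array[String]): Unit = {
    receive(MarinaraRequest)                        // runs the first case
    println(receive.isDefinedAt(MargheritaRequest)) // true
    println(receive.isDefinedAt("calzone"))         // false: no matching case
  }
}
```

In a real actor, a message for which receive is not defined is not an error in the sender; Akka passes it to the actor's unhandled method instead.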

The following code is what we discussed:

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Messages understood by the chef
sealed trait PizzaRequest
case object MarinaraRequest extends PizzaRequest
case object MargheritaRequest extends PizzaRequest

class PizzaChef extends Actor {
  // Invoked for every message dequeued from this actor's mailbox
  def receive = {
    case MarinaraRequest   => println("I have a Marinara request!")
    case MargheritaRequest => println("I have a Margherita request!")
  }
}

object PizzaCustomer {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("Pizza")
    val pizzaChef: ActorRef = system.actorOf(Props[PizzaChef])
    // Fire-and-forget sends; both calls return immediately
    pizzaChef ! MarinaraRequest
    pizzaChef ! MargheritaRequest
  }
}

The preceding code shows the receive block that handles two kinds of requests; one for MarinaraRequest, and the other for MargheritaRequest. These two requests are defined as case objects.

Actor message

We saw that ! (Tell) is used to send a message, but we didn't discuss how exactly the message is processed. We will now explore how the dispatcher and the execution context are used to carry messages between actors.

In the pizza example, we used two kinds of messages: MarinaraRequest and MargheritaRequest. For simplicity, all that these messages did was to print on the console.

When PizzaCustomer sends a PizzaRequest to the PizzaChef ActorRef, the message is handed to the dispatcher. The dispatcher then places the message in the corresponding actor's mailbox.

Mailbox

Every time a new actor is created, a corresponding mailbox is also created. The exception to this rule is when multiple actors share the same mailbox. PizzaChef therefore has its own mailbox, which stores the messages that arrive asynchronously. With the default mailbox, messages are enqueued and dequeued in FIFO order.
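The FIFO behaviour can be illustrated with a toy mailbox in plain Scala. This is only a sketch under the assumption that a mailbox is, at its core, a thread-safe queue; ToyMailbox and its methods are hypothetical names and not part of Akka.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object MailboxSketch {
  // Hypothetical toy mailbox: just a thread-safe FIFO queue of messages.
  final class ToyMailbox {
    private val queue = new ConcurrentLinkedQueue[Any]()

    def enqueue(msg: Any): Unit = { queue.offer(msg); () }

    // Returns None when the mailbox is empty.
    def dequeue(): Option[Any] = Option(queue.poll())
  }

  def main(args: Array[String]): Unit = {
    val mailbox = new ToyMailbox
    mailbox.enqueue("MarinaraRequest")
    mailbox.enqueue("MargheritaRequest")
    println(mailbox.dequeue()) // Some(MarinaraRequest)
    println(mailbox.dequeue()) // Some(MargheritaRequest)
    println(mailbox.dequeue()) // None
  }
}
```

Messages come out in the order in which they went in, which is the property the PizzaChef relies on.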

Here is the signature of the mailbox from the Akka source:

It can be found at https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala

private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
    extends ForkJoinTask[Unit]
    with SystemMessageQueue with Runnable

From this signature, we can see that Mailbox takes a MessageQueue as an input. It also extends ForkJoinTask and Runnable, which means a Mailbox is not a thread itself but a task that can be submitted to a thread pool and run on one of its threads. We will see why this matters in a bit.

Dispatcher

Dispatchers dispatch actors to threads. There is no one-to-one mapping between actors and threads. If there were, the whole system would crumble under its own weight, and the amount of context switching would far exceed the actual work. Therefore, it is important to understand that creating a number of actors does not create the same number of threads.

The main job of the dispatcher is to coordinate between the actors, their messages, and the underlying threads. A dispatcher picks the next actor based on the dispatcher policy, together with the messages waiting in that actor's mailbox. These are then passed on to one of the available threads in the execution context.

To illustrate this point, let's see the code snippet:

protected[akka] override def registerForExecution(mbox: Mailbox, ...): Boolean = {
  ...
  if (mbox.setAsScheduled()) {
    try {
      executorService execute mbox
      true
    }
    ...
  }
  ...
}

This code snippet shows that the dispatcher accepts a Mailbox as a parameter and uses an ExecutorService to execute the mailbox.

We saw that Mailbox is a Runnable, and here the dispatcher submits the Mailbox to its ExecutorService, which runs it on one of the pool's threads.

When the mailbox's run method is triggered, it dequeues messages from the Mailbox and passes them to the actor for processing.

This is the code snippet of run from Mailbox.scala from the Akka source code:

The source code can be found at https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala

override final def run(): Unit = {
  try {
    if (!isClosed) { //Volatile read, needed here
      processAllSystemMessages() //First, deal with any system messages
      processMailbox() //Then deal with messages
    }
  } finally {
    setAsIdle() //Volatile write, needed here
    dispatcher.registerForExecution(this, false, false)
  }
}
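The interplay between mailbox and dispatcher can be sketched in plain Scala. The following is a toy model, not Akka's code: ToyMailbox and simulate are hypothetical names, and the scheduling flag is a simplified stand-in for the status handling in the real Mailbox. It shows many mailboxes, each a Runnable, sharing a small thread pool.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, CountDownLatch, ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object DispatcherSketch {
  // Hypothetical toy mailbox: it is itself the Runnable handed to the pool.
  final class ToyMailbox(pool: ExecutorService, handle: String => Unit) extends Runnable {
    private val queue = new ConcurrentLinkedQueue[String]()
    private val scheduled = new AtomicBoolean(false)

    def tell(msg: String): Unit = {
      queue.offer(msg)
      registerForExecution()
    }

    // Submit this mailbox unless it is empty or already scheduled.
    private def registerForExecution(): Unit =
      if (!queue.isEmpty && scheduled.compareAndSet(false, true)) pool.execute(this)

    override def run(): Unit =
      try {
        var msg = queue.poll()
        while (msg != null) { handle(msg); msg = queue.poll() }
      } finally {
        scheduled.set(false)   // back to idle
        registerForExecution() // reschedule if messages arrived in the meantime
      }
  }

  // Sends one message to each toy actor; returns (messages handled, threads used).
  def simulate(actors: Int, threads: Int): (Int, Int) = {
    val pool = Executors.newFixedThreadPool(threads)
    val done = new CountDownLatch(actors)
    val threadsUsed = ConcurrentHashMap.newKeySet[String]()
    val mailboxes = Vector.fill(actors)(new ToyMailbox(pool, _ => {
      threadsUsed.add(Thread.currentThread().getName)
      done.countDown()
    }))
    mailboxes.foreach(_.tell("MarinaraRequest"))
    done.await(5, TimeUnit.SECONDS)
    pool.shutdown()
    (actors - done.getCount().toInt, threadsUsed.size)
  }

  def main(args: Array[String]): Unit = {
    val (handled, threads) = simulate(actors = 100, threads = 2)
    println(s"$handled messages handled on $threads thread(s)")
  }
}
```

A hundred toy actors are served by at most two threads here, which is the point of the design: the number of actors is decoupled from the number of threads.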

Actor path

An interesting property of an actor system is that its actors are created in a hierarchical manner. All the user-created actors are created under the /user actor.

The actor path looks very similar to the UNIX file system hierarchy, for example, /home/akka/akka_book.

We will see how this is similar to the Akka path in the following code example.

Let's take our pizza example and add a few toppings to the pizza. Whenever a customer sends a MarinaraRequest, they will get extra cheese too:

// Assumed definitions (not shown in the original listing), modelled on the earlier case objects
case object ExtraCheese
case object Jalapeno
case object PizzaException

class PizzaToppings extends Actor {
  def receive = {
    case ExtraCheese => println("Aye! Extra cheese it is")
    case Jalapeno    => println("More Jalapenos!")
  }
}

class PizzaSupervisor extends Actor {
  // Created through context, so PizzaToppings becomes a child of this actor
  val pizzaToppings =
    context.actorOf(Props[PizzaToppings], "PizzaToppings")

  def receive = {
    case MarinaraRequest =>
      println("I have a Marinara request with extra cheese!")
      println(pizzaToppings.path)
      pizzaToppings ! ExtraCheese
    case MargheritaRequest =>
      println("I have a Margherita request!")
    case PizzaException =>
      throw new Exception("Pizza fried!")
  }
}

The PizzaSupervisor class is very similar to our earlier example of pizza actor. However, if you observe carefully, there is another actor created within this PizzaSupervisor called the PizzaToppings actor. This PizzaToppings is created using context.actorOf(...) instead of using system.actorOf(...). Therefore, PizzaToppings will become the child of PizzaSupervisor.

The actor path of PizzaSupervisor will look like this:

akka://Pizza/user/PizzaSupervisor

The actor path for PizzaToppings will look like this:

akka://Pizza/user/PizzaSupervisor/PizzaToppings

When this main program is run, the actor system is created using ActorSystem("Pizza") and the PizzaSupervisor actor is created using system.actorOf(...). The program prints the path of the PizzaSupervisor actor and of its child, as shown previously:

object TestActorPath {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("Pizza")
    val pizza: ActorRef = system.actorOf(Props[PizzaSupervisor], "PizzaSupervisor")
    println(pizza.path)
    pizza ! MarinaraRequest
    // shutdown() is the Akka 2.3 API; Akka 2.4 and later use system.terminate()
    system.shutdown()
  }
}

The following is the output:
akka://Pizza/user/PizzaSupervisor
I have a Marinara request with extra cheese!
akka://Pizza/user/PizzaSupervisor/PizzaToppings
Aye! Extra cheese it is

The akka prefix in the actor path is the scheme and is fixed for local actors, and the actors we created are shown under user. If you remember from earlier discussions, all the user-created actors are created under the user guardian. Therefore, the actor path shows that it is a user-created actor.
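How such a path is composed from its parts can be sketched in plain Scala. ToyPath is a hypothetical model for illustration only and is not Akka's ActorPath class; it simply appends child names to a parent path in the same way a file system path grows.

```scala
object ActorPathSketch {
  // Hypothetical toy path: a system name plus the chain of actor names below the root.
  final case class ToyPath(system: String, elements: Vector[String]) {
    def /(child: String): ToyPath = copy(elements = elements :+ child)
    def parent: ToyPath = copy(elements = elements.dropRight(1))
    override def toString: String = s"akka://$system/" + elements.mkString("/")
  }

  def main(args: Array[String]): Unit = {
    val user = ToyPath("Pizza", Vector("user"))
    val supervisor = user / "PizzaSupervisor"
    val toppings = supervisor / "PizzaToppings"
    println(supervisor)                    // akka://Pizza/user/PizzaSupervisor
    println(toppings)                      // akka://Pizza/user/PizzaSupervisor/PizzaToppings
    println(toppings.parent == supervisor) // true
  }
}
```

Each child path is its parent's path plus one more element, which is why PizzaToppings appears below PizzaSupervisor in the output.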

The name Pizza is the name we gave to the actor system when it was created. The hierarchy therefore shows that Pizza is the actor system and that all the actors are children below it. In the following figure, we can clearly see the actor hierarchy and its actor path:

Actor lifecycle

Akka actors have a life cycle that is very useful for writing robust concurrent code. Akka follows a "let it crash" philosophy and assumes that actors can crash too. When an actor crashes, several actions can be taken, including restarting it.

As usual, let's look at our pizza baking process. As before, we will have an actor that accepts the pizza requests, but this time we will follow the workflow of the pizza baking process. Using this example, we will see how the actor life cycle works:

class PizzaLifeCycle extends Actor with ActorLogging {

  // Called when the actor is started
  override def preStart() = {
    log.info("Pizza request received!")
  }

  def receive = {
    case MarinaraRequest   => log.info("I have a Marinara request!")
    case MargheritaRequest => log.info("I have a Margherita request!")
    case PizzaException    => throw new Exception("Pizza fried!")
  }

  // Called on the old actor instance before it is replaced
  override def preRestart(reason: Throwable, message: Option[Any]) = {
    log.info("Pizza baking restarted because " + reason.getMessage)
    postStop()
  }

  // Called on the new actor instance after the restart
  override def postRestart(reason: Throwable) = {
    log.info("New Pizza process started because earlier " + reason.getMessage)
    preStart()
  }

  // Called when the actor is stopped
  override def postStop() = {
    log.info("Pizza request finished")
  }
}

The PizzaLifeCycle actor takes pizza requests but with additional states. An actor can go through many different states during its lifetime.

Let's send some messages to find out what happens with our PizzaLifeCycle actor and how it behaves:

pizza ! MarinaraRequest
pizza ! PizzaException
pizza ! MargheritaRequest

Here is the output for the preceding requests sent:
Pizza request received!
I have a Marinara request!
Pizza fried!
java.lang.Exception: Pizza fried!
at PizzaLifeCycle$$anonfun$receive$1.applyOrElse(PizzaLifeCycle.scala:12)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at PizzaLifeCycle.aroundReceive(PizzaLifeCycle.scala:3)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
Pizza baking restarted because Pizza fried!
Pizza request finished
New Pizza process started because Pizza fried!
Pizza request received!
I have a Margherita request!

When we sent our first MarinaraRequest, we saw the following in the log:

Pizza request received!
I have a Marinara request!

Akka called preStart() when the actor was started, and the receive block then handled the first message.

Then, we simulated an exception by sending PizzaException and as expected, we got an exception:

Pizza fried!
java.lang.Exception: Pizza fried!
  at PizzaLifeCycle$$anonfun$receive$1.applyOrElse(PizzaLifeCycle.scala:12)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
  at PizzaLifeCycle.aroundReceive(PizzaLifeCycle.scala:3)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
Pizza baking restarted because Pizza fried!
Pizza request finished

There are some interesting things to note here. Although we got the exception Pizza fried!, we also got two other log messages. The reason is quite simple. By default, a supervisor restarts a child that throws an Exception, so when the exception occurred, Akka called preRestart(). preRestart() is called on the old instance of the actor and gives it a chance to clean up its resources. In our example, we just called postStop(). During preRestart(), the old instance prepares to hand off to the new actor instance.

Finally, we sent another request called MargheritaRequest. Here, we get these log messages:

New Pizza process started because Pizza fried!
Pizza request received!
I have a Margherita request!

We saw that the old actor instance was stopped. From here on, the requests are handled by a new actor instance. postRestart() is called on the new actor instance, which in turn calls preStart() to resume normal operation of our pizza baking process.

Both preRestart() and postRestart() receive the reason why the old actor instance died.
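The order of the hook calls during a restart can be replayed with a toy model in plain Scala. This is a sketch under the assumption that a restart means "old instance out, new instance in"; ToyPizzaActor and restartOnce are hypothetical names, and the code does not reproduce Akka's internals.

```scala
import scala.collection.mutable.ListBuffer

object LifecycleSketch {
  // Hypothetical toy actor instance that records which hooks were called.
  final class ToyPizzaActor(id: Int, log: ListBuffer[String]) {
    private def record(entry: String): Unit = { log += entry; () }

    def preStart(): Unit = record(s"instance $id: preStart")
    def postStop(): Unit = record(s"instance $id: postStop")

    def preRestart(reason: Throwable): Unit = {
      record(s"instance $id: preRestart (${reason.getMessage})")
      postStop()
    }

    def postRestart(reason: Throwable): Unit = {
      record(s"instance $id: postRestart (${reason.getMessage})")
      preStart()
    }
  }

  // Replays the hook order for one start followed by one failure-triggered restart.
  def restartOnce(): List[String] = {
    val log = ListBuffer.empty[String]
    val old = new ToyPizzaActor(1, log)
    old.preStart()
    val failure = new Exception("Pizza fried!")
    old.preRestart(failure)               // called on the old instance
    val fresh = new ToyPizzaActor(2, log) // a new instance replaces it
    fresh.postRestart(failure)            // called on the new instance
    log.toList
  }

  def main(args: Array[String]): Unit = restartOnce().foreach(println)
}
```

The recorded order (preStart, preRestart, postStop on the first instance, then postRestart and preStart on the second) mirrors the sequence of log lines produced by PizzaLifeCycle.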

Summary

In this article, you learned about the Akka framework: the actor mailbox, actor systems, how to create an actor system and an ActorRef, how to send a PizzaRequest to an ActorRef, how the dispatcher and mailbox deliver messages, actor paths, and the actor life cycle.
