Dispatchers and Routers

Exclusive offer: get 50% off this eBook here
Akka Essentials

Akka Essentials — Save 50%

A practical, step-by-step guide to learn and build Akka's actor-based, distributed, concurrent, and scalable Java applications with this book and ebook.

$29.99    $15.00
by Munish K. Gupta | November 2012 | Java

Akka is a toolkit and runtime for building highly concurrent, distributed, and fault-tolerant event-driven applications on the JVM. Dispatchers are the heart of the Akka application. It is the engine that powers the Akka application. It is very important to understand the switches and knobs that need to be tuned to extract the maximum concurrency and scalability out of the application. Routers on the other hand, route incoming messages to outbound actors.

In this article by Munish K. Gupta, author of Akka Essentials, we will cover:

  • What a dispatcher is and how it works, various types of dispatchers and theirusage and configuration settings
  • Different types of mailboxes, and their usage and configuration
  • What a router is, and different types of routers and their usage
  • How to write a custom router

(For more resources related to this topic, see here.)

Dispatchers

In the real world, dispatchers are the communication coordinators that are responsible for receiving and passing messages. For the emergency services (for example, in U.S. – 911), the dispatchers are the people responsible for taking in the call, and passing on the message to the other departments (medical, police, fire station, or others). The dispatcher coordinates the route and activities of all these departments, to make sure that the right help reaches the destination as early as possible.

Another example is how the airport manages airplanes taking off. The air traffic controllers (ATCs) coordinate the use of the runway between the various planes taking off and landing. On one side, air traffic controllers manage the runways (usually ranging from 1 to 3), and on the other, aircrafts of different sizes and capacity from different airlines ready to take off and land. An air traffic controller coordinates the various airplanes, gets the airplanes lined up, and allocates the runways to take off and land:

As we can see, there are multiple runways available and multiple airlines, each having a different set of airplanes needing to take off. It is the responsibility of air traffic controller(s) to coordinate the take-off and landing of planes from each airline and do this activity as fast as possible.

Dispatcher as a pattern

Dispatcher is a well-recognized and used pattern in the Java world. Dispatchers are used to control the flow of execution. Based on the dispatching policy, dispatchers will route the incoming message or request to the business process. Dispatchers as a pattern provide the following advantages:

  • Centralized control: Dispatchers provide a central place from where various messages/requests are dispatched. The word "centralized" means code is re-used, leading to improved maintainability and reduced duplication of code.
  • Application partitioning: There is a clear separation between the business logic and display logic. There is no need to intermingle business logic with the display logic.
  • Reduced inter-dependencies: Separation of the display logic from the business logic means there are reduced inter-dependencies between the two. Reduced inter-dependencies mean less contention on the same resources, leading to a scalable model.

Dispatcher as a concept provides a centralized control mechanism that decouples different processing logic within the application, which in turn reduces inter-dependencies.

Executor in Java

In Akka, dispatchers are based on the Java Executor framework (part of java.util.concurrent).Executor provides the framework for the execution of asynchronous tasks. It is based on the producer–consumer model, meaning the act of task submission (producer) is decoupled from the act of task execution (consumer). The threads that submit tasks are different from the threads that execute the tasks.

Two important implementations of the Executor framework are as follows:

  • ThreadPoolExecutor: It executes each submitted task using thread from a predefined and configured thread pool.
  • ForkJoinPool: It uses the same thread pool model but supplemented with work stealing. Threads in the pool will find and execute tasks (work stealing) created by other active tasks or tasks allocated to other threads in the pool that are pending execution.

Fork/join is based a on fine-grained, parallel, divide-andconquer style, parallelism model. The idea is to break down large data chunks into smaller chunks and process them in parallel to take advantage of the underlying processor cores.

Executor is backed by constructs that allow you to define and control how the tasks are executed. Using these Executor constructor constructs, one can specify the following:

  • How many threads will be running? (thread pool size)
  • How are the tasks queued until they come up for processing?
  • How many tasks can be executed concurrently?
  • What happens in case the system overloads, when tasks to be rejected are selected?
  • What is the order of execution of tasks? (LIFO, FIFO, and so on)
  • Which pre- and post-task execution actions can be run?

In the book Java Concurrency in Practice, Addison-Wesley Publishing, the authors have described the Executor framework and its usage very nicely. It will be useful to read the book for more details on the concurrency constructs provided by Java language.

Dispatchers in Akka

In the Akka world, the dispatcher controls and coordinates the message dispatching to the actors mapped on the underlying threads. They make sure that the resources are optimized and messages are processed as fast as possible. Akka provides multiple dispatch policies that can be customized according to the underlying hardware resource (number of cores or memory available) and type of application workload.

If we take our example of the airport and map it to the Akka world, we can see that the runways are mapped to the underlying resources—threads. The airlines with their planes are analogous to the mailbox with the messages. The ATC tower employs a dispatch policy to make sure the runways are optimally utilized and the planes are spending minimum time on waiting for clearance to take off or land:

For Akka, the dispatchers, actors, mailbox, and threads look like the following diagram:

The dispatchers run on their threads; they dispatch the actors and messages from the attached mailbox and allocate on heap to the executor threads. The executor threads are configured and tuned to the underlying processor cores that available for processing the messages.

Akka Essentials A practical, step-by-step guide to learn and build Akka's actor-based, distributed, concurrent, and scalable Java applications with this book and ebook.
Published: October 2012
eBook Price: $29.99
Book Price: $49.99
See more
Select your format and quantity:

Types of dispatcher

In the case of Akka, the framework provides the following four types of dispatchers out of the box:

  • Dispatcher
  • Pinned dispatcher
  • Balancing dispatcher
  • Calling thread dispatcher

Similarly, there are four default mailbox implementations provided as follows:

  • Unbounded mailbox
  • Bounded mailbox
  • Unbounded priority mailbox
  • Bounded priority mailbox

Threads are the underlying resources and they are optimized based on the available CPU cores and the type of application workload. The number of threads is configured in conjunction with the dispatcher policy employed for the application.

Akka allows you to write your own dispatcher implementation or your own mailbox implementation.

Dispatcher

This is the default dispatcher used by the Akka application in case there is nothing defined. This is an event-based dispatcher that binds a set of actors to a thread pool backed up by a BlockingQueue method.

The following are the characteristics of the default dispatcher:

  • Every actor is backed by its own mailbox
  • The dispatcher can be shared with any number of actors
  • The dispatcher can be backed by either thread pool or fork join pool
  • The dispatcher is optimized for non-blocking code

Pinned dispatcher

This dispatcher provides a single, dedicated thread (pinned) for each actor. This dispatcher is useful when the actors are doing I/O operations or performing long-running calculations. The dispatcher will deallocate the thread attached to the actor after a configurable period of inactivity.

The following are the characteristics of the pinned dispatcher:

  • Every actor is backed by its own mailbox.
  • A dedicated thread for each actor implies that this dispatcher cannot be shared with any other actors.
  • The dispatcher is backed by the thread pool executor.
  • The dispatcher is optimized for blocking operations. For example, if the code is making I/O calls or database calls, then such actors will wait until the task is finished. For such blocking operation, the pinned dispatcher performs better than the default dispatcher.

Balancing dispatcher

The balancing dispatcher, as the name suggests is an event-based dispatcher that tries to redistribute work from busy actors and allocate it to idle ones. Redistribution of tasks can only work if all actors are of the same type (requirement). This task redistribution is similar to the work-stealing technique, as described in the fork join pool. The dispatcher looks for actors that are idle and dispatches the message(s) to them for processing.

The following are the characteristics of the balancing dispatcher:

  • There is only one mailbox for all actors
  • The dispatcher can be shared only with actors of the same type
  • The dispatcher can be backed by a either thread pool or fork join pool

Calling thread dispatcher

The calling thread dispatcher is primarily used for testing. This dispatcher runs the task execution on the current thread only. It does not create any new threads and provides a deterministic execution order. The dispatch strategy is to run on the current thread, unless the target actor is either suspended or already running on the current thread. If the invocation is not run immediately, the task is queued in a thread-local queue to be executed once the active invocation(s) further up in the call stack are finished. If you make use of blocking code with calling thread dispatcher, then the blocking code will hold the thread for processing, leaving other messages in the queue for a long time.

The following are the characteristics of the calling thread dispatcher:

  • Every actor is backed by its own mailbox
  • The dispatcher can be shared with any number of actors
  • The dispatcher is backed by the calling thread

Types of mailboxes

Mailboxes are backed by queue implementation from the Java concurrent package. The queues are characterized by two factors as follows:

  • Blocking queue: Blocking queue means a queue that waits for space to become available before putting in an element and similarly waits for the queue to become non-empty before retrieving an element
  • Bounded queue: Bounded queue means a queue that limits the size of the queue; meaning you cannot add more elements than the specified size

In Akka, the following queue implementations are based on the blocking/bounded factors available:

 

Types

Implementation

Blocking

Bounded

Unbounded

mailbox

java.util.concurrent.ConcurrentLinkedQueue

No

No

Bounded

mailbox

java.util.concurrent.LinkedBlockingQueue

Yes

Yes

Unbounded

priority

mailbox

java.util.concurrent.PriorityBlockingQueue

Yes

No

Bounded

priority

mailbox

java.util.concurrent.PriorityBlockingQueue

wrapped in akka.util.BoundedBlockingQueue

Yes

Yes

 

You can choose between the unbounded and bounded mailbox via the confiuration. In the case of a priority mailbox (unbounded or bounded), a simple implementation needs to be provided to for use by the dispatcher.

Dispatcher usage

The Executor contexts supported by Akka are as follows:

  • Thread pool executor: Here, the idea is to create a pool of worker threads. Tasks are assigned to the pool using a queue. If the number of tasks exceeds the number of threads, then the tasks are queued up until a thread in the pool is available. Worker threads minimize the overhead of allocation/ deallocation of threads.
  • Fork join executor: This is based on the premise of divide-and-conquer. The idea is to divide a large task into smaller tasks whose solution can then be combined for the final answer. The tasks need to be independent to be able run in parallel.

For each of these execution contexts, Akka allows us to specify the configuration parameters that will define and construct the underlying resources. The parameters define the following:

  • Minimum number of threads that will be allocated
  • Maximum number of threads that will be allocated
  • Multiplier factor to be used (based on number of CPU cores available)

For example, if the minimum number is defined as 3 and the multiplier factor is 2, then the dispatcher starts with a minimum of 3 x 2 = 6 threads. The maximum number defines the upper limit on the number of threads. If the maximum number is 8, then the maximum number of threads will be 8 x 2 = 16 threads.

Thread pool executor

The following are the key parameters that need to be configured for the thread pool executor:

# Configuration for the thread pool
thread-pool-executor {
# minimum number of threads
core-pool-size-min = 2
# available processors * factor
core-pool-size-factor = 2.0
# maximum number of threads
core-pool-size-max = 10
}

Fork join executor

The following are the key parameters that need to be configured for the fork join executor:

# Configuration for the fork join pool
fork-join-executor {
# Min number of threads
parallelism-min = 2
# available processors * factor
parallelism-factor = 2.0
# Max number of threads
parallelism-max = 10
}

Depending on the type of dispatcher being used and support provided for the executor, the preceding configuration parameters can be used in conjunction with other settings.

To define the dispatcher for a set of actors, the following are the important configuration parameters:

 

Parameter name

Description

Potential values

type

Identifies the name of the

event-type dispatcher being used

Dispatcher or PinnedDispatcher

or BalancingDispatcher

or FQCN of a class extending

MessageDispatcherConfigurator

executor

Decides what kind of Executor

service to use

fork-join-executor or

thread-pool-executor

or FQCN of a class extending

ExecutorServiceConfigurator

fork-joinexecutor

Section for defining the

fork-join-executor

parameters as defined above

 

thread-poolexecutor

Section for defining the

thread-pool-executor

parameters as defined previously

 

throughput

Identifies the maximum number

of messages to be processed per

actor before the thread jumps to

the next actor

One (to be fair for everyone)

mailboxcapacity

(optional)

Specifies the mailbox capacity to

be used for the actor queue

Negative (or zero) implies usage of an

unbounded mailbox (default).

 

A positive number implies bounded

mailbox and with the specified size.

mailbox-type

(optional)

Specifies the mailbox type to be

used

Bounded or unbounded mailbox used

if nothing is specified (dependent on

mailbox capacity) or FQCN of the

mailbox implementation (for example,

priority mailbox implementations if

defined, need to be specified here)

 

The sample Dispatcher parameter's definition in application.conf looks something like the following code snippet:

my-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 10
}
throughput = 100
mailbox-capacity = -1
mailbox-type =""

}

Or another example can be the PinnedDispatcher parameter's definition along with the thread-pool-executor parameter as follows:

my-dispatcher {
type = PinnedDispatcher

executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 2
core-pool-size-factor = 2.0
core-pool-size-max = 10
}
throughput = 100
mailbox-capacity = -1
mailbox-type =""

}

We define the dispatcher identifier that will be used to inform the underlying actors about the dispatcher policy to be used. Next we define the type of the dispatcher being used, executor policy being used, the parameters required for the executor policy, and the throughput parameter. In case we are using a bounded mailbox, we will define the mailbox size. When using custom-defined mailboxes, we will specify the FQCN of the mailbox class.

The following are the key parameters for your application performance:

  • Choice of dispatcher: Based on the type of activity being performed by your actor, the right dispatcher needs to be selected. Look at parameters such as blocking versus non-blocking operations, homogeneity versus heterogeneity of actors, to determine the right choice of dispatcher.
  • Choice of executor: Choosing between thread pool or fork join depends upon the characteristics of your application logic. The choice of executor comes into play in the case of default dispatcher and balancing dispatcher. For most cases, fork join is excellent when large numbers of tasks can be forked (started).
  • Number of threads (min/max) factored to the CPU cores: The number of min/max threads that are deployed for the dispatcher and mapped to the underlying cores determine the processing power of the application. Define it too high and threads might end up doing a lot of context switching; define too low and the processing power is not fully optimized.
  • Throughput factor: This determines the number of messages that are processed by one actor as a batch or in one go. For example, if the throughput is 50, then the actor will process 50 messages (if available in the queue) before returning the thread to the pool. On the flip side, other actors will wait for the thread (using the same dispatcher) to be available before they begin to process their set of messages. The optimal value depends upon the processing time taken by your message.

Once the dispatcher configuration has been defined in application.conf, the application needs to specify which dispatcher policy is used for which actors. Using the combination of the right type of dispatcher and supported executor, the various combinations of the dispatcher policy are realized.Remember, you can define a different dispatcher policy for a different set of actors depending upon the functionality of the actors.

Java:

ActorSystem _system = ActorSystem.create("dispatcher",
ConfigFactory.load().getConfig("MyDispatcherExample"));
ActorRef actor = _system.actorOf(new Props(MsgEchoActor.class)
.withDispatcher("my-dispatcher"));

Scala:


val _system = ActorSystem("dispatcher", ConfigFactory.load().

getConfig ("MyDispatcherExample"))

val actor = _system.actorOf(Props[MsgEchoActor].
withDispatcher ("my-dispatcher"))

When defining the actor, the Props class provides the withDispatcher() method, which is invoked by passing the string name of the dispatcher configuration defined in application.conf.

You can define multiple instances of an actor of the same type and pass the dispatcher policy. The dispatcher is mostly used in conjunction with the router functionality provided by Akka.

Akka Essentials A practical, step-by-step guide to learn and build Akka's actor-based, distributed, concurrent, and scalable Java applications with this book and ebook.
Published: October 2012
eBook Price: $29.99
Book Price: $49.99
See more
Select your format and quantity:

Routers

When a large number of actors are working in parallel to process the incoming stream of messages, there is need of an entity that directs the message from the source to the destination actor. This entity is called the router.

In Akka, a router is also a type of actor, which routes the incoming messages to the outbound actors. For the router, the outbound actors are also called routees. The router employs a different set of algorithms to route the messages to the routee actors:

In order to avoid the single point of bottleneck, the router actors are of a special type—RouterActorRef. RouterActorRef does not make use of the store-and-forward mechanism to route the messages to it routees. Instead, routers dispatch the incoming messages directly to the routee's mailboxes and avoid the router's mailbox.

By default, the Akka router supports the following router mechanisms:

  • Round robin router: It routes the incoming messages in a circular order to all its routees
  • Random router: It randomly selects a routee and routes the message to the same
  • Smallest mailbox router: It identifies the actor with the least number of messages in its mailbox and routes the message to the same
  • Broadcast router: It forwards the same message to all the routees
  • Scatter gather first completed router: It forwards the message to all its routees as a future, then whichever routee actor responds back, it takes the results and sends them back to the caller

Router usage

In order to create the router and set the number of routee actors, we need to have the following information—the router mechanism to be used and the number of instances of routee actors.

Java:

ActorRef router = system.actorOf(new Props(MyActor.class).
withRouter(new RoundRobinRouter(nrOfInstances)),"myRouterActor");

Scala:

val router = system.actorOf(Props[MyActor].withRouter
(RoundRobinRouter
(nrOfInstances = 5)) , name = "myRouterActor")

Here, when defining the actor, we pass the router instance, which in this case is RoundRobinRouter, whose constructor takes in the number of instances (nrOfInstances) that need to be created for the routees.

When we defined the router actor, we provided a name—myRouterActor. As one actor can only have one given name within the parent context, the router actor becomes the parent (head) and the routees are the child actors spawned by the parent. The parent actor is now the supervisor for the routees and manages the lifecycle—creation, restarting, and termination—of the actors.

All of the router types are used in a similar manner; we pass the router object along with the necessary constructor parameters.

We will examine router-type definitions and their usage as follows:

 

Router type

Usage

 

RoundRobinRouter

Java:

ActorRef router = system.actorOf(new Props

(MyActor.class).withRouter(new RoundRobinRouter

(nrOfInstances)));

 

Scala:

val router = system.actorOf(Props[MyActor].

withRouter(RoundRobinRouter(nrOfInstances = 5)))

RandomRouter

Java:

ActorRef router = system.actorOf(new

Props(MyActor.class).withRouter(new

RandomRouter(nrOfInstances)));

 

Scala:

val router = system.actorOf(Props[MyActor].

withRouter(RandomRouter(nrOfInstances = 5)))

SmallestMailbox Router

BroadcastRouter

Java:

ActorRef router = system.actorOf(new Props(MyActor.

class).withRouter(new SmallestMailboxRouter

(nrOfInstances)));

 

Scala:

val router = system.actorOf(Props[MyActor].

withRouter(SmallestMailboxRouter

(nrOfInstances = 5)))

BroadcastRouter

Java:

ActorRef router = system.actorOf(new Props

(MyActor.class).withRouter(new BroadcastRouter

(nrOfInstances)));

 

Scala:

val router = system.actorOf(Props[MyActor].

withRouter(BroadcastRouter (nrOfInstances = 5)))

ScatterGatherFirst

CompletedRouter

Java:

ActorRef router = system.actorOf(new Props(MyActor.

class).withRouter(new ScatterGatherFirstCompletedRo

uter(nrOfInstances, Duration.parse("5 seconds"))));

 

Scala:

val router = system.actorOf(Props[MyActor].

withRouter(ScatterGatherFirstCompletedRouter

(nrOfInstances = 5, within = 5 seconds)))

 

In this case, in addition to the number of instances, we also pass the

future duration timeout period.

 

Router usage via application.conf

The router for the actor can also be described using the configuration file—application.conf. In application.conf, we describe the router configuration as follows:

MyRouterExample{
akka.actor.deployment {
/myRandomRouterActor {
router = random
nr-of-instances = 5
}
}
}

Next, in the code, we load the application.conf file using ActorSystem.

Java:

ActorSystem _system = ActorSystem.create("RandomRouterExample",
ConfigFactory.load().getConfig("MyRouterExample"));

ActorRef randomRouter = _system.actorOf(
new Props(MsgEchoActor.class).withRouter(new FromConfig()),
"myRandomRouterActor");

Scala:

val _system = ActorSystem.create("RandomRouterExample",
ConfigFactory.load()
.getConfig("MyRouterExample"))

val randomRouter = _system.actorOf(Props[MsgEchoActor].
withRouter(FromConfig()), name = "myRandomRouterActor")

When defining the router to be used, we pass on the FromConfig() parameter and the name of the router actor—myRandomRouterActor, which is used to read the config file settings for the router information.

Router usage for distributed actors

It is possible that we may want to make use of distributed actors and route the incoming message to them. In this case, each actor has a different address. For handling such cases, we first need to create the different Address objects with the remote node details and add them to an array. Subsequently the array of the addresses is passed as a parameter to the router.

Java:

Address addr1 = new Address("akka", "remotesys", "host1", 1234);
Address addr2 = new Address("akka", "remotesys", "host2", 1234);

Address[] addresses = new Address[] { addr1, addr2 };

ActorRef routerRemote = system.actorOf(new Props(MyEchoActor.class)
.withRouter(new RemoteRouterConfig(new RoundRobinRouter(5),
addresses)));

Scala:

val addresses = Seq( Address("akka", "remotesys", "host1",
1234),Address("akka", "remotesys", "host2", 1234))

val routerRemote = system.actorOf(Props[MyEchoActor].withRouter(
RemoteRouterConfig(RoundRobinRouter(5), addresses)))

The remote nodes' addresses can also be read via application.conf. So, in addition to defining the router configuration, we can also define the target nodes' address for each of the participating remote nodes:

akka.actor.deployment {
/myRandomRouterActor {
router = round-robin
nr-of-instances = 5
target {
nodes = ["akka://app@192.168.0.5:2552",
"akka://app@192.168.0.6:2552"]
}
}
}

Dynamically resizing routers

To handle the variability of the incoming message traffic, it might be important to increase the number of actors available to handle the load at runtime. For this, routers provide a construct called resize, which allows us to define the range bound in terms of minimum and maximum instances.

Java:

int lowerBound = 2;
int upperBound = 15;
DefaultResizer resizer = new DefaultResizer(lowerBound,
upperBound);

ActorRef randomRouter = _system.actorOf(new Props(MsgEchoActor.class).
withRouter(new RandomRouter(resizer)));

Scala:

val resizer = DefaultResizer(lowerBound = 2, upperBound = 15)

val randomRouter = system.actorOf(Props[MsgEchoActor].withRouter(
RandomRouter (resizer = Some(resizer))))

The range can also be specified in application.conf as follows:

akka.actor.deployment {
/myRandomRouterActor {
router = round-robin
nr-of-instances = 5
resizer {
lower-bound = 2
upper-bound = 15
}
}
}

Custom router

In case the default router types are not sufficient, Akka also allows you to write your own custom router. Akka provides the RouterConfig interface, which can be used to write your own router. Akka also provides the ScatterGatherFirstCompletedLike interface , which can be used to implement your own implementation of the scatter gather first completed router model. In our custom router, we will make use of RouterConfig.

Let's go ahead and create a custom router. We will create a bursty message router—meaning the router will route a predefined number of messages to one actor before moving to the next one.

If we define 10 as the message burst size and if there are five instances of actors running, then 1 to 10 messages will go to actor 1, 11 to 20 messages will go to actor 2, and so on:

We define the BurstyMessageRouter class and create a constructor that takes in two parameters. First is the number of instances that needs to be created for the actor and second is the messageBurst rate, which identifies the number of messages that need to be passed on to one actor before moving to next.

Java:

public class BurstyMessageRouter extends CustomRouterConfig {
int noOfInstances;
int messageBurst;
public BurstyMessageRouter(int inNoOfInstances, int
inMessageBurst) {
noOfInstances = inNoOfInstances;
messageBurst = inMessageBurst;
}
}

Scala:

class BurstyMessageRouter(noOfInstances: Int, messageBurst: Int)
extends RouterConfig {
}

Next, we define the dispatcher and supervisor policy for the router. In our case we are using the default policies only.

Java:

public String routerDispatcher() {
return Dispatchers.DefaultDispatcherId();
}
public SupervisorStrategy supervisorStrategy() {
return SupervisorStrategy.defaultStrategy();
}

Scala:

def routerDispatcher: String = Dispatchers.DefaultDispatcherId
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.
defaultStrategy

Next is the key piece of the router, which is defining the routing mechanism of our bursty message router.

Java:

public CustomRoute createCustomRoute(Props props,
RouteeProvider routeeProvider) {
//create the routee actors and register with routeeprovider
//return CustomRoute()
}

Scala:

def createRoute(props: Props, routeeProvider: RouteeProvider):
Route = {
}

In this, we override the createCustomRoute() method. There are two distinct parts of this method. First, we need to create the number of instances passed for the routee actors. We create the list of routee actors and register it with routeeProvider.

Java:


// create the arraylist for holding the actors
final List routees = new ArrayList(noOfInstances);
for (int i = 0; i < noOfInstances; i++) {
// initialize the actors and add to the arraylist
routees.add(routeeProvider.context().actorOf(props));
}
// register the list
routeeProvider.registerRoutees(routees);

Scala:

def createRoute(props: Props, routeeProvider: RouteeProvider): Route =
{
routeeProvider.createAndRegisterRoutees(props,
noOfInstances, Nil)
}

Next, we return the CustomRoute() method, which needs to be used for routing the messages to the routee actors.

Java:

return new CustomRoute() {
public Iterable
destinationsFor(ActorRef sender,
Object message) {
//logic for routing goes here
}
};

Scala:

{
case (sender, message) =>
List(Destination(sender, actor))
}

Here we created the CustomRoute class and implemented the destinationsFor() method, which returns the Iterable <Destination> object with the destination logic implemented.

Here is the complete code for BurstyMessageRouter, for your reference.

Java:

public class BurstyMessageRouter extends CustomRouterConfig {
int noOfInstances;
int messageBurst;
public BurstyMessageRouter(int inNoOfInstances,
int inMessageBurst) {
noOfInstances = inNoOfInstances;
messageBurst = inMessageBurst;
}
public String routerDispatcher() {
return Dispatchers.DefaultDispatcherId();
}
public SupervisorStrategy supervisorStrategy() {
return SupervisorStrategy.defaultStrategy();
}
@Override
public CustomRoute createCustomRoute(Props props,
RouteeProvider routeeProvider) {
// create the arraylist for holding the actors
final List routees =
new ArrayList(noOfInstances);
for (int i = 0; i < noOfInstances; i++) {
// initialize the actors and add to the arraylist
routees.add(routeeProvider.context().actorOf(props));
}
// register the list
routeeProvider.registerRoutees(routees);
return new CustomRoute() {
int messageCount = 0;
int actorSeq = 0;
public Iterable
destinationsFor(ActorRef sender,
Object message) {
ActorRef actor = routees.get(actorSeq);
List destinationList = Arrays
.asList(new Destination[]
{ new Destination(sender,
actor) });
//increment message count
messageCount++;
//check message count
if (messageCount == messageBurst) {
actorSeq++;
//reset the counter
messageCount = 0;
//reset actorseq counter
if (actorSeq == noOfInstances) {
actorSeq = 0;
}
}
return destinationList;
}
};
}
}

Scala:

class BurstyMessageRouter(noOfInstances: Int, messageBurst: Int)
extends RouterConfig {
var messageCount = 0
var actorSeq = 0
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
def supervisorStrategy: SupervisorStrategy =
SupervisorStrategy.defaultStrategy
def createRoute(props: Props, routeeProvider:
RouteeProvider): Route = {
routeeProvider.createAndRegisterRoutees(props,
noOfInstances, Nil)
{
case (sender, message) =>
var actor = routeeProvider.routees(actorSeq)
//increment message count
messageCount += 1
//check message count
if (messageCount == messageBurst) {
actorSeq += 1
//reset the counter
messageCount = 0
//reset actorseq counter
if (actorSeq == noOfInstances) {
actorSeq = 0
}
}
List(Destination(sender, actor))
}
}
}

That's it. We have written our own custom router. We can invoke the custom router as any other router call.

Java:

ActorSystem _system = ActorSystem.create("CustomRouterExample");
ActorRef burstyPacketRouter = _system.actorOf(new Props(
MsgEchoActor.class).withRouter(new BurstyMessageRouter(5,2)));

Scala:

val _system = ActorSystem.create("CustomRouterExample")

val burstyMessageRouter = _system.actorOf(Props[MsgEchoActor].
withRouter(new BurstyMessageRouter(5,2)))

The message burst size can be configured along with the dispatcher throughput setting to get the optimal throughput.

This brings us to the completion of the router usage within an Akka application.

Summary

We saw the role played by dispatchers and how they can be chosen and configured based on the type of application. Dispatcher's tuning will have maximum impact on the application throughput, so make sure you tune your engine to extract the maximum power. On the other hand, routers allow us to load-balance the incoming message traffic and distribute the same to the routee actors. Application scalability is achieved by using the appropriate router type. Together, dispatchers and routers are responsible for achieving the maximum throughput and scalability of the application.

Resources for Article :

 


Further resources on this subject:


About the Author :


Munish K. Gupta

Munish K. Gupta is a Senior Architect working for Wipro Technologies. Based in Bangalore, India, his day-to-day work involves solution architecture for applications with stringent non-functional requirements (NFRs), Application Performance Engineering, and exploring the readiness of cutting-edge, open source technologies for enterprise adoption.

He advises enterprise customers to help them solve performance and scalability issues, and implement innovative differentiating solutions to achieve business and technology goals. He believes that technology is meant to enable the business, and technology by itself is not a means to an end.

He is very passionate about software programming and craftsmanship. He is always looking for patterns in solving problems, writing code, and making optimum use of tools and frameworks. He blogs about technology trends and Application Performance Engineering at http://www.techspot.co.in and about Akka at http://www.akkaessentials.in

Books From Packt


Google App Engine Java and GWT Application Development
Google App Engine Java and GWT Application Development

Java 7 JAX-WS Web Services
Java 7 JAX-WS Web Services

Service   Oriented Java Business Integration
Service Oriented Java Business Integration

Ext GWT 2.0: Beginner's Guide
Ext GWT 2.0: Beginner's Guide

Apache   MyFaces 1.2 Web Application Development
Apache MyFaces 1.2 Web Application Development

Java EE 6 Cookbook for   Securing, Tuning, and Extending Enterprise Applications
Java EE 6 Cookbook for Securing, Tuning, and Extending Enterprise Applications

Spring Web Flow 2 Web Development
Spring Web Flow 2 Web Development

Apache Maven 3 Cookbook
Apache Maven 3 Cookbook


No votes yet

Post new comment

CAPTCHA
This question is for testing whether you are a human visitor and to prevent automated spam submissions.
a
2
L
E
i
t
Enter the code without spaces and pay attention to upper/lower case.
Code Download and Errata
Packt Anytime, Anywhere
Register Books
Print Upgrades
eBook Downloads
Video Support
Contact Us
Awards Voting Nominations Previous Winners
Judges Open Source CMS Hall Of Fame CMS Most Promising Open Source Project Open Source E-Commerce Applications Open Source JavaScript Library Open Source Graphics Software
Resources
Open Source CMS Hall Of Fame CMS Most Promising Open Source Project Open Source E-Commerce Applications Open Source JavaScript Library Open Source Graphics Software