Cluster Computing Using Scala

In this article, Vytautas Jančauskas, the author of the book Scientific Computing with Scala, explains how to write software that runs on distributed computing clusters. We will learn to use the MPJ Express library here.


Very often when dealing with intense data processing tasks and simulations of physical phenomena, there comes a time when no matter how many CPU cores and how much memory your workstation has, it is not enough. At times like these, you will want to turn to supercomputing clusters for help. These distributed computing environments consist of many nodes (each node being a separate computer) connected into a computer network using specialized high-bandwidth, low-latency connections (or, if you are on a budget, standard Ethernet hardware is often enough).

These computers usually utilize a network filesystem allowing each node to see the same files. They communicate using messaging libraries, such as MPI. Your program will run on separate computers and utilize the message passing framework to exchange data via the computer network.

Using MPJ Express for distributed computing

MPJ Express is a message passing library for distributed computing. It works with programming languages that run on the Java Virtual Machine (JVM), so we can use it from Scala. It is similar in functionality and programming interface to MPI. If you know MPI, you will be able to use MPJ Express in much the same way. The differences specific to Scala are explained in this section. We will start with how to install it. For further reference, visit the MPJ Express website given here:

http://mpj-express.org/

Setting up and running MPJ Express

The steps to set up and run MPJ Express are as follows:

  1. First, download MPJ Express from the following link. The version at the time of this writing is 0.44.
    http://mpj-express.org/download.php
  1. Unpack the archive and refer to the included README file for installation instructions. Currently, you have to set MPJ_HOME to the folder you unpacked the archive to and add the bin folder in that archive to your path. For example, if you are a Linux user using bash as your shell, you can add the following two lines to your .bashrc file (the file is in your home directory at /home/yourusername/.bashrc):
    export MPJ_HOME=/home/yourusername/mpj
    export PATH=$MPJ_HOME/bin:$PATH
  1. Here, mpj is the folder you extracted the archive you downloaded from the MPJ Express website to. If you are using a different system, you will have to do the equivalent of the above for your system to use MPJ Express. We will want to use MPJ Express with Scala Build Tool (SBT), which we used previously to build and run all of our programs. Create the following directory structure:
    scalacluster/
       lib/
       project/
           plugins.sbt
       build.sbt
  1. I have chosen to name the project folder scalacluster here, but you can call it whatever you want. The .jar files in the lib folder will be accessible to your program now. Copy the contents of the lib folder from the mpj directory to this folder. Finally, create empty build.sbt and plugins.sbt files. Let’s now write and run a simple "Hello, World!" program to test our setup:
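    import mpi._

    object MPJTest {
      def main(args: Array[String]) {
        // Start MPJ Express with the arguments supplied on the command line
        MPI.Init(args)
        // Rank: the unique identifier of this process
        val me: Int = MPI.COMM_WORLD.Rank()
        // Size: the total number of processes that were launched
        val size: Int = MPI.COMM_WORLD.Size()
        println("Hello, World, I'm <" + me + ">")
        MPI.Finalize()
      }
    }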

    This should be familiar to everyone who has ever used MPI.

    First, we import everything from the mpi package. Then, we initialize MPJ Express by calling MPI.Init. The arguments to MPJ Express are passed from the command-line arguments you enter when running the program.

    The MPI.COMM_WORLD.Rank() function returns the MPJ process's rank. A rank is a unique identifier used to distinguish processes from one another. Ranks are used when you want different processes to do different things. A common pattern is to use the process with rank 0 as the master process and the processes with other ranks as workers.

    Then, you can use the process's rank to decide what action to take in the program. We also determine how many MPJ processes were launched by checking MPI.COMM_WORLD.Size(). Our program will simply print each process's rank for now. Next, we will run it.

    If you don't have a distributed computing cluster readily available, don't worry. You can test your programs locally on your desktop or laptop. The same program will work without changes on clusters as well.

    To run programs written using MPJ Express, you have to use the mpjrun.sh script. This script will be available to you if you have added the bin folder of the MPJ Express archive to your PATH as described in the section on installing MPJ Express. The mpjrun.sh script will set up the environment for your MPJ Express processes and start them.

    The mpjrun.sh script takes a .jar file, so we need to create one. Unfortunately for us, this cannot easily be done using the sbt package command in the directory containing our program. This worked previously because we used the Scala runtime to execute our programs. MPJ Express uses Java.

    The problem is that the .jar package created with sbt package does not include Scala's standard library. We need what is called a fat .jar—one that contains all the dependencies within itself. One way of generating it is to use a plugin for SBT called sbt-assembly. The website for this plugin is given here:

    https://github.com/sbt/sbt-assembly

  1. There is a simple way of adding the plugin for use in our project. Remember the project/plugins.sbt file we created? All you need to do is add the following line to it (the line may differ between versions of the plugin, so consult the website):
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
  1. Now, add the following to the build.sbt file you created:
    lazy val root = (project in file(".")).
    settings(
       name := "mpjtest",
       version := "1.0",
       scalaVersion := "2.11.7"
    )
  1. Then, execute the sbt assembly command from the shell to build the .jar file. If you are using the preceding build.sbt file and the folder containing the program and build.sbt is /home/you/cluster, the file will be put at the following path:
    /home/you/cluster/target/scala-2.11/mpjtest-assembly-1.0.jar
  1. Now, you can run the mpjtest-assembly-1.0.jar file as follows:
    $ mpjrun.sh -np 4 -jar target/scala-2.11/mpjtest-assembly-1.0.jar
    MPJ Express (0.44) is started in the multicore configuration
    Hello, World, I'm <0>
    Hello, World, I'm <2>
    Hello, World, I'm <3>
    Hello, World, I'm <1>

    The -np argument specifies how many processes to run. Since we specified -np 4, four processes will be started by the script. The order of the "Hello, World" messages can differ on your system since the precise order of execution of different processes is undetermined. If you got output similar to the one shown here, then congratulations, you have done the majority of the work needed to write and deploy applications using MPJ Express.

Using Send and Recv

MPJ Express processes can communicate using Send and Recv. These methods constitute arguably the simplest and easiest to understand mode of operation that is also probably the most error prone. We will look at these two first. The following are the signatures for the Send and Recv methods:

public void Send(java.lang.Object buf, int offset, int count,
Datatype datatype, int dest, int tag) throws MPIException

public Status Recv(java.lang.Object buf, int offset, int count,
Datatype datatype, int source, int tag) throws MPIException

Both of these calls are blocking. This means that after calling Send, your process will block (will not execute the instructions following it) until a corresponding Recv is called by another process. Likewise, Recv will block the process until a corresponding Send happens. By corresponding, we mean that the dest and source arguments of the calls have the values of the receiver's and sender's ranks, respectively.

The two calls will be enough to implement many complicated communication patterns. However, they are prone to various problems such as deadlocks. Also, they are quite difficult to debug, since you have to make sure that each Send has the correct corresponding Recv and vice versa.
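A common way to sidestep this kind of deadlock when two processes exchange data with each other is to make the partners issue their blocking calls in opposite orders. The following is a minimal sketch of that ordering logic only. ExchangeOrder, Step, stepsFor, and partnerOf are hypothetical names that are not part of MPJ Express, and the sketch assumes an even number of processes paired up as 0 with 1, 2 with 3, and so on:

```scala
// Hypothetical helper, not part of MPJ Express: decides in which order a
// process should issue its blocking calls during a pairwise exchange.
object ExchangeOrder {
  sealed trait Step
  case object DoSend extends Step
  case object DoRecv extends Step

  // Even ranks send first and then receive; odd ranks do the opposite.
  // Two partners therefore never both sit in Send at the same time,
  // each waiting for a Recv that the other has not yet reached.
  def stepsFor(rank: Int): List[Step] =
    if (rank % 2 == 0) List(DoSend, DoRecv) else List(DoRecv, DoSend)

  // Partner in the pairwise exchange (assumes an even number of processes).
  def partnerOf(rank: Int): Int =
    if (rank % 2 == 0) rank + 1 else rank - 1
}
```

In an MPJ Express program, each process would walk through stepsFor(me) and call Send or Recv with partnerOf(me) as the dest or source argument.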

The parameters for Send and Recv are basically the same. The meanings of those parameters are summarized in the following table:

| Argument | Type | Description |
|---|---|---|
| buf | java.lang.Object | It has to be a one-dimensional Java array. When using from Scala, use the Scala array, which is a one-to-one mapping to a Java array. |
| offset | int | The index in the array at which the data you want to pass starts. |
| count | int | The number of items of the array you want to pass. |
| datatype | Datatype | The type of data in the array. Can be one of the following: MPI.BYTE, MPI.CHAR, MPI.SHORT, MPI.BOOLEAN, MPI.INT, MPI.LONG, MPI.FLOAT, MPI.DOUBLE, MPI.OBJECT, MPI.LB, MPI.UB, and MPI.PACKED. |
| dest/source | int | Either the destination to send the message to or the source to get the message from. You use the rank of the process to identify sources and destinations. |
| tag | int | Used to tag the message. Can be used to introduce different message types. Can be ignored for most common applications. |
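The offset and count arguments make it possible to send different parts of one array to different processes without copying. As a minimal sketch of how those two values might be computed, the hypothetical helper below (ChunkBounds and bounds are illustrative names, not part of MPJ Express) divides an array of a given length as evenly as possible among a number of workers:

```scala
// Hypothetical helper, not part of MPJ Express: computes one
// (offset, count) pair per worker so that `total` array items are
// divided as evenly as possible among `workers` processes.
object ChunkBounds {
  def bounds(total: Int, workers: Int): Seq[(Int, Int)] = {
    require(workers > 0, "need at least one worker")
    val base = total / workers   // items that every worker gets
    val extra = total % workers  // the first `extra` workers get one more
    (0 until workers).map { w =>
      val count = base + (if (w < extra) 1 else 0)
      val offset = w * base + math.min(w, extra)
      (offset, count)
    }
  }
}
```

For example, ChunkBounds.bounds(10, 3) yields (0, 4), (4, 3), and (7, 3). A master process could then pass each pair as the offset and count arguments of Send, with the worker's rank as dest.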

Let’s look at a simple program using these calls for communication. We will implement a simple master/worker communication pattern:

import mpi._
import scala.util.Random

object MPJTest {
def main(args: Array[String]) {
   MPI.Init(args)
   val me: Int = MPI.COMM_WORLD.Rank()
   val size: Int = MPI.COMM_WORLD.Size()
   if (me == 0) {

Here, we use an if statement to identify who we are based on our rank. Since each process gets a unique rank, this allows us to determine what action should be taken. In our case, we assigned the role of the master to the process with rank 0 and the role of a worker to processes with other ranks:

     for (i <- 1 until size) {
       val buf = Array(Random.nextInt(100))
       MPI.COMM_WORLD.Send(buf, 0, 1, MPI.INT, i, 0)
       println("MASTER: Dear <" + i + "> please do work on " +
               buf(0))
     }

We iterate over the workers, which have ranks from 1 up to one less than the number of processes you passed to the mpjrun.sh script. Let’s say that number is four. This gives us one master process and three worker processes. So, each process with a rank from 1 to 3 will get a randomly generated number. We have to put that number in an array even though it is a single number. This is because both the Send and Recv methods expect an array as their first argument. We then use the Send method to send the data. We specified the array as argument buf, an offset of 0, a count of 1, type MPI.INT, the for loop index as the destination, and 0 as the tag. This means that each of our three worker processes will receive a (most probably) different number:

     for (i <- 1 until size) {
       val buf = Array(0)
       MPI.COMM_WORLD.Recv(buf, 0, 1, MPI.INT, i, 0)
       println("MASTER: Dear <" + i + "> thanks for the reply, " +
               "which was " + buf(0))
     }

Finally, we collect the results from the workers. For this, we iterate over the worker ranks and use the Recv method on each one of them. We print the result we got from the worker, and this concludes the master's part. We now move on to the workers:

   } else {
     val buf = Array(0)
     MPI.COMM_WORLD.Recv(buf, 0, 1, MPI.INT, 0, 0)
     println("<" + me + ">: " + "Understood, doing work on " +
             buf(0))
     buf(0) = buf(0) * buf(0)
     MPI.COMM_WORLD.Send(buf, 0, 1, MPI.INT, 0, 0)
     println("<" + me + ">: " + "Reported back")
   }

The workers' code is identical for all of them. They receive a number from the master, calculate its square, and send it back:

   MPI.Finalize()
}
}

After you run the program, the results should be akin to the following, which I got when running this program on my system:

MASTER: Dear <1> please do work on 71
MASTER: Dear <2> please do work on 12
MASTER: Dear <3> please do work on 55
<1>: Understood, doing work on 71
<1>: Reported back
MASTER: Dear <1> thanks for the reply, which was 5041
<3>: Understood, doing work on 55
<2>: Understood, doing work on 12
<2>: Reported back
MASTER: Dear <2> thanks for the reply, which was 144
MASTER: Dear <3> thanks for the reply, which was 3025
<3>: Reported back

Sending Scala objects in MPJ Express messages

Sometimes, the types provided by MPJ Express for use in the Send and Recv methods are not enough. You may want to send your MPJ Express processes a Scala object. A very realistic example of this would be to send an instance of a Scala case class.

These can be used to construct more complicated data types consisting of several different basic types. A simple example is a two-dimensional vector consisting of x and y coordinates. This can be sent as a simple array, but more complicated classes can't. For example, you may want to use a case class like the one shown here. It has two attributes of type String and one attribute of type Int. So what do we do with a data type like this? The simplest answer to that problem is to serialize it.

Serializing converts an object to a stream of characters or a string that can be sent over the network (or stored in a file, among other things) and later deserialized to get the original object back:

scala> case class Person(name: String, surname: String, age: Int)
defined class Person

scala> val a = Person("Name", "Surname", 25)
a: Person = Person(Name,Surname,25)

A simple way of serializing is to use a format such as XML or JSON. This can be done automatically using a pickling library. Pickling is a term that comes from the Python programming language. It is the automatic conversion of an arbitrary object into a string representation that can later be converted back to get the original object. The reconstructed object will behave the same way as it did before conversion. This allows one to store arbitrary objects in files, for example.

There is a pickling library available for Scala as well. You can of course do serialization in several different ways (for example, using the powerful support for XML available in Scala).

We will use the pickling library that is available from the following website for this example:

https://github.com/scala/pickling

You can install it by adding the following line to your build.sbt file:

libraryDependencies += "org.scala-lang.modules" %% "scala-pickling" % "0.10.1"

After doing that, use the following import statements to enable easy pickling in your projects:

scala> import scala.pickling.Defaults._
import scala.pickling.Defaults._

scala> import scala.pickling.json._
import scala.pickling.json._

Here, you can see how you can then easily use this library to pickle and unpickle arbitrary objects without the use of annoying boilerplate code:

scala> val pklA = a.pickle
pklA: pickling.json.pickleFormat.PickleType =
JSONPickle({
"$type": "Person",
"name": "Name",
"surname": "Surname",
"age": 25
})
scala> val unpklA = pklA.unpickle[Person]
unpklA: Person = Person(Name,Surname,25)

Let’s see how this would work in an application using MPJ Express for message passing. A program using pickling to send a case class instance in a message is given here:

import mpi._
import scala.pickling.Defaults._
import scala.pickling.json._

case class ArbitraryObject(a: Array[Double], b: Array[Int],
c: String)

Here, we have chosen to define a fairly complex case class, consisting of two arrays of different types and a string:

object MPJTest {
def main(args: Array[String]) {
   MPI.Init(args)
   val me: Int = MPI.COMM_WORLD.Rank()
   val size: Int = MPI.COMM_WORLD.Size()
   if (me == 0) {
     val obj = ArbitraryObject(Array(1.0, 2.0, 3.0), Array(1, 2,
                               3), "Hello")
     val pkl = obj.pickle.value.toCharArray
     MPI.COMM_WORLD.Send(pkl, 0, pkl.size, MPI.CHAR, 1, 0)

In the preceding bit of code, we create an instance of our case class. We then pickle it to JSON and get the string representation of said JSON with the value method. However, to send it in an MPJ message, we need to convert it to a one-dimensional array of one of the supported types. Since it is a string, we convert it to a char array. This is done using the toCharArray method:

   } else if (me == 1) {
     val buf = new Array[Char](1000)
     MPI.COMM_WORLD.Recv(buf, 0, 1000, MPI.CHAR, 0, 0)
     val msg = buf.mkString
     val obj = msg.unpickle[ArbitraryObject]

On the receiving end, we get the raw char array, convert it back to a string using the mkString method, and then unpickle it using unpickle[T]. This will return an instance of the case class that we can use like any other instance of a case class. It is functionally the same object that was sent to us:

     println(msg)
     println(obj.c)
   }
   MPI.Finalize()
}
}

The following is the result of running the preceding program. It prints out the JSON representation of our object and also shows that we can access the attributes of said object by printing the c attribute:

MPJ Express (0.44) is started in the multicore configuration

{
"$type": "ArbitraryObject",
"a": [
   1.0,
   2.0,
   3.0
],
"b": [
   1,
   2,
   3
],
"c": "Hello"
}
Hello
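One detail of the receiving side is worth a note. The buffer is allocated with 1000 characters, so buf.mkString produces a 1000-character string in which everything after the received JSON is the '\u0000' character that a new Array[Char] is filled with. The sketch below shows two ways of keeping only the received part. BufferDecode, decode, and decodePadded are hypothetical helper names, not part of MPJ Express. Where the count comes from is left open here; I believe the Status object returned by Recv can report it in the mpiJava-style API, but treat that as an assumption to check against the MPJ Express documentation:

```scala
// Hypothetical helper, not part of MPJ Express: turns a receive buffer
// into a String that contains only the characters actually received.
object BufferDecode {
  // Variant 1: the number of received characters is known.
  def decode(buf: Array[Char], count: Int): String =
    new String(buf, 0, count)

  // Variant 2: the count is unknown, so drop the trailing '\u0000'
  // characters that a freshly allocated Array[Char] is filled with.
  def decodePadded(buf: Array[Char]): String = {
    val end = buf.lastIndexWhere(_ != '\u0000') + 1
    new String(buf, 0, end)
  }
}
```

Either result can be passed to unpickle[ArbitraryObject] in place of buf.mkString.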

You can use this method to send arbitrary objects in an MPJ Express message. However, this is just one of many ways of doing this. As mentioned previously, another way is to use an XML representation. XML support is strong in Scala, and you can use it to serialize objects as well. This will usually require you to add some boilerplate code to your program to serialize to XML. The method discussed earlier has the advantage of requiring no boilerplate code.
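Yet another option, not used in this article, is the JVM's built-in object serialization, which works for case classes because they are serializable by default. The following is a minimal sketch under that assumption. JavaSerialization, toBytes, and fromBytes are hypothetical names introduced for illustration:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream}

// Hypothetical helper for illustration: round-trips an object through
// a byte array using standard Java serialization.
object JavaSerialization {
  // Same shape as the Person case class used earlier in this section.
  case class Person(name: String, surname: String, age: Int)

  // Serialize any java.io.Serializable object to a byte array.
  def toBytes(obj: java.io.Serializable): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // Rebuild an object from bytes; the caller states the expected type.
  def fromBytes[T](data: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

The resulting byte array could be sent with the MPI.BYTE datatype listed in the table of Send and Recv parameters. The receiver would need a buffer large enough for the message and would have to pass only the received bytes to fromBytes.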

Non-blocking communication

So far, we have examined only blocking (or synchronous) communication between two processes. This means that a process is blocked (its execution is halted) until the Send or Recv method has completed successfully. This is simple to understand and enough for most cases. The problem with synchronous communication is that you have to be very careful; otherwise, deadlocks may occur.

Deadlocks are situations in which processes wait for each other to release a resource first. The dining philosophers problem is a famous example of deadlock from operating systems, and a Mexican standoff is a familiar everyday analogy. The point is that if you are unlucky, you may end up with a program that is seemingly stuck and you don't know why.

Using non-blocking communication allows you to avoid these problems most of the time. If you think you may be at risk of deadlocks, you will probably want to use it. The signatures for the primary methods used in asynchronous communication are given here:

Request Isend(java.lang.Object buf, int offset, int count,
Datatype datatype, int dest, int tag)

Isend works similarly to its Send counterpart. The main differences are that it does not block (the program continues execution after the call rather than waiting for a corresponding receive) and that it returns a Request object. This object is used to check the status of your send request, block until it is complete if required, and so on:

Request Irecv(java.lang.Object buf, int offset, int count, Datatype datatype, int src, int tag)

Irecv is, again, the same as Recv except that it is non-blocking and returns a Request object used to handle your receive request. The operation of these methods can be seen in action in the following example:

import mpi._

object MPJTest {
def main(args: Array[String]) {
   MPI.Init(args)
   val me: Int = MPI.COMM_WORLD.Rank()
   val size: Int = MPI.COMM_WORLD.Size()
   if (me == 0) {
     val requests = for (i <- 0 until 10) yield {
       val buf = Array(i * i)
       MPI.COMM_WORLD.Isend(buf, 0, 1, MPI.INT, 1, 0)
     }
     // Make sure every send has completed before MPI.Finalize() is called
     requests.foreach(_.Wait())
   } else if (me == 1) {
     for (i <- 0 until 10) {
       Thread.sleep(1000)
       val buf = Array[Int](0)
       val request = MPI.COMM_WORLD.Irecv(buf, 0, 1, MPI.INT, 0, 0)
       request.Wait()
       println("RECEIVED: " + buf(0))
     }
   }
   MPI.Finalize()
}
}

This is a very simplistic example used simply to demonstrate the basics of using the asynchronous message passing methods. First, the process with rank 0 will send 10 messages to the process with rank 1 using Isend. Since Isend does not block, the loop will finish quickly and the messages it sent will be buffered until they are retrieved using Irecv.

The second process (the one with rank 1) will wait for one second before retrieving each message. This is to demonstrate the asynchronous nature of these methods. The messages are in the buffer waiting to be retrieved. Therefore, Irecv can be used at your leisure when convenient. The Wait() method of the Request object that Irecv returns has to be called before the results are used. The Wait() method blocks until the message is successfully received from the buffer.

Summary

Extremely computationally intensive programs are usually parallelized and run on supercomputing clusters. These clusters consist of multiple networked computers. Communication between these computers is usually done using messaging libraries such as MPI. These allow you to pass data between processes running on different machines in an efficient manner.

In this article, you have learned how to use MPJ Express, an MPI-like library for the JVM. We saw how to carry out process-to-process communication, both blocking and non-blocking, and how to send Scala objects in messages. The most important MPJ Express primitives for this were covered, and example programs using them were given.
