Reactive Data Streams

In this article by Shiti Saxena, author of the book Mastering Play Framework for Scala, we will discuss the Iteratee approach to handling data streams. The article covers the basics with a brief explanation of the following topics:

  • Iteratees
  • Enumerators
  • Enumeratees


Iteratee

Iteratee is defined as a trait, Iteratee[E, +A], where E is the input type and A is the result type. The state of an Iteratee is represented by an instance of Step, which is defined as follows:

sealed trait Step[E, +A] {
  def it: Iteratee[E, A] = this match {
    case Step.Done(a, e) => Done(a, e)
    case Step.Cont(k) => Cont(k)
    case Step.Error(msg, e) => Error(msg, e)
  }
}
object Step {
  // Done state of an iteratee
  case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
  // Continuing state of an iteratee
  case class Cont[E, +A](k: Input[E] => Iteratee[E, A]) extends Step[E, A]
  // Error state of an iteratee
  case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]
}

The Input used here represents an element of the data stream, which can be empty, an element, or an end-of-file indicator. Input is defined as follows:

sealed trait Input[+E] {
  def map[U](f: (E => U)): Input[U] = this match {
    case Input.El(e) => Input.El(f(e))
    case Input.Empty => Input.Empty
    case Input.EOF => Input.EOF
  }
}
object Input {
  // An input element
  case class El[+E](e: E) extends Input[E]
  // An empty input
  case object Empty extends Input[Nothing]
  // An end-of-file input
  case object EOF extends Input[Nothing]
}

An Iteratee is an immutable data type and each result of processing an input is a new Iteratee with a new state.

To handle the possible states of an Iteratee, there is a predefined helper object for each state. They are:

  • Cont
  • Done
  • Error

Let's see the definition of the readLine method, which utilizes these objects:

def readLine(line: List[Array[Byte]] = Nil): Iteratee[Array[Byte], String] = Cont {
  case Input.El(data) => {
    val s = data.takeWhile(_ != '\n')
    if (s.length == data.length) {
      readLine(s :: line)
    } else {
      Done(new String(Array.concat((s :: line).reverse: _*), "UTF-8").trim(),
        elOrEmpty(data.drop(s.length + 1)))
    }
  }
  case Input.EOF => {
    Error("EOF found while reading line", Input.Empty)
  }
  case Input.Empty => readLine(line)
}

The readLine method returns an Iteratee in the Cont state that reads a line from the incoming byte chunks. As long as a chunk contains no newline, readLine calls itself recursively with the bytes accumulated so far, so the Iteratee stays in the Cont state. Once a newline is found, an Iteratee in the Done state is returned, carrying the decoded line and any remaining input. If EOF is encountered before a newline, an Iteratee in the Error state is returned.
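The state transitions of readLine can be tried out without Play on the classpath. The following is a minimal sketch under stated assumptions: it is a toy, synchronous model that works on String chunks instead of byte arrays, and MiniIterateeSketch, MiniIteratee, feed, and feedAll are hypothetical names introduced for illustration, not part of Play's API.

```scala
// Toy model only: a synchronous, String-based imitation of the three iteratee states.
// None of these types come from Play; they mirror the shapes shown above.
object MiniIterateeSketch {
  sealed trait Input
  case class El(chunk: String) extends Input
  case object Empty extends Input
  case object EOF extends Input

  sealed trait MiniIteratee
  case class Done(line: String, remaining: Input) extends MiniIteratee
  case class Cont(k: Input => MiniIteratee) extends MiniIteratee
  case class Error(msg: String, input: Input) extends MiniIteratee

  // Accumulates chunks (most recent first) until a newline is seen.
  def readLine(acc: List[String] = Nil): MiniIteratee = Cont {
    case El(chunk) =>
      val s = chunk.takeWhile(_ != '\n')
      if (s.length == chunk.length) {
        readLine(s :: acc) // no newline yet: remain in the Cont state
      } else {
        val rest = chunk.drop(s.length + 1)
        Done((s :: acc).reverse.mkString.trim, if (rest.isEmpty) Empty else El(rest))
      }
    case EOF => Error("EOF found while reading line", Empty)
    case Empty => readLine(acc)
  }

  // Feeding an input never mutates an iteratee; it returns the next state.
  def feed(it: MiniIteratee, in: Input): MiniIteratee = it match {
    case Cont(k) => k(in)
    case finished => finished // Done and Error ignore further input
  }

  // Feeds every input in order, starting from a fresh line reader.
  def feedAll(inputs: List[Input]): MiniIteratee =
    inputs.foldLeft(readLine())(feed)
}
```

Feeding the chunks "Hello, " and "world\nnext" leaves the model in the Done state with the line "Hello, world" and the leftover input "next", while feeding EOF before any newline produces the Error state.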

In addition to these, Play Framework exposes a companion Iteratee object, which has helper methods to deal with Iteratees. The API exposed through the Iteratee object is documented at https://www.playframework.com/documentation/2.3.x/api/scala/index.html#play.api.libs.iteratee.Iteratee$.

The Iteratee object is also used internally within the framework to provide some key features. For example, consider the request body parsers. The apply method of the BodyParser object is defined as follows:

def apply[T](debugName: String)(f: RequestHeader => Iteratee[Array[Byte], Either[Result, T]]): BodyParser[T] =
  new BodyParser[T] {
    def apply(rh: RequestHeader) = f(rh)
    override def toString = "BodyParser(" + debugName + ")"
  }

So, to define BodyParser[T], we need to define a method that accepts RequestHeader and returns an Iteratee whose input is an Array[Byte] and results in Either[Result,T].
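As an illustration of that shape, here is a minimal sketch of a custom parser that yields the total number of bytes in the request body. It assumes the Play 2.3.x API described in this article is on the classpath. ByteCountParserSketch and byteCount are hypothetical names, and the global execution context is used only to keep the sketch short.

```scala
import play.api.libs.iteratee.Iteratee
import play.api.mvc.BodyParser
import scala.concurrent.ExecutionContext.Implicits.global

object ByteCountParserSketch {
  // Hypothetical parser: folds over the body chunks and sums their sizes.
  // The request header is ignored because the result depends only on the body.
  val byteCount: BodyParser[Int] = BodyParser("byteCount") { _ =>
    Iteratee
      .fold[Array[Byte], Int](0)((total, bytes) => total + bytes.length)
      .map(total => Right(total)) // Right means parsing succeeded
  }
}
```

A Left value would carry a Result instead, which is how a parser can reject a request before the action runs.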

Let's look at some of the existing implementations to understand how this works.

The RawBuffer parser is defined as follows:

def raw(memoryThreshold: Int): BodyParser[RawBuffer] =
  BodyParser("raw, memoryThreshold=" + memoryThreshold) { request =>
    import play.core.Execution.Implicits.internalContext
    val buffer = RawBuffer(memoryThreshold)
    Iteratee.foreach[Array[Byte]](bytes => buffer.push(bytes)).map { _ =>
      buffer.close()
      Right(buffer)
    }
  }

The RawBuffer parser uses the Iteratee.foreach method to push each chunk of input it receives into a buffer.

The file parser is defined as follows:

def file(to: File): BodyParser[File] = BodyParser("file, to=" + to) { request =>
  import play.core.Execution.Implicits.internalContext
  Iteratee.fold[Array[Byte], FileOutputStream](new FileOutputStream(to)) {
    (os, data) =>
      os.write(data)
      os
  }.map { os =>
    os.close()
    Right(to)
  }
}

The file parser uses the Iteratee.fold method to write the incoming data to a FileOutputStream, which is threaded through the fold as its state and closed once all the input has been consumed.
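The accumulator pattern in the file parser has the same shape as foldLeft on an ordinary collection. The sketch below is only an analogy using the standard library: the chunks are already in memory and a ByteArrayOutputStream stands in for the file, whereas Iteratee.fold receives its chunks asynchronously. FoldAnalogySketch, chunks, and collect are hypothetical names.

```scala
import java.io.ByteArrayOutputStream

object FoldAnalogySketch {
  // In-memory stand-in for body chunks; a real request delivers them over time.
  val chunks: List[Array[Byte]] =
    List("Hello, ".getBytes("UTF-8"), "world".getBytes("UTF-8"))

  // Threads one output stream through every chunk, then closes it,
  // mirroring the fold-then-map structure of the file parser.
  def collect(chunks: List[Array[Byte]]): String = {
    val os = chunks.foldLeft(new ByteArrayOutputStream()) { (out, data) =>
      out.write(data, 0, data.length)
      out
    }
    os.close()
    new String(os.toByteArray, "UTF-8")
  }
}
```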

Now, let's see the implementation of Enumerator and how these two pieces fit together.

Enumerator

Similar to the Iteratee, an Enumerator is also defined through a trait and backed by an object of the same name:

trait Enumerator[E] {
  parent =>
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]
  ...
}
object Enumerator {
  def apply[E](in: E*): Enumerator[E] = in.length match {
    case 0 => Enumerator.empty
    case 1 => new Enumerator[E] {
      def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] =
        i.pureFoldNoEC {
          case Step.Cont(k) => k(Input.El(in.head))
          case _ => i
        }
    }
    case _ => new Enumerator[E] {
      def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] =
        enumerateSeq(in, i)
    }
  }
  ...
}

Observe that the apply method of the trait and its companion object are different. The apply method of the trait accepts Iteratee[E, A] and returns Future[Iteratee[E, A]], while that of the companion object accepts a sequence of type E and returns an Enumerator[E].

Now, let's define a simple data flow using the companion object's apply method. First, we get the character count of a given line that has been split into words (Seq[String]):

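import play.api.libs.iteratee.{Enumerator, Iteratee}
import scala.concurrent.Future
// Any implicit ExecutionContext works; the global one is used here for brevity
import scala.concurrent.ExecutionContext.Implicits.global
val line: String = "What we need is not the will to believe, but the wish to find out."
val words: Seq[String] = line.split(" ")
val src: Enumerator[String] = Enumerator(words: _*)
val sink: Iteratee[String, Int] = Iteratee.fold[String, Int](0)((x, y) => x + y.length)
val flow: Future[Iteratee[String, Int]] = src(sink)
val result: Future[Int] = flow.flatMap(_.run)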

The variable result has the Future[Int] type. We can now process this to get the actual count.

In the preceding code snippet, we got the result by following these steps:

  1. Building an Enumerator using the companion object's apply method:
    val src: Enumerator[String] = Enumerator(words: _*)
  2. Getting Future[Iteratee[String, Int]] by binding the Enumerator to an Iteratee:
    val flow: Future[Iteratee[String, Int]] = src(sink)
  3. Flattening Future[Iteratee[String,Int]] and processing it:
    val result: Future[Int] = flow.flatMap(_.run)
  4. Fetching the result from Future[Int], for example by registering a callback on it or, in a test, by blocking with Await.result.
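The last step is plain scala.concurrent usage rather than anything specific to Iteratees. The following minimal sketch shows two common options. The Future here is computed directly from the words as a stand-in for flow.flatMap(_.run), and FetchResultSketch and its methods are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FetchResultSketch {
  val line = "What we need is not the will to believe, but the wish to find out."

  // Stand-in for `flow.flatMap(_.run)`: any Future[Int] is handled the same way.
  def characterCount(): Future[Int] =
    Future(line.split(" ").map(_.length).sum)

  // Non-blocking: transform the eventual value and keep working with a Future.
  def report(): Future[String] =
    characterCount().map(n => s"characters: $n")

  // Blocking: acceptable in a test or the REPL, best avoided in request handling.
  def blockingCount(): Int =
    Await.result(characterCount(), 5.seconds)
}
```

In a Play action, the non-blocking form is usually preferable because the Future can be returned to the framework as it is.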

Thankfully, Play provides a shortcut method by merging steps 2 and 3 so that we don't have to repeat the same process every time. The method is represented by the |>>> symbol. Using the shortcut method, our code is reduced to this:

val src: Enumerator[String] = Enumerator(words: _*)
val sink: Iteratee[String, Int] = Iteratee.fold[String, Int](0)((x, y) => x + y.length)
val result: Future[Int] = src |>>> sink

Why use this when we could simply use the methods of the data type? In this case, could we not just use the length method of String to get the same value (ignoring whitespace)?

In this example, we get the data as a single String, but this will not be the only scenario. We need ways to process continuous data, such as a file upload or data feeds from various networking sites.

For example, suppose our application receives heartbeats at a fixed interval from all the devices (such as cameras, thermometers, and so on) connected to it. We can simulate a data stream using the Enumerator.generateM method:

val dataStream: Enumerator[String] = Enumerator.generateM {
  Promise.timeout(Some("alive"), 100 millis)
}

In the preceding snippet, the "alive" String is produced every 100 milliseconds. The function passed to the generateM method is called whenever the Iteratee bound to the Enumerator is in the Cont state. This method is used internally to build enumerators and can come in handy when we want to analyze the processing for an expected data stream.

An Enumerator can be created from a file, InputStream, or OutputStream. Enumerators can be concatenated or interleaved. The Enumerator API is documented at https://www.playframework.com/documentation/2.3.x/api/scala/index.html#play.api.libs.iteratee.Enumerator$.

Using the Concurrent object

The Concurrent object is a helper that provides utilities for using Iteratees, Enumerators, and Enumeratees concurrently. Two of its important methods are:

  • unicast: It is useful when sending data to a single Iteratee.
  • broadcast: It facilitates sending the same data to multiple Iteratees concurrently.

Unicast

For example, the character count example in the previous section can be implemented as follows:

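val unicastSrc = Concurrent.unicast[String](
  channel => {
    channel.push(line)
    // Signal the end of input; otherwise the Iteratee never sees EOF and the result never completes
    channel.eofAndEnd()
  }
)
val unicastResult: Future[Int] = unicastSrc |>>> sink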

The unicast method accepts the onStart, onError, and onComplete handlers. In the preceding code snippet, we have provided the onStart method, which is mandatory. The signature of unicast is this:

def unicast[E](onStart: (Channel[E]) => Unit,
               onComplete: => Unit = (),
               onError: (String, Input[E]) => Unit = (_: String, _: Input[E]) => ())
              (implicit ec: ExecutionContext): Enumerator[E] {…}

So, to add a log for errors, we can define the onError handler as follows:

val unicastSrc2 = Concurrent.unicast[String](
  channel => channel.push(line),
  onError = { (msg, str) => Logger.error(s"encountered $msg for $str") }
)

Now, let's see how broadcast works.

Broadcast

The broadcast[E] method creates an enumerator and a channel and returns an (Enumerator[E], Channel[E]) tuple. The enumerator and channel thus obtained can be used to broadcast data to multiple Iteratees:

val (broadcastSrc: Enumerator[String], channel: Concurrent.Channel[String]) =
  Concurrent.broadcast[String]
private val vowels: Seq[Char] = Seq('a', 'e', 'i', 'o', 'u')
def getVowels(str: String): String = {
  val result = str.filter(c => vowels.contains(c))
  result
}
def getConsonants(str: String): String = {
  val result = str.filterNot(c => vowels.contains(c))
  result
}
val vowelCount: Iteratee[String, Int] =
  Iteratee.fold[String, Int](0)((x, y) => x + getVowels(y).length)
val consonantCount: Iteratee[String, Int] =
  Iteratee.fold[String, Int](0)((x, y) => x + getConsonants(y).length)
val vowelInfo: Future[Int] = broadcastSrc |>>> vowelCount
val consonantInfo: Future[Int] = broadcastSrc |>>> consonantCount
words.foreach(w => channel.push(w))
channel.end()
vowelInfo onSuccess { case count => println(s"vowels:$count") }
consonantInfo onSuccess { case count => println(s"consonants:$count") }

Enumeratee

Like Iteratee and Enumerator, Enumeratee is defined using a trait and a companion object of the same name.

It is defined as follows:

trait Enumeratee[From, To] {
  ...
  def applyOn[A](inner: Iteratee[To, A]): Iteratee[From, Iteratee[To, A]]
  def apply[A](inner: Iteratee[To, A]): Iteratee[From, Iteratee[To, A]] = applyOn[A](inner)
  ...
}

An Enumeratee transforms the Iteratee given to it as input and returns a new Iteratee. Let's look at a method that defines an Enumeratee by implementing the applyOn method. The flatten method of the Enumeratee object accepts a Future[Enumeratee] and returns another Enumeratee. It is defined as follows:

def flatten[From, To](futureOfEnumeratee: Future[Enumeratee[From, To]]) =
  new Enumeratee[From, To] {
    def applyOn[A](it: Iteratee[To, A]): Iteratee[From, Iteratee[To, A]] =
      Iteratee.flatten(futureOfEnumeratee.map(_.applyOn[A](it))(dec))
  }

In the preceding snippet, applyOn is called on the Enumeratee whose future is passed and dec is defaultExecutionContext.

Defining an Enumeratee using the companion object is a lot simpler. The companion object has a lot of methods to deal with enumeratees, such as map, transform, collect, take, filter, and so on. The API is documented at https://www.playframework.com/documentation/2.3.x/api/scala/index.html#play.api.libs.iteratee.Enumeratee$.

Let's define an Enumeratee by working through a problem. The example we used in the previous section to find the count of vowels and consonants will not work correctly if a vowel is capitalized in a sentence, that is, the result of src |>>> vowelCount will be incorrect when the line variable is defined as follows:

val line: String = "What we need is not the will to believe, but the wish to find out.".toUpperCase

To fix this, let's alter the case of all the characters in the data stream to lowercase. We can use an Enumeratee to update the input provided to the Iteratee.

Now, let's define an Enumeratee to return a given string in lowercase:

val toSmallCase: Enumeratee[String, String] = Enumeratee.map[String] {
  s => s.toLowerCase
}

There are two ways to add an Enumeratee to the dataflow. It can be bound to the following:

  • Enumerators
  • Iteratees
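Both bindings can be sketched as follows, assuming the Play 2.3.x iteratee library is on the classpath. The through method on Enumerator (alias &>) and the transform method on Enumeratee (alias &>>) are used here. EnumerateeBindingSketch, viaEnumerator, and viaIteratee are hypothetical names, and vowelCount is redefined locally so that the sketch stands on its own.

```scala
import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object EnumerateeBindingSketch {
  private val vowels: Set[Char] = Set('a', 'e', 'i', 'o', 'u')

  // Counts lowercase vowels only, like the vowelCount Iteratee used earlier.
  val vowelCount: Iteratee[String, Int] =
    Iteratee.fold[String, Int](0)((total, s) => total + s.count(vowels))

  val toSmallCase: Enumeratee[String, String] =
    Enumeratee.map[String](_.toLowerCase)

  // 1. Bound to the Enumerator: the source itself now emits lowercase strings.
  def viaEnumerator(src: Enumerator[String]): Future[Int] =
    src.through(toSmallCase) |>>> vowelCount

  // 2. Bound to the Iteratee: the sink lowercases whatever it is fed.
  def viaIteratee(src: Enumerator[String]): Future[Int] =
    src |>>> toSmallCase.transform(vowelCount)
}
```

Either way the capitalized line yields the same count as the lowercase one. Binding to the Enumerator suits cases where every consumer needs the transformed data, while binding to the Iteratee keeps the source untouched for other consumers.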

Summary

In this article, we discussed the concept of Iteratees, Enumerators, and Enumeratees. We also saw how they were implemented in Play Framework and used internally.
