You're reading from  Asynchronous Android Programming - Second Edition

Product type: Book
Published in: Jul 2016
Reading level: Beginner
Publisher: Packt
ISBN-13: 9781785883248
Edition: 2nd Edition
Author (1)
Steve Liles

Steve Liles is a self-confessed geek and has been an Android fan since the launch day of the G1. When he isn't at work building publishing systems and apps for newspapers and magazines, you'll find him tinkering with his own apps, building 3D printers, or playing RTS games. He is currently working with a start-up to build an advertising system that links the print and digital worlds using computer vision on Android and iOS devices.

Chapter 12. Asynchronous Programming with RxJava

In previous chapters, we have been using Android-based constructs such as Loader and AsyncTask to offload work from the main thread to low-priority background threads.

Although these straightforward constructs are able to deliver results that require intensive IO operations or network data, they don't provide out-of-the-box solutions for exception handling, task composition, and asynchronous event processing.

Beyond that, the popular AsyncTask construct cannot cope with Activity or Fragment configuration changes, nor can it cache results across them. As a result, the developer usually ends up writing a lot of extra code and complicated flows to work around the limitations of these simple constructs.

To simplify the development of composable asynchronous work, we will introduce you to RxJava, a functional framework that allows us to observe, transform, filter, and react to streams of events (click...

Introduction to RxJava


RxJava is an implementation of Reactive Extensions (ReactiveX) for the JVM. It was developed by Netflix and is used to compose asynchronous event processing that reacts to an observable source of events.

The framework extends the Observer pattern by allowing us to create a stream of events that could be intercepted by operator (input/output) functions that modify the original stream of events and deliver the result or an error to a final Observer. This framework abstracts away concerns about things such as low-level threading, synchronization, thread safety, concurrent data structures, and non-blocking I/O.

There are three basic building blocks that interact with each other in RxJava processing: the Observable, the Observer, and the Subscriber.

An Observable is an entity that emits a sequence of events (zero or more events) of the generic type T (such as String or any Java type) at any point in time, or emits a Throwable when a failure occurs during the event processing...
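The callback contract described above can be modeled without any library. The following is a minimal plain-Java sketch, not RxJava itself: ToyObserver, ToyObservable, RecordingObserver, and ObserverContractDemo are hypothetical names invented for illustration. It only shows the ordering rule: zero or more onNext calls, followed by a single terminal onCompleted or onError.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an Observer<T>: zero or more onNext calls,
// followed by exactly one terminal call (onCompleted or onError).
interface ToyObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onCompleted();
}

// Hypothetical stand-in for an Observable<T> backed by a fixed list.
final class ToyObservable<T> {
    private final List<T> items;

    ToyObservable(List<T> items) {
        this.items = items;
    }

    // Emits every item, then signals the end of the stream.
    void subscribe(ToyObserver<? super T> observer) {
        try {
            for (T item : items) {
                observer.onNext(item);
            }
        } catch (Throwable t) {
            observer.onError(t);   // a failure ends the stream
            return;
        }
        observer.onCompleted();    // normal end of the stream
    }
}

// Observer that records every callback so the ordering can be inspected.
final class RecordingObserver implements ToyObserver<String> {
    final List<String> log = new ArrayList<>();

    @Override public void onNext(String item) { log.add("next:" + item); }
    @Override public void onError(Throwable error) { log.add("error:" + error.getMessage()); }
    @Override public void onCompleted() { log.add("completed"); }
}

final class ObserverContractDemo {
    static List<String> run(List<String> items) {
        RecordingObserver observer = new RecordingObserver();
        new ToyObservable<>(items).subscribe(observer);
        return observer.log;
    }
}
```

Calling ObserverContractDemo.run with a two-item list records two "next" entries followed by "completed"; an empty list records only "completed".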

RxJava setup


Before we move further, let's add the required libraries to your project. If you're using Android Studio, just add the following dependencies to the module build.gradle script:

dependencies {
    …
    compile 'io.reactivex:rxandroid:1.1.0'
    compile 'io.reactivex:rxjava:1.1.0'
}

rxjava is a library that implements the Reactive Extensions (http://reactivex.io/) on Java, and rxandroid is a library that adds classes to help write reactive components with RxJava in Android applications.

Creating Observables


To create an Observable, we can either create an Observable from scratch using the create function and calling Observer methods explicitly, or we can use built-in Observable creation methods that convert common data types to Observable streams.

Let's start with a simple example and create an Observable that emits a sequence of String items using the Observable.from creation operator:

Observable<String> myObservable =
  Observable.from(Arrays.asList("Hello from RxJava",
                                "Welcome...",
                                "Goodbye"));

The Observable.from static function creates an Observable from a collection or an array (here, a List) that will synchronously emit its String items to any Observer. The Observable created will be a cold Observable and will only start emitting events after an Observer subscribes to it.

Now, let's create a Subscriber that consumes the data and prints each String to the Android Log until the Observable invokes the onCompleted callback:

Subscriber<String> mySubscriber...
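The "cold" behavior mentioned in this section can be illustrated with a small plain-Java model. This is a sketch under stated assumptions rather than RxJava code: ColdSource and ColdSourceDemo are hypothetical names, and a java.util.function.Consumer stands in for the Subscriber. The point is that creating the source emits nothing, and each subscription replays the whole sequence on the caller's thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a cold source: it only holds the items and
// emits them once per subscriber, when subscribe() is called.
final class ColdSource<T> {
    private final List<T> items;

    ColdSource(List<T> items) {
        this.items = items;
    }

    void subscribe(Consumer<? super T> onNext) {
        for (T item : items) {
            onNext.accept(item);   // synchronous emission on the caller's thread
        }
    }
}

final class ColdSourceDemo {
    static List<String> run() {
        ColdSource<String> source = new ColdSource<>(
                Arrays.asList("Hello from RxJava", "Welcome...", "Goodbye"));
        List<String> received = new ArrayList<>();

        // Creating the source emitted nothing; received is still empty here.
        source.subscribe(item -> received.add("first:" + item));
        source.subscribe(item -> received.add("second:" + item));
        return received;   // each subscriber saw the full sequence
    }
}
```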

Transforming Observables


Apart from implementing the Observable-Subscriber pattern, the RxJava framework allows us to transform, filter, convert, aggregate, and otherwise manipulate the stream of items emitted by an Observable by using Observable operators. These operators are able to completely transform the event stream before the events are delivered to the final Subscriber.

RxJava comes with a handy collection of operators that are able to transform the content of each event and control when the event is delivered.

Let's describe the most common operators available in RxJava:

  • map: Applies a function to each item emitted and emits the result of the function as a new item.

  • flatMap: Applies a function to each item emitted by the source Observable where the function returns an Observable that could emit a different number of items or a different type of event.

  • filter: A transformation operator that uses a function that verifies if each item emitted by the source...
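The operators listed above have close namesakes in java.util.stream, which makes the standard library a convenient place to try out their semantics. The sketch below uses Java streams as a stand-in, not RxJava: streams are pull-based and synchronous, so only the per-item behavior of map, flatMap, and filter carries over. OperatorAnalogyDemo and its method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class OperatorAnalogyDemo {
    // map: one output item per input item.
    static List<Integer> lengths(List<String> words) {
        return words.stream()
                .map(String::length)
                .collect(Collectors.toList());
    }

    // flatMap: each input item expands into zero or more output items.
    static List<String> splitWords(List<String> sentences) {
        return sentences.stream()
                .flatMap(s -> Arrays.stream(s.split(" ")))
                .collect(Collectors.toList());
    }

    // filter: only items that satisfy the predicate pass through.
    static List<String> startingWith(List<String> words, String prefix) {
        return words.stream()
                .filter(w -> w.startsWith(prefix))
                .collect(Collectors.toList());
    }
}
```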

Understanding Schedulers


There is a common misconception that RxJava processing is multithreaded by default. In fact, an Observable and the chain of transformations applied by its operators run on the same thread on which the subscription is made.

Hence, on Android, if the subscription is carried out on the main thread, the operator chain will run on the main thread, blocking the UI until the work is done.

While this behavior might work for lightweight processing tasks, when the operation requires IO interaction or CPU-intensive computing, the task execution might block the main thread and trigger an Application Not Responding (ANR) error.

To simplify the asynchronous and concurrent executions, the RxJava framework allows us to define a Scheduler entity that defines the thread where a unit of work is executed.

The subscribeOn(Scheduler) operator allows us to set the Scheduler that defines the thread on which the subscription is made and on which the Observable starts to operate.

When...
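The idea of choosing the thread for each stage can be tried out with plain java.util.concurrent. This is an analogy under stated assumptions, not RxJava: the two single-thread executors named "io-thread" and "ui-thread" are hypothetical stand-ins for a background Scheduler and the Android main thread, and CompletableFuture plays the role of the Observable chain. RxJava's observeOn operator, which the comments mention, selects the delivery thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SchedulerAnalogyDemo {
    // Returns the name of the thread that produced the value, followed by
    // the name of the thread that consumed it.
    static String[] run() {
        // Hypothetical stand-ins: "io" plays the role of a background Scheduler,
        // "ui" plays the role of the Android main thread.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        try {
            return CompletableFuture
                    // Roughly what subscribeOn decides: where the source work runs.
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // Roughly what observeOn decides: where the result is delivered.
                    .thenApplyAsync(producer ->
                            new String[] {producer, Thread.currentThread().getName()}, ui)
                    .join();
        } finally {
            io.shutdown();
            ui.shutdown();
        }
    }
}
```

The returned array holds "io-thread" and "ui-thread", in that order. The caller's thread does none of the work and only blocks in join() because this is a demo.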

Performing IO operations with Schedulers


In the next example, we will use Schedulers to mirror the behavior of AsyncTask and retrieve text from the network on a background thread. Subsequently, the result will be published to a Subscriber that runs on the main thread.

First of all, we will create a function that creates an Observable that emits the String retrieved from the network:

Observable<String> getTextFromNetwork(final String url) {

  return Observable.create(
    new Observable.OnSubscribe<String>() {
      @Override
      public void call(Subscriber<? super String> sub) {
        try {
          String text = downloadText(url);
          sub.onNext(text);
          sub.onCompleted();

        } catch (Throwable t) {
          sub.onError(t);
        }
      }
    }
  );
}

Before we specify the Scheduler used to run our asynchronous call, we need to state two assumptions:

  • Since the code that runs on Observable performs a network operation we must run Observable on...

Canceling subscriptions


When an Activity or a Fragment gets destroyed, our chain could continue to run in the background, preventing the Activity from being disposed if the chain has references to the Activity or Fragment. When you no longer need the result of the chain, it could make sense to cancel the subscription and terminate the chain execution.

When we call the Observable.subscribe() function, it returns a Subscription object that can be used to terminate the chain immediately:

Subscription subscription = getTextFromNetwork(
               "http://demo1472539.mockable.io/mytet")
               ...
               .subscribe(new MySubscriber());

Again, the most appropriate Activity lifecycle method for this is onPause, which is guaranteed to be called before the Activity finishes:

protected void onPause() {
  super.onPause();
  if ((subscription != null) && (isFinishing()))
    subscription.unsubscribe();
}
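What unsubscribing means for the producer can be sketched in plain Java. The model below is an assumption-laden illustration, not RxJava's implementation: ToySubscription and CancellationDemo are hypothetical names. It shows the cooperative part of cancellation, where the emitting loop checks the flag before each item and stops working once nobody is listening.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical model of a Subscription handle: a thread-safe flag that the
// producer checks before each emission.
final class ToySubscription {
    private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

    void unsubscribe() { unsubscribed.set(true); }
    boolean isUnsubscribed() { return unsubscribed.get(); }
}

final class CancellationDemo {
    // Emits 1..limit to the consumer, stopping early once the subscription
    // is cancelled. Returns how many items were actually delivered.
    static int emit(int limit, ToySubscription subscription, Consumer<Integer> onNext) {
        int delivered = 0;
        for (int i = 1; i <= limit; i++) {
            if (subscription.isUnsubscribed()) {
                break;   // nobody is listening any more: stop working
            }
            onNext.accept(i);
            delivered++;
        }
        return delivered;
    }

    static int run() {
        ToySubscription subscription = new ToySubscription();
        // The consumer cancels after seeing the third item.
        return emit(10, subscription, item -> {
            if (item == 3) {
                subscription.unsubscribe();
            }
        });
    }
}
```

CancellationDemo.run() delivers three of the ten items. A producer that never checks the flag would keep running after cancellation, which is why a custom emitting loop may want a similar check.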

Composing Observables


As we explained earlier, the Observable API is defined in a way that allows us to chain and combine different Observables to create complex tasks in a functional and declarative way.

Starting from our previous work, our next example makes use of RxJava's composition features to execute a second network call that depends on the previous Observable. This second call translates the downloaded text using a web service before the translated text is emitted to the Subscriber.

To execute the translation on the network in a logically separate unit, we will create a new Observable that receives the text to translate, executes the task on the network, and emits the translated text as a String to the following Observable:

Observable<String> translateOnNetwork(final String url,
                                      final String toTranslate) {
  return Observable.create(
    new Observable.OnSubscribe<String>() {
      @Override
      public void call(Subscriber<...
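The dependency between the two calls described in this section, where the second starts only once the first has produced its text, can be sketched with CompletableFuture.thenCompose from the standard library. This is an analogy rather than the chapter's RxJava code: download and translate are hypothetical stand-ins that return canned values instead of touching the network.

```java
import java.util.concurrent.CompletableFuture;

final class CompositionAnalogyDemo {
    // Hypothetical stand-in for the network download; the url is ignored
    // and canned text is returned.
    static CompletableFuture<String> download(String url) {
        return CompletableFuture.supplyAsync(() -> "hello");
    }

    // Hypothetical stand-in for the translation web service.
    static CompletableFuture<String> translate(String text) {
        return CompletableFuture.supplyAsync(() -> "translated(" + text + ")");
    }

    // The second call starts only after the first completes and receives its result.
    static String downloadThenTranslate(String url) {
        return download(url)
                .thenCompose(CompositionAnalogyDemo::translate)
                .join();
    }
}
```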

Monitoring the event stream


Although so far we have been using the Observable operators to manipulate stream events, there are operators that allow us to monitor the events without changing them. These operators, sometimes known as utility operators, are able to react to the events or errors emitted on the Observable chain between the source Observable and the final Subscriber without altering the stream itself.

Let's go through the more common utility operators used to observe the event stream:

  • doOnSubscribe(Action0): Registers an Action0 function to get called when a Subscriber subscribes to the Observable.

  • doOnUnsubscribe(Action0): Registers an Action0 function to get called when a Subscriber unsubscribes from the Observable.

  • doOnNext(Action1): Registers an Action1 to be called when a new event is emitted from the source Observable. The emitted item of type T is passed as an argument to the Action1 function.

  • doOnCompleted(Action0): Registers an Action0 function...
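To see what "observing without changing" looks like in practice, java.util.stream offers peek, which behaves much like doOnNext on a sequential stream. The sketch below is a standard-library analogy, not RxJava: MonitoringAnalogyDemo and its seen log are hypothetical names. Each item is logged before and after the transformation, and the logging has no effect on what reaches the collector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

final class MonitoringAnalogyDemo {
    // Log of every item observed along the pipeline.
    final List<String> seen = new ArrayList<>();

    List<String> upperCased(List<String> items) {
        return items.stream()
                .peek(item -> seen.add("before:" + item))   // observe only, item unchanged
                .map(String::toUpperCase)
                .peek(item -> seen.add("after:" + item))    // observe the transformed item
                .collect(Collectors.toList());
    }
}
```

For the input "a", "b", the log reads before:a, after:A, before:b, after:B, because a sequential stream pushes each item through the whole pipeline before taking the next one.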

Combining Observables


In the previous example, we used two Observables to create a simple sequence of network operations. The second asynchronous operation consumed the result of the first, and the two operations, executed serially, produced a String result that updates the UI.

In our next example, we will run two tasks in parallel and combine the results of both operations using an RxJava combining operator. Each operation will asynchronously retrieve a JSON Object from the network, and both results will be combined into a single JSON Object to produce the JSON String passed to the main UI thread.

Since we only want to emit one event or an error from the operation, we are going to use, for the first time, a special kind of Observable, Single.

While an Observable is able to invoke the onNext, onError, and onCompleted Observer functions, a Single will only invoke either onSuccess or onError on a SingleSubscriber:

 // Success callback invoked on success
 void onSuccess(T value);

 // Callback to...
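The shape of this section's task, two independent single-value operations whose results are merged, can be sketched with CompletableFuture.thenCombine. This is a standard-library analogy, not the chapter's RxJava code: fetchUser and fetchScore are hypothetical stand-ins returning canned JSON fragments. Like a Single, each future yields exactly one value or fails.

```java
import java.util.concurrent.CompletableFuture;

final class CombiningAnalogyDemo {
    // Hypothetical stand-ins for the two network calls; each returns a JSON fragment.
    static CompletableFuture<String> fetchUser() {
        return CompletableFuture.supplyAsync(() -> "\"user\":\"alice\"");
    }

    static CompletableFuture<String> fetchScore() {
        return CompletableFuture.supplyAsync(() -> "\"score\":42");
    }

    // Both tasks are started before either result is awaited, so they can run
    // in parallel; the combiner runs once both have produced their single value.
    static String combined() {
        CompletableFuture<String> user = fetchUser();
        CompletableFuture<String> score = fetchScore();
        return user.thenCombine(score, (u, s) -> "{" + u + "," + s + "}").join();
    }
}
```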

Observing UI Events with RxJava


So far, we have been using RxJava to process and manipulate data streams, which simplified the development of asynchronous tasks that involve blocking IO operations and would otherwise hang the application for a while.

In this section, we want to explain how to use RxJava and reactive streams to simplify the handling of UI events generated from Android Widgets.

In our next example, we will present a list of soccer teams with an instant-search input field. As you type in the input field, the list will be filtered to show only the teams whose names begin with the text that you typed.

To achieve the result required, we will create a custom Observable that attaches a TextWatcher to the searching input field, listens for onTextChanged events, and emits a String event when the text changes.

The Observer will feed a reactive functional stream that will filter our list of teams in a Recycler View.

First, we will write a Custom Observable that registers...
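The flow described in this section, text-change events feeding a prefix filter over the team list, can be modeled without Android or RxJava. In this plain-Java sketch, ToySearchField is a hypothetical stand-in for the input field and its TextWatcher, the team names are made up for illustration, and the case-insensitive matching is an assumption rather than something the text specifies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical stand-in for the search field: listeners are notified on
// every text change, which is the role the TextWatcher plays on Android.
final class ToySearchField {
    private final List<Consumer<String>> listeners = new ArrayList<>();

    void addTextChangedListener(Consumer<String> listener) { listeners.add(listener); }

    void type(String text) {
        for (Consumer<String> listener : listeners) {
            listener.accept(text);
        }
    }
}

final class TeamSearchDemo {
    // Illustrative data only.
    static final List<String> TEAMS =
            Arrays.asList("Arsenal", "Aston Villa", "Barcelona", "Benfica");

    // Prefix match; ignoring case is an assumption of this sketch.
    static List<String> matching(String query) {
        String prefix = query.toLowerCase(Locale.ROOT);
        return TEAMS.stream()
                .filter(team -> team.toLowerCase(Locale.ROOT).startsWith(prefix))
                .collect(Collectors.toList());
    }

    static List<String> run() {
        ToySearchField field = new ToySearchField();
        List<String> visible = new ArrayList<>(TEAMS);
        // Every text change recomputes the visible list.
        field.addTextChangedListener(text -> {
            visible.clear();
            visible.addAll(matching(text));
        });
        field.type("a");
        field.type("ar");
        return visible;
    }
}
```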

Working with Subjects


So far, we have been working with Observable, Subscriber, Observer, and Scheduler entities to create our RxJava functional processing pipelines. In this section, we will introduce a new kind of entity in the RxJava framework, the Subject. The Subject is a sort of adapter or bridge entity that acts as both an Observable and an Observer:

public abstract class      Subject<T,R>
                extends    Observable<R>
                implements Observer<T>

Since it can act as an Observer, it can subscribe to one or more Observables that emit objects of the generic type T, and since it acts as an Observable, it can emit events of the generic type R and receive subscriptions from other Subscribers. Hence, it can emit events of the same type as received or emit a different type of event.

For example, the Subject<String, Integer> will receive events of type String and emit events of the type Integer.
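The two-sided role of a Subject can be modeled in a few lines of plain Java. The sketch below is an illustration under stated assumptions, not RxJava's Subject: ToySubject and SubjectDemo are hypothetical names, and the conversion from T to R is supplied as a Function. It mirrors the String-to-Integer example by emitting the length of each received String.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of a Subject<T, R>: it accepts items of type T like an
// Observer and re-emits items of type R to its own subscribers like an Observable.
final class ToySubject<T, R> {
    private final Function<? super T, ? extends R> converter;
    private final List<Consumer<? super R>> subscribers = new ArrayList<>();

    ToySubject(Function<? super T, ? extends R> converter) {
        this.converter = converter;
    }

    // Observable side: downstream consumers register here.
    void subscribe(Consumer<? super R> subscriber) { subscribers.add(subscriber); }

    // Observer side: upstream sources push items here.
    void onNext(T item) {
        R converted = converter.apply(item);
        for (Consumer<? super R> subscriber : subscribers) {
            subscriber.accept(converted);
        }
    }
}

final class SubjectDemo {
    static List<Integer> run() {
        ToySubject<String, Integer> subject = new ToySubject<>(String::length);
        List<Integer> received = new ArrayList<>();
        subject.subscribe(received::add);
        subject.onNext("Hello");   // re-emitted as 5
        subject.onNext("Rx");      // re-emitted as 2
        return received;
    }
}
```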

The Subject could receive the events from the Observable...

Summary


In this final chapter, we learned how to use RxJava, an open source library that helps to process our Android application data or event streams using functional and reactive processing pipelines.

In the first sections, we learned in detail about some of RxJava's basic building blocks: Observable, Observer, and Subscriber.

Next, we introduced some of RxJava's most common operators, which are able to manipulate, transform, and combine event streams generated by an Observable.

In order to perform operations asynchronously and concurrently, we learned about the Scheduler, the RxJava entity that controls concurrency and is able to schedule RxJava units of work to run in background threads and feed the results back to the main Android thread.

Next, using custom Observables and combining operators, we learned how to associate and compose interdependent blocking or long-running operations, such as REST API network operations.

In the meantime, we also learned how to react to a custom Observable...
