Learning RxJava

4.5 (23 reviews total)
By Thomas Nield
  • Instant online access to over 7,500+ books and videos
  • Constantly updated with 100+ new titles each month
  • Breadth and depth in over 1,000+ technologies
  1. Thinking Reactively

About this book

RxJava is a library for composing asynchronous and event-based programs using Observable sequences for the JVM, allowing developers to build robust applications in less time.

Learning RxJava addresses all the fundamentals of reactive programming to help readers write reactive code, as well as teach them an effective approach to designing and implementing reactive libraries and applications.

Starting with a brief introduction to reactive programming concepts, there is an overview of Observables and Observers, the core components of RxJava, and how to combine different streams of data and events together. You will also learn simpler ways to achieve concurrency and remain highly performant, with no need for synchronization. Later on, we will leverage backpressure and other strategies to cope with rapidly-producing sources to prevent bottlenecks in your application. After covering custom operators, testing, and debugging, the book dives into hands-on examples using RxJava on Android as well as Kotlin.

Publication date:
June 2017
Publisher
Packt
Pages
400
ISBN
9781787120426

 

Chapter 1. Thinking Reactively

It is assumed you are fairly comfortable with Java and know how to use classes, interfaces, methods, properties, variables, static/nonstatic scopes, and collections. If you have not done concurrency or multithreading, that is okay. RxJava makes these advanced topics much more accessible.

Have your favorite Java development environment ready, whether it is Intellij IDEA, Eclipse, NetBeans, or any other environment of your choosing. I will be using Intellij IDEA, although it should not matter or impact the examples in this book. I recommend that you have a build automation system as well such as Gradle or Maven, which we will walk through shortly.

Before we dive deep into RxJava, we will cover some core topics first:

  • A brief history of Reactive Extensions and RxJava
  • Thinking reactively
  • Leveraging RxJava
  • Setting up your first RxJava project
  • Building your first reactive applications
  • Differences between RxJava 1.0 and RxJava 2.0
 

A brief history of ReactiveX and RxJava


As developers, we tend to train ourselves to think in counter-intuitive ways. Modeling our world with code has never been short of challenges. It was not long ago that object-oriented programming was seen as the silver bullet to solve this problem. Making blueprints of what we interact with in real life was a revolutionary idea, and this core concept of classes and objects still impacts how we code today. However, business and user demands continued to grow in complexity. As 2010 approached, it became clear that object-oriented programming only solved part of the problem. Classes and objects do a great job of representing an entity with properties and methods, but they become messy when they need to interact with each other in increasingly complex (and often unplanned) ways. Decoupling patterns and paradigms emerged, but this yielded an unwanted side effect of growing amounts of boilerplate code. In response to these problems, functional programming began to make a comeback, not to replace object-oriented programming, but rather to complement it and fill this void. Reactive programming, a functional event-driven programming approach, began to receive special attention. A couple of reactive frameworks emerged ultimately, including Akka and Sodium. But at Microsoft, a computer scientist named Erik Meijer created a reactive programming framework for .NET called Reactive Extensions. In a matter of years, Reactive Extensions (also called ReactiveX or Rx) was ported to several languages and platforms, including JavaScript, Python, C++, Swift, and Java, of course. ReactiveX quickly emerged as a cross-language standard to bring reactive programming into the industry. RxJava, the ReactiveX port for Java, was created in large part by Ben Christensen from Netflix and David Karnok. RxJava 1.0 was released in November 2014, followed by RxJava 2.0 in November 2016. RxJava is the backbone to other ReactiveX JVM ports, such as RxScala, RxKotlin, and RxGroovy. It has become a core technology for Android development and has also found its way into Java backend development. Many RxJavaadapter libraries, such as RxAndroid (https://github.com/ReactiveX/RxAndroid), RxJava-JDBC (https://github.com/davidmoten/rxjava-jdbc), RxNetty (https://github.com/ReactiveX/RxNetty), and  RxJavaFX (https://github.com/ReactiveX/RxJavaFX) adapted several Java frameworks to become reactive and work with RxJava out of the box. This all shows that RxJava is more than a library. It is part of a greater ReactiveX ecosystem that represents an entire approach to programming. The fundamental idea of ReactiveX is that events are data and data are events. This is a powerful concept that we will explore later in this chapter, but first, let's step back and look at the world through the reactive lens.

 

Thinking reactively


Suspend everything you know about Java (and programming in general) for a moment, and let's make some observations about our world. These may sound like obvious statements, but as developers, we can easily overlook them. Bring your attention to the fact that everything is in motion. Traffic, weather, people, conversations, financial transactions, and so on are all moving. Technically, even something stationary as a rock is in motion due to the earth's rotation and orbit. When you consider the possibility that everything can be modeled as in motion, you may find it a bit overwhelming as a developer.   Another observation to note is that these different events are happening concurrently. Multiple activities are happening at the same time. Sometimes, they act independently, but other times, they can converge at some point to interact. For instance, a car can drive with no impact on a person jogging. They are two separate streams of events. However, they may converge at some point and the car will stop when it encounters the jogger.   If this is how our world works, why do we not model our code this way?. Why do we not model code as multiple concurrent streams of events or data happening at the same time? It is not uncommon for developers to spend more time managing the states of objects and doing it in an imperative and sequential manner. You may structure your code to execute Process 1, Process 2, and then Process 3, which depends on Process 1 and Process 2. Why not kick-off Process 1 and Process 2 simultaneously, and then the completion of these two events immediately kicks-off Process 3? Of course, you can use callbacks and Java concurrency tools, but RxJava makes this much easier and safer to express.   Let's make one last observation. A book or music CD is static. A book is an unchanging sequence of words and a CD is a collection of tracks. There is nothing dynamic about them. However, when we read a book, we are reading each word one at a time. Those words are effectively put in motion as a stream being consumed by our eyes. It is no different with a music CD track, where each track is put in motion as sound waves and your ears are consuming each track. Static items can, in fact, be put in motion too. This is an abstract but powerful idea because we made each of these static items a series of events. When we level the playing field between data and events by treating them both the same, we unleash the power of functional programming and unlock abilities you previously might have thought impractical. The fundamental idea behind reactive programming is that events are data and data are events. This may seem abstract, but it really does not take long to grasp when you consider our real-world examples. The runner and car both have properties and states, but they are also in motion. The book and CD are put in motion when they are consumed. Merging the event and data to become one allows the code to feel organic and representative of the world we are modeling.

 

Why should I learn RxJava?


 ReactiveX and RxJava paints a broad stroke against many problems programmers face daily, allowing you to express business logic and spend less time engineering code. Have you ever struggled with concurrency, event handling, obsolete data states, and exception recovery? What about making your code more maintainable, reusable, and evolvable so it can keep up with your business? It might be presumptuous to call reactive programming a silver bullet to these problems, but it certainly is a progressive leap in addressing them. There is also growing user demand to make applications real time and responsive. Reactive programming allows you to quickly analyse and work with live data sources such as Twitter feeds or stock prices. It can also cancel and redirect work, scale with concurrency, and cope with rapidly emitting data. Composing events and data as streams that can be mixed, merged, filtered, split, and transformed opens up radically effective ways to compose and evolve code. In summary, reactive programming makes many hard tasks easy, enabling you to add value in ways you might have thought impractical earlier. If you have a process written reactively and you discover that you need to run part of it on a different thread, you can implement this change in a matter of seconds. If you find network connectivity issues crashing your application intermittently, you can gracefully use reactive recovery strategies that wait and try again. If you need to inject an operation in the middle of your process, it is as simple as inserting a new operator. Reactive programming is broken up into modular chain links that can be added or removed, which can help overcome all the aforementioned problems quickly. In essence, RxJava allows applications to be tactical and evolvable while maintaining stability in production.

What we will learn in this book?

 As stated earlier, RxJava is the ReactiveX port for Java. In this book, we will focus primarily on RxJava 2.0, but I will call out significant differences in RxJava 1.0. We will place priority on learning to think reactively and leverage the practical features of RxJava. Starting with a high-level understanding, we will gradually move deeper into how RxJava works. Along the way, we will learn about reactive patterns and tricks to solve common problems programmers encounter.    In Chapter 2, The Observable and Subscribers,Chapter 3, Basic Operators, and Chapter 4, Combining Observables, we will cover core Rx concepts with Observable, Observer, and Operator. These are the three core entities that make up RxJava applications. You will start writing reactive programs immediately and have a solid knowledge foundation to build on for the rest of the book.  Chapter 5, Multicasting, Replaying, and Caching, and Chapter 6, Concurrency and Parallelization, will explore more of the nuances of RxJava and how to effectively leverage concurrency.

In Chapter 7, Switching, Throttling, Windowing, and Buffering and Chapter 8, Flowables and Backpressure, we will learn about the different ways to cope with reactive streams that produce data/events faster than they can be consumed.

Finally, Chapter 9, Transformers and Custom Operators, Chapter 10, Testing and Debugging, Chapter 11, RxJava on Android, and Chapter 12, Using RxJava with Kotlin New, will touch on several miscellaneous (but essential) topics including custom operators as well as how to use RxJava with testing frameworks, Android, and the Kotlin language.

 

Setting up


There are two co-existing versions of RxJava currently: 1.0 and 2.0. We will go through some of the major differences later and discuss which version you should use.

RxJava 2.0 is a fairly lightweight library and comes just above 2 Megabytes (MBs) in size. This makes it practical for Android and other projects that require a low dependency overhead. RxJava 2.0 has only one dependency, called Reactive Streams ( http://www.reactive-streams.org/), which is a core library (made by the creators of RxJava) that sets a standard for asynchronous stream implementations, one of which is RxJava 2.0.

It may be used in other libraries beyond RxJava and is a critical effort in the standardization of reactive programming on the Java platform. Note that RxJava 1.0 does not have any dependencies, including Reactive Streams, which was realized after 1.0.

 If you are starting a project from scratch, try to use RxJava 2.0. This is the version we will cover in this book, but I will call out significant differences in 1.0. While RxJava 1.0 will be supported for a good while due to countless projects using it, innovation will likely only continue onward in RxJava 2.0. RxJava 1.0 will only get maintenance and bug fixes.

Both RxJava 1.0 and 2.0 run on Java 1.6+. In this book, we will use Java 8, and it is recommended that you use a minimum of Java 8 so you can use lambdas out of the box. For Android, there are ways to leverage lambdas in earlier Java versions that will be addressed later. But weighing the fact that Android Nougat uses Java 8 and Java 8 has been out since 2014, hopefully, you will not have to do any workarounds to leverage lambdas.

Navigating the Central Repository

To bring in RxJava as a dependency, you have a few options. The best place to start is to go to The Central Repository (search http://search.maven.org/) and search for rxjav. You should see RxJava 2.0 and RxJava 1.0 as separate repositories at the top of the search results, as shown in the following screenshot:

Searching for RxJava in the Central Repository (RxJava 2.0 and 1.0 are highlighted)

At the time of writing, RxJava 2.0.2 is the latest version for RxJava 2.0 and RxJava 1.2.3 is the latest for RxJava 1.0. You can download the latest JAR file for either by clicking the JAR links in the far right under the Download column. You can then configure your project to use the JAR file.

However, you might want to consider using Gradle or Maven to automatically import these libraries into your project. This way, you can easily share and store your code project (through GIT or other version control systems) without having to download and configure RxJava manually into it each time. To view the latest configurations for Maven, Gradle, and several other build automation systems, click on the version number for either of the repositories, as highlighted in the following screenshot:

Click the version number under the Latest Version column to view the configurations for Maven, Gradle, and other major build automation systems

Using Gradle

There are several automated build systems available, but the two most mainstream options are Gradle and Maven. Gradle is somewhat a successor to Maven and is especially the go-to build automation solution for Android development. If you are not familiar with Gradle and would like to learn how to use it, check out the Gradle Getting Started guide (https://gradle.org/getting-started-gradle-java/).

There are also several decent books that cover Gradle in varying degrees of depth, which you can find at https://gradle.org/books/. The following screenshot displays the The Central Repository page showing how to set up RxJava 2.0.2 for Gradle:

You can find the latest Gradle configuration code and copy it into your Gradle script

In your build.gradle script, ensure that you have declared mavenCentral() as one of your repositories. Type in or paste that dependency line compile 'io.reactivex.rxjava2:rxjava:x.y.z', where x.y.z is the version number you want to use, as shown in the following code snippet:

apply plugin: 'java'

sourceCompatibility = 1.8

repositories {
      mavenCentral()
}

dependencies {
      compile 'io.reactivex.rxjava2:rxjava:x.y.z'
}

Build your Gradle project and you should be good to go! You will then have RxJava and its types available for use in your project.

Using Maven

You also have the option to use Maven, and you can view the appropriate configuration in The Central Repository by selecting the Apache Maven configuration information, as shown in the following screenshot:

Select and then copy the Apache Maven configuration

You can then copy and paste the <dependency> block containing the RxJava configuration and paste it inside a  <dependencies>block in your pom.xml file. Rebuild your project, and you should now have RxJava set up as a dependency. The x.y.z version number corresponds to the desired RxJava version that you want to use:

<project>
  <modelVersion>4.0.0</modelVersion>
    <groupId>org.nield</groupId>
    <artifactId>mavenrxtest</artifactId>
    <version>1.0</version>
  <dependencies>
    <dependency>
     <groupId>io.reactivex.rxjava2</groupId>
     <artifactId>rxjava</artifactId>
     <version>x.y.z</version>
    </dependency>
  </dependencies>
</project>
 

A quick exposure to RxJava


 Before we dive deep into the reactive world of RxJava, here is a quick exposure to get your feet wet first. In ReactiveX, the core type you will work with is the Observable. We will be learning more about the Observable throughout the rest of this book. But essentially, an Observable pushes things. A given Observable<T>pushes things of type T through a series of operators until it arrives at an Observer that consumes the items. For instance, create a new Launcher.java file in your project and put in the following code:

import io.reactivex.Observable;
public class Launcher {
      public static void main(String[] args) {
        Observable<String> myStrings =
          Observable.just("Alpha", "Beta", "Gamma", "Delta", 
"Epsilon");
      }
}

In our main() method,  we have an Observable<String> that will push five string objects. An Observable can push data or events from virtually any source, whether it is a database query or live Twitter feeds. In this case, we are quickly creating an Observable using Observable.just(), which will emit a fixed set of items.

Note

In RxJava 2.0, most types you will use are contained in the io.reactivex package. In RxJava 1.0, the types are contained in the rx package.

However, running this main() method is not going to do anything other than declare Observable<String>. To make this Observable actually push these five strings (which are called emissions), we need an Observer to subscribe to it and receive the items. We can quickly create and connect an Observer by passing a lambda expression that specifies what to do with each string it receives:

import io.reactivex.Observable;

public class Launcher {
      public static void main(String[] args)  {
        Observable<String> myStrings =
          Observable.just("Alpha", "Beta", "Gamma", "Delta", 
"Epsilon");

        myStrings.subscribe(s -> System.out.println(s));
      }
}

 When we run this code, we should get the following output:

    Alpha 
    Beta 
    Gamma 
    Delta 
    Epsilon

What happened here is that our Observable<String> pushed each string object one at a time to our Observer, which we shorthanded using the lambda expression s -> System.out.println(s). We pass each string through the parameter s (which I arbitrarily named) and instructed it to print each one. Lambdas are essentially mini functions that allow us to quickly pass instructions on what action to take with each incoming item. Everything to the left of the arrow -> are arguments (which in this case is a string we named s), and everything to the right is the action (which is System.out.println(s)).   If you are unfamiliar with lambda expressions, turn to Appendix, to learn more about how they work. If you want to invest extra time in understanding lambda expressions, I highly recommend that you read at least the first few chapters of Java 8 Lambdas (O'Reilly) (http://shop.oreilly.com/product/0636920030713.do) by Richard Warburton. Lambda expressions are a critical topic in modern programming and have become especially relevant to Java developers since their adoption in Java 8. We will be using lambdas constantly in this book, so definitely take some time getting comfortable with them.

 We can also use several operators between Observable and Observer to transform each pushed item or manipulate them in some way. Each operator returns a new Observable derived-off the previous one but reflects that transformation. For example, we can use map() to turn each string emission into its length(), and each length integer will then be pushed to Observer , as shown in the following code snippet:

import io.reactivex.Observable;

public class Launcher {
 public static void main(String[] args) {

   Observable<String> myStrings =
     Observable.just("Alpha", "Beta", "Gamma", "Delta",
      "Epsilon");

     myStrings.map(s -> s.length()).subscribe(s -> 
     System.out.println(s));
  }
}

When we run this code, we should get the following output:

    5
    4
    5
    5
    7

If you have used Java 8 Streams or Kotlin sequences, you might be wondering how Observable is any different. The key difference is that Observable pushes the items while Streams and sequences pull the items. This may seem subtle, but the impact of a push-based iteration is far more powerful than a pull-based one. As we saw earlier, you can push not only data, but also events. For instance, Observable.interval() will push a consecutive Long at each specified time interval, as shown in the following code snippet. This Long emission is not only data, but also an event! Let's take a look:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
      public static void main(String[] args) {
        Observable<Long> secondIntervals =
          Observable.interval(1, TimeUnit.SECONDS);

        secondIntervals.subscribe(s -> System.out.println(s));

        /* Hold main thread for 5 seconds
        so Observable above has chance to fire */
        sleep(5000);
      }

      public static void sleep(long millis) {
        try {
          Thread.sleep(millis);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
}

When we run this code, we should get the following output:


0 
1 
2 
3 
4 

When you run the preceding code, you will see that a consecutive emission fires every second. This application will run for about five seconds before it quits, and you will likely see emissions 0 to 4 fired, each separated by a just a second's gap. This simple idea that data is a series of events over time will unlock new possibilities in how we tackle programming.

On a side note, we will get more into concurrency later, but we had to create a sleep() method because this Observable fires emissions on a computation thread when subscribed to. The main thread used to launch our application is not going to wait on this Observable since it fires on a computation thread, not the main thread. Therefore, we use sleep() to pause the main thread for 5000 milliseconds and then allow it to reach the end of the main() method (which will cause the application to terminate). This gives Observable.interval() a chance to fire for a five second window before the application quits.

Throughout this book, we will uncover many mysteries about Observable and the powerful abstractions it takes care of for us. If you've conceptually understood what is going on here so far, congrats! You are already becoming familiar with how reactive code works. To emphasize again, emissions are pushed one at a time all the way to Observer. Emissions represent both data and an event, which can be emitted over time. Of course, beyond map(), there are hundreds of operators in RxJava, and we will learn about the key ones in this book. Learning which operators to use for a situation and how to combine them is the key to mastering RxJava. In the next chapter, we will cover Observable and Observer much more comprehensively. We will also demystify events and data being represented in Observable a bit more.

 

RxJava 1.0 versus RxJava 2.0 - which one do I use?


As stated earlier, you are encouraged to use RxJava 2.0 if you can. It will continue to grow and receive new features, while RxJava 1.0 will be maintained for bug fixes. However, there are other considerations that may lead you to use RxJava 1.0.

If you inherit a project that is already using RxJava 1.0, you will likely continue using that until it becomes feasible to refactor to 2.0. You can also check out David Akarnokd's RxJava2Interop project (https://github.com/akarnokd/RxJava2Interop), which converts Rx types from RxJava 1.0 to RxJava 2.0 and vice versa. After you finish this book, you may consider using this library to leverage RxJava 2.0 even if you have the RxJava 1.0 legacy code.

In RxJava, there are several libraries to make several Java APIs reactive and plug into RxJava seamlessly. Just to name a few, these libraries include RxJava-JDBC, RxAndroid, RxJava-Extras, RxNetty, and RxJavaFX. At the time of writing this, only RxAndroid and RxJavaFX have been fully ported to RxJava 2.0 (although many other libraries are following). By the time you are reading this, all major RxJava extension libraries will hopefully be ported to RxJava 2.0.

You will also want to prefer RxJava 2.0 because it was built on much of the hindsight and wisdom gained from RxJava 1.0. It has better performance, simpler APIs, a cleaner approach to backpressure, and a bit more safety when hacking together your own operators.

 

When to use RxJava


A common question ReactiveX newcomers ask is what circumstances warrant a reactive approach? Do we always want to use RxJava? As someone who has been living and breathing reactive programming for a while, I have learned that there are two answers to this question:

The first answer is when you first start out: yes! You always want to take a reactive approach. The only way to truly become a master of reactive programming is to build reactive applications from the ground up. Think of everything as Observable and always model your program in terms of data and event flows. When you do this, you will leverage everything reactive programming has to offer and see the quality of your applications go up significantly.

The second answer is that when you become experienced in RxJava, you will find cases where RxJava may not be appropriate. There will occasionally be times where a reactive approach may not be optimal, but usually, this exception applies to only part of your code. Your entire project itself should be reactive. There may be parts that are not reactive and for good reason. These exceptions only stand out to a trained Rx veteran who sees that returning List<String> is perhaps better than returning Observable<String>. Rx greenhorns should not worry about when something should be reactive versus something not reactive. Over time, they will start to see cases where the benefits of Rx are marginalized, and this is something that only comes with experience.

So for now, no compromises. Go reactive all the way!

 

Summary


 In this chapter, we learned how to look at the world in a reactive way. As a developer, you may have to retrain yourself from a traditional imperative mindset and develop a reactive one. Especially if you have done imperative, object-oriented programming for a long time, this can be challenging. But the return on investment will be significant as your applications will become more maintainable, scalable, and evolvable. You will also have faster turn around and more legible code.

We also covered how to configure a RxJava project using Gradle or Maven and what decisions should drive whether you should choose RxJava 2.0 versus RxJava 1.0. We also got a brief introduction to reactive code and how Observable works through push-based iteration.   By the time you finish this book, you will hopefully find reactive programming intuitive and easy to reason with. I hope you find that RxJava not only makes you more productive, but also helps you take on tasks you hesitated to do earlier. So let's get started!

 

About the Author

  • Thomas Nield

    Thomas Nield is a business consultant for Southwest Airlines in Schedule Initiatives, and a maintainer for RxJavaFX and RxKotlin. Early in his career, he became fascinated with technology and its role in business analytics. After becoming proficient in Java, Kotlin, Python, SQL, and reactive programming, he became an open source contributor as well as an author/speaker at O'Reilly Media. He is passionate about sharing what he learns and enabling others with new skill sets. He enjoys making technical content relatable and relevant to those unfamiliar with or intimidated by it.

    Currently, Thomas is interested in data science, reactive programming, and the Kotlin language. You may find him speaking on these three subjects and how they can interconnect.

    He has also authored the book Getting Started with SQL, by O'Reilly Media.

    Browse publications by this author

Latest Reviews

(23 reviews total)
Have not finished reading the book yet.
Well written useful book, good value for money.
Great book. Some of the best explications that I could find regarding JavaRx programming.

Recommended For You

Hands-On Design Patterns with Java

Understand Gang of Four, architectural, functional, and reactive design patterns and how to implement them on modern Java platforms, such as Java 12 and beyond

By Dr. Edward Lavieri
Hands-On Reactive Programming in Spring 5

Explore the reactive system and create efficient microservices with Spring Boot 2.1 and Spring Cloud

By Oleh Dokuka and 1 more
Reactive Programming in Kotlin

Learn how to implement Reactive Programming paradigms with Kotlin, and apply them to web programming with Spring Framework 5.0 and in Android Application Development.

By Rivu Chakraborty
Hands-On Microservices with Spring Boot and Spring Cloud

Apply microservices patterns to build resilient and scalable distributed systems

By Magnus Larsson