A quick exposure to RxJava
Before we dive deep into the reactive world of RxJava, here is a quick exposure to get your feet wet. In ReactiveX, the core type you will work with is the Observable. We will be learning more about the Observable throughout the rest of this book, but essentially, an Observable pushes things. A given Observable<T> pushes things of type T through a series of operators until they arrive at an Observer that consumes the items.
For instance, create a new Launcher.java file in your project and put in the following code:
import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable<String> myStrings =
            Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
    }
}
In our main() method, we have an Observable<String> that will push five string objects. An Observable can push data or events from virtually any source, whether it is a database query or live Twitter feeds. In this case, we are quickly creating an Observable using Observable.just(), which will emit a fixed set of items.
Note
In RxJava 2.0, most types you will use are contained in the io.reactivex package. In RxJava 1.0, the types are contained in the rx package.
However, running this main() method is not going to do anything other than declare the Observable<String>. To make this Observable actually push these five strings (which are called emissions), we need an Observer to subscribe to it and receive the items. We can quickly create and connect an Observer by passing a lambda expression that specifies what to do with each string it receives:
import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable<String> myStrings =
            Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
        myStrings.subscribe(s -> System.out.println(s));
    }
}
When we run this code, we should get the following output:
Alpha
Beta
Gamma
Delta
Epsilon
What happened here is that our Observable<String> pushed each string object one at a time to our Observer, which we shorthanded using the lambda expression s -> System.out.println(s). Each string is passed in through the parameter s (a name I chose arbitrarily), and the lambda prints it. Lambdas are essentially mini functions that allow us to quickly pass instructions on what action to take with each incoming item. Everything to the left of the arrow -> is the argument list (which in this case is a single string we named s), and everything to the right is the action (which is System.out.println(s)).
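To make the shorthand concrete, the following sketch writes the same action twice in plain Java: once as an anonymous class and once as a lambda. It uses java.util.function.Consumer from the JDK as a stand-in for the callback type so that it runs without RxJava on the classpath. The LambdaShorthand class and its demo() method are names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class LambdaShorthand {

    // Runs the same items through both styles and returns what each one recorded.
    static List<String> demo() {
        List<String> items = Arrays.asList("Alpha", "Beta");
        List<String> seen = new ArrayList<>();

        // Long form: an anonymous class that implements the interface's single method.
        Consumer<String> longForm = new Consumer<String>() {
            @Override
            public void accept(String s) {
                seen.add("anonymous " + s);
            }
        };

        // Short form: the parameter goes left of the arrow, the action goes right.
        Consumer<String> shortForm = s -> seen.add("lambda " + s);

        items.forEach(longForm);
        items.forEach(shortForm);
        return seen;
    }

    public static void main(String[] args) {
        // Prints: anonymous Alpha, anonymous Beta, lambda Alpha, lambda Beta (one per line)
        demo().forEach(System.out::println);
    }
}
```

Both forms behave identically here. The lambda simply leaves out the class, method name, and parameter type, all of which the compiler can infer from the Consumer<String> target type.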
If you are unfamiliar with lambda expressions, turn to the Appendix to learn more about how they work. If you want to invest extra time in understanding lambda expressions, I highly recommend that you read at least the first few chapters of Java 8 Lambdas (O'Reilly) (http://shop.oreilly.com/product/0636920030713.do) by Richard Warburton. Lambda expressions are a critical topic in modern programming and have become especially relevant to Java developers since their adoption in Java 8. We will be using lambdas constantly in this book, so definitely take some time to get comfortable with them.
We can also use several operators between the Observable and the Observer to transform each pushed item or manipulate it in some way. Each operator returns a new Observable that is derived from the previous one and reflects that transformation. For example, we can use map() to turn each string emission into its length(), and each length integer will then be pushed to the Observer, as shown in the following code snippet:
import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable<String> myStrings =
            Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
        myStrings.map(s -> s.length())
            .subscribe(s -> System.out.println(s));
    }
}
When we run this code, we should get the following output:
5
4
5
5
7
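One way to picture how an operator yields a new Observable is the toy model below. It is not how RxJava implements map(). ToySource, its just() and map() methods, and ToySourceDemo are hypothetical plain-Java stand-ins, written only to show that map() returns a separate source that wraps the original and transforms each item on its way to the subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for Observable: anything that can push items to a callback.
interface ToySource<T> {

    void subscribe(Consumer<T> observer);

    // Returns a NEW source; the original source is left untouched.
    default <R> ToySource<R> map(Function<T, R> mapper) {
        ToySource<T> upstream = this;
        return observer -> upstream.subscribe(item -> observer.accept(mapper.apply(item)));
    }

    // Pushes a fixed set of items, in order, to whoever subscribes.
    @SafeVarargs
    static <T> ToySource<T> just(T... items) {
        return observer -> {
            for (T item : items) {
                observer.accept(item);
            }
        };
    }
}

class ToySourceDemo {

    // Subscribes to the mapped source and collects everything it pushes.
    static List<Integer> lengths() {
        ToySource<String> strings = ToySource.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
        ToySource<Integer> lengths = strings.map(String::length);

        List<Integer> received = new ArrayList<>();
        lengths.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(lengths()); // [5, 4, 5, 5, 7]
    }
}
```

Note that nothing is pushed until subscribe() is called on the mapped source, which mirrors the earlier observation that merely declaring an Observable does nothing.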
If you have used Java 8 Streams or Kotlin sequences, you might be wondering how Observable is any different. The key difference is that Observable pushes the items, while Streams and sequences pull them. This may seem subtle, but a push-based iteration is far more powerful than a pull-based one. As we saw earlier, you can push not only data, but also events. For instance, Observable.interval() will push a consecutive Long at each specified time interval, as shown in the following code snippet. This Long emission is not only data, but also an event! Let's take a look:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable<Long> secondIntervals =
            Observable.interval(1, TimeUnit.SECONDS);
        secondIntervals.subscribe(s -> System.out.println(s));

        /* Hold main thread for 5 seconds
           so Observable above has chance to fire */
        sleep(5000);
    }

    public static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
When we run this code, we should get the following output:
0
1
2
3
4
When you run the preceding code, you will see that a consecutive emission fires every second. This application will run for about five seconds before it quits, and you will likely see emissions 0 to 4 fired, each separated by a one-second gap. This simple idea that data is a series of events over time will unlock new possibilities in how we tackle programming.
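The push-versus-pull distinction can also be sketched without RxJava at all. In the plain-Java illustration below, the pull() method owns the loop and asks an Iterator for each item, while the push() method lets the source own the loop and hand each item to a callback. The PushVersusPull class and its method names are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

class PushVersusPull {

    // Pull: the consumer owns the loop and asks the source for the next item.
    static List<String> pull(List<String> source) {
        List<String> log = new ArrayList<>();
        Iterator<String> iterator = source.iterator();
        while (iterator.hasNext()) {
            log.add("pulled " + iterator.next());
        }
        return log;
    }

    // Push: the source owns the loop and calls the consumer when an item is ready.
    static void push(List<String> source, Consumer<String> consumer) {
        for (String item : source) {
            consumer.accept(item);
        }
    }

    // Records what a consumer sees when items are pushed to it.
    static List<String> pushLog(List<String> source) {
        List<String> log = new ArrayList<>();
        push(source, item -> log.add("pushed " + item));
        return log;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("Alpha", "Beta");
        System.out.println(pull(items));    // [pulled Alpha, pulled Beta]
        System.out.println(pushLog(items)); // [pushed Alpha, pushed Beta]
    }
}
```

With a fixed list, the two styles produce the same items. The difference shows up when the source decides when an item exists, such as a timer tick: a pushing source can simply call the consumer whenever that happens, whereas a pulling consumer would have to block or poll.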
On a side note, we will get more into concurrency later, but we had to create a sleep() method because this Observable fires emissions on a computation thread when subscribed to. The main thread used to launch our application is not going to wait on this Observable, since it fires on a computation thread, not the main thread. Therefore, we use sleep() to pause the main thread for 5000 milliseconds and then allow it to reach the end of the main() method (which will cause the application to terminate). This gives Observable.interval() a chance to fire for a five-second window before the application quits.
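The same situation can be reproduced with only the JDK, which may help show why the main thread has to be held open. The sketch below assumes that the computation threads behave like daemon threads, that is, threads that do not keep the JVM alive on their own. DaemonTicker and firstTicks() are hypothetical names, and instead of sleeping for a fixed time the calling thread waits on a CountDownLatch until the requested number of ticks has arrived.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class DaemonTicker {

    // Pushes 0, 1, 2, ... from a background daemon thread and returns the first `count` values.
    static List<Long> firstTicks(int count, long periodMillis) {
        // A daemon thread does not keep the JVM alive, so the caller must wait for it.
        ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "ticker");
                thread.setDaemon(true);
                return thread;
            });

        List<Long> received = Collections.synchronizedList(new ArrayList<>());
        AtomicLong next = new AtomicLong();
        CountDownLatch done = new CountDownLatch(count);

        // Each tick records the next number until enough ticks have been collected.
        scheduler.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) {
                received.add(next.getAndIncrement());
                done.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

        try {
            // Hold the calling thread until the background thread has fired `count` times.
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(firstTicks(3, 100)); // [0, 1, 2]
    }
}
```

If the done.await() call were removed, firstTicks() would most likely return an empty list, because the calling thread would carry on before the first tick had a chance to fire.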
Throughout this book, we will uncover many mysteries about Observable and the powerful abstractions it takes care of for us. If you've conceptually understood what is going on here so far, congrats! You are already becoming familiar with how reactive code works. To emphasize again, emissions are pushed one at a time all the way to the Observer. Emissions represent both data and an event, which can be emitted over time. Of course, beyond map(), there are hundreds of operators in RxJava, and we will learn about the key ones in this book. Learning which operators to use for a situation and how to combine them is the key to mastering RxJava. In the next chapter, we will cover Observable and Observer much more comprehensively. We will also demystify events and data being represented in Observable a bit more.