Hands-On Reactive Programming with Reactor

By Rahul Sharma

About this book

Reactor is an implementation of the Java 9 Reactive Streams specification, an API for asynchronous data processing. This specification is based on a reactive programming paradigm, enabling developers to build enterprise-grade, robust applications with reduced complexity and in less time. Hands-On Reactive Programming with Reactor shows you how Reactor works, as well as how to use it to develop reactive applications in Java.

The book begins with the fundamentals of Reactor and the role it plays in building effective applications. You will learn how to build fully non-blocking applications and will later be guided by the Publisher and Subscriber APIs. You will gain an understanding of how to use two reactive composable APIs, Flux and Mono, which are used extensively to implement Reactive Extensions. All of these components are combined using various operations to build a complete solution.

In addition to this, you will get to grips with the Flow API and understand backpressure in order to control overruns. You will also study the use of Spring WebFlux, an extension of the Reactor framework for building microservices.

By the end of the book, you will have gained enough confidence to build reactive and scalable microservices.

Publication date:
September 2018
Publisher
Packt
Pages
250
ISBN
9781789135794

 

Chapter 1. Getting Started with Reactive Streams

Over the years, application architecture has evolved. Businesses increasingly need to build systems that remain responsive and can scale when required. Systems should also be maintainable and quickly releasable. In accordance with these needs, we have started to build applications as loosely coupled services. We no longer build a system as one big application. Instead, we split systems into multiple independent, autonomous services. The objective for such services is to do one thing, and do it well.

In this chapter, we will discuss concerns associated with building such services. We will also look at how to address those concerns.

 

Technical requirements 


  • Java Standard Edition, JDK 8 or above
  • IntelliJ IDEA IDE, 2018.1 or above

The GitHub link for this chapter is https://github.com/PacktPublishing/Hands-On-Reactive-Programming-with-Reactor/tree/master/Chapter01.

 

Reactive architecture


When we start to build microservice architecture, we try to involve different services to deliver business solutions. We often build services as traditional API models, where each of the services can interact with other services. This is referred to as distributed architecture. If a distributed architecture is designed incorrectly, performance issues surface very quickly. It can be difficult to have numerous distributed services that work concurrently to deliver the intended performance. Companies that offer services requiring large data exchange (such as Netflix, Amazon, or Lightbend) have therefore seen a need for alternative paradigms, which can be used for systems with the following characteristics:

  • Consisting of very loosely coupled components
  • Responding to user inputs
  • Resilient to varying load conditions
  • Always available

In order to achieve the preceding characteristics, we need to build event-driven, modular services that communicate with each other by using notifications. In turn, we can respond to the system's flow of events. The modular services are more scalable, as we can add or remove service instances without halting the complete application. The complete architecture will be fault tolerant if we can isolate errors and take corrective actions. The preceding four characteristics are the basic principles of the Reactive Manifesto. The Reactive Manifesto states that each reactive system should consist of loosely coupled components that rely on asynchronous, message-driven architecture. They must remain responsive to user input and isolate failures to individual components. Replication must be done in order to respond to varying load conditions. The following is a diagram of the Reactive Manifesto:

The Reactive Manifesto describes a reactive system. It does not require that the system be based on reactive programming or on any reactive library. We can build a message-driven, resilient, scalable, and responsive application without using a reactive library, but it is easier to build such an application with one.

Reactive programming

Most of us write imperative applications, where statements are required in order to change the application state. The code is executed and a final state is arrived at. After the state's computation, the state does not change when the underlying factors do. Let's consider the following code as an example:

int value1 = 5;
int value2 = 10;
int sum = value1 + value2;
System.out.println(sum); // 15
value1 = 15;
System.out.println(sum); // 15

The sum is still 15, even though value1 has been changed.

On the other hand, reactive programming is about the propagation of change. It is also referred to as declarative programming, where we express our intent and application state as dynamically determined by changes to underlying factors. The preceding sum program example, under a reactive paradigm, would behave as follows:

int value1 = 5;
int value2 = 10;
int sum = value1 + value2;
System.out.println(sum); // 15
value1 = 15;
System.out.println(sum); // 25

Consequently, if a program reacts to changes in the underlying factors, it can be called reactive. Reactive programs can be built using imperative techniques, like callbacks. This may be fine for a program that has a single event. However, for applications where hundreds of events are happening, this could easily lead to callback hell; we could have numerous callbacks relying on one another, and it would be really difficult to figure out which ones were being executed. As a result, we require a new set of abstractions that enable us to seamlessly build asynchronous, event-driven interactions across a network boundary. There are libraries in different imperative languages, like Java, that provide us with these abstractions. These libraries are referred to as Reactive Extensions. 
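The callback approach mentioned above can be sketched in plain Java. This is only a minimal illustration of change propagation, not how a reactive library is implemented; ObservableInt and ReactiveSumSketch are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical helper: an int holder that notifies listeners on every change.
class ObservableInt {
    private int value;
    private final List<IntConsumer> listeners = new ArrayList<>();

    ObservableInt(int initial) {
        this.value = initial;
    }

    int get() {
        return value;
    }

    void set(int newValue) {
        value = newValue;
        // Propagate the change to everything that depends on this value.
        for (IntConsumer listener : listeners) {
            listener.accept(newValue);
        }
    }

    void onChange(IntConsumer listener) {
        listeners.add(listener);
    }
}

class ReactiveSumSketch {
    // Returns the sums observed over time: the initial sum, then one entry per change.
    static List<Integer> run() {
        ObservableInt value1 = new ObservableInt(5);
        ObservableInt value2 = new ObservableInt(10);
        List<Integer> observedSums = new ArrayList<>();

        // The sum is declared once as a dependency of both inputs.
        Runnable recompute = () -> observedSums.add(value1.get() + value2.get());
        value1.onChange(v -> recompute.run());
        value2.onChange(v -> recompute.run());

        recompute.run();  // initial sum
        value1.set(15);   // the callback recomputes the sum
        return observedSums;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [15, 25]
    }
}
```

With two inputs and one dependent value the wiring is manageable. Every further dependent value needs its own callbacks, which is the growth that leads to callback hell.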

 

ReactiveX


Reactive Extensions, also known as ReactiveX, enable us to express the asynchronous events in an application as a set of observable sequences. Other applications can subscribe to these observables, in order to receive notifications of events that are occurring. A producer can then push these notification events to a consumer as they arrive. Alternatively, if a consumer is slow, it can pull notification events according to its own consumption rate. The end-to-end system of a producer and its consumers is known as a pipeline. It is important to note that pipelines are lazy by default and do not materialize until they are subscribed to by a consumer. This is very different from eager Java types, like Future, which represent active work. The ReactiveX API consists of the following components:

  1. Observables: Observables represent the core concept of ReactiveX. They represent the sequences of emitted items, and they generate events that are propagated to the intended subscribers.
  2. Observer: Any application can express its intent for events published by an observable by creating an observer and subscribing to the respective observable. The intent is expressed in terms of the OnNext, OnCompleted, and OnError methods. Each observable sends a stream of events, followed by a completion event, which executes these methods.
  3. Operators: Operators enable us to transform, combine, and manipulate the sequences of items emitted by observables. The operators on an observable provide a new observable, and thus, they can be tied together. They do not work independently on the original observable; instead, they work on the observable generated by the previous operator to generate a new observable. The complete operator chain is lazy. It is not evaluated until an observer is subscribed to it. The complete chain is shown as follows:

ReactiveX provides the architecture design to build reactive applications. Individual libraries were built around it in different imperative languages to enable its use. These abstractions allow us to build asynchronous, non-blocking applications, and provide the additional benefits listed in the following sections.
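To make the three components and the laziness of the chain concrete, here is a deliberately tiny sketch in plain Java. It is not the API of RxJava or any other ReactiveX library; SimpleObservable, SimpleObserver, and LazyChainSketch are hypothetical names, and the sketch ignores threading, unsubscription, and error handling inside operators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical observer with the three callbacks named in the text.
interface SimpleObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onCompleted();
}

// Hypothetical observable: it only describes work; nothing runs until subscribe().
interface SimpleObservable<T> {
    void subscribe(SimpleObserver<T> observer);

    // Source: emits every item of the list, then signals completion.
    static <T> SimpleObservable<T> fromList(List<T> items) {
        return observer -> {
            for (T item : items) {
                observer.onNext(item);
            }
            observer.onCompleted();
        };
    }

    // Operator: returns a new observable that transforms the items of this one.
    default <R> SimpleObservable<R> map(Function<T, R> mapper) {
        return downstream -> subscribe(new SimpleObserver<T>() {
            @Override public void onNext(T item) { downstream.onNext(mapper.apply(item)); }
            @Override public void onError(Throwable error) { downstream.onError(error); }
            @Override public void onCompleted() { downstream.onCompleted(); }
        });
    }
}

class LazyChainSketch {
    // Returns a log showing that the mapper runs only after subscribe() is called.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SimpleObservable<Integer> source = SimpleObservable.fromList(Arrays.asList(1, 2, 3));
        SimpleObservable<Integer> doubled = source.map(x -> {
            log.add("map " + x);
            return x * 2;
        });
        log.add("chain built"); // no item has been mapped yet

        doubled.subscribe(new SimpleObserver<Integer>() {
            @Override public void onNext(Integer item) { log.add("next " + item); }
            @Override public void onError(Throwable error) { log.add("error"); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log starts with "chain built" and only then shows the "map" and "next" entries, which is the lazy behavior described above: building the chain does no work, subscribing does.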

Composite streams

In software design, composition refers to grouping different entities and treating each group as a single entity. Additionally, the single entity exhibits the same behavior as the type it refers to. ReactiveX streams are composite in nature. They make it possible to combine existing data streams, add transformations, and generate new data streams. Moreover, all of this can be done in a declarative manner, making the overall solution maintainable in the long run. 

Flexible operators

The libraries offer a range of operators for all kinds of functions. Each of the operators accomplishes its tasks similarly to that of a workstation on an assembly line. It takes input from the previous workstation and provides input to the next workstation. These operators offer all kinds of data transformation, stream orchestration, and error handlers.

ReactiveX makes it easier to build event-based applications. However, the framework does not prescribe how different event-driven applications should interact with each other. In a microservice architecture consisting of numerous event-driven services, the gains made are often offset by the workarounds required for inter-process communication.

 

Reactive Streams


Reactive Streams is a specification that determines the minimum set of interfaces required to build the asynchronous processing of a large volume of unbounded data. It is a specification aimed at the JVM and JavaScript runtimes. The main goal of the Reactive Streams specification is to standardize the exchange of stream data across an asynchronous boundary of applications. The API consists of the following four interfaces:

  1. Publisher: The publisher is responsible for the generation of an unbounded number of asynchronous events and pushing those events to the associated subscribers.

  2. Subscriber: The subscriber is a consumer of the events published by a publisher. The subscriber gets events for subscription, data, completion, and error. It can choose to perform actions on any of them.

  3. Subscription: A subscription is a shared context between the publisher and subscriber, for the purpose of mediating the data exchange between the two. The subscription is available with the subscriber only, and enables it to control the flow of events from the publisher. The subscription becomes invalid if there is an error or a completion. A subscriber can also cancel the subscription, in order to close its stream.

  4. Processor: The processor represents a stage of data processing between a subscriber and a publisher. Consequently, it is bound by both of them. The processor has to obey the contract between the publisher and the subscriber. If there is an error, it must propagate it back to the subscriber.
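The order of signals a subscriber sees can be observed with the JDK's own copy of these interfaces. The sketch below assumes JDK 9 or above, because it uses java.util.concurrent.Flow and the JDK's SubmissionPublisher rather than Reactor; the class name FlowSignalsSketch is hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowSignalsSketch {
    // Records every signal the subscriber receives, in order.
    static List<String> run() {
        List<String> signals = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription subscription) {
                signals.add("onSubscribe");
                subscription.request(Long.MAX_VALUE); // effectively unbounded demand
            }
            @Override public void onNext(String item) {
                signals.add("onNext:" + item);
            }
            @Override public void onError(Throwable error) {
                signals.add("onError");
                done.countDown();
            }
            @Override public void onComplete() {
                signals.add("onComplete");
                done.countDown();
            }
        };

        // close() at the end of the try block signals onComplete after pending items.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("a");
            publisher.submit("b");
        }

        try {
            done.await(5, TimeUnit.SECONDS); // signals are delivered on another thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onSubscribe, onNext:a, onNext:b, onComplete]
    }
}
```

The subscriber receives its Subscription first and nothing is delivered until it calls request(). After onComplete (or onError) no further signals arrive.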

 

Note

The Reactive Streams specification is the result of a collaborative effort of engineers from Kaazing, Netflix, Pivotal, Red Hat, Twitter, Typesafe, and many other companies.

While there are only four interfaces, there are around 30 rules that govern the data exchange between the publisher and the subscriber. These rules are based on the two principles covered in the following sections.

Asynchronous processing

Asynchronous execution refers to the ability to execute a task without first waiting for previously started tasks to finish. The execution model decouples tasks, so that they can be performed concurrently, utilizing the available hardware.

The Reactive Streams API delivers events in an asynchronous manner. A publisher can generate event data in a synchronous blocking manner. On the other hand, each of the on-event handlers can process the events in a synchronously blocking manner. However, event publishing must occur asynchronously. It must not be blocked by the subscriber while processing events.
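A small experiment can show that publishing is not blocked by event handling. The sketch below assumes JDK 9 or above and uses the JDK's SubmissionPublisher with a dedicated executor; the class name AsyncDeliverySketch and the thread name "subscriber-thread" are made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AsyncDeliverySketch {
    // Returns { name of the publishing thread, name of the thread that ran onNext }.
    static String[] run() {
        AtomicReference<String> handlerThread = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "subscriber-thread"));

        try (SubmissionPublisher<String> publisher =
                 new SubmissionPublisher<>(executor, Flow.defaultBufferSize())) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(1);
                }
                @Override public void onNext(String item) {
                    // The handler may block here without holding up the publisher.
                    handlerThread.set(Thread.currentThread().getName());
                }
                @Override public void onError(Throwable error) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            publisher.submit("event"); // enqueues the item; does not wait for onNext
        }

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return new String[] { Thread.currentThread().getName(), handlerThread.get() };
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("published on " + threads[0] + ", handled on " + threads[1]);
    }
}
```

The handler runs on the executor's thread, not on the thread that called submit(), so a slow handler delays only its own subscriber.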

Subscriber backpressure

A subscriber can control events in its queue to avoid any overruns. It can also request more events if there is additional capacity. Backpressure forces the publisher to bound its event queues according to the subscriber's demand. Furthermore, a subscriber can ask to receive one element at a time, building a stop-and-wait protocol. It can also ask for multiple elements. On the other hand, a publisher can apply the appropriate buffers to hold non-delivered events, or it can just start to drop events if the production rate is more than the consumption rate.
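The stop-and-wait protocol can be sketched with the JDK's Flow types (JDK 9 or above is assumed). The subscriber asks for exactly one element and requests the next only after it has handled the current one. The buffer capacity of 2 and the class name StopAndWaitSketch are arbitrary choices for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class StopAndWaitSketch {
    // Returns the items in the order the subscriber received them.
    static List<Integer> run() {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Flow.Subscriber<Integer> oneAtATime = new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1); // ask for the first element only
            }
            @Override public void onNext(Integer item) {
                received.add(item);
                subscription.request(1); // ask for the next one when ready
            }
            @Override public void onError(Throwable error) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        };

        // A small per-subscriber buffer bounds how far the publisher can run ahead.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, 2)) {
            publisher.subscribe(oneAtATime);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i); // blocks while the subscriber's buffer is full
            }
        }

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3, 4, 5]
    }
}
```

Here the publisher reacts to a full buffer by blocking in submit(). SubmissionPublisher also has an offer() method that lets the caller drop items instead, which corresponds to the second strategy described above.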

It is important to note that the Reactive Streams API is aimed at the flow of events between different systems. Unlike ReactiveX, it does not provide any operators to perform transformations. The API has been adopted in JDK 9 as the nested interfaces of the java.util.concurrent.Flow class.

David Karnok's classification

David Karnok, a veteran of various reactive projects like RxJava and Reactor, has categorized the evolution of reactive libraries into the following generations.

Zero generation

The zero generation revolves around the java.util.Observable class, the java.util.Observer interface, and the related callbacks. It essentially uses the observer design pattern for reactive development. It lacks the necessary support for composition, operators, and backpressure.

First generation

The first generation represents Erik Meijer's attempt to address reactive issues by building Rx.NET. This referred to implementations in the form of the IObserver and IObservable interfaces. The overall design was synchronous and lacked backpressure.

Second generation

The first generation deficiencies of backpressure and synchronous handling were handled in the second generation APIs. This generation refers to the first implementations of Reactive Extensions, such as RxJava 1.X and Akka.

Third generation

The third generation refers to the Reactive Streams specification, which enables library implementors to be compatible with each other and compose sequences, cancellations, and backpressure across boundaries. It also enables an end user to switch between implementations at their own will.

Fourth generation

The fourth generation refers to the fact that reactive operators can be combined in an external or internal fashion, leading to performance optimization. A fourth generation reactive API looks like a third generation, but internally, the operators have changed significantly to yield intended benefits. Reactor 3.0 and RxJava 2.x belong to this generation.

Fifth generation

The fifth generation refers to a future work, in which there will be a need for bidirectional reactive I/O operations over the streams.

 

Reactor


Reactor is an implementation completed by the Pivotal Open Source team, conforming to the Reactive Streams API. The framework enables us to build reactive applications, taking care of backpressure and request handling. The library offers the following features.

Infinite data streams

Reactor offers implementations for generating infinite sequences of data. At the same time, it offers an API for publishing a single data entry. This is suited to the request-response model. Each API offers methods aimed at handling the specific data cardinality.

Rather than waiting for the entire data collection to arrive, subscribers to each data stream can process items as they arrive. This yields optimized data processing, in terms of space and time. The memory requirement is limited to a subset of items arriving at the same time, rather than the entire collection. In terms of time, results start to arrive as soon as the first element is received, rather than waiting for the entire dataset.

Push-pull model

Reactor is a push-pull system. A fast producer raises events and waits for the slower subscriber to pull them. In the case of a slow publisher and a fast subscriber, the subscriber waits for events to be pushed from the producer. The Reactive Streams API allows this data flow to be dynamic in nature. It only depends on the real-time rate of production and the rate of consumption.

Concurrency agnostic

The Reactor execution model is concurrency agnostic. It does not cover how different streams should be processed. The library facilitates different execution models, which can be used at a developer's discretion. All transformations are thread safe. There are various operators that can influence the execution model by combining different synchronous streams.

 

Operator vocabulary

Reactor provides a wide range of operators. These operators allow us to select, filter, transform, and combine streams. The operations are performed as a workstation in a pipeline. They can be combined with each other to build high-level, easy-to-reason data pipelines.

Reactor has been adopted in Spring Framework 5.0 to provide reactive features. The complete project consists of the following sub-projects:

  • Reactor-Core: This project provides the implementation for the Reactive Streams API. The project is also the foundation for Spring Framework 5.0 Reactive Extensions.
  • Reactor-Extra: This project complements the Reactor-Core project. It provides the necessary operators to work on top of the Reactive Streams API.
  • Reactor-Tests: This project contains utilities for test verification.
  • Reactor-IPC: This project provides non-blocking, inter-process communication. It also provides backpressure-ready network engines for HTTP (including WebSockets), TCP, and UDP. The module can also be used to build microservices.

Project setup

This book follows a hands-on approach; you will learn Reactor by working with examples. This chapter sets up the project that we will use throughout the book. Before we move on, please install the following items on your machine:

  • Java 8 or above: Check the installed version by running the following command; the output should resemble the lines shown:

$ java -version
java version "1.8.0_101"
Java(TM) SE Runtime Environment (build 1.8.0_101-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.101-b13, mixed mode)
  • IntelliJ IDEA 2018.1 or above: We will be using the latest community edition of IntelliJ. You can download the latest version from the JetBrains website at https://www.jetbrains.com/idea/download/. We will be using version 2018.1.1.

  • Gradle: Gradle is one of the most popular build tools in the JVM ecosystem. It is used for dependency management and for running automated tasks. You don't have to install Gradle on your local machine; we will use a Gradle wrapper that downloads and installs Gradle for your project. To learn more about Gradle, you can refer to the Gradle documentation at https://docs.gradle.org/current/userguide/userguide.html.

Now that we have all the prerequisites, let's create a Gradle project by using IntelliJ IDEA itself:

  1. Launch IntelliJ IDEA and you will see the following screen, where you can begin to create a project:

  2. Click on Create New Project to start the process of creating a Java Gradle project. You will see a screen for creating a new project. Here, select Gradle and Java, as shown in the following screenshot. You will also have to specify the Project SDK. Click on the New button to select JDK 8. Then, click on Next to move to the next screen:

  3. Now you will be asked to enter the GroupId and ArtifactId. Click on Next to move to the next screen:

  4. The next screen will ask you to specify a few Gradle settings. We will select Use auto-import, so that Gradle will automatically add new dependencies when we add them to the build file. Click on Next to move to the final screen:

  5. On this screen, you will be asked for the location where you want to create the project. Select a convenient directory path for the application. Finally, click on Finish to complete the project creation process:

Now that the Java Gradle project has been created, we have to make a couple of changes in the Gradle build file, that is, build.gradle. Open the build.gradle file in the IDE and change it to match the following contents:

plugins {
    id "io.spring.dependency-management" version "1.0.5.RELEASE"
}
group 'com.reactor'
version '1.0-SNAPSHOT'
apply plugin: 'java'
sourceCompatibility = 1.8
repositories {
    mavenCentral()
}
dependencyManagement {
    imports {
        mavenBom "io.projectreactor:reactor-bom:Bismuth-RELEASE"
    }
}
dependencies {
    compile 'io.projectreactor:reactor-core'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

In the preceding build.gradle file, we have done the following:

  1. Added the io.spring.dependency-management plugin. This plugin allows us to have a dependency-management section, for configuring dependency versions.
  2. Configured the dependency-management plugin to import the Maven BOM published by the Reactor project (Bismuth-RELEASE), which determines the version of each Reactor module.
  3. Added the reactor-core dependency to the list of project dependencies.

That's all we need to do to start using Reactor.

Note

At the time of writing, Bismuth-RELEASE was the latest version of Reactor.

Now, let's build a simple test case to see how we can work with the Reactor API. We will build a simple test case for generating Fibonacci numbers. Wikipedia defines Fibonacci numbers as follows:

"In mathematics, the Fibonacci numbers are the numbers in the following integer sequence, called the Fibonacci sequence, and characterized by the fact that every number after the first two is the sum of the two preceding ones: 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 , 55 , 89 , 144, ..."

Let's build our test for the Fibonacci generation. The test case will start to generate a series, from 0 and 1. It will generate the first 50 Fibonacci numbers, and will validate the 50th number as 7778742049:

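import static org.junit.Assert.assertEquals;

import java.util.LinkedList;
import java.util.List;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

// The class name is illustrative; any JUnit 4 test class will do.
public class ReactorTest {

    @Test
    public void testFibonacci() {
        // The state is the pair (current, next); each round emits current and advances the pair.
        Flux<Long> fibonacciGenerator = Flux.generate(
            () -> Tuples.<Long, Long>of(0L, 1L),
            (state, sink) -> {
                sink.next(state.getT1());
                return Tuples.of(state.getT2(), state.getT1() + state.getT2());
            });
        List<Long> fibonacciSeries = new LinkedList<>();
        int size = 50;
        // Nothing is generated until subscribe() is called.
        fibonacciGenerator.take(size).subscribe(t -> {
            fibonacciSeries.add(t);
        });
        System.out.println(fibonacciSeries);
        assertEquals(7778742049L, fibonacciSeries.get(size - 1).longValue());
    }
}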

In the preceding test case, we are doing the following:

  1. We create Fibonacci as Flux<Long>, by using the Flux.generate() call. The API has a State and Sink. For now, we will leave the Flux API details for the next chapter.
  2. The API takes a seed as Tuple [0 , 1]. It then emits the first argument of the pair by using the Sink.next() call.
  3. The API also generates the next Fibonacci number by aggregating the pair.
  4. Next, we select the first 50 Fibonacci numbers by using the take() operator.
  5. We subscribe to the published numbers, and then append the received number to a List<Long>.
  6. Finally, we assert the published numbers.
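The state transition behind the generator (emit the first element of the pair, then move to the next pair) can be traced without Reactor. The plain-Java loop below is only a cross-check of the arithmetic and of the expected 50th value; the class name FibonacciStateSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class FibonacciStateSketch {
    // Mirrors the generator: the state is the pair (t1, t2);
    // each step emits t1 and moves to (t2, t1 + t2).
    static List<Long> firstN(int n) {
        List<Long> series = new ArrayList<>();
        long t1 = 0L;
        long t2 = 1L;
        for (int i = 0; i < n; i++) {
            series.add(t1);
            long next = t1 + t2;
            t1 = t2;
            t2 = next;
        }
        return series;
    }

    public static void main(String[] args) {
        List<Long> series = firstN(50);
        System.out.println(series.subList(0, 8)); // [0, 1, 1, 2, 3, 5, 8, 13]
        System.out.println(series.get(49));       // 7778742049
    }
}
```

Because the series starts at 0, the 50th element sits at index 49, which is why the test reads fibonacciSeries.get(size-1).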

In the preceding test case, we have used a number of Reactor features. We will cover each of them in detail in our subsequent chapters. For now, let's execute the test case and check that our project is running fine.

Running our unit test should give us a green bar, as follows:

 

Summary


In this chapter, we discussed the need for a reactive paradigm. We also looked at the evolution of the paradigm, from reactive programming to Reactive Extensions and then Reactive Streams. Furthermore, we discussed the Reactive Streams specification as a specification aimed at the JVM for the following:

  • Processing a potentially unbounded number of elements in a sequence
  • Asynchronously passing elements between components with mandatory non-blocking backpressure

At the end of the chapter, we covered Reactor, an implementation by the Pivotal team, and built a sample project with it. In the next chapter, we will discuss the APIs available in Reactor.

 

 

 

Questions


  1. What are the principles of the Reactive Manifesto?
  2. What are Reactive Extensions?
  3. What does the Reactive Streams specification cater for?
  4. What are the principles upon which Reactive Streams are based?
  5. What are the salient features of the Reactor Framework?
 

Further reading


About the Author

  • Rahul Sharma

    Rahul Sharma is passionately curious about teaching programming. He has been writing software for the last two years. He got started with Rust with his work on Servo, a browser engine by Mozilla Research as part of his GSoC project. At present, he works at AtherEnergy, where he is building resilient cloud infrastructure for smart scooters. His interests include systems programming, distributed systems, compilers and type theory. He is also an occasional contributor to the Rust language and does mentoring of interns on the Servo project by Mozilla.
