Hands-On Reactive Programming with Python

By Romain Picard

About this book

Reactive programming is central to many concurrent systems, but it’s famous for its steep learning curve, which makes most developers feel like they're hitting a wall. With this book, you will get to grips with reactive programming by steadily exploring various concepts.

This hands-on guide gets you started with Reactive Programming (RP) in Python. You will learn about the principles and benefits of using RP, which can be leveraged to build powerful concurrent applications. As you progress through the chapters, you will be introduced to the paradigm of Functional and Reactive Programming (FaRP), observables and observers, and concurrency and parallelism. The book will then take you through the implementation of an audio transcoding server and introduce you to a library that helps in the writing of FaRP code. You will understand how to use third-party services and dynamically reconfigure an application.

By the end of the book, you will also have learned how to deploy and scale your applications with Docker and Traefik and explore the significant potential behind the reactive streams concept, and you'll have got to grips with a comprehensive set of best practices.

Publication date:
October 2018
Publisher
Packt
Pages
420
ISBN
9781789138726

 

Chapter 1. An Introduction to Reactive Programming

This first chapter covers the principles of reactive programming and ReactiveX. It is composed of three parts. The first part explains what reactive programming is and how it compares to other concepts and paradigms that are often used in event-driven programming. The second part explains the foundations of ReactiveX and RxPY, its Python implementation. This exploration of RxPY is explained with a simple example that allows us to understand the basics of ReactiveX. Finally, the last part is dedicated to the documentation of the ReactiveX project and the documentation of your projects. By the end of this chapter, you will be able to write some ReactiveX code and understand existing code. 

The following topics will be covered in this chapter:

  • What is reactive programming?
  • An introduction to ReactiveX and RxPY
  • A reactive echo application
  • Marble diagrams
  • Flow diagrams
 

What is reactive programming?


Reactive programming has gained a lot of popularity since 2010. Although its concepts and usage date from the early days of computing, this recent popularity is mainly due to the publication of the ReactiveX project. This might seem surprising for developers who had rarely used event-driven programming before. However, for people who faced tremendous state machines or callback hell, this seems more of an inevitable fact.

 

Before playing with ReactiveX and RxPY, this first part describes the principles being used in reactive programming and how they are used in asynchronous frameworks. This initial study of low-level features is not strictly necessary when using ReactiveX and asynchronous frameworks, but it helps a lot to understand how they work, which thus helps us to use them correctly.

Event-driven programming

What is the common connection between state machines, Petri nets, Kahn Process Networks (KPN), the observer design pattern, callbacks, pipes, publish/subscribe, futures, promises, and streams? Event-driven programming!

By definition, any program has to deal with external events through inputs/outputs (I/O). I/O and event management are the foundations of any computer system: reading or writing from storage, handling touch events, drawing on a screen, sending or receiving information on a network link, and so on. Nothing useful can be done without interacting with I/O, and I/O is almost always managed through events. However, almost 50 years after the creation of the first microprocessor, event-driven programming is still a very active topic with new technologies appearing almost every year.

 

The main reason for this sustained activity is that, although event-driven programming has existed since the beginning of computer science, it is still hard to use correctly. More than writing event-driven code, the real challenge lies in writing readable, maintainable, reusable, and testable code. Event-driven programming is more difficult to implement and read than sequential programming because it often means writing code that is not natural for human beings to read: instead of a sequence of actions that execute one after the other until the task is completed, an action starts when an event occurs, and the actions that are triggered are often dispersed within the program. When such a code flow becomes complex (and this starts after only a few indirect paths in the code), it becomes more and more difficult to understand what is happening. This is often called callback hell: one has to follow callbacks calling callbacks, which call further callbacks, and so on.

During the late nineties, event-driven programming became quite popular with the advent of graphical user interfaces (GUIs). At that time, developers had the following options to write GUI applications:

  • Objective-C on NextStep and macOS
  • C++ on Windows
  • C or possibly C++ on Unix (with X11)
  • Java, with the hope of writing the same application for all these systems

All these environments were based on callbacks and it stayed that way for a very long time, until programming languages included features to improve the readability of event-driven code.

So event-driven code is often more difficult to read than sequential code because the code logic can be difficult to follow, depending on the programming language and the frameworks being used. Reactive programming, and more specifically ReactiveX, aims at solving some of these challenges. Python and its relatively recent support of async/await syntax also aims to make event-driven programming easier.
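To make the readability difference concrete, here is a minimal sketch that runs the same three steps twice: once as a chain of callbacks, and once with async/await. The function names (callback_style, sequential_style) and the step labels are invented for this illustration. It assumes Python 3.7 or later for asyncio.run.

```python
import asyncio


def callback_style(loop, results, done):
    # Each step is a separate callback that schedules the next one,
    # so the logic is spread over several small functions.
    def step1():
        results.append("step1")
        loop.call_soon(step2)

    def step2():
        results.append("step2")
        loop.call_soon(step3)

    def step3():
        results.append("step3")
        done.set_result(None)  # signal that the chain has finished

    loop.call_soon(step1)


async def sequential_style(results):
    # The same three steps, written in the order in which they execute.
    for name in ("step1", "step2", "step3"):
        await asyncio.sleep(0)  # yield to the event loop between steps
        results.append(name)


async def main():
    loop = asyncio.get_running_loop()
    cb_results, seq_results = [], []
    done = loop.create_future()
    callback_style(loop, cb_results, done)
    await done
    await sequential_style(seq_results)
    return cb_results, seq_results


cb_results, seq_results = asyncio.run(main())
print(cb_results)   # ['step1', 'step2', 'step3']
print(seq_results)  # ['step1', 'step2', 'step3']
```

Both versions produce the same result, but only the second one reads top to bottom in execution order.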

It is important to understand that reactive programming and event-driven programming are not programming paradigms, such as imperative or object-oriented programming, but are orthogonal to them. Event-driven programming is implemented within an existing paradigm. So, one can use event-driven programming with an object-oriented language or a functional language. Reactive programming is a specific case of event-driven programming. This can be seen in the following figure:

Figure 1.1: Event-driven programming and programming paradigms

Reactive versus proactive

So what is reactive programming?

 

An easy way to get the idea behind it is to use an analogy with people's behavior: someone who is proactive is somebody who takes the initiative. A proactive person will propose ideas or test things before somebody asks them to do so. On the other hand, a reactive person is somebody who waits for information before doing something. So, a proactive person acts on their own while a reactive person reacts to external changes. There are pros and cons to each behavior: a proactive person proposing solutions ahead of time is great, but may have difficulties dealing with unexpected changes. On the other hand, a reactive person may be very efficient in dealing with very dynamic environments.

Reactive programming can be considered as implementing a behavior in a similar way to a reactive person. A reactive system reacts to external events and provides a result that depends only on the event it has received. So why would reactive programming be better than sequential programming? Better is always a matter of preference and context, so reactive programming may not be the best-suited solution to every problem you will encounter. However, as you will see throughout this book, reactive programming shines at implementing event-driven code.

Reactive programming is inherently asynchronous. So it makes it easier to deal efficiently with inputs and outputs than with synchronous paradigms. Reactive programming favors composition. Each component is completely independent from another and can be plugged in with other components. This also makes testing quite easy and, as a consequence, it also helps to refactor existing code. Moreover, it is quite engaging, and with experience you will see that almost everything is a flow of events.

Reactor and proactor

When looking for information on event-driven programming concepts, two other similar terms are often mentioned: the reactor and the proactor. These notions are not really important when using high-level libraries such as ReactiveX and AsyncIO. Still, it is interesting to know what they are so that you can better understand what is going on under the hood. They are two kinds of low-level APIs that allow us to implement an event-driven library. For example, AsyncIO, the Python asynchronous API, can use either a reactor or a proactor to expose the same APIs. Whether a framework is built on a reactor or a proactor depends on whether the operating system supports the proactor. All operating systems support a reactor via the POSIX select system call or one of its derivatives. Some operating systems, such as Windows, implement proactor system calls. The difference between these two design patterns is the way I/O is managed.

 

Figure 1.2 shows a sequence diagram of how a reactor works. The three main components involved in this pattern are as follows:

  • Reactor
  • Event handler
  • Event demultiplexer

When the Main Program needs to execute an asynchronous operation, it starts by notifying the Reactor, passing the identification of the Concrete Event Handler that will be notified when an event occurs. This is the call to register_handler. Then the Reactor creates a new instance of this Concrete Event Handler and adds it to its handler list. After that, the Main Program calls handle_events, which is a function that blocks until an event is received. The Reactor then calls the Event Demultiplexer to wait until an event happens. The Event Demultiplexer is usually implemented through the select system call or one of its alternatives, such as poll, epoll, or kqueue. select is called with the list of handles to monitor. These handles are associated with the handlers that were registered before. When an event that corresponds to one of the handles occurs, the Event Demultiplexer returns the list of handles that correspond to the event that happened. The Reactor then calls the associated event handlers, and the event handlers implement the actual service logic. The following diagram demonstrates this:

Figure 1.2: The reactor design pattern principles
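The sequence above can be approximated in a few lines with Python's standard selectors module, which wraps select, epoll, or kqueue depending on the platform. This is only a toy sketch: the Reactor class and its remove_handler, stop, and close methods are invented for the illustration, and handlers are plain callables rather than Concrete Event Handler instances created by the reactor.

```python
import selectors
import socket


class Reactor:
    """Toy reactor: dispatches read events to registered handlers."""

    def __init__(self):
        # The selector plays the role of the event demultiplexer.
        self._demultiplexer = selectors.DefaultSelector()
        self._running = False

    def register_handler(self, handle, handler):
        # Associate a handle (here, a socket) with its event handler.
        self._demultiplexer.register(handle, selectors.EVENT_READ, handler)

    def remove_handler(self, handle):
        self._demultiplexer.unregister(handle)

    def stop(self):
        self._running = False

    def handle_events(self):
        # Block in the demultiplexer, then call the handlers of ready handles.
        self._running = True
        while self._running:
            for key, _events in self._demultiplexer.select():
                handler = key.data
                handler(key.fileobj)

    def close(self):
        self._demultiplexer.close()


received = []
reader, writer = socket.socketpair()
reactor = Reactor()


def on_readable(sock):
    # The event handler implements the actual service logic.
    received.append(sock.recv(1024))
    reactor.remove_handler(sock)
    reactor.stop()


reactor.register_handler(reader, on_readable)
writer.send(b"ping")      # makes the reader handle ready
reactor.handle_events()   # returns once the handler has stopped the reactor
reactor.close()
reader.close()
writer.close()
print(received)  # [b'ping']
```

Note that the main program regains control only when handle_events returns, which matches the blocking behavior described above.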

Figure 1.3 shows a sequence diagram of how a Proactor works. On a Proactor system, asynchronous operations can be executed by an Asynchronous Operation processor. This is an entity which is provided by the operating system, and not all operating systems have support for it. There are more components involved in a Proactor than in a reactor. First, an Initiator asks the Asynchronous Operation processor to execute an operation. With this request, the Initiator provides the information about the operation to execute, as well as the instance of the Completion Handler and Completion Event Queue associated with the operation. Then the Asynchronous Operation processor creates an operation object that corresponds to the request of the Initiator. After that, the Initiator asks the Proactor to execute all pending operations. When an event associated with one of the operations happens, the operation notifies the Asynchronous Operation processor about it. The Asynchronous Operation processor then pushes this result to a Completion Event Queue. At that point, the Completion Event Queue notifies the Proactor that something happened. The Proactor then pops the next event from the Completion Event Queue and notifies the Completion Handler about this result. The Completion Handler finally implements the actual service logic.

On a Proactor, the Initiator and the Completion Handler may be implemented in the same component. Moreover, this chain of actions can be repeated indefinitely. The implementation of the Completion Handler can be an Initiator that starts the execution of another Asynchronous Operation. This is used to chain the execution of asynchronous operations:

Figure 1.3: The proactor design pattern principles

As you can see, both patterns are somewhat similar. They are both used to execute asynchronous operations. The main difference is in the way operations can be chained. On a reactor, asynchronous operations can be chained only by the main program once the blocking call to the event demultiplexer has been completed. On the other hand, a proactor allows the completion handlers to be initiators and so to start new asynchronous operations themselves.

Being aware of this is important because it allows us to understand what is going on behind the scenes. However, this is completely invisible with all the recent asynchronous frameworks. They all expose APIs whose behavior is similar to the proactor pattern because they allow us to easily chain asynchronous operations while others are still pending. Under the hood, however, they still use a reactor or a proactor, depending on the operating system that you use. On some frameworks, when both the reactor and the proactor are available, it is possible to select which pattern to use via configuration APIs.
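As an illustration, the following sketch uses AsyncIO, where asyncio.SelectorEventLoop is the reactor-style loop and asyncio.ProactorEventLoop is the proactor-style loop, available only on Windows. The helper new_event_loop and its prefer_proactor parameter are hypothetical names introduced here. The coroutine that chains two operations is identical whichever loop runs it.

```python
import asyncio
import sys


def new_event_loop(prefer_proactor=False):
    # ProactorEventLoop exists only on Windows; SelectorEventLoop
    # (reactor style) is available on every platform.
    if prefer_proactor and sys.platform == "win32":
        return asyncio.ProactorEventLoop()
    return asyncio.SelectorEventLoop()


async def chained():
    # Two asynchronous operations chained one after the other.
    results = []
    await asyncio.sleep(0.01)
    results.append("first")
    await asyncio.sleep(0.01)
    results.append("second")
    return results


loop = new_event_loop()
try:
    results = loop.run_until_complete(chained())
finally:
    loop.close()
print(results)  # ['first', 'second']
```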

Reactive systems

This book will cover many aspects of reactive programming. But an important thing to be aware of is that using reactive programming does not mean implementing a reactive system. A reactive system is much more than implementing a component with asynchronous and reactive programming. These two notions are easily confused because of their similar names, but they are completely different. As already explained, reactive programming is a way to code. On the other hand, a reactive system is an architecture pattern that allows us to write robust systems; that is, applications that are made of many components communicating via network channels, and with instances running on several servers, virtual machines, or containers. This architecture pattern is described in the reactive manifesto (https://www.reactivemanifesto.org/).

 

The four pillars of the reactive system are shown in the following figure:

Figure 1.4: A reactive system

These four pillars are interdependent. The value of a reactive system lies in being responsive, thanks to an elastic and resilient design. A reactive system relies on message-driven communication between the components of the system. More specifically:

  • A reactive system is responsive: It responds to events and user interactions rapidly and consistently. Responsiveness ensures that the application stays usable, and that, if a problem occurs, it can be detected very quickly and thus handled correctly. Responsiveness is achieved thanks to the three other pillars of a reactive system.
  • A reactive system is resilient: The system stays responsive even in the event of failure. Resilience can be achieved in several ways, such as replication and isolation. Failures are handled and contained in each component. Other components are dedicated to recovering the components that failed, and replication provides high availability.
  • A reactive system is elastic: The system stays responsive when the workload varies. The system can adapt to workloads that increase or decrease so that the allocated resources of the system are not oversized or undersized. In order to provide elasticity, the design must be vertically and horizontally scalable, with no performance bottleneck.
  • A reactive system is message-driven: The different components of the system communicate via asynchronous message channels. Communication via messages allows us to isolate components. Saturation is controlled via back-pressure.
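Back-pressure, mentioned in the last pillar, can be illustrated on a very small scale with a bounded asyncio.Queue: a fast producer is suspended whenever the queue is full, so a slow consumer is never flooded. This is a single-process sketch with invented names (producer, consumer, a queue of size 2); a real reactive system would apply the same idea across network message channels.

```python
import asyncio


async def producer(queue, count):
    for i in range(count):
        # put() suspends the producer while the queue is full:
        # this is the back-pressure signal.
        await queue.put(i)
    await queue.put(None)  # sentinel: no more items


async def consumer(queue, consumed, sizes):
    while True:
        item = await queue.get()
        if item is None:
            break
        sizes.append(queue.qsize())   # record how many items are waiting
        await asyncio.sleep(0.001)    # simulate slow processing
        consumed.append(item)


async def main():
    queue = asyncio.Queue(maxsize=2)  # bounded message channel
    consumed, sizes = [], []
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue, consumed, sizes),
    )
    return consumed, sizes


consumed, sizes = asyncio.run(main())
print(consumed)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

The recorded queue sizes never exceed the bound, which shows that the producer was throttled to the pace of the consumer.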

As you can see, reactive programming does not provide these four pillars by itself. Reactive programming is one of the tools that can be used to implement a reactive system, but it is not sufficient. Many other tools, such as message brokers, containers, orchestrators, or monitoring tools, are needed to build a reactive system.

 

Introduction to ReactiveX and RxPY


ReactiveX is a library which aims to make asynchronous programming easy. As the header of the project's website says, it is "The Observer pattern done right". ReactiveX is based on the idea of observable streams. A stream is an entity that emits zero, one, or several items over a period of time. This stream of items can be observed, and manipulated, by other entities that are interested in receiving these items. This simple idea is the basis of what has become an incredibly successful way of doing asynchronous programming.

As said at the beginning of this chapter, asynchronous programming is a very active field. ReactiveX is a typical example of technologies that did not exist a few years ago but that are now heavily used. It was originally one of the components of the Volta project at Microsoft. This project consisted of a set of developer tools to help with developing client and server parts of web applications. The Volta project was suspended in 2008 but ReactiveX continued to be developed, up to the point when it was publicly released for the .NET platform in 2010. The library was very successful, with a growing community and big companies such as Netflix and GitHub using it. In 2012, implementations for .NET, JavaScript, and C++ were published as open source projects. Since that time, ReactiveX has even impacted the standardization of some programming languages. ReactiveX now has official implementations for almost 20 programming languages and was the foundation of the Java reactive streams (http://www.reactive-streams.org) standard and the EcmaScript observable (https://github.com/tc39/proposal-observable) API. Nowadays, many other libraries, heavily inspired by ReactiveX, are available for virtually any programming language.

All of this is based on concepts that have already existed for many years, such as the observer design pattern, the iterator design pattern, and some principles from functional programming. The ingenuity came from combining them in such a way that it avoids the callback hell. Even better, it is equally suited for frontend applications that deal with user events and GUI widgets, and backend applications that work with network and database requests.

 

 

 

ReactiveX principles

ReactiveX is based on two entities: observables and observers. These are the only things that one needs to understand to be able to start writing code. Everything else is based on the behavior of one of these two entities.

Observables represent a source of events. An observable is an entity that can emit zero, one, or several items. An observable has an explicit lifetime with a start and an end. When an observable completes or faces an error, it cannot send items anymore; its lifetime has ended. An observable may also never end. In this case, it is an infinite source of events. Observables are a way to manage sequences of items in an asynchronous way. Table 1.1, which follows, shows a comparison between how to access items in a synchronous or asynchronous way. As you can see, observables fill a gap and allow us to operate on multiple items in an asynchronous way.

                Single item    Multiple items
Synchronous     Getter         Iterable
Asynchronous    Future         Observable

Table 1.1: Accessing an asynchronous sequence of items is possible

Observables work in push mode, as opposed to the pull mode of an iterable. Each time a new item is available, the observable pushes it to its observer. Table 1.2 shows the difference between the pull mode of an iterator and the push mode of an observable. This is what makes the behavior reactive and easy to handle with asynchronous code: whether items are emitted immediately or later is not important to the observer receiving them, and the code semantics are very similar to those used in synchronous code:

Event            Iterable (pull)        Observable (push)
Retrieve data    for i in collection    on_next
Error            Exception is raised    on_error
Complete         End of loop            on_completed

Table 1.2: Observables are push based
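The correspondence in Table 1.2 can be sketched in plain Python, without RxPY. The helper functions pull and push below are invented for this illustration: the first one consumes an iterable with a for loop, and the second one delivers the same three kinds of events through callbacks named after the observer methods.

```python
def pull(collection):
    """Pull mode: the consumer drives the iteration."""
    received = []
    try:
        for i in collection:
            received.append(("item", i))          # retrieve data
    except Exception as e:
        received.append(("error", str(e)))        # exception is raised
    else:
        received.append(("end of loop",))         # complete
    return received


def push(collection, on_next, on_error, on_completed):
    """Push mode: the source drives the callbacks."""
    try:
        for i in collection:
            on_next(i)
    except Exception as e:
        on_error(e)
    else:
        on_completed()


pulled = pull([1, 2])

pushed = []
push(
    [1, 2],
    on_next=lambda i: pushed.append(("on_next", i)),
    on_error=lambda e: pushed.append(("on_error", str(e))),
    on_completed=lambda: pushed.append(("on_completed",)),
)

print(pulled)  # [('item', 1), ('item', 2), ('end of loop',)]
print(pushed)  # [('on_next', 1), ('on_next', 2), ('on_completed',)]
```

In push mode the consumer contains no loop at all: it only declares what to do for each kind of event.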

Observers are the receiving part of the items. An observer subscribes to an observable so that it can receive items emitted by this observable. Just as the observable emits items one after another, an observer receives them one after another. The observable informs the observer of the end of the sequence, either by indicating that the observable has completed (successfully) or by indicating that an error has occurred. These two kinds of completion are notified in a similar way, and so can be handled in a similar way. With ReactiveX, the error management is not a special case, but on a par with the items and completion management. In contrast to iterables that use exceptions, there is no radically different way of handling success from failure.

The implementation of RxPY (as well as all other implementations of ReactiveX) involves two other entities: a subscription function and a disposable object. Figure 1.5 shows a simplified representation of these entities. The AnonymousObservable class is the class that is almost always used to create an observable (directly or via a subclass). This class contains two methods to manage the lifetime of the observable and its observer. The first one, __init__, is not even a method but the constructor of the class. It takes a subscription function as an input argument. This subscription function will be called when an observer subscribes to this observable. The subscription function can return a disposable function that can be called to free all resources used by the observable and observer. The second method of the AnonymousObservable class is subscribe. This is the method that is used to attach an observer to an observable and start the observable; that is, to make it start emitting items. The AnonymousObservable class can be used directly, but there are many cases where using an existing RxPY AnonymousObservable subclass is easier. This is typically the case when you need to create an observable from an iterable object or a single object.

The Observer class is a base class that contains three methods. They correspond to the behavior explained previously. This class must be subclassed to implement these three methods. The on_next method is called each time an item is emitted by the observable. The on_completed method is called when the observable completes successfully. Finally, the on_error method is called when the observable completes because of an error. The on_next method will never be called after the on_completed or the on_error methods.

 

 

The subscription entity is a function that takes an observer as an input parameter. This function is called when the subscribe method of the observable is called. This is where the emission of items is implemented. The emission of these items can be either synchronous or asynchronous. Items are emitted in a synchronous way if the subscription function directly calls the on_next method of the observer. But items can also be emitted asynchronously if the observer instance is saved and used later (after the subscription function returns). The subscription function can return a Disposable object or function. This Disposable is invoked when the observable is being disposed.

Finally, the Disposable class and its associated dispose function are used to clean up any resources used by an Observer or a subscription. In the case of an asynchronous observable, this is how the subscription function is notified that it must stop emitting items, because the Observer is no longer valid after that. The following figure shows these components:

Figure 1.5: RxPY components

Let's try to make more sense of all these definitions. The following figure shows a sequence diagram of how these calls are organized when an observable is created, subscribed, and finally disposed:

 

Figure 1.6: RxPY dynamics. An example of the creation of a synchronous observable and its subscription and disposal

First the application creates an AnonymousObservable and provides the subscription function associated to this observable. It then creates an observer object (actually a subclass of Observer). After that, the observable is subscribed, with reference to the observer object provided as an input parameter. During the call to subscribe, the subscription function is called. In this example, the subscription function is synchronous: it emits three items (the integers 1, 2, and 3), and completes the observable. When the subscription function returns, the observable is already completed. At that point, the subscribe method of AnonymousObservable returns the Disposable function to the application. The application finally calls this dispose function to clean up any resource still used by subscription and observer.
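The sequence of Figure 1.6 can be reproduced with a toy model of these entities. The classes below are deliberately simplified stand-ins written for this illustration and are not the real RxPY classes: RecordingObserver, the disposed list, and the module-level subscription function are assumptions of the sketch.

```python
class Observer:
    """Base class: subclasses implement the three notification methods."""

    def on_next(self, item):
        raise NotImplementedError

    def on_error(self, error):
        raise NotImplementedError

    def on_completed(self):
        raise NotImplementedError


class AnonymousObservable:
    """Toy observable built from a subscription function."""

    def __init__(self, subscription):
        self._subscription = subscription

    def subscribe(self, observer):
        # The subscription function runs when an observer subscribes.
        dispose = self._subscription(observer)
        # Always hand a callable back to the application.
        return dispose if dispose is not None else (lambda: None)


class RecordingObserver(Observer):
    def __init__(self):
        self.events = []

    def on_next(self, item):
        self.events.append(("on_next", item))

    def on_error(self, error):
        self.events.append(("on_error", error))

    def on_completed(self):
        self.events.append(("on_completed",))


disposed = []


def subscription(observer):
    # Synchronous emission: three items, then completion.
    for item in (1, 2, 3):
        observer.on_next(item)
    observer.on_completed()

    def dispose():
        disposed.append(True)  # a real one would release resources here

    return dispose


observable = AnonymousObservable(subscription)
observer = RecordingObserver()
dispose = observable.subscribe(observer)  # already completed on return
dispose()

print(observer.events)
# [('on_next', 1), ('on_next', 2), ('on_next', 3), ('on_completed',)]
```

Because the subscription function is synchronous, all the events have been delivered by the time subscribe returns, exactly as in the figure.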

 

Operators

Now the principles of observables and observers should start to be more clear. However, you may wonder: how can this make development of asynchronous code easier? After all, what was described in the previous section is almost exactly the description of the observer design pattern, with additions for the management of completion and errors. The answer is that this is not the whole story, but the foundation of an extensible framework. What made ReactiveX different from other frameworks is its inspiration from functional programming, with the availability of many operators that can be chained in a pipeline. In the RxPY implementation, operators are methods of the Observable class. On some implementations of ReactiveX, they are implemented as functions which allow us to add new operators very easily. Look at the following example of pseudo-code:

Observable.from_(...)
    .filter()
    .distinct()
    .take(20)
    .map(...)

This is what ReactiveX code looks like. It is a succession of operators that modify the items flowing through them. An operator can be seen as a monad: a construction that encapsulates program logic instead of data. (I apologize to functional programmers for this very simplistic definition.) An operator takes an observable as input and returns another observable. So, we can consider the observable data type as an abstract type that is used to compose functions together. Some operators accept other input parameters to provide additional program logic that will be executed on each item received by the operator. For example, the map operator that will be used later takes a function as a parameter. This function contains the code logic that will be executed on each item sent on the input observable.

Operators are not limited to the code logic of items. Since they work on the observable data type, they can also be used to manage observables' completion and errors. For example, some operators use completion events to chain observables one after another. Other operators use error events to gracefully manage these errors.
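The idea that an operator takes an observable and returns another observable can be sketched without RxPY. In this toy model, which is an assumption of the sketch and not how RxPY is implemented, an observable is simply a function that accepts the three callbacks. The names from_iterable, map_, filter_, and pipe are hypothetical helpers.

```python
def from_iterable(items):
    """Create a toy observable: a function taking the three callbacks."""
    def subscribe(on_next, on_error, on_completed):
        try:
            for item in items:
                on_next(item)
        except Exception as e:
            on_error(e)
            return
        on_completed()
    return subscribe


def map_(fn):
    """Operator: apply fn to each item of the source observable."""
    def operator(source):
        def subscribe(on_next, on_error, on_completed):
            # Errors and completion pass through unchanged.
            source(lambda item: on_next(fn(item)), on_error, on_completed)
        return subscribe
    return operator


def filter_(predicate):
    """Operator: forward only the items accepted by predicate."""
    def operator(source):
        def subscribe(on_next, on_error, on_completed):
            def filtered(item):
                if predicate(item):
                    on_next(item)
            source(filtered, on_error, on_completed)
        return subscribe
    return operator


def pipe(source, *operators):
    """Chain operators: each one wraps the observable before it."""
    for operator in operators:
        source = operator(source)
    return source


events = []
pipeline = pipe(
    from_iterable(range(6)),
    filter_(lambda x: x % 2 == 0),
    map_(lambda x: x * 10),
)
pipeline(
    events.append,
    lambda e: events.append("error: {}".format(e)),
    lambda: events.append("completed"),
)
print(events)  # [0, 20, 40, 'completed']
```

Since every operator has the same shape (observable in, observable out), operators can be combined in any order, which is what makes the pipeline style possible.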

The RxPY implementation contains about 140 operators. We will cover the most used, which is about half of them. As you will be writing RxPY code, you will also be writing your own operators, even if they will not be directly usable in the kind of pipeline that we saw earlier. As we will see, writing a custom operator is the way to factorize code and make functions simpler when using ReactiveX.

 

Installing RxPY

RxPY is available as a Python package published on PyPI (https://pypi.org/), so it can be installed with pip. Depending on your operating system, you may already have pip installed. Otherwise refer to the pip installation documentation (https://pip.pypa.io/en/stable/installing/).

By following the examples in this book, you will install several libraries and sample packages that you may not want to keep on your system. In order to avoid cluttering your environment, you should run all these examples in a virtualenv. virtualenv is a tool that allows you to create isolated Python environments. With it you can create many different Python environments, potentially using different Python interpreters that are independent from each other. This is a great tool when developing in Python because it allows you to:

  • Test libraries without installing them on your system
  • Test some code with different versions of Python interpreters
  • Test some code with different versions of dependency packages
  • Have one independent and reproducible execution environment per project

If you are not already familiar with it, I encourage you to use it when running the example code that we will use. The installation of virtualenv is very easy via PyPI, using the following command:

pip3 install virtualenv

Creating a new virtual environment is also quite easy, in a single line with three parameters, using the following line of code:

virtualenv --system-site-packages -p /usr/local/bin/python3 venv-rx

The --system-site-packages option indicates that we want to use system packages when they are already installed. If you omit this parameter you will create a complete Python environment from scratch. This would be better for isolation but requires much more space because each dependency will be reinstalled in virtualenv. In our case, we do not need a strict isolation from the system; we only want to avoid installing new packages in our system. The -p option indicates which Python interpreter to use. You should adapt it to your environment. The /usr/local/bin value corresponds to a macOS system. On Linux, the cpython interpreter is usually located at /usr/bin. Finally, the venv-rx parameter is the name of the virtualenv created. After running this command you will see a new directory named venv-rx.

 

Once virtualenv is created, you can enter it and leave it. Once you have entered virtualenv, all actions done by the Python interpreter are done in this isolated environment. Entering into virtualenv is done by sourcing a script in its bin directory, as can be seen in the following code:

source venv-rx/bin/activate
(venv-rx)$

When you are inside virtualenv, the name of virtualenv is printed in parentheses before the shell prompt. This is how you know if you are inside an isolated environment or on your system environment. Leaving virtualenv is done by executing the deactivate function, as can be seen in the following code:

(venv-rx)$ deactivate
$

From that point, the rx package can be installed via the pip tool, as can be seen in the following code:

$ source venv-rx/bin/activate
(venv-rx)$ pip install rx

If you want to install the latest development version, you can also install it directly from the GitHub sources. First clone the repository, and then install RxPY from the local sources, as demonstrated in the following code:

(venv-rx)$ git clone https://github.com/ReactiveX/RxPY.git
(venv-rx)$ cd RxPY
(venv-rx)$ python3 setup.py install
 

A reactive echo application


After this short introduction to ReactiveX and RxPY, the time has come to see some concrete code and write a first example. This first RxPY application is a command line interface (CLI) program that echoes the parameters that are provided as input. Save the following code in a file called echo1.py, or use the echo1.py script from the Git repository of this book, as shown in the following code:

import sys
from rx import Observable

argv = Observable.from_(sys.argv[1:])

argv.subscribe(
    on_next=lambda i: print("on_next: {}".format(i)),
    on_error=lambda e: print("on_error: {}".format(e)),
    on_completed=lambda: print("on_completed"))

Ensure that you are running in virtualenv, as shown in the following code:

$ source venv-rx/bin/activate

And when you run it, you should see the following output:

(venv-rx)$ python3 echo1.py hello world !
on_next: hello
on_next: world
on_next: !
on_completed

We ran the program with three parameters (hello, world, and !), and it printed these three parameters as well as information on the end of Observable. Let's detail each line of this program. We will start by importing the modules that we will use, as in the following code:

import sys
from rx import Observable

The sys module allows us to access the command line arguments. The rx module is the name of the RxPY package, which we installed from pip. We do not import the complete rx module, but just the Observable class. In many cases we will only need this class, or a few other ones. Then we can create an observable from the command line arguments, as in the following example:

argv = Observable.from_(sys.argv[1:])

sys.argv is a list containing the command line arguments that were used to run the program. The first argument is the name of the script being executed. In this case its value is echo1.py. Since we do not want to use this argument, we omit it with a slice that keeps the second up to the last argument of the list. An observable is created from this list with the from_ creation operator. This operator creates an observable from a Python iterable object, which is the case of our argument list. We assign the reference of this observable to the argv variable. So, argv is a reference to an observable that will emit items containing the arguments provided on the command line, one item per argument. After this assignment the observable is created, but does not emit any item yet; items are emitted only once the observable is subscribed to. In the last part of the program, we subscribe to this observable and print text depending on the event being received, as can be seen in the following code:

argv.subscribe(
    on_next=lambda i: print("on_next: {}".format(i)),
    on_error=lambda e: print("on_error: {}".format(e)),
    on_completed=lambda: print("on_completed"))

Three callback arguments are provided to the subscribe method: on_next, on_error, and on_completed. They are all optional, and they correspond to the reception of the associated events. As already explained, the on_next callback will be called zero or more times, and the on_error and on_completed callbacks can be called once at the most (and never if the observable never ends, which is not the case here). The call to the subscribe method is the one that makes the argv observable start emitting items. In this simple application, the code of each callback is very simple, so we use lambdas instead of named functions.
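
The contract between these three callbacks can be illustrated without RxPY. The following is a minimal sketch in plain Python, not the RxPY implementation: emit_from_iterable and failing_source are hypothetical names used only for this illustration. The helper pushes each element of an iterable to on_next, then calls exactly one of on_completed or on_error:

```python
def emit_from_iterable(iterable, on_next, on_error, on_completed):
    """Push each element to on_next, then signal completion or error once."""
    try:
        for item in iterable:
            on_next(item)
    except Exception as e:
        # Any exception raised while iterating ends the sequence with
        # on_error; on_completed is not called in that case.
        on_error(e)
    else:
        on_completed()


def failing_source():
    """Hypothetical source that yields one item and then fails."""
    yield "hello"
    raise ValueError("boom")


events = []
emit_from_iterable(
    ["hello", "world"],
    on_next=lambda i: events.append(("on_next", i)),
    on_error=lambda e: events.append(("on_error", str(e))),
    on_completed=lambda: events.append(("on_completed",)))
print(events)

errors = []
emit_from_iterable(
    failing_source(),
    on_next=lambda i: errors.append(("on_next", i)),
    on_error=lambda e: errors.append(("on_error", str(e))),
    on_completed=lambda: errors.append(("on_completed",)))
print(errors)
```

The first call ends with a completion event and the second one with an error event; in neither case are both terminal callbacks invoked.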

Lambdas are anonymous functions; that is, functions that can be referenced only from where they are defined, because they have no name. However, lambdas are more restricted than functions, which makes them suitable only when simple manipulations are done with the data:

  • Lambdas can use only expressions, not statements
  • Lambdas contain only one expression
  • Lambdas cannot declare or use local variables

So, lambdas are very useful when you need to do an action on one or several input parameters. For more complex logic, writing a function is mandatory. Lambdas are used a lot when developing RxPY code because many operators take functions as input. So, such operators are functions that accept functions as input parameters. Functions that accept functions as input are called higher order functions in functional programming and this is another aspect of functional programming used a lot in ReactiveX.
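
The difference between the two forms can be sketched in plain Python. In this illustration, apply_to_all and shout are hypothetical names: apply_to_all is a higher order function because it receives another function as a parameter, and it is called once with a lambda and once with a named function that needs statements:

```python
def apply_to_all(func, items):
    """Higher order function: it receives another function as a parameter."""
    return [func(item) for item in items]


def shout(text):
    """A named function can contain statements and several steps."""
    if not text:
        return text
    upper = text.upper()
    return upper + "!"


# A lambda is limited to a single expression.
capitalized = apply_to_all(lambda s: s.capitalize(), ["hello", "world"])
shouted = apply_to_all(shout, ["hello", ""])
print(capitalized)  # ['Hello', 'World']
print(shouted)      # ['HELLO!', '']
```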

As you can see, the on_next callback is called once for each argument provided on the command line, and the on_completed callback is called right after. In this example application, we use a synchronous Observable. In practice, this means that all items are emitted in the context of the subscribe call. To confirm this, add another print statement after the subscribe call:

argv.subscribe(
    on_next=lambda i: print("on_next: {}".format(i)),
    on_error=lambda e: print("on_error: {}".format(e)),
    on_completed=lambda: print("on_completed"))
print("done")

Then run the program again. You should see the following output:

(venv-rx)$ python3 ch1/echo1.py hello world !
on_next: hello
on_next: world
on_next: !
on_completed
done

As you can see, the done print is displayed after the observable completes because the observable emits all its items during the call to subscribe.
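
To make this ordering explicit, here is a minimal sketch of a synchronous observable in plain Python. TinyObservable is a hypothetical class written for this illustration; it is not the RxPY Observable and it ignores errors and schedulers. Creating the object emits nothing, and every item is emitted before subscribe returns:

```python
class TinyObservable:
    """Hypothetical stand-in for a synchronous observable built from an iterable."""

    def __init__(self, iterable):
        # Creating the object only stores the source; nothing is emitted yet.
        self._iterable = iterable

    def subscribe(self, on_next, on_completed):
        # All items are emitted before subscribe returns.
        for item in self._iterable:
            on_next(item)
        on_completed()


log = []
argv = TinyObservable(["hello", "world", "!"])
log.append("created")
argv.subscribe(
    on_next=lambda i: log.append("on_next: {}".format(i)),
    on_completed=lambda: log.append("on_completed"))
log.append("done")
print("\n".join(log))
```

The log starts with created, continues with the three items and the completion, and ends with done, which mirrors the output of the echo program.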

We will now add some functionality to this echo application. Instead of simply printing each argument, we will print them with the first letter in uppercase. This is the typical case of an action that must be applied to each item of an observable. In the current code, there are two possible locations to do it:

  • Either by using an operator on the argv observable
  • By modifying the on_next callback in the subscribe call

In a real application, there will usually be only a single place where the action must be applied, depending on whether the action must be done in the observer or on the observable directly. Implementing the action in the observer allows you to isolate the change to this single observer. The other way of implementing the action on the observable allows you to share this behavior with several observers. Here we will implement the action on the observable with the map operator. Modify the code as in the following example, or use the echo2.py script from the GitHub repository (https://github.com/PacktPublishing/Hands-On-Reactive-Programming-with-Python) of the book:

argv = Observable.from_(sys.argv[1:]) \
    .map(lambda i: i.capitalize())

The map operator takes an observable as input, applies a transformation function on each item of this observable, and returns an observable which contains all input items, with the transformation applied to them. Here we have used a lambda that returns the item (a string) capitalized; that is, with its first letter in uppercase. If you run this new code you should get the following output:

(venv-rx)$ python3 ch1/echo2.py hello world !
on_next: Hello
on_next: World
on_next: !
on_completed
done

As you can see, the output is the same, but with capitalized names. Congratulations; you have just written your first reactive application!

 

Marble diagrams


The ReactiveX project is composed of several hundred operators, and the Python implementation contains about 140 of them. When writing ReactiveX code, especially in the early days, you should regularly refer to the operators' documentation. Do not hesitate to read through it to find the operator best suited to your needs. This is a good way to discover operators that you may use later, and you may find an operator doing exactly what you want to implement.

Unfortunately, this documentation is often difficult to understand. A representative of this situation is the FlatMap (http://reactivex.io/documentation/operators/flatmap.html) operator description:

"Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable"

This simple description is—obviously—correct, but may not be easy to comprehend. This is why there is also a more detailed explanation on how the operator works. In the case of the flat_map operator, the behavior is to take an observable as input and apply a transformation on each item of this observable. The transformation is done by a function also provided as an input of the operator. This function is called for each item emitted by the input observable and returns an observable at each execution (that is, for each item). Finally, flat_map merges all the observables returned by the transformation function in a single output observable.

This is a very detailed description of how the operator works, but even when you are used to the ReactiveX terminology, you need to spend some time carefully reading each word of the documentation to understand what it means. Marble diagrams are a clever application of the "a picture is worth a thousand words" idiom. They graphically represent an example of the behavior of an operator. All operators are documented with their descriptions and a marble diagram. Most of the time, the marble diagram allows you to understand the behavior of the operator without even having to read the documentation.

Figure 1.7 shows the structure of a marble diagram. A marble diagram is composed of three parts: input timelines, a transformation description, and output timelines:

Figure 1.7: Marble diagram

A timeline represents the state of an observable as time passes. It is drawn as an arrow going from left to right. Each item emitted on the observable is drawn as a circle. The official documentation uses different shapes and colors to represent items. In this book, we will only use circles with labels. Depending on the operator, there may be one or several input timelines.

Below the input timeline is the transformation description drawn as a rectangle. Some brief information about the transformation being performed is printed inside this rectangle. There are dashed arrows between the input timeline items and the transformation description each time the transformation is applied to an item or some items.

Finally, below the transformation description are the output timelines. They represent the result observables of the operator. In the example of Figure 1.7 each input letter is transformed to uppercase, so a becomes A, b becomes B, and j becomes J. Just like input timelines, most of the time there is only one output timeline. But some operators may return several observables, and some return observables of observables. This is shown in the following figure, in which for each item a new observable is generated, emitting the next three letters:

Figure 1.8: Observable of observable

An observable of observables is an observable that emits items that are themselves observables. While this may surprise at first, this is just one level of composition; an observable can convey items of any type, including observables. So, at some point you may work with several layers of observables being composed this way. Such observables are called higher order observables in this book, borrowing the naming convention used for higher order functions.
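
The structure of a higher order observable, and the flattening step described for flat_map, can be sketched with plain Python lists standing in for observables. This is only an illustration under that assumption: lists carry no notion of time, so the interleaving that a real merge may produce is not represented, and next_three_letters is a hypothetical helper:

```python
def next_three_letters(letter):
    """Return the three letters that follow `letter` in the alphabet."""
    start = ord(letter) + 1
    return [chr(code) for code in range(start, start + 3)]


source = ["a", "e", "i"]

# One inner sequence per source item: the stand-in for an observable of observables.
higher_order = [next_three_letters(letter) for letter in source]

# Flattening merges the inner sequences into a single one.
flattened = [item for inner in higher_order for item in inner]

print(higher_order)  # [['b', 'c', 'd'], ['f', 'g', 'h'], ['j', 'k', 'l']]
print(flattened)     # ['b', 'c', 'd', 'f', 'g', 'h', 'j', 'k', 'l']
```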

The last thing to understand in marble diagrams is the way timelines can end. An observable lifetime ends either on completion (that is, on success) or on error. These are respectively represented as a vertical line and as a cross, as shown in the following figure:

Figure 1.9: The end of an observable. On the left an observable that completes successfully and on the right an Observable that terminates in an error

The map operator

Let's see a real example with the map operator. This operator takes a source observable as input and returns an observable as output. It applies a function to each item of the source observable, and emits the result of this function on the output observable. Its marble diagram, shown in the following figure, should make the description much clearer:

Figure 1.10: The map operator

The marble diagram shows an input timeline with three integer items: 1, 12, and 7. The transformation is described with syntax similar to that of the Python lambda. Note that the official ReactiveX documentation uses the JavaScript arrow function notation, because it uses marble diagrams from the JavaScript implementation. In this example, the transformation is a multiplication by 3 of the source item. The output timeline also contains three items, corresponding to the values of the input items multiplied by 3.

The prototype of this operator is the following one:

Observable.map(self, selector)

The selector parameter is the function that will be executed on all items of the input observable.
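
To show how such an operator can return a new observable without touching its source, here is a minimal sketch in plain Python that reproduces the values of the marble diagram. TinyObservable is a hypothetical, synchronous stand-in written for this illustration; it is not the RxPY class, and it leaves out error handling and schedulers:

```python
class TinyObservable:
    """Hypothetical, synchronous stand-in for an observable (not the RxPY class)."""

    def __init__(self, subscribe):
        self._subscribe = subscribe

    @classmethod
    def from_(cls, iterable):
        def subscribe(on_next, on_completed):
            for item in iterable:
                on_next(item)
            on_completed()
        return cls(subscribe)

    def map(self, selector):
        # Return a new observable; the source is left untouched.
        def subscribe(on_next, on_completed):
            self._subscribe(lambda item: on_next(selector(item)), on_completed)
        return TinyObservable(subscribe)

    def subscribe(self, on_next, on_completed=lambda: None):
        self._subscribe(on_next, on_completed)


received = []
source = TinyObservable.from_([1, 12, 7])
tripled = source.map(lambda i: i * 3)
tripled.subscribe(on_next=received.append)
print(received)  # [3, 36, 21]

untouched = []
source.subscribe(on_next=untouched.append)
print(untouched)  # [1, 12, 7]
```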

The from_ operator

The from_ operator is the operator that was used in the previous example to create an observable from a list. The marble diagram of this operator is shown in the following figure:

Figure 1.11: The from_ operator

Note

The name of this operator is strange because it ends with an underscore. The reason for this underscore is that from is a reserved keyword in Python, so it was not possible to name this operator as it should be; that is, from. If you dislike this notation, there is an alias named from_list. You can use it for a more Pythonic code at the expense of a longer name.

The prototype of this operator is the following one:

Observable.from_(iterable, scheduler=None)

The first parameter accepts any iterable object. This includes lists, tuples, dictionaries, and any class that implements the __iter__ method, whose returned iterator provides __next__ in Python 3. The second parameter is used to provide a scheduler that will be used to emit the items of the observable. This parameter is present on all creation operators. It is useful when running in an asynchronous environment or an environment with concurrency. We will study schedulers in detail in Chapter 5, Concurrency and Parallelism in RxPY.

The from_ operator creates an observable that emits one item per entry in the iterable. It then completes the observable. Here are some examples of usage which show the items that they return:

Observable.from_(sys.argv) # argv[0], argv[1], argv[2]..., completed
Observable.from_([1, 2, 3, 4]) # 1, 2, 3, 4, completed
Observable.from_({'foo': 'fooz', 'bar': 'barz'}) # 'foo', 'bar', completed

Note that when using a dictionary, the observable contains the keys of the dictionary and not the values. This is the same behavior as a classic Python iteration on a dictionary using a for loop.
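
The iteration behavior that from_ relies on can be checked with plain Python, independently of RxPY. In this short sketch, Countdown is a hypothetical class that shows what implementing __iter__ looks like; the dictionary is the one from the previous examples:

```python
class Countdown:
    """Hypothetical iterable: __iter__ returns an iterator over start..1."""

    def __init__(self, start):
        self.start = start

    def __iter__(self):
        return iter(range(self.start, 0, -1))


mapping = {'foo': 'fooz', 'bar': 'barz'}

keys = [k for k in mapping]        # iterating a dict yields its keys
values = list(mapping.values())    # values must be requested explicitly
pairs = list(mapping.items())      # as must (key, value) pairs
countdown = list(Countdown(3))

print(keys)       # ['foo', 'bar']
print(values)     # ['fooz', 'barz']
print(pairs)      # [('foo', 'fooz'), ('bar', 'barz')]
print(countdown)  # [3, 2, 1]
```

Passing mapping.values() or mapping.items() instead of the dictionary itself is therefore the way to obtain the values or the pairs from an iterable-based creation operator.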

 

Flow diagrams


Marble diagrams are a great way to explain how an operator works. However, they are not suited to describe how a program, a function, or a component works. Marble diagrams can show by example how a transformation on one observable is applied, but they cannot be used to describe how operators are combined to achieve a more complex operation on the data. We need a kind of diagram that shows the complete transformations applied on observables without the details of each operator being used. We need a diagram in which one considers that the operators are known by the reader but the overall behavior of a component must be described.

Reactivity diagrams

Fortunately, we can use diagrams inspired by a tool used in sequential programming: UML activity diagrams. An activity diagram is the UML name of something you may know as a flowchart. These diagrams are used to describe the sequences of actions which are performed by a program, but they can also express repetitions, alternatives, joins, merges, and so on. Activity diagrams with only a few tweaks are a great way to represent the behavior of an RX component. We will call them reactivity diagrams.

 

 

The main difference between an activity and a reactivity diagram is that an activity diagram represents actions that are performed when the component is called, while a reactivity diagram represents actions which are performed each time an item is emitted on one of the source observables. Other than that, the elements defined by UML are used in almost the same way. Let's detail this with the example shown in the following figure:

Figure 1.12: A reactivity diagram

This is an example of a function which takes two observables as input and provides two observables as output. This function counts the number of dogs emitted on the Dogs input observable and sends back the name of all animals emitted on both the Dogs and Cats input observables. Input observables are observables that the component observes. They emit the items that will be transformed by the component. Input observables are represented as black circles. Output observables are observables that are the result of the transformation of the component. Output observables are represented as encircled black circles.

 

The previous example shows an important point in RX programming: from now on—almost—everything that you will write will deal with observables. It means that almost all components are written as functions that take observables as input and return observables. This is the way reusability is achieved via composition.

The same transformation is applied to the items of both input observables: capitalize the name of the animal. Then the capitalized dog name items are being shared; that is, they are being emitted on two observables. Finally, the capitalized dog name items are counted and the value of this count is emitted in the dogs count observable. On the right side, the dog and cat name observables are merged to an observable that emits the capitalized names of all animals.

So, suppose that such a component is used with the following input observables:

dogs = Observable.from_(["sam", "max", "maggie", "buddy"])
cats = Observable.from_(["luna", "kitty", "jack"])

It will emit the following items on the output observables:

dog_count: 1, 2, 3, 4
animals: Sam, Luna, Max, Maggie, Buddy, Kitty, Jack

The actual ordering of the items emitted on the animals observable will depend on when they are emitted on the input observable.
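
The data flow of this component can be sketched in plain Python, with lists standing in for the input and output observables. This is a simplified illustration, not RxPY code: animal_component is a hypothetical name, and because lists carry no timing information the merge is reduced to a concatenation, so all dog names come before the cat names:

```python
from itertools import accumulate


def animal_component(dogs, cats):
    """Hypothetical sketch of the component, with lists standing in for observables."""
    dog_names = [name.capitalize() for name in dogs]
    cat_names = [name.capitalize() for name in cats]
    # Running count of dogs: one value per dog name received.
    dog_count = list(accumulate(1 for _ in dog_names))
    # Lists carry no timing, so this "merge" is a plain concatenation.
    animals = dog_names + cat_names
    return dog_count, animals


dog_count, animals = animal_component(
    ["sam", "max", "maggie", "buddy"],
    ["luna", "kitty", "jack"])
print(dog_count)  # [1, 2, 3, 4]
print(animals)    # ['Sam', 'Max', 'Maggie', 'Buddy', 'Luna', 'Kitty', 'Jack']
```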

Reactivity diagram elements

Reactivity diagrams can be constructed from the following elements:

  • Circles: Observables are represented as circles. An input observable is represented as a black circle. An output observable is represented as an encircled black circle. Two other notations allow us to indicate when actions are done: when an observable terminates on completion or on error. This notation is similar to the marble diagrams; an observable error event is represented as a circle with a cross in it, and an observable completion event is represented as a circle with a bar in it. This is shown in the following figure:

Figure 1.13: Observable notations. From left to right: input, output, error, completion

  • Rectangles: Operators are represented as rounded rectangles. The text inside the rectangle describes the actions being performed. The first line contains the name of the operator and its parameters. The following lines contain the description of the action. This notation can also be used for components being used in the current component. This notation can also be used as a merge point for operators that combine several observables. This is shown in the following figure:

Figure 1.14: Operators notation

  • Flow: Item flows are represented as arrows. The type of items emitted on the observable is written near the arrow. Item flows show how operators and other elements are linked together. This is shown in the following figure:

Figure 1.15: Items flows notation

  • Diamond: Decisions are represented as a diamond. The decision notation is used only for operators that take an observable as input and split it into two or more observables, the split being based on a segmentation logic described in the diamond. The text inside the diamonds describes the segmentation logic in the same way as the operator's notation. This is seen in following figure:

Figure 1.16: Decisions notation

  • Horizontal or vertical black bar: Share and merge are represented as a horizontal or vertical black bar. An observable is shared when there is one incoming observable and several outgoing observables on the bar. Observables are merged when there are several incoming observables and one outgoing observable on the bar. This is demonstrated in the following figure:

Figure 1.17: Share and merge notation: share (center), merge (right)

  • Rectangle with the upper-right corner bent: Out of monad actions are represented as a rectangle with the upper-right corner bent. Out of monad actions are the actions that are not done via an operator or a component operating on observables. The typical use case is the code of the subscription associated with an observer. The text in the rectangle briefly describes the actions being done. This can be seen in the following figure:

Figure 1.18: Out of monad notation

Reactivity diagrams of an echo example

We will complete this tour of the reactivity diagrams by drawing the diagram of the echo example. It is shown in the following figure:

Figure 1.19: The echo app reactivity diagram

This simple diagram should allow any developer to understand what is going on, provided that they know that it applies to each item emitted on the argv input observable. First, the input observable is created from the argv variable. Then each item is capitalized with the map operator. Finally, each event type (item, completion, or error) is printed. Note that the content of each element is not a copy of the code, but a small description of what it does. The echo example was quite simple, but in a real application you want to document the behavior with reactivity diagrams, not duplicate the code on a diagram.

 

Summary


You should now understand what event-driven programming is, what reactive programming is, and what are the common points and differences between them. It is important to remember that event-driven programming is not a programming paradigm, but a way to structure the code flow. Knowing the basics of the reactor and proactor design patterns is also important to better understand how the frameworks that will be used in the next chapters work.

From now on, you can start writing reactive code for tasks that you may have written in a sequential way. This kind of exercise, even for very simple algorithms, is good training in how to structure your code as a data-flow instead of a code-flow. Switching from a code-flow design to a data-flow design is the key point in writing ReactiveX applications.

Last but not least, you should now be able to navigate easily in the ReactiveX documentation and understand more easily the behavior of each operator, thanks to marble diagrams. Never hesitate to use them when you need to write your own operator or component. This is always a good way to show what you want to achieve. For a more dynamic view, reactivity diagrams will help you design bigger components or document how they are composed together.

The next chapter will introduce how asynchronous programming is done in Python, and, more specifically, with its dedicated module of the standard library, AsyncIO. But before that, detailed explanations of the underlying principles of asynchronous functions will be provided so that you can understand what's going on under the hood.

 

 

 

Questions


  • Is event-driven programming possible only with some programming languages?
  • What are the differences between reactive programming and reactive systems?
  • What is an observable?
  • What is an observer?
  • Are observables pull-based or push-based?
  • What makes reactivity diagrams different from activity diagrams?
  • How do you create an observable emitting integers from 0 to 10,000?
  • How do you create an observable from another observable where all items are multiplied by 3?
 

Further reading


The description of the observer design pattern was originally documented in the book Design Patterns: Elements of Reusable Object-Oriented Software. You can read it to understand one of the foundations of ReactiveX. This book contains the description of all base design patterns used in object-oriented programming.

A detailed description of the reactor and proactor design patterns is available in the book Pattern-Oriented Software Architecture, Patterns for Concurrent and Networked Objects, Volume 2. This book contains information on concurrency, synchronization, and event handling.

The ReactiveX documentation is available online here: http://reactivex.io/documentation/operators.html. This documentation is generic to all programming languages, featuring marble diagrams of each operator. Refer to this documentation when looking for an operator. For each operator, it contains links to the specifics of each implementation.

The Reactive Manifesto is a great source of information to understand what is a reactive system and how to implement such a system: https://www.reactivemanifesto.org/.

You must read and learn The Observable Contract available here: http://reactivex.io/documentation/contract.html. It describes in a few words the rules that govern observables and observers. Once you are at ease with these concepts, you will be able to write and read ReactiveX code in an efficient way.

About the Author

  • Romain Picard

    Romain Picard is currently a data science engineer. He has been working in the digital TV and telecommunications industry for 20 years. His daily work consists of data manipulation, machine learning model training, and model deployment. Almost all of these tasks are based on Python code, and he uses reactive programming whenever it's applicable. He was previously a media software architect and a software developer. In these previous positions, he designed and developed TV and OTT players that have been used in tens of millions of set top boxes. Romain is especially interested in algorithms, looking for the most suitable algorithm for each use case.
