C++ Reactive Programming

4.7 (6 reviews total)
By Praseed Pai , Peter Abraham
  • Instant online access to over 8,000+ books and videos
  • Constantly updated with 100+ new titles each month
  • Breadth and depth in over 1,000+ technologies
  1. Reactive Programming Model – Overview and History

About this book

Reactive programming is an effective way to build highly responsive applications with an easy-to-maintain code base. This book covers the essential functional reactive concepts that will help you build highly concurrent, event-driven, and asynchronous applications in a simpler and less error-prone way.

C++ Reactive Programming begins with a discussion on how event processing was undertaken by different programming systems earlier. After a brisk introduction to modern C++ (C++17), you’ll be taken through language-level concurrency and the lock-free programming model to set the stage for our foray into the Functional Programming model. Following this, you’ll be introduced to RxCpp and its programming model. You’ll be able to gain deep insights into the RxCpp library, which facilitates reactive programming. You’ll learn how to deal with reactive programming using Qt/C++ (for the desktop) and C++ microservices for the Web.

By the end of the book, you will be well versed with advanced reactive programming concepts in modern C++ (C++17).

Publication date:
June 2018
Publisher
Packt
Pages
348
ISBN
9781788629775

 

Chapter 1. Reactive Programming Model – Overview and History

The X Windows system, Microsoft Windows, and IBM OS/2 Presentation Manager made GUI programming popular on the PC platform. This was a major shift from the character mode user interface and batch process style programming models that existed before them. Responding to events became a major concern for software developers worldwide and platform vendors resorted to the creation of low-level C-based APIs that relied on function pointers and callbacks to enable programmers to handle the events. The programming models were mostly based on the co-operative multithreaded model, and with the advent of better microprocessors, most platforms began to support pre-emptive multithreading. Handling events (and other asynchronous tasks) became more complex and responding to events in the traditional way became less scalable. Even though excellent C++-based GUI toolkits made their appearance, event handling was done mostly using message IDs, function pointer based dispatches, and other low-level techniques. A prominent compiler vendor even tried adding language extensions to the C++ language to enable better Windows programming. Handling events, asynchrony, and associated issues require a fresh look at the problem. Luckily, the Modern C++ standard has support for Functional Programming, language-level concurrency (with a memory model), and better memory management techniques to enable programmers to work with asynchronous data streams (by treating events as streams). This is achieved using a programming model called reactive programming. To put things in perspective, this chapter will outline the following topics:

  • Event-driven programming model and how it has been implemented in various platforms.
  • What is reactive programming?
  • Different models of reactive programming.
  • Some simple programs to make conceptual understanding better.
  • The philosophy of our book.
 

Event-driven programming model


Event-driven programming is a programming model where flow control is determined by events. Examples of events are mouse clicks, key presses, gestures, sensor data, messages from other programs, and so on. An event-driven application has the mechanism to detect events on a near real-time basis, and respond or react to them by invoking the appropriate event handling procedure. Since the bulk of the earlier event processing programs were written using C/C++, they resorted to low-level techniques such as callbacks (using function pointers) to write those event handlers. Later systems such as Visual Basic, Delphi, and other rapid application development tools did add native support for event-driven programming. To make matters more clear, we will take a tour of the event handling mechanism of the various platforms. This will help readers appreciate the issues that reactive programming models are solving (from a GUI programming context).

Note

Reactive programming treats data as streams and events in windowing systems can be treated as streams to be processed in a uniform manner. The Reactive programming model provides support for gathering events from different sources as streams, filtering streams, the transformation of streams, performing actions on streams, and so on. The programming model handles asynchrony, scheduling details as part of the framework. This chapter is mostly based on the key data structures of the Reactive programming model and how we can implement basic Reactive programs. In an industrial-strength reactive program, the code written will be asynchronous and the examples from this chapter are synchronous. We give the necessary background information and language constructs in the following chapters before out of order execution and schedules are discussed. These implementations are here for elucidation and can be treated as learning examples.

Event-driven programming on X Windows

The X Windows programming model is a cross-platform API, is mostly supported on POSIX systems, and has even been ported to Microsoft Windows. In fact, X is a network windowing protocol, which required a Window manager to manage the Windows stack. The screen contents are managed by the X server and the client library will pull the contents and display them on the local machine. In desktop environments, the server runs locally on the same machine. The following program will help the reader understand the gist of the XLib programming model and how events are handled in the platform:

#include <X11/Xlib.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int main(void)
{
    Display *display;
    Window window;
    XEvent event;
    char *msg = "Hello, World!";
    int s;

The preceding code snippet includes the proper header files that a programmer is supposed to include to get the function prototypes provided by the XLib C library. There are some data structures that a programmer should be aware of while writing XLib programs from scratch. Nowadays, people use libraries such as Qt, WxWidgets, Gtk+, Fox toolkit, and so on to write commercial-quality X Programs.

/* open connection with the server */
    display = XOpenDisplay(NULL);
    if (display == NULL){
        fprintf(stderr, "Cannot open display\n");
        exit(1);
    }
    s = DefaultScreen(display);
    /* create window */
    window = XCreateSimpleWindow(display,
             RootWindow(display, s), 10, 10, 200, 200, 1,
             BlackPixel(display, s), WhitePixel(display, s));

    /* select kind of events we are interested in */
    XSelectInput(display, window, ExposureMask | KeyPressMask);

    /* map (show) the window */
    XMapWindow(display, window);

The preceding code snippet initializes the server and creates a window to certain specifications. Traditionally, most X Windows programs run under a window manager that manages the cascading windows. We selected the messages that are of interest to us by invoking the XSelectInput API call before displaying the window:

    /* event loop */
    for (;;)
    {
        XNextEvent(display, &event);

        /* draw or redraw the window */
        if (event.type == Expose)
        {
            XFillRectangle(display, window,
                DefaultGC(display, s), 20, 20, 10, 10);
            XDrawString(display, window,
                DefaultGC(display, s), 50, 50, msg, strlen(msg));
        }
        /* exit on key press */
        if (event.type == KeyPress)
        break;
    }

Then, the program goes to an infinite loop while polling for any events, and the appropriate Xlib API will be used to draw a string on the Window. In Windowing parlance, it is called a message loop. The retrieval of events will be done by the XNextEvent API call:

    /* close connection to server */
    XCloseDisplay(display);

    return 0;
    }

Once we are out of the infinite message loop, the connection to the server will be closed.

Event-driven programming on Microsoft Windows

Microsoft Corporation created a GUI programming model, which can be considered as the most successful windowing system in the world. The third edition of the Windows software was a runaway success (in 1990) and Microsoft followed this with the Windows NT and Windows 95/98/ME series. Let us look at the event-driven programming model of Microsoft Windows (consult Microsoft documentation for a detailed look at how this programming model works). The following program will help us understand the gist of what is involved in writing Windows Programming using C/C++:

#include <windows.h>
//----- Prtotype for the Event Handler Function
LRESULT CALLBACK WndProc(HWND hWnd, UINT message,
                         WPARAM wParam, LPARAM lParam);
//--------------- Entry point for a Idiomatic Windows API function
int WINAPI WinMain(HINSTANCE hInstance,
              HINSTANCE hPrevInstance, LPSTR lpCmdLine, int nCmdShow)
{

MSG msg = {0};
WNDCLASS wc = {0};
wc.lpfnWndProc = WndProc;
wc.hInstance = hInstance;
wc.hbrBackground = (HBRUSH)(COLOR_BACKGROUND);
wc.lpszClassName = "minwindowsapp";
if( !RegisterClass(&wc) )
  return 1;

The preceding code snippet initializes a structure by the name of WNDCLASS (or WNDCLASSEX for modern systems) with a necessary template for a Window. The most important field in the structure is lpfnWndProc, which is the address of the function that responds to the event inside an instance of this Window:

if( !CreateWindow(wc.lpszClassName,
                  "Minimal Windows Application",
                  WS_OVERLAPPEDWINDOW|WS_VISIBLE,
                  0,0,640,480,0,0,hInstance,NULL))
    return 2;

We will invoke the CreateWindow (or CreateWindowEx on modern systems) API call to create a window based on the class name provided in the WNDCLASS.lpszClassname parameter:

    while( GetMessage( &msg, NULL, 0, 0 ) > 0 )
        DispatchMessage( &msg );
    return 0;
}

The preceding code snippet gets into an infinite loop where messages will be retrieved from the message queue until we get a WM_QUIT message. The WM_QUIT message takes us out of the infinite loop. The Messages will sometimes be translated before calling the DispatchMessage API call. DispatchMessage invokes the Window callback procedure (lpfnWndProc):

LRESULT CALLBACK WndProc(HWND hWnd, UINT message,
                         WPARAM wParam, LPARAM lParam) {
switch(message){
  case WM_CLOSE:
    PostQuitMessage(0);break;
  default:
    return DefWindowProc(hWnd, message, wParam, lParam);
}
return 0;
}

The preceding code snippet is a minimalist callback function. You can consult Microsoft documentation to learn about Windows API programming and how events are handled in those programs

Event-driven programming under Qt

The Qt Framework is an industrial-strength, cross-platform, and multi-platform GUI toolkit that runs on Windows, GNU Linux, macOS X, and other Mac systems. The toolkit has been compiled into embedded systems and mobile devices. The C++ Programming model has leveraged something called Meta Object Compiler (MOC), which will peruse the source code for directives (a bunch of macros and language extensions embedded in the source code) and generate appropriate additional source code to generate event handlers. So, before the C++ compiler gets the source code, the MOC pass has to run to generate legal ANSI C++ by removing those extra linguistic constructs specific to the Qt system. Consult the Qt documentation to learn more about this. The following simple Qt program will demonstrate the key aspects of Qt programming and its event processing system:

#include <qapplication.h>
#include <qdialog.h>
#include <qmessagebox.h>
#include <qobject.h>
#include <qpushbutton.h>

class MyApp : public QDialog {
  Q_OBJECT
public:
    MyApp(QObject* /*parent*/ = 0):
    button(this)
    {
      button.setText("Hello world!"); button.resize(100, 30);

      // When the button is clicked, run button_clicked
      connect(&button,
              &QPushButton::clicked, this, &MyApp::button_clicked);
    }

The macro Q_OBJECT is a directive to the MOC to generate an Event Dispatch table. When we connect the event source to an event sink, an entry will be given to the Event Dispatch table. The generated code will be compiled along with the C++ code to produce an executable:

public slots:
    void button_clicked() {
      QMessageBox box;
      box.setWindowTitle("Howdy");
      box.setText("You clicked the button");
      box.show();
      box.exec();
    }

protected:
  QPushButton button;
};

The language extension public slots will be stripped away by the MOC (after doing the job of source code generation) to a form compatible with the ANSI C/C++ compiler:

int main(int argc, char** argv) {
  QApplication app(argc, argv);
  MyApp myapp;
  myapp.show();
  return app.exec();
}

The preceding code snippet initializes the Qt application object and displays the main window. For all practical purposes, Qt is the most prominent application development framework for the C++ language and it also has got a good binding to the Python Programming language.

Event-driven programming under MFC

The Microsoft Foundation class library is still a popular library with which to write Microsoft Windows-based desktop programs. It does have some support for web programming if we mix ActiveX Template Library (ATL) along with it. Being a C++ library, MFC uses a mechanism called Message Mapping to handle events. A sample event handling table given as macros is part of every MFC program:

BEGIN_MESSAGE_MAP(CClockFrame,CFrameWnd)
    ON_WM_CREATE()
    ON_WM_PAINT()
    ON_WM_TIMER()
END_MESSAGE_MAP()

The preceding message map will respond to OnCreate, OnPaint, and Ontimer standard Windows API messages. Deep down these message maps are arrays on to which we will use message id as an index for dispatching the events. On closer examination, it is not much different from the standard Windows API messaging model.

Note

The code listing is not given here because we have globally a GUI implementation of one of the key interfaces for the Reactive Programming model using MFC. The implementation is based on the MFC library and the reader can go through the annotated listing to gain an understanding of non-trivial event processing in MFC.

Other event-driven programming models

Distributed object processing frameworks such as COM+ and CORBA do have their own event processing framework. The COM+ event model is based on the notion of Connection Points (modeled by IConnectionPointContainer/IConnectionPoint interfaces) and CORBA does have its own event service model. The CORBA standard provides both pull-based and push-based event notifications. COM+ and CORBA are beyond the scope of this book and the reader is expected to consult the respective documentation.

Limitations of classical event processing models

The whole purpose of making a tour of the event processing supported by various platforms was to put things into the proper perspective. The event response logic in these platforms is mostly coupled with the platform where the code is written. With the advent of multi-core programming, writing low-level multi-threaded code is difficult and declarative task-based programming models are available with the C++ programming language. But the event sources are mostly outside the C++ standard! The C++ language does not have a standard GUI programming library, an interface standard to access external devices, and so on. What is the way out? Luckily, events and data from external sources can be aggregated into streams (or sequences) and by using functional programming constructs such as Lambda functions can be processed very efficiently. The added bonus is that if we resort to some kind of restrictions regarding the mutability of variables and streams, concurrency, and parallelism are built into the stream processing model.

 

Reactive programming model


Simply put, reactive programming is nothing but programming with asynchronous data streams. By applying various operations on stream, we can achieve different computational goals. The primary task in a reactive program is to convert data into streams, regardless of what the source of the data is. While writing modern graphical user interface applications, we process mouse move-and-click events. Currently, most systems get a callback and process these events as and when they happen. Most of the time, the handler does a series of filtering operations before it invokes the action methods associated with the event calls. In this particular context, reactive programming helps us in aggregating the mouse move-and-click events into a collection and sets a filter on them before notifying the handler logic. In this way, the application/handler logic does not get executed unnecessarily.

The stream-processing model is well known, and it is very easy to encode by application developers. Pretty much anything can be converted into a stream. Such candidates include messages, logs, properties, Twitter feeds, blog posts, RSS feeds, and so on. Functional programming techniques are really good at processing streams. A language such as Modern C++, with excellent support for Object/Functional programming, is a natural choice for writing reactive programs. The basic idea behind reactive programming is that there are certain datatypes that represent a value over time. These datatypes (or rather data sequences) are represented as Observable sequences in this programming paradigm. Computations that involve these changing (time-dependent) values will, in turn, themselves have values that change over time, and will need to asynchronously receive notifications (as and when the dependent data changes).

 

Functional reactive programming


Almost all modern programming languages support functional programming constructs. Functional programming constructs such as Transform, Apply, Filter, Fold, and so on are good for processing streams. Programming asynchronous data streams using functional programming constructs are generally called functional reactive programming (for all practical purposes). The definition given here is an operational one. Consult the work done by Conal Elliott and Paul Hudak as part of the Haskell community to understand the strict definition. Mixing Reactive Programming with FP is gaining traction among developers these days. The Emergence of libraries such as Rx.Net, RxJava, RxJs, and RxCpp and so on is a testimony to this.

Note

Even though reactive programming is the core subject of this book, in this chapter we will be sticking to an OOP approach. This is necessitated because of the fact that we need to introduce some standard interfaces (emulated in C++ using virtual functions) necessary for doing Reactive programming. Later on, after learning about FP constructs supported by C++ , readers can do some mental model mapping from OOP to FP constructs. We will also keep away from concurrency stuff to focus on software interfaces in this chapter. Chapters 2, A Tour of the Modern C++ and Its Key Idioms, Chapter 3, Language-Level Concurrency and Parallelism in C++, and Chapter 4, Asynchronous and Lock-Free Programming in C++, will give the necessary background to understand reactive programming using FP constructs.

 

The key interfaces of a reactive program


To help you understand what is really happening inside a reactive program, we will write some toy programs to put things in proper context. From a software design point of view, if you keep concurrency/parallelism aside to focus on software interfaces, a reactive Program should have:

  • An event source that implements IObservable<T>
  • An event sink that implements IObserver<T>
  • A mechanism to add subscribers to an event source
  • When data appears at the source, subscribers will be notified

Note

In this particular chapter, we have written code using classic C++ constructs. This is because we have not yet introduced Modern C++ constructs. We have also used raw pointers, something which we can mostly avoid while writing Modern C++ code. The code in this chapter is written to conform to the ReactiveX documentation in general. In C++, we do not use inheritance-based techniques like we do in Java or C#. 

To kickstart, let us define Observer, Observable, and a CustomException class:

#pragma once 
//Common2.h 
 
struct CustomException /*:*public std::exception */ {
   const char * what() const throw () { 
         return "C++ Exception"; 
   } 
}; 

The CustomException class is just a placeholder to make the interface complete. Since we have decided that we will only use classic C++ in this chapter, we are not deviating from the std::exception class:

template<class T> class IEnumerator {
public:
      virtual bool HasMore() = 0;
      virtual T next() = 0;
      //--------- Omitted Virtual destructor for brevity
};
template <class T> class IEnumerable{
public:
      virtual IEnumerator<T> *GetEnumerator() = 0;
      //---------- Omitted Virtual destructor for brevity
};

The Enumerable interface is used by the data source from which we can enumerate data and IEnuerator<T> will be used for iteration by the client.

Note

The purpose of defining interfaces for Iterator (IEnuerable<T>/IEnumerator<T>) is to make the reader understand that they are very closely related to the Observer<T>/Observable<T> pattern. We will define Observer<T>/Observable<T> as follows:

template<class T> class IObserver
{
public:
      virtual void OnCompleted() = 0;
      virtual void OnError(CustomException *exception) = 0;
      virtual void OnNext(T value) = 0;
};
template<typename T>
class IObservable
{
public:
      virtual bool Subscribe(IObserver<T>& observer) = 0;
};

IObserver<T> is the interface that the data sink will use to receive notifications from the data source. The data source will implement the IObservable<T> interface.

Note

We have defined the IObserver<T> interface and it has got three methods. They are OnNext (when the item is notified to the Observer), OnCompleted (when there is no more data), and OnError (when an exception is encountered). Observable<T> is implemented by the event source and event sinks can insert objects that implement IObserver<T> to receive notifications.

 

Pull-versus push-based reactive programming


Reactive programs can be classified as push-based and pull-based. The pull-based system waits for a demand to push the data streams to the requestor (or subscriber in our case). This is the classic case where the data source is actively polled for more information. This employs the iterator pattern, and IEnumerable <T>/IEnumerator <T> interfaces are specifically designed for such scenarios that are synchronous in nature (the application can block while pulling data). On the other hand, a push-based system aggregates events and pushes through a signal network to achieve the computation. In this case, unlike the pull-based system, data and related updates are handed to the subscriber from the source (Observable sequences in this case). This asynchronous nature is achieved by not blocking the subscriber, but rather making it react to the changes. As you can see, employing this push pattern is more beneficial in rich UI environments where you wouldn't want to block the main UI thread while waiting for some events. This becomes ideal, thus making reactive programs responsive.

The IEnumerable/IObservable duality

If you take a closer look, there is only a subtle difference between these two patterns. IEnumerable<T> can be considered the pull-based equivalent of the push-based IObservable<T>. In fact, they are duals. When two entities exchange information, one entity's pull corresponds to another entity pushing the information. This duality is illustrated in the following diagram:

Let's understand this duality by looking at this sample code, a number sequence generator:

Note

We have striven to use classic C++ constructs to write programs for this particular chapter as there are chapters on Modern C++ language features, language level concurrency, lock-free programming, and related topics for implementing Reactive constructs in Modern C++.

#include <iostream>
#include <vector>
#include <iterator>
#include <memory>
#include "../Common2.h"
using namespace std;

class ConcreteEnumberable : public IEnumerable<int>
{
      int *numberlist,_count;
public:
      ConcreteEnumberable(int numbers[], int count):
            numberlist(numbers),_count(count){}
      ~ConcreteEnumberable() {}

      class Enumerator : public IEnumerator<int>
      {
      int *inumbers, icount, index;
      public:
      Enumerator(int *numbers,
            int count):inumbers(numbers),icount(count),index(0) {}
      bool HasMore() { return index < icount; }
      //---------- ideally speaking, the next function should throw
      //---------- an exception...instead it just returns -1 when the 
      //---------- bound has reached
      int next() { return (index < icount) ?
                   inumbers[index++] : -1; }
      ~Enumerator() {}
      };
      IEnumerator<int> *GetEnumerator()
            { return new Enumerator(numberlist, _count); }
};

The preceding class takes an array of integers as a parameter and we can enumerate over the elements as we have implemented the IEnumerable<T> interface. The Enumeration logic is implemented by the nested class, which implements the IEnumerator<T> interface:

int main()
{
      int x[] = { 1,2,3,4,5 };
      //-------- Has used Raw pointers on purpose here as we have
      //------- not introduced unique_ptr,shared_ptr,weak_ptr yet
      //-------- using auto_ptr will be confusting...otherwise
      //-------- need to use boost library here... ( an overkill)
      ConcreteEnumberable *t = new ConcreteEnumberable(x, 5);
      IEnumerator<int> * numbers = t->GetEnumerator();
      while (numbers->HasMore())
            cout << numbers->next() << endl;
      delete numbers;delete t;
      return 0;
}

The main program instantiates an implementation of the ConcreteEnuerable class and walks through each element.

We will write an even number sequence generator to demonstrate how these data types work together in converting a pull-based program to a push program. The robustness aspect is given low priority to keep the listing terse:

#include "stdafx.h"
#include <iostream>
#include <vector>
#include <iterator>
#include <memory>
#include "../Common2.h"
using namespace std;

class EvenNumberObservable : IObservable<int>{
      int *_numbers,_count;
public:
      EvenNumberObservable(int numbers[],
            int count):_numbers(numbers),_count(count){}
      bool Subscribe(IObserver<int>& observer){
            for (int i = 0; i < _count; ++i)
                  if (_numbers[i] % 2 == 0)
                        observer.OnNext(_numbers[i]);
            observer.OnCompleted();
            return true;
      }
};

The preceding program takes an array of integers, filters out of the odd numbers, and notifies Observer<T> if an even integer is encountered. In this particular case, the data source is pushing data to observer. The implementation of Observer<T> is given as follows:

class SimpleObserver : public IObserver<int>{
public:
      void OnNext(int value) { cout << value << endl; }
      void OnCompleted() { cout << _T("hello completed") << endl; }
      void OnError( CustomException * ex) {}
};

The SimpleObserver class implements the IObserver<T> interface and it has the capability to receive notifications and react to them:

int main()
{
      int x[] = { 1,2,3,4,5 };
      EvenNumberObservable *t = new EvenNumberObservable(x, 5);
      IObserver<int>> *xy = new SimpleObserver();
      t->Subscribe(*xy);
      delete xy; delete t;
      return 0;
}

From the preceding example, you see how one can naturally subscribe for even numbers from an Observable sequence of natural numbers. The system will automatically push (publish) the values to the observer (subscriber) when an even number is detected. The code gives explicit implementations for key interfaces so that one can understand, or speculate what really happens under the hood.

 

Converting events to IObservable<T>


We have now understood how one can convert an IEnumerable<T>-based pull program to an IObservable<T>/IObserver<T>-based push program. In real life, the event source is not as simple as we found in the number stream example given earlier. Let us see how we can convert a MouseMove event into a stream with a small MFC program:

Note

We have chosen MFC for this particular implementation because we have a chapter dedicated to Qt-based reactive programming. In that chapter, we will be implementing Reactive programs in idiomatic asynchronous push-based streams. In this MFC program, we simply do a filtering operation to see whether the mouse is moving in a bounding rectangle and, if so, notify the observer. We are using synchronous dispatch here. This example is synchronous too:

#include "stdafx.h"
#include <afxwin.h>
#include <afxext.h>
#include <math.h>
#include <vector>
#include "../Common2.h"

using namespace std;
class CMouseFrame :public CFrameWnd,IObservable<CPoint>
{
private:
      RECT _rect;
      POINT _curr_pos;
      vector<IObserver<CPoint> *> _event_src;
public:
      CMouseFrame(){
            HBRUSH brush =
                  (HBRUSH)::CreateSolidBrush(RGB(175, 238, 238));
            CString mywindow = AfxRegisterWndClass(
                  CS_HREDRAW | CS_VREDRAW | CS_DBLCLKS,
                  0, brush, 0);
            Create(mywindow, _T("MFC Clock By Praseed Pai"));
      }

The preceding part of the code defines a Frame class that derives from the MFC library the CFrameWnd class and also implements the IObservable<T> interface to force the programmer to implement the Subscribe method. A vector of IObserver<T> will store the list of observers or Subscribers. For this example, we will have only one observer. There is no restriction on the number of observer in the code:

      virtual bool Subscribe(IObserver<CPoint>& observer) {
            _event_src.push_back(&observer);
            return true;
      }

The Subscribe method just stores the reference to the observer onto a vector and returns true: when the mouse is moved, we get notification from the MFC library and if it is in a rectangular area, observer will be notified (the notification code is as follows):

      bool FireEvent(const CPoint& pt) {
            vector<IObserver<CPoint> *>::iterator it =
                  _event_src.begin();
            while (it != _event_src.end()){
                  IObserver<CPoint> *observer = *it;
                  observer->OnNext(pt);
                  //---------- In a Real world Rx programs there is a 
                  //--------- sequence stipulated to call methods...
                  //--------- OnCompleted will be called only when 
                  //--------- all the data is processed...this code
                  //--------- is written to demonstrate the call schema
                  observer->OnCompleted();
                  it++;
            }
            return true;
      }

The FireEvent method walks through the observer's and calls the OnNext method of the observer. It also calls the OnCompleted method of each instance of Observer's: The Rx dispatching mechanism follows certain rules while calling the observer methods. If OnComplete method is called, no more OnNext will be called on the same observer. Similarly, if OnError is called, no further messages will be dispatched to the observer. If we need to follow the conventions stipulated by the Rx model here, the listing will get complicated. The purpose of the code given here is to show how the Rx programming model works in a schematic manner.

      int OnCreate(LPCREATESTRUCT l){
            return CFrameWnd::OnCreate(l);
      }
      void SetCurrentPoint(CPoint pt) {
            this->_curr_pos = pt;
            Invalidate(0);
      }

The SetCurrentPoint method is invoked by observer to set the current point where the text has to be drawn. The Invalidate method is invoked to trigger a WM_PAINT message and the MFC subsystem will route it to OnPaint (as it is wired in the Message maps):

      void OnPaint()
      {
            CPaintDC d(this);
            CBrush b(RGB(100, 149, 237));
            int x1 = -200, y1 = -220, x2 = 210, y2 = 200;
            Transform(&x1, &y1); Transform(&x2, &y2);
            CRect rect(x1, y1, x2, y2);
            d.FillRect(&rect, &b);
            CPen p2(PS_SOLID, 2, RGB(153, 0, 0));
            d.SelectObject(&p2);

            char *str = "Hello Reactive C++";
            CFont f;
            f.CreatePointFont(240, _T("Times New Roman"));
            d.SelectObject(&f);
            d.SetTextColor(RGB(204, 0, 0));
            d.SetBkMode(TRANSPARENT);
            CRgn crgn;
            crgn.CreateRectRgn(rect.left,rect.top,
            rect.right ,rect.bottom);
            d.SelectClipRgn(&crgn);
            d.TextOut(_curr_pos.x, _curr_pos.y,
            CString(str), strlen(str));
      }

The OnPaint method is invoked by the MFC framework when the Invalidate call is made. The method draws the literal string, Hello Reactive C++, on the screen:

      void Transform(int *px, int *py) {
            ::GetClientRect(m_hWnd, &_rect);
            int width = (_rect.right - _rect.left) / 2,
            height = (_rect.bottom - _rect.top) / 2;
           *px = *px + width; *py = height - *py;
      }

The Transform method computes the bound of the client area of the Frame and converts Cartesian coordinates to devise coordinates. This computation can be better done through world coordinate transformations:

      void OnMouseMove(UINT nFlags, CPoint point)
      {
            int x1 = -200,y1= -220, x2 = 210,y2 = 200;
            Transform(&x1, &y1);Transform(&x2, &y2);
            CRect rect(x1, y1, x2, y2);
            POINT pts;
            pts.x = point.x; pts.y = point.y;
            rect.NormalizeRect();
            //--- In a real program, the points will be aggregated
            //---- into a list (stream)
            if (rect.PtInRect(point)) {
                  //--- Ideally speaking this notification has to go
                  //--- through a non blocking call
                  FireEvent(point);
            }
      }

The OnMouseMove method checks whether the mouse position is within a rectangle centered inside the screen and fires the notification to the observer:

      DECLARE_MESSAGE_MAP();
};

BEGIN_MESSAGE_MAP(CMouseFrame, CFrameWnd)
      ON_WM_CREATE()
      ON_WM_PAINT()
      ON_WM_MOUSEMOVE()
END_MESSAGE_MAP()
class WindowHandler : public IObserver<CPoint>
{
private:
      CMouseFrame *window;
public:
      WindowHandler(CMouseFrame *win) : window(win) { }
      virtual ~WindowHandler() { window = 0; }
      virtual void OnCompleted() {}
      virtual void OnError(CustomException *exception) {}
      virtual void OnNext(CPoint value) {
            if (window) window->SetCurrentPoint(value);
      }
};

The preceding class WindowHandler implements the IObserver<T> interface and handles the event notified by CMouseFrame, which implements the IObservable<CPoint> interface. In this canned example, we set the current point by invoking the SetCurrentPoint method to draw the string at the mouse position:

class CMouseApp :public CWinApp
{
      WindowHandler *reactive_handler;
public:
      int InitInstance(){
            CMouseFrame *p = new CMouseFrame();
            p->ShowWindow(1);
            reactive_handler = new WindowHandler(p);
            //--- Wire the observer to the Event Source
            //--- which implements IObservable<T>
            p->Subscribe(*reactive_handler);
            m_pMainWnd = p;
            return 1;
      }
      virtual ~CMouseApp() {
            if (reactive_handler) {
                  delete reactive_handler;
                  reactive_handler = 0;
           }
      }
};

CMouseApp a;
 

The philosophy of our book


The purpose of this chapter is to introduce readers to the key interfaces of the reactive programming mode they are—IObservable<T> and IObserver<T>. They are in fact the duals of the IEnumerable<T> and IEnumerator<T> interface. We learned how to model those interfaces in classic C++ (well, mostly) and had toy implementations of all of them. Finally, we implemented a GUI program that captures mouse movements and notifies a list of Observers. These toy implementations are to get our feet wet with the ideas and ideals of the Reactive programming model. Our implementations can be considered as implementing of OOP-based reactive programming.

To be proficient in C++ reactive programming, a programmer has to be comfortable with the following topics:

  • Advanced linguistic constructs provided by Modern C++
  • Functional programming constructs provided by Modern C++
  • Asynchronous programming (RxCpp handles it for you!) model
  • Event stream processing
  • Knowledge of industrial-strength libraries such as RxCpp
  • Applications of RxCpp in GUI and web programming
  • Advanced reactive programming constructs
  • Handling errors and exceptions

This chapter was mostly about key idioms and why we require a robust model for handling asynchronous data. The next three chapters will cover language features of Modern C++, handling concurrency/parallelism with C++ standard constructs, and lock-free programming (made possible by memory model guarantees). The preceding topics will give the user a firm foundation from which to master functional reactive programming.

In Chapter 5, Introduction to Observables, we will once again return to the topic of Observables and implement interfaces in a functional manner to reiterate some of the concepts. In Chapter 6, Introduction to Event Stream Programming Using C++, we will move towards the advanced event stream processing topics with the help of two industrial-strength libraries that use the Domain Specific Embedded Language (DSEL) approach towards event stream processing.

By now, the stage will be set for the user to be exposed to the industrial-strength RxCpp library and its nuances to write professional-quality Modern C++ programs. In Chapter 7, Introduction to Data Flow Computation and the RxCpp Library and Chapter 8, RxCpp – the Key Elements, we will cover this wonderful library. The following chapters will cover Reactive GUI programming using the Qt library and advanced operators in RxCpp.

The last three chapters cover advanced topics of Reactive design patterns, micro-services in C++, and handling errors/exceptions. By the end of the book, the reader who started with classic C++ will have covered a lot of ground, not only in writing Reactive programs but in the C++ language itself. Because of the nature of the topic, we will cover most of the features of C++ 17 (at the time of writing).

 

Summary


In this chapter, we learned about some key data structures of the Rx programming model. We implemented toy versions of them to familiarize us with the conceptual nuances underpinning them. We started with how GUI events were handled by Windows API, XLib API, MFC, and Qt. We briefly touched upon how events are handled in COM+/CORBA as well. Then, a quick overview of Reactive programming was given. After introducing some interfaces, we implemented them from scratch. Finally, a GUI version of these interfaces on top of MFC was implemented for the sake of completeness. We also dealt with the key philosophical aspects of the book.

In the next chapter, we will make a whirlwind tour of the key features of Modern C++ (C++ Versions 11/14/17) by emphasizing on move semantics, Lambdas, type inference, range-based loops, pipe-able operators, smart pointers, and so on. This is essential for writing even basic code for Reactive Programming.

About the Authors

  • Praseed Pai

    Praseed Pai has been working in the software industry for the last 25 years, starting his career as a MS-DOS systems programmer using ANSI C. He has been actively involved in developing large-scale, cross-platform, native code-based systems using C++ on Windows, GNU Linux, and macOS X. He has experience in COM+ and CORBA programming using C++. In the last decade, he has worked with Java- and .NET-based systems.

    He is the primary implementer of the SLANG4.net compilation system, which has been ported to C++ with an LLVM backend. He coauthored .NET Design Patterns, by Packt Publishing.

    Browse publications by this author
  • Peter Abraham

    Peter Abraham has been a performance fanatic and a C/C++ programming language enthusiast right from his college days, where he excelled in Microsoft Windows programming and GNU Linux programming. He garnered experience in working with CUDA, image processing, and computer graphics programs by virtue of working with companies such as Quest Global, Siemens, and Tektronics.

    Peter has been eagerly following the C++ standard and RxCpp libraries as part of his profession. He has worked with cross-platform GUI toolkits such as Qt, WxWidgets, and FOX toolkit.

    Browse publications by this author

Latest Reviews

(6 reviews total)
Very good information on RxCpp
the topic is relevant to my interest and the writer explains clearly and easy to understand with great examples
Great use and teaching of reactive programming in C++.

Recommended For You

Book Title
Unlock this full book with a FREE 10-day trial
Start Free Trial