Chapter 3: Implementing Pipelines Using Stateful Processing

In the previous chapter, we focused on implementing pipelines that used high-level transformations. Such transforms tend to require only a small number of parameters and/or methods to be implemented in order to use them, but this simplicity comes at the expense of somewhat limited flexibility. Let's demonstrate this using the example of the GroupByKey transform. It is defined quite simply as a transform that wraps elements with the same key into an Iterable object. This Iterable object (essentially nothing more than a bag of elements) is then triggered based on a windowing strategy. Nothing more, nothing less. But what if we need finer control? What if we want to control exactly when we emit the output for a particular input element? In that case, these high-level transformations will no longer do.
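To make this concrete, a minimal sketch of the GroupByKey behavior described above might look as follows (the input PCollection and its key and value types are illustrative assumptions, and imports are omitted):

// Assumed input: click counts keyed by user ID.
PCollection<KV<String, Long>> clicksPerUser = ...;
PCollection<KV<String, Iterable<Long>>> grouped =
    clicksPerUser.apply(GroupByKey.create());
// Each Iterable<Long> (the "bag of elements") is emitted according to
// the windowing strategy of the input PCollection - nothing more.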

In this chapter, we will first (nearly) complete the picture of the primitive PTransform objects that Apache Beam has in the model...

Technical requirements

We will use the same tools as in the previous chapter, so, for a detailed description of how to get set up, please refer to Chapter 2, Implementing, Testing, and Deploying Basic Pipelines. If you already have everything working, then there is nothing new needed. Please make sure that you have cloned the book's GitHub repository from https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam.

Now that we have everything set up, we can jump directly to solving our next puzzle!

Task 6 – Using an external service for data augmentation

All of the tasks we solved in the previous chapter had their data readily available in the input PCollection object. That might not be the case in all situations. Imagine a situation in which you need to augment your input data with some metadata that sits behind an external service. This external service is accessible via a Remote Procedure Call (RPC), as illustrated in the following figure:

Figure 3.1 – Augmenting data with an external service

We feed our input data to a (stateless) operation, which performs an RPC call for each input element (possibly with some caching), uses the result to modify the input element, and then outputs (or discards) it for downstream processing. From this description, we can derive a definition of the task.
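A minimal sketch of such a stateless augmenting operation might look as follows; RpcClient and its lookup method are hypothetical stand-ins for the external service, and imports are omitted:

class AugmentFn extends DoFn<String, KV<String, String>> {
  // Hypothetical client of the external RPC service.
  private transient RpcClient client;

  @Setup
  public void setup() {
    client = RpcClient.create();  // assumed factory method
  }

  @ProcessElement
  public void processElement(
      @Element String element, OutputReceiver<KV<String, String>> out) {
    // One RPC call per input element; a cache could be added here.
    String metadata = client.lookup(element);
    if (metadata != null) {
      out.output(KV.of(element, metadata));  // augment and emit
    }
    // Elements without metadata are simply discarded.
  }

  @Teardown
  public void teardown() {
    client.close();
  }
}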

Defining the problem

Given an input stream of lines of text (coming from Apache Kafka) and an RPC service that...

Introducing the primitive PTransform object – stateless ParDo

As we have already noted, the ParDo PTransform is the most basic primitive transform that we can use to do a variety of useful work. The name is an abbreviation of parallel do, and that is exactly what it does. There are multiple versions of this PTransform with different requirements and different behaviors, but, in essence, the basics of the stateless ParDo remain valid for the other cases as well.

The essential parts of a ParDo object are illustrated in the following figure:

Figure 3.2 – A DoFn object life cycle

The first thing we notice is that the stream is split into chunks called bundles. The size of bundles, like other runtime parameters, is runner-specific – that is, each runner can choose its preferred way of assigning the elements of a stream to bundles. The important thing to remember is that bundles are considered atomic units of work. The processing of a bundle...
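The life cycle shown in Figure 3.2 maps directly onto DoFn annotations; the following is a minimal sketch (the method bodies and element types are illustrative assumptions):

class LifeCycleFn extends DoFn<String, String> {
  @Setup
  public void setup() { /* once per DoFn instance, e.g., open connections */ }

  @StartBundle
  public void startBundle() { /* once before each bundle */ }

  @ProcessElement
  public void processElement(@Element String in, OutputReceiver<String> out) {
    out.output(in);  // called once per element of the bundle
  }

  @FinishBundle
  public void finishBundle() { /* once after each bundle, e.g., flush buffers */ }

  @Teardown
  public void teardown() { /* once before the instance is discarded */ }
}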

Task 7 – Batching queries to an external RPC service

Let's imagine that the RPC service we used in Task 6 supports the batching of RPC queries. Batching is a technique for reducing network overhead by grouping multiple queries into a single one, thus increasing throughput. So, instead of querying our RPC service with each element, we would like to send multiple input elements in a single query.

Defining the problem

Given an RPC service that supports the batching of requests to increase throughput, use this service to augment the input data of a PCollection object. Be sure to preserve both the timestamp and the window assigned to each input element.

Discussing the problem decomposition

The first thing to notice is that unlike in Task 6, where we queried our RPC service with each element separately (and therefore, simply kept the timestamp and the window of the element untouched), in this case, we can have multiple elements with multiple timestamps...
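A minimal sketch of bundle-level batching that preserves each element's timestamp and window might look as follows; BatchRpcClient and its queryAll method are hypothetical, and imports are omitted:

class BatchAugmentFn extends DoFn<String, KV<String, String>> {
  private transient BatchRpcClient client;  // hypothetical batch-capable client
  private transient List<ValueInSingleWindow<String>> buffer;

  @Setup
  public void setup() {
    client = BatchRpcClient.create();  // assumed factory method
  }

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(
      @Element String element, @Timestamp Instant ts, BoundedWindow window) {
    // Remember the element together with its timestamp and window.
    buffer.add(ValueInSingleWindow.of(element, ts, window, PaneInfo.NO_FIRING));
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext ctx) {
    // A single (batched) RPC for the whole bundle.
    Map<String, String> resolved = client.queryAll(
        buffer.stream().map(ValueInSingleWindow::getValue).collect(Collectors.toList()));
    for (ValueInSingleWindow<String> v : buffer) {
      // Re-emit with the original timestamp and window preserved.
      ctx.output(
          KV.of(v.getValue(), resolved.get(v.getValue())),
          v.getTimestamp(),
          v.getWindow());
    }
  }
}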

Task 8 – Batching queries to an external RPC service with defined batch sizes

Let's suppose that our RPC server works best when it processes about 100 input words in a batch. A real-world requirement would probably look different and would be the result of measurements rather than an arbitrary number. However, for the present discussion, let's suppose that this performance characteristic is given. We can then summarize the task as follows.

Defining the problem

Use a given RPC service to augment data in an input stream using batched RPCs with batches of about K elements. Also, flush a batch after a time of (at most) T to avoid a (possibly) unbounded wait for elements in small batches.

As we can see, we extended the problem definition by introducing a parameter, T, which bounds the time for which we can buffer elements while waiting for more data.
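For reference, the Beam Java SDK ships a transform that exposes exactly these two knobs (we will build our own version of it in this chapter); a minimal usage sketch, assuming a keyed input PCollection, might look like this:

PCollection<KV<String, String>> keyed = ...;
PCollection<KV<String, Iterable<String>>> batches = keyed.apply(
    GroupIntoBatches.<String, String>ofSize(100)                 // roughly K elements
        .withMaxBufferingDuration(Duration.standardSeconds(5))); // at most T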

Discussing the problem decomposition

As already mentioned, we cannot...

Introducing the primitive PTransform object – stateful ParDo

This section will focus on a theoretical description of the stateful ParDo transform, which is the most complex transform that a typical user of Beam needs to understand. We will divide this section into two subsections. First, we will describe its theoretical properties and the differences from the stateless version of ParDo. Then, we will see how this theoretical knowledge applies to API changes.

Describing the theoretical properties of the stateful ParDo object

As we have seen, the main difference between a stateful ParDo object and a stateless ParDo object is – as the name suggests – the presence of user state or timers. This alone brings one important requirement: every meaningful access to state must be keyed. That is to say, the PCollection object to which we apply a stateful ParDo object must be of the KV<K, V> type. We must assign a key to every element of a PCollection object (or use...
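To give a first feel for what this looks like in code, the following is a minimal sketch of a stateful DoFn that declares value states and an event-time timer on a keyed input (the names, types, and the one-minute delay are illustrative assumptions, and imports are omitted):

class CountPerKeyFn extends DoFn<KV<String, String>, KV<String, Long>> {
  @StateId("key")
  private final StateSpec<ValueState<String>> keySpec = StateSpecs.value();

  @StateId("count")
  private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @Timestamp Instant ts,
      @StateId("key") ValueState<String> key,
      @StateId("count") ValueState<Long> count,
      @TimerId("flush") Timer flush) {
    key.write(element.getKey());
    Long current = count.read();
    count.write(current == null ? 1L : current + 1);
    flush.set(ts.plus(Duration.standardMinutes(1)));  // (re)arm an event-time timer
  }

  @OnTimer("flush")
  public void onFlush(
      @StateId("key") ValueState<String> key,
      @StateId("count") ValueState<Long> count,
      OutputReceiver<KV<String, Long>> out) {
    Long current = count.read();
    out.output(KV.of(key.read(), current == null ? 0L : current));
    count.clear();
  }
}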

Using side outputs

As the name suggests, side inputs are something that is added to the main input from the side, while side outputs are something that is output from the DoFn object outside of the main PCollection output. Let's start with the side outputs, as they are more straightforward.

As an example, let's imagine we are processing data coming in as JSON values. We need to parse these messages into an internal object. But what should we do with the values that cannot be parsed because they contain a syntax error? If we do not do any validation before we store them in the stream (topic), then it is certainly possible that we will encounter such a situation. We can silently drop those records, but that is obviously not a great idea, as that could cause hard-to-debug problems. A much better option would be to store these values on the side to be able to investigate and fix them. Therefore, we should aim to do the following:

Figure 3.8 – Main...
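A minimal sketch of that approach might look as follows; the MyEvent type, its parse method, and the tag names are illustrative assumptions, and imports are omitted:

// Tags identifying the main output and the side output.
TupleTag<MyEvent> parsedTag = new TupleTag<MyEvent>() {};
TupleTag<String> failedTag = new TupleTag<String>() {};

PCollectionTuple outputs = jsonLines.apply(ParDo
    .of(new DoFn<String, MyEvent>() {
      @ProcessElement
      public void processElement(@Element String json, MultiOutputReceiver out) {
        try {
          out.get(parsedTag).output(MyEvent.parse(json));  // assumed parser
        } catch (Exception e) {
          out.get(failedTag).output(json);  // keep the raw value on the side
        }
      }
    })
    .withOutputTags(parsedTag, TupleTagList.of(failedTag)));

PCollection<MyEvent> parsed = outputs.get(parsedTag);
PCollection<String> failed = outputs.get(failedTag);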

Defining droppable data in Beam

This section will be a short return to the material we covered in Chapter 2, Implementing, Testing, and Deploying Basic Pipelines, where we already defined what late data means. To recap – late data is any data element whose timestamp is behind the watermark. That is to say, the watermark tells us that we should not receive a data element with a timestamp lower than the watermark, yet we nevertheless receive such an element. This is perfectly fine because, as already described in Chapter 1, Introduction to Data Processing with Apache Beam, a perfect watermark would introduce unnecessary – or even impractical – latency. However, what we left unanswered is the following question – what happens to data elements that arrive too late? We know that we can define an allowed lateness, but what if some data arrives even later than that? And as always, the answer is – it depends. Luckily, some of the concepts relating to streaming...
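As a reminder, the allowed lateness mentioned above is a property of the windowing strategy; a minimal sketch of setting it (the window size and lateness values are arbitrary) might be:

PCollection<String> windowed = input.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(10)))
        .withAllowedLateness(Duration.standardMinutes(2)));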

Task 9 – Separating droppable data from the rest of the data processing

Under normal circumstances, data flowing in a pipeline does not change its status regarding being late, droppable, or on time. However, the exceptions to this are as follows:

  • Data could change its status if we change our WindowFn object and re-window our stream, thereby producing different points in time that define the window GC time.
  • Data could change its status if we apply logic with a more sensitive definition of droppable data – this specifically applies to @RequiresTimeSortedInput, where droppable data becomes every data element that – at any point in time – lags behind the watermark by more than the defined allowed lateness.

We can rephrase these conditions so that as long as we do not change the window function and do not apply logic with specific requirements, the droppable status of an element should not change between transforms. We will use this property to...

Task 10 – Separating droppable data from the rest of the data processing, part 2

First, let's rephrase our problem definition from Task 9.

Defining the problem

Create a pipeline that will separate droppable data elements from the rest of the data elements. It will send droppable data to one output topic and the rest to another topic. Make the separation work even in cases when the very first element in a particular window is droppable.

Discussing the problem decomposition

The main problem with our previous approach was that we were not able to recognize a data element as droppable when it was the very first data element in that particular window. Therefore, we need to be able to generate window labels prior to receiving any data for that particular window. We can do that using a technique called looping timers – that is, we set a timer and then keep resetting it after a fixed duration in an infinite loop. If possible, we would like to align this timer with...
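A minimal sketch of such a looping timer might look as follows (the interval, output, and the state and timer names are illustrative assumptions, and imports are omitted):

class LoopingTimerFn extends DoFn<KV<String, String>, String> {
  private static final Duration INTERVAL = Duration.standardMinutes(1);

  @StateId("armed")
  private final StateSpec<ValueState<Boolean>> armedSpec = StateSpecs.value();

  @TimerId("loop")
  private final TimerSpec loopSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      @Timestamp Instant ts,
      @StateId("armed") ValueState<Boolean> armed,
      @TimerId("loop") Timer loop) {
    // Arm the timer only once per key; @OnTimer keeps it looping afterward.
    if (armed.read() == null) {
      armed.write(true);
      loop.set(ts.plus(INTERVAL));
    }
  }

  @OnTimer("loop")
  public void onLoop(
      @Timestamp Instant firedAt,
      @TimerId("loop") Timer loop,
      OutputReceiver<String> out) {
    out.output("tick at " + firedAt);  // e.g., emit a label for the current window
    loop.set(firedAt.plus(INTERVAL));  // re-arm the timer - this is the "loop"
  }
}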

Using side inputs

We have already seen how to use side outputs, and side inputs are analogous to them. Besides the single main input, a ParDo transform can have multiple additional side inputs, as shown in the following figure:

Figure 3.13 – Side inputs

We have multiple ways of declaring a side input to a ParDo object. Analogous to side outputs, we must provide the side input to the ParDo transform – for instance, with a call to withSideInput, as follows:

input.apply(ParDo.of(new MyDoFn())
    .withSideInput("side-input", sideInput));

Because we may have multiple side inputs, we need a way to distinguish between them – since we assigned a name to the side input, we can later access it easily in the DoFn using the @SideInput annotation:

@ProcessElement
public void processElement(
    @Element .. element,
    ...
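For illustration, a minimal sketch of accessing the named side input declared above through the @SideInput annotation might look like this; the element type and the map-shaped content of the side input (for example, a view created with View.asMap()) are assumptions:

class MyDoFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(
      @Element String element,
      @SideInput("side-input") Map<String, String> metadata,  // assumed view type
      OutputReceiver<String> out) {
    // Look up the element in the (assumed) map-shaped side input.
    out.output(element + " -> " + metadata.getOrDefault(element, "N/A"));
  }
}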

Summary

In this chapter, we learned about all the remaining primitive transforms. We now know the details of both the stateless and stateful ParDo objects. We know the basic life cycle of a DoFn and understand the concept of bundles. We understand why the input to a stateful ParDo object has to be a keyed PCollection object. We have seen and understood the details of how states and timers are managed by Beam and how they are delegated to runners in order to ensure fault tolerance. We know how a watermark propagates through transforms in general and what a (stateful) transform's input watermark and output watermark are. We have successfully used this knowledge to create our own version of the GroupIntoBatches transform, which buffers data in state before handing it off to an external RPC service.

Next, we focused on handling late and droppable data so that we can avoid data loss. We created one simple and one more sophisticated version of a transform to filter (split) data...
