Chapter 2: Implementing, Testing, and Deploying Basic Pipelines

Now that we are familiar with the basic concept of streaming data processing, in this chapter, we will take a deep dive into how to build something practical with Apache Beam.

The purpose of this chapter is to give you some hands-on experience of solving practical problems from start to finish. The chapter will be divided into subsections, with each following the same structure:

  1. Defining a practical problem
  2. Discussing the problem decomposition (and how to solve the problem using Beam's PTransform objects)
  3. Implementing a pipeline to solve the defined problem
  4. Testing and validating that we have implemented our pipeline correctly
  5. Deploying the pipeline, both locally and to a running cluster

During this process (mostly at Step 2), we will discuss the various possibilities provided by Beam for addressing the problem, and we will try to highlight any caveats or common issues you might run into...

Technical requirements

In this chapter, we will be installing and setting up the development environment that we will be using throughout the book. In order to carry out the necessary installations, you will need a system capable of running Bash and Docker. All of the other necessary technologies are self-contained within minikube, which we will run on top of the system.

Throughout this chapter, we will use the code from the GitHub repository for this book:

https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam

Do not forget to clone it or use the cloned version from the previous chapter.
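
If you have not cloned the repository yet, a plain git clone is all that is needed (assuming git is installed; the target directory is up to you):

    $ git clone https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam.git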

Setting up the environment for this book

In this section, we will set up the environment needed for this chapter and the rest of the book. The technologies we will build upon are Docker and minikube.

Minikube is a local version of Kubernetes, which will enable us to easily set up the other technologies we need.

Let's set up everything we need for this chapter now:

  1. The steps to install minikube can be found at https://minikube.sigs.k8s.io/docs/start/.
  2. Next, make sure to install the kubectl tool using the official Kubernetes instructions, which can be found at https://kubernetes.io/docs/tasks/tools/.
  3. After installing minikube, we will start it by executing the following command:
    $ minikube start

    Important note

    minikube lets us configure the amount of memory and the number of CPUs it uses. The minikube start command takes the optional --cpus and --memory arguments, which can be used to tune these settings. We recommend using all of the CPUs available...
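
    For illustration only (the values below are examples; choose them according to your machine), the resource flags can be passed directly to the start command:

    $ minikube start --cpus 4 --memory 8g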

Task 1 – Calculating the K most frequent words in a stream of lines of text

In the previous chapter, we wrote a very basic pipeline that computed a simple (but surprisingly frequently used) functionality. The pipeline computed the number of occurrences of each word in a text document. We then changed its input to a data stream of lines generated by the TestStream utility.

In the first task of this chapter, we want to extend this simple pipeline to be able to calculate and output only the K most frequent words in a stream of lines. So, let's first define the problem.

Defining the problem

Given an input data stream of lines of text, calculate the K most frequent words within a fixed time window of T seconds.

There are many practical applications for solving this problem. For example, if we had a store, we might want to compute daily statistics to find the products with the maximum profit. However, we have chosen the example of counting words in a text stream...
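
As a rough sketch of one possible decomposition, the core of such a pipeline combines fixed windowing with the Count and Top transforms. Here, words stands for a PCollection<String> of extracted words, K and T are the task parameters, and the actual implementation in the book's repository may differ in details:

words
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(T))))
    // count the occurrences of every word within each window
    .apply(Count.perElement())
    // keep only the K (word, count) pairs with the highest counts per window
    .apply(Top.of(K, new KV.OrderByValue<String, Long>()).withoutDefaults());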

Task 2 – Calculating the maximal length of a word in a stream

This is a similar example. In the previous task, we wanted to calculate the K most frequent words in a stream within a fixed time window. How would our solution change if our task were to calculate a statistic over the whole stream, from its beginning? Let's define the problem.

Defining the problem

Given an input data stream of lines of text, calculate the longest word ever seen in this stream. Start with an empty word value; once a longer word is seen, immediately output the new longest word.

Discussing the problem decomposition

Although the logic seems to be similar to the previous task, it can be simplified as follows:

Figure 2.3 – The problem decomposition

Note that there are two main differences from the previous task:

  • We must compute the word with the longest length; although this could be viewed as a Top transform with K equal to one, Beam has a specific transform for that...
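
A minimal sketch of this approach, using the Max transform mentioned above (words stands for a PCollection<String>; the element-count trigger makes the pipeline emit a new maximum immediately, and the book's actual implementation may differ):

words
    .apply(
        Window.<String>into(new GlobalWindows())
            // fire after every element so that a new maximum is output immediately
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes())
    .apply(
        Max.globally(
            // compare words by their length
            (SerializableComparator<String>)
                (a, b) -> Integer.compare(a.length(), b.length())));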

Specifying the PCollection Coder object and the TypeDescriptor object

The name PCollection is an abbreviation of parallel collection, which suggests that it is a collection of elements distributed among multiple workers, and that is exactly what it is. To be able to send the individual elements of this collection between workers, each element needs a serialized representation. That is, we need a piece of code that takes a raw (in-memory) object and produces a byte representation that can be sent over the wire. After it is received on the remote side, we need another piece of code that takes this byte representation and recreates the original in-memory object (or rather, a copy of the original). And that is exactly what coders are for.

We have already used Coder in our test cases. Recall how we constructed our TestStream object:

TestStream.create(StringUtf8Coder.of())

The reason we need to specify a Coder object here is that every PCollection...
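
The following illustrative fragment (the PCollection names are made up for this example) shows the two typical ways this surfaces in user code: providing a TypeDescriptor so that Beam can infer a Coder for a lambda's output, and setting a Coder explicitly when inference is not possible:

// the TypeDescriptor tells Beam the output element type of the lambda,
// so that a matching Coder can be looked up in the CoderRegistry
PCollection<String> upperCased =
    lines.apply(
        MapElements.into(TypeDescriptors.strings())
            .via((String s) -> s.toUpperCase()));

// if Coder inference fails, we can always set the Coder explicitly
upperCased.setCoder(StringUtf8Coder.of());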

Understanding default triggers, on time, and closing behavior

As we have seen, when specifying a window via the Window PTransform, which is necessary for all grouping operations, we may optionally specify a trigger. We explored this concept in the theoretical part of Chapter 1, Introduction to Data Processing with Apache Beam. Here, we will focus specifically on understanding how Beam interprets triggers and when the output is triggered.

The simplest trigger we can specify is the AfterWatermark.pastEndOfWindow() trigger, which simply means trigger the output once the window has completed. That is, once the watermark passes the end timestamp of each particular window. We have already seen that each window has such an end timestamp, including the global window, which has a timestamp set in the very distant future.

A question we might ask is: which trigger will be used if we apply a Window PTransform without specifying a trigger? The answer is DefaultTrigger. How should this trigger be defined...
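
To make this concrete, a windowing specification with an explicitly stated AfterWatermark.pastEndOfWindow() trigger could look as follows (input stands for a PCollection<String>; the window size is only an example value):

input.apply(
    Window.<String>into(FixedWindows.of(Duration.standardSeconds(60)))
        // emit the result once the watermark passes the end of the window
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());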

Introducing the primitive PTransform object – Combine

So far, we have seen three grouping (stateful) transformations: Count, Top, and Max. None of these is actually a primitive transformation. A primitive transformation is defined as a transformation that needs direct support from a runner and cannot be executed via other transformations. The Combine object is the first primitive PTransform object that we are going to introduce. Beam has only five primitive PTransform objects, and we will walk through all of them in this chapter. We call non-primitive PTransform objects composite transformations.

The Combine PTransform object generally performs a reduction operation on a PCollection object. As the name suggests, the transform combines multiple input elements into a single output value per window (Combine.globally) or per key and window (Combine.perKey). This reduction is illustrated by the following figure:

Figure 2.6 –...
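
As a small illustration of both variants (assuming a PCollection<Long> named lengths and a keyed PCollection<KV<String, Long>> named keyedLengths), the built-in Sum combiner could be applied as follows:

// one output value per window for the whole PCollection
PCollection<Long> totalLength =
    lengths.apply(Combine.globally(Sum.ofLongs()).withoutDefaults());

// one output value per key and window
PCollection<KV<String, Long>> totalLengthPerKey =
    keyedLengths.apply(Combine.perKey(Sum.ofLongs()));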

Task 3 – Calculating the average length of words in a stream

In this task, we will investigate how we can use CombineFn and accumulators to compute an average, which is a reduction that is not directly combinable. Let's see how this works.

Defining the problem

Given an input data stream of lines of text, calculate the average length of words currently seen in this stream. Output the current average as frequently as possible, ideally after every word.

Discussing the problem decomposition

Calculating an average is not a directly combinable function. An average of averages is not a proper average of the original data. However, we can calculate an average using an accumulator. The accumulator is a pair of (sum, count), and the output is extracted using a function that divides the sum by the count. We can illustrate this with Figure 2.9:

Figure 2.9 – Calculating an average using CombineFn

We will need to create an accumulator object for...
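
A minimal sketch of what such a CombineFn could look like (the class and field names are made up here; the input is assumed to be the already-extracted word length, and the book's actual implementation may differ):

import java.io.Serializable;
import org.apache.beam.sdk.transforms.Combine;

class AverageFn extends Combine.CombineFn<Integer, AverageFn.Accum, Double> {

  // the accumulator: a running (sum, count) pair
  static class Accum implements Serializable {
    long sum;
    long count;
  }

  @Override
  public Accum createAccumulator() {
    return new Accum();
  }

  @Override
  public Accum addInput(Accum accumulator, Integer input) {
    accumulator.sum += input;
    accumulator.count += 1;
    return accumulator;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accumulators) {
    Accum merged = new Accum();
    for (Accum accumulator : accumulators) {
      merged.sum += accumulator.sum;
      merged.count += accumulator.count;
    }
    return merged;
  }

  @Override
  public Double extractOutput(Accum accumulator) {
    // the output is the sum divided by the count
    return accumulator.count == 0 ? 0.0 : (double) accumulator.sum / accumulator.count;
  }
}

Such a function would then be applied with, for example, Combine.globally(new AverageFn()) on a PCollection of word lengths; making the accumulator Serializable lets Beam fall back to SerializableCoder for it (alternatively, getAccumulatorCoder can be overridden).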

Task 4 – Calculating the average length of words in a stream with fixed lookback

In this section, we will focus on using a different kind of window – a sliding window. Let's see what we can do with it.

Defining the problem

Given an input data stream of lines of text, calculate the average length of the words seen in this stream during the last 10 seconds and output the result every 2 seconds.

Discussing the problem decomposition

This is actually very similar to Task 3. All we need to do is apply a different Window transform: a sliding window with a length of 10 seconds and a slide interval of 2 seconds, which will produce exactly the output we want.

Implementing the solution

The solution to this task can be found in the com.packtpub.beam.chapter2.SlidingWindowWordLength class.

The modification to the code from the previous task is just the different Window transform:

words
  .apply(
    Window.<String>into(
        SlidingWindows.of(Duration.standardSeconds(10))
            .every(Duration.standardSeconds(2))))

Ensuring pipeline upgradability

First, be aware that Beam (currently, as of version 2.28.0) does not offer an abstraction that would allow us to transfer a pipeline, including its state, between runners. The code of the pipeline can be transferred, but that means a new pipeline will be created and any computation done on the previous runner is lost. This is because, currently, handling pipeline upgrades is a runner-specific task, and details might therefore differ slightly based on which runner we choose.

That is the bad news. The good news is that the pipeline upgrade process is generally subject to constraints that are mostly runner-independent, so the chances are high that very similar rules will apply to the majority of runners.

Let's look at the tasks that a runner must perform to upgrade a pipeline:

  1. The complete state (including the timers) of all transforms must be stored in a durable and persistent location. This...

Task 5 – Calculating performance statistics for a sport activity tracking application

Let's explore one of the most useful applications of stream processing – delivering accurate real-time insights from (possibly) high-volume data streams. As an example, we will borrow a use case known to almost everyone – calculating performance statistics (for example, speed and total distance) from a stream of GPS coordinates coming from a sport activity tracker!

Defining the problem

Given an input data stream of quadruples (workoutId, gpsLatitude, gpsLongitude, and timestamp), calculate the current speed and the total tracked distance of the tracker. The data comes from a GPS tracker that sends data only when its user starts a sport activity. We can assume that workoutId is unique and contains a userId value.

Let's describe the problem more informally. Suppose we have a stream that looks as follows:

(user1:track1, 65.5384, -19.9108, 1616427100000...
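
As a purely illustrative building block (not necessarily how the book implements it), the distance between two consecutive GPS fixes of a workout can be computed with the haversine formula:

// great-circle (haversine) distance in meters between two GPS fixes
static double distanceMeters(double lat1, double lon1, double lat2, double lon2) {
  double earthRadiusMeters = 6_371_000.0;
  double dLat = Math.toRadians(lat2 - lat1);
  double dLon = Math.toRadians(lon2 - lon1);
  double a =
      Math.sin(dLat / 2) * Math.sin(dLat / 2)
          + Math.cos(Math.toRadians(lat1))
              * Math.cos(Math.toRadians(lat2))
              * Math.sin(dLon / 2) * Math.sin(dLon / 2);
  return 2 * earthRadiusMeters * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
}

Summing these distances per workout gives the total tracked distance, and dividing each distance by the time between the two fixes gives the current speed.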

Introducing the primitive PTransform object – GroupByKey

As we have seen, a GroupByKey transform works in the way illustrated in the following figure:

Figure 2.14 – GroupByKey

As in the case of the Combine PTransform objects, the input stream must be keyed. This is a way of saying that the PCollection must have elements of the KV type. This is generally true for any stateful operation. The reason is that a state that cannot be partitioned into smaller, independent sub-states cannot be distributed among workers and would therefore lead to scalability issues. Beam therefore explicitly prohibits this and enforces the use of keyed PCollections as the input of every stateful operation.

The GroupByKey transform then takes this keyed stream (in Figure 2.14, the key is represented as the shape of the stream element) and creates something that can be viewed as a sub-stream for each key. We can then process elements with a different...
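
As a small illustrative fragment (assuming a keyed PCollection<KV<String, Integer>> named keyed), applying the transform is a one-liner; the interesting part is the resulting element type:

// each output element holds a key and all the values seen for that key
// in the given window
PCollection<KV<String, Iterable<Integer>>> grouped =
    keyed.apply(GroupByKey.create());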

Introducing the primitive PTransform object – Partition

The GroupByKey transform creates a set of sub-streams based on a dynamic property of the data – the set of keys of a particular window can be modified during the pipeline execution time. New keys can be created and processed at any time. This creates the complexity mentioned in the previous section – we need to store our data in keyed states and flush them on triggers. A question we might have is – would the task be easier if we knew the exact set of keys upfront, during pipeline construction time?

The answer is yes, and that is why we have a PTransform object called Partition.
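
A minimal sketch of how Partition can be used (the input PCollection and the partitioning function are made up for illustration; the number of partitions must be known at pipeline construction time):

// split an assumed PCollection<Integer> named numbers into three fixed
// sub-streams based on the value modulo the number of partitions
PCollectionList<Integer> partitioned =
    numbers.apply(
        Partition.of(
            3,
            new Partition.PartitionFn<Integer>() {
              @Override
              public int partitionFor(Integer value, int numPartitions) {
                return value % numPartitions;
              }
            }));

// individual partitions are then accessible by index
PCollection<Integer> firstPartition = partitioned.get(0);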

Important note

A pipeline's life cycle is generally divided into three phases: pipeline compile time, pipeline construction time, and pipeline execution time. Compile time refers (as usual) to the time we compile the source to bytecode. Construction time is the time when the pipeline's DAG of transformations...

Summary

In this chapter, we first walked through the steps needed to set up our environment to run the code located in this book's GitHub repository. We created a minikube cluster and ran Apache Kafka and Apache Flink on top of it. We then found out how to use the scripts in the repository to create topics in Kafka and publish messages to them, and how to consume data from topics.

After we walked through the necessary infrastructure, we jumped directly into implementing various practical tasks. The first one was to calculate the K most frequent words in a stream of text lines. In order to accomplish this, we learned how to use the Count and Top transforms. We also learned how to use the TestStream utility to create a simulated stream of input data and use this to write a test case that validates our pipeline implementation. Then, we learned how to deploy our pipeline to a real runner – Apache Flink.

We then got acquainted with another grouping transform – Max, which we...
