Building Big Data Pipelines with Apache Beam


Product type Book
Published in Jan 2022
Publisher Packt
ISBN-13 9781800564930
Pages 342 pages
Edition 1st Edition
Author Jan Lukavský

Table of Contents (13) Chapters

Preface
Section 1 Apache Beam: Essentials
Chapter 1: Introduction to Data Processing with Apache Beam
Chapter 2: Implementing, Testing, and Deploying Basic Pipelines
Chapter 3: Implementing Pipelines Using Stateful Processing
Section 2 Apache Beam: Toward Improving Usability
Chapter 4: Structuring Code for Reusability
Chapter 5: Using SQL for Pipeline Implementation
Chapter 6: Using Your Preferred Language with Portability
Section 3 Apache Beam: Advanced Concepts
Chapter 7: Extending Apache Beam's I/O Connectors
Chapter 8: Understanding How Runners Execute Pipelines
Other Books You May Enjoy

Chapter 6: Using Your Preferred Language with Portability

In the previous chapters, we focused on the Java SDK – or various Java SDK-based DSLs – but what if we want to implement our data transformation logic in a completely different language, such as Python or Go? One of the main goals of Apache Beam is portability. We have already seen the portability of pipelines between different Runners and between batch and streaming semantics. In this chapter, we will explore the last aspect of portability – portability between SDKs.

We will outline how the portability layer works (Apache Beam often calls it the Fn API – pronounced Fun API). The goal is that Runners do not have to understand the SDK (the language we use to implement our pipeline), yet can still execute the pipeline successfully. That way, new SDKs can be created without modifying the existing Runners...

Technical requirements

The toolset we are using will change only slightly in this chapter. Besides the usual Docker and minikube, we will need to install Python 3. Let's take a look:

  1. Python can be installed by following the instructions at https://realpython.com/installing-python/.
  2. We will also need to install the apache_beam package using pip:
    $ python3 -m pip install apache_beam

    If something goes wrong, make sure you have the latest version of pip by using the following command:

    $ python3 -m pip install --upgrade pip
  3. The highest Python version fully supported by Apache Beam is 3.7. If you have a higher version installed and do not want to downgrade, you can use the packt-beam pod in minikube, which has the correct Python version bundled with it. All the examples in this chapter can be run using the following command:
    $ kubectl exec -t packt-beam -- \
        /usr/local/bin/<name_of_script.py>

You will also need to have a basic understanding...

Introducing the portability layer

In this section, we will walk through the design of the portability layer – the Fn API – to understand which components are orchestrated together to allow pipelines to be executed from different SDKs on the same Runner.

First, let's see how the whole portability layer works. This concept is illustrated in the following (somewhat simplified) diagram:

Figure 6.1 – The portability layer architecture

As we can see, the architecture consists of two types of components – Apache Beam components and Runner components. In this case, a Runner is a piece of technology that performs the actual execution – it may be Apache Flink, Apache Spark, Google Cloud Dataflow, or any other supported Runner. Each of these Runners typically has a coordinator that needs to receive a job submission and use this submission to create work for worker nodes. By doing this, it can orchestrate its execution. This coordinator...

Implementing our first pipelines in the Python SDK

In this section, we will learn the basics of the Python SDK. Namely, we will learn how to create the pipeline, how to run it using DirectRunner, and how to test our pipelines. Again, our very first pipeline will take our well-known input file, called lorem.txt, which we used in Chapter 1, Introduction to Data Processing with Apache Beam, and output the number of occurrences of each word present in the file. So, let's dive into the Python SDK.

Implementing our first Python pipeline

The source code can be found in the first_pipeline.py script, which is located in chapter6/src/main/python/. Let's get started:

  1. The script takes the name of the input file as an argument, so we can run it locally using the following command:
    $ chapter6/src/main/python/first_pipeline.py \
        chapter1/src/main/resources/lorem.txt
  2. After running the preceding command, we will see the usual output, which consists of a word...

Task 17 – Implementing MaxWordLength in the Python SDK

We will use the well-known examples, which have mostly been implemented using the Java SDK, from Chapter 2, Implementing, Testing, and Deploying Basic Pipelines, and Chapter 3, Implementing Pipelines Using Stateful Processing. We will also build on our knowledge from Chapter 4, Structuring Code for Reusability, regarding using user-defined PTransforms for better reusability and testing.

Our first complete task will be the task we implemented as Task 2 in Chapter 2, Implementing, Testing, and Deploying Basic Pipelines, but as always, for clarity, we will restate the problem here.

Problem definition

Given an input data stream of lines of text, calculate the longest word found in this stream. Start with an empty word; once a longer word is seen, output the newly found candidate.
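The expected behavior can be illustrated without any Beam code. The following plain-Python sketch only demonstrates the semantics of the problem definition (it is not the Beam implementation, and the function name longest_word_updates is our own). We also assume that words are separated by whitespace.

```python
from typing import Iterable, Iterator


def longest_word_updates(lines: Iterable[str]) -> Iterator[str]:
    """Yield a new candidate each time a strictly longer word is seen."""
    longest = ""  # start with an empty word
    for line in lines:
        for word in line.split():
            if len(word) > len(longest):
                longest = word
                yield longest


if __name__ == "__main__":
    for candidate in longest_word_updates(["a bb", "ccc dd", "eeee"]):
        print(candidate)
```

Words that are shorter than, or as long as, the current candidate produce no output, so the stream of results only ever grows in word length.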

Problem decomposition discussion

From a logical perspective, this problem is the same as in the case of Task 2. So, let's focus...

Python SDK type hints and coders

Python 3 supports type hints on functions, as shown in the following code:

import apache_beam as beam

def toKv(s: str) -> beam.typehints.KV[bytes, bytes]:
  return ("".encode("utf-8"), s.encode("utf-8"))  # empty key, UTF-8 encoded value

The previous code defines a function called toKv that takes a string (str) as input and outputs an object that's compatible with beam.typehints.KV[bytes, bytes]. When we use such a function in a simple transform such as beam.Map(toKv), Beam can infer the type of the resulting PCollection and can automatically use a special-purpose coder instead of pickle. In the case of bytes, this would be BytesCoder, the Python SDK's counterpart of the Java SDK's ByteArrayCoder.
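One reason a special-purpose coder matters is the size of the encoded data. The following sketch uses only the standard library's pickle module as a stand-in for a generic fallback encoding. This is an assumption made for illustration, because Beam's actual fallback coder differs in detail. The helper names to_kv and encoded_sizes are our own.

```python
import pickle


def to_kv(s: str):
    """Same shape as toKv above: empty key, UTF-8 encoded value."""
    return ("".encode("utf-8"), s.encode("utf-8"))


def encoded_sizes(s: str):
    """Return (size of the raw bytes, size of the pickled bytes)."""
    _, value = to_kv(s)
    pickled = pickle.dumps(value)
    # Pickle round-trips correctly, but adds framing around the payload.
    assert pickle.loads(pickled) == value
    return len(value), len(pickled)


if __name__ == "__main__":
    raw_size, pickled_size = encoded_sizes("lorem")
    print(raw_size, pickled_size)
```

The pickled form carries protocol headers and type information on top of the payload, which is overhead that a coder written specifically for bytes does not need.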

Besides declaring type hints to mapping functions, we can use a decorator for DoFns, which will declare the (input or output) type hint explicitly for the whole transform:

@beam.typehints.with_input_types(
    beam.typehints.Tuple[
        str, beam.typehints...

Task 18 – Implementing SportTracker in the Python SDK

This task will be a reimplementation of Task 5 from Chapter 2, Implementing, Testing, and Deploying Basic Pipelines. Again, for clarity, let's restate the problem definition.

Problem definition

Given an input data stream of quadruples (workoutId, gpsLatitude, gpsLongitude, and timestamp), calculate the current speed and total tracked distance. The data comes from a GPS tracker that sends data only when the user starts a sports activity. We can assume that workoutId is unique and contains userId in it.

The caveats of the implementation are the same as what we discussed in the original Task 5, so we'll skip to its Python SDK implementation right away.
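As a reminder of the core computation, here is a plain-Python sketch that derives the total distance and the most recent speed from the points of a single workout. It is independent of Beam and of the book's sport_tracker.py: the names haversine_m and compute_track_metrics are hypothetical, and the use of the haversine formula with a mean Earth radius is our assumption.

```python
import math
from typing import Iterable, Tuple

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius (assumed constant)


def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in meters between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlambda = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def compute_track_metrics(
        points: Iterable[Tuple[float, float, float]]) -> Tuple[float, float]:
    """Return (total distance in m, latest speed in m/s) for one workout.

    Each point is (latitude, longitude, timestamp in seconds).
    """
    ordered = sorted(points, key=lambda p: p[2])  # order by timestamp
    total = 0.0
    speed = 0.0
    for (lat1, lon1, t1), (lat2, lon2, t2) in zip(ordered, ordered[1:]):
        distance = haversine_m(lat1, lon1, lat2, lon2)
        total += distance
        if t2 > t1:  # skip duplicate timestamps to avoid division by zero
            speed = distance / (t2 - t1)
    return total, speed


if __name__ == "__main__":
    print(compute_track_metrics([(0.0, 0.0, 0.0), (0.0, 0.001, 10.0)]))
```

Sorting by timestamp first matters because points in a stream can arrive out of order, which would otherwise distort both the distance and the speed.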

Solution implementation

The complete implementation can be found in the source code for this chapter, in chapter6/src/main/python/sport_tracker.py. The logic is concentrated in two functions – SportTrackerCalc and computeMetrics:

  1. The...

Task 19 – Implementing RPCParDo in the Python SDK

This task will be a reimplementation of Task 8 from Chapter 3, Implementing Pipelines Using Stateful Processing. We will use a stateful DoFn to batch the elements for a defined maximal amount of time. As always, we will restate the problem for clarity.

Problem definition

Use a given RPC service to augment data in the input stream using batched RPCs with batches of about K elements. Also, resolve the batch after, at most, time T to avoid a possibly infinite waiting time for elements in small batches.

As in the previous task, we will skip the discussion of the problem's decomposition as we discussed that when we implemented Task 8. Instead, we will jump directly into its implementation using the Python SDK.
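The batching rule itself (flush at K elements or after time T, whichever comes first) can be sketched in plain Python. This is a simulation over timestamped events, not the stateful DoFn from rpc_par_do.py. The function name batch_with_timeout is hypothetical, and we assume the events arrive sorted by timestamp.

```python
from typing import Any, Iterable, Iterator, List, Tuple


def batch_with_timeout(
        events: Iterable[Tuple[float, Any]],
        max_size: int,
        max_wait: float) -> Iterator[List[Any]]:
    """Group elements into batches of at most max_size elements.

    A batch is also closed once max_wait has passed since its first
    element, which mimics a timer set when the batch was started.
    """
    buffer: List[Any] = []
    deadline = 0.0
    for timestamp, element in events:
        if buffer and timestamp >= deadline:
            # The timer would have fired before this element arrived.
            yield buffer
            buffer = []
        if not buffer:
            deadline = timestamp + max_wait  # first element sets the timer
        buffer.append(element)
        if len(buffer) >= max_size:
            yield buffer
            buffer = []
    if buffer:
        yield buffer  # flush what is left at the end of the input


if __name__ == "__main__":
    events = [(0, "a"), (1, "b"), (2, "c"), (10, "d"), (11, "e")]
    print(list(batch_with_timeout(events, max_size=3, max_wait=5)))
```

In a real stateful DoFn, the buffer would live in state and the deadline would be a timer, so the flush would happen even when no further element arrives.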

Solution implementation

The implementation can be found in chapter6/src/main/python/rpc_par_do.py. It can be broken down into the RPCParDoStateful transform, which is declared...

Task 20 – Implementing SportTrackerMotivation in the Python SDK

The last task we will implement in this chapter is a well-known task that we have used multiple times in Chapter 4, Structuring Code for Reusability – for example, in Task 11. First, let's restate the problem definition.

Problem definition

Calculate two per-user running averages over the stream of artificial GPS coordinates that were generated for Task 5. One computation will be the average pace over a longer (5-minute) interval, while the other will be over a shorter (1-minute) interval. Every minute, for each user, output whether the user's current 1-minute pace is over or under the longer average, provided that the short average differs from the long one by more than 10%.
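The final comparison step can be written down in a few lines of plain Python. This sketch covers only the decision rule, not the windowing or the join. The function name pace_notification and the "over"/"under" labels are our own, and we assume both averages have already been computed for the user.

```python
from typing import Optional


def pace_notification(short_avg: float, long_avg: float,
                      threshold: float = 0.1) -> Optional[str]:
    """Compare the 1-minute average with the 5-minute average.

    Returns "over" or "under" when the short average differs from the
    long one by more than the threshold (10% by default), else None.
    """
    if long_avg <= 0:
        return None  # no meaningful long-term average to compare against
    ratio = short_avg / long_avg
    if ratio > 1 + threshold:
        return "over"
    if ratio < 1 - threshold:
        return "under"
    return None


if __name__ == "__main__":
    print(pace_notification(12.0, 10.0))
    print(pace_notification(8.0, 10.0))
    print(pace_notification(10.5, 10.0))
```

Returning None for small differences corresponds to producing no output for that user in that minute.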

We implemented this task in several versions while using a playground to demonstrate various aspects of the Java SDK. In this case, we will implement only one version and use the CoGroupByKey transform to join...

Using the DataFrame API

For those who are familiar with Python's pandas package, it might be interesting to know that Apache Beam has a pandas-compatible API. It is called the DataFrame API, and we will briefly introduce it here. We will not walk through the details of the pandas API itself; it can easily be found online. Instead, we will explain how to use it and how to switch between the DataFrame API and the classical PCollection API.

The basic idea behind a DataFrame (both in Beam and in pandas) is that a data point can be viewed as a row in a table, where each row can have multiple fields (columns). Each field has an associated name and data type. Not every row (data point) has to have the same set of fields.

We can either use the DataFrame API directly from the beginning or swap between the classical API and the DataFrame API, depending on the situation and which API gives more readable code.

We'll start by introducing the first option – creating a DataFrame...

Interactive programming using InteractiveRunner

The Python SDK lets us develop pipelines in Read-Evaluate-Print-Loop (REPL) fashion. This is especially useful for various data science tools, such as Python notebooks. This book focuses on the data engineering part, so we will not install the complete notebook. Instead, we will use a command-line utility, which is enough to demonstrate the benefits of interactive programming.

We will run IPython for a better user experience by using the following command:

$ kubectl exec -it packt-beam-5686785d65-2ww5m -- /bin/bash -c "python3 \`which ipython3\`"

This will create an IPython console whose prompt looks like this:

Python 3.7.12 (default, Sep  8 2021, 01:20:16) 
Type 'copyright', 'credits' or 'license' for more information
IPython 7.27.0 -- An enhanced Interactive Python. Type '?' for help.
 
In [1]:

Now, we can start REPL coding. We have included a sample...

Introducing and using cross-language pipelines

Cross-language pipelines are a natural concept that comes with Beam's portability. Every executed PTransform in a pipeline has an associated environment. This environment describes how (DOCKER, EXTERNAL, PROCESS) and what (the Python SDK, Java SDK, Go SDK, and so on) should be executed by the Runner so that the pipeline behaves as intended by the pipeline author. Most of the time, all PTransforms in a single pipeline share the same SDK and the same environment, but this does not have to be the case. From the Runner's point of view, it makes no difference whether it executes a Python transform or a Java transform, because the Runner code is already written in an SDK-agnostic way.

The first thing we must understand is how the portable pipeline is represented. When an SDK builds and starts to execute a pipeline, it first compiles it into a portable...

Summary

In this chapter, we looked at the general design of Apache Beam's portability layer. We saw how this layer lets Runners and SDKs be developed independently: once a portable Runner is implemented, it should be capable of running pipelines from any SDK, even if that SDK did not exist at the time the Runner was implemented.

We then had a deep dive into the Python SDK, which builds heavily on the portability layer. We saw that the core Apache Beam model concepts are mirrored by all SDKs. Not all SDKs have the same set of features at the moment, but the set of supported features should converge over time.

We reimplemented some of our well-known examples from the Java SDK into the Python SDK to learn how to write and submit pipelines to a portable Runner – we used FlinkRunner for this, and we will continue to do so for the rest of this book. Next, we explored interactive programming using InteractiveRunner and Python notebooks. We saw...
