Search icon
Arrow left icon
All Products
Best Sellers
New Releases
Books
Videos
Audiobooks
Learning Hub
Newsletters
Free Learning
Arrow right icon
Apache Kafka Quick Start Guide

You're reading from  Apache Kafka Quick Start Guide

Product type Book
Published in Dec 2018
Publisher Packt
ISBN-13 9781788997829
Pages 186 pages
Edition 1st Edition
Languages
Author (1):
Raúl Estrada Raúl Estrada
Profile icon Raúl Estrada

Table of Contents (10) Chapters

Preface Configuring Kafka Message Validation Message Enrichment Serialization Schema Registry Kafka Streams KSQL Kafka Connect Other Books You May Enjoy

Kafka Streams

In this chapter, instead of using the Kafka Java API for producers and consumers as in previous chapters, we are going to use Kafka Streams, the Kafka module for stream processing.

This chapter covers the following topics:

  • Kafka Streams in a nutshell
  • Kafka Streams project setup
  • Coding and running the Java PlainStreamsProcessor
  • Scaling out with Kafka Streams
  • Coding and running the Java CustomStreamsProcessor
  • Coding and running the Java AvroStreamsProcessor
  • Coding and running the Late EventProducer
  • Coding and running the Kafka Streams processor

Kafka Streams in a nutshell

Kafka Streams is a library and part of Apache Kafka, used to process streams into and from Kafka. In functional programming, there are several operations over collections, such as the following:

  • filter
  • map
  • flatMap
  • groupBy
  • join

The success of streaming platforms such as Apache Spark, Apache Flink, Apache Storm, and Akka Streams is to incorporate these stateless functions to process data streams. Kafka Streams provides a DSL to incorporate these functions to manipulate data streams.
Kafka Streams also has stateful transformations; these are operations related to the aggregation that depend on the state of the messages as a group, for example, the windowing functions and support for late arrival data. Kafka Streams is a library, and this means that Kafka Streams applications can be deployed by executing your application jar. There is no need to deploy...

Project setup

The first step is to modify the kioto project. We have to add the dependencies to build.gradle, as shown in Listing 6.1:

apply plugin: 'java'
apply plugin: 'application'

sourceCompatibility = '1.8'

mainClassName = 'kioto.ProcessingEngine'

repositories {
mavenCentral()
maven { url 'https://packages.confluent.io/maven/' }
}

version = '0.1.0'

dependencies {
compile 'com.github.javafaker:javafaker:0.15'
compile 'com.fasterxml.jackson.core:jackson-core:2.9.7'
compile 'io.confluent:kafka-avro-serializer:5.0.0'
compile 'org.apache.kafka:kafka_2.12:2.0.0'
compile 'org.apache.kafka:kafka-streams:2.0.0'
compile 'io.confluent:kafka-streams-avro-serde:5.0.0'
}

jar {
manifest {
attributes 'Main-Class': mainClassName
} from {
configurations.compile...

Java PlainStreamsProcessor

Now, in the src/main/java/kioto/plain directory, create a file called PlainStreamsProcessor.java with the contents of Listing 6.2, shown as follows:

import ...
public final class PlainStreamsProcessor {
private final String brokers;
public PlainStreamsProcessor(String brokers) {
super();
this.brokers = brokers;
}
public final void process() {
// below we will see the contents of this method
}
public static void main(String[] args) {
(new PlainStreamsProcessor("localhost:9092")).process();
}
}
Listing 6.2: PlainStreamsProcessor.java

All the magic happens inside the process() method. The first step in a Kafka Streams application is to get a StreamsBuilder instance, as shown in the following code:

StreamsBuilder streamsBuilder = new StreamsBuilder();

The StreamsBuilder is an object that allows building a topology. A topology...

Running the PlainStreamsProcessor

To build the project, run this command from the kioto directory:

$ gradle build

If everything is correct, the output is something like the following:

BUILD SUCCESSFUL in 1s
6 actionable task: 6 up-to-date
  1. The first step is to run a console consumer for the uptimes topic, shown in the following code snippet:
$ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 
--topic uptimes --property print.key=true
  1. From the IDE, run the main method of the PlainStreamsProcessor
  2. From the IDE, run the main method of the PlainProducer (built in previous chapters)
  3. The output on the console consumer for the uptimes topic should be similar to the following:
EW05-HV36 33
BO58-SB28 20
DV03-ZT93 46
...

Scaling out with Kafka Streams

To scale out the architecture as promised, we must follow these steps:

  1. Run a console consumer for the uptimes topic, shown as follows:
$ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 
--topic uptimes --property print.key=true
  1. Run the application jar from the command line, shown in the following code:
$ java -cp ./build/libs/kioto-0.1.0.jar 
kioto.plain.PlainStreamsProcessor

This is when we verify that our application really scales out.

  1. From a new command-line window, we execute the same command, shown as follows:
$ java -cp ./build/libs/kioto-0.1.0.jar 
kioto.plain.PlainStreamsProcessor

The output should be something like the following:

2017/07/05 15:03:18.045 INFO ... Setting newly assigned 
partitions [healthchecks-2, healthchecks -3]

If we remember the theory of Chapter 1, Configuring Kafka, when we created our topic, we specified...

Java CustomStreamsProcessor

Summing up what has happened so far, in previous chapters we saw how to make a producer, a consumer, and a simple processor in Kafka. We also saw how to do the same with a custom SerDe, how to use Avro, and the Schema Registry. So far in this chapter, we have seen how to make a simple processor with Kafka Streams.

In this section, we will use all our knowledge so far to build a CustomStreamsProcessor with Kafka Streams to use our own SerDe.


Now, in the src/main/java/kioto/custom directory, create a file called CustomStreamsProcessor.java with the contents of Listing 6.3, shown as follows:

import ...
public final class CustomStreamsProcessor {
private final String brokers;
public CustomStreamsProcessor(String brokers) {
super();
this.brokers = brokers;
}
public final void process() {
// below we will see the contents of this method
}
public...

Running the CustomStreamsProcessor

To build the project, run this command from the kioto directory:

$ gradle build

If everything is correct, the output is something like the following:

BUILD SUCCESSFUL in 1s
6 actionable task: 6 up-to-date
  1. The first step is to run a console consumer for the uptimes topic, shown as follows:
$ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 
--topic uptimes --property print.key=true
  1. From our IDE, run the main method of the CustomStreamsProcessor
  2. From our IDE, run the main method of the CustomProducer (built in previous chapters)
  3. The output on the console consumer for the uptimes topic should be similar to the following:
      EW05-HV36 33
BO58-SB28 20
DV03-ZT93 46
...

Java AvroStreamsProcessor

In this section we will see how to use all this power gathered together: Apache Avro, Schema Registry, and Kafka Streams.

Now, we are going to use Avro format in our messages, as we did in previous chapters. We consumed this data by configuring the Schema Registry URL and using the Kafka Avro deserializer. For Kafka Streams, we need to use a Serde, so we added the dependency in the Gradle build file, given as follows:

compile 'io.confluent:kafka-streams-avro-serde:5.0.0'

This dependency has the GenericAvroSerde and specific avroSerde explained in previous chapters. Both Serde implementations allow us to work with Avro records.

Now, in the src/main/java/kioto/avro directory, create a file called AvroStreamsProcessor.java with the contents of Listing 6.4, shown as follows:

import ...
public final class AvroStreamsProcessor {
private final...

Running the AvroStreamsProcessor

To build the project, run this command from the kioto directory:

$ gradle build

If everything is correct, the output is something like the following:

BUILD SUCCESSFUL in 1s
6 actionable task: 6 up-to-date
  1. The first step is to run a console consumer for the uptimes topic, shown as follows:
      $ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 
--topic uptimes --property print.key=true
  1. From our IDE, run the main method of the AvroStreamsProcessor
  2. From our IDE, run the main method of the AvroProducer (built in previous chapters)
  3. The output on the console consumer for the uptimes topic should be similar to the following:
       EW05-HV36 33
BO58-SB28 20
DV03-ZT93 46
...

Late event processing

Previously, we talked about message processing, but now we will talk about events. An event in this context is something that happens at a particular time. An event is a message that happens at a point in time.

In order to understand events, we have to know the timestamp semantics. An event always has two timestamps, shown as follows:

  • Event time: The point in time when the event happened at the data source
  • Processing time: The point in time when the event is processed in the data processor

Due to limitations imposed by the laws of physics, the processing time will always be subsequent to and necessarily different from the event time, for the following reasons:

  • There is always network latency: The time to travel from the data source to the Kafka broker cannot be zero.
  • The client could have a cache: If the client cached some events before, send them to...

Basic scenario

To explain late events, we need a system where the events arrive periodically and we want to know how many events are produced by unit of time. In Figure 6.1, we show this scenario:


Figure 6.1: The events as they were produced

In the preceding figure, each marble represents an event. They are not supposed to have dimensions as they are at a specific point in time. Events are punctual, but for demonstration purposes, we represent them as balls. As we can see in t1 and t2, two different events can happen at the same time.

In our figure, tn represents the nth time unit. Each marble represents a single event. To differentiate between them, the events on t1 have one stripe, the events on t2 have two stripes, and the events on t3 have three stripes.

We want to count the events per unit of time, so we have the following:

  • t1 has six events
  • t2 has four events
  • t3 has three...

Late event generation

To test the Kafka Streams solution for late events, the first thing we need is a late event generator.

To simplify things, our generator will constantly send events at a fixed rate. And from time to time, it will generate a late event. The generator generates events with the following process:

  • Each window is 10 seconds long
  • It produces one event every second
  • The event should be generated in 54th second of each minute, and will be delayed by 12 seconds; that is, it will arrive in the sixth second of the next minute (in the next window)

When we say that the window is of 10 seconds, we mean that we will make aggregations every 10 seconds. Remember that the objective of the test is that the late events are counted in the correct window.

Create the src/main/java/kioto/events directory and, inside it, create a file called EventProducer.java with the contents...

Running the EventProducer

To run the EventProducer, we follow these steps:

  1. Create the events topic, as shown in the following block:
$. /bin/kafka-topics --zookeeper localhost:2181 --create --topic 
events --replication-factor 1 --partitions 4
  1. Run a console consumer for the events topic using the following command:
$ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 
--topic events
  1. From the IDE, run the main method of the EventProducer.
  1. The output on the console consumer for the events topic should be similar to the following:
1532529060000,47, on time
1532529060000,48, on time
1532529060000,49, on time
1532529070000,50, on time
1532529070000,51, on time
1532529070000,52, on time
1532529070000,53, on time
1532529070000,55, on time
1532529070000,56, on time
1532529070000,57, on time
1532529070000,58, on time
1532529070000,59, on time
1532529080000,0, on time
1532529080000...

Kafka Streams processor

Now, let's solve the problem of counting how many events are in each window. For this, we will use Kafka Streams. When we do this type of analysis, it is said that we are doing streaming aggregation.

In the src/main/java/kioto/events directory, create a file called EventProcessor.java with the contents of Listing 6.6, shown as follows:

package kioto.events;
import ...
public final class EventProcessor {
private final String brokers;
private EventProcessor(String brokers) {
this.brokers = brokers;
}
private void process() {
// ...
}
public static void main(String[] args) {
(new EventProcessor("localhost:9092")).process();
}
}
Listing 6.6: EventProcessor.java

All the processing logic is contained in the process() method. The first step is to create a StreamsBuilder to create the KStream, shown as follows:

StreamsBuilder streamsBuilder...

Running the Streams processor

To run the EventProcessor, follow these steps:

  1. Create the aggregates topic as follows:
$. /bin/kafka-topics --zookeeper localhost:2181 --create --topic 
aggregates --replication-factor 1 --partitions 4
  1. Run a console consumer for the aggregates topic, as follows:
$ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 
--topic aggregates --property print.key=true
  1. From the IDE, run the main method of the EventProducer.
  2. From the IDE, run the main method of the EventProcessor.
  3. Remember that it writes to the topic every 30 seconds. The output on the console consumer for the aggregates topic should be similar to the following:
1532529050000 10
1532529060000 10
1532529070000 9
1532529080000 3

After the second window, we can see that the values in the KTable are updated with fresh (and correct) data, shown as follows:

1532529050000 10
1532529060000...

Stream processor analysis

If you have a lot of questions here, it is normal.

The first thought to consider is that in streaming aggregation, and in streaming in general, the Streams are unbounded. It is never clear when we will take the final results, that is, we as programmers have to decide when to consider a partial value of an aggregation as a final result.

Recall that the print of the Stream is an instant photo of the KTable at a certain time. Therefore, the results of a KTable are only valid at the time of the output. It is important to remember that in the future, the values of the KTable may be different. Now, to see results more frequently, change the value of the commit interval to zero, shown as follows:

props.put("commit.interval.ms", 0);

This line says that the results of the KTable will be printed when they are modified, that is, it will print new values...

Summary

Kafka Streams is a powerful library, and is the only option when building data pipelines with Apache Kafka. Kafka Streams removes much of the boilerplate work needed when implementing plain Java clients. Compared to Apache Spark or Apache Flink, the Kafka Streams applications are much simpler to build and manage.

We also have seen how to improve a Kafka Streams application to deserialize data in JSON and Avro formats. The serialization part (writing to a topic) is very similar since we are using SerDes that are capable of both data serialization and deserialization.

For those who work with Scala, there is a library for Kafka Streams called circe that offers SerDes to manipulate JSON data. The circe library is the equivalent in Scala of the Jackson library.

As mentioned earlier, Apache Beam has a more complex suite of tools, but is totally focused on Stream management...

lock icon The rest of the chapter is locked
You have been reading a chapter from
Apache Kafka Quick Start Guide
Published in: Dec 2018 Publisher: Packt ISBN-13: 9781788997829
Register for a free Packt account to unlock a world of extra content!
A free Packt account unlocks extra newsletters, articles, discounted offers, and much more. Start advancing your knowledge today.
Unlock this book and the full library FREE for 7 days
Get unlimited access to 7000+ expert-authored eBooks and videos courses covering every tech area you can think of
Renews at $15.99/month. Cancel anytime}