Kafka Connect

In this chapter, instead of using the Kafka Java API for producers and consumers, Kafka Streams, or KSQL as in previous chapters, we are going to connect Kafka with Spark Structured Streaming, the Apache Spark solution to process streams with its Datasets API.

This chapter covers the following topics:

  • Spark Streaming processor
  • Reading Kafka from Spark
  • Data conversion
  • Data processing
  • Writing to Kafka from Spark
  • Running the SparkProcessor

Kafka Connect in a nutshell

Kafka Connect is an open source framework, part of Apache Kafka; it is used to connect Kafka with other systems, such as structured databases, column stores, key-value stores, filesystems, and search engines.

Kafka Connect has a wide range of built-in connectors. If we read from an external system into Kafka, the connector is called a data source; if we write from Kafka to an external system, it is called a data sink.
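Kafka Connect is driven by configuration rather than code: each connector is described by a small properties file and executed by a Connect worker. As an illustration only (this is not part of this chapter's Kioto project), Apache Kafka ships with a sample file source connector; the contents of config/connect-file-source.properties and the command to run it in standalone mode from the Apache Kafka distribution directory look like this:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

$ ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

This streams each new line appended to test.txt into the connect-test topic; sink connectors are configured in the same way, pointing in the opposite direction.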

In previous chapters, we created a Java Kafka Producer that sends JSON data to a topic called healthchecks, in messages like the following:

{"event":"HEALTH_CHECK","factory":"Lake Anyaport","serialNumber":"EW05-HV36","type":"WIND","status":"STARTING","lastStartedAt":"2018-09-17T11:05:26.094+0000","temperature":62.0,"ipAddress...

Project setup

The first step is to modify our Kioto project. We have to add the dependencies to build.gradle, as shown in Listing 8.1:

apply plugin: 'java'
apply plugin: 'application'
sourceCompatibility = '1.8'
mainClassName = 'kioto.ProcessingEngine'
repositories {
    mavenCentral()
    maven { url 'https://packages.confluent.io/maven/' }
}
version = '0.1.0'
dependencies {
    compile 'com.github.javafaker:javafaker:0.15'
    compile 'com.fasterxml.jackson.core:jackson-core:2.9.7'
    compile 'io.confluent:kafka-avro-serializer:5.0.0'
    compile 'org.apache.kafka:kafka_2.12:2.0.0'
    compile 'org.apache.kafka:kafka-streams:2.0.0'
    compile 'io.confluent:kafka-streams-avro-serde:5.0.0'
    compile 'org.apache.spark:spark-sql_2.11:2.2.2'
    compile 'org.apache...

Spark Streaming processor

Now, in the src/main/java/kioto/spark directory, create a file called SparkProcessor.java with the contents of Listing 8.2, shown as follows:

package kioto.spark;

import kioto.Constants;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.*;
import org.apache.spark.sql.types.*;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.Period;

public class SparkProcessor {

  private String brokers;

  public SparkProcessor(String brokers) {
    this.brokers = brokers;
  }

  public final void process() {
    // below is the content of this method
  }

  public static void main(String[] args) {
    (new SparkProcessor("localhost:9092")).process();
  }
}
Listing 8.2: SparkProcessor.java

Note that, as in previous examples, the main method builds a SparkProcessor with the IP address and port of the Kafka brokers, and then invokes its process() method.

Now, let's...
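The process() method needs a SparkSession before anything can be read from Kafka. Its creation is not shown in the excerpt above, so here is a minimal sketch; the application name and the local master are assumptions suitable for running the example on a single machine:

// at the beginning of the process() method (sketch; names are illustrative)
SparkSession spark = SparkSession.builder()
    .appName("kioto-spark-processor")  // hypothetical application name
    .master("local[*]")                // assumption: run locally, using all available cores
    .getOrCreate();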

Reading Kafka from Spark

There are several connectors for Apache Spark. In this case, we are using the Kafka connector from Databricks, the company founded by the creators of Apache Spark.

Using this Spark Kafka connector, we can read data with Spark Structured Streaming from a Kafka topic:

Dataset<Row> inputDataset = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("subscribe", Constants.getHealthChecksTopic())
    .load();

Simply by specifying the kafka format, we can read a stream from the topic given in the subscribe option, from the brokers specified in kafka.bootstrap.servers.

At this point in the code, if you invoke the printSchema() method on inputDataset, the result will be something similar to Figure 8.1:

Figure 8.1: Print schema output

We can interpret this as follows:

  • The key and the value are binary...
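For reference, the Kafka source in Spark Structured Streaming always exposes the same fixed set of columns, so the schema shown in Figure 8.1 can be reproduced with a single call:

// prints the schema of the records read from Kafka:
//   key (binary), value (binary), topic (string), partition (integer),
//   offset (long), timestamp (timestamp), timestampType (integer)
inputDataset.printSchema();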

Data conversion

We know that when we produced the data, it was in JSON format, but Spark reads it as binary. To convert the binary value to a String, we use the following code:

Dataset<Row> healthCheckJsonDf =
    inputDataset.selectExpr("CAST(value AS STRING)");

The Dataset console output is now human-readable, and is shown as follows:

+--------------------------+
| value|
+--------------------------+
| {"event":"HEALTH_CHECK...|
+--------------------------+
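Output like this can be inspected during development by attaching Spark's console sink to the streaming Dataset; the following is a minimal sketch, intended for debugging only and not part of the final processor:

// debugging aid: dump each micro-batch of the streaming Dataset to standard output
healthCheckJsonDf.writeStream()
    .format("console")
    .outputMode("append")
    .start();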

The next step is to provide the list of fields that specifies the data structure of the JSON message, as follows:

StructType struct = new StructType()
    .add("event", DataTypes.StringType)
    .add("factory", DataTypes.StringType)
    .add("serialNumber", DataTypes.StringType)
    .add("type", DataTypes.StringType)
    .add("status...

Data processing

Now, what we are going to do is calculate the uptimes. There is no built-in function that does exactly this calculation for us, so we are going to create a user-defined function (UDF).

As we saw in the KSQL chapter, it is also possible to build and use custom UDFs in KSQL.

To achieve this, the first thing we do is write a function that receives a java.sql.Timestamp as input (this is how timestamps are represented in Spark Datasets) and returns an integer with the number of days between that date and today, as shown in the following code:

private final int uptimeFunc(Timestamp date) {
  LocalDate localDate = date.toLocalDateTime().toLocalDate();
  // Period.getDays() would only return the day component of the period (0-30),
  // so we count whole days explicitly instead
  return (int) java.time.temporal.ChronoUnit.DAYS.between(localDate, LocalDate.now());
}

The next step is to generate a Spark UDF as follows:

Dataset<Row> processedDs = healthCheckDs
    .withColumn( "lastStartedAt...
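A sketch of how registering and applying the UDF might look follows. The UDF name uptimeUDF, the uptime column name, and the cast of lastStartedAt are assumptions based on the surrounding text; spark is the SparkSession created at the start of process(), and the day calculation is inlined in the lambda so that it does not capture the enclosing (non-serializable) class:

// register a Spark SQL UDF equivalent to uptimeFunc above
// (requires: import org.apache.spark.sql.api.java.UDF1;)
spark.udf().register("uptimeUDF",
    (UDF1<Timestamp, Integer>) timestamp ->
        (int) java.time.temporal.ChronoUnit.DAYS.between(
            timestamp.toLocalDateTime().toLocalDate(), LocalDate.now()),
    DataTypes.IntegerType);

// cast lastStartedAt to a proper timestamp, then derive the uptime in days
Dataset<Row> processedDs = healthCheckDs
    .withColumn("lastStartedAt",
        new Column("lastStartedAt").cast(DataTypes.TimestampType))
    .withColumn("uptime",
        functions.callUDF("uptimeUDF", new Column("lastStartedAt")));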

Writing to Kafka from Spark

As we have already processed the data and calculated the uptime, all we need to do now is write these values to the Kafka topic called uptimes.

The Kafka connector also allows us to write values to Kafka. The requirement is that the Dataset to be written must have a column called key and another called value; each can be of String or binary type.

Since we want the machine serial number to be the key, and it is already of String type, there is nothing to do there. We just have to convert the uptime column (an integer) to String.

We use the select() method of the Dataset class to build these two columns and rename them with the as() method, shown as follows (we could also use the alias() method of that class to do this):

Dataset<Row> resDf = processedDs.select(
    (new Column("serialNumber")).as("key"),
    processedDs...
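Once resDf has its key and value columns, the streaming query can be started against the Kafka sink. The following is a minimal sketch; the checkpoint location is an assumption (any writable directory works), while the uptimes topic and the brokers field come from this chapter:

// write the key/value Dataset to the uptimes topic
StreamingQuery query = resDf.writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("topic", "uptimes")
    .option("checkpointLocation", "/tmp/kioto-uptimes")  // hypothetical checkpoint path
    .start();

try {
  // block the main thread until the streaming query stops or fails
  query.awaitTermination();
} catch (StreamingQueryException e) {
  e.printStackTrace();
}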

Running the SparkProcessor

To build the project, run this command from the kioto directory:

$ gradle jar

If everything is OK, the output is something similar to the following:

BUILD SUCCESSFUL in 3s
1 actionable task: 1 executed
  1. From a command-line terminal, move to the Confluent directory and start it as follows:
      $ ./bin/confluent start
  2. Run a console consumer for the uptimes topic, as follows:
      $ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic uptimes
  3. From our IDE, run the main method of the SparkProcessor.
  4. From our IDE, run the main method of the PlainProducer built in previous chapters.
  5. The output on the console consumer of the producer should be similar to the following:
{"event":"HEALTH_CHECK","factory":"Lake Anyaport","serialNumber":"EW05-HV36","type":"WIND","status":"STARTING...

Summary

If you are someone who uses Spark for batch processing, Spark Structured Streaming is a tool you should try, as its API is similar to its batch processing counterpart.

Now, if we compare Spark with Kafka Streams for stream processing, we must remember that Spark Streaming is designed for throughput rather than latency; handling low-latency streams with it becomes very complicated.

The Spark Kafka connector has always been a complicated matter. For example, we often have to stay on older versions of both, because each new release brings many changes on both sides.

In Spark, the deployment model is always much more complicated than with Kafka Streams. Although Spark, Flink, and Beam can perform much more complex tasks than Kafka Streams, Kafka Streams has always been the easiest to learn and implement.
