Schema Registry

In the previous chapter, we saw how to produce and consume data in JSON format. In this chapter, we will see how to serialize the same messages with Apache Avro.

This chapter covers the following topics:

  • Avro in a nutshell
  • Defining the schema
  • Starting the Schema Registry
  • Using the Schema Registry
  • How to build a Java AvroProducer, a consumer, and a processor
  • How to run the Java AvroProducer and the processor

Avro in a nutshell

Apache Avro is a binary serialization format. The format is schema-based, so it depends on schemas defined in JSON format; these schemas specify which fields are mandatory and what their types are. Avro also supports arrays, enums, and nested fields.
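As an illustration of these features, a hypothetical schema fragment (not taken from the book) could combine an enum, an array, and a nested record:

{
  "name": "Machine",
  "type": "record",
  "fields": [
    { "name": "status", "type": { "type": "enum", "name": "Status", "symbols": ["RUNNING", "STOPPED"] } },
    { "name": "sensors", "type": { "type": "array", "items": "string" } },
    { "name": "location", "type": { "type": "record", "name": "Location",
        "fields": [ { "name": "factory", "type": "string" } ] } }
  ]
}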

One major advantage of Avro is that it supports schema evolution, so we can keep several historical versions of a schema.

Normally, a system must adapt to the changing needs of the business. For this reason, we may add or remove fields from our entities, or even change data types. To preserve forward or backward compatibility, we must consider which fields are marked as optional.
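For example, a new field can be made optional by giving it a union type with null and a default value; readers with the old schema simply ignore it, and readers with the new schema fall back on the default when it is absent. An illustrative fragment (not from the book's schema):

{ "name": "firmwareVersion", "type": ["null", "string"], "default": null }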

Because Avro serializes data into arrays of bytes, and Kafka's messages are also sent in binary format, we can send messages in Avro format through Apache Kafka. The real question is, where...

Defining the schema

The first step is to define the Avro schema. As a reminder, our HealthCheck class looks like Listing 5.1:

public final class HealthCheck {
  private String event;
  private String factory;
  private String serialNumber;
  private String type;
  private String status;
  private Date lastStartedAt;
  private float temperature;
  private String ipAddress;
}
Listing 5.1: HealthCheck.java

Now, to represent this message in Avro format, the schema (that is, the template) for all messages of this type would be Listing 5.2:

{
  "name": "HealthCheck",
  "namespace": "kioto.avro",
  "type": "record",
  "fields": [
    { "name": "event", "type": "string" },
    { "name": "factory", "type": "string" },
    { "name": "serialNumber...

Starting the Schema Registry

Well, we have our Avro schema; now, we need to register it in the Schema Registry.
When we start the Confluent Platform, the Schema Registry is started, as shown in the following code:

$ ./bin/confluent start
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

If we want just to start the Schema Registry, we need to run the following command:

$ ./bin/schema-registry-start etc/schema-registry/schema-registry.properties

The output is similar to the one shown here:

...
[2017-03-02 10:01:45,320] INFO Started NetworkTrafficServerConnector@2ee67803{HTTP/1.1,[http/1.1]}{0.0.0.0:8081}
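For reference, the etc/schema-registry/schema-registry.properties file in this generation of the Confluent Platform typically contains entries like the following (the exact contents vary by version):

listeners=http://0.0.0.0:8081
kafkastore.connection.url=localhost:2181
kafkastore.topic=_schemas
debug=false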

Using the Schema Registry

Now, the Schema Registry is running on port 8081. To interact with it, there is a REST API, which we can access with curl. The first step is to register a schema in the Schema Registry. To do so, we have to embed our JSON schema inside another JSON object, escaping some special characters along the way:

  • At the beginning, we have to add { "schema": "
  • All the double quotation marks (") inside the schema itself should be escaped with a backslash (\")
  • At the end, we have to add " }
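Putting these rules together, a registration request built with curl might look like this; the subject name healthchecks-avro-value and the abbreviated one-field schema are illustrative:

$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{ "schema": "{ \"name\": \"HealthCheck\", \"namespace\": \"kioto.avro\", \"type\": \"record\", \"fields\": [ { \"name\": \"event\", \"type\": \"string\" } ] }" }' \
  http://localhost:8081/subjects/healthchecks-avro-value/versions

On success, the Schema Registry responds with the ID assigned to the schema, for example {"id":1}.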

Yes, as you can guess, the API has several endpoints for querying the Schema Registry.
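For instance, you can list all subjects, list the versions registered under a subject, or fetch a specific version (the subject name is again illustrative):

$ curl http://localhost:8081/subjects
$ curl http://localhost:8081/subjects/healthchecks-avro-value/versions
$ curl http://localhost:8081/subjects/healthchecks-avro-value/versions/1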

Registering a new version of a schema under a -value subject

To register the Avro schema healthcheck...

Java AvroProducer

Now, we should modify our Java producer to send messages in Avro format. First, it is important to mention that in Avro there are two ways to handle records:

  • Specific records: The file with the Avro schema (.avsc) is passed to the Avro code-generation tool, which generates the corresponding Java classes.
  • Generic records: In this approach, a map-like data structure is used; you set and get the fields by name, and you must know their corresponding types. This option is not type-safe, but it offers much more flexibility, and schema versions are much easier to manage over time. In this example, we will use this approach (a minimal sketch follows this list).
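Here is that minimal, self-contained sketch of the generic approach. The topic name and Schema Registry URL match the ones used in this chapter, while the class name, field values, and abbreviated two-field schema are assumptions made for the example; the book's actual AvroProducer differs:

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public final class GenericAvroProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", StringSerializer.class);
    // Serializes GenericRecord values and talks to the Schema Registry
    props.put("value.serializer", KafkaAvroSerializer.class);
    props.put("schema.registry.url", "http://localhost:8081");

    // Abbreviated two-field schema, for illustration only
    Schema schema = new Schema.Parser().parse(
        "{ \"name\": \"HealthCheck\", \"namespace\": \"kioto.avro\", \"type\": \"record\","
        + " \"fields\": [ { \"name\": \"event\", \"type\": \"string\" },"
        + " { \"name\": \"factory\", \"type\": \"string\" } ] }");

    // Generic record: fields are set by name, with no compile-time checking
    GenericRecord healthCheck = new GenericData.Record(schema);
    healthCheck.put("event", "HEALTH_CHECK");        // hypothetical value
    healthCheck.put("factory", "Port Llanes plant"); // hypothetical value

    try (Producer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("healthchecks-avro", healthCheck));
    }
  }
}

By default, KafkaAvroSerializer registers the schema under the topic's -value subject on first use, so the producer and the Schema Registry stay in sync automatically.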

Before we start with the code, remember that in the previous chapter we added the library that supports Avro to our Kafka client. If you recall, the build.gradle file has a special repository with all these libraries...

Running the AvroProducer

To build the project, run the following command from the kioto directory:

$ gradle jar

If everything is OK, the output is something like the one shown here:

BUILD SUCCESSFUL in 3s
1 actionable task: 1 executed
  1. If it is not running yet, go to Confluent's directory and start it:
$ ./bin/confluent start
  2. The broker is running on port 9092. To create the healthchecks-avro topic, execute the following command:
$ ./bin/kafka-topics --zookeeper localhost:2181 --create --topic healthchecks-avro --replication-factor 1 --partitions 4
  3. Note that we are just creating a normal topic and nothing indicates the messages' format.
  4. Run a console consumer for the healthchecks-avro topic:
$ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic healthchecks-avro
  5. From our IDE, run the main method of the AvroProducer.
  6. The output on the console...

Java AvroConsumer

Let's create a Kafka AvroConsumer that we will use to receive the input records. As we already know, all Kafka consumers should satisfy two prerequisites: they wrap a KafkaConsumer, and they set specific properties, as in Listing 5.5:

import java.util.Properties;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public final class AvroConsumer {
  private Consumer<String, GenericRecord> consumer; //1
  public AvroConsumer(String brokers, String schemaRegistryUrl) { //2
    Properties props = new Properties();
    props.put("group.id", "healthcheck-processor");
    props.put("bootstrap.servers", brokers);
    props.put...
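The listing continues in the book; to show how generic records are consumed once the consumer is configured, here is a sketch of the poll loop. The wrapper class, helper method, and printed fields are assumptions made for the example:

import java.time.Duration;
import java.util.Collections;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public final class AvroConsumerLoopSketch {
  // Hypothetical helper; consumer is assumed configured as in Listing 5.5
  static void run(Consumer<String, GenericRecord> consumer) {
    consumer.subscribe(Collections.singletonList("healthchecks-avro"));
    while (true) {
      ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<String, GenericRecord> record : records) {
        GenericRecord value = record.value();
        // Avro returns string fields as CharSequence (Utf8), hence toString()
        String serialNumber = value.get("serialNumber").toString();
        float temperature = (Float) value.get("temperature");
        System.out.println(serialNumber + ": " + temperature);
      }
    }
  }
}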

Java AvroProcessor

Now, in the src/main/java/kioto/avro directory, create a file called AvroProcessor.java with the contents of Listing 5.6:

package kioto.avro;
import ...

public final class AvroProcessor {
  private Consumer<String, GenericRecord> consumer;
  private Producer<String, String> producer;

  public AvroProcessor(String brokers, String schemaRegistryUrl) {
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", brokers);
    consumerProps.put("group.id", "healthcheck-processor");
    consumerProps.put("key.deserializer", StringDeserializer.class);
    // Deserialize values as Avro generic records
    consumerProps.put("value.deserializer", KafkaAvroDeserializer.class);
    // The deserializer fetches schemas from the Schema Registry
    consumerProps.put("schema.registry.url", schemaRegistryUrl);
    consumer = new KafkaConsumer<>(consumerProps);

    Properties producerProps...
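The producer half of the constructor is truncated above. Since the processor writes plain strings to the uptimes topic, its configuration plausibly mirrors the string producers from earlier chapters; a sketch under that assumption:

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", brokers);
    // Plain-string keys and values: no Avro on the output side
    producerProps.put("key.serializer", StringSerializer.class);
    producerProps.put("value.serializer", StringSerializer.class);
    producer = new KafkaProducer<>(producerProps);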

Running the AvroProcessor

To build the project, run the following command from the kioto directory:

$ gradle jar

If everything is correct, the output will be something like this:

BUILD SUCCESSFUL in 3s
1 actionable task: 1 executed

  1. Run a console consumer for the uptimes topic, as shown here:
$ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic uptimes --property print.key=true
  2. From the IDE, run the main method of the AvroProcessor.
  3. From the IDE, run the main method of the AvroProducer.
  4. The output on the console consumer for the uptimes topic should be similar to this:
EW05-HV36 33
BO58-SB28 20
DV03-ZT93 46
...

Summary

In this chapter, we showed how to use Avro as the serialization format instead of sending data in JSON. The main benefit of Avro over JSON is that the data must conform to a schema. Another advantage is that messages are more compact when sent in Avro's binary format, although JSON has the advantage of being human readable.

The schemas are stored in the Schema Registry, so that all users can consult the schema version history, even when the code of the producers and consumers for those messages is no longer available.

Apache Avro also guarantees backward and forward compatibility of all messages in this format. Forward compatibility is achieved by following some basic rules, such as declaring a new field as optional (with a default value) when adding it.

Apache Kafka encourages the use of Apache Avro and the Schema Registry for the storage of all data and schemas in Kafka...
