You're reading from Apache Kafka Quick Start Guide

Product type: Book · Published in: Dec 2018 · Reading level: Beginner · Publisher: Packt · ISBN-13: 9781788997829 · Edition: 1st

Author: Raúl Estrada

Raúl Estrada has been a programmer since 1996 and a Java developer since 2001. He loves all topics related to computer science. With more than 15 years of experience in high-availability and enterprise software, he has been designing and implementing architectures since 2003. His specialization is in systems integration, and he mainly participates in projects related to the financial sector. He has been an enterprise architect for BEA Systems and Oracle Inc., but he also enjoys web, mobile, and game programming. Raúl is a supporter of free software and enjoys experimenting with new technologies, frameworks, languages, and methods. Raúl is the author of other Packt Publishing titles, such as Fast Data Processing Systems with SMACK and Apache Kafka Cookbook.

Message Validation

Chapter 1, Configuring Kafka, focused on how to set up a Kafka cluster and run a command-line producer and consumer. Now that we have an event producer, we have to process those events.

Before going into detail, let's present our case study. We need to model the systems of Monedero, a fictional company whose core business is cryptocurrency exchange. Monedero wants to base its IT infrastructure on an enterprise service bus (ESB) built with Apache Kafka. The Monedero IT department wants to unify the service backbone across the organization. Monedero also has worldwide, web-based, and mobile-app-based clients, so a real-time response is fundamental.

Online customers worldwide browse the Monedero website to exchange their cryptocurrencies. There are many use cases that customers can exercise in Monedero, but this example is focused on the part of the exchange...

Enterprise service bus in a nutshell

Event processing consists of taking one or more events from an event stream and applying actions over those events. In general, in an enterprise service bus, there are commodity services; the most common are the following:

  • Data transformation
  • Event handling
  • Protocol conversion
  • Data mapping

Message processing in the majority of cases involves the following:

  • Message structure validation against a message schema
  • Given an event stream, filtering the messages from the stream
  • Message enrichment with additional data
  • Message aggregation (composition) of two or more messages to produce a new message

This chapter is about event validation. The chapters that follow are about composition and enrichment.

Event modeling

The first step in event modeling is to express the event in English in the following form:

Subject-verb-direct object

For this example, we are modeling the event customer consults the ETH price:

  • The subject in this sentence is customer, a noun in nominative case. The subject is the entity performing the action.
  • The verb in this sentence is consults; it describes the action performed by the subject.
  • The direct object in this sentence is ETH price. The object is the entity on which the action is performed.

We can represent our message in several message formats (covered in other sections of this book):

  • JavaScript Object Notation (JSON)
  • Apache Avro
  • Apache Thrift
  • Protocol Buffers

JSON is easily read and written by both humans and machines. For example, we could choose a binary representation instead, but it has a rigid format and was not designed for humans to...
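For instance, the customer consults the ETH price event could be represented in JSON as follows. The field values here are purely illustrative; the structure this chapter relies on is the four required fields (event, customer, currency, and timestamp):

```json
{
  "event": "CUSTOMER_CONSULTS_ETHPRICE",
  "customer": {"id": "86689427", "name": "Edward S."},
  "currency": {"name": "ethereum", "price": 1580.25},
  "timestamp": "2018-09-28T09:09:09Z"
}
```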

Setting up the project

This time, we are going to build our project with Gradle. The first step is to download and install Gradle from http://www.gradle.org/downloads.

Gradle only requires a Java JDK (version 7 or higher).

macOS users can install Gradle with the brew command, as follows:

$ brew update && brew install gradle

The output is something like the following:

==> Downloading https://services.gradle.org/distributions/gradle-4.10.2-all.zip
==> Downloading from https://downloads.gradle.org/distributions/gradle-4.10.2-al
######################################################################## 100.0%
/usr/local/Cellar/gradle/4.10.2: 203 files, 83.7MB, built in 59 seconds

Linux users can install Gradle with the apt-get command, as follows:

$ apt-get install gradle

Unix users can install Gradle with SDKMAN!, a tool for managing parallel versions of multiple software development kits on most Unix-based systems...
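Once Gradle is installed, a minimal build.gradle for a project like this one might look like the following sketch. The dependency coordinates and versions are illustrative assumptions, not necessarily those used in the book:

```groovy
apply plugin: 'java'
apply plugin: 'application'

sourceCompatibility = '1.8'
mainClassName = 'monedero.ProcessingEngine'

repositories {
    mavenCentral()
}

dependencies {
    // Kafka Java client, used by the consumer and producer classes
    compile 'org.apache.kafka:kafka-clients:2.0.0'
    // JSON processing for message validation (assumed; the book may use another library)
    compile 'com.fasterxml.jackson.core:jackson-databind:2.9.7'
}
```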

Reading from Kafka

Now that we have our project skeleton, let's recall the project requirements for the stream processing engine. Remember that our event customer consults ETH price occurs outside Monedero and that these messages may not be well formed, that is, they may have defects. The first step in our pipeline is to validate that the input events have the correct data and the correct structure. Our project will be called ProcessingEngine.

The ProcessingEngine specification calls for a pipeline application that does the following:

  • Reads each message from a Kafka topic called input-messages
  • Validates each message, sending any invalid event to a specific Kafka topic called invalid-messages
  • Writes the correct messages in a Kafka topic called valid-messages

These steps are detailed in Figure 2.1, the first sketch for the pipeline processing engine:

Figure 2.1: The processing...

Writing to Kafka

Our Reader invokes the process() method; this method belongs to the Producer interface. As with the consumer interface, the producer interface encapsulates all of the common behavior of Kafka producers. The two producers in this chapter implement this producer interface.

In a file called Producer.java, located in the src/main/java/monedero directory, copy the content of Listing 2.6:

package monedero;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public interface Producer {

  void process(String message); //1

  static void write(KafkaProducer<String, String> producer,
                    String topic, String message) { //2
    ProducerRecord<String, String> pr = new ProducerRecord<>(topic, message);
    producer.send(pr);
  }
}
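The Writer class referenced in this chapter is not shown in this excerpt. As a minimal sketch of what such a class might look like, assuming the two-argument constructor used later in Listing 2.8 (the book's actual Listing 2.7 may differ):

```java
package monedero;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

// Hypothetical sketch of Writer: forwards every message to one target topic.
public class Writer implements Producer {

  private final KafkaProducer<String, String> producer;
  private final String topic;

  public Writer(String servers, String topic) {
    Properties props = new Properties();
    props.put("bootstrap.servers", servers);
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    this.producer = new KafkaProducer<>(props);
    this.topic = topic;
  }

  @Override
  public void process(String message) {
    // Write the message, unmodified, to the target topic
    Producer.write(producer, topic, message);
  }
}
```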

Running the processing engine

The ProcessingEngine class coordinates the Reader and Writer classes; it contains the main method that wires them together. Create a new file called ProcessingEngine.java in the src/main/java/monedero/ directory and copy into it the code in Listing 2.8.

The following is the content of Listing 2.8, ProcessingEngine.java:

package monedero;

public class ProcessingEngine {
  public static void main(String[] args) {
    String servers = args[0];
    String groupId = args[1];
    String sourceTopic = args[2];
    String targetTopic = args[3];
    Reader reader = new Reader(servers, groupId, sourceTopic);
    Writer writer = new Writer(servers, targetTopic);
    reader.run(writer);
  }
}
Listing 2.8: ProcessingEngine.java

ProcessingEngine receives four arguments from the command line:

  • args[0] servers, the host and port of the Kafka broker
  • args[1] groupId, the consumer group...

Coding a validator in Java

The Writer class implements the producer interface. The idea is to modify that Writer and build a validation class with minimum effort. The Validator process is as follows:

  • Read the Kafka messages from the input-messages topic
  • Validate the messages, sending defective messages to the invalid-messages topic
  • Write the well-formed messages to the valid-messages topic

At the moment, for this example, a valid message is defined as a message to which the following applies:

  • It is in JSON format
  • It contains the four required fields: event, customer, currency, and timestamp

If these conditions are not met, a new error message in JSON format is generated and sent to the invalid-messages Kafka topic. The schema of this error message is very simple:

{"error": "Failure description" }

The first step is to create a new Validator.java file...
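Before looking at the real implementation, the validation rule can be sketched with nothing but the standard library. The book's Validator presumably parses the message with a proper JSON library; the helper below merely checks that the text looks like JSON and names every required field, which is enough to illustrate the rule:

```java
public class ValidationSketch {

  // The four required fields named above
  static final String[] REQUIRED = {"event", "customer", "currency", "timestamp"};

  // Returns null when the message passes, or an error message
  // following the {"error": "..."} schema when it does not.
  static String validate(String message) {
    if (message == null || !message.trim().startsWith("{")) {
      return "{\"error\": \"Message is not in JSON format\"}";
    }
    for (String field : REQUIRED) {
      if (!message.contains("\"" + field + "\"")) {
        return "{\"error\": \"Missing required field: " + field + "\"}";
      }
    }
    return null;
  }

  public static void main(String[] args) {
    String valid = "{\"event\":\"CUSTOMER_CONSULTS_ETHPRICE\","
        + "\"customer\":{},\"currency\":{},\"timestamp\":\"2018-09-28T09:09:09Z\"}";
    System.out.println(validate(valid));               // null: message is valid
    System.out.println(validate("{\"event\":\"x\"}")); // error: missing customer
  }
}
```

A production validator would, of course, parse the JSON properly instead of searching for substrings; this sketch only captures the two conditions listed above.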

Running the validation

At the moment, the ProcessingEngine class coordinates the Reader and Writer classes through its main method. We have to edit the ProcessingEngine class located in the src/main/java/monedero/ directory and replace Writer with Validator, as in Listing 2.10.

The following is the content of Listing 2.10, ProcessingEngine.java:

package monedero;

public class ProcessingEngine {
  public static void main(String[] args) {
    String servers = args[0];
    String groupId = args[1];
    String inputTopic = args[2];
    String validTopic = args[3];
    String invalidTopic = args[4];
    Reader reader = new Reader(servers, groupId, inputTopic);
    Validator validator = new Validator(servers, validTopic, invalidTopic);
    reader.run(validator);
  }
}
Listing 2.10: ProcessingEngine.java

ProcessingEngine receives five arguments from the command line:

  • args...

Summary

In this chapter, we learned how to model messages in JSON format and how to set up a Kafka project with Gradle.

Also, we learned how to write to and read from Kafka with a Java client, how to run the processing engine, how to code a validator in Java, and how to run the message validation.

In Chapter 3, Message Enrichment, the architecture of this chapter will be redesigned to incorporate message enrichment.
