Apache Kafka Quick Start Guide


Product type Book
Published in Dec 2018
Publisher Packt
ISBN-13 9781788997829
Pages 186 pages
Edition 1st Edition
Author: Raúl Estrada

Table of Contents (10 chapters)

  Preface
  Configuring Kafka
  Message Validation
  Message Enrichment
  Serialization
  Schema Registry
  Kafka Streams
  KSQL
  Kafka Connect
  Other Books You May Enjoy

Message Enrichment

This chapter builds on the previous one, which focused on how to validate events. Here, the focus is on how to enrich them.

In this chapter, we will continue using the systems of Monedero, our fictitious company dedicated to exchanging cryptocurrencies. Recall that in the previous chapter, the Monedero messages were validated; in this chapter, we will keep the same flow but add one more step: enrichment.

In this context, enrichment means adding extra data that was not in the original message. In this chapter, we will see how to enrich a message with its geographic location using the MaxMind database and how to extract the current exchange rate using Open Exchange Rates data. Recall that in the events we modeled for Monedero, each one included the IP...

Extracting the geographic location

Open the build.gradle file in the Monedero project created in Chapter 2, Message Validation, and add the lines highlighted in Listing 3.1.

The following is the content of Listing 3.1, the Monedero build.gradle file:

apply plugin: 'java'
apply plugin: 'application'
sourceCompatibility = '1.8'
mainClassName = 'monedero.ProcessingEngine'
repositories {
    mavenCentral()
}
version = '0.2.0'
dependencies {
    compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.0.0'
    compile group: 'com.maxmind.geoip', name: 'geoip-api', version: ...

Enriching the messages

Let's recap the steps of our processing engine for Monedero. A customer consults the ETH price in the browser, and the resulting event is sent to Kafka through an HTTP event collector.

The first step in our flow is validating event correctness; recall from the previous chapter that defective messages come from bad data, which is why they are filtered out. The second step is now to enrich the message with geographic location information.

Here are the architecture steps for the Monedero processing engine:

  1. Read the individual events from a Kafka topic called input-messages
  2. Validate the message, sending any defective event to a dedicated Kafka topic called invalid-messages
  3. Enrich the message with the geographic location data
  4. Write the enriched messages to a Kafka topic called valid-messages
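As a rough illustration of steps 2 to 4, the following sketch validates an event, adds a location field, and picks the target topic. It is not the book's Enricher class: the Map-based message, the ipAddress and country field names, the Routed holder, and the lookup function that stands in for the MaxMind database are all assumptions made for this example, and no Kafka client is involved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch: validate an event, enrich it with a location, choose a topic. */
final class GeoEnrichmentSketch {

    /** Target topic plus the payload that would be written to it. */
    static final class Routed {
        final String topic;
        final Map<String, String> message;

        Routed(String topic, Map<String, String> message) {
            this.topic = topic;
            this.message = message;
        }
    }

    // Stand-in for a GeoIP lookup (IP address -> country); assumed, not the MaxMind API.
    private final Function<String, String> countryLookup;

    GeoEnrichmentSketch(Function<String, String> countryLookup) {
        this.countryLookup = countryLookup;
    }

    Routed process(Map<String, String> event) {
        // Step 2: an event without an IP address is treated as defective.
        String ip = event.get("ipAddress");
        if (ip == null || ip.trim().isEmpty()) {
            return new Routed("invalid-messages", event);
        }
        // Step 3: copy the event and add data that was not in the original message.
        Map<String, String> enriched = new HashMap<>(event);
        String country = countryLookup.apply(ip);
        enriched.put("country", country == null ? "unknown" : country);
        // Step 4: the enriched copy is destined for valid-messages.
        return new Routed("valid-messages", enriched);
    }
}
```

Copying the event before adding the field leaves the original message untouched, which keeps the invalid path free of partially enriched data.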

All of these steps of the second version of...

Extracting the currency price

At the moment, Monedero has a service that validates that messages are well formed. The service also enriches the messages with the customer's geographic location.

Recall that Monedero's core business is cryptocurrency exchange. So now the business asks us for a service that returns the requested currency's price online at a specific time.

To achieve this, we will use the exchange rate data from Open Exchange Rates:

https://openexchangerates.org/

To obtain a free API key, you have to register for the free plan; the key is needed to access the free API.

Now, let's create a file called OpenExchangeService.java in the src/main/java/monedero/extractors directory with the content of Listing 3.4:

package monedero.extractors;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException...
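As a standalone illustration of this kind of extractor (it is not the content of Listing 3.4), the sketch below builds the request URL for the Open Exchange Rates latest.json endpoint and pulls one rate out of a response body. The class and method names are hypothetical. To stay free of dependencies it uses a regular expression instead of Jackson, which is a simplification: it assumes plain decimal numbers and an alphanumeric API key, and it performs no network call.

```java
import java.util.OptionalDouble;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of an exchange rate extractor; not the book's OpenExchangeService. */
final class ExchangeRateSketch {

    /** URL of the "latest rates" endpoint; app_id carries the API key. */
    static String latestUrl(String apiKey) {
        return "https://openexchangerates.org/api/latest.json?app_id=" + apiKey;
    }

    /** Finds "<currency>": <number> in the JSON text; empty if the code is absent. */
    static OptionalDouble rateFor(String json, String currency) {
        Pattern p = Pattern.compile(
                "\"" + Pattern.quote(currency) + "\"\\s*:\\s*(-?[0-9]+(?:\\.[0-9]+)?)");
        Matcher m = p.matcher(json);
        return m.find()
                ? OptionalDouble.of(Double.parseDouble(m.group(1)))
                : OptionalDouble.empty();
    }
}
```

Returning an OptionalDouble makes the "currency not found" case explicit for the caller; a real service would more likely walk the parsed JSON tree, as the Jackson imports in the listing suggest.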

Enriching with currency price

The event in which a customer consults the ETH price starts in the client's web browser and is dispatched to Kafka through an HTTP event collector. The first step, as before, is to validate the event. The second step is to enrich the message with the geographic location information from the MaxMind database. The third step is to enrich the message with the currency price from the Open Exchange Rates service.

In summary, here are the architecture steps for the Monedero processing engine:

  1. Read the individual events from a Kafka topic called input-messages
  2. Validate the message, sending any defective event to a specific Kafka topic called invalid-messages
  3. Enrich the message with the geographic location data from the MaxMind database
  4. Enrich the message with the currency price from the Open Exchange Rates service
  5. Write the enriched messages to a Kafka topic called valid-messages
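One possible way to picture steps 3 and 4 is as a chain of small enrichment functions applied in order. The sketch below is an assumption-laden illustration, not the book's final Enricher: the Map-based message, the lookupStep helper, and the field names passed to it are hypothetical, and the lookups are plain functions standing in for the MaxMind and Open Exchange Rates extractors.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.UnaryOperator;

/** Hypothetical sketch: apply several enrichment steps to one event, in order. */
final class EnricherChainSketch {

    /** Runs every step on a copy of the event and returns the fully enriched copy. */
    static Map<String, String> enrich(Map<String, String> event,
                                      List<UnaryOperator<Map<String, String>>> steps) {
        Map<String, String> current = new HashMap<>(event);
        for (UnaryOperator<Map<String, String>> step : steps) {
            current = step.apply(current);
        }
        return current;
    }

    /** Builds a step that reads sourceField, looks it up, and stores the result in targetField. */
    static UnaryOperator<Map<String, String>> lookupStep(
            String sourceField, String targetField, Function<String, String> lookup) {
        return in -> {
            Map<String, String> out = new HashMap<>(in);
            String key = in.get(sourceField);
            String value = (key == null) ? null : lookup.apply(key);
            if (value != null) {
                out.put(targetField, value); // only add the field when the lookup succeeded
            }
            return out;
        };
    }
}
```

With this shape, the location step could be expressed as lookupStep("ipAddress", "country", geoLookup) and the price step as lookupStep("currency", "rate", priceLookup), so adding a further enrichment means appending one more step to the list.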

The final version of the stream processing...

Running the engine

Now that the final version of the Enricher class is coded, we have to compile and execute it.

As we know, the ProcessingEngine class contains the main method that coordinates the reader and writer classes. Now, let's modify the ProcessingEngine.java file in the src/main/java/monedero/ directory and replace Validator with Enricher, as in the highlighted code in Listing 3.6:

package monedero;

public class ProcessingEngine {
    public static void main(String[] args) {
        // Positional arguments: Kafka servers, consumer group, and the three topics
        String servers = args[0];
        String groupId = args[1];
        String sourceTopic = args[2];
        String validTopic = args[3];
        String invalidTopic = args[4];
        // The Enricher takes the place of the Validator from the previous chapter
        Reader reader = new Reader(servers, groupId, sourceTopic);
        Enricher enricher = new Enricher(servers, validTopic, invalidTopic);
        reader.run(enricher);
    }
}
Listing 3.6: ProcessingEngine.java

The processing engine receives the following five...
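Because the main method reads its five arguments by position, a missing argument surfaces as an ArrayIndexOutOfBoundsException. A minimal sketch of a more defensive alternative follows; the EngineArgsSketch class is hypothetical and not part of the book's code, while the field names mirror the variables in the listing.

```java
/** Hypothetical holder for the five positional arguments of the processing engine. */
final class EngineArgsSketch {
    final String servers;
    final String groupId;
    final String sourceTopic;
    final String validTopic;
    final String invalidTopic;

    private EngineArgsSketch(String[] a) {
        this.servers = a[0];
        this.groupId = a[1];
        this.sourceTopic = a[2];
        this.validTopic = a[3];
        this.invalidTopic = a[4];
    }

    /** Fails fast with a usage message instead of an index error deep inside main. */
    static EngineArgsSketch parse(String[] args) {
        if (args == null || args.length != 5) {
            throw new IllegalArgumentException(
                    "Expected 5 arguments: <servers> <groupId> <sourceTopic> "
                            + "<validTopic> <invalidTopic>");
        }
        return new EngineArgsSketch(args);
    }
}
```

The main method could then call EngineArgsSketch.parse(args) once and pass the named fields to the Reader and Enricher constructors.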

Extracting the weather data

Obtaining the geographic location from the IP address is a problem that has already been solved in this chapter.

In this last section, we will build another extractor that will be used in the following chapters. Now, suppose we want to know the current temperature of a given geographic location at a specific time. To achieve this, we use the OpenWeatherService.

Visit the Open Weather page: https://openweathermap.org/.

To obtain a free API key, register for the free plan; this key is needed to access the free API.

Now, create a file called OpenWeatherService.java in the src/main/java/monedero/extractors directory with the content of Listing 3.7:

package monedero.extractors;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URL;
import java.util.logging.Level;
import...
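As a standalone illustration of a weather extractor (it is not the content of Listing 3.7), the sketch below builds a request URL for the OpenWeatherMap current weather endpoint and reads the temperature from a response body. The class and method names are hypothetical, and looking the location up by latitude and longitude is an assumption about how the extractor would be called. The units=metric parameter asks for degrees Celsius. A regular expression replaces Jackson here only to keep the example dependency-free, and no network call is made.

```java
import java.util.Locale;
import java.util.OptionalDouble;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of a weather extractor; not the book's OpenWeatherService. */
final class WeatherSketch {

    /** Current weather URL for a coordinate pair; appid carries the API key. */
    static String currentWeatherUrl(double lat, double lon, String apiKey) {
        return String.format(Locale.ROOT,
                "https://api.openweathermap.org/data/2.5/weather"
                        + "?lat=%.4f&lon=%.4f&units=metric&appid=%s",
                lat, lon, apiKey);
    }

    /** Reads the first "temp": <number> pair; keys such as "temp_min" do not match. */
    static OptionalDouble temperature(String json) {
        Pattern p = Pattern.compile("\"temp\"\\s*:\\s*(-?[0-9]+(?:\\.[0-9]+)?)");
        Matcher m = p.matcher(json);
        return m.find()
                ? OptionalDouble.of(Double.parseDouble(m.group(1)))
                : OptionalDouble.empty();
    }
}
```

Formatting with Locale.ROOT keeps the decimal separator a dot regardless of the machine's locale, which matters because the coordinates end up in a URL.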

Summary

In this chapter, we covered how to perform data extraction, how message enrichment works, and how to extract the geographic location given an IP address. We also demonstrated how to extract the currency price given a currency and how to run the processing engine.

Chapter 4, Serialization, talks about the schema registry. The extractors built in this chapter are used in the following chapters.
