Chapter 4. Message Enrichment

In this chapter, we will cover the following recipes:

  • Geolocation extractor
  • Geolocation enricher
  • Currency price extractor
  • Currency price enricher
  • Running the currency price enricher
  • Modeling the events
  • Setting up the project
  • Open weather extractor
  • Location temperature enricher
  • Running the location temperature enricher

Introduction


The previous chapter focused on how to perform message validation with Kafka. This chapter is about message enrichment and the following chapter is about message composition. This chapter continues modeling the systems of Doubloon, the fictional company dedicated to cryptocurrency exchange.

Here, a new company is introduced, Treu Technologies. Treu is a fictional company dedicated to energy production and distribution. To operate, Treu uses a lot of Internet of Things (IoT) devices.

Treu also wants to implement an enterprise service bus with Apache Kafka. The goal is to manage all the messages received every minute from the machines and sensors. Treu has hundreds of machines sending thousands of messages of different kinds per minute to the enterprise service bus.

In the previous chapter, the validation of the Doubloon messages was implemented. In this chapter, we add enrichment. In this context, enriching means adding extra information to the messages. In the following recipes, the...

Geolocation extractor


In Doubloon, there is already a service that validates that the messages are well-formed. Now, the business indicates that there should also be validation based on the customer's location. The reason is simple: there is a regulation known as the BitLicense, which limits virtual currency activities to a geographical area. At the time of writing, the regulations are limited to New York residents. For this purpose, anyone who resides, is located, has a place of business, or is conducting business in the state of New York counts as a New York resident.

Getting ready

The execution of the recipes in Chapter 3, Message Validation, is needed.

How to do it...

  1. The first step is to open the build.gradle file in the Doubloon project created in Chapter 3, Message Validation, and add the following lines:
apply plugin: 'java' 
apply plugin: 'application' 
 
sourceCompatibility = '1.8' 
 
mainClassName = 'doubloon.ProcessingApp' 
 
repositories { 
  mavenCentral() 
} 
 
version = '0.2.0' 
 
dependencies...
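
The dependencies block is truncated above. For reference, the geolocation lookup in this recipe relies on the MaxMind GeoIP legacy Java API (the com.maxmind.geoip package imported later in this chapter). A minimal extractor built on that API could look like the following sketch; the class name, the getLocation method, and the GeoLiteCity.dat database path are illustrative assumptions, not the book's exact listing:

package doubloon.extractors;

import com.maxmind.geoip.Location;
import com.maxmind.geoip.LookupService;
import java.io.IOException;

// Sketch: resolves an IP address to a MaxMind Location (country, city, latitude, longitude).
public class GeoIP {

  // Path to the free GeoLite City database file (assumed location).
  private static final String GEOIP_DB = "src/main/resources/GeoLiteCity.dat";

  public Location getLocation(String ipAddress) {
    try {
      LookupService lookup =
          new LookupService(GEOIP_DB, LookupService.GEOIP_MEMORY_CACHE);
      return lookup.getLocation(ipAddress); // null if the IP is not in the database
    } catch (IOException e) {
      throw new IllegalStateException("Could not open the GeoIP database", e);
    }
  }
}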

Geolocation enricher


Let's recall the Doubloon project requirements for the stream processing app. A "customer sees BTC price" event happens in the customer's web browser and is dispatched to Kafka via an HTTP event collector. The second step is to enrich the messages with the geolocation information. Remember from the previous chapter that defective messages result in bad data, so they are filtered out.

Getting ready

Putting it all together, the specification is to create a stream application that does the following:

  • Reads individual messages from a Kafka topic called raw-messages
  • Validates the message, sending any invalid message to a dedicated Kafka topic called invalid-messages
  • Enriches the message with the geolocation information
  • Writes the enriched messages in a Kafka topic called valid-messages

All of this is detailed in the following diagram, which shows the second version of the stream processing application:

Figure 4.1: The processing application reads events from the raw-messages topic, validates...
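
The diagram itself is not reproduced in this excerpt. As a rough skeleton of the flow it describes (read, validate, route errors, enrich with geolocation, write), the enrichment step could look like the following sketch. The class name, the customer.ipAddress field, and the use of a plain KafkaProducer instead of the book's own producer helpers are assumptions for illustration:

package doubloon;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.maxmind.geoip.Location;
import doubloon.extractors.GeoIP;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch of the validate -> route -> enrich -> write flow shown in the figure.
public class GeoEnricherSketch {

  private final KafkaProducer<String, String> producer;
  private final String validMessages;
  private final String invalidMessages;
  private final GeoIP geoIP = new GeoIP();
  private final ObjectMapper mapper = new ObjectMapper();

  public GeoEnricherSketch(String servers, String validMessages, String invalidMessages) {
    Properties props = new Properties();
    props.put("bootstrap.servers", servers);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    this.producer = new KafkaProducer<>(props);
    this.validMessages = validMessages;
    this.invalidMessages = invalidMessages;
  }

  public void process(String message) {
    try {
      JsonNode root = mapper.readTree(message);
      JsonNode ip = root.path("customer").path("ipAddress");
      if (ip.isMissingNode()) {
        producer.send(new ProducerRecord<>(invalidMessages, message)); // defective event
        return;
      }
      Location location = geoIP.getLocation(ip.textValue());
      ObjectNode customer = (ObjectNode) root.path("customer");
      customer.put("country", location.countryName); // enrich with geolocation
      customer.put("city", location.city);
      producer.send(new ProducerRecord<>(validMessages, root.toString()));
    } catch (IOException e) {
      producer.send(new ProducerRecord<>(invalidMessages, message)); // unparseable JSON
    }
  }
}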

Currency price extractor


In Doubloon, there is now a service that validates that the messages are well-formed and enriches them with the customer's geolocation. The business now indicates that it also needs a service that returns the requested currency price.

Getting ready

The execution of the previous recipes in this chapter is needed.

How to do it...

Go to the Open Exchange Rates page at: https://openexchangerates.org/. Register for a free plan to obtain your free API key. This key is needed to access the free API.

Create a file called OpenExchange.java in the src/main/java/doubloon/extractors directory with the following contents:

package doubloon.extractors; 
 
import com.fasterxml.jackson.databind.JsonNode; 
import com.fasterxml.jackson.databind.ObjectMapper; 
import java.io.IOException; 
import java.net.MalformedURLException; 
import java.net.URL; 
import java.util.logging.Level; 
import java.util.logging.Logger; 
 
public class OpenExchange { 
   
  private...
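
The rest of the class is truncated above. For orientation, the free Open Exchange Rates API exposes a latest.json endpoint that returns the current rates (USD-based on the free plan) as JSON. A minimal, self-contained sketch of such a call follows; the class and method names are illustrative assumptions, not the book's exact listing:

package doubloon.extractors;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URL;

// Sketch: fetches the latest exchange rates from Open Exchange Rates and
// returns the rate for one currency code (for example, "EUR" or "MXN").
public class OpenExchangeSketch {

  private static final String API_KEY = "API_KEY_VALUE"; // replace with your key
  private static final String LATEST_URL =
      "https://openexchangerates.org/api/latest.json?app_id=" + API_KEY;

  public double getPrice(String currencyCode) {
    try {
      ObjectMapper mapper = new ObjectMapper();
      JsonNode root = mapper.readTree(new URL(LATEST_URL));
      return root.path("rates").path(currencyCode).asDouble();
    } catch (IOException e) {
      throw new IllegalStateException("Could not read the exchange rates", e);
    }
  }
}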

Currency price enricher


A "customer sees BTC price" event happens in the customer's web browser and is dispatched to Kafka via an HTTP event collector. The second step is to enrich the messages with the geolocation information. The third step is to enrich the messages with the currency price.

Getting ready

Recapitulating, the specification is to create a stream application that does the following:

  • Reads individual messages from a Kafka topic called raw-messages
  • Validates the messages, sending any invalid messages to a dedicated Kafka topic called invalid-messages
  • Enriches the messages with the geolocation information and the currency price
  • Writes the enriched messages in a Kafka topic called valid-messages

All of this is detailed in the following diagram, which shows the final version of the stream processing application:

Figure 4.2: The processing application reads events from the raw-messages topic, validates the messages, routes the errors to the invalid-messages topic, enriches the messages with geolocation...

Running the currency price enricher


In the previous recipe, the final version of the Enricher class was coded. Now, in this recipe, everything is compiled and executed.

Getting ready

The execution of the previous recipes in this chapter is needed.

How to do it...

The ProcessingApp class coordinates the Reader and Enricher classes. It contains the main method to execute them. Create a new file called src/main/java/doubloon/ProcessingApp.java and fill it with the following code:

 
package doubloon; 
 
import java.io.IOException; 
 
public class ProcessingApp { 
 
  public static void main(String[] args) throws IOException{ 
    String servers = args[0]; 
    String groupId = args[1]; 
    String sourceTopic = args[2]; 
    String goodTopic = args[3]; 
    String badTopic = args[4]; 
     
    Reader reader = new Reader(servers, groupId, sourceTopic);       
    Enricher enricher = new Enricher(servers, goodTopic, badTopic); 
   
    reader.run(enricher); 
   
  } 
} 
 

How it works...

The ProcessingApp...
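
Not shown in this excerpt, but once the project compiles, the application takes five positional arguments in the order the main method reads them. Assuming the jar is built with a Main-Class manifest entry as in Chapter 3 (the jar name, group id, and broker address below are assumptions), a typical invocation would be:

$ gradle jar
$ java -jar ./build/libs/doubloon-0.2.0.jar localhost:9092 doubloon-group raw-messages valid-messages invalid-messages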

Modeling the events


This recipe shows how to model the Treu messages in JSON format.

Getting ready

For this recipe, basic knowledge of JSON is required.

How to do it...

As mentioned, Treu Technologies has a lot of IoT machines that continuously send messages about their status to the control center.

These machines are used to generate electricity. So, it is very important for Treu to know the exact temperature of the machine and its state (running, shutdown, starting, shutting down, and so on).

Treu needs to know the weather forecast because the machines should not operate above certain temperatures. These machines behave differently depending on the temperature: starting a machine in cold conditions is different from starting it in warm conditions, and the startup time also depends on the temperature. To guarantee the electricity supply, this information has to be precise.

In a nutshell, when facing an electrical power failure, it is always better to start the machines from warm than from cold.

The following code...
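
The event listing itself is cut off in this excerpt. Purely as an illustration of the kind of fields described above (every field name and value here is an assumption, not the book's schema), a machine status event could look like this:

{
  "event": "HEALTH_CHECK",
  "serialNumber": "GEN-0042",
  "status": "RUNNING",
  "temperature": 73.5,
  "ipAddress": "144.178.74.38",
  "timestamp": 1512345678
}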

Setting up the project


Before writing code, let's recall the project requirements for the Treu Technologies stream processing app.

Getting ready

Putting it all together, the specification is to create a stream application that does the following:

  • Reads individual messages from a Kafka topic called raw-messages
  • Enriches the messages with the geolocation of the machine's IP address
  • Enriches the messages with the weather information for that geolocation
  • Writes the correct events in a Kafka topic called enriched-messages

All these processes are detailed in the following diagram, which shows the Treu stream processing application:

Figure 4.4: The processing application reads events from the raw-messages topic, enriches the messages with geolocation and weather temperature information, and writes them to the enriched-messages topic

How to do it...

  1. The first step is to create our project with Gradle, which will be called ProcessingApp. Create a directory called treu, go to that directory, and execute the...
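
The command itself is cut off here. Assuming the setup mirrors the Doubloon project from Chapter 3, the resulting treu/build.gradle would look roughly like the following sketch; the version number and the dependency versions are assumptions:

apply plugin: 'java'
apply plugin: 'application'

sourceCompatibility = '1.8'

mainClassName = 'treu.ProcessingApp'

repositories {
  mavenCentral()
}

version = '0.1.0'

dependencies {
  // Versions below are assumptions; reuse the ones from the Chapter 3 project.
  compile 'org.apache.kafka:kafka-clients:1.0.0'
  compile 'com.maxmind.geoip:geoip-api:1.3.1'
  compile 'com.fasterxml.jackson.core:jackson-databind:2.9.2'
}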

Open weather extractor


Earlier in this chapter, we solved the problem of obtaining the geolocation from an IP address. As the business requested, we also need to know the current temperature at a given geolocation.

Getting ready

The execution of the previous recipe is needed.

How to do it...

Go to the OpenWeatherMap page at: https://openweathermap.org/. Register for a free plan to obtain your free API key; this key is needed to access the free API.

Create a file called OpenWeather.java in the src/main/java/treu/extractors directory with the following contents:

package treu.extractors; 
 
import com.fasterxml.jackson.databind.JsonNode; 
import com.fasterxml.jackson.databind.ObjectMapper; 
import java.io.IOException; 
import java.net.MalformedURLException; 
import java.net.URL; 
import java.util.logging.Level; 
import java.util.logging.Logger; 
 
public class OpenWeather { 
   
  private static final String API_KEY = "API_KEY_VALUE";                 //1 
  protected...
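
The rest of the class is truncated above. For orientation, OpenWeatherMap's current-weather endpoint takes a latitude/longitude pair plus the API key and returns the conditions as JSON. A minimal sketch of such a call follows; the class and method names and the use of metric units are illustrative assumptions, not the book's exact listing:

package treu.extractors;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URL;
import java.util.Locale;

// Sketch: returns the current temperature (Celsius) at a coordinate using
// the OpenWeatherMap current-weather endpoint.
public class OpenWeatherSketch {

  private static final String API_KEY = "API_KEY_VALUE"; // replace with your key
  private static final String WEATHER_URL =
      "https://api.openweathermap.org/data/2.5/weather?lat=%f&lon=%f&units=metric&appid=" + API_KEY;

  public double getTemperature(double latitude, double longitude) {
    try {
      ObjectMapper mapper = new ObjectMapper();
      URL url = new URL(String.format(Locale.ROOT, WEATHER_URL, latitude, longitude));
      JsonNode root = mapper.readTree(url);
      return root.path("main").path("temp").asDouble();
    } catch (IOException e) {
      throw new IllegalStateException("Could not read the weather data", e);
    }
  }
}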

Location temperature enricher


The second step is to enrich the messages with the geolocation information. The third step is to enrich them with the temperature at that location.

Getting ready

Recapitulating, the specification is to create a stream application that does the following:

  • Reads individual messages from a Kafka topic called raw-messages
  • Enriches the messages with the geolocation of the machine's IP address
  • Enriches the messages with the weather information for that geolocation
  • Writes the correct events in a Kafka topic called enriched-messages

How to do it...

Modify the Enricher.java file in the src/main/java/treu/ directory with the following contents:

package treu; 
 
import com.fasterxml.jackson.databind.*; 
import com.fasterxml.jackson.databind.node.ObjectNode; 
import com.maxmind.geoip.Location; 
import treu.extractors.GeoIP; 
import treu.extractors.OpenWeather; 
 
import java.io.IOException; 
 
import org.apache.kafka.clients.producer.*; 
 
public class Enricher implements Producer { ...
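
The listing is truncated above. The essence of the class is the per-message step: geolocate the machine from the IP address in the event, ask OpenWeatherMap for the temperature at that location, add both to the event, and publish the enriched event. A sketch of that step follows, reusing the OpenWeatherSketch class shown earlier; the GeoIP.getLocation method, the ipAddress field, and the added field names are assumptions:

package treu;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.maxmind.geoip.Location;
import java.io.IOException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import treu.extractors.GeoIP;
import treu.extractors.OpenWeatherSketch;

// Sketch of the enrichment step: IP address -> location -> temperature -> enriched event.
public class TemperatureEnricherSketch {

  private final ObjectMapper mapper = new ObjectMapper();
  private final GeoIP geoIP = new GeoIP();
  private final OpenWeatherSketch weather = new OpenWeatherSketch();

  public void process(KafkaProducer<String, String> producer,
                      String enrichedTopic, String message) throws IOException {
    JsonNode root = mapper.readTree(message);

    // 1. Geolocate the machine from the IP address carried in the event.
    Location location = geoIP.getLocation(root.path("ipAddress").textValue());

    // 2. Ask OpenWeatherMap for the current temperature at that location.
    double temperature = weather.getTemperature(location.latitude, location.longitude);

    // 3. Add both pieces of information to the event and publish it.
    ObjectNode enriched = (ObjectNode) root;
    enriched.put("city", location.city);
    enriched.put("currentTemperature", temperature);
    producer.send(new ProducerRecord<>(enrichedTopic, enriched.toString()));
  }
}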

Running the location temperature enricher


In the previous recipe, the final version of the Enricher class was coded. Now, in this recipe, everything is compiled and executed.

Getting ready

The execution of the previous recipe in this chapter is needed.

How to do it...

The ProcessingApp class coordinates the Reader and Enricher classes. It contains the main method to execute them. Create a new file called src/main/java/treu/ProcessingApp.java and fill it with the following code:

package treu; 
 
import java.io.IOException; 
 
public class ProcessingApp { 
 
  public static void main(String[] args) throws IOException{ 
    String servers = args[0]; 
    String groupId = args[1]; 
    String sourceTopic = args[2]; 
    String enrichedTopic = args[3]; 
     
    Reader reader = new Reader(servers, groupId, sourceTopic);       
    Enricher enricher = new Enricher(servers, enrichedTopic);
   
    reader.run(enricher); 
   
  } 
}  

How it works...

The ProcessingApp receives four arguments...
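
Not shown in this excerpt, but the application takes four positional arguments in the order the main method reads them. Assuming the jar is built with a Main-Class manifest entry as in the Doubloon project (the jar name, group id, and broker address below are assumptions), a typical invocation would be:

$ gradle jar
$ java -jar ./build/libs/treu-0.1.0.jar localhost:9092 treu-group raw-messages enriched-messages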
