Apache Kafka Quick Start Guide

By Raúl Estrada

About this book

Apache Kafka is a great open source platform for handling your real-time data pipeline to ensure high-speed filtering and pattern matching on the fly. In this book, you will learn how to use Apache Kafka for efficient processing of distributed applications and will get familiar with solving everyday problems in fast data and processing pipelines.

This book focuses on programming rather than the configuration management of Kafka clusters or DevOps. It starts off with the installation and setting up the development environment, before quickly moving on to performing fundamental messaging operations such as validation and enrichment.

Here you will learn about message composition with the pure Kafka API and Kafka Streams. You will look into the transformation of messages in different formats, such as text, binary, XML, JSON, and AVRO. Next, you will learn how to expose the schemas contained in Kafka with the Schema Registry. You will then learn how to work with all relevant connectors with Kafka Connect. While working with Kafka Streams, you will perform various interesting operations on streams, such as windowing, joins, and aggregations. Finally, through KSQL, you will learn how to retrieve, insert, modify, and delete data streams, and how to manipulate watermarks and windows.

Publication date: December 2018
Publisher: Packt
Pages: 186
ISBN: 9781788997829

 

Chapter 1. Configuring Kafka

This chapter describes what Kafka is and the concepts related to this technology: brokers, topics, producers, and consumers. It also talks about how to build a simple producer and consumer from the command line, as well as how to install Confluent Platform. The information in this chapter is fundamental to the following chapters.

In this chapter, we will cover the following topics:

  • Kafka in a nutshell
  • Installing Kafka (Linux and macOS)
  • Installing the Confluent Platform
  • Running Kafka
  • Running Confluent Platform
  • Running Kafka brokers
  • Running Kafka topics
  • A command-line message producer
  • A command-line message consumer
  • Using kafkacat
 

Kafka in a nutshell


Apache Kafka is an open source streaming platform. If you are reading this book, maybe you already know that Kafka scales very well in a horizontal way without compromising speed and efficiency.

The Kafka core is written in Scala, and Kafka Streams and KSQL are written in Java. A Kafka server can run on several operating systems: Unix, Linux, macOS, and even Windows. As it usually runs in production on Linux servers, the examples in this book are designed to run on Linux environments and assume a bash shell.

 

This chapter explains how to install, configure, and run Kafka. As this is a Quick Start Guide, it does not cover Kafka's theoretical details. At the moment, it is appropriate to mention these three points:

  • Kafka is a service bus: To connect heterogeneous applications, we need to implement a message publication mechanism to send and receive messages among them. A message router is known as a message broker. Kafka is a message broker, a solution for routing messages among clients quickly.
  • Kafka architecture has two directives: The first is to not block the producers (in order to deal with the back pressure). The second is to isolate producers and consumers. The producers should not know who their consumers are, hence Kafka follows the dumb broker and smart clients model.
  • Kafka is a real-time messaging system: Moreover, Kafka is a software solution with a publish-subscribe model: open source, distributed, partitioned, replicated, and commit-log-based.

There are some concepts and nomenclature in Apache Kafka:

  • Cluster: This is a set of Kafka brokers.
  • Zookeeper: This is a cluster coordinator, a tool that provides different services and is part of the Apache ecosystem.
  • Broker: This is a Kafka server, also the Kafka server process itself.
  • Topic: This is a queue (that has log partitions); a broker can run several topics.
  • Offset: This is an identifier for each message.
  • Partition: This is an immutable and ordered sequence of records continually appended to a structured commit log.
  • Producer: This is the program that publishes data to topics.
  • Consumer: This is the program that processes data from the topics.
  • Retention period: This is the time to keep messages available for consumption.
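The relationship between a partition and its offsets can be illustrated with a toy model. The following is a minimal Python sketch, not Kafka's implementation: the Partition class and its methods are hypothetical names introduced only for this illustration. It shows that offsets are simply positions in an append-only sequence and that a consumer can resume reading from any offset.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class Partition:
    """Toy model of a partition: an append-only, ordered list of records."""
    records: List[str] = field(default_factory=list)

    def append(self, value: str) -> int:
        """Append a record and return its offset (its position in the log)."""
        self.records.append(value)
        return len(self.records) - 1

    def read_from(self, offset: int) -> List[Tuple[int, str]]:
        """Return (offset, value) pairs starting at the given offset."""
        return list(enumerate(self.records))[offset:]


partition = Partition()
first = partition.append("Fool me once shame on you")
second = partition.append("Fool me twice shame on me")
print(first, second)                 # offsets are assigned in append order
print(partition.read_from(second))   # a consumer resuming at the second record
```

In this model, a topic would be a collection of such partitions, each with its own independent sequence of offsets.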

In Kafka, there are three types of clusters:

  • Single node–single broker
  • Single node–multiple broker
  • Multiple node–multiple broker

 

 

In Kafka, there are three (and just three) ways to deliver messages:

  • Never redelivered (at most once): The messages may be lost because, once delivered, they are not sent again.
  • May be redelivered (at least once): The messages are never lost because, if a message is not received, it can be sent again. The receiver may therefore see duplicates.
  • Delivered once (exactly once): The message is delivered exactly once. This is the most difficult form of delivery, since it implies that there is neither loss nor duplication of any message.
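To make the difference between the three delivery modes concrete, here is a small Python simulation. It is only a sketch under simplifying assumptions: the function names are hypothetical, lost messages and lost acknowledgements are passed in as sets, and duplicates are detected by message value (which assumes every message is unique). It does not reflect how Kafka implements these guarantees internally.

```python
from typing import List, Set


def deliver_at_most_once(messages: List[str], lost: Set[str]) -> List[str]:
    """Send each message once; anything lost in transit is never resent."""
    return [m for m in messages if m not in lost]


def deliver_at_least_once(messages: List[str], ack_lost: Set[str]) -> List[str]:
    """Resend when no acknowledgement arrives, so duplicates may appear."""
    received: List[str] = []
    for m in messages:
        received.append(m)          # the message itself arrives
        if m in ack_lost:           # but its acknowledgement is lost,
            received.append(m)      # so the sender delivers it again
    return received


def deliver_exactly_once(messages: List[str], ack_lost: Set[str]) -> List[str]:
    """At-least-once delivery plus de-duplication on the receiving side."""
    seen: Set[str] = set()
    result: List[str] = []
    for m in deliver_at_least_once(messages, ack_lost):
        if m not in seen:           # assumption: message values are unique
            seen.add(m)
            result.append(m)
    return result


messages = ["a", "b", "c"]
print(deliver_at_most_once(messages, lost={"b"}))
print(deliver_at_least_once(messages, ack_lost={"b"}))
print(deliver_exactly_once(messages, ack_lost={"b"}))
```

The first call loses a message, the second duplicates one, and the third yields each message once, which is why exactly-once delivery needs extra bookkeeping on top of redelivery.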

The message log can be compacted in two ways:

  • Coarse-grained: Log compacted by time
  • Fine-grained: Log compacted by message
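The two approaches can be contrasted with a short Python sketch. It assumes, for illustration only, that each record is a (timestamp, key, value) tuple and that fine-grained compaction means keeping the latest record per key; the function names are hypothetical.

```python
from typing import Dict, List, Tuple

Record = Tuple[float, str, str]  # (timestamp in seconds, key, value)


def retain_by_time(log: List[Record], now: float, retention_s: float) -> List[Record]:
    """Coarse-grained: drop every record older than the retention period."""
    return [r for r in log if now - r[0] <= retention_s]


def compact_by_key(log: List[Record]) -> List[Record]:
    """Fine-grained: keep only the most recent record for each key."""
    latest: Dict[str, int] = {}
    for index, (_, key, _) in enumerate(log):
        latest[key] = index              # later records overwrite earlier ones
    return [log[i] for i in sorted(latest.values())]


log = [(0.0, "k1", "a"), (1.0, "k2", "b"), (10.0, "k1", "c")]
print(retain_by_time(log, now=12.0, retention_s=8.0))
print(compact_by_key(log))
```

With this sample log, time-based retention keeps only the newest record, while key-based compaction keeps the latest value of both k1 and k2 regardless of age.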
 

Kafka installation


There are three ways to install a Kafka environment:

  • Downloading the executable files
  • Using brew (in macOS) or yum (in Linux)
  • Installing Confluent Platform

For all three ways, the first step is to install Java; we need Java 8. Download and install the latest JDK 8 from Oracle's website:

http://www.oracle.com/technetwork/java/javase/downloads/index.html

At the time of writing, the latest Java 8 JDK version is 8u191.

For Linux users:

  1. Change the file mode to executable as follows:
      > chmod +x jdk-8u191-linux-x64.rpm
  2. Go to the directory in which you want to install Java:
      > cd <directory path>
  3. Run the rpm installer with the following command:
      > rpm -ivh jdk-8u191-linux-x64.rpm
  4. Add the JAVA_HOME variable to your environment. The following command writes the JAVA_HOME environment variable to the /etc/profile file:
      > echo "export JAVA_HOME=/usr/java/jdk1.8.0_191" >> /etc/profile
  5. Validate the Java installation as follows:
      > java -version
      java version "1.8.0_191"
      Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
      Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)

At the time of writing, the latest Scala version is 2.12.6. To install Scala in Linux, perform the following steps:

  1. Download the latest Scala binary from http://www.scala-lang.org/download
  2. Extract the downloaded file, scala-2.12.6.tgz, as follows:
      > tar xzf scala-2.12.6.tgz
  3. Add the SCALA_HOME variable to your environment as follows:
      > export SCALA_HOME=/opt/scala
  4. Add the Scala bin directory to your PATH environment variable as follows:
      > export PATH=$PATH:$SCALA_HOME/bin
  5. To validate the Scala installation, do the following:
      > scala -version
      Scala code runner version 2.12.6 -- Copyright 2002-2018,
      LAMP/EPFL and Lightbend, Inc.

To install Kafka on your machine, ensure that you have at least 4 GB of RAM. The installation directory will be /usr/local/kafka/ for macOS users and /opt/kafka/ for Linux users. Create these directories according to your operating system.

Kafka installation on Linux

Open the Apache Kafka download page, http://kafka.apache.org/downloads, as in Figure 1.1:

Figure 1.1: Apache Kafka download page

At the time of writing, the current stable release of Apache Kafka is 2.0.0. Remember that, since version 0.8.x, Kafka is not backward-compatible with earlier versions, so we cannot replace this version with one prior to 0.8. Once you've downloaded the latest available release, let's proceed with the installation.

 

Note

macOS users: remember to replace the directory /opt/ with /usr/local.

Follow these steps to install Kafka in Linux:

  1. Extract the downloaded file, kafka_2.11-2.0.0.tgz, in the /opt/ directory as follows:
      > tar xzf kafka_2.11-2.0.0.tgz
  2. Create the KAFKA_HOME environment variable as follows:
      > export KAFKA_HOME=/opt/kafka_2.11-2.0.0
  3. Add the Kafka bin directory to the PATH variable as follows:
      > export PATH=$PATH:$KAFKA_HOME/bin

Now Java, Scala, and Kafka are installed.

To do all of the previous steps from the command line, there is a powerful tool for macOS users called brew (the equivalent in Linux would be yum).

Kafka installation on macOS

To install from the command line in macOS (brew must be installed), perform the following steps:

  1. To install sbt (the Scala build tool) with brew, execute the following:
      > brew install sbt

If you already have it in your environment (downloaded previously), run the following to upgrade it:

      > brew upgrade sbt

The output is similar to that shown in Figure 1.2:

Figure 1.2: The Scala build tool installation output

  2. To install Scala with brew, execute the following:
      > brew install scala

If you already have it in your environment (downloaded previously), to upgrade it, run the following command:

      > brew upgrade scala

The output is similar to that shown in Figure 1.3:

Figure 1.3: The Scala installation output

  3. To install Kafka with brew (this also installs Zookeeper), do the following:
      > brew install kafka

If you already have it (downloaded in the past), upgrade it as follows:

      > brew upgrade kafka

 

The output is similar to that shown in Figure 1.4:

Figure 1.4: Kafka installation output

Visit https://brew.sh/ for more about brew.

Confluent Platform installation

The third way to install Kafka is through Confluent Platform. In the rest of this book, we will be using the open source version of Confluent Platform.

Confluent Platform is an integrated platform that includes the following components:

  • Apache Kafka
  • REST proxy
  • Kafka Connect API
  • Schema Registry
  • Kafka Streams API
  • Pre-built connectors
  • Non-Java clients
  • KSQL

As you may notice, almost every one of these components has its own chapter in this book.

The commercially licensed Confluent Platform includes, in addition to all of the components of the open source version, the following:

  • Confluent Control Center (CCC)
  • Kafka operator (for Kubernetes)
  • JMS client
  • Replicator
  • MQTT proxy
  • Auto data balancer
  • Security features

Note that coverage of the components of the non-open source version is beyond the scope of this book.

Confluent Platform is also available as Docker images, but here we are going to install it locally.

Open the Confluent Platform download page: https://www.confluent.io/download/

At the time of this writing, the current version of Confluent Platform is 5.0.0 as a stable release. Remember that, since the Kafka core runs on Scala, there are two versions: for Scala 2.11 and Scala 2.12.

We could run Confluent Platform from our desktop directory, but following this book's conventions, let's use /opt/ for Linux users and /usr/local for macOS users.

To install Confluent Platform, extract the downloaded file, confluent-5.0.0-2.11.tar.gz, in that directory, as follows:

> tar xzf confluent-5.0.0-2.11.tar.gz

 

 

 

Running Kafka


There are two ways to run Kafka, depending on whether we install it directly or through Confluent Platform.

If we install it directly, the steps to run Kafka are as follows.

Note

For macOS users, your paths might be different if you've installed using brew. Check the output of brew install kafka command for the exact command that you can use to start Zookeeper and Kafka.

Go to the Kafka installation directory (/usr/local/kafka for macOS users and /opt/kafka/ for Linux users), as in the example:

> cd /usr/local/kafka

First of all, we need to start Zookeeper (at the time of writing, Kafka depends on Zookeeper for cluster coordination). Type the following:

> ./bin/zookeeper-server-start.sh ./config/zookeeper.properties

ZooKeeper JMX enabled by default

Using config: /usr/local/etc/zookeeper/zoo.cfg

Starting zookeeper ... STARTED

To check whether Zookeeper is running, use the lsof command on port 2181 (the Zookeeper default port) as follows:

> lsof -i :2181

COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME

java 12529 admin 406u IPv6 0xc41a24baa4fedb11 0t0 TCP *:2181 (LISTEN)

Now run the Kafka server that comes with the installation by going to /usr/local/kafka/ for macOS users and /opt/kafka/ for Linux users:

> ./bin/kafka-server-start.sh ./config/server.properties

Now there is an Apache Kafka broker running on your machine.

Remember that Zookeeper must be running on the machine before starting Kafka. If you don't want to start Zookeeper manually every time you need to run Kafka, install it as an operating system auto-start service.

 

Running Confluent Platform


Go to the Confluent Platform installation directory (/usr/local/confluent-5.0.0 for macOS users and /opt/confluent-5.0.0 for Linux users) and type the following:

> cd /usr/local/confluent-5.0.0

To start Confluent Platform, run the following:

> bin/confluent start

This command-line interface is intended for development only, not for production:

https://docs.confluent.io/current/cli/index.html

The output is similar to what is shown in the following code snippet:

Using CONFLUENT_CURRENT: /var/folders/nc/4jrpd1w5563crr_np997zp980000gn/T/confluent.q3uxpyAt

Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

As indicated by the command output, Confluent Platform automatically starts in this order: Zookeeper, Kafka, Schema Registry, REST proxy, Kafka Connect, KSQL, and the Confluent Control Center.

 

 

To access the Confluent Control Center running locally, go to http://localhost:9021, as shown in Figure 1.5:

Figure 1.5: Confluent Control Center main page

There are other commands for Confluent Platform.

To get the status of all services or the status of a specific service along with its dependencies, enter the following:

> bin/confluent status

To stop all services or a specific service along with the services depending on it, enter the following:

> bin/confluent stop

 

 

To delete the data and logs of the current Confluent Platform, type the following:

> bin/confluent destroy
 

Running Kafka brokers


The real art behind a server is in its configuration. In this section, we will examine the basic configuration of Kafka brokers. Since we are still learning, we will keep to a small cluster running on a single machine and will not review advanced cluster configuration.

As we can suppose, there are two types of configuration: standalone and cluster. The real power of Kafka is unlocked when running with replication in cluster mode and all topics are correctly partitioned.

The cluster mode has two main advantages: parallelism and redundancy. Parallelism is the capacity to run tasks simultaneously among the cluster members. Redundancy guarantees that, when a Kafka node goes down, the cluster remains safe and accessible from the other running nodes.

This section shows how to configure a cluster with several nodes on our local machine although, in practice, it is always better to have several machines with multiple nodes sharing clusters.

Go to the Confluent Platform installation directory, referenced from now on as <confluent-path>.

As mentioned in the beginning of this chapter, a broker is a server instance. A server (or broker) is actually a process running in the operating system and starts based on its configuration file.

The people of Confluent have kindly provided us with a template of a standard broker configuration. This file, which is called server.properties, is located in the Kafka installation directory in the config subdirectory:

  1. Inside <confluent-path>, make a directory with the name mark.
  2. For each Kafka broker (server) that we want to run, make a copy of the configuration file template and rename it accordingly. In this example, our cluster is going to be called mark:
      > cp config/server.properties <confluent-path>/mark/mark-1.properties
      > cp config/server.properties <confluent-path>/mark/mark-2.properties
  3. Modify each properties file accordingly. If the file is called mark-1, the broker.id should be 1. Then, specify the port on which the server will run; the recommendation is 9093 for mark-1 and 9094 for mark-2. Note that the port property is not set in the template, so add the line. Finally, specify the location of the Kafka logs (a Kafka log is the on-disk store where the broker keeps its data); in this case, we use the /tmp directory. Problems with write permissions are common here, so do not forget to give the user that runs the broker processes write and execute permissions on the log directory. The settings look as follows:
  • In mark-1.properties, set the following:
      broker.id=1
      port=9093
      log.dirs=/tmp/mark-1-logs
  • In mark-2.properties, set the following:
      broker.id=2
      port=9094
      log.dirs=/tmp/mark-2-logs
  4. Start the Kafka brokers using the kafka-server-start command with the corresponding configuration file passed as the parameter. Don't forget that Confluent Platform must already be running and that the ports must not be in use by another process. Start the first Kafka broker as follows:
      > <confluent-path>/bin/kafka-server-start <confluent-path>/mark/mark-1.properties &

And, in another command-line window, run the following command:

      > <confluent-path>/bin/kafka-server-start <confluent-path>/mark/mark-2.properties &

 

 

 

 

The trailing & tells the shell to run the command in the background, so that you get your command line back. If you want to see the broker output, run each command separately in its own command-line window.

Remember that the properties file contains the server configuration and that the server.properties file located in the config directory is just a template.
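Copying and editing the template by hand becomes tedious as the number of brokers grows. The following Python sketch shows one way the per-broker files could be generated. The helper names override_properties and broker_overrides are hypothetical, the template string is a tiny stand-in for the real server.properties, and the port numbering (9092 plus the broker number) simply follows the 9093/9094 convention used in this section.

```python
from typing import Dict


def override_properties(template: str, overrides: Dict[str, str]) -> str:
    """Return the template text with the given keys replaced or appended."""
    remaining = dict(overrides)
    lines = []
    for line in template.splitlines():
        key = line.split("=", 1)[0].strip()
        if not line.lstrip().startswith("#") and key in remaining:
            lines.append(f"{key}={remaining.pop(key)}")   # replace existing key
        else:
            lines.append(line)                            # keep comments and other keys
    # Keys missing from the template (such as port) are appended at the end.
    lines.extend(f"{key}={value}" for key, value in remaining.items())
    return "\n".join(lines) + "\n"


def broker_overrides(broker_number: int) -> Dict[str, str]:
    """Settings for broker mark-<n>, following this section's conventions."""
    return {
        "broker.id": str(broker_number),
        "port": str(9092 + broker_number),
        "log.dirs": f"/tmp/mark-{broker_number}-logs",
    }


# A tiny stand-in for the real template; the actual file has many more settings.
template = "# sample template\nbroker.id=0\nlog.dirs=/tmp/kafka-logs\n"
print(override_properties(template, broker_overrides(1)))
```

To use it with the real template, read the contents of server.properties into the template variable and write each result to mark-1.properties, mark-2.properties, and so on.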

Now there are two brokers, mark-1 and mark-2, running on the same machine in the same cluster.

Remember, there are no dumb questions. Here are some common ones:

Q: How does each broker know which cluster it belongs to?

A: The brokers know that they belong to the same cluster because, in the configuration, both point to the same Zookeeper cluster.

Q: How does each broker differ from the others within the same cluster? 

A: Every broker is identified inside the cluster by the unique number specified in the broker.id property.

Q: What happens if the port number is not specified?

A: If the port is not specified, every broker tries to use the default port, 9092. On a single machine, only the first broker can bind to it; the next one fails to start because the port is already in use.

Q: What happens if the log directory is not specified?

A: If log.dirs is not changed, all of the brokers will try to write to the same default log directory. If the brokers are planned to run on different machines, then the port and log.dirs properties can be left unchanged (because the brokers use the same port and log directory, but on different machines).

Q: How can I check that there is not a process already running in the port where I want to start my broker?

A: As shown in the previous section, there is a useful command to see which process is running on a specific port, in this case port 9093:

> lsof -i :9093

 

 

The output of the previous command is something like this:

COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME

java 12529 admin 406u IPv6 0xc41a24baa4fedb11 0t0 TCP *:9093 (LISTEN)

Your turn: try to run this command before starting the Kafka brokers, and run it after starting them to see the change. Also, try to start a broker on a port in use to see how it fails.
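The same check can be done from a program. The following Python sketch uses only the standard socket module; the function name is_port_in_use is hypothetical, and the check assumes the broker accepts IPv4 connections on 127.0.0.1. It tries to open a TCP connection and reports whether something answered.

```python
import socket


def is_port_in_use(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.settimeout(1.0)
        # connect_ex returns 0 on success and an error code otherwise
        return probe.connect_ex((host, port)) == 0


# 9093 is the port chosen for mark-1 in this section
print(is_port_in_use(9093))
```

Unlike lsof, this does not tell you which process owns the port, only whether the port is taken.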

OK, what if I want my cluster to run on several machines?

To run Kafka nodes on different machines but in the same cluster, adjust the Zookeeper connection string in the configuration file; its default value is as follows:

zookeeper.connect=localhost:2181

Remember that the machines must be able to find each other through DNS and that there must be no network security restrictions between them.

The default value for Zookeeper connect is correct only if you are running the Kafka broker in the same machine as Zookeeper. Depending on the architecture, it will be necessary to decide if there will be a broker running on the same Zookeeper machine.

To specify that Zookeeper might run in other machines, do the following:

zookeeper.connect=localhost:2181,192.168.0.2:2183,192.168.0.3:2182

The previous line specifies that Zookeeper is running on the local host machine on port 2181, on the machine with IP address 192.168.0.2 on port 2183, and on the machine with IP address 192.168.0.3 on port 2182. The Zookeeper default port is 2181, so normally it runs there.
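To see how such a connection string breaks down, here is a small Python sketch that splits it into host and port pairs. The function name parse_connect_string is hypothetical, and the sketch assumes plain host:port entries; it does not handle other forms a real connection string may take, such as a trailing path.

```python
from typing import List, Tuple


def parse_connect_string(value: str, default_port: int = 2181) -> List[Tuple[str, int]]:
    """Split a comma-separated host:port list into (host, port) pairs."""
    servers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue                     # ignore empty entries such as a trailing comma
        host, _, port = entry.partition(":")
        servers.append((host, int(port) if port else default_port))
    return servers


print(parse_connect_string("localhost:2181,192.168.0.2:2183,192.168.0.3:2182"))
```

An entry without an explicit port falls back to 2181, the Zookeeper default mentioned above.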

Your turn: as an exercise, try to start a broker with incorrect information about the Zookeeper cluster. Also, using the lsof command to find a port that is already in use, try to start Zookeeper on that port.

If you have doubts about the configuration, or it is not clear what values to change, the server.properties template (like all of the Kafka project) is open source and available at the following address:

https://github.com/apache/kafka/blob/trunk/config/server.properties

 

 

 

Running Kafka topics


The power inside a broker is the topic, namely the queues inside it. Now that we have two brokers running, let's create a Kafka topic on them.

Kafka, like almost all modern infrastructure projects, has three ways of building things: through the command line, through programming, and through a web console (in this case the Confluent Control Center). The management (creation, modification, and destruction) of Kafka brokers can be done through programs written in most modern programming languages. If the language is not supported, it could be managed through the Kafka REST API. The previous section showed how to build a broker using the command line. In later chapters, we will see how to do this process through programming.

Can only brokers be managed (created, modified, or destroyed) through programming? No, topics can be managed too, and they can also be created through the command line. Kafka has pre-built utilities to manage brokers, as we already saw, and to manage topics, as we will see next.

To create a topic called amazingTopic in our running cluster, use the following command:

> <confluent-path>/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic amazingTopic

The output should be as follows:

Created topic amazingTopic

Here, the kafka-topics command is used. With the --create parameter it is specified that we want to create a new topic. The --topic parameter sets the name of the topic, in this case, amazingTopic.

Do you remember the terms parallelism and redundancy? Well, the --partitions parameter controls the parallelism and the --replication-factor parameter controls the redundancy.

The --replication-factor parameter is fundamental, as it specifies on how many servers of the cluster the topic is going to be replicated. On the other hand, one broker can hold just one replica of a given partition.

 

 

Obviously, if a greater number than the number of running servers on the cluster is specified, it will result in an error (you don't believe me? Try it in your environment). The error will be like this:

Error while executing topic command: replication factor: 3 larger than available brokers: 2

[2018-09-01 07:13:31,350] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: replication factor: 3 larger than available brokers: 2

(kafka.admin.TopicCommand$)

To be counted, a broker must be running (don't be shy and test all this theory in your environment).

The --partitions parameter, as its name implies, says how many partitions the topic will have. The number determines the parallelism that can be achieved on the consumer's side. This parameter is very important when doing cluster fine-tuning.

Finally, as expected, the --zookeeper parameter indicates where the Zookeeper cluster is running.
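The interplay between brokers, partitions, and the replication factor can be sketched in a few lines of Python. This is a deliberately simplified round-robin placement under stated assumptions, not the algorithm Kafka actually uses, and the function name assign_replicas is hypothetical. The error text mirrors the message shown above.

```python
from typing import Dict, List


def assign_replicas(brokers: List[int], partitions: int,
                    replication_factor: int) -> Dict[int, List[int]]:
    """Map each partition to the brokers holding its replicas (round-robin)."""
    if replication_factor > len(brokers):
        raise ValueError(
            f"replication factor: {replication_factor} "
            f"larger than available brokers: {len(brokers)}"
        )
    assignment = {}
    for partition in range(partitions):
        # Each replica of a partition lands on a different broker.
        assignment[partition] = [
            brokers[(partition + i) % len(brokers)]
            for i in range(replication_factor)
        ]
    return assignment


print(assign_replicas(brokers=[1, 2], partitions=2, replication_factor=1))
print(assign_replicas(brokers=[1, 2], partitions=1, replication_factor=2))
```

Because every replica of a partition must live on a different broker, asking for three replicas with only two brokers cannot be satisfied, which is the situation the error above reports.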

When a topic is created, the output in the broker log is something like this:

[2018-09-01 07:05:53,910] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions amazingTopic-0 (kafka.server.ReplicaFetcherManager)

[2018-09-01 07:05:53,950] INFO Completed load of log amazingTopic-0 with 1 log segments and log end offset 0 in 21 ms (kafka.log.Log)

In short, these messages say that a new topic has been born in our cluster.

How can I check my new and shiny topic? By using the same command: kafka-topics.

There are more parameters than --create. To list the topics in the cluster, run the kafka-topics command with the --list parameter, as follows:

> <confluent-path>/bin/kafka-topics --list --zookeeper localhost:2181

The output is the list of topics, which is as follows:

amazingTopic

This command returns the list with the names of all of the running topics in the cluster. 

 

How can I get details of a topic? Using the same command: kafka-topics.

For a particular topic, run the kafka-topics command with the --describe parameter, as follows:

> <confluent-path>/bin/kafka-topics --describe --zookeeper localhost:2181 --topic amazingTopic

The command output is as follows:

Topic:amazingTopic PartitionCount:1 ReplicationFactor:1 Configs:

Topic: amazingTopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1

Here is a brief explanation of the output:

  • PartitionCount: Number of partitions on the topic (parallelism)
  • ReplicationFactor: Number of replicas on the topic (redundancy)
  • Leader: Node responsible for reading and writing operations of a given partition
  • Replicas: List of brokers replicating this topic data; some of these might even be dead
  • Isr: List of nodes that are currently in-sync replicas

Let's create a topic with multiple replicas (that is, one that runs on more than one broker in the cluster); we type the following:

> <confluent-path>/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic redundantTopic

The output is as follows:

Created topic redundantTopic

Now, call the kafka-topics command with the --describe parameter to check the topic details, as follows:

> <confluent-path>/bin/kafka-topics --describe --zookeeper localhost:2181 --topic redundantTopic


Topic:redundantTopic PartitionCount:1 ReplicationFactor:2 Configs:

Topic: redundantTopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2

As you can see, Replicas and Isr are the same lists; we infer that all of the nodes are in-sync.
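Comparing Replicas and Isr by eye works for one partition, but a script helps when there are many. The following Python sketch parses a partition line in the format shown above and lists the replicas that are not in sync. The names PartitionInfo, parse_partition_line, and out_of_sync are hypothetical, and the parsing assumes the exact field layout of the sample output.

```python
import re
from typing import List, NamedTuple


class PartitionInfo(NamedTuple):
    topic: str
    partition: int
    leader: int
    replicas: List[int]
    isr: List[int]


def parse_partition_line(line: str) -> PartitionInfo:
    """Parse one 'Topic: ... Partition: ... Isr: ...' line of --describe output."""
    fields = dict(re.findall(r"(\w+):\s*(\S+)", line))
    return PartitionInfo(
        topic=fields["Topic"],
        partition=int(fields["Partition"]),
        leader=int(fields["Leader"]),
        replicas=[int(b) for b in fields["Replicas"].split(",")],
        isr=[int(b) for b in fields["Isr"].split(",")],
    )


def out_of_sync(info: PartitionInfo) -> List[int]:
    """Return the brokers that hold a replica but are not in the ISR list."""
    return [broker for broker in info.replicas if broker not in info.isr]


info = parse_partition_line(
    "Topic: redundantTopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2"
)
print(info)
print(out_of_sync(info))
```

An empty result from out_of_sync corresponds to the case described above, where Replicas and Isr are the same list.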

 

 

Your turn: play with the kafka-topics command, and try to create replicated topics on dead brokers and see the output. Also, create topics on running servers and then kill them to see the results. Was the output what you expected?

As mentioned before, all of these commands executed through the command line can be executed programmatically or performed through the Confluent Control Center web console.

 

A command-line message producer


Kafka also has a command to send messages through the command line; the input can be a text file or the console standard input. Each line typed in the input is sent as a single message to the cluster.

For this section, the execution of the previous steps is needed. The Kafka brokers must be up and running and a topic created inside them.

In a new command-line window, run the following command, followed by the lines to be sent as messages to the server:

> <confluent-path>/bin/kafka-console-producer --broker-list localhost:9093 --topic amazingTopic

Fool me once shame on you
Fool me twice shame on me

These lines push two messages into the amazingTopic running on the localhost cluster on the 9093 port.

This command is also the simplest way to check whether a broker with a specific topic is up and running as expected.

As we can see, the kafka-console-producer command receives the following parameters:

  • --broker-list: This specifies the Kafka brokers to connect to, as a comma-separated list in the form hostname:port.
  • --topic: This parameter is followed by the name of the target topic.
  • --sync: This specifies whether the messages should be sent synchronously.
  • --compression-codec: This specifies the compression codec used to produce the messages. The possible options are none, gzip, snappy, or lz4. If the parameter is given without a value, the codec defaults to gzip.
  • --batch-size: If the messages are not sent synchronously, this sets the size of the batch in which messages are sent together. This value is specified in bytes.
  • --message-send-max-retries: As the brokers can fail receiving messages, this parameter specifies the number of retries before a producer gives up and drops the message. This number must be a positive integer.
  • --retry-backoff-ms: In case of failure, the node leader election might take some time. This parameter is the time to wait after a failure before the producer retries. The number is the time in milliseconds.
  • --timeout: If the producer is running in asynchronous mode and this parameter is set, it indicates the maximum amount of time a message will queue while waiting for a sufficient batch size. This value is expressed in milliseconds.
  • --queue-size: If the producer is running in asynchronous mode and this parameter is set, it gives the maximum number of messages that will queue while waiting for a sufficient batch size.

When fine-tuning a server, batch-size, message-send-max-retries, and retry-backoff-ms are very important; take these parameters into consideration to achieve the desired behavior.
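How the retry parameters interact can be shown with a short Python sketch. It is an illustration of the retry-with-backoff idea only, not the producer's actual code: send_with_retries is a hypothetical function, the send callable stands in for a real network call, and the sleep function is injectable so the logic can be exercised without waiting.

```python
import time
from typing import Callable


def send_with_retries(send: Callable[[str], bool], message: str,
                      max_retries: int, retry_backoff_ms: int,
                      sleep: Callable[[float], None] = time.sleep) -> bool:
    """Try to send; on failure, wait retry_backoff_ms and retry up to max_retries times."""
    for attempt in range(max_retries + 1):       # first attempt plus the retries
        if send(message):
            return True
        if attempt < max_retries:
            sleep(retry_backoff_ms / 1000.0)     # give a leader election time to finish
    return False                                 # give up and drop the message


# A stand-in broker that fails twice before accepting the message.
attempts = []

def flaky_send(message: str) -> bool:
    attempts.append(message)
    return len(attempts) > 2

print(send_with_retries(flaky_send, "hello", max_retries=3,
                        retry_backoff_ms=100, sleep=lambda seconds: None))
```

A larger backoff makes the producer more tolerant of slow leader elections at the cost of higher latency, while more retries reduce the chance of dropping a message.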

If you don't want to type the messages, the command can read them from a file in which each line is considered a message, as shown in the following example:

> <confluent-path>/bin/kafka-console-producer --broker-list localhost:9093 --topic amazingTopic < aLotOfWordsToTell.txt
 

A command-line message consumer


The last step is to read the generated messages. Kafka also has a powerful command that enables messages to be consumed from the command line. Remember that all of these command-line tasks can also be done programmatically. As with the producer, each line is treated as a single message: the consumer prints every message on its own line.

For this section, the execution of the previous steps is needed. The Kafka brokers must be up and running and a topic created inside them. Also, some messages need to be produced with the message console producer, to begin consuming these messages from the console.

 

 

Run the following command:

> <confluent-path>/bin/kafka-console-consumer --topic amazingTopic --bootstrap-server localhost:9093 --from-beginning

The output should be as follows:

Fool me once shame on you
Fool me twice shame on me

The parameters are the topic's name and the address of a broker in the cluster. Also, the --from-beginning parameter indicates that messages should be consumed from the beginning of the log instead of only the latest messages (now test it: generate many more messages and don't specify this parameter).

There are more useful parameters for this command, some important ones are as follows:

  • --fetch-size: This is the amount of data to be fetched in a single request. The size in bytes follows as argument. The default value is 1,024 x 1,024.
  • --socket-buffer-size: This is the size of the TCP receive buffer. The size in bytes follows this parameter. The default value is 2 x 1024 x 1024.
  • --formatter: This is the name of the class to use for formatting messages for display. The default value is NewlineMessageFormatter.
  • --autocommit.interval.ms: This is the time interval at which to save the current offset in milliseconds. The time in milliseconds follows as argument. The default value is 10,000.
  • --max-messages: This is the maximum number of messages to consume before exiting. If not set, the consumption is continuous. The number of messages follows as the argument.
  • --skip-message-on-error: If there is an error while processing a message, the system should skip it instead of halting.

The most requested forms of this command are as follows:

  • To consume just one message, use the following:
      > <confluent-path>/bin/kafka-console-consumer --topic amazingTopic --bootstrap-server localhost:9093 --max-messages 1
  • To consume one message from an offset, use the following:
      > <confluent-path>/bin/kafka-console-consumer --topic amazingTopic --bootstrap-server localhost:9093 --max-messages 1 --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter'
  • To consume messages from a specific consumer group, use the following:
      > <confluent-path>/bin/kafka-console-consumer --topic amazingTopic --bootstrap-server localhost:9093 --new-consumer --consumer-property group.id=my-group
 

Using kafkacat


kafkacat is a generic command-line non-JVM utility used to test and debug Apache Kafka deployments. kafkacat can be used to produce, consume, and list topic and partition information for Kafka. kafkacat is netcat for Kafka, and it is a tool for inspecting and creating data in Kafka.

kafkacat is similar to the Kafka console producer and Kafka console consumer, but more powerful.

kafkacat is an open source utility and it is not included in Confluent Platform. It is available at https://github.com/edenhill/kafkacat.

To install kafkacat on a Debian-based Linux distribution, type the following:

apt-get install kafkacat

To install kafkacat on macOS with brew, type the following:

brew install kafkacat

To subscribe to amazingTopic and redundantTopic as part of a consumer group (here called my-group) and print to stdout, type the following:

kafkacat -b localhost:9093 -G my-group amazingTopic redundantTopic

 

 

 

Summary


In this chapter, we've learned what Kafka is, how to install and run Kafka on Linux and macOS, and how to install and run Confluent Platform.

Also, we've reviewed how to run Kafka brokers and topics, how to run a command-line message producer and consumer, and how to use kafkacat.

In Chapter 2, Message Validation, we will analyze how to build a producer and a consumer from Java.

About the Author

  • Raúl Estrada

    Raúl Estrada has been a programmer since 1996 and a Java developer since 2001. He loves all topics related to computer science. With more than 15 years of experience in high-availability and enterprise software, he has been designing and implementing architectures since 2003. His specialization is in systems integration, and he mainly participates in projects related to the financial sector. He has been an enterprise architect for BEA Systems and Oracle Inc., but he also enjoys web, mobile, and game programming. Raúl is a supporter of free software and enjoys experimenting with new technologies, frameworks, languages, and methods.

    Raúl is the author of other Packt Publishing titles, such as Fast Data Processing Systems with SMACK and Apache Kafka Cookbook.
