This chapter describes what Kafka is and the concepts related to this technology: brokers, topics, producers, and consumers. It also shows how to build a simple producer and consumer from the command line, and how to install Confluent Platform. The information in this chapter is fundamental to the following chapters.
In this chapter, we will cover the following topics:
- Kafka in a nutshell
- Installing Kafka (Linux and macOS)
- Installing the Confluent Platform
- Running Kafka
- Running Confluent Platform
- Running Kafka brokers
- Running Kafka topics
- A command-line message producer
- A command-line message consumer
- Using kafkacat
The Kafka core is written in Scala, and Kafka Streams and KSQL are written in Java. A Kafka server can run on several operating systems: Unix, Linux, macOS, and even Windows. As it usually runs in production on Linux servers, the examples in this book are designed to run on Linux environments. The examples in this book also assume a bash environment.
This chapter explains how to install, configure, and run Kafka. As this is a Quick Start Guide, it does not cover Kafka's theoretical details. At the moment, it is appropriate to mention these three points:
- Kafka is a service bus: To connect heterogeneous applications, we need to implement a message-publication mechanism to send and receive messages between them. A message router is known as a message broker. Kafka is a message broker, a solution that routes messages between clients quickly.
- Kafka architecture has two directives: The first is to not block the producers (in order to deal with back pressure). The second is to isolate producers and consumers: the producers should not know who their consumers are. Hence, Kafka follows the dumb broker and smart clients model.
- Kafka is a real-time messaging system: Moreover, Kafka is a software solution with a publish-subscribe model: open source, distributed, partitioned, replicated, and commit-log-based.
The following are some essential Kafka terms:
- Cluster: This is a set of Kafka brokers.
- Zookeeper: This is a cluster coordinator, a tool with different services that are part of the Apache ecosystem.
- Broker: This is a Kafka server, also the Kafka server process itself.
- Topic: This is a queue (that has log partitions); a broker can run several topics.
- Offset: This is a sequential identifier for each message within a partition.
- Partition: This is an immutable and ordered sequence of records continually appended to a structured commit log.
- Producer: This is the program that publishes data to topics.
- Consumer: This is the program that processes data from the topics.
- Retention period: This is the time to keep messages available for consumption.
Kafka can run in three cluster configurations:
- Single node–single broker
- Single node–multiple brokers
- Multiple nodes–multiple brokers
In Kafka, there are three (and just three) ways to deliver messages:
- Never redelivered: The messages may be lost because, once delivered, they are not sent again (at most once).
- May be redelivered: The messages are never lost because, if a message is not acknowledged, it can be sent again (at least once).
- Delivered exactly once: The message is delivered exactly once. This is the most difficult form of delivery, as it implies that messages are neither lost nor duplicated.
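These guarantees map to producer configuration. The following is an illustrative sketch of the Java producer client properties typically associated with each delivery mode; the property names come from the standard Kafka producer API, and the values shown are examples, not recommendations:

```properties
# At-most-once: fire and forget; a lost message is never resent
acks=0
retries=0

# At-least-once: wait for the broker's acknowledgment and retry on failure;
# a retry after a lost acknowledgment may deliver a duplicate
acks=all
retries=3

# Exactly-once (Kafka 0.11 and later): the idempotent producer
# deduplicates retried sends on the broker side
enable.idempotence=true
acks=all
```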
The message log can be compacted in two ways:
- Coarse-grained: The log is compacted by time
- Fine-grained: The log is compacted by message
There are three ways to install a Kafka environment:
- Downloading the executable files
- Using brew (on macOS)
- Installing Confluent Platform
For all three ways, the first step is to install Java; we need Java 8. Download and install the latest JDK 8 from Oracle's website.
For Linux users, follow these steps:
- Change the file mode to executable, as follows:
> chmod +x jdk-8u191-linux-x64.rpm
- Go to the directory in which you want to install Java:
> cd <directory path>
- Run the rpm installer with the following command:
> rpm -ivh jdk-8u191-linux-x64.rpm
- Add the JAVA_HOME variable to your environment. The following command writes the JAVA_HOME environment variable to the /etc/profile file:
> echo "export JAVA_HOME=/usr/java/jdk1.8.0_191" >> /etc/profile
- Validate the Java installation, as follows:
> java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)
At the time of writing, the latest Scala version is 2.12.6. To install Scala in Linux, perform the following steps:
- Download the latest Scala binary from http://www.scala-lang.org/download
- Extract the downloaded file, scala-2.12.6.tgz, as follows:
> tar xzf scala-2.12.6.tgz
- Add the SCALA_HOME variable to your environment (this assumes that the extracted directory has been moved to /opt/scala), as follows:
> export SCALA_HOME=/opt/scala
- Add the Scala bin directory to your PATH environment variable as follows:
> export PATH=$PATH:$SCALA_HOME/bin
- To validate the Scala installation, do the following:
> scala -version
Scala code runner version 2.12.6 -- Copyright 2002-2018, LAMP/EPFL and Lightbend, Inc.
To install Kafka on your machine, ensure that you have at least 4 GB of RAM. The installation directory will be /usr/local/kafka/ for macOS users and /opt/kafka/ for Linux users. Create the directory that corresponds to your operating system.
Open the Apache Kafka download page, http://kafka.apache.org/downloads, as in Figure 1.1:
Figure 1.1: Apache Kafka download page
At the time of writing, the current stable Apache Kafka release is 2.0.0. Remember that, since version 0.8.x, Kafka is not backward compatible, so we cannot replace this version with one prior to 0.8. Once you've downloaded the latest available release, let's proceed with the installation.
Follow these steps to install Kafka in Linux:
- Extract the downloaded file, kafka_2.11-2.0.0.tgz, into the /opt/ directory as follows:
> tar xzf kafka_2.11-2.0.0.tgz
- Create the KAFKA_HOME environment variable as follows:
> export KAFKA_HOME=/opt/kafka_2.11-2.0.0
- Add the Kafka bin directory to the PATH variable as follows:
> export PATH=$PATH:$KAFKA_HOME/bin
Now Java, Scala, and Kafka are installed.
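Note that the export commands above only last for the current shell session. The following is a minimal sketch that makes them permanent; the paths are the ones assumed throughout this chapter, and the ~/.kafka_env file name is just an illustrative choice (appending the same lines to ~/.bashrc works equally well):

```shell
# Collect the environment variables from this chapter in one file.
# Adjust the paths if you installed Java, Scala, or Kafka elsewhere.
cat > "$HOME/.kafka_env" <<'EOF'
export JAVA_HOME=/usr/java/jdk1.8.0_191
export SCALA_HOME=/opt/scala
export KAFKA_HOME=/opt/kafka_2.11-2.0.0
export PATH=$PATH:$SCALA_HOME/bin:$KAFKA_HOME/bin
EOF

# Load the variables into the current shell:
. "$HOME/.kafka_env"
echo "KAFKA_HOME is $KAFKA_HOME"
```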
To install Kafka on macOS with brew, follow these steps:
- To install sbt (the Scala build tool) with brew, execute the following:
> brew install sbt
If you already have it in your environment (downloaded previously), run the following to upgrade it:
> brew upgrade sbt
The output is similar to that shown in Figure 1.2:
Figure 1.2: The Scala build tool installation output
- To install Scala with brew, execute the following:
> brew install scala
If you already have it, run the following to upgrade it:
> brew upgrade scala
The output is similar to that shown in Figure 1.3:
Figure 1.3: The Scala installation output
- To install Kafka with brew (this also installs Zookeeper), do the following:
> brew install kafka
If you already have it (downloaded in the past), upgrade it as follows:
> brew upgrade kafka
The output is similar to that shown in Figure 1.4:
Figure 1.4: Kafka installation output
Visit https://brew.sh/ for more information about brew.
The open source version of Confluent Platform includes the following components:
- Apache Kafka
- REST proxy
- Kafka Connect API
- Schema Registry
- Kafka Streams API
- Pre-built connectors
- Non-Java clients
Note that almost every one of these components has its own chapter in this book.
The enterprise (commercial) version of Confluent Platform adds the following components:
- Confluent Control Center (CCC)
- Kafka operator (for Kubernetes)
- JMS client
- MQTT proxy
- Auto data balancer
- Security features
It is important to mention that coverage of the components of the commercial version is beyond the scope of this book.
Confluent Platform is also available as Docker images, but here we are going to install it locally.
Open the Confluent Platform download page: https://www.confluent.io/download/.
At the time of writing, the current stable release of Confluent Platform is 5.0.0. Remember that, since the Kafka core is written in Scala, there are two versions: one for Scala 2.11 and one for Scala 2.12.
We could run Confluent Platform from our desktop directory but, following this book's conventions, let's use /opt/ for Linux users and /usr/local/ for macOS users. Extract the downloaded file, as follows:
> tar xzf confluent-5.0.0-2.11.tar.gz
If we install it directly, the steps to run Kafka are as follows.
For macOS users, your paths might be different if you've installed using brew. Check the output of the brew install kafka command for the exact commands that you can use to start Zookeeper and Kafka.
Go to the Kafka installation directory (/usr/local/kafka for macOS users and /opt/kafka/ for Linux users), as in this example:
> cd /usr/local/kafka
First of all, we need to start Zookeeper (Kafka's dependency on Zookeeper is, and will remain, strong). Type the following:
> ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
ZooKeeper JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
Starting zookeeper ... STARTED
To check whether Zookeeper is running, use the lsof command over port 2181 (the Zookeeper default port) as follows:
> lsof -i :2181
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 12529 admin 406u IPv6 0xc41a24baa4fedb11 0t0 TCP *:2181 (LISTEN)
Now run the Kafka server that comes with the installation. Go to /usr/local/kafka/ (for macOS users) or /opt/kafka/ (for Linux users) and run the following:
> ./bin/kafka-server-start.sh ./config/server.properties
Now there is an Apache Kafka broker running in your machine.
Remember that Zookeeper must be running on the machine before starting Kafka. If you don't want to start Zookeeper manually every time you need to run Kafka, install it as an operating system auto-start service.
Go to the Confluent Platform installation directory, as follows:
> cd /usr/local/confluent-5.0.0
To start Confluent Platform, run the following:
> bin/confluent start
The output is similar to what is shown in the following code snippet:
Using CONFLUENT_CURRENT: /var/folders/nc/4jrpd1w5563crr_np997zp980000gn/T/confluent.q3uxpyAt
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
To access the Confluent Control Center running locally, go to http://localhost:9021, as shown in Figure 1.5:
Figure 1.5: Confluent Control Center main page
There are other commands for Confluent Platform.
To get the status of all services or the status of a specific service along with its dependencies, enter the following:
> bin/confluent status
To stop all of the services, run the following:
> bin/confluent stop
To delete the data and logs of the current Confluent Platform, type the following:
> bin/confluent destroy
The real art behind a server is in its configuration. In this section, we will examine how to deal with the basic configuration of a Kafka broker in standalone mode. Since we are learning, at the moment, we will not review the cluster configuration.
As we can suppose, there are two types of configuration: standalone and cluster. The real power of Kafka is unlocked when running with replication in cluster mode and all topics are correctly partitioned.
The cluster mode has two main advantages: parallelism and redundancy. Parallelism is the capacity to run tasks simultaneously among the cluster members. The redundancy warrants that, when a Kafka node goes down, the cluster is safe and accessible from the other running nodes.
This section shows how to configure a cluster with several nodes on our local machine, although in practice it is always better to have several machines with multiple nodes sharing clusters.
Go to the Confluent Platform installation directory, referenced from now on as <confluent-path>.
As mentioned in the beginning of this chapter, a broker is a server instance. A server (or broker) is actually a process running in the operating system and starts based on its configuration file.
The people at Confluent have kindly provided us with a template of a standard broker configuration. This file, which is called server.properties, is located in the config subdirectory of the Kafka installation directory. Inside <confluent-path>, make a directory with the name mark.
- For each Kafka broker (server) that we want to run, we need to make a copy of the configuration file template and rename it accordingly. In this example, our cluster is going to be called mark:
> cp config/server.properties <confluent-path>/mark/mark-1.properties
> cp config/server.properties <confluent-path>/mark/mark-2.properties
- Modify each properties file accordingly. First, set the broker.id property; it must be unique for each broker. Then, specify the port on which each server will run; the recommendation is 9093 for mark-1 and 9094 for mark-2. Note that the port property is not set in the template, so add the line. Finally, specify the location of the Kafka logs (a Kafka log is a specific archive that stores all of a Kafka broker's operations); in this case, we use the /tmp directory. Here, it is common to have problems with write permissions. Do not forget to give write and execute permissions over the log directory to the user that runs these processes, as in the examples:
If the file is called mark-1.properties, set the following:
broker.id=1
port=9093
log.dirs=/tmp/mark-1-logs
If the file is called mark-2.properties, set the following:
broker.id=2
port=9094
log.dirs=/tmp/mark-2-logs
- Start the Kafka brokers using the kafka-server-start command, with the corresponding configuration file passed as the parameter. Don't forget that Confluent Platform must already be running and that the ports should not be in use by another process. Start the Kafka brokers as follows:
> <confluent-path>/bin/kafka-server-start <confluent-path>/mark/mark-1.properties &
And, in another command-line window, run the following command:
> <confluent-path>/bin/kafka-server-start <confluent-path>/mark/mark-2.properties &
Don't forget that the trailing & is there to specify that you want your command line back. If you want to see the broker output, it is recommended that you run each command separately, in its own command-line window.
Remember that the properties file contains the server configuration and that the server.properties file located in the config directory is just a template.
Now there are two brokers, mark-1 and mark-2, running on the same machine in the same cluster.
Remember, there are no dumb questions, as in the following examples:
Q: How do the brokers know that they belong to the same cluster?
A: The brokers know that they belong to the same cluster because, in the configuration, both point to the same Zookeeper cluster.
Q: How does each broker differ from the others within the same cluster?
A: Every broker is identified inside the cluster by the number specified in its broker.id property.
Q: What happens if the port number is not specified?
A: If the port property is not specified, all of the brokers will try to run on the same default port; on the same machine, only the first broker will be able to start.
Q: What happens if the log directory is not specified?
A: If log.dir is not specified, all of the brokers will write to the same default log.dir. If the brokers are planned to run on different machines, then the port and log.dir properties might not be specified (because they use the same port and log file, but on different machines).
Q: How can I check that there is no process already running on the port where I want to start my broker?
A: As shown in the previous section, there is a useful command to see which process is running on a specific port; in this case, the 9093 port:
> lsof -i :9093
The output of the previous command is something like this:
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 12529 admin 406u IPv6 0xc41a24baa4fedb11 0t0 TCP *:9093 (LISTEN)
Your turn: try to run this command before starting the Kafka brokers, and run it after starting them to see the change. Also, try to start a broker on a port in use to see how it fails.
To run Kafka nodes on different machines but in the same cluster, adjust the Zookeeper connection string in the configuration file; its default value is as follows:
zookeeper.connect=localhost:2181
Remember that the machines must be able to be found by each other by DNS and that there are no network security restrictions between them.
The default value for Zookeeper connect is correct only if you are running the Kafka broker in the same machine as Zookeeper. Depending on the architecture, it will be necessary to decide if there will be a broker running on the same Zookeeper machine.
To specify that Zookeeper might run in other machines, do the following:
zookeeper.connect=localhost:2181,192.168.0.2:2183,192.168.0.3:2182
The previous line specifies that Zookeeper is running on the localhost machine on port 2181, on the machine with the IP address 192.168.0.2 on port 2183, and on the machine with the IP address 192.168.0.3 on port 2182. The Zookeeper default port is 2181, so normally it runs there.
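Putting this together, a broker on a separate machine joins the cluster simply by pointing at the same Zookeeper ensemble. The following server.properties sketch is illustrative only; the broker ID, port, log directory, and IP addresses are assumptions that you must adapt to your own network:

```properties
# Hypothetical third broker, running on its own machine
broker.id=3
port=9093
log.dirs=/tmp/mark-3-logs
# The same Zookeeper ensemble used by the rest of the cluster
zookeeper.connect=192.168.0.1:2181,192.168.0.2:2183,192.168.0.3:2182
```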
Your turn: as an exercise, try to start a broker with incorrect information about the Zookeeper cluster. Also, using the lsof command, try to raise Zookeeper on a port that is in use.
Kafka, like almost all modern infrastructure projects, has three ways of building things: through the command line, through programming, and through a web console (in this case the Confluent Control Center). The management (creation, modification, and destruction) of Kafka brokers can be done through programs written in most modern programming languages. If the language is not supported, it could be managed through the Kafka REST API. The previous section showed how to build a broker using the command line. In later chapters, we will see how to do this process through programming.
Is it possible to manage (create, modify, or destroy) only brokers through programming? No; we can also manage topics. Topics can also be created through the command line. Kafka has pre-built utilities to manage brokers, as we have already seen, and to manage topics, as we will see next.
To create a topic called amazingTopic in our running cluster, use the following command:
> <confluent-path>/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic amazingTopic
The output should be as follows:
Created topic amazingTopic
Here, the kafka-topics command is used. The --create parameter specifies that we want to create a new topic. The --topic parameter sets the name of the topic; in this case, amazingTopic.
Do you remember the terms parallelism and redundancy? Well, the --partitions parameter controls the parallelism, and the --replication-factor parameter controls the redundancy.
The --replication-factor parameter is fundamental, as it specifies on how many servers of the cluster the topic is going to be replicated. On the other hand, one broker can hold just one replica of a given partition.
Obviously, if a number greater than the number of running servers on the cluster is specified, it will result in an error (you don't believe me? Try it in your environment). The error will be like this:
Error while executing topic command: replication factor: 3 larger than available brokers: 2
[2018-09-01 07:13:31,350] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: replication factor: 3 larger than available brokers: 2 (kafka.admin.TopicCommand$)
For a replica to be counted, its broker must be running (don't be shy; test all of this theory in your environment).
The --partitions parameter, as its name implies, sets how many partitions the topic will have. The number determines the parallelism that can be achieved on the consumer's side. This parameter is very important when doing cluster fine-tuning.
Finally, as expected, the
--zookeeper parameter indicates where the Zookeeper cluster is running.
When a topic is created, the output in the broker log is something like this:
[2018-09-01 07:05:53,910] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions amazingTopic-0 (kafka.server.ReplicaFetcherManager)
[2018-09-01 07:05:53,950] INFO Completed load of log amazingTopic-0 with 1 log segments and log end offset 0 in 21 ms (kafka.log.Log)
In short, this message reads like a new topic has been born in our cluster.
How can I check my new and shiny topic? By using the same command:
> <confluent-path>/bin/kafka-topics --list --zookeeper localhost:2181
The output is the list of running topics, as follows:
amazingTopic
This command returns the list with the names of all of the running topics in the cluster.
How can I get details of a topic? Using the same command:
For a particular topic, run the kafka-topics command with the --describe parameter, as follows:
> <confluent-path>/bin/kafka-topics --describe --zookeeper localhost:2181 --topic amazingTopic
The command output is as follows:
Topic:amazingTopic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: amazingTopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Here is a brief explanation of the output:
- PartitionCount: Number of partitions on the topic (parallelism)
- ReplicationFactor: Number of replicas of the topic (redundancy)
- Leader: Node responsible for the read and write operations of a given partition
- Replicas: List of brokers replicating this topic's data; some of these might even be dead
- Isr: List of nodes that are currently in-sync replicas
Now, let's create a replicated topic (this requires at least two brokers running in the cluster), as follows:
> <confluent-path>/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic redundantTopic
The output is as follows:
Created topic redundantTopic
Now, call the
kafka-topics command with the
--describe parameter to check the topic details, as follows:
> <confluent-path>/bin/kafka-topics --describe --zookeeper localhost:2181 --topic redundantTopic
Topic:redundantTopic PartitionCount:1 ReplicationFactor:2 Configs:
Topic: redundantTopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
As you can see, Replicas and Isr are the same lists, so we can infer that all of the nodes are in-sync.
Your turn: play with the kafka-topics command. Try to create replicated topics on dead brokers and see the output. Also, create topics on running servers and then kill the servers to see the results. Was the output what you expected?
Kafka also has a command to send messages through the command line; the input can be a text file or the console standard input. Each line typed in the input is sent as a single message to the cluster.
For this section, the execution of the previous steps is needed. The Kafka brokers must be up and running and a topic created inside them.
In a new command-line window, run the following command, followed by the lines to be sent as messages to the server:
> <confluent-path>/bin/kafka-console-producer --broker-list localhost:9093 --topic amazingTopic
Fool me once shame on you
Fool me twice shame on me
These lines push two messages into amazingTopic, running on the localhost cluster on port 9093. This command is also the simplest way to check whether a broker with a specific topic is up and running as expected.
As we can see, the kafka-console-producer command receives the following parameters:
--broker-list: This specifies the Kafka brokers to connect to, as a comma-separated list in the form hostname:port.
--topic: This parameter is followed by the name of the target topic.
--sync: This specifies whether the messages should be sent synchronously.
--compression-codec: This specifies the compression codec used to produce the messages. The possible options are none, gzip, snappy, or lz4. If not specified, the default is gzip.
--batch-size: If the messages are not sent synchronously, this specifies the number of messages sent in a single batch.
--message-send-max-retries: As the brokers can fail receiving messages, this parameter specifies the number of retries before a producer gives up and drops the message. This number must be a positive integer.
--retry-backoff-ms: In case of failure, the node leader election might take some time. This parameter is the time to wait before producer retries after this election. The number is the time in milliseconds.
--timeout: If the producer is running in asynchronous mode and this parameter is set, it indicates the maximum amount of time a message will queue while awaiting a sufficient batch size. This value is expressed in milliseconds.
--queue-size: If the producer is running in asynchronous mode and this parameter is set, it gives the maximum number of messages that will queue while awaiting a sufficient batch size.
In the case of server fine-tuning, parameters such as retry-backoff-ms are very important; take these parameters into consideration to achieve the desired behavior.
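As a sketch, several of these tuning parameters can be combined in a single invocation. The values below are illustrative only, and the command assumes a broker listening on localhost:9093 with amazingTopic already created:

```
> <confluent-path>/bin/kafka-console-producer --broker-list localhost:9093 --topic amazingTopic --compression-codec lz4 --batch-size 100 --message-send-max-retries 5 --retry-backoff-ms 500
```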
A producer can also send the contents of a file, where each line in the file is sent as one message:
> <confluent-path>/bin/kafka-console-producer --broker-list localhost:9093 --topic amazingTopic < aLotOfWordsToTell.txt
The last step is to learn how to read the generated messages. Kafka also has a powerful command that enables messages to be consumed from the command line. Remember that all of these command-line tasks can also be done programmatically. As with the producer, each line of the output represents one message.
For this section, the execution of the previous steps is needed. The Kafka brokers must be up and running and a topic created inside them. Also, some messages need to be produced with the message console producer, to begin consuming these messages from the console.
Run the following command:
> <confluent-path>/bin/kafka-console-consumer --topic amazingTopic --bootstrap-server localhost:9093 --from-beginning
The output should be as follows:
Fool me once shame on you
Fool me twice shame on me
The parameters are the topic's name and the address of the bootstrap server. Also, the --from-beginning parameter indicates that messages should be consumed from the beginning, instead of just the last messages in the log (now test it: generate many more messages and don't specify this parameter).
--fetch-size: This is the amount of data to be fetched in a single request. The size in bytes follows as argument. The default value is 1,024 x 1,024.
--socket-buffer-size: This is the size of the TCP RECV. The size in bytes follows this parameter. The default value is 2 x 1024 x 1024.
--formatter: This is the name of the class to use for formatting messages for display. The default value is kafka.tools.DefaultMessageFormatter.
--autocommit.interval.ms: This is the time interval at which to save the current offset in milliseconds. The time in milliseconds follows as argument. The default value is 10,000.
--max-messages: This is the maximum number of messages to consume before exiting. If not set, the consumption is continuous. The number of messages follows as the argument.
--skip-message-on-error: If there is an error while processing a message, the system should skip it instead of halting.
The most requested forms of this command are as follows:
- To consume just one message, use the following:
> <confluent-path>/bin/kafka-console-consumer --topic amazingTopic --bootstrap-server localhost:9093 --max-messages 1
- To consume one message from an offset, use the following:
> <confluent-path>/bin/kafka-console-consumer --topic amazingTopic --bootstrap-server localhost:9093 --max-messages 1 --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter'
- To consume messages as part of a specific consumer group, use the following:
> <confluent-path>/bin/kafka-console-consumer --topic amazingTopic --bootstrap-server localhost:9093 --consumer-property group.id=my-group
kafkacat is a generic command-line non-JVM utility used to test and debug Apache Kafka deployments. kafkacat can be used to produce, consume, and list topic and partition information for Kafka. kafkacat is netcat for Kafka, and it is a tool for inspecting and creating data in Kafka.
kafkacat is similar to the Kafka console producer and Kafka console consumer, but more powerful.
kafkacat is an open source utility and it is not included in Confluent Platform. It is available at https://github.com/edenhill/kafkacat.
To install kafkacat on modern Linux, type the following:
> apt-get install kafkacat
To install kafkacat on macOS with brew, type the following:
> brew install kafkacat
For example, the following command subscribes to amazingTopic and redundantTopic on our local broker and prints the messages in them:
> kafkacat -b localhost:9093 -t amazingTopic redundantTopic
In this chapter, we've learned what Kafka is, how to install and run Kafka in Linux and macOS and how to install and run Confluent Platform.
Also, we've reviewed how to run Kafka brokers and topics, how to run a command-line message producer and consumer, and how to use kafkacat.
In Chapter 2, Message Validation, we will analyze how to build a producer and a consumer in Java.