Apache Kafka 1.0 Cookbook

By Raúl Estrada

About this book

Apache Kafka provides a unified, high-throughput, low-latency platform to handle real-time data feeds. This book will show you how to use Kafka efficiently, and contains practical solutions to the common problems that developers and administrators usually face while working with it.

This practical guide contains easy-to-follow recipes to help you set up, configure, and use Apache Kafka in the best possible manner. You will use Apache Kafka Consumers and Producers to build effective real-time streaming applications. The book covers the recently released Kafka version 1.0, the Confluent Platform, and Kafka Streams. The programming aspect covered in the book will teach you how to perform important tasks such as message validation, enrichment, and composition. Recipes that focus on optimizing the performance of your Kafka cluster and on integrating Kafka with a variety of third-party tools such as Apache Hadoop, Apache Spark, and Elasticsearch will greatly ease your day-to-day work with Kafka. Finally, we cover tasks related to monitoring and securing your Apache Kafka cluster using tools such as Ganglia and Graphite.

If you're looking to become the go-to person in your organization when it comes to working with Apache Kafka, this book is the only resource you need to have.

Publication date:
December 2017
Publisher
Packt
Pages
250
ISBN
9781787286849

 

Chapter 1. Configuring Kafka

In this chapter, we will cover the following topics:

  • Installing Kafka
  • Running Kafka
  • Configuring Kafka brokers
  • Configuring Kafka topics
  • Creating a message console producer
  • Creating a message console consumer
  • Configuring the broker settings
  • Configuring threads and performance
  • Configuring the log settings
  • Configuring the replica settings
  • Configuring the Zookeeper settings
  • Configuring other miscellaneous parameters
 

Introduction


This chapter explains the basic recipes to get started with Apache Kafka. It discusses how to install, configure, and run Kafka, and how to perform basic operations with a Kafka broker.

Kafka can run on several operating systems: Mac, Linux, and even Windows. As it usually runs in production on Linux servers, the recipes in this book are designed to run in Linux environments. The examples also assume a bash shell.

Kafka scales very well in a horizontal fashion without compromising speed and efficiency.

This chapter explains how to install, configure, and run Kafka. As this is a practical recipes book, it does not cover the theoretical details of Kafka. These three things are enough theory for the moment:

  1. To connect heterogeneous applications, we need to implement a mechanism for message publishing by sending and receiving messages among them. A message router is known as a message broker. Kafka is a software solution for routing messages quickly between producers and consumers.
  2. The message broker has two directives: the first is to not block the producers, and the second is to isolate producers and consumers (the producers should not know who their consumers are).
  3. Kafka is two things: a real-time, publish-subscribe solution, and a messaging system. Moreover, it is open source, distributed, partitioned, replicated, and commit-log based, with a publish-subscribe schema.

Before we begin, it is relevant to mention some concepts and nomenclature in Kafka:

  • Broker: A server process
  • Cluster: A set of brokers
  • Topic: A queue (that has log partitions)
  • Offset: A message identifier
  • Partition: An ordered and immutable sequence of records continually appended to a structured commit log
  • Producer: Those who publish data to topics
  • Consumer: Those who process the feed
  • ZooKeeper: The coordinator
  • Retention period: The time to keep messages available for consumption

In Kafka, there are three types of clusters:

  • Single node: Single broker
  • Single node: Multiple brokers
  • Multiple nodes: Multiple brokers

There are three ways to deliver messages:

  • Never redelivered (at most once): The messages may be lost
  • May be redelivered (at least once): The messages are never lost
  • Delivered once (exactly once): The message is delivered exactly once

There are two types of log retention:

  • Coarse grained: By time
  • Finer grained: By message (log compaction, which keeps the latest message for each key)

The next six recipes contain the necessary steps to run a complete Kafka test from scratch.

 

Installing Kafka


This is the first step. This recipe shows how to install Apache Kafka.

Getting ready

Ensure that you have at least 4 GB of RAM in your machine; the installation directory will be /usr/local/kafka/ for Mac users and /opt/kafka/ for Linux users. Create these directories.

How to do it...

Go to the Apache Kafka home page at http://kafka.apache.org/downloads, as in Figure 1-1, Apache Kafka download page:

Figure 1-1. Apache Kafka download page

The currently available stable release of Apache Kafka is 0.10.2.1. A major limitation of Kafka since 0.8.x is that it is not backward-compatible with earlier releases, so we cannot replace this version with one prior to 0.8. Once you've downloaded the latest available release, let's proceed with the installation.

Remember, for Mac users, replace the directory /opt/ with /usr/local/ in the examples.

Installing Java in Linux

We need Java 1.7 or later. Download and install the latest JDK from Oracle's website: http://www.oracle.com/technetwork/java/javase/downloads/index.html

  1. Change the file mode:
> chmod +x jdk-8u131-linux-x64.rpm
  2. Go to the directory in which you want to perform the installation:
> cd <directory path name>
  3. Run the rpm installer with the command:
> rpm -ivh jdk-8u131-linux-x64.rpm
  4. Finally, add the environment variable JAVA_HOME. This command will write the JAVA_HOME environment variable to the file /etc/profile:
> echo "export JAVA_HOME=/usr/java/jdk1.8.0_131" >> /etc/profile

Installing Scala in Linux

The following are the steps to install Scala in Linux:

  1. Download the latest Scala binary from: http://www.scala-lang.org/download
  2. Extract the downloaded file scala-2.12.2.tgz:
> tar xzf scala-2.12.2.tgz
  3. Create the SCALA_HOME environment variable (most tutorials agree that the best place to set environment variables permanently is the /etc/profile file):
> export SCALA_HOME=/opt/scala
  4. Add the Scala bin directory to the PATH variable:
> export PATH=$PATH:$SCALA_HOME/bin

Installing Kafka in Linux

The following are the steps to install Kafka in Linux:

  1. Extract the downloaded file kafka_2.10-0.10.2.1.tgz:
> tar xzf kafka_2.10-0.10.2.1.tgz
  2. Create the KAFKA_HOME environment variable:
> export KAFKA_HOME=/opt/kafka_2.10-0.10.2.1
  3. Add the Kafka bin directory to the PATH variable:
> export PATH=$PATH:$KAFKA_HOME/bin

Now Java, Scala, and Kafka are installed.
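
The export commands above only last for the current shell session. To keep them across logins, the same lines can be appended to a profile file. The following is a minimal sketch, not part of the original recipe: the function name append_export_once is hypothetical, and it assumes that you have write permission on the profile file you pass in (for /etc/profile this normally means running as root).

```shell
# Append "export NAME=VALUE" to a profile file unless that exact line is already there.
# The profile path is a parameter so the function can be tried on a scratch file first.
append_export_once() {
    local name=$1 value=$2 profile=$3
    local line="export ${name}=${value}"
    # -x matches whole lines, -F treats the pattern as a fixed string, -q prints nothing.
    if ! grep -qxF -- "$line" "$profile" 2>/dev/null; then
        printf '%s\n' "$line" >> "$profile"
    fi
}

# Example calls (single quotes keep $PATH and $KAFKA_HOME unexpanded in the file):
#   append_export_once KAFKA_HOME /opt/kafka_2.10-0.10.2.1 /etc/profile
#   append_export_once PATH '$PATH:$KAFKA_HOME/bin' /etc/profile
```

Because the function checks for the line first, running it twice does not duplicate the entry.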

There's more...

To do all these steps in command-line mode, there is a powerful tool for Mac users called brew (the equivalent in Linux would be yum).

To install from the command line, we use the following steps:

  1. With brew, install sbt (Scala build tool):
> brew install sbt

If you already have it (downloaded in the past), upgrade it:

> brew upgrade sbt

The output is similar to:

> brew upgrade sbt
==> Upgrading 1 outdated package, with result:
sbt 0.13.15
==> Upgrading sbt
==> Using the sandbox
==> Downloading https://github.com/sbt/sbt/releases/download/v0.13.15/sbt-0.13.15.tgz
==> Downloading from https://github-cloud.s3.amazonaws.com/releases/279553/09838df4-23c6-11e7-9276-14
######################################################################## 100.0%
==> Caveats
You can use $SBT_OPTS to pass additional JVM options to SBT:
   SBT_OPTS="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
This formula is now using the standard lightbend sbt launcher script.
Project specific options should be placed in .sbtopts in the root of your project.
Global settings should be placed in /usr/local/etc/sbtopts
==> Summary
/usr/local/Cellar/sbt/0.13.15: 378 files, 63.3MB, built in 1 minute 5 seconds
  2. With brew, install Scala:
> brew install scala

If you already have it (downloaded in the past), upgrade it:

> brew upgrade scala

The output is similar to:

> brew install scala
==> Using the sandbox
==> Downloading https://downloads.lightbend.com/scala/2.12.2/scala-2.12.2.tgz
######################################################################## 100.0%
==> Downloading https://raw.githubusercontent.com/scala/scala-tool-support/0a217bc/bash-completion/sr
######################################################################## 100.0%
==> Caveats
To use with IntelliJ, set the Scala home to:
/usr/local/opt/scala/idea
Bash completion has been installed to:
/usr/local/etc/bash_completion.d
==> Summary
/usr/local/Cellar/scala/2.12.2: 44 files, 19.9MB, built in 19 seconds
Mist:Downloads admin1$ scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL
  3. With brew, install Kafka (it also installs ZooKeeper):
> brew install kafka

If you already have it (downloaded in the past), upgrade it:

> brew upgrade kafka

The output is similar to:

> brew install kafka
==> Installing dependencies for kafka: zookeeper
==> Installing kafka dependency: zookeeper
==> Downloading https://homebrew.bintray.com/bottles/zookeeper-3.4.9.sierra.bottle.tar.gz
######################################################################## 100.0%
==> Pouring zookeeper-3.4.9.sierra.bottle.tar.gz
==> Using the sandbox
==> Caveats
To have launchd start zookeeper now and restart at login:
brew services start zookeeper
Or, if you don't want/need a background service you can just run:
zkServer start
==> Summary
/usr/local/Cellar/zookeeper/3.4.9:
242 files, 18.2MB
==> Installing kafka
==> Downloading https://homebrew.bintray.com/bottles/kafka-0.10.2.0.sierra.bottle.tar.gz
######################################################################## 100.0%
==> Pouring kafka-0.10.2.0.sierra.bottle.tar.gz
==> Caveats
To have launchd start kafka now and restart at login:
brew services start kafka
Or, if you don't want/need a background service you can just run:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
==> Summary
/usr/local/Cellar/kafka/0.10.2.0: 145 files, 37.3MB

 

Running Kafka


This is the second step. This recipe shows how to test the Apache Kafka installation.

Getting ready

Go to the Kafka installation directory (/usr/local/kafka/ for Mac users and /opt/kafka/ for Linux users):

> cd /usr/local/kafka

How to do it...

  1. First of all, we need to run ZooKeeper (sorry, the Kafka dependency on ZooKeeper is still very strong):
> zkServer start

You will get the following result:

ZooKeeper JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
Starting zookeeper ... STARTED
  2. To check if ZooKeeper is running, use the lsof command over the port 2181 (the ZooKeeper default port):
> lsof -i :2181

You will get output similar to the following:

COMMAND   PID   USER   FD   TYPE            DEVICE SIZE/OFF NODE NAME
java   17479 admin1   97u IPv6 0xcfbcde96aa59c3bf     0t0 TCP *:2181 (LISTEN)
  3. Now run the Kafka server that comes with the installation; from /usr/local/kafka/ for Mac users or /opt/kafka/ for Linux users, run:
> ./bin/kafka-server-start.sh config/server.properties

Now there is an Apache Kafka broker running on your machine.

There's more...

Remember that Zookeeper must be running on the machine before you start Kafka. If you don't want to start Zookeeper every time you need to run Kafka, install it as an operating system autostart service.
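
When the two start commands are scripted, the ordering requirement can be enforced by waiting until the ZooKeeper port accepts connections. The following is a minimal sketch under stated assumptions: the function names port_is_open and wait_for_port are hypothetical, the check relies on the /dev/tcp feature of bash (it does not work in plain sh), and an open port only shows that something is listening there, not that ZooKeeper is healthy.

```shell
# Return 0 if something accepts TCP connections on host:port, non-zero otherwise.
# The connection is opened in a subshell, so it is closed again as soon as the test ends.
port_is_open() {
    local host=$1 port=$2
    (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null
}

# Poll once per second until the port opens or the attempts run out (default: 30).
wait_for_port() {
    local host=$1 port=$2 attempts=${3:-30}
    while [ "$attempts" -gt 0 ]; do
        port_is_open "$host" "$port" && return 0
        attempts=$((attempts - 1))
        sleep 1
    done
    return 1
}

# Example call, assuming ZooKeeper listens on its default port 2181:
#   wait_for_port localhost 2181 && ./bin/kafka-server-start.sh config/server.properties
```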

 

Configuring Kafka brokers


This recipe shows how to deal with the Kafka brokers' basic configuration. For learning and development purposes, one can run Kafka in standalone mode. The real Kafka power is unlocked when it is running with replication in cluster mode and the topics are partitioned accordingly.

There are two main advantages of the cluster mode: parallelism and redundancy. Parallelism is the capacity to run tasks simultaneously among the cluster members. Redundancy guarantees that when a Kafka node goes down, the cluster is safe and accessible from the other nodes.

Single node clusters are not recommended for production environments, so this recipe shows how to configure a cluster with several nodes.

Getting ready

Go to the Kafka installation directory (/usr/local/kafka/ for Mac users and /opt/kafka/ for Linux users):

> cd /usr/local/kafka

How to do it...

As already said, a broker is a server instance. This recipe shows how to start two different servers on one machine. There is a server configuration template called server.properties located in the config sub-directory of the Kafka installation directory:

  1. For each Kafka broker (server) that we want to run, we make a copy of the configuration file template and rename it accordingly. In this example, the cluster is called synergy:
> cp config/server.properties synergy-1.properties
> cp config/server.properties synergy-2.properties
  2. Modify each file according to the plan. If the file is called synergy-1, the broker.id should be 1. Specify the port on which the server should run; the recommendation is 9093 for synergy-1 and 9094 for synergy-2. The port property is not set in the template, so add the line accordingly. Finally, specify the location of the Kafka logs (the files in which the broker stores its data); in this case, we use the directory /tmp.

In synergy-1.properties, set:

broker.id=1
port=9093
log.dir=/tmp/synergy-1-logs

In synergy-2.properties, set:

broker.id=2
port=9094
log.dir=/tmp/synergy-2-logs
  3. Start the Kafka brokers using the kafka-server-start.sh command with the corresponding configuration file. Don't forget that ZooKeeper must already be running and that the ports must not be in use by another process:
> bin/kafka-server-start.sh synergy-1.properties &
...
> bin/kafka-server-start.sh synergy-2.properties &

Recall that the trailing & runs the command in the background, so that you get your command line back. If you want to see the broker output, it is recommended that you run each command in its own command-line window.
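
The copy-and-edit steps of this recipe can also be scripted, which helps when the cluster has more than two brokers. The following is a minimal sketch, not the book's method: the function name make_broker_config is hypothetical, and it assumes that the template uses plain key=value lines. It removes any existing log.dirs line from the copy, because log.dirs takes precedence over log.dir when both are set.

```shell
# Write a per-broker properties file derived from a template.
# Usage: make_broker_config TEMPLATE OUTPUT BROKER_ID PORT LOG_DIR
make_broker_config() {
    local template=$1 output=$2 id=$3 port=$4 log_dir=$5
    {
        # Copy the template minus the lines that already set the keys written below.
        grep -Ev '^(broker\.id|port|log\.dirs?)=' "$template"
        printf 'broker.id=%s\n' "$id"
        printf 'port=%s\n' "$port"
        printf 'log.dir=%s\n' "$log_dir"
    } > "$output"
}

# Example calls for the two brokers of the synergy cluster:
#   make_broker_config config/server.properties synergy-1.properties 1 9093 /tmp/synergy-1-logs
#   make_broker_config config/server.properties synergy-2.properties 2 9094 /tmp/synergy-2-logs
```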

How it works...

The properties file contains the server configuration. The server.properties file located in the config directory is just a template.

All of the members of the cluster should point to the same ZooKeeper cluster. Every broker is identified inside the cluster by the value specified in the broker.id property. If the port property is not specified, every broker tries to listen on the same default port (9092), so a second broker on the same machine cannot start. If log.dir is not specified, all the brokers will write to the same default log.dir. If the brokers are going to run on different machines, then port and log.dir can be left unspecified.

There's more...

Before assigning a port to a server, there is a useful command to see what process is running on a specific port (in this case, the port 9093):

> lsof -i :9093

The output of the previous command is something like this:

COMMAND   PID   USER   FD   TYPE             DEVICE SIZE/OFF NODE NAME
java   17479 admin   97u IPv6 0xcfbcde96aa59c3bf     0t0 TCP *:9093 (LISTEN)

Try to run this command before starting the Kafka servers and run it after starting to see the change. Also, try to start a broker on a port in use to see how it fails.

To run Kafka nodes on different machines, change the ZooKeeper connection string in the configuration file; its default value is:

zookeeper.connect=localhost:2181

This value is correct only if you are running the Kafka broker on the same machine as ZooKeeper. In production, this is rarely the case. To specify that ZooKeeper is running on different machines (that is, in a ZooKeeper cluster), set:

zookeeper.connect=localhost:2181,192.168.0.2:2183,192.168.0.3:2182

The previous line says that Zookeeper is running on the localhost machine on port 2181, on the machine with IP Address 192.168.0.2 on port 2183, and on the machine with IP Address 192.168.0.3 on port 2182. The Zookeeper default port is 2181, so try to run it there.
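
A connection string with several entries is easy to mistype, so it can help to split it and check each entry before starting a broker. The following is a minimal sketch: the function name list_zk_hosts is hypothetical, it strips stray whitespace around the entries, and it only accepts entries of the form host:port with a numeric port (an optional chroot path suffix, which ZooKeeper connection strings may carry, is not handled here).

```shell
# Print each host:port entry of a zookeeper.connect value on its own line.
# Returns non-zero if any entry is not of the form host:port with a numeric port.
list_zk_hosts() {
    local entry status=0
    # Turn commas into newlines and delete blanks around each entry.
    for entry in $(printf '%s' "$1" | tr ',' '\n' | tr -d ' \t'); do
        case $entry in
            *:*[!0-9]* | *: | :* )
                status=1; printf 'invalid entry: %s\n' "$entry" >&2 ;;
            *:* )
                printf '%s\n' "$entry" ;;
            * )
                status=1; printf 'invalid entry: %s\n' "$entry" >&2 ;;
        esac
    done
    return "$status"
}

# Example call:
#   list_zk_hosts "localhost:2181,192.168.0.2:2183,192.168.0.3:2182"
```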

As an exercise, try to start a broker with incorrect information about ZooKeeper. Also, in combination with the lsof command, try to start ZooKeeper on a port in use.

 

Configuring Kafka topics


The Kafka cluster is running, but the magic inside a broker is the queues, that is, the topics. This recipe shows the next step: how to create Kafka topics.

Getting ready

At this point, you need to:

  • Have installed Kafka
  • Have Zookeeper up and running
  • Have a Kafka server up and running
  • Go to the Kafka installation directory (/usr/local/kafka/ for Mac users and /opt/kafka/ for Linux users):
> cd /usr/local/kafka

How to do it...

Recall that almost all modern projects have two ways to do things: through the command line and through code. Yes, believe it or not, Kafka brokers can be created through code in almost all modern programming languages; the previous recipe showed just the command-line method. Later chapters explain how to achieve it programmatically.

The same goes for the topics. They can be created through the command line and through code. In this recipe, we will show it through the command line. Kafka has built-in utilities to create brokers (as already shown) and topics. From the Kafka installation directory, type the following command:

> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic humbleTopic

The output should be:

Created topic "humbleTopic".

How it works...

Here, the kafka-topics.sh command is used. The --create parameter specifies that we want to create a new topic. The --topic parameter sets the name of the topic; in this case, humbleTopic.

The --replication-factor parameter is very important; it specifies how many servers of the cluster will hold a replica of the topic. One broker can hold just one replica of a given partition. Obviously, if we specify a number greater than the number of running servers on the cluster, it is an error (don't be shy and try it in your environment), like this:

Error while executing topic command : replication factor: 3 larger than available brokers: 1
[2017-02-28 07:13:31,350] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: replication factor: 3 larger than available brokers: 1
   (kafka.admin.TopicCommand$)

The --partitions parameter, as its name implies, says how many partitions our topic will have. The number of partitions determines the parallelism that can be achieved on the consumer's side. This parameter is fundamental when doing fine tuning on the cluster.

Finally, the --zookeeper parameter indicates where the ZooKeeper cluster is running.

When a topic is created, the output in the broker log is something like this:

[2017-02-28 07:05:53,910] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions humbleTopic-0 (kafka.server.ReplicaFetcherManager)
[2017-02-28 07:05:53,950] INFO Completed load of log humbleTopic-0 with 1 log segments and log end offset 0 in 21 ms (kafka.log.Log)

This message says that a new topic has been born in that broker.

There's more…

Yes, there are more parameters than --create. To check whether a topic has been successfully created, run the kafka-topics command with the --list parameter:

> ./bin/kafka-topics.sh --list --zookeeper localhost:2181
humbleTopic

This parameter returns the list of all the existing topics in the Kafka cluster.

To get the details of a particular topic, run the kafka-topics command with the --describe parameter:

> ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic humbleTopic

The command output is:

Topic:humbleTopic     PartitionCount:1      ReplicationFactor:1   Configs:
Topic: humbleTopic    Partition: 0  Leader: 1     Replicas: 1   Isr: 1

The explanation of the output is:

  • PartitionCount: Number of partitions existing on this topic.
  • ReplicationFactor: Number of replicas existing on this topic.
  • Leader: Node responsible for the reading and writing operations of a given partition.
  • Replicas: List of brokers replicating the Kafka data. Some of these might even be dead.
  • ISR: List of nodes that are currently in-sync replicas.

To create a topic with multiple replicas, we need to increase the replication factor as follows:

> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic replicatedTopic

The output is as follows:

Created topic "replicatedTopic".

Call the kafka-topics command with the --describe parameter to check the topic details:

> ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic replicatedTopic
Topic:replicatedTopic PartitionCount:1      ReplicationFactor:2   Configs:
Topic: replicatedTopic       Partition: 0  Leader: 1     Replicas: 1,2 Isr: 1,2

As Replicas and ISR (in-sync replicas) are the same lists, all the nodes are in-sync.
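
Comparing the two lists by eye becomes tedious with many partitions. The following is a minimal sketch that reads the --describe output shown above and reports the partitions whose ISR list is shorter than their replica list. The function name find_out_of_sync is hypothetical, and the parsing assumes the "Label: value" layout of the output in this recipe; other Kafka versions may print a different layout.

```shell
# Read `kafka-topics.sh --describe` output on stdin and print the partitions
# whose ISR list has fewer entries than their replica list.
# The exit status is 1 if at least one such partition is found, 0 otherwise.
find_out_of_sync() {
    awk '
        {
            topic = ""; partition = ""; replicas = ""; isr = ""
            # Walk the "Label: value" pairs of a partition line.
            for (i = 1; i < NF; i++) {
                if ($i == "Topic:")     topic = $(i + 1)
                if ($i == "Partition:") partition = $(i + 1)
                if ($i == "Replicas:")  replicas = $(i + 1)
                if ($i == "Isr:")       isr = $(i + 1)
            }
            if (replicas == "") next    # summary line or unrelated output
            if (split(isr, a, ",") < split(replicas, b, ",")) {
                printf "%s-%s replicas=%s isr=%s\n", topic, partition, replicas, isr
                found = 1
            }
        }
        END { exit found + 0 }
    '
}

# Example call:
#   ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic replicatedTopic | find_out_of_sync
```

The counts are compared rather than the strings, since the brokers in the two lists are not necessarily printed in the same order.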

Try to play with all these commands; try to create replicated topics on dead servers and see the output. Also, create topics on running servers and then kill them to see the results.

As mentioned before, all the commands executed through the command line can be executed programmatically.

 

Creating a message console producer


Kafka also has a command to produce data through the console. The input can be a text file or the console standard input. Each line typed in the input is sent as a single message to the Kafka cluster.

Getting ready

For this recipe, you need the execution of the previous recipes in this chapter: Kafka already downloaded and installed, the Kafka nodes up and running, and a topic created inside the cluster. To begin producing some messages from the console, change to the Kafka directory in the command line.

How to do it...

Go to the Kafka installation directory (/usr/local/kafka/ for Mac users and /opt/kafka/ for Linux users):

> cd /usr/local/kafka

Run this command, followed by the lines to be sent as messages to the server:

> ./bin/kafka-console-producer.sh --broker-list localhost:9093 --topic humbleTopic

Her first word was Mom

Her second word was Dad

How it works...

The previous command pushes two messages to the humbleTopic running on the localhost machine on the port 9093.

This is a simple way to check if a broker with a specific topic is up and running as expected.

There's more…

The kafka-console-producer program receives the following parameters:

  • --broker-list: Specifies the Kafka brokers to connect to, as a comma-separated list of hostnames and ports.
  • --topic: Followed by the target topic's name.
  • --sync: This parameter specifies whether the messages should be sent synchronously.
  • --compression-codec: This parameter specifies the compression codec used to produce the messages. The possible options are: none, gzip, snappy, or lz4. If the parameter is given without a value, the default is gzip.
  • --batch-size: The number of messages sent in a single batch if they are not sent synchronously. The batch's size value is specified in bytes.
  • --message-send-max-retries: Communication is not perfect; the brokers can fail receiving messages. This parameter specifies the number of retries before a producer gives up and drops the message. The number following this parameter must be a positive integer.
  • --retry-backoff-ms: The election of leader nodes might take some time. This is the time to wait before the producer retries after this election. The number following this parameter is the time in milliseconds.
  • --timeout: If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in milliseconds.
  • --queue-size: If set and the producer is running in asynchronous mode, this gives the maximum number of messages that will queue awaiting sufficient batch size.

When doing server fine tuning, the batch-size, message-send-max-retries, and retry-backoff-ms are fundamental; take these parameters into consideration to achieve the desired performance.

Just a moment; someone could say, "Eeey, I don't want to waste my precious time typing all the messages." For those people, the command can read a file through standard input, where each line is considered a message:

> ./bin/kafka-console-producer.sh --broker-list localhost:9093 --topic humbleTopic < firstWords.txt

If you want to see the complete list of arguments, take a look at the command source code at: https://apache.googlesource.com/kafka/+/0.8.2/core/src/main/scala/kafka/tools/ConsoleProducer.scala
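
The input file used with the redirection above is plain text with one message per line. The following is a minimal sketch for creating such a file; the function name write_messages is hypothetical, and firstWords.txt is the file name already used in this recipe.

```shell
# Write each argument after the file name as one line, that is, one Kafka message.
# The file is overwritten; without any message the function does nothing and fails.
write_messages() {
    local file=$1
    shift
    [ "$#" -gt 0 ] || return 1
    printf '%s\n' "$@" > "$file"
}

# Example call, followed by the producer command from this recipe:
#   write_messages firstWords.txt "Her first word was Mom" "Her second word was Dad"
#   ./bin/kafka-console-producer.sh --broker-list localhost:9093 --topic humbleTopic < firstWords.txt
```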

 

Creating a message console consumer


Now, take the last step. In the previous recipes, it was explained how to produce messages from the console; this recipe indicates how to read the messages generated. Kafka also has a fancy command-line utility that enables consuming messages. Recall that all the command-line tasks can also be done programmatically. Also, recall that each line of the input was considered a message from the producer.

Getting ready

For this recipe, the execution of the previous recipes in this chapter is needed: Kafka already downloaded and installed, the Kafka nodes up and running, and a topic created inside the cluster. Also, some messages need to be produced with the message console producer. To begin consuming some messages from the console, change to the Kafka directory in the command line.

How to do it...

Consuming messages through the console is easy; just run the following command:

> ./bin/kafka-console-consumer.sh --topic humbleTopic --bootstrap-server localhost:9093 --from-beginning

Her first word was Mom
Her second word was Dad

How it works...

The parameters are the topic name and the broker address used by the producer. Also, the --from-beginning parameter specifies that messages should be consumed from the beginning of the log instead of only the newest messages (go and give it a try: generate many more messages and don't specify this parameter).

There's more...

There are more useful parameters for this command; some interesting ones are:

  • --fetch-size: The amount of data to be fetched in a single request. The size in bytes follows this argument. The default value is 1024 * 1024.
  • --socket-buffer-size: The size of the TCP receive buffer. The size in bytes follows this argument. The default value is 2 * 1024 * 1024.
  • --formatter: The name of a class to use for formatting messages for display. The default value is NewlineMessageFormatter (already presented in the recipe).
  • --autocommit.interval.ms: The time interval at which to save the current offset (the offset concept will be explained later) in milliseconds. The time in milliseconds follows the argument. The default value is 10,000.
  • --max-messages: The maximum number of messages to consume before exiting. If not set, the consumption is continual. The number of messages follows the argument.
  • --skip-message-on-error: If there is an error while processing a message, the system should skip it instead of halt.

Enough boring theory; this is a practical cookbook, so look at these most requested menu entries:

Consume just one message:

> ./bin/kafka-console-consumer.sh --topic humbleTopic --bootstrap-server localhost:9093 --max-messages 1

Consume one message from an offset:

> ./bin/kafka-console-consumer.sh --topic humbleTopic --bootstrap-server localhost:9093 --max-messages 1 --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter'

Consume messages from a specific consumer group (consumer groups will be explained further):

> ./bin/kafka-console-consumer.sh --topic humbleTopic --bootstrap-server localhost:9093 --new-consumer --consumer-property group.id=my-group

If you want to see the complete list of arguments, take a look at the command source code at: https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala

 

Configuring the broker settings


Most of Apache Kafka's magic is achieved through configuration. As with all intensive messaging systems, the success factor is to configure them well, and in this respect Kafka is highly configurable. In practice, most systems have average performance with the default settings, but in production, tuning is required to achieve optimal performance. Sometimes, finding the right configuration is a matter of trial and error; there is no such thing as a configuration silver bullet.

The rest of the chapter is about Kafka broker fine tuning.

Getting ready

In previous recipes, it was explained how to install and run Kafka. Now, make a copy of the server.properties template in the config directory and open the copy with a text editor.

How to do it...

  1. Configure the basic settings in the configuration file.
  2. Set each one of the following parameters with these values:
broker.id=0
listeners=PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-logs

How it works…

As shown in the previous recipes, all of the broker definition is contained in the configuration file. The rest is to pass the configuration file as an argument to the server-start command.

A detailed explanation of every parameter is as follows:

  • broker.id: A non-negative integer; the default value is 0. The name should be unique in the cluster. The important point here is to assign a name to the broker, so when it is moved to a different host or to a different port, no change is made in the consumer's side.
  • listeners: A comma-separated list of URIs the broker will listen on and the listener names. Examples of legal listener lists are: PLAINTEXT://127.0.0.1:9092, SSL://:9091, CLIENT://0.0.0.0:9092, and REPLICATION://localhost:9093.
  • host.name: DEPRECATED. A string; the default value is null. If it is not specified, Kafka will bind all the interfaces on the system. If it is specified, it will bind only to the specified address. Set this name if you want the clients to connect to a particular interface.
  • port: DEPRECATED. A non-negative integer; the default value is 9092. It is the TCP port on which the broker listens for connections. Note that in the file template this value is not set.
  • log.dir: A String; the default value is /tmp/kafka-logs. This is the directory where Kafka persists the messages locally. It is very important that the user that runs the start command has write permissions on that directory.
  • log.dirs: A String; the default value is null. These are the directories where Kafka persists the messages locally. If not set, the value in log.dir is used. More than one location can be specified by separating the directories with commas.
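
After editing a configuration file, it is handy to confirm which value a broker will actually pick up. The following is a minimal sketch for reading one key from a properties file. The function name get_property is hypothetical, and it only understands plain key=value lines (no line continuations or escaped characters); when a key appears more than once, it prints the last occurrence, which is also how Java properties files are loaded.

```shell
# Print the value of KEY from a properties FILE; return 1 if the key is not set.
# Lines that start with # are treated as comments and skipped.
get_property() {
    local file=$1 key=$2
    awk -F= -v key="$key" '
        /^[ \t]*#/ { next }
        $1 == key  { value = substr($0, index($0, "=") + 1); seen = 1 }
        END        { if (seen) print value; else exit 1 }
    ' "$file"
}

# Example calls:
#   get_property config/server.properties listeners
#   get_property config/server.properties log.dirs || get_property config/server.properties log.dir
```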

There's more…

If bridged connections are used, that is, when the address and port that external parties need to connect to differ from the internal ones, this parameter is used:

  • advertised.listeners: The listeners published to ZooKeeper for producers, consumers, and other brokers to connect to. If it is not specified, the value of listeners is used.
 

Configuring threads and performance


No parameter should be left at its default value when optimal performance is desired. These parameters should be taken into consideration to achieve the best behavior.

Getting ready

With your favorite text editor, open your server.properties file copy.

How to do it...

Adjust the following parameters:

message.max.bytes=1000000
num.network.threads=3
num.io.threads=8
background.threads=10
queued.max.requests=500
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1

How it works…

With these changes, the network and performance configurations have been set to achieve optimum levels for the application. Again, every system is different, and you might need to experiment a little to come up with the optimal one for a specific configuration.

Here is an explanation of every parameter:

  • message.max.bytes: Default value: 1 000 000. This is the maximum size, in bytes, for each message. This is designed to prevent any producer from sending extra large messages and saturating the consumers.
  • num.network.threads: Default value: 3. This is the number of threads that handle network requests simultaneously. If the system has too many simultaneous requests, consider increasing this value.
  • num.io.threads: Default value: 8. This is the number of threads for input/output (I/O) operations. This value should be at least the number of processors present.
  • background.threads: Default value: 10. This is the number of threads for background jobs, for example, the deletion of old log files.
  • queued.max.requests: Default value: 500. This is the number of requests queued while waiting to be processed by the I/O threads. Remember, when the queue is full, the network threads will not accept more requests. If your application has erratic loads, set this to a value at which it will not throttle.
  • socket.send.buffer.bytes: Default value: 102 400. This is the SO_SNDBUF buffer size, used for socket connections.
  • socket.receive.buffer.bytes: Default value: 102 400. This is the SO_RCVBUF buffer size, also used for socket connections.
  • socket.request.max.bytes: Default value: 104 857 600. This is the maximum request size, in bytes, that the server can accept. It should always be smaller than the Java heap size.
  • num.partitions: Default value: 1. This is the default number of partitions for a topic that is created without an explicit partition count.
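
The two rules of thumb given above (num.io.threads should be at least the number of processors, and socket.request.max.bytes should be smaller than the Java heap) can be checked with a small script. The following is only a sketch: parse_properties and check_thread_settings are hypothetical helper names, the parser handles plain key=value lines only, and the 1 GiB heap is an assumed value rather than anything read from the JVM.

```python
import os


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_thread_settings(props, cpu_count, heap_bytes):
    """Return warnings based on the rules of thumb in this recipe."""
    warnings = []
    if int(props["num.io.threads"]) < cpu_count:
        warnings.append("num.io.threads is lower than the processor count")
    if int(props["socket.request.max.bytes"]) >= heap_bytes:
        warnings.append("socket.request.max.bytes is not smaller than the heap")
    return warnings


sample = """
num.io.threads=8
socket.request.max.bytes=104857600
"""
props = parse_properties(sample)
# Assumed environment: the local CPU count and a 1 GiB heap.
warnings = check_thread_settings(props, os.cpu_count() or 1, 1024 ** 3)
```

An empty list means that neither rule was violated for the assumed environment; pass your own processor count and heap size to test other machines.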

There's more...

As with everything that runs on the JVM, the Java installation should be tuned to achieve optimal performance. This includes the settings for heap, socket size, memory parameters, and garbage collection.

 

Configuring the log settings


The log refers to the files where all the messages are stored on the machine; in this book, when the log is mentioned, think in terms of data structures, not just event recording.

The log settings are fundamental, since they determine how the messages are persisted on the broker machine.

Getting ready

With your favorite text editor, open your server.properties file copy.

How to do it...

Adjust the following parameters:

log.segment.bytes=1073741824
log.roll.hours=168
log.cleanup.policy=delete
log.retention.hours=168
log.retention.bytes=-1
log.retention.check.interval.ms=30000
log.cleaner.enable=false
log.cleaner.threads=1
log.cleaner.backoff.ms=15000
log.index.size.max.bytes=10485760
log.index.interval.bytes=4096
log.flush.interval.messages=9223372036854775807
log.flush.interval.ms=9223372036854775807

How it works…

Here is an explanation of every parameter:

  • log.segment.bytes: Default value: 1 GB. This defines the maximum segment size in bytes (the concept of segment will be explained later). Once a segment file reaches that size, a new segment file is created. Topics are stored as a bunch of segment files in the log directory. This property can also be set per topic.
  • log.roll.{ms,hours}: Default value: 7 days. This defines the time period after which a new segment file is created, even if the current one has not reached the size limit. This property can also be set per topic.
  • log.cleanup.policy: Default value: delete. Possible options are delete or compact. If the delete option is set, the log segments will be deleted periodically when they reach their time threshold or size limit. If the compact option is set, log compaction is used to clean up obsolete records. This property can also be set per topic.
  • log.retention.{ms,minutes,hours}: Default value: 7 days. This defines the time to retain the log segments. This property can also be set per topic.
  • log.retention.bytes: Default value: -1. This defines the maximum size, in bytes, of the log per partition to retain before old segments are deleted; -1 means there is no size limit. This property can also be set per topic. The segments are deleted when the log time or size limits are reached.
  • log.retention.check.interval.ms: Default value: five minutes (300 000 ms). This defines the interval at which the logs are checked for deletion to meet retention policies.
  • log.cleaner.enable: To enable log compaction, set this to true.
  • log.cleaner.threads: Indicates the number of threads that clean logs for compaction.
  • log.cleaner.backoff.ms: The interval at which the cleaner checks whether any log needs cleaning.
  • log.index.size.max.bytes: Default value: 10 MB. This sets the maximum size, in bytes, of the offset index. This property can also be set per topic.
  • log.index.interval.bytes: Default value: 4096. The interval at which a new entry is added to the offset index (the offset concept will be explained later). In each fetch request, the broker does a linear scan for this number of bytes to find the correct position in the log to begin and end the fetch. Setting this value too low may mean larger index files and more memory used, but less scanning.
  • log.flush.interval.messages: Default value: 9 223 372 036 854 775 807. The number of messages kept in memory before being flushed to disk. It does not guarantee durability, but gives finer control.
  • log.flush.interval.ms: Sets the maximum time, in ms, that a message in any topic is kept in memory before it is flushed to disk. If not set, the value in log.flush.scheduler.interval.ms is used.
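
To make the interplay between the rolling and retention settings more concrete, here is a simplified Python model. It is a sketch under stated assumptions and not the broker's implementation: should_roll and segments_to_delete are hypothetical names, segments are represented as plain (name, size, age) tuples, and only closed segments are considered for deletion.

```python
def should_roll(segment_bytes, segment_age_hours,
                max_bytes=1073741824, roll_hours=168):
    """A segment is rolled when it hits the size or the age limit."""
    return segment_bytes >= max_bytes or segment_age_hours >= roll_hours


def segments_to_delete(segments, retention_hours=168, retention_bytes=-1):
    """Pick closed segments to delete, oldest first.

    `segments` is a list of (name, size_bytes, age_hours) tuples ordered
    from oldest to newest. A retention_bytes of -1 disables the size limit.
    """
    doomed = [name for name, _, age in segments if age >= retention_hours]
    if retention_bytes >= 0:
        remaining = [s for s in segments if s[0] not in doomed]
        excess = sum(size for _, size, _ in remaining) - retention_bytes
        for name, size, _ in remaining:
            # Drop the oldest segments while the log stays at or above the limit.
            if excess - size < 0:
                break
            doomed.append(name)
            excess -= size
    return doomed


segments = [("0000.log", 500, 200), ("0500.log", 500, 100), ("1000.log", 500, 10)]
by_time = segments_to_delete(segments)
by_size = segments_to_delete(segments, retention_bytes=500)
```

With the default settings only the segment older than 168 hours is selected; adding a size limit of 500 bytes also selects the next oldest segment, because the log would still hold 500 bytes without it.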

There's more…

All of the settings are listed at: http://kafka.apache.org/documentation.html#brokerconfigs.

 

Configuring the replica settings


Replication is configured for reliability purposes, and it can also be tuned.

Getting ready

With your favorite text editor, open your server.properties file copy.

How to do it...

Adjust the following parameters:

default.replication.factor=1
replica.lag.time.max.ms=10000
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
num.replica.fetchers=1
replica.high.watermark.checkpoint.interval.ms=5000
fetch.purgatory.purge.interval.requests=1000
producer.purgatory.purge.interval.requests=1000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536

How it works…

Here is an explanation of these settings:

  • default.replication.factor: Default value: 1. For an automatically created topic, this sets how many replicas it has.
  • replica.lag.time.max.ms: Default value: 10 000. There are leaders and followers; if a follower has not sent any fetch request or has not caught up with the leader within this time, the leader will remove the follower from the ISR list and consider the follower dead.
  • replica.fetch.max.bytes: Default value: 1 048 576. In each request, for each partition, this value sets the maximum number of bytes fetched by a request from its leader. Remember that the maximum message size accepted by the broker is defined by message.max.bytes (broker configuration) or max.message.bytes (topic configuration).
  • replica.fetch.wait.max.ms: Default value: 500. This is the maximum amount of time for the leader to respond to a replica's fetch request. Remember that this value should always be smaller than the replica.lag.time.max.ms.
  • num.replica.fetchers: Default value: 1. The number of fetcher threads used to replicate messages from a source broker. Increasing this number increases the I/O parallelism in the follower broker.
  • replica.high.watermark.checkpoint.interval.ms: Default value: 5000. The high watermark (HW) is the offset of the last committed message. This value is the frequency at which each replica saves its high watermark to the disk for recovery.
  • fetch.purgatory.purge.interval.requests: Default value: 1000. Purgatory is the place where the fetch requests are kept on hold till they can be serviced (great name, isn't it?). The purge interval is specified in number of requests (not in time) of the fetch request purgatory.
  • producer.purgatory.purge.interval.requests: Default value: 1000. It sets the purge interval in number of requests (not in time) of the producer request purgatory (did you catch the difference from the previous parameter?).
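
The role of replica.lag.time.max.ms can be pictured with a small Python model. This is only a sketch and not Kafka code: in_sync_replicas and check_replica_settings are hypothetical names, and each follower is reduced to the timestamp at which it was last caught up with the leader.

```python
def in_sync_replicas(followers, now_ms, lag_time_max_ms=10000):
    """Return follower ids that caught up within the lag window.

    `followers` maps a follower id to the timestamp (ms) at which it was
    last fully caught up with the leader. Simplified model, not Kafka code.
    """
    return sorted(
        fid for fid, last_caught_up_ms in followers.items()
        if now_ms - last_caught_up_ms <= lag_time_max_ms
    )


def check_replica_settings(props):
    """Check the rule stated above: fetch wait must be below the lag limit."""
    wait_ms = int(props["replica.fetch.wait.max.ms"])
    lag_ms = int(props["replica.lag.time.max.ms"])
    return wait_ms < lag_ms


followers = {1: 95_000, 2: 80_000, 3: 99_500}
isr = in_sync_replicas(followers, now_ms=100_000)
ok = check_replica_settings(
    {"replica.fetch.wait.max.ms": "500", "replica.lag.time.max.ms": "10000"}
)
```

In this example, follower 2 was last caught up 20 seconds ago, which is more than the 10-second limit, so it drops out of the in-sync list.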

There's more...

Some other settings are listed here: http://kafka.apache.org/documentation.html#brokerconfigs

 

Configuring the ZooKeeper settings


Apache ZooKeeper is a centralized service for maintaining configuration information and providing distributed synchronization. ZooKeeper is used in Kafka for cluster management and to keep the topic information synchronized.

Getting ready

With your favorite text editor, open your server.properties file copy.

How to do it…

Adjust the following parameters:

zookeeper.connect=127.0.0.1:2181,192.168.0.32:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000

How it works…

Here is an explanation of these settings:

  • zookeeper.connect: Default value: null. This is a comma-separated value in the form of the hostname:port string, indicating the ZooKeeper connection. Specifying several connections ensures the Kafka cluster reliability and continuity when one node fails. ZooKeeper also allows a chroot path (/chroot/path) to be appended, which makes the data available under that particular path. This enables having the ZooKeeper cluster available for multiple Kafka clusters. This path must be created before starting the Kafka cluster, and consumers must use the same string.
  • zookeeper.session.timeout.ms: Default value: 6000. If a heartbeat from the server is not received within this time period, the server is considered dead. This parameter is fundamental: if it is too long, a dead server goes undetected and the whole system will experience problems; if it is too short, a living server could be considered dead.
  • zookeeper.connection.timeout.ms: Default value: 6000. This is the maximum time that the client will wait while establishing a connection to ZooKeeper.
  • zookeeper.sync.time.ms: Default value: 2000. This is the time a ZooKeeper follower can be behind its ZooKeeper leader.
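
The structure of the zookeeper.connect string, including the optional chroot path, can be sketched in a few lines of Python. The function parse_zookeeper_connect is a hypothetical helper, not a Kafka or ZooKeeper API, and it assumes that every host entry includes a port.

```python
def parse_zookeeper_connect(value):
    """Split a zookeeper.connect string into hosts and an optional chroot.

    Hypothetical helper; returns ([(host, port), ...], chroot_or_None).
    """
    # Everything after the first slash is the chroot path.
    hosts_part, slash, chroot = value.partition("/")
    hosts = []
    for entry in hosts_part.split(","):
        host, _, port = entry.strip().rpartition(":")
        hosts.append((host, int(port)))
    return hosts, ("/" + chroot if slash else None)


hosts, chroot = parse_zookeeper_connect(
    "127.0.0.1:2181,192.168.0.32:2181/chroot/path"
)
```

Note that the chroot path appears only once, at the end of the whole string, and applies to all the listed hosts.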

 

Configuring other miscellaneous parameters


No parameter should be left at its default value when optimal behavior is desired. These parameters should be taken into consideration to achieve the best performance.

Getting ready

With your favorite text editor, open your server.properties file copy.

How to do it...

Adjust the following parameters:

auto.create.topics.enable=true
controlled.shutdown.enable=true
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000
auto.leader.rebalance.enable=true
leader.imbalance.per.broker.percentage=10
leader.imbalance.check.interval.seconds=300
offset.metadata.max.bytes=4096
max.connections.per.ip=2147483647
connections.max.idle.ms=600000
unclean.leader.election.enable=true
offsets.topic.num.partitions=50
offsets.retention.minutes=1440
offsets.retention.check.interval.ms=600000
offsets.topic.replication.factor=3
offsets.topic.segment.bytes=104857600
offsets.load.buffer.size=5242880
offsets.commit.required.acks=-1
offsets.commit.timeout.ms=5000

How it works…

Here is an explanation of these settings:

  • auto.create.topics.enable: Default value: true. Suppose that metadata is fetched or a message is produced for a nonexistent topic; if this value is true, the topic will automatically be created. In production environments, this value should be false.
  • controlled.shutdown.enable: Default value: true. If this value is true, when a shutdown is called on the broker, the broker will gracefully move all of its partition leaders to different brokers. When it is true, the availability is increased.
  • controlled.shutdown.max.retries: Default value: 3. This is the maximum number of times the broker retries a controlled shutdown before performing a forced, unclean shutdown.
  • controlled.shutdown.retry.backoff.ms: Default value: 5000. Suppose that a failure happens (controller fail over, replica lag, and so on); this value determines the time to wait before each retry, so that the system can recover from the state that caused the failure.
  • auto.leader.rebalance.enable: Default value: true. If this value is true, the broker will automatically try to balance the partition leadership among the brokers. At regular intervals, a background thread checks and triggers leader balance if required, setting the leadership to the preferred replica of each partition if available.
  • leader.imbalance.per.broker.percentage: Default value: 10. This value is specified in percentages and is the leader imbalance allowed per broker (the leader imbalance will be explained later). The cluster will rebalance the leadership if this percentage goes above the set value.
  • leader.imbalance.check.interval.seconds: Default value: 300. This is the frequency at which the controller checks the leader imbalance.
  • offset.metadata.max.bytes: Default value: 4096. This is the maximum size allowed for the metadata that a client stores with an offset commit.
  • max.connections.per.ip: Default value: 2 147 483 647. This is the maximum number of connections that the broker accepts from each IP address.
  • connections.max.idle.ms: Default value: 600 000. This is the idle connection's timeout. The server socket processor threads close the connections that idle more than this value.
  • unclean.leader.election.enable: Default value: false (it was true before Kafka 0.11). If this value is true, the replicas that are not in the ISR can become leaders. Doing so may result in data loss.
  • offsets.topic.num.partitions: Default value: 50. This is the number of partitions for the offset commit topic. This value cannot be changed post deployment.
  • offsets.retention.minutes: Default value: 1440. This is the retention window for committed offsets, that is, the time to keep the offsets. Past this time, the offsets will be marked for deletion.
  • offsets.retention.check.interval.ms: Default value: 600 000. This is the frequency at which to check for stale offsets.
  • offsets.topic.replication.factor: Default value: 3. This is the number of replicas for the offset commit topic. The higher this value, the higher the availability. As shown in the previous recipes, if the number of brokers is lower than the replication factor, the number of replicas will be equal to the number of brokers.
  • offsets.topic.segment.bytes: Default value: 104 857 600. This is the segment size for the offsets topic. The lower this value, the faster the log compaction and cache loading are.
  • offsets.load.buffer.size: Default value: 5 242 880. This is the batch size to be used for reading offset segments when loading offsets into the cache.
  • offsets.commit.required.acks: Default value: -1. This is the number of acknowledgements required before the offset commit can be accepted. It is recommended to not override the default value of -1, which means that all the in-sync replicas must acknowledge the commit.
  • offsets.commit.timeout.ms: Default value: 5000. This is the time that an offset commit will be delayed until all replicas for the offsets topic receive the commit or this time value is reached. This value is similar to the producer request.timeout.ms.
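
The check behind leader.imbalance.per.broker.percentage can be approximated with the following Python sketch. The definition of imbalance used here is an assumption made for illustration (the share of partitions for which a broker is the preferred leader but not the current leader), and brokers_needing_rebalance is a hypothetical name, not a Kafka API.

```python
def brokers_needing_rebalance(partitions, threshold_percent=10):
    """Return brokers whose leader imbalance exceeds the threshold.

    `partitions` is a list of (preferred_leader, current_leader) pairs.
    Assumed definition: for each broker, the share of partitions where it
    is the preferred leader but not the current one.
    """
    preferred = {}
    displaced = {}
    for preferred_leader, current_leader in partitions:
        preferred[preferred_leader] = preferred.get(preferred_leader, 0) + 1
        if preferred_leader != current_leader:
            displaced[preferred_leader] = displaced.get(preferred_leader, 0) + 1
    return sorted(
        broker for broker, total in preferred.items()
        if 100 * displaced.get(broker, 0) / total > threshold_percent
    )


# Broker 1 is preferred for four partitions but leads only three of them.
partitions = [(1, 1), (1, 1), (1, 1), (1, 2), (2, 2), (2, 2)]
imbalanced = brokers_needing_rebalance(partitions)
```

Here broker 1 has an imbalance of 25 percent, which is above the threshold of 10, so under this model it would be a candidate for a preferred replica election.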

 

About the Author

  • Raúl Estrada

    Raúl Estrada has been a programmer since 1996 and a Java developer since 2001. He loves all topics related to computer science. With more than 15 years of experience in high-availability and enterprise software, he has been designing and implementing architectures since 2003. His specialization is in systems integration, and he mainly participates in projects related to the financial sector. He has been an enterprise architect for BEA Systems and Oracle Inc., but he also enjoys web, mobile, and game programming. Raúl is a supporter of free software and enjoys experimenting with new technologies, frameworks, languages, and methods.

    Raúl is the author of other Packt Publishing titles, such as Fast Data Processing Systems with SMACK and Apache Kafka Cookbook.
