In this chapter, we will cover the following topics:
This chapter explains the basic recipes to get started with Apache Kafka. It discusses how to install, configure, and run Kafka. It also discusses how to make basic operations with a Kafka broker.
Kafka can run in several operating systems: Mac, Linux, and even Windows. As it usually runs in production on Linux servers, the recipes in this book are designed to run in Linux environments. This book also considers the bash environment usage.
Kafka scales very well in a horizontal fashion without compromising speed and efficiency.
This chapter explains how to install, configure, and run Kafka. As this is a practical recipes book, it does not cover the theoretical details of Kafka. These three things are enough theory for the moment:
Before we begin it is relevant to mention some concepts and nomenclature in Kafka:
In Kafka, there are three types of clusters:
There are three ways to deliver messages:
There are two types of log compaction:
The next six recipes contain the necessary steps to make a full Kafka test from zero.
This is the first step. This recipe shows how to install Apache Kafka.
Ensure that you have at least 4 GB of RAM in your machine; the installation directory will be /usr/local/kafka/
for Mac users and /opt/kafka/
for Linux users. Create these directories.
Go to the Apache Kafka home page at http://kafka.apache.org/downloads, as in Figure 1-1, Apache Kafka download page:
Figure 1-1. Apache Kafka download page
The current available version of Apache Kafka is 0.10.2.1, as a stable release. A major limitation with Kafka since 0.8.x is that it is not backward-compatible. So, we cannot replace this version for one prior to 0.8. Once you've downloaded the latest available release, let's proceed with the installation.
Remember, for Mac users, replace the directory /opt/
for /usr/local/
in the examples.
We need Java 1.7 or later. Download and install the latest JDK from Oracle's website: http://www.oracle.com/technetwork/java/javase/downloads/index.html
> chmod +x jdk-8u131-linux-x64.rpm
> cd <directory path name>
rpm
installer with the command:> rpm -ivh jdk-8u131-linux-x64.rpm
JAVA_HOME
. This command will write the JAVA_HOME
environment variable to the file /etc/profile
:> echo "export JAVA_HOME=/usr/java/jdk1.8.0_131" >> /etc/profile
The following are the steps to install Scala in Linux:
scala-2.12.2.tgz
:> tar xzf scala-2.12.2.tgz
Most tutorials agree that the best place to set environment variables is in the /etc/profile
file.
SCALA_HOME
environment variable:> export SCALA_HOME=/opt/scala
PATH
variable:> export PATH=$PATH:$SCALA_HOME/bin
The following are the steps to install Kafka in Linux:
kafka_2.10-0.10.2.1.tgz
:> tar xzf kafka_2.10-0.10.2.1.tgz
KAFKA_HOME
environment variable:> export KAFKA_HOME=/opt/kafka_2.10-0.10.2.1
PATH
variable:> export PATH=$PATH:$KAFKA_HOME/bin
Now Java, Scala, and Kafka are installed.
To do all these steps in command-line mode, there is a powerful tool for Mac users called brew (the equivalent in Linux would be yum).
To install from the command line, we use the following steps:
install sbt
(Scala build tool):> brew install sbt
If you already have it (downloaded in the past), upgrade it:
> brew upgrade sbt
The output is similar to:
> brew upgrade sbt ==> Upgrading 1 outdated package, with result: sbt 0.13.15 ==> Upgrading sbt ==> Using the sandbox ==> Downloading https://github.com/sbt/sbt/releases/download/v0.13.15/sbt-0.13.15.tgz ==> Downloading from https://github-cloud.s3.amazonaws.com/releases/279553/09838df4-23c6-11e7-9276-14 ######################################################################## 100.0% ==> Caveats You can use $SBT_OPTS to pass additional JVM options to SBT: SBT_OPTS="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M" This formula is now using the standard lightbend sbt launcher script. Project specific options should be placed in .sbtopts in the root of your project. Global settings should be placed in /usr/local/etc/sbtopts ==> Summary /usr/local/Cellar/sbt/0.13.15: 378 files, 63.3MB, built in 1 minute 5 seconds
> brew install scala
If you already have it (downloaded in the past), upgrade it:
> brew upgrade scala
The output is similar to:
> brew install scala ==> Using the sandbox ==> Downloading https://downloads.lightbend.com/scala/2.12.2/scala-2.12.2.tgz ######################################################################## 100.0% ==> Downloading https://raw.githubusercontent.com/scala/scala-tool-support/0a217bc/bash-completion/sr ######################################################################## 100.0% ==> Caveats To use with IntelliJ, set the Scala home to: /usr/local/opt/scala/idea Bash completion has been installed to: /usr/local/etc/bash_completion.d ==> Summary /usr/local/Cellar/scala/2.12.2: 44 files, 19.9MB, built in 19 seconds Mist:Downloads admin1$ scala -version Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL
> brew install kafka
If you already have it (downloaded in the past), upgrade it:
> brew upgrade kafka
The output is similar to:
> brew install kafka ==> Installing dependencies for kafka: zookeeper ==> Installing kafka dependency: zookeeper ==> Downloading https://homebrew.bintray.com/bottles/zookeeper-3.4.9.sierra.bottle.tar.gz ######################################################################## 100.0% ==> Pouring zookeeper-3.4.9.sierra.bottle.tar.gz ==> Using the sandbox ==> Caveats To have launched start zookeeper now and restart at login: brew services start zookeeper Or, if you don't want/need a background service you can just run: zkServer start ==> Summary /usr/local/Cellar/zookeeper/3.4.9: 242 files, 18.2MB ==> Installing kafka ==> Downloading https://homebrew.bintray.com/bottles/kafka-0.10.2.0.sierra.bottle.tar.gz ######################################################################## 100.0% ==> Pouring kafka-0.10.2.0.sierra.bottle.tar.gz ==> Caveats To have launchd start kafka now and restart at login: brew services start kafka Or, if you don't want/need a background service you can just run: zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties ==> Summary /usr/local/Cellar/kafka/0.10.2.0: 145 files, 37.3MB
This is the second step. This recipe shows how to test the Apache Kafka installation.
Go to the Kafka installation directory (/usr/local/kafka/
for Mac users and /opt/kafka/
for Linux users):
> cd /usr/local/kafka
zkServer start
You will get the following result:
ZooKeeper JMX enabled by default Using config: /usr/local/etc/zookeeper/zoo.cfg Starting zookeeper ... STARTED
lsof
command over the port 9093
(default port): > lsof -i :9093
You will get the following output:
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 17479 admin1 97u IPv6 0xcfbcde96aa59c3bf 0t0 TCP *:9093 (LISTEN)
/usr/local/kafka/
for Mac users and /opt/kafka/
for Linux users, as follows: > ./bin/kafka-server-start.sh /config/server.properties
Now there is an Apache Kafka broker running on your machine.
Remember that Zookeeper must be running on the machine before you start Kafka. If you don't want to start Zookeeper every time you need to run Kafka, install it as an operating system autostart service.
This recipe shows how to deal with the Kafka brokers' basic configuration. For learning and development purposes, one can run Kafka in standalone mode. The real Kafka power is unlocked when it is running with replication in cluster mode and the topics are partitioned accordingly.
There are two main advantages of the cluster mode: parallelism and redundancy. Parallelism is the capacity to run tasks simultaneously among the cluster members. Redundancy warrants that when a Kafka node goes down, the cluster is safe and accessible from the other nodes.
Single node clusters are not recommended for production environments, so this recipe shows how to configure a cluster with several nodes.
Go to the Kafka installation directory (/usr/local/kafka/
for Mac users and /opt/kafka/
for Linux users):
> cd /usr/local/kafka
As already said, a broker is a server's instance. This recipe shows how to start two different servers on one machine. There is a server configuration template called server.properties
located in the Kafka installation directory in the config
sub-directory:
synergy
:> cp config/server.properties synergy-1.properties > cp config/server.properties synergy-2.properties
synergy-1
, the broker.id
should be 1. Specify the port
in which the server should run; the recommendation is 9093
for synergy-1
and 9094
for synergy-2
. The port
property is not set in the template, so add the line accordingly. Finally, specify the location of the Kafka logs (specific archives to store all the Kafka broker operations); in this case, we use the directory /tmp
.In synergy-1.properties
, set:
broker.id=1 port=9093 log.dir=/tmp/synergy-1-logs
In synergy-2.properties
, set:
broker.id=2 port=9094 log.dir=/tmp/synergy-2-logs
kafka-server-start.sh
command with the corresponding configuration file. Don't forget that Zookeeper must be already running with its corresponding Kafka node and the ports should not be in use by another process:> bin/kafka-server-start.sh synergy-1.properties & ... > bin/kafka-server-start.sh synergy-2.properties &
Recall that trailing &
is to specify that you want your command line back. If you want to see the broker output, it is recommended that you run each command in its own command-line window.
The properties file contains the server configuration. The server.properties
file located in the config
directory is just a template.
All of the members of the cluster should point to the same Zookeeper cluster. Every broker is identified inside the cluster by the name specified in the broker.id
property. If the port
property is not specified, Zookeeper will assign the same port number and will overwrite the data. If log.dir
is not specified, all the brokers will write to the same default log.dir
. If the brokers are going to run on different machines, then port
and log.dir
might not be specified.
Before assigning a port to a server, there is a useful command to see what process is running on a specific port (in this case, the port 9093
):
> lsof -i :9093
The output of the previous command is something like this:
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 17479 admin 97u IPv6 0xcfbcde96aa59c3bf 0t0 TCP *:9093 (LISTEN)
Try to run this command before starting the Kafka servers and run it after starting to see the change. Also, try to start a broker on a port in use to see how it fails.
To run Kafka nodes on different machines, change the ZooKeeper connection string in the configuration file; its default value is:
zookeeper.connect=localhost:2181
This value is correct only if you are running the Kafka broker on the same machine as Zookeeper. In production, it could not happen. To specify that ZooKeeper is running on different machines (that is, in a ZooKeeper cluster), set:
zookeeper.connect=localhost:2181, 192.168.0.2:2183, 192.168.0.3:2182
The previous line says that Zookeeper is running on the localhost machine on port 2181
, on the machine with IP Address 192.168.0.2
on port 2183
, and on the machine with IP Address 192.168.0.3
on port 2182
. The Zookeeper default port is 2181
, so try to run it there.
As an exercise, try to raise a broker with incorrect information about Zookeeper. Also, in combination with the lsof
command, try to raise Zookeeper on a port in use.
server.properties
template (as all the Kafka projects) is published online at: https://github.com/apache/kafka/blob/trunk/config/server.propertiesThe Kafka cluster is running, but the magic inside a broker is the queues, that is, the topics. This recipe shows the second step: how to create Kafka topics.
At this point, you need to:
/usr/local/kafka/
for Mac users and /opt/kafka/
for Linux users):cd /usr/local/kafka
Recall that almost all modern projects have two ways to do things: through the command line and through code. Yes, believe or not, the Kafka brokers' creation can be done through code in almost all the modern programming languages; the previous recipe showed just the command-line method. In later chapters, the process to achieve it programmatically is explained.
The same goes for the topics. They can be created through the command line and through code. In this recipe, we will show it through the command line. Kafka has built-in utilities to create brokers (as already shown) and topics. From the Kafka installation directory, type the following command:
> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic humbleTopic
The output should be:
Created topic "humbleTopic".
Here, the kafka-topics.sh
command is used. With the --create
parameter, it is specified that we want to create a new topic. The --topic
parameter set the name of the topic; in this case, humbleTopic
.
The --replication-factor
parameter is very important; it specifies how many servers of the cluster the topic is going to be replicated in (we mean, running). One broker can run just one replica. Obviously, if we specify a number greater than the number of running servers on the cluster, it is an error (don't be shy and try it in your environment), like this:
Error while executing topic command : replication factor: 3 larger than available brokers: 1 [2017-02-28 07:13:31,350] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: replication factor: 3 larger than available brokers: 1 (kafka.admin.TopicCommand$)
The --partitions
parameter, as its name implies, says how many partitions our topic will have. The number of partitions determines the parallelism that can be achieved on the consumer's side. This parameter is fundamental when doing fine tuning on the cluster.
Finally, the --zoookeeper
parameter indicates where the Zookeeper cluster is running.
When a topic is created, the output in the broker log is something like this:
[2017-02-28 07:05:53,910] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions humbleTopic-0 (kafka.server.ReplicaFetcherManager) [2017-02-28 07:05:53,950] INFO Completed load of log humbleTopic-0 with 1 log segments and log end offset 0 in 21 ms (kafka.log.Log)
This message says that a new topic has been born in that broker.
Yes, there are more parameters than --create
. To check whether a topic has been successfully created, run the kafka-topics
command with the --list
parameter:
> ./bin/kafka-topics.sh --list --ZooKeeper localhost:2181 humbleTopic
This parameter returns the list of all the existent topics in the Kafka cluster.
To get the details of a particular topic, run the kafka-topics
command with the --describe
parameter:
> ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic humbleTopic
The command output is:
Topic:humbleTopic PartitionCount:1 ReplicationFactor:1 Configs: Topic: humbleTopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
The explanation of the output is:
PartitionCount
: Number of partitions existing on this topic.ReplicationFactor
: Number of replicas existing on this topic.Leader
: Node responsible for the reading and writing operations of a given partition.Replicas
: List of brokers replicating the Kafka data. Some of these might even be dead.ISR
: List of nodes that are currently in-sync replicas.To create a topic with multiple replicas, we need to increase the replication factor as follows:
> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic replicatedTopic
The output is as follows:
Created topic "replicatedTopic".
Call the kafka-topics
command with the --describe
parameter to check the topic details:
> ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic replicatedTopic Topic:replicatedTopic PartitionCount:1 ReplicationFactor:2 Configs: Topic: replicatedTopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
As Replicas and ISR (in-sync replicas) are the same lists, all the nodes are in-sync.
Try to play with all these commands; try to create replicated topics on dead servers and see the output. Also, create topics on running servers and then kill them to see the results.
As mentioned before, all the commands executed through the command line can be executed programmatically.
Kafka also has a command to produce data through the console. The input can be a text file or the console standard input. Each line typed in the input is sent as a single message to the Kafka cluster.
For this recipe, you need the execution of the previous recipes in this chapter: Kafka already downloaded and installed, the Kafka nodes up and running, and a topic created inside the cluster. To begin producing some messages from the console, change to the Kafka directory in the command line.
Go to the Kafka installation directory (/usr/local/kafka/
for Mac users and /opt/kafka/
for Linux users):
> cd /usr/local/kafka
Run this command, followed by the lines to be sent as messages to the server:
./bin/kafka-console-producer.sh --broker-list localhost:9093 --topic humbleTopic
Her first word was Mom
Her second word was Dad
The previous command pushes two messages to the humbleTopic
running on the localhost machine on the port 9093
.
This is a simple way to check if a broker with a specific topic is up and running as expected.
The kafka-console-producer
program receives the following parameters:
--broker-list
: Specifies the Zookeeper servers, specified as a comma-separated list of hostname and ports.--topic
: Followed by the target topic's name.--sync
: This parameter specifies whether the messages should be sent synchronously.--compression-codec
: This parameter specifies the compression codec used to produce the messages. The possible options are: none
, gzip
, snappy
, or lz4
. If not specified, the default is gzip
.--batch-size
: The number of messages sent in a single batch if they are not sent synchronously. The batch's size value is specified in bytes.--message-send-max-retries
: Communication is not perfect; the brokers can fail receiving messages. This parameter specifies the number of retries before a producer gives up and drops the message. The number following this parameter must be a positive integer.--retry-backoff-ms
: The election of leader nodes might take some time. This is the time to wait before the producer retries after this election. The number following this parameter is the time in milliseconds.--timeout
: If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in milliseconds.--queue-size
: If set and the producer is running in asynchronous mode, this gives the maximum amount of time messages will queue awaiting sufficient batch size.When doing server fine tuning, the batch-size
, message-send-max-retries
, and retry-backoff-ms
are fundamental; take these parameters into consideration to achieve the desired performance.
Just a moment; someone could say, Eeey, I don't want to waste my precious time typing all the messages. For those people, the command receives a file where each line is considered a message:
> ./bin/kafka-console-producer.sh --broker-list localhost:9093 --topic humbleTopic < firstWords.txt
If you want to see the complete list of arguments, take a look at the command source code at: https://apache.googlesource.com/kafka/+/0.8.2/core/src/main/scala/kafka/tools/ConsoleProducer.scala
Now, take the last step. In the previous recipes, it was explained how to produce messages from the console; this recipe indicates how to read the messages generated. Kafka also has a fancy command-line utility that enables consuming messages. Recall that all the command-line tasks can also be done programmatically. Also, recall that each line of the input was considered a message from the producer.
For this recipe, the execution of the previous recipes in this chapter is needed: Kafka already downloaded and installed, the Kafka nodes up and running, and a topic created inside the cluster. Also, some messages need to be produced with the message console producer. To begin consuming some messages from the console, change to the Kafka directory in the command line.
Consuming messages through the console is easy; just run the following command:
> ./bin/kafka-console-consumer.sh --topic humbleTopic --bootstrap-server localhost:9093 --from-beginning Her first word was Mom Her second word was Dad
The parameters are the topic and broker names of the producer. Also, the --from-begining
parameter specifies that messages should be consumed from the beginning instead of the last messages in the log (go and give it a try: generate many more messages and don't specify this parameter).
There are more useful parameters for this command; some interesting ones are:
--fetch-size
: The amount of data to be fetched in a single request. The size in bytes follows this argument. The default value is 1024 * 1024.--socket-buffer-size
: The size of the TCP RECV
. The size in bytes follows this argument. The default value is 2 * 1024 * 1024.--formater
: The name of a class to use for formatting messages for display. The default value is NewlineMessageFormatter
(already presented in the recipe).--autocommit.interval.ms
: The time interval at which to save the current offset (the offset concept will be explained later) in milliseconds. The time in milliseconds follows the argument. The default value is 10,000.--max-messages
: The maximum number of messages to consume before exiting. If not set, the consumption is continual. The number of messages follows the argument.--skip-message-on-error
: If there is an error while processing a message, the system should skip it instead of halt.Enough boring theory; this is a practical cookbook, so look at these most solicited menu entries:
Consume just one message:
> ./bin/kafka-console-consumer.sh --topic humbleTopic --bootstrap-server localhost:9093 --max-messages 1
Consume one message from an offset:
> ./bin/kafka-console-consumer.sh --topic humbleTopic --bootstrap-server localhost:9093 --max-messages 1 --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter'
Consume messages from a specific consumer group (consumer groups will be explained further):
> ./bin/kafka-console-consumer.sh --topic humbleTopic --bootstrap-server localhost:9093 --new-consumer --consumer-property group.id=my-group
If you want to see the complete list of arguments, take a look at the command source code at: https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
Most of Apache Kafka's magic is achieved through configuration. As with all the intensive messaging systems, the success factor is to configure them well. In this point, Kafka is highly configurable. In practice, most of the systems have average performance with the default settings, but in production, it is required to configure it to achieve optimal performance. Sometimes, finding the right configuration is a test and error task; there is no such thing as a configuration silver bullet.
The rest of the chapter is about Kafka broker fine tuning.
In previous recipes, it was explained how to install and run Kafka. Now, make a copy of the server.properties
template in the config
directory and open the copy with a text editor.
broker.id=0 listeners=PLAINTEXT://localhost:9093 log.dirs=/tmp/kafka-logs
As shown in the previous recipes, all of the broker definition is contained in the configuration file. The rest is to pass the configuration file as an argument to the server-start
command.
A detailed explanation of every parameter is as follows:
broker.id
: A non-negative integer; the default value is 0. The name should be unique in the cluster. The important point here is to assign a name to the broker, so when it is moved to a different host or to a different port, no change is made in the consumer's side.listeners
: A comma-separated list of URIs the broker will listen on and the listener names. Examples of legal listener lists are: PLAINTEXT://127.0.0.1:9092
, SSL://:9091
, CLIENT://0.0.0.0:9092
, and REPLICATION://localhost:9093
.host.name
: DEPRECATED. A string; the default value is null. If it is not specified, Kafka will bind all the interfaces on the system. If it is specified, it will bind only to the specified address. Set this name if you want the clients to connect to a particular interface.port
: DEPRECATED. A non-negative integer; the default value is 9092
. It is the TCP port in which listen connections. Note that in the file template this value is not set.log.dir
: A String; the default value is /tmp/kafka-logs
. This is the directory where Kafka persists the messages locally. This parameter tells the directory where Kafka will store the data. It is very important that the user that runs the start command have write permissions on that directory.log.dirs
: A String; the default value is null. This is the directory where Kafka persists the messages locally. If not set, the value in log.dir
is used. There can be more than one location specified, separating the directories with a comma.If bridged connections are used, it means that when the internal host.name
and port
are different from the ones which external parties need to connect to, this parameter is used:
advertised.listeners
: The hostname given to producers, consumers, and other brokers specified to connect to. If it is not specified, it is the same as host.name
.No parameter should be left by default when the optimal performance is desired. These parameters should be taken into consideration to achieve the best behavior.
Adjust the following parameters:
message.max.bytes=1000000 num.network.threads=3 num.io.threads=8 background.threads=10 queued.max.requests=500 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 num.partitions=1
With these changes, the network and performance configurations have been set to achieve optimum levels for the application. Again, every system is different, and you might need to experiment a little to come up with the optimal one for a specific configuration.
Here is an explanation of every parameter:
message.max.bytes
: Default value: 10 00 000. This is the maximum size, in bytes, for each message. This is designed to prevent any producer from sending extra large messages and saturating the consumers.num.network.threads
: Default value: 3. This is the number of simultaneous threads running to handle a network's request. If the system has too many simultaneous requests, consider increasing this value.
num.io.threads
: Default value: 8. This is the number of threads for Input Output operations. This value should be at least the number of present processors.background.threads
: Default value: 10. This is the number of threads for background jobs. For example, old log files deletion.queued.max.requests
: Default value: 500. This is the number of messages queued while the other messages are processed by the I/O threads. Remember, when the queue is full, the network threads will not accept more requests. If your application has erratic loads, set this to a value at which it will not throttle.socket.send.buffer.bytes
: Default value: 102 400. This is SO_SNDBUFF
buffer size, used for socket connections.socket.receive.buffer.bytes
: Default value: 102 400. This is SO_RCVBUFF
buffer size, also used for socket connections.socket.request.max.bytes
: Default value: 104 857 600. This is the maximum request size, in bytes, that the server can accept. It should always be smaller than the Java heap size.num.partitions
: Default value: 1. This is the number of default partitions of a topic, without giving any partition size.Log refers to the file where all the messages are stored in the machine; here (in this book), when log is mentioned, think in terms of data structures, not just event recording.
The log settings are fundamental, so it is the way the messages are persisted in the broker machine.
Adjust the following parameters:
log.segment.bytes=1073741824 log.roll.hours=168 log.cleanup.policy=delete log.retention.hours=168 log.retention.bytes=-1 log.retention.check.interval.ms= 30000 log.cleaner.enable=false log.cleaner.threads=1 log.cleaner.backoff.ms=15000 log.index.size.max.bytes=10485760 log.index.interval.bytes=4096 log.flush.interval.messages=Long.MaxValue log.flush.interval.ms=Long.MaxValue
Here is an explanation of every parameter:
log.segment.bytes
: Default value: 1 GB. This defines the maximum segment size in bytes (the concept of segment will be explained later). Once a segment file reaches that size, a new segment file is created. Topics are stored as a bunch of segment files in the log directory. This property can also be set per topic.log.roll.{ms,hours}
: Default value: 7 days. This defines the time period after a new segment file is created, even if it has not reached the size limit. This property can also be set per topic.log.cleanup.policy
: Default value: delete. Possible options are delete or compact. If the delete option is set, the log segments will be deleted periodically when it reaches its time threshold or size limit. If the compact option is set, log compaction is used to clean up obsolete records. This property can also be set per topic.log.retention.{ms,minutes,hours}
: Default value: 7 days. This defines the time to retain the log segments. This property can also be set per topic.log.retention.bytes
: Default value: -1. This defines the number of logs per partition to retain before deletion. This property can also be set per topic. The segments are deleted when the log time or size limits are reached.
log.retention.check.interval.ms
: Default value is five minutes. This defines the time periodicity at which the logs are checked for deletion to meet retention policies.log.cleaner.enable
: To enable log compaction, set this to true.log.cleaner.threads
: Indicates the number of threads working on clean logs for compaction.log.cleaner.backoff.ms
: Periodicity at which the logs will check whether any log needs cleaning.log.index.size.max.bytes
: Default value: 1 GB. This sets the maximum size, in bytes, of the offset index. This property can also be set per topic.log.index.interval.bytes
: Default value: 4096. The interval at which a new entry is added to the offset index (the offset concept will be explained later). In each fetch request, the broker does a linear scan for this number of bytes to find the correct position in the log to begin and end the fetch. Setting this value too high may mean larger index files and more memory used, but less scanning.log.flush.interval.messages
: Default value: 9 223 372 036 854 775 807. The number of messages kept in memory before flushed to disk. It does not guarantee durability, but gives finer control.log.flush.interval.ms
: Sets maximum time in ms that a message in any topic is kept in memory before it is flushed to disk. If not set, it is used the value in log.flush.scheduler.interval.ms
.All of the settings are listed at: http://kafka.apache.org/documentation.html#brokerconfigs.
The replication is configured for reliability purposes. Replication can also be tuned.
Adjust the following parameters:
default.replication.factor=1 replica.lag.time.max.ms=10000 replica.fetch.max.bytes=1048576 replica.fetch.wait.max.ms=500 num.replica.fetchers=1 replica.high.watermark.checkpoint.interval.ms=5000 fetch.purgatory.purge.interval.requests=1000 producer.purgatory.purge.interval.requests=1000 replica.socket.timeout.ms=30000 replica.socket.receive.buffer.bytes=65536
Here is an explanation of these settings:
default.replication.factor
: Default value: 1. For an automatically created topic, this sets how many replicas it has.replica.lag.time.max.ms
: Default value: 10 000. There are leaders and followers; if a follower has not sent any fetch request or is not consumed up in at least this time, the leader will remove the follower from the ISR list and consider the follower dead.replica.fetch.max.bytes
: Default value: 1 048 576. In each request, for each partition, this value sets the maximum number of bytes fetched by a request from its leader. Remember that the maximum message size accepted by the broker is defined by message.max.bytes
(broker configuration) or max.message.bytes
(topic configuration).
replica.fetch.wait.max.ms
: Default value: 500. This is the maximum amount of time for the leader to respond to a replica's fetch request. Remember that this value should always be smaller than the replica.lag.time.max.ms
.num.replica.fetchers
: Default value: 1. The number of fetcher threads used to replicate messages from a source broker. Increasing this number increases the I/O rate in the following broker.replica.high.watermark.checkpoint.interval.ms
: Default value: 500. The high watermark (HW) is the offset of the last committed message. This value is the frequency at which each replica saves its high watermark to the disk for recovery.fetch.purgatory.purge.interval.requests
: Default value: 1000. Purgatory is the place where the fetch requests are kept on hold till they can be serviced (great name, isn't?). The purge interval is specified in number of requests (not in time) of the fetch request purgatory.producer.purgatory.purge.interval.requests
: Default value: 1000. It sets the purge interval in number of requests (not in time) of the producer request purgatory (do you catch the difference to the previous parameter?).Some other settings are listed here: http://kafka.apache.org/documentation.html#brokerconfigs
Apache Zookeeper is a centralized service for maintaining configuration information providing distributed synchronization. ZooKeeper is used in Kafka for cluster management and to maintain the topics information synchronized.
Adjust the following parameters:
zookeeper.connect=127.0.0.1:2181,192.168.0.32:2181 zookeeper.session.timeout.ms=6000 zookeeper.connection.timeout.ms=6000 zookeeper.sync.time.ms=2000
Here is an explanation of these settings:
zookeeper.connect
: Default value: null. This is a comma-separated value in the form of the hostname:port
string, indicating the Zookeeper connection. Specifying several connections ensures the Kafka cluster reliability and continuity. When one node fails, Zookeeper uses the chroot path (/chroot/path
) to make the data available under that particular path. This enables having the Zookeeper cluster available for multiple Kafka clusters. This path must be created before starting the Kafka cluster, and consumers must use the same string.zookeeper.session.timeout.ms
: Default value: 6000. Session timeout means that if in this time period a heartbeat from the server is not received, it is considered dead. This parameter is fundamental, since if it is long and if the server is dead the whole system will experience problems. If it is small, a living server could be considered dead.zookeeper.connection.timeout.ms
: Default value: 6000. This is the maximum time that the client will wait while establishing a connection to Zookeeper.zookeeper.sync.time.ms
: Default value: 2000. This is the time a Zookeeper follower can be behind its Zookeeper leader.No parameter should be left at default when optimal behavior is desired. These parameters should be taken into consideration to achieve the best performance.
Adjust the following parameters:
auto.create.topics.enable=true controlled.shutdown.enable=true controlled.shutdown.max.retries=3 controlled.shutdown.retry.backoff.ms=5000 auto.leader.rebalance.enable=true leader.imbalance.per.broker.percentage=10 leader.imbalance.check.interval.seconds=300 offset.metadata.max.bytes=4096 max.connections.per.ip=Int.MaxValue connections.max.idle.ms=600000 unclean.leader.election.enable=true offsets.topic.num.partitions=50 offsets.topic.retention.minutes=1440 offsets.retention.check.interval.ms=600000 offsets.topic.replication.factor=3 offsets.topic.segment.bytes=104857600 offsets.load.buffer.size=5242880 offsets.commit.required.acks=-1 offsets.commit.timeout.ms=5000
Here is an explanation of these settings:
auto.create.topics.enable
: Default value: true. Suppose that metadata is fetched or a message is produced for a nonexistent topic; if this value is true, the topic will automatically be created. In production environments, this value should be false.controlled.shutdown.enable
: Default value: true. If this value is true, when a shutdown is called on the broker, the leader will gracefully move all the leaders to a different broker. When it is true, the availability is increased.controlled.shutdown.max.retries
: Default value: 3. This is the maximum number of retries the broker tries a controlled shutdown before making a forced and unclean shutdown.controlled.shutdown.retry.backoff.ms
: Default value: 5000. Suppose that a failure happens (controller fail over, replica lag, and so on); this value determines the time to wait before recovery from the state that caused the failure.auto.leader.rebalance.enable
: Default value: true. If this value is true, the broker will automatically try to balance the partition leadership among the brokers. At regular intervals, a background thread checks and triggers leader balance if required, setting the leadership to the preferred replica of each partition if available.leader.imbalance.per.broker.percentage
: Default value: 10. This value is specified in percentages and is the leader imbalance allowed per broker (the leader imbalance will be explained later). The cluster will rebalance the leadership if this percentage goes above the set value.leader.imbalance.check.interval.seconds
: Default value: 300. This value is the frequency at which to check the leader imbalance by the controller.offset.metadata.max.bytes
: Default value: 4096. This is the maximum size allowed to the client for a metadata to be stored with an offset commit.max.connections.per.ip
: Default value: 2 147 483 647. This is the maximum number of connections that the broker accepts from each IP address.connections.max.idle.ms
: Default value: 600 000. This is the idle connection's timeout. The server socket processor threads close the connections that idle more than this value.unclean.leader.election.enable
: Default value: true. If this value is true, the replicas that are not ISR can become leaders. Doing so may result in data loss.
offsets.topic.num.partitions
: Default value: 50. This is the number of partitions for the offset commit topic. This value cannot be changed post deployment.offsets.retention.minutes
: Default value: 1440. This is the log retention window for the offsets topic. This is the time to keep the offsets. Passed this, the offsets will be marked for deletion.offsets.retention.check.interval.ms
: Default value: 60 000. This is the frequency at which to check for stale offsetsoffsets.topic.replication.factor
: Default value: 3. This is the number of replicas for the offset commit topic. The higher this value, the higher the availability. As shown in the previous recipes, if the number of brokers is lower than the replication factor, the number of replicas will be equal to the number of brokers.offsets.topic.segment.bytes
: Default value: 104 857 600. This is the segment size for the offsets topic. The lower this value, the faster the log compaction and cache loading are.offsets.load.buffer.size
: Default value: 5 242 880. This is the batch size to be used for reading offset segments when loading offsets into the cache.offsets.commit.required.acks
: Default value: -1. This is the number of acknowledgements required before the offset commit can be accepted. It is recommended to not override the default value of -1, meaning no acknowledgements required.offsets.commit.timeout.ms
: Default value: 5000. This is the time that an offset commit will be delayed until all replicas for the offsets topic receive the commit or this time value is reached. This value is similar to the producer request.timeout.ms
.
Where there is an eBook version of a title available, you can buy it from the book details for that title. Add either the standalone eBook or the eBook and print book bundle to your shopping cart. Your eBook will show in your cart as a product on its own. After completing checkout and payment in the normal way, you will receive your receipt on the screen containing a link to a personalised PDF download file. This link will remain active for 30 days. You can download backup copies of the file by logging in to your account at any time.
If you already have Adobe reader installed, then clicking on the link will download and open the PDF file directly. If you don't, then save the PDF file on your machine and download the Reader to view it.
Please Note: Packt eBooks are non-returnable and non-refundable.
Packt eBook and Licensing When you buy an eBook from Packt Publishing, completing your purchase means you accept the terms of our licence agreement. Please read the full text of the agreement. In it we have tried to balance the need for the ebook to be usable for you the reader with our needs to protect the rights of us as Publishers and of our authors. In summary, the agreement says:
If you want to purchase a video course, eBook or Bundle (Print+eBook) please follow below steps:
Our eBooks are currently available in a variety of formats such as PDF and ePubs. In the future, this may well change with trends and development in technology, but please note that our PDFs are not Adobe eBook Reader format, which has greater restrictions on security.
You will need to use Adobe Reader v9 or later in order to read Packt's PDF eBooks.
Packt eBooks are a complete electronic version of the print edition, available in PDF and ePub formats. Every piece of content down to the page numbering is the same. Because we save the costs of printing and shipping the book to you, we are able to offer eBooks at a lower cost than print editions.
When you have purchased an eBook, simply login to your account and click on the link in Your Download Area. We recommend you saving the file to your hard drive before opening it.
For optimal viewing of our eBooks, we recommend you download and install the free Adobe Reader version 9.