This chapter explains the basics of getting started with Kafka. We do not cover the theoretical details of Kafka, but rather its practical aspects. It assumes that you have already installed Kafka version 0.8.2 along with ZooKeeper and have started a node. You understand that Kafka is a highly distributed messaging system that connects your data ingestion systems to your real-time or batch processing systems such as Storm, Spark, or Hadoop. Kafka allows you to scale your systems horizontally without compromising on speed or efficiency. You are now ready to get started with the Kafka broker. We will discuss how to perform basic operations on your Kafka broker; this will also verify that the installation is working well. Since Kafka is usually run on Linux servers, this book assumes that you are using a similar environment. You can run Kafka on Mac OS X much as you would on Linux, but running Kafka on Windows is a very complex process; there is no direct way of doing it, so we are keeping that out of this book. We will only consider the bash environment for usage here.
You can easily run Kafka in the standalone mode, but the real power of Kafka is unlocked when it is run in the cluster mode with replication and the topics are appropriately partitioned. It gives you the power of parallelism and data safety by making sure that, even if a Kafka node goes down, your data is still safe and accessible from other nodes. In this recipe, you will learn how to run multiple Kafka brokers.
I assume that you already have experience starting a Kafka node using the configuration files present at the Kafka install location. Change your current directory to the place where you have Kafka installed:
> cd /opt/kafka
To start multiple brokers, the first thing we need to do is write the configuration files. For ease, you can start with the configuration file present in config/server.properties and perform the following steps.
For creating three different brokers in our single test machine, we will create two copies of the configuration file and modify them accordingly:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
We need to modify these files before they can be used to start other Kafka nodes for our cluster. We need to change the broker.id property, which has to be unique for each broker in the cluster. The port number for Kafka to listen on and the location of the Kafka logs (log.dir) also need to be specified. So, we will modify the files as follows:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
You now need to start the Kafka brokers with these configuration files. This assumes that you have already started ZooKeeper and have a single Kafka node running:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
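To confirm that the new brokers have actually come up, you can check that their processes are running and that their log directories have been created. This is just a quick sanity check using standard shell tools, with the log.dir values configured above:
> ps aux | grep server-1.properties
> ls /tmp/kafka-logs-1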
The server.properties files contain the configuration of your brokers. They should all point to the same ZooKeeper cluster. The broker.id property in each of the files is unique and defines the name of the node in the cluster. The port number and log.dir are changed so that we can get the brokers running on the same machine; otherwise, all the nodes would try to bind to the same port and would overwrite each other's data. If you want to run them on different machines, you need not change these properties.
To run Kafka nodes on different servers, you also need to change the ZooKeeper connection string's details in the zookeeper.connect property. By default, it points to a ZooKeeper instance on the local machine:
zookeeper.connect=localhost:2181
This is good if you are running Kafka on the same server as ZooKeeper; but in real life, you would be running them on different servers. So, you might want to change this to the correct ZooKeeper connection string as follows:
zookeeper.connect=localhost:2181,192.168.0.2:2181,192.168.0.3:2181
This means that you are running the ZooKeeper cluster at the nodes localhost, 192.168.0.2, and 192.168.0.3, at port number 2181.
Look at the configuration file config/server.properties for details on several other properties that can also be set. You can also look it up online at https://github.com/apache/kafka/blob/trunk/config/server.properties.
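For reference, a few of the properties you will commonly find and tune in config/server.properties look like the following; the values shown are typical defaults from the shipped file and may differ in your distribution:
num.network.threads=3
num.io.threads=8
num.partitions=1
log.retention.hours=168
zookeeper.connection.timeout.ms=6000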
Now that we have our cluster up and running, let's get started with other interesting things. In this recipe, you will learn how to create topics in Kafka, which is your first step toward getting things done using Kafka.
You must have already downloaded and set up Kafka. Now, in the command line, change to the Kafka directory. You also must have at least one Kafka node up and running.
It's very easy to create topics from the command line. Kafka comes with a built-in utility to create topics. You need to enter the following command from the directory where you have installed Kafka:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatest
The number of partitions determines the parallelism that can be achieved on the consumer's side. So, it is important that the partition number is selected carefully based on how your Kafka data will be consumed.
The replication factor determines the number of replicas of this topic present in the cluster. Each broker can hold at most one replica of a given partition, so the replication factor cannot exceed the number of brokers in the cluster; if you request a higher value, topic creation will fail with an error.
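For example, on the three-broker cluster set up in the previous recipe, you could create a topic with three partitions and a replication factor of 2; the topic name kafkatest-partitioned is only an illustration and not part of the original recipe:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic kafkatest-partitioned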
If you want to check whether your topic has been successfully created, you can run the following command:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
kafkatest
This will print out all the topics that exist in the Kafka cluster. After successfully running the earlier command, your Kafka topic will be created and printed.
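If the cluster has many topics, you can filter the listing for the one you are interested in; this is just ordinary shell piping, not a Kafka option:
> bin/kafka-topics.sh --list --zookeeper localhost:2181 | grep kafkatest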
To get details of a particular topic, you can run the following command:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafkatest
Topic:kafkatest PartitionCount:1 ReplicationFactor:1 Configs:
Topic: kafkatest Partition: 0 Leader: 0 Replicas: 0 Isr: 0
The explanation of the output is as follows:
PartitionCount: The number of partitions the topic has.
ReplicationFactor: The number of replicas of each partition of the topic.
Leader: The broker currently responsible for all reads and writes for the given partition.
Replicas: The list of brokers that replicate the log for this partition.
Isr: The subset of replicas that are currently in-sync with the leader.
Next, let's create a topic with a replication factor of 3 so that it is replicated on all three brokers of our cluster:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic replicatedkafkatest
This will give the following output while checking for the details of the topic:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic replicatedkafkatest
Topic:replicatedkafkatest PartitionCount:1 ReplicationFactor:3 Configs:
Topic: replicatedkafkatest Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
This means that there is a replicatedkafkatest topic, which has a single partition with a replication factor of 3. All three replicas are in-sync.
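To see the effect of replication, you can stop the broker that is currently the leader (broker 2 in the output above) and describe the topic again; one of the remaining replicas should take over as the leader. This is a minimal sketch, assuming broker 2 was started from config/server-2.properties on the same machine; replace the placeholder with the actual process ID:
> ps aux | grep server-2.properties
> kill <pid-of-broker-2>
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic replicatedkafkatest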
The Kafka installation has a command-line utility that enables you to produce data. You can provide input from a file or from standard input. It will send each line of this input as a message to the Kafka cluster.
As in the previous recipe, you must have already downloaded and set up Kafka. Now, in the command line, change to the Kafka directory. You have already started the Kafka nodes as mentioned in the previous recipes. You will need to create a topic as well. Now, you are ready to send some messages to Kafka from your console.
To send messages from the console, perform the following steps:
You can run the next command followed by some text that will be sent to the server as messages:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatest
First message
Second message
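If your messages are already in a file, you can feed that file to the producer through standard input instead of typing them; messages.txt here is a hypothetical file with one message per line:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatest < messages.txt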
There are more parameters that you can pass to the console producer program. A short list of them is as follows:
--message-send-max-retries: Brokers can sometimes fail to receive messages for a number of reasons and being unavailable transiently is just one of them. This property specifies the number of retries before a producer gives up and drops the message. This is followed by the number of retries that you want to set.
--retry-backoff-ms: Before each retry, the producer refreshes the metadata of the relevant topics. Since leader election might take some time, it's good to wait a bit before the producer retries. This parameter does just that. The time in ms follows the argument. Both of these parameters are used in the example below.
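For instance, a producer started with both of these parameters set might look like the following; the values 5 and 500 are only illustrative:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatest --message-send-max-retries 5 --retry-backoff-ms 500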
This is a simple way of checking whether your broker with a topic is up and running as expected.
You have produced some messages from the console, but it is important to check whether they can be read properly. For this, Kafka provides a command-line utility that enables you to consume messages. Each line of its output will be a message from the Kafka log.
As in the previous recipe, you must have already downloaded and set up Kafka. Now, in the command line, change to the Kafka directory. I would also assume that you have set up a Kafka node and created a topic with it. You also have to send some messages to a Kafka topic, as mentioned in the previous recipes, before you consume anything.
To consume the messages from the console, perform the following steps:
You can run the following command and get the messages in Kafka as an output:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatest --from-beginning
First message
Second message
The given parameters are as follows:
The ZooKeeper connection string, with host and port
The topic name
An optional directive to start consuming the messages from the beginning instead of consuming only the latest messages in the Kafka log
This tells the program to read the data stored under the given topic, starting from the beginning of the Kafka log, from the cluster referenced by the given ZooKeeper. It will then print the messages on the console.
Some other parameters that you can pass are shown as follows:
--fetch-size: This specifies the amount of data to be fetched in a single request. Its size in bytes follows this argument.
--socket-buffer-size: This specifies the size of the TCP receive (RECV) buffer. The size in bytes follows the argument.
--autocommit.interval.ms: This specifies the time interval, in ms, at which the current offset is saved. The time in ms follows the argument.
--max-messages: This specifies the maximum number of messages to consume before exiting. If it is not set, the consumption is unlimited. The number of messages follows the argument.
--skip-message-on-error: This specifies that, if there is an error while processing a message, the system should not stop. Instead, it should just skip the current message. Some of these parameters are used in the example below.
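For example, a consumer that reads at most ten messages from the beginning of the topic and then exits might be started as follows; the value 10 is only illustrative:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatest --from-beginning --max-messages 10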