In this chapter we will cover:
Connecting to a broker
Producing messages
Consuming messages
Using body serialization with JSON
Using RPC with messaging
Broadcasting messages
Working with message routing using direct exchanges
Working with message routing using topic exchanges
Guaranteeing message processing
Distributing messages to many consumers
Using message properties
Messaging with transactions
Handling unroutable messages
Advanced Message Queuing Protocol (AMQP) has been developed because of the need for interoperability among the many different messaging solutions, that were developed a few years ago by many different vendors such as IBM MQ-Series, TIBCO, or Microsoft MSMQ.
The AMQP 0-9-1 standard gives a complete specification of the protocol, particularly regarding:
The API interface
The wire protocol
RabbitMQ is a free and complete AMQP broker implementation. It implements version 0-9-1 of the AMQP specification; this is the most widespread version today and it is the last version that focuses on the client API. That's what we want to put the focus on, especially in this chapter.
On the other hand, AMQP 1.0 only defines the evolution of the wire-level protocol—the format of the data being passed at the application level—for the exchange of messages between two endpoints; so 0-9-1 is actually the most updated client library specification.
RabbitMQ includes:
The broker itself, that is, the service that will actually handle the messages that are going to be sent and received by the applications
The API implementations for Java, C#, and Erlang languages
It is also possible to use APIs for languages downloadable from the RabbitMQ site itself, from third-party sites, or even using AMQP APIs not strictly related to RabbitMQ (http://www.rabbitmq.com/devtools.html). Since the AMQP standard specifies the wire protocol, they are going to be mostly interoperable, except for some custom extensions. That will be discussed in detail in the next chapter.
In the course of the book we will particularly use some of the following APIs:
The Java AMQP client library (http://www.rabbitmq.com/java-client.html)
Pika, one of the Python AMQP client libraries (http://pypi.python.org/pypi/pika)
The .NET/C# AMQP client (http://www.rabbitmq.com/dotnet.html)
The RabbitMQ C client API (https://github.com/alanxz/rabbitmq-c)
The Ruby client library (https://github.com/ruby-amqp/bunny)
In this first chapter we are mainly using Java since this language is widely used in enterprise software development, integration, and distribution. RabbitMQ is a perfect fit in this environment.
In order to run the examples in this recipe, you will first need to:
Install Java JDK 1.6+
Install the Java RabbitMQ client library
Properly configure
CLASSPATH
and your preferred development environment (Eclipse, NetBeans, and so on)Install the RabbitMQ server on a machine (this can be the same local machine)
The natural choice is to install it on your desktop (Windows, Linux, and Mac OS X are all fine choices), but you can also install it on one or more external servers; for example, virtual machines, physical servers, and Raspberry PI servers (http://www.raspberrypi.org/) on cloud service providers.
Tip
In this book we are not providing instructions on the installation of RabbitMQ itself. You can find detailed instructions on the RabbitMQ site.
Most of the examples will work connecting to the RabbitMQ broker running on the localhost. If you have chosen to install or use RabbitMQ from a different machine, you will need to specify its hostname as a command-line parameter of the examples themselves, for example:
java -cp ./bin rmqexample.Publish [Rabbitmq-host]
For the examples involving Python, you will need Python 2.7+ installed and the Pika library, an AMQP implementation for Python (https://pypi.python.org/pypi/pika). The fastest way to install Pika is by using PIP (https://pypi.python.org/pypi/pip). In the command prompt, just type:
pip install pika
We will also present some recipes using .NET where the accent is mainly on interoperability.
You can download the working examples in their full form at http://www.packtpub.com/support.
The recipes presented in this chapter will tackle all the basic concepts exposed by AMQP, using RabbitMQ.
Every application that uses AMQP needs to establish a connection with the AMQP broker. By default, RabbitMQ (as well as any other AMQP broker up to version 1.0) works over TCP as a reliable transport protocol on port 5672, that is, the IANA-assigned port.
We are now going to discuss how to create the connection. In all the subsequent recipes we will refer to the connection and channel as the results of the operations presented here.
To use this recipe we need to set up the Java development environment as mentioned in the Introduction section.
In order to create a Java client that connects to the RabbitMQ broker, you need to perform the following steps:
Import the needed classes from the Java RabbitMQ client library in the program namespace:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
Create an instance of the client
ConnectionFactory
:ConnectionFactory factory = new ConnectionFactory();
Set the
ConnectionFactory
options:factory.setHost(rabbitMQhostname);
Connect to the RabbitMQ broker:
Connection connection = factory.newConnection();
Create a channel from the freshly created connection:
Channel channel = connection.createChannel();
As soon as we are done with RabbitMQ, release the channel and the connection:
channel.close(); connection.close();
Using the Java client API, the application must create an instance of ConnectionFactory
and set the host where RabbitMQ should be running with the setHost()
method.
After the Java imports (step 1), we have instantiated the factory
object (step 2). In this example we have just set the hostname that we have chosen to optionally get from the command line (step 3), but you can find more information regarding connection options in the section There's more….
In step 4 we have actually established the TCP connection to the RabbitMQ broker.
Tip
In this recipe we have used the default connection parameters user: guest
, password: guest
, and vhost: /
; we will discuss these parameters later.
However, we are not yet ready to communicate with the broker; we need to set up a communication channel (step 5). This is an advanced concept of AMQP; using this abstraction, it is possible to let many different messaging sessions use the same logical connection.
Actually, all the communication operations of the Java client library are performed by the methods of a channel instance.
If you are developing multithreaded applications, it is highly recommended to use a different channel for each thread. If many threads use the same channel, they will serialize their execution in the channel method calls, leading to possible performance degradation.
It is possible to specify many different optional properties for any RabbitMQ connection. You can find them all in the online documentation at (http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc). These options are all self-explanatory, except for the AMQP virtual host.
Virtual hosts are administrative containers; they allow to configure many logically independent brokers hosts within one single RabbitMQ instance, to let many different independent applications share the same RabbitMQ server. Each virtual host can be configured with its independent set of permissions, exchanges, and queues and will work in a logically separated environment.
It's possible to specify connection options by using just a connection string, also called
connection URI, with the factory.setUri()
method:
ConnectionFactory factory = new ConnectionFactory(); String uri="amqp://user:pass@hostname:port/vhost"; factory.setUri(uri);
Tip
The URI must conform to the syntax specified in RFC3986 (http://www.ietf.org/rfc/rfc3986.txt).
In this recipe we are learning how to send a message to an AMQP queue. We will be introduced to the building blocks of AMQP messaging: messages, queues, and exchanges.
You can find the source at Chapter01/Recipe02/src/rmqexample
.
To use this recipe we need to set up the Java development environment as indicated in the Introduction section.
After connecting to the broker, as seen in the previous recipe, you can start sending messages performing the following steps:
Declare the queue, calling the
queueDeclare()
method oncom.rabbitmq.client.Channel
:String myQueue = "myFirstQueue"; channel.queueDeclare(myQueue, true, false, false, null);
Send the very first message to the RabbitMQ broker:
String message = "My message to myFirstQueue"; channel.basicPublish("",myQueue, null, message.getBytes());
Send the second message with different options:
channel.basicPublish("",myQueue,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
In this first basic example we have been able to just send a message to RabbitMQ.
After the communication channel is established, the first step is to ensure that the destination queue exists. This task is accomplished declaring the queue (step 1) calling queueDeclare()
. The method call does nothing if the queue already exists, otherwise it creates the queue itself.
Note
If the queue already exists but has been created with different parameters, queueDeclare()
will raise an exception.
Note that this, as most of the AMQP operations, is a method of the Channel
Java interface. All the operations that need interactions with the broker are carried out through channels.
Let's examine the meaning of the queueDeclare()
method call in depth. Its template can be found in the Java client reference documentation located at http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/. The documentation will be as shown in the following screenshot:

In particular we have used the second overload of this method that we report here:
AMQP.Queue.DeclareOk queueDeclare(java.lang.String queue, boolean durable, boolean exclusive, booleanautoDelete, java.util.Map<java.lang.String,java.lang.Object> arguments) throws java.io.IOException
The meanings of the individual arguments are:
queue
: This is just the name of the queue where we will be storing the messages.durable
: This specifies whether the queue will survive server restarts. Note that it is required for a queue to be declared as durable if you want persistent messages to survive a server restart.exclusive
: This specifies whether the queue is restricted to only this connection.autoDelete
: This specifies whether the queue will be automatically deleted by the RabbitMQ broker as soon as it is not in use.arguments
: This is an optional map of queue construction arguments.
In step 2 we have actually sent a message to the RabbitMQ broker.
The message body will never be opened by RabbitMQ. Messages are opaque entities for the AMQP broker, and you can use any serialization format you like. We often use JSON, but XML, ASN.1, standard or custom, ASCII or binary format, are all valid alternatives. The only important thing is that the client applications should know how to interpret the data.
Let's now examine in depth the basicPublish()
method of the Channel
interface for the overload used in our recipe:
void basicPublish(java.lang.String exchange, java.lang.String routingKey, AMQP.BasicProperties props, byte[] body) throws java.io.IOException
In our example the exchange
argument has been set to the empty string ""
, that is, the default exchange, and the routingKey
argument to the name of the queue. In this case the message is directly sent to the queue specified as routingKey
. The body
argument is set to the byte
array of our string, that is, just the message that we sent. The props
argument is set to null
as a default; these are the message properties, discussed in depth in the recipe Using message properties.
For example, in step 3 we have sent an identical message, but with props
set to MessageProperties.PERSISTENT_TEXT_PLAIN
; in this way we have requested RabbitMQ to mark this message as a persistent message.
Both the messages have been dispatched to the RabbitMQ broker, logically queued in the myFirstQueue
queue. The messages will stay buffered there until a client, (typically, a different client) gets it.
If the queue has been declared with the durable
flag set to true
and the message has been marked as persistent, it is stored on the disk by the broker. If one of the two conditions is missing, the message is stored in the memory. In the latter case the buffered messages won't survive a RabbitMQ restart, but the message delivery and retrieval will be much faster. However, we will dig down on this topic in Chapter 8, Performance Tuning for RabbitMQ.
In this section we will discuss the methods to check the status of RabbitMQ and whether a queue already exists.
In order to check the RabbitMQ status, you can use the command-line control tool rabbitmqctl
. It should be in the PATH
in the Linux setup. On Windows it can be found running the RabbitMQ command shell by navigating to Start Menu | All Programs | RabbitMQ Server | RabbitMQ Command Prompt (sbin dir). We can run rabbitmqctl.bat
from this command prompt.
We can check the queue status with the command rabbitmqclt list_queues
. In the following screenshot, we have run it just before and after we have run our example.

We can see our myfirstqueue queue listed in the preceding screenshot, followed by the number 2, which is just the number of the messages buffered into our queue.
Now we can either try to restart RabbitMQ, or reboot the machine hosting it. Restarting RabbitMQ successfully will depend on the used OS:
On Linux, RedHat, Centos, Fedora, Raspbian, and so on:
service rabbitmq-server restart
On Linux, Ubuntu, Debian, and so on:
/etc/init.d/rabbitmq restart
On Windows:
sc stop rabbitmq / sc start rabbitmq
How many messages should we expect when we run rabbitmqclt list_queues
again?
In order to be sure that a specific queue already exists, replace channel.queueDeclare()
with channel.queueDeclarePassive()
. The behavior of the two methods is the same in case the queue already exists; but in case it doesn't, the first one will silently create it and return back (that's actually the most frequently used case), the latter will raise an exception.
In this recipe we are closing the loop; we have already seen how to send messages to RabbitMQ—or to any AMQP broker—and now we are ready to learn how to retrieve them.
You can find the source code of the recipe at Chapter01/Recipe03/src/rmqexample/nonblocking
.
To use this recipe we need to set up the Java development environment as indicated in the introduction.
In order to consume the messages sent as seen in the previous recipe, perform the following steps:
Declare the queue where we want to consume the messages from:
String myQueue="myFirstQueue"; channel.queueDeclare(myQueue, true, false, false, null);
Define a specialized consumer class inherited from
DefaultConsumer
:public class ActualConsumer extends DefaultConsumer { public ActualConsumer(Channel channel) { super(channel); } @Override public void handleDelivery( String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws java.io.IOException { String message = new String(body); System.out.println("Received: " + message); } }
Create a
consumer
object, which is an instance of this class, bound to our channel:ActualConsumer consumer = new ActualConsumer(channel);
Start consuming messages:
String consumerTag = channel.basicConsume(myQueue, true, consumer);
Once done, stop the consumer:
channel.basicCancel(consumerTag);
After we have established the connection and the channel to the AMQP broker as seen in the Connecting to the broker recipe, we need to ensure that the queue from which we are going to consume the messages exists (step 1).
In fact it is possible that the consumer is started before any producer has sent a message to the queue and the queue itself may actually not exist at all. To avoid the failure of the subsequent operations on the queue, we need to declare the queue.
Tip
By allowing both producers and consumers to declare the same queue, we are decoupling their existence; the order in which we start them is not important.
The heart of this recipe is step 2. Here we have defined our specialized consumer that overrides handleDelivery()
and instantiated it in step 3. In the Java client API the consumer callbacks are defined by the com.rabbitmq.client.Consumer
interface. We have extended our consumer from DefaultConsumer
, which provides a no-operation implementation for all the methods declared in the Consumer
interface.
In step 3, by calling channel.basicConsume()
, we let the consumer threads start consuming messages. The consumers of each channel are always executed on the same thread, independent of the calling one.
Now that we have activated a consumer for myQueue
, the Java client library starts getting messages from that queue on the RabbitMQ broker, and invokes handleDelivery()
for each one.
Then after the channel.basicConsume()
method's invocation, we just sit idle waiting for a key press in the main thread. Messages are being consumed with nonblocking semantics respecting the
event-driven paradigm, typical of messaging applications.
Only after we press Enter, the execution proceeds to step 5, cancelling the consumer. At this point the consumer threads stop invoking our consumer object, and we can release the resources and exit.
In this section we will learn more about consumer threads and the use of blockage semantics.
At connection definition time, the RabbitMQ Java API allocates a thread pool from which it will allocate consumer threads on need.
All the consumers bound to one channel will be executed by one single thread in the pool; however, it is possible that consumers from different channels are handled by the same thread. That's why it is important to avoid long-lasting operations in the consumer methods, in order to avoid blocking other consumers.
It is also possible to handle the consumer thread pool by ourselves, as we have shown in our example; however, this not obligatory at all. We have defined a thread pool, java.util.concurrent.ExecutorService
, and passed it at connection time:
ExecutorService eService = Executors.newFixedThreadPool(10); Connection connection = factory.newConnection(eService);
As we were managing it, we were also in charge of terminating it:
eService.shutdown();
However, remember that if you don't define your own ExecutorService
thread pool, the Java client library will create one during connection creation time, and destroy it as soon as we destroy the corresponding connections.
It is possible to use blocking semantics too, but we strongly discourage this approach if it's not being used for simple applications and test cases; the recipe to consume messages is non-blocking.
However, you can find the source code for the blocking approach at Chapter01/Recipe03/src/rmqexample/blocking
.
You can find all the available methods of the consumer interface in the official Javadoc at
In AMQP the messages are opaque entities; AMQP does not provide any standard way to encode/decode them.
However, web applications very often use JSON as an application layer format, that is, the JavaScript serialization format that has become a de-facto standard; in this way, the RabbitMQ client Java library can include some utility functions for this task.
On the other side, this is not the only protocol; any application can choose its own protocol (XML, Google Protocol Buffers, ASN.1, or proprietary).
In this example we are showing how to use the JSON protocol to encode and decode message bodies. We are using a publisher written in Java (Chapter01/Recipe04/Java_4/src/rmqexample
) and a consumer in Python (Chapter01/Recipe04/Python04
).
To use this recipe you will need to set up Java and Python environments as described in the introduction.
To implement a Java producer and a Python consumer, you can perform the following steps:
Java: In addition to the standard import described in the recipe Connecting to the broker, we have to import:
import com.rabbitmq.tools.json.JSONWriter;
Java: We create a queue that is not persistent:
String myQueue="myJSONBodyQueue_4"; channel.queueDeclare(MyQueue, false, false, false, null);
Java: We create a list for the
Book
class and fill it with example data:List<Book>newBooks = new ArrayList<Book>(); for (inti = 1; i< 11; i++) { Book book = new Book(); book.setBookID(i); book.setBookDescription("History VOL: " + i ); book.setAuthor("John Doe"); newBooks.add(book); }
Java: We are ready to serialize the
newBooks
instance withJSONwriter
:JSONWriter rabbitmqJson = new JSONWriter(); String jsonmessage = rabbitmqJson.write(newBooks);
Java: We can finally send our
jsonmessage
:channel.basicPublish("",MyQueue,null, jsonmessage.getBytes());
Python: To use the Pika library we must add the follow import:
import pika; import json;
Python has a powerful built-in library for JSON.
Python: In order to create a connection to RabbitMQ, use the following code:
connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host));
Python: Let's declare a queue, bind as a consumer, and then register a callback:
channel = connection.channel() my_queue = "myJSONBodyQueue_4" channel.queue_declare(queue=my_queue) channel.basic_consume(consumer_callback, queue=my_queue, no_ack=True) channel.start_consuming()
After we set up the environments (step 1 and step 2), we serialize the newbooks
class with the method write(newbooks)
. The method returns a JSON String (jsonmessage)
as shown in the following code snippet:
[ { "author" : "John Doe", "bookDescription" : "History VOL: 1", "bookID" : 1 }, { "author" : "John Doe", "bookDescription" : "History VOL: 2", "bookID" : 2 } ]
In step 4 we publish jsonmessage
to the queue myJSONBodyQueue_4
. Now the Python Consumer
can get the message from the same queue. Let's see how to do it in Python:
connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host)); channel = connection.channel() queue_name = "myJSONBodyQueue_4" channel.queue_declare(queue=my_queue) .. channel.basic_consume(consumer_callback, queue=my_queue, no_ack=True) channel.start_consuming()
As we have seen in the Java implementation, we must create a connection and then create a channel. With the method channel.queue_declare(queue=myQueue)
, we declare a queue that is not durable, exclusive or autodelete. In order to change the queue's attribute, it's enough to add the parameter in the queue_declare
method as follows:
channel.queue_declare(queue=myQueue,
durable=True)
Tip
When different AMQP clients declare the same queue, it's important that all of them specify the same durable, exclusive, and autodelete attributes. Otherwise, channel.queue_declare()
will raise an exception.
With the method channel.basic_consume()
, the client starts consuming messages from the given queue, invoking the callback consumer_callback()
where it will receive the messages.
While the callbacks in Java were defined in the consumer
interface, in Python they are just passed to basic_consume()
, in spite of the more functional, less declarative, and less formal paradigm typical of Python.
The callback consumer_callback
is as follows:
def consumer_callback(ch, method, properties, body):
newBooks=json.loads(body); print" Count books:",len(newBooks); for item in newBooks: print 'ID:',item['bookID'], '-Description:',item['bookDescription'],' -Author:',item['author']
The callback takes the message, deserializes it with json.loads()
, and then the newBooks
structure is ready to be read.
The JSON helper tools included in the RabbitMQ client library are very simple, but in a real project you can evaluate them to use an external JSON library. For example, a powerful Java JSON library is google-gson (https://code.google.com/p/google-gson/) or jackson (http://jackson.codehaus.org/).
Remote Procedure Calls (RPC) are commonly used with client-server architectures. The client is required to perform some actions to the server, and then waits for the server reply.
The messaging paradigm tries to enforce a totally different approach with the fire-and-forget messaging style, but it is possible to use properly designed AMQP queues to perform and enhance RPC, as shown in the following figure:

Graphically it is depicted that the request queue is associated with the responder, the reply queues with the callers.
However, when we use RabbitMQ, all the involved peers (both the callers and the responders) are AMQP clients.
We are now going to describe the steps performed in the example in Chapter01/Recipe05/Java_5/src/rmqexample/rpc
.
To use this recipe we need to set up the Java development environment as indicated in the Introduction section.
Let's perform the following steps to implement the RPC responder:
Declare the
request queue
where the responder will be waiting for the RPC requests:channel.queueDeclare(requestQueue, false, false, false, null);
Define our specialized consumer
RpcResponderConsumer
by overridingDefaultConsumer.handleDelivery()
as already seen in the Consuming messages recipe. On the reception of each RPC request, this consumer will:Perform the action required in the RPC request
Prepare the
reply
messageSet the correlation ID in the reply properties by using the following code:
BasicProperties replyProperties = new BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
Publish the answer on the
reply
queue:getChannel().basicPublish("", properties.getReplyTo(),replyProperties, reply.getBytes());
Send the
ack
to the RPC:
getChannel().basicAck(envelope.getDeliveryTag(), false);
Start consuming messages, until we stop it as already seen in the Consuming messages recipe.
Now let's perform the following steps to implement the RPC caller:
Declare the request queue where the responder will be waiting for the RPC requests:
channel.queueDeclare(requestQueue,
false, false, false, null);
Create a temporary, private, autodelete reply queue:
String replyQueue = channel.queueDeclare().getQueue();
Define our specialized consumer
RpcCallerConsumer
, which will take care of receiving and handling RPC replies. It will:Allow to specify what to do when it gets the replies (in our example, by defining
AddAction()
)Override
handleDelivery()
:public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws java.io.IOException { String messageIdentifier = properties.getCorrelationId(); String action = actions.get(messageIdentifier); actions.remove(messageIdentifier); String response = new String(body); OnReply(action, response); }
Start consuming reply messages invoking
channel.basicConsume()
.Prepare and serialize the requests (
messageRequest
in our example).Initialize an arbitrary, unique message identifier (
messageIdentifier
).Define what to do when the consumer gets the corresponding reply, by binding the action with the
messageIdentifier
. In our example we do it by calling our custom methodconsumer.AddAction()
.Publish the message to
requestqueue
, setting its properties:BasicProperties props = new BasicProperties.Builder() .correlationId(messageIdentifier) .replyTo(replyQueue).build(); channel.basicPublish("", requestQueue, props,messageRequest.getBytes());
In this example the RPC responder takes the role of an RPC server; the responder listens on the requestQueue
public queue (step 1), where the callers will place their requests.
Each caller, on the other hand, will consume the responder replies on its own private queue, created in step 5.
When the caller sends a message (step 11), it includes two properties: the name of the temporary reply queue (replyTo()
) where it will be listening, and a message identifier (correlationId()
), needed by the caller to identify the call when the reply comes back.
In fact, in our example we have implemented an asynchronous RPC caller. The action to be performed by the RpcCallerConsumer
(step 6) when the reply comes back is recorded by the nonblocking consumer by calling AddAction()
(step 10).
Coming back to the responder, the RPC logic is all in the RpcResponderConsumer
. This is not different from a specialized non-blocking consumer, as we have seen in the Consuming messages recipe, except for two details:
The reply queue name is got by the message properties,
properties.getReplyTo()
. Its value has been set by the caller to its private, temporary reply queue.The reply message must include in its properties the correlation ID sent in the incoming message.
In this section we will discuss the use of blocking RPC and some scalability notes.
Sometimes simplicity is more important than scalability. In this case it is possible to use the following helper classes, included in the Java RabbitMQ client library, that implement blocking RPC semantics:
com.rabbitmq.client.
RpcClientcom.rabbitmq.client.
StringRpcServer
The logic is identical, but there are no non-blocking consumers involved, and the handling of temporary queues and correlation IDs is transparent to the user.
You can find a working example at Chapter01/Recipe05/Java_5/src/rmqexample/simplerpc.
What happens when there are multiple callers? It mainly works as a standard RPC client/server architecture. But what if we run many responders?
In this case all the responders will take care of consuming messages from the request queue. Furthermore, the responders can be located on different hosts. We have just got load distribution for free. More on this topic is in the recipe Distributing messages to many consumers.
In this example we are seeing how to send the same message to a possibly large number of consumers.
This is a typical messaging application, broadcasting to a huge number of clients. For example, when updating the scoreboard in a massive multiplayer game, or when publishing news in a social network application.
In this recipe we are discussing both the producer and consumer implementation.
Since it is very typical to have consumers using different technologies and programming languages, we are using Java, Python, and Ruby to show interoperability with AMQP.
We are going to appreciate the benefits of having separated exchanges and queues in AMQP.
You can find the source in Chapter01/Recipe06/
.
To use this recipe you will need to set up Java, Python and Ruby environments as described in the Introduction section.
To cook this recipe we are preparing four different codes:
The Java publisher
The Java consumer
The Python consumer
The Ruby consumer
To prepare a Java publisher:
Declare a
fanout
exchange:channel.exchangeDeclare(myExchange, "fanout");
Send one message to the exchange:
channel.basicPublish(myExchange, "", null, jsonmessage.getBytes());
Then to prepare a Java consumer:
Declare the same
fanout
exchange declared by the producer:channel.exchangeDeclare(myExchange, "fanout");
Autocreate a new temporary queue:
String queueName = channel.queueDeclare().getQueue();
Bind the queue to the exchange:
channel.queueBind(queueName, myExchange, "");
Define a custom, non-blocking consumer, as already seen in the Consuming messages recipe.
Consume messages invoking
channel.basicConsume()
The source code of the Python consumer is very similar to the Java consumer, so there is no need to repeat the needed steps. Just follow the steps of the Java consumer, looking to the source code in the archive of the recipes at:
Chapter01/Recipe06/Python_6/PyConsumer.py
In the Ruby consumer you need to use require
"bunny" and then use the URI connection. Check out the source code at:
Chapter01/Recipe06/Ruby_6/RbConsumer.rb
We are now ready to mix all together, to see the recipe in action:
Start one instance of the Java producer; messages start getting published immediately.
Start one or more instances of the Java/Python/Ruby consumer; the consumers receive only the messages sent while they are running.
Stop one of the consumers while the producer is running, and then restart it; we can see that the consumer has lost the messages sent while it was down.
Both the producer and the consumers are connected to RabbitMQ with a single connection, but the logical path of the messages is depicted in the following figure:

In step 1 we have declared the exchange that we are using. The logic is the same as in the queue declaration: if the specified exchange doesn't exist, create it; otherwise, do nothing.
The second argument of exchangeDeclare()
is a string, specifying the type of the exchange, fanout
in this case.
In step 2 the producer sends one message to the exchange. You can just view it along with the other defined exchanges issuing the following command on the RabbitMQ command shell:
rabbitmqctl list_exchanges
The second argument in the call to channel.basicPublish()
is the
routing key, which is always ignored when used with a fanout
exchange. The third argument, set to null
, is the optional message property (more on this in the Using message properties recipe). The fourth argument is just the message itself.
When we started one consumer, it created its own temporary queue (step 9). Using the channel.queueDeclare()
empty overload, we are creating a nondurable, exclusive, autodelete queue with an autogenerated name.
Launching a couple of consumers and issuing rabbitmqctl list_queues
, we can see two queues, one per consumer, with their odd names, along with the persistent myFirstQueue
used in previous recipes as shown in the following screenshot:

In step 5 we have bound the queues to myExchange
. It is possible to monitor these bindings too, issuing the following command:
rabbitmqctl list_bindings
The monitoring is a very important aspect of AMQP; messages are routed by exchanges to the bound queues, and buffered in the queues.
The fanout
exchange routes messages by just placing a copy of them in each bound queue. So, no bound queues and all the messages are just received by no one consumer (see the Handling unroutable messages recipe for more details).
As soon as we close one consumer, we implicitly destroy its private temporary queue (that's why the queues are autodelete; otherwise, these queues would be left behind unused, and the number of queues on the broker would increase indefinitely), and messages are not buffered to it anymore.
When we restart the consumer, it will create a new, independent queue and as soon as we bind it to myExchange
, messages sent by the publisher will be buffered into this queue and pulled by the consumer itself.
When RabbitMQ is started for the first time, it creates some predefined exchanges. Issuing rabbitmqctl list_exchanges
we can observe many existing exchanges, in addition to the one that we have defined in this recipe:

All the amq.*
exchanges listed here are already defined by all the AMQP-compliant brokers and can be used instead of defining your own exchanges; they do not need to be declared at all.
We could have used amq.fanout
in place of myLastnews.fanout_6
, and this is a good choice for very simple applications. However, applications generally declare and use their own exchanges.
With the overload used in the recipe, the exchange is non-autodelete (won't be deleted as soon as the last client detaches it) and non-durable (won't survive server restarts). You can find more available options and overloads at http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/.
In this recipe we are going to see how to select a subset of messages to be consumed, routing them only to the AMQP queues of interest and ignoring all the others.
A typical use case is implementing a chat, where each queue represents a user.
We can find the relative example in the book examples directory at:
Chapter01/Recipe07/Java_7/src/rmqexample/direct
To use this recipe we need to set up the Java development environment as indicated in the Introduction section.
We are going to show how to implement both the producer and the consumer, and see them in action. To implement the producer, perform the following steps:
Declare a direct exchange:
channel.exchangeDeclare(exchangeName, "direct", false, false, null);
Send some messages to the exchange, using arbitrary
routingKey
values:channel.basicPublish(exchangeName, routingKey, null, jsonBook.getBytes());
To implement the consumer, perform the following steps:
Declare the same exchange, identical to what was done in step 1.
Create a temporary queue:
String myQueue = channel.queueDeclare().getQueue();
Bind the queue to the exchange using the
bindingKey
. Perform this operation as many times as needed, in case you want to use more than one binding key:channel.queueBind(myQueue,exchangeName,bindingKey);
After having created a suitable consumer object, start consuming messages as already seen in the Consuming messages recipe.
In this recipe we have published messages (step 2) tagged with an arbitrary string (the so called routing key), to a direct exchange.
As with fanout
exchanges, messages are not stored if there are no queues bound; however, in this case the consumers can choose the messages to be forwarded to these queues, depending on the binding key specified when they are bound (step 5).
Only messages with a routing key equal to the one specified in the binding will be delivered to such queues.
Tip
This filtering operation is performed by the AMQP broker, and not by the consumer; the messages with a routing key that is different from the queue binding key won't be placed in that queue at all.
However, it's possible to have more queues bound with the same binding key; in this case, the broker will place a copy of the matching messages in all of them.
It is also possible to bind many different binding keys to the same queue/exchange pair, letting all the corresponding messages be delivered.
In case we deliver a message with a given routing key to an exchange, and there are no queues bound with that specific key, the message is silently dropped.
However, the producer can detect and behave consequently when this happens, as shown in detail in the Handling unroutable messages recipe.
Direct and topic exchanges are conceptually very similar to each other. The main difference is that direct exchanges use exact matching only to select the destination of the messages, while topic exchanges allow using pattern matching with specific wildcards.
For example, the BBC is using topic routing with RabbitMQ to route new stories to all of the appropriate RSS feeds on their websites.
You can find the example for a topic exchange at:
Chapter01/Recipe08/Java_8/src/rmqexample/topic
To use this recipe we need to set up the Java development environment as indicated in the Introduction section.
Let's start with the producer:
Declare a topic exchange:
channel.exchangeDeclare(exchangeName, "topic", false, false, null);
Send some messages to the exchange, using arbitrary
routingKey
values:channel.basicPublish(exchangeName, routingKey, null, jsonBook.getBytes());
Declare the same exchange, identical to what was done in step 1.
Create a temporary queue:
String myQueue = channel.queueDeclare().getQueue();
Bind the queue to the exchange using the binding key, which in this case can contain wildcards:
channel.queueBind(myQueue,exchangeName,bindingKey);
After having created a suitable consumer object, start consuming messages as already seen in the Consuming messages recipe.
As in the previous recipe, messages sent to a topic exchange are tagged with a string (step 2), but it is important for a topic exchange to be composed more of dot-separated words; these are supposed to be the topics of the message. For example, in our code we have used:
technology.rabbitmq.ebook sport.golf.paper sport.tennis.ebook
To consume these messages the consumer has to bind myQueue
to the exchange (step 5) using the appropriate key.
Using the topic exchange, the subscription/binding key specified in step 5 can be a sequence of dot-separated words and/or wildcards. AMQP wildcards are just:
#
: This matches zero or more words*
: This matches exactly one word
So, for example:
#.ebook
and*.*.ebook
both match the first and the third sent messagessport.#
andsport.*.*
both match the second and the third sent messages# alone
matches any message sent
In the last case the topic exchange behaves exactly like a fanout
exchange, except for the performance, which is inevitably higher when using the former.
In this example we will show how to use the explicit acknowledgment, the so-called ack
, while consuming messages.
A message is stored in a queue until one consumer gets the message and sends the ack
back to the broker.
The ack
can be either implicit or explicit. In the previous examples we have used theimplicit ack
.
In order to view this example in action, you can run the publisher from the Producing messages recipe and the consumer who gets the message, which you can find in the book archive at Chapter01/Recipe09/Java_9/
.
To use this recipe we need to set up the Java development environment as indicated in the Introduction section.
In order to guarantee that the messages have been acknowledged by the consumer after processing them, you can perform the following steps:
Declare a queue:
channel.queueDeclare(myQueue, true, false, false,null);
Bind the consumer to the queue, specifying
false
for theautoAck
parameter ofbasicConsume()
:ActualConsumer consumer = new ActualConsumer(channel); boolean autoAck = false; // n.b. channel.basicConsume(MyQueue, autoAck, consumer);
Consume a message and send the
ack
:public void handleDelivery(String consumerTag,Envelope envelope, BasicPropertiesproperties,byte[] body) throws java.io.IOException { String message = new String(body); this.getChannel().basicAck(envelope.getDeliveryTag(),false);
After we created the queue (step 1), we added the consumer to the queue and defined the ack
behavior (step 2).
The parameter autoack = false
informs the RabbitMQ client API that we are going to send explicit ack
ourselves.
After we have got a message from the queue, we must acknowledge to RabbitMQ that we have received and properly processed the message calling channel.basicAck()
(step 3). The message will be removed from the queue only when RabbitMQ receives the ack
.
Tip
If you don't send the ack
back, the consumer continues to fetch subsequent messages; however, when you disconnect the consumer, all the messages will still be in the queue. Messages are not consumed until RabbitMQ receives the corresponding ack
. Try to comment out the basicAck()
call in the example to experiment this behavior.
The method channel.basicAck()
has two parameters:
deliveryTag
multiple
The deliveryTag
parameter is a value assigned by the server to the message, which you can retrieve using delivery.getEnvelope().getDeliveryTag()
.
If multiple
is set to false
the client acknowledges only the message of the deliveryTag
parameter, otherwise the client acknowledges all the messages until this last one. This flag allows us to optimize consuming messages by sending ack
to RabbitMQ on a block of messages instead of for each one.
Tip
A message must be acknowledged only once; if you try to acknowledge the same message more than once, the method raises a precondition-failed
exception.
Calling channel.basicAck(0,true)
all the unacknowledged messages get acknowledged; the 0
stands for "all the messages".
Furthermore, calling channel.basicAck(0,false)
raises an exception.
In the next chapter we will discuss the basicReject()
method. This method is a RabbitMQ extension that allows further flexibility.
In this example we are showing how to create a dynamic load balancer, and how to distribute messages to many consumers. We are going to create a file downloader.
You can find the source at Chapter01/Recipe10/Java_10/
.
To use this recipe we need to set up the Java development environment as indicated in the Introduction section.
In order to let two or more RabbitMQ clients properly balance consuming messages, you need to follow the given steps:
Declare a named queue, and specify the
basicQos
as follows:channel.queueDeclare(myQueue, false, false, false,null); channel.basicQos(1);
Bind a consumer with explicit
ack
:channel.basicConsume(myQueue, false, consumer);
Send one or more messages using
channel.basicPublish()
.Execute two or more consumers.
The publisher sends a message with the URL to download:
String messageUrlToDownload= "http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v3.0.2/rabbitmq-dotnet-client-3.0.2-user-guide.pdf"; channel.basicPublish("",MyQueue,null,messageUrlToDownload.getBytes());
The consumer gets the message and downloads the referenced URL:
System.out.println("Url to download:" + messageURL); downloadUrl(messageURL);
Once the download is terminated, the consumer sends the ack
back to the broker and is ready to download the next one:
getChannel().basicAck(envelope.getDeliveryTag(),false); System.out.println("Ack sent!"); System.out.println("Wait for the next download...");
By default, messages are heavily prefetched. Messages are retrieved by the consumers in blocks, but are actually consumed and removed from the queue when the consumers send the ack
, as already seen in the previous recipe.
On the other hand, using many consumers as in this recipe, the first one will prefetch the messages, and the other consumers started later won't find any available in the queue. In order to equally distribute the work among the active consumers, we need to use channel.basicQos(1)
, specifying to prefetch just one message at a time.
You can find more information about load balancing in Chapter 8, Performance Tuning for RabbitMQ.
In this example we will show how an AMQP message is divided, and how to use message properties.
You can find the source at Chapter01/Recipe11/Java_11/.
To use this recipe you will need to set up the Java development environment as indicated in the Introduction section.
In order to access the message properties you need to perform the following steps:
Declare a queue:
channel.queueDeclare(MyQueue, false, false, false,null);
Create a
BasicProperties
class:Map<String,Object>headerMap = new HashMap<String, Object>(); headerMap.put("key1", "value1"); headerMap.put("key2", new Integer(50) ); headerMap.put("key3", new Boolean(false)); headerMap.put("key4", "value4"); BasicProperties messageProperties = new BasicProperties.Builder() .timestamp(new Date()) .contentType("text/plain") .userId("guest") .appId("app id: 20") .deliveryMode(1) .priority(1) .headers(headerMap) .clusterId("cluster id: 1") .build();
Publish a message with basic properties:
channel.basicPublish("",myQueue,messageProperties,message.getBytes())
Consume a message and print the properties:
System.out.println("Property:" + properties.toString());
The AMQP message (also called content) is divided into two parts:
Content header
Content body (as we have already seen in previous examples)
In step 2 we create a content header using BasicProperties
:
Map<String,Object>headerMap = new HashMap<String, Object>(); BasicProperties messageProperties = new BasicProperties.Builder() .timestamp(new Date()) .userId("guest") .deliveryMode(1) .priority(1) .headers(headerMap) .build();
With this object we have set up the following properties:
timestamp
: This is the message time stamp.userId
: This is the broker with whom the user sends the message (by default, it is "guest"). In the next chapter we'll see the users' management.deliveryMode
: If set to 1 the message is nonpersistent, if it is 2 the message is persistent (you can see the recipe Connecting to the broker).priority
: This defines the message priority, which can be 0 to 9.headers
: AHashMap<String, Object>
header, you are free to use it to enter your custom fields.
Tip
The RabbitMQ BasicProperties
class is an AMQP content header implementation. The attribute of BasicProperties
can be built using BasicProperties.Builder()
The header is ready and we can send a message using channel.basicPublish("",myQueue, messageProperties,message.getBytes()),
where messageProperties
is the message header and message
is the message body.
In step 4 the consumer gets a message:
public void handleDelivery(String consumerTag,Envelope envelope, BasicProperties properties,byte[] body) throws java.io.IOException { System.out.println("***********message header****************"); System.out.println("Message sent at:"+ properties.getTimestamp()); System.out.println("Message sent by user:"+ properties.getUserId()); System.out.println("Message sent by App:"+properties.getAppId()); System.out.println("all properties :" + properties.toString()); System.out.println("**********message body**************"); String message = new String(body); System.out.println("Message Body:"+message); }
The parameter properties
contains the message header and body
contains its body.
Using message properties we can optimize the performance. Writing audit information or log information into the body is a typical error, because the consumer should parse the body to get them.
The body message must only contain application data (for example, a Book
class), while the message properties can host other information related to the messaging mechanics or other implementation details.
For example, if the consumer wants to log when a message has been sent you can use the timestamp
attribute, or if the consumer needs to distinguish a message according to a custom tag, you can put it in the headers HashMap
property.
The class MessageProperties
contains some pre-built BasicProperties
class for standard cases. Please check the official link at http://www.rabbitmq.com/releases//rabbitmq-java-client/current-javadoc/com/rabbitmq/client/MessageProperties.html
In this example we have just used some of the properties. You can get more information at http://www.rabbitmq.com/releases//rabbitmq-java-client/current-javadoc/com/rabbitmq/client/AMQP.BasicProperties.html.
In this example we will discuss how to use channel transactions. In the Producing messages recipe we have seen how to use a persistent message, but if the broker can't write the message to the disk, you can lose the message. With the AQMP transactions you can be sure that the message won't be lost.
You can find the source at Chapter01/Recipe12/Java_12/
.
To use this recipe you will need to set up the Java development environment as indicated in the Introduction section.
You can use transactional messages by performing the following steps:
Create a persistent queue
channel.queueDeclare(myQueue, true, false, false, null);
Set the channel to the transactional mode using:
channel.txSelect();
Send the message to the queue and then commit the operation:
channel.basicPublish("", myQueue, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); channel.txCommit();
After creating a persistent queue (step 1), we have set the channel in the transaction mode using the method txSelect()
(step 2). Using txCommit()
the message is stored in the queue and written to the disk; the message will then be delivered to the consumer(s).
The method txSelect()
must be called at least once before txCommit()
or txRollback()
.
As in a DBMS you can use a rollback method. In the following case the message isn't stored or delivered:
channel.basicPublish("",myQueue, MessageProperties.PERSISTENT_TEXT_PLAIN ,message.getBytes()); channel.txRollback();
In this example we are showing how to manage unroutable messages. An unroutable message is a message without a destination. For example, a message sent to an exchange without any bound queue.
Unroutable messages are not similar to dead letter messages; the first are messages sent to an exchange without any suitable queue destination. The latter, on the other hand, reach a queue but are rejected because of an explicit consumer decision, expired TTL, or exceeded queue length limit. You can find the source at Chapter01/Recipe13/Java_13/
.
To use this recipe you will need to set up the Java development environment as indicated in the Introduction section.
In order to handle unroutable messages, you need to perform the following steps:
First of all we need to implement the class
ReturnListener
and its interface:public class HandlingReturnListener implements ReturnListener @Override public void handleReturn…
Add the
HandlingReturnListener
class to the channelReturnListener
:channel.addReturnListener(new HandlingReturnListener());
Then create an exchange:
channel.exchangeDeclare(myExchange, "direct", false, false, null);
And finally publish a mandatory message to the exchange:
boolean isMandatory = true; channel.basicPublish(myExchange, "",isMandatory, null, message.getBytes());
When we execute the publisher, the messages sent to myExchange
won't reach any destination since it has no bound queues. However, these messages aren't, they are redirected to an internal queue. The HandlingReturnListener
class will handle such messages using handleReturn()
.
The ReturnListener
class is bound to a publisher channel, and it will trap only its own unroutable messages.
You can also find a consumer in the source code example. Try also to execute the publisher and the consumer together, and then stop the consumer.