RabbitMQ Cookbook

By Sigismondo Boschi , Gabriele Santomaggio

About this book

RabbitMQ is an open source message broker software (sometimes called message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP). The RabbitMQ server is written in the Erlang programming language and is built on the Open Telecom Platform framework for clustering and failover. Messaging enables software applications to connect and scale. Applications can connect to each other as components of a larger application or to user devices and data.

RabbitMQ Cookbook touches on all the aspects of RabbitMQ messaging. You will learn how to use this enabling technology for the solution of highly scalable problems dictated by the dynamic requirements of Web and mobile architectures, based for example on cloud computing platforms. This is a practical guide with several examples that will help you to understand the usefulness and the power of RabbitMQ.

This book helps you learn the basic functionalities of RabbitMQ with simple examples which describe the use of RabbitMQ client APIs and how a RabbitMQ server works. You will find examples of RabbitMQ deployed in real-life use-cases, where its functionalities will be exploited combined with other technologies. This book helps you understand the advanced features of RabbitMQ that are useful for even the most demanding programmer. Over the course of the book, you will learn about the usage of basic AMQP functionalities and use RabbitMQ to let decoupled applications exchange messages as per enterprise integration applications. The same building blocks are used to implement the architecture of highly scalable applications like today's social networks, and they are presented in the book with some examples. You will also learn how to extend RabbitMQ functionalities by implementing Erlang plugins.

This book combines information with detailed examples coupled with screenshots and diagrams to help you create a messaging application with ease.

Publication date:
December 2013
Publisher
Packt
Pages
288
ISBN
9781849516501

 

Chapter 1. Working with AMQP

In this chapter we will cover:

  • Connecting to a broker

  • Producing messages

  • Consuming messages

  • Using body serialization with JSON

  • Using RPC with messaging

  • Broadcasting messages

  • Working with message routing using direct exchanges

  • Working with message routing using topic exchanges

  • Guaranteeing message processing

  • Distributing messages to many consumers

  • Using message properties

  • Messaging with transactions

  • Handling unroutable messages

 

Introduction


Advanced Message Queuing Protocol (AMQP) was developed to provide interoperability among the many messaging solutions released over the years by different vendors, such as IBM MQ-Series, TIBCO, and Microsoft MSMQ.

The AMQP 0-9-1 standard gives a complete specification of the protocol, particularly regarding:

  • The API interface

  • The wire protocol

RabbitMQ is a free and complete AMQP broker implementation. It implements version 0-9-1 of the AMQP specification; this is the most widespread version today and it is the last version that focuses on the client API. That's what we want to put the focus on, especially in this chapter.

On the other hand, AMQP 1.0 only defines the evolution of the wire-level protocol—the format of the data being passed at the application level—for the exchange of messages between two endpoints; so 0-9-1 is actually the most updated client library specification.

RabbitMQ includes:

  • The broker itself, that is, the service that will actually handle the messages that are going to be sent and received by the applications

  • The API implementations for Java, C#, and Erlang languages

It is also possible to use APIs for languages downloadable from the RabbitMQ site itself, from third-party sites, or even using AMQP APIs not strictly related to RabbitMQ (http://www.rabbitmq.com/devtools.html). Since the AMQP standard specifies the wire protocol, they are going to be mostly interoperable, except for some custom extensions. That will be discussed in detail in the next chapter.

In the course of the book we will particularly use some of the following APIs:

In this first chapter we are mainly using Java since this language is widely used in enterprise software development, integration, and distribution. RabbitMQ is a perfect fit in this environment.

In order to run the examples in this recipe, you will first need to:

  • Install Java JDK 1.6+

  • Install the Java RabbitMQ client library

  • Properly configure CLASSPATH and your preferred development environment (Eclipse, NetBeans, and so on)

  • Install the RabbitMQ server on a machine (this can be the same local machine)

The natural choice is to install it on your desktop (Windows, Linux, and Mac OS X are all fine choices), but you can also install it on one or more external servers; for example, physical servers, Raspberry Pi boards (http://www.raspberrypi.org/), or virtual machines on cloud service providers.

Tip

In this book we are not providing instructions on the installation of RabbitMQ itself. You can find detailed instructions on the RabbitMQ site.

Most of the examples will work connecting to the RabbitMQ broker running on the localhost. If you have chosen to install or use RabbitMQ from a different machine, you will need to specify its hostname as a command-line parameter of the examples themselves, for example:

java -cp ./bin  rmqexample.Publish [Rabbitmq-host]

For the examples involving Python, you will need Python 2.7+ installed and the Pika library, an AMQP implementation for Python (https://pypi.python.org/pypi/pika). The fastest way to install Pika is by using PIP (https://pypi.python.org/pypi/pip). In the command prompt, just type:

pip install pika

We will also present some recipes using .NET where the emphasis is mainly on interoperability.

You can download the working examples in their full form at http://www.packtpub.com/support.

The recipes presented in this chapter will tackle all the basic concepts exposed by AMQP, using RabbitMQ.

 

Connecting to the broker


Every application that uses AMQP needs to establish a connection with the AMQP broker. By default, RabbitMQ (as well as any other AMQP broker up to version 1.0) works over TCP as a reliable transport protocol on port 5672, that is, the IANA-assigned port.

We are now going to discuss how to create the connection. In all the subsequent recipes we will refer to the connection and channel as the results of the operations presented here.

Getting ready

To use this recipe we need to set up the Java development environment as mentioned in the Introduction section.

How to do it…

In order to create a Java client that connects to the RabbitMQ broker, you need to perform the following steps:

  1. Import the needed classes from the Java RabbitMQ client library in the program namespace:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
  2. Create an instance of the client ConnectionFactory:

    ConnectionFactory factory = new ConnectionFactory();
  3. Set the ConnectionFactory options:

    factory.setHost(rabbitMQhostname);
  4. Connect to the RabbitMQ broker:

    Connection connection = factory.newConnection();
  5. Create a channel from the freshly created connection:

    Channel channel = connection.createChannel();
  6. As soon as we are done with RabbitMQ, release the channel and the connection:

    channel.close();
    connection.close();

How it works…

Using the Java client API, the application must create an instance of ConnectionFactory and set the host where RabbitMQ should be running with the setHost() method.

After the Java imports (step 1), we have instantiated the factory object (step 2). In this example we have just set the hostname that we have chosen to optionally get from the command line (step 3), but you can find more information regarding connection options in the section There's more….
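The optional hostname argument mentioned above can be sketched as follows. This is a minimal illustration, assuming the broker host is passed as the first command-line parameter and that localhost is the fallback; the class and method names are hypothetical and are not taken from the book's sources.

```java
// Hypothetical helper: picks the value that would be passed to factory.setHost().
class BrokerHostSketch {

    // Assumption: the first command-line argument, when present, is the broker host.
    static String resolveHost(String[] args) {
        if (args.length > 0 && !args[0].trim().isEmpty()) {
            return args[0].trim();
        }
        // No argument given: fall back to a broker running on the local machine.
        return "localhost";
    }

    public static void main(String[] args) {
        System.out.println("Connecting to " + resolveHost(args));
    }
}
```

Keeping this logic in one small method makes it easy to reuse the same convention across all the example programs.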

In step 4 we have actually established the TCP connection to the RabbitMQ broker.

Tip

In this recipe we have used the default connection parameters user: guest, password: guest, and vhost: /; we will discuss these parameters later.

However, we are not yet ready to communicate with the broker; we need to set up a communication channel (step 5). This is an advanced concept of AMQP; using this abstraction, it is possible to let many different messaging sessions use the same logical connection.

Actually, all the communication operations of the Java client library are performed by the methods of a channel instance.

If you are developing multithreaded applications, it is highly recommended to use a different channel for each thread. If many threads use the same channel, they will serialize their execution in the channel method calls, leading to possible performance degradation.

Tip

The best practice is to open a connection and share it with different threads. Each thread creates, uses, and destroys its own independent channel(s).

There's more…

It is possible to specify many different optional properties for any RabbitMQ connection. You can find them all in the online documentation (http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc). These options are all self-explanatory, except for the AMQP virtual host.

Virtual hosts are administrative containers; they allow you to configure many logically independent broker hosts within a single RabbitMQ instance, so that many independent applications can share the same RabbitMQ server. Each virtual host can be configured with its own set of permissions, exchanges, and queues, and works in a logically separated environment.

It's possible to specify connection options by using just a connection string, also called connection URI, with the factory.setUri() method:

ConnectionFactory factory = new ConnectionFactory();
String uri="amqp://user:pass@hostname:port/vhost";
factory.setUri(uri);

Tip

The URI must conform to the syntax specified in RFC3986 (http://www.ietf.org/rfc/rfc3986.txt).
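To make the structure of a connection URI more concrete, the following sketch splits one into its parts using only java.net.URI from the standard library. It is not how the RabbitMQ client parses URIs internally; the class name, the method name, and the sample values are assumptions made for illustration, and the sketch expects the URI to carry both credentials and a virtual host.

```java
import java.net.URI;

// Illustrative helper, not part of the RabbitMQ client library.
class AmqpUriSketch {

    // Returns {user, password, host, port, vhost} extracted from the URI text.
    static String[] parse(String text) {
        URI uri = URI.create(text);
        // The user-info part has the form "user:password".
        String[] credentials = uri.getUserInfo().split(":", 2);
        // The path starts with "/", so "/vhost" names the virtual host "vhost".
        String vhost = uri.getPath().substring(1);
        return new String[] {
            credentials[0], credentials[1],
            uri.getHost(), String.valueOf(uri.getPort()), vhost
        };
    }

    public static void main(String[] args) {
        // Hypothetical values, chosen only for the example.
        String[] parts = parse("amqp://guest:guest@localhost:5672/test");
        System.out.println(String.join(" ", parts));
    }
}
```

Each extracted part corresponds to one of the connection options that could otherwise be set individually on the factory.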

 

Producing messages


In this recipe we are learning how to send a message to an AMQP queue. We will be introduced to the building blocks of AMQP messaging: messages, queues, and exchanges.

You can find the source at Chapter01/Recipe02/src/rmqexample.

Getting ready

To use this recipe we need to set up the Java development environment as indicated in the Introduction section.

How to do it…

After connecting to the broker, as seen in the previous recipe, you can start sending messages performing the following steps:

  1. Declare the queue, calling the queueDeclare() method on com.rabbitmq.client.Channel:

    String myQueue = "myFirstQueue";
    channel.queueDeclare(myQueue, true, false, false, null);
  2. Send the very first message to the RabbitMQ broker:

    String message = "My message to myFirstQueue";
    channel.basicPublish("",myQueue, null, message.getBytes());
  3. Send the second message with different options:

    channel.basicPublish("",myQueue,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

Tip

The queue names are case sensitive: MYFIRSTQUEUE is different from myFirstQueue.

How it works…

In this first basic example we have been able to just send a message to RabbitMQ.

After the communication channel is established, the first step is to ensure that the destination queue exists. This is accomplished by declaring the queue (step 1) with queueDeclare(). The method call does nothing if the queue already exists; otherwise, it creates the queue.

Note

If the queue already exists but has been created with different parameters, queueDeclare() will raise an exception.

Note that this, like most AMQP operations, is a method of the Channel Java interface. All the operations that need to interact with the broker are carried out through channels.

Let's examine the meaning of the queueDeclare() method call in depth. Its template can be found in the Java client reference documentation located at http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/. The documentation will be as shown in the following screenshot:

In particular we have used the second overload of this method that we report here:

AMQP.Queue.DeclareOk queueDeclare(java.lang.String queue, boolean durable, boolean exclusive, boolean autoDelete, java.util.Map<java.lang.String,java.lang.Object> arguments) throws java.io.IOException

The meanings of the individual arguments are:

  • queue: This is just the name of the queue where we will be storing the messages.

  • durable: This specifies whether the queue will survive server restarts. Note that it is required for a queue to be declared as durable if you want persistent messages to survive a server restart.

  • exclusive: This specifies whether the queue is restricted to only this connection.

  • autoDelete: This specifies whether the queue will be automatically deleted by the RabbitMQ broker as soon as it is not in use.

  • arguments: This is an optional map of queue construction arguments.

In step 2 we have actually sent a message to the RabbitMQ broker.

The message body will never be opened by RabbitMQ. Messages are opaque entities for the AMQP broker, and you can use any serialization format you like. We often use JSON, but XML, ASN.1, standard or custom, ASCII or binary format, are all valid alternatives. The only important thing is that the client applications should know how to interpret the data.
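Since the broker treats the body as raw bytes, the producer and the consumer have to agree on the encoding themselves. The recipe calls message.getBytes(), which relies on the platform default charset. The following sketch makes the charset explicit on both sides; it assumes Java 7 or later (for StandardCharsets), and its class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Illustrative helper: the same charset is used to encode and decode the body.
class BodyEncodingSketch {

    // Producer side: turns the text into the byte array passed to basicPublish().
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: rebuilds the text from the body received in handleDelivery().
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("My message to myFirstQueue");
        System.out.println(decode(body));
    }
}
```

With an explicit charset, a producer and a consumer running on machines with different default encodings still interpret the body in the same way.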

Let's now examine in depth the basicPublish() method of the Channel interface for the overload used in our recipe:

void basicPublish(java.lang.String exchange, java.lang.String routingKey, AMQP.BasicProperties props, byte[] body) throws java.io.IOException

In our example the exchange argument has been set to the empty string "", that is, the default exchange, and the routingKey argument to the name of the queue. In this case the message is directly sent to the queue specified as routingKey. The body argument is set to the byte array of our string, that is, just the message that we sent. The props argument is set to null as a default; these are the message properties, discussed in depth in the recipe Using message properties.

For example, in step 3 we have sent an identical message, but with props set to MessageProperties.PERSISTENT_TEXT_PLAIN; in this way we have requested RabbitMQ to mark this message as a persistent message.

Both messages have been dispatched to the RabbitMQ broker and logically queued in the myFirstQueue queue. The messages will stay buffered there until a client (typically a different one) retrieves them.

If the queue has been declared with the durable flag set to true and the message has been marked as persistent, it is stored on the disk by the broker. If one of the two conditions is missing, the message is stored in the memory. In the latter case the buffered messages won't survive a RabbitMQ restart, but the message delivery and retrieval will be much faster. However, we will dig down on this topic in Chapter 8, Performance Tuning for RabbitMQ.

There's more…

In this section we will discuss the methods to check the status of RabbitMQ and whether a queue already exists.

How to check the RabbitMQ status

In order to check the RabbitMQ status, you can use the command-line control tool rabbitmqctl. It should be in the PATH in the Linux setup. On Windows it can be found running the RabbitMQ command shell by navigating to Start Menu | All Programs | RabbitMQ Server | RabbitMQ Command Prompt (sbin dir). We can run rabbitmqctl.bat from this command prompt.

We can check the queue status with the command rabbitmqctl list_queues. In the following screenshot, we have run it just before and after we have run our example.

We can see our myFirstQueue queue listed in the preceding screenshot, followed by the number 2, which is just the number of the messages buffered into our queue.

Now we can either try to restart RabbitMQ, or reboot the machine hosting it. The command to restart RabbitMQ depends on the operating system:

  • On Linux, RedHat, Centos, Fedora, Raspbian, and so on:

    service rabbitmq-server restart
    
  • On Linux, Ubuntu, Debian, and so on:

    /etc/init.d/rabbitmq-server restart
    
  • On Windows:

    sc stop rabbitmq / sc start rabbitmq
    

How many messages should we expect when we run rabbitmqctl list_queues again?

Checking whether a queue already exists

In order to be sure that a specific queue already exists, replace channel.queueDeclare() with channel.queueDeclarePassive(). The two methods behave in the same way if the queue already exists; if it doesn't, the first one silently creates it and returns (that's actually the most frequently used case), while the latter raises an exception.

 

Consuming messages


In this recipe we are closing the loop; we have already seen how to send messages to RabbitMQ—or to any AMQP broker—and now we are ready to learn how to retrieve them.

You can find the source code of the recipe at Chapter01/Recipe03/src/rmqexample/nonblocking.

Getting ready

To use this recipe we need to set up the Java development environment as indicated in the introduction.

How to do it…

In order to consume the messages sent as seen in the previous recipe, perform the following steps:

  1. Declare the queue where we want to consume the messages from:

    String myQueue="myFirstQueue";
    channel.queueDeclare(myQueue, true, false, false, null);
  2. Define a specialized consumer class inherited from DefaultConsumer:

    public class ActualConsumer extends DefaultConsumer {
      public ActualConsumer(Channel channel) {
        super(channel);
      }
      @Override
      public void handleDelivery(
        String consumerTag, 
        Envelope envelope, 
        BasicProperties properties, 
        byte[] body) throws java.io.IOException {
          String message = new String(body);
          System.out.println("Received: " + message);
        }
    }
  3. Create a consumer object, which is an instance of this class, bound to our channel:

    ActualConsumer consumer = new ActualConsumer(channel);
  4. Start consuming messages:

    String consumerTag = channel.basicConsume(myQueue, true, consumer);
  5. Once done, stop the consumer:

    channel.basicCancel(consumerTag);

How it works…

After we have established the connection and the channel to the AMQP broker as seen in the Connecting to the broker recipe, we need to ensure that the queue from which we are going to consume the messages exists (step 1).

In fact it is possible that the consumer is started before any producer has sent a message to the queue and the queue itself may actually not exist at all. To avoid the failure of the subsequent operations on the queue, we need to declare the queue.

Tip

By allowing both producers and consumers to declare the same queue, we are decoupling their existence; the order in which we start them is not important.

The heart of this recipe is step 2. Here we have defined our specialized consumer that overrides handleDelivery() and instantiated it in step 3. In the Java client API the consumer callbacks are defined by the com.rabbitmq.client.Consumer interface. We have extended our consumer from DefaultConsumer, which provides a no-operation implementation for all the methods declared in the Consumer interface.

In step 4, by calling channel.basicConsume(), we let the consumer threads start consuming messages. The consumers of each channel are always executed on the same thread, independent of the calling one.

Now that we have activated a consumer for myQueue, the Java client library starts getting messages from that queue on the RabbitMQ broker, and invokes handleDelivery() for each one.

Then after the channel.basicConsume() method's invocation, we just sit idle waiting for a key press in the main thread. Messages are being consumed with nonblocking semantics respecting the event-driven paradigm, typical of messaging applications.

Only after we press Enter, the execution proceeds to step 5, cancelling the consumer. At this point the consumer threads stop invoking our consumer object, and we can release the resources and exit.

There's more…

In this section we will learn more about consumer threads and the use of blocking semantics.

More on consumer threads

At connection definition time, the RabbitMQ Java API allocates a thread pool from which it will allocate consumer threads as needed.

All the consumers bound to one channel will be executed by one single thread in the pool; however, it is possible that consumers from different channels are handled by the same thread. That's why it is important to avoid long-lasting operations in the consumer methods, in order to avoid blocking other consumers.
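The effect of a long-lasting operation on a shared thread can be illustrated without a broker. The following sketch is a plain-Java model, not RabbitMQ code: a single-thread executor stands in for the thread that serves the consumers, and two tasks stand in for two callbacks. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java model of one dispatch thread serving two consumer callbacks.
class SharedDispatchThreadSketch {

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns the order in which the two simulated callbacks complete.
    static List<String> completionOrder() {
        List<String> order = Collections.synchronizedList(new ArrayList<String>());
        // A single worker stands in for the thread shared by the consumers.
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        dispatcher.execute(() -> {
            pause(200); // a long-lasting operation inside a callback
            order.add("slow callback");
        });
        // This callback is ready immediately but must wait for the slow one.
        dispatcher.execute(() -> order.add("fast callback"));
        dispatcher.shutdown();
        try {
            dispatcher.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(order);
    }

    public static void main(String[] args) {
        System.out.println(completionOrder());
    }
}
```

The fast callback completes only after the slow one has finished, which is the kind of delay that short consumer methods are meant to avoid.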

It is also possible to handle the consumer thread pool by ourselves, as we have shown in our example; however, this is not obligatory at all. We have defined a thread pool, java.util.concurrent.ExecutorService, and passed it at connection time:

ExecutorService eService = Executors.newFixedThreadPool(10);
Connection connection = factory.newConnection(eService);

As we were managing it, we were also in charge of terminating it:

eService.shutdown();

However, remember that if you don't define your own ExecutorService thread pool, the Java client library will create one during connection creation time, and destroy it as soon as we destroy the corresponding connections.

Blocking semantics

It is possible to use blocking semantics too, but we strongly discourage this approach except for simple applications and test cases; the recipe to consume messages is non-blocking.

However, you can find the source code for the blocking approach at Chapter01/Recipe03/src/rmqexample/blocking.

See also

You can find all the available methods of the consumer interface in the official Javadoc at

http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Consumer.html

 

Using body serialization with JSON


In AMQP the messages are opaque entities; AMQP does not provide any standard way to encode/decode them.

However, web applications very often use JSON, the JavaScript serialization format that has become a de facto standard, as an application layer format; for this reason, the RabbitMQ Java client library includes some utility functions for this task.

On the other hand, JSON is not the only option; any application can choose its own format (XML, Google Protocol Buffers, ASN.1, or proprietary).

In this example we are showing how to use JSON to encode and decode message bodies. We are using a publisher written in Java (Chapter01/Recipe04/Java_4/src/rmqexample) and a consumer in Python (Chapter01/Recipe04/Python04).

Getting ready

To use this recipe you will need to set up Java and Python environments as described in the introduction.

How to do it…

To implement a Java producer and a Python consumer, you can perform the following steps:

  1. Java: In addition to the standard import described in the recipe Connecting to the broker, we have to import:

    import com.rabbitmq.tools.json.JSONWriter;
  2. Java: We create a queue that is not persistent:

    String myQueue = "myJSONBodyQueue_4";
    channel.queueDeclare(myQueue, false, false, false, null);
  3. Java: We create a list for the Book class and fill it with example data:

    List<Book> newBooks = new ArrayList<Book>();
    for (int i = 1; i < 11; i++) {
      Book book = new Book();
      book.setBookID(i);
      book.setBookDescription("History VOL: " + i);
      book.setAuthor("John Doe");
      newBooks.add(book);
    }
  4. Java: We are ready to serialize the newBooks instance with JSONWriter:

    JSONWriter rabbitmqJson = new JSONWriter();
    String jsonmessage = rabbitmqJson.write(newBooks);
  5. Java: We can finally send our jsonmessage:

    channel.basicPublish("", myQueue, null, jsonmessage.getBytes());
  6. Python: To use the Pika library we must add the following imports:

    import pika
    import json

    Python has a powerful built-in library for JSON.

  7. Python: In order to create a connection to RabbitMQ, use the following code:

    connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host))
  8. Python: Let's declare a queue, bind as a consumer, and then register a callback:

    channel = connection.channel()
    my_queue = "myJSONBodyQueue_4"
    channel.queue_declare(queue=my_queue)
    channel.basic_consume(consumer_callback, queue=my_queue, no_ack=True) 
    channel.start_consuming()

How it works…

After we set up the environments (step 1 and step 2), we serialize the newBooks list with the method write(newBooks). The method returns a JSON String (jsonmessage) as shown in the following code snippet:

[
  {
    "author" : "John Doe",
    "bookDescription" : "History VOL: 1",
    "bookID" : 1
  },
  {
    "author" : "John Doe",
    "bookDescription" : "History VOL: 2",
    "bookID" : 2
  }
]
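The recipe does not list the Book class, so the following sketch shows one possible shape for it, together with a hand-written serializer that produces the same fields as the snippet above in compact form. This is not the JSONWriter implementation: the Book bean is an assumption based on the setters used in step 3, the serializer is a simplified stand-in that only escapes quotes and backslashes, and all names other than the three properties are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for the Book bean and for a JSON serializer.
class BookJsonSketch {

    // Assumed shape of the bean filled in step 3.
    static class Book {
        private int bookID;
        private String bookDescription;
        private String author;

        int getBookID() { return bookID; }
        void setBookID(int bookID) { this.bookID = bookID; }
        String getBookDescription() { return bookDescription; }
        void setBookDescription(String text) { this.bookDescription = text; }
        String getAuthor() { return author; }
        void setAuthor(String author) { this.author = author; }
    }

    // Simplified escaping: handles only backslashes and double quotes.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Builds a compact JSON array with one object per book.
    static String toJson(List<Book> books) {
        StringBuilder json = new StringBuilder("[");
        for (int i = 0; i < books.size(); i++) {
            Book b = books.get(i);
            if (i > 0) {
                json.append(",");
            }
            json.append("{\"author\":").append(quote(b.getAuthor()))
                .append(",\"bookDescription\":").append(quote(b.getBookDescription()))
                .append(",\"bookID\":").append(b.getBookID())
                .append("}");
        }
        return json.append("]").toString();
    }

    public static void main(String[] args) {
        List<Book> books = new ArrayList<Book>();
        Book book = new Book();
        book.setBookID(1);
        book.setBookDescription("History VOL: 1");
        book.setAuthor("John Doe");
        books.add(book);
        System.out.println(toJson(books));
    }
}
```

In a real project a JSON library would handle escaping and nested types; the sketch is only meant to show how the bean properties map to the JSON fields.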

In step 5 we publish jsonmessage to the queue myJSONBodyQueue_4. Now the Python consumer can get the message from the same queue. Let's see how to do it in Python:

  connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host))
  channel = connection.channel()
  my_queue = "myJSONBodyQueue_4"
  channel.queue_declare(queue=my_queue)
  ..
  channel.basic_consume(consumer_callback, queue=my_queue, no_ack=True)
  channel.start_consuming()

As we have seen in the Java implementation, we must create a connection and then create a channel. With the method channel.queue_declare(queue=my_queue), we declare a queue that is not durable, exclusive, or autodelete. In order to change the queue's attributes, it's enough to add the parameter in the queue_declare method as follows:

channel.queue_declare(queue=my_queue, durable=True)

Tip

When different AMQP clients declare the same queue, it's important that all of them specify the same durable, exclusive, and autodelete attributes. Otherwise, channel.queue_declare() will raise an exception.

With the method channel.basic_consume(), the client starts consuming messages from the given queue, invoking the callback consumer_callback(), where it will receive the messages.

While the callbacks in Java were defined in the consumer interface, in Python they are just passed to basic_consume(), reflecting the more functional, less declarative, and less formal paradigm typical of Python.

The callback consumer_callback is as follows:

def consumer_callback(ch, method, properties, body):
  newBooks = json.loads(body)
  print " Count books:", len(newBooks)
  for item in newBooks:
    print 'ID:', item['bookID'], '-Description:', item['bookDescription'], ' -Author:', item['author']

The callback takes the message, deserializes it with json.loads(), and then the newBooks structure is ready to be read.

There's more…

The JSON helper tools included in the RabbitMQ client library are very simple; in a real project you may prefer to use an external JSON library. For example, powerful Java JSON libraries include google-gson (https://code.google.com/p/google-gson/) and Jackson (http://jackson.codehaus.org/).

 

Using RPC with messaging


Remote Procedure Calls (RPC) are commonly used with client-server architectures. The client asks the server to perform some action, and then waits for the server's reply.

The messaging paradigm tries to enforce a totally different approach with the fire-and-forget messaging style, but it is possible to use properly designed AMQP queues to perform and enhance RPC, as shown in the following figure:

The figure shows that the request queue is associated with the responder, and the reply queues with the callers.

However, when we use RabbitMQ, all the involved peers (both the callers and the responders) are AMQP clients.

We are now going to describe the steps performed in the example in Chapter01/Recipe05/Java_5/src/rmqexample/rpc.

Getting ready

To use this recipe we need to set up the Java development environment as indicated in the Introduction section.

How to do it…

Let's perform the following steps to implement the RPC responder:

  1. Declare the request queue where the responder will be waiting for the RPC requests:

    channel.queueDeclare(requestQueue, false, false, false, null);
  2. Define our specialized consumer RpcResponderConsumer by overriding DefaultConsumer.handleDelivery() as already seen in the Consuming messages recipe. On the reception of each RPC request, this consumer will:

    • Perform the action required in the RPC request

    • Prepare the reply message

    • Set the correlation ID in the reply properties by using the following code:

        BasicProperties replyProperties = new BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
    • Publish the answer on the reply queue:

        getChannel().basicPublish("", properties.getReplyTo(),replyProperties, reply.getBytes());
      
    • Send the ack to the RPC request:

        getChannel().basicAck(envelope.getDeliveryTag(), false);
  3. Start consuming messages, until we stop it as already seen in the Consuming messages recipe.

Now let's perform the following steps to implement the RPC caller:

  1. Declare the request queue where the responder will be waiting for the RPC requests:

    channel.queueDeclare(requestQueue, false, false, false, null);
    
  2. Create a temporary, private, autodelete reply queue:

    String replyQueue = channel.queueDeclare().getQueue();
  3. Define our specialized consumer RpcCallerConsumer, which will take care of receiving and handling RPC replies. It will:

    • Allow us to specify what to do when it gets the replies (in our example, by defining AddAction())

    • Override handleDelivery():

          public void handleDelivery(String consumerTag, 
          Envelope envelope,
          AMQP.BasicProperties properties, 
          byte[] body) throws java.io.IOException {
        
          String messageIdentifier = properties.getCorrelationId();
          String action = actions.get(messageIdentifier);
          actions.remove(messageIdentifier);
      
          String response = new String(body);
          OnReply(action, response);
        }
  4. Start consuming reply messages invoking channel.basicConsume().

  5. Prepare and serialize the requests (messageRequest in our example).

  6. Initialize an arbitrary, unique message identifier (messageIdentifier).

  7. Define what to do when the consumer gets the corresponding reply, by binding the action with the messageIdentifier. In our example we do it by calling our custom method consumer.AddAction().

  8. Publish the message to requestQueue, setting its properties:

    BasicProperties props = new BasicProperties.Builder()
    .correlationId(messageIdentifier)
    .replyTo(replyQueue).build();
    channel.basicPublish("", requestQueue, props,messageRequest.getBytes());

How it works…

In this example the RPC responder takes the role of an RPC server; the responder listens on the requestQueue public queue (step 1), where the callers will place their requests.

Each caller, on the other hand, will consume the responder replies on its own private queue, created in step 5.

When the caller sends a message (step 11), it includes two properties: the name of the temporary reply queue (replyTo()) where it will be listening, and a message identifier (correlationId()), needed by the caller to identify the call when the reply comes back.

In fact, in our example we have implemented an asynchronous RPC caller. The action to be performed by the RpcCallerConsumer (step 3 of the caller procedure) when the reply comes back is recorded by the non-blocking consumer by calling AddAction() (step 7 of the caller procedure).
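The bookkeeping behind AddAction() and handleDelivery() can be sketched without a broker. The following is a minimal in-memory model, not the book's RpcCallerConsumer: the class name PendingReplies and its methods are hypothetical, and a java.util.function.Consumer stands in for the action. It only illustrates how a correlation ID ties a reply back to the action registered for its request.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// In-memory model of the caller-side bookkeeping; it does not talk to a broker.
class PendingReplies {
    // correlation ID -> action to run when the matching reply arrives
    private final Map<String, Consumer<String>> actions = new ConcurrentHashMap<>();

    // Plays the role of AddAction(): remember what to do for this request.
    void addAction(String correlationId, Consumer<String> action) {
        actions.put(correlationId, action);
    }

    // Plays the role of handleDelivery(): look up and remove the action, then run it.
    // Returns false for an unknown or already-handled correlation ID.
    boolean onReply(String correlationId, String response) {
        Consumer<String> action = actions.remove(correlationId);
        if (action == null) {
            return false;
        }
        action.accept(response);
        return true;
    }

    int pendingCount() {
        return actions.size();
    }

    public static void main(String[] args) {
        PendingReplies pending = new PendingReplies();
        String id = UUID.randomUUID().toString();
        pending.addAction(id, reply -> System.out.println("Got reply: " + reply));
        pending.onReply(id, "42");
    }
}
```

Removing the entry as soon as the reply is handled keeps the map from growing with every call, which is the same reason the recipe calls actions.remove().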

Coming back to the responder, the RPC logic is all in the RpcResponderConsumer. This is not different from a specialized non-blocking consumer, as we have seen in the Consuming messages recipe, except for two details:

  • The reply queue name is obtained from the message properties, properties.getReplyTo(). Its value has been set by the caller to its private, temporary reply queue.

  • The reply message must include in its properties the correlation ID sent in the incoming message.

Tip

The correlation ID is not used by the RPC responder; it is only used to let the caller receiving this message correlate this reply with its corresponding request.
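The two details above can be sketched as a small in-memory model. This is not the book's RpcResponderConsumer: the Message class and the reply() helper are hypothetical stand-ins that only show where the reply is addressed and that the correlation ID is copied back untouched.

```java
import java.util.function.Function;

// In-memory model of the responder's reply logic; no broker is involved.
class RpcResponderModel {
    // Minimal stand-in for a message with the two RPC-related properties.
    static final class Message {
        final String routingKey;     // queue the message is published to
        final String replyTo;        // set on requests only
        final String correlationId;  // opaque to the responder
        final String body;

        Message(String routingKey, String replyTo, String correlationId, String body) {
            this.routingKey = routingKey;
            this.replyTo = replyTo;
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Builds the reply: addressed to the caller's replyTo queue,
    // carrying the request's correlation ID unchanged.
    static Message reply(Message request, Function<String, String> handler) {
        String result = handler.apply(request.body);
        return new Message(request.replyTo, null, request.correlationId, result);
    }

    public static void main(String[] args) {
        Message request = new Message("requestQueue", "caller-private-queue", "id-7", "hello");
        Message response = reply(request, String::toUpperCase);
        System.out.println(response.routingKey + " " + response.correlationId + " " + response.body);
    }
}
```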

There's more…

In this section we will discuss the use of blocking RPC and some scalability notes.

Using blocking RPC

Sometimes simplicity is more important than scalability. In this case it is possible to use the following helper classes, included in the Java RabbitMQ client library, that implement blocking RPC semantics:

com.rabbitmq.client.RpcClient
com.rabbitmq.client.StringRpcServer

The logic is identical, but there are no non-blocking consumers involved, and the handling of temporary queues and correlation IDs is transparent to the user.

You can find a working example at Chapter01/Recipe05/Java_5/src/rmqexample/simplerpc.
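To get a feeling for what such helper classes hide, here is a minimal sketch of blocking RPC semantics built on the standard library only. It is not the implementation of RpcClient: BlockingRpcModel is a hypothetical name, the responder is simulated by a single-thread executor, and a CompletableFuture per correlation ID takes the place of the temporary reply queue.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// In-memory model: the caller blocks until the reply for its correlation ID arrives.
class BlockingRpcModel {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final ExecutorService responder = Executors.newSingleThreadExecutor();
    private final Function<String, String> handler;

    BlockingRpcModel(Function<String, String> handler) {
        this.handler = handler;
    }

    // Sends the request and waits for the matching reply, or fails after the timeout.
    String call(String request, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        // "Publish" the request: the responder thread computes and delivers the reply.
        responder.submit(() -> deliver(correlationId, handler.apply(request)));
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the reply", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("RPC call failed or timed out", e);
        } finally {
            pending.remove(correlationId);
        }
    }

    // Completes the future registered for this correlation ID, if any.
    private void deliver(String correlationId, String response) {
        CompletableFuture<String> reply = pending.get(correlationId);
        if (reply != null) {
            reply.complete(response);
        }
    }

    void shutdown() {
        responder.shutdown();
    }

    public static void main(String[] args) {
        BlockingRpcModel rpc = new BlockingRpcModel(s -> "echo:" + s);
        System.out.println(rpc.call("ping", 2000));
        rpc.shutdown();
    }
}
```

The caller code becomes a plain method call, at the price of keeping one thread parked per outstanding request; that is the simplicity versus scalability trade-off mentioned above.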

Scalability notes

What happens when there are multiple callers? It mainly works as a standard RPC client/server architecture. But what if we run many responders?

In this case all the responders will take care of consuming messages from the request queue. Furthermore, the responders can be located on different hosts. We have just got load distribution for free. More on this topic is in the recipe Distributing messages to many consumers.

 

Broadcasting messages


In this recipe we will see how to send the same message to a possibly large number of consumers.

Broadcasting to a huge number of clients is a typical messaging application; for example, when updating the scoreboard in a massive multiplayer game, or when publishing news in a social network application.

In this recipe we will discuss both the producer and the consumer implementations.

Since it is very common to have consumers using different technologies and programming languages, we will use Java, Python, and Ruby to show interoperability with AMQP.

We will also appreciate the benefits of having separate exchanges and queues in AMQP.

You can find the source in Chapter01/Recipe06/.

Getting ready

To use this recipe you will need to set up Java, Python and Ruby environments as described in the Introduction section.

How to do it…

To cook this recipe we will prepare four different programs:

  • The Java publisher

  • The Java consumer

  • The Python consumer

  • The Ruby consumer

To prepare a Java publisher:

  1. Declare a fanout exchange:

    channel.exchangeDeclare(myExchange, "fanout");
  2. Send one message to the exchange:

    channel.basicPublish(myExchange, "", null, jsonmessage.getBytes());

Then to prepare a Java consumer:

  1. Declare the same fanout exchange declared by the producer:

    channel.exchangeDeclare(myExchange, "fanout");
  2. Autocreate a new temporary queue:

    String queueName = channel.queueDeclare().getQueue();
  3. Bind the queue to the exchange:

    channel.queueBind(queueName, myExchange, "");
  4. Define a custom, non-blocking consumer, as already seen in the Consuming messages recipe.

  5. Consume messages invoking channel.basicConsume()

The source code of the Python consumer is very similar to the Java consumer, so there is no need to repeat the steps. Just follow the steps of the Java consumer while referring to the source code in the archive of the recipes at:

Chapter01/Recipe06/Python_6/PyConsumer.py

In the Ruby consumer you need to use require "bunny" and then use the URI connection. Check out the source code at:

Chapter01/Recipe06/Ruby_6/RbConsumer.rb

We are now ready to put everything together, to see the recipe in action:

  1. Start one instance of the Java producer; messages start getting published immediately.

  2. Start one or more instances of the Java/Python/Ruby consumer; the consumers receive only the messages sent while they are running.

  3. Stop one of the consumers while the producer is running, and then restart it; we can see that the consumer has lost the messages sent while it was down.

How it works…

Both the producer and the consumers are connected to RabbitMQ with a single connection, but the logical path of the messages is depicted in the following figure:

In step 1 we have declared the exchange that we are using. The logic is the same as in the queue declaration: if the specified exchange doesn't exist, create it; otherwise, do nothing.

The second argument of exchangeDeclare() is a string, specifying the type of the exchange, fanout in this case.

In step 2 the producer sends one message to the exchange. You can view the exchange, along with the other defined exchanges, by issuing the following command on the RabbitMQ command shell:

rabbitmqctl list_exchanges

The second argument in the call to channel.basicPublish() is the routing key, which is always ignored when used with a fanout exchange. The third argument, set to null, is the optional message property (more on this in the Using message properties recipe). The fourth argument is just the message itself.

When we started one consumer, it created its own temporary queue (step 2 of the consumer). Using the channel.queueDeclare() empty overload, we are creating a nondurable, exclusive, autodelete queue with an autogenerated name.

Launching a couple of consumers and issuing rabbitmqctl list_queues, we can see two queues, one per consumer, with their odd names, along with the persistent myFirstQueue used in previous recipes as shown in the following screenshot:

In step 3 of the consumer we have bound the queues to myExchange. It is possible to monitor these bindings too, by issuing the following command:

rabbitmqctl list_bindings

Monitoring is a very important aspect of AMQP; messages are routed by exchanges to the bound queues, and buffered in the queues.

Note

Exchanges do not buffer messages; they are just logical elements.

The fanout exchange routes messages by just placing a copy of them in each bound queue. So, if there are no bound queues, the messages are not received by any consumer (see the Handling unroutable messages recipe for more details).

As soon as we close one consumer, we implicitly destroy its private temporary queue (that's why the queues are autodelete; otherwise, these queues would be left behind unused, and the number of queues on the broker would increase indefinitely), and messages are not buffered to it anymore.

When we restart the consumer, it will create a new, independent queue and as soon as we bind it to myExchange, messages sent by the publisher will be buffered into this queue and pulled by the consumer itself.
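The behavior described above can be reproduced with a small in-memory model. This is only a sketch under simplifying assumptions: FanoutModel and its methods are hypothetical names, queues are plain collections, and deleting a queue stands in for closing a consumer whose autodelete queue disappears with it.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// In-memory model of a fanout exchange; the exchange itself buffers nothing.
class FanoutModel {
    // queue name -> messages buffered in that queue; only bound queues exist here
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    // Models a consumer declaring its temporary queue and binding it.
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    // Models closing a consumer: its queue is deleted together with its contents.
    void deleteQueue(String queueName) {
        boundQueues.remove(queueName);
    }

    // Places a copy of the message in every bound queue; returns the number of copies.
    int publish(String message) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    int depth(String queueName) {
        Queue<String> queue = boundQueues.get(queueName);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        FanoutModel exchange = new FanoutModel();
        exchange.bind("q1");
        exchange.publish("news 1");   // only q1 gets it
        exchange.bind("q2");
        exchange.publish("news 2");   // both queues get it
        System.out.println(exchange.depth("q1") + " " + exchange.depth("q2"));
    }
}
```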

There's more…

When RabbitMQ is started for the first time, it creates some predefined exchanges. Issuing rabbitmqctl list_exchanges we can observe many existing exchanges, in addition to the one that we have defined in this recipe:

All the amq.* exchanges listed here are already defined by all the AMQP-compliant brokers and can be used instead of defining your own exchanges; they do not need to be declared at all.

We could have used amq.fanout in place of myLastnews.fanout_6, and this is a good choice for very simple applications. However, applications generally declare and use their own exchanges.

See also

With the overload used in the recipe, the exchange is non-autodelete (won't be deleted as soon as the last client detaches it) and non-durable (won't survive server restarts). You can find more available options and overloads at http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/.

 

Working with message routing using direct exchanges


In this recipe we are going to see how to select a subset of messages to be consumed, routing them only to the AMQP queues of interest and ignoring all the others.

A typical use case is implementing a chat, where each queue represents a user.

We can find the related example in the book examples directory at:

Chapter01/Recipe07/Java_7/src/rmqexample/direct

Getting ready

To use this recipe we need to set up the Java development environment as indicated in the Introduction section.

How to do it…

We are going to show how to implement both the producer and the consumer, and see them in action. To implement the producer, perform the following steps:

  1. Declare a direct exchange:

    channel.exchangeDeclare(exchangeName, "direct", false, false, null);
  2. Send some messages to the exchange, using arbitrary routingKey values:

    channel.basicPublish(exchangeName, routingKey, null, jsonBook.getBytes());

To implement the consumer, perform the following steps:

  1. Declare the same exchange, identical to what was done in step 1.

  2. Create a temporary queue:

    String myQueue = channel.queueDeclare().getQueue();
  3. Bind the queue to the exchange using the bindingKey. Perform this operation as many times as needed, in case you want to use more than one binding key:

    channel.queueBind(myQueue,exchangeName,bindingKey);
  4. After having created a suitable consumer object, start consuming messages as already seen in the Consuming messages recipe.

How it works…

In this recipe we have published messages (step 2) tagged with an arbitrary string (the so-called routing key), to a direct exchange.

As with fanout exchanges, messages are not stored if there are no queues bound; however, in this case the consumers can choose the messages to be forwarded to these queues, depending on the binding key specified when they are bound (step 3 of the consumer).

Only messages with a routing key equal to the one specified in the binding will be delivered to such queues.

Tip

This filtering operation is performed by the AMQP broker, and not by the consumer; the messages with a routing key that is different from the queue binding key won't be placed in that queue at all.

However, it's possible to have more queues bound with the same binding key; in this case, the broker will place a copy of the matching messages in all of them.

It is also possible to bind many different binding keys to the same queue/exchange pair, letting all the corresponding messages be delivered.
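The routing rules above can be sketched as an in-memory model. It is not broker code: DirectModel and its methods are hypothetical names, and the only thing it illustrates is exact matching between routing key and binding key, including several queues sharing a key and one queue bound with several keys.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// In-memory model of a direct exchange with exact key matching.
class DirectModel {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();
    // queue name -> messages placed in that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    void bind(String queue, String bindingKey) {
        queues.putIfAbsent(queue, new ArrayList<>());
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queue);
    }

    // Copies the message to every queue whose binding key equals the routing key.
    // Returns the number of queues reached; 0 means the message was dropped.
    int publish(String routingKey, String message) {
        Set<String> targets = bindings.getOrDefault(routingKey, Collections.emptySet());
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return targets.size();
    }

    List<String> contents(String queue) {
        return new ArrayList<>(queues.getOrDefault(queue, Collections.emptyList()));
    }

    public static void main(String[] args) {
        DirectModel exchange = new DirectModel();
        exchange.bind("aliceQueue", "alice");
        exchange.bind("auditQueue", "alice");
        exchange.bind("auditQueue", "bob");
        exchange.publish("alice", "hi alice");
        exchange.publish("bob", "hi bob");
        System.out.println(exchange.contents("auditQueue"));
    }
}
```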

There's more…

In case we deliver a message with a given routing key to an exchange, and there are no queues bound with that specific key, the message is silently dropped.

However, the producer can detect when this happens and react accordingly, as shown in detail in the Handling unroutable messages recipe.

 

Working with message routing using topic exchanges


Direct and topic exchanges are conceptually very similar to each other. The main difference is that direct exchanges use exact matching only to select the destination of the messages, while topic exchanges allow using pattern matching with specific wildcards.

For example, the BBC is using topic routing with RabbitMQ to route new stories to all of the appropriate RSS feeds on their websites.

You can find the example for a topic exchange at:

Chapter01/Recipe08/Java_8/src/rmqexample/topic

Getting ready

To use this recipe we need to set up the Java development environment as indicated in the Introduction section.

How to do it…

Let's start with the producer:

  1. Declare a topic exchange:

    channel.exchangeDeclare(exchangeName, "topic", false, false, null);
  2. Send some messages to the exchange, using arbitrary routingKey values:

    channel.basicPublish(exchangeName, routingKey, null, jsonBook.getBytes());

Then, the consumers:

  1. Declare the same exchange, identical to what was done in step 1.

  2. Create a temporary queue:

    String myQueue = channel.queueDeclare().getQueue();
  3. Bind the queue to the exchange using the binding key, which in this case can contain wildcards:

    channel.queueBind(myQueue,exchangeName,bindingKey);
  4. After having created a suitable consumer object, start consuming messages as already seen in the Consuming messages recipe.

How it works…

As in the previous recipe, messages sent to a topic exchange are tagged with a string (step 2), but for a topic exchange it is important that this string be composed of several dot-separated words; these are supposed to be the topics of the message. For example, in our code we have used:

technology.rabbitmq.ebook
sport.golf.paper
sport.tennis.ebook

To consume these messages the consumer has to bind myQueue to the exchange (step 3 of the consumer) using the appropriate key.

Tip

Using the messaging jargon, the consumer has to subscribe to the topics it's interested in.

Using the topic exchange, the subscription/binding key specified in step 3 of the consumer can be a sequence of dot-separated words and/or wildcards. AMQP wildcards are just:

  • #: This matches zero or more words

  • *: This matches exactly one word

So, for example:

  • #.ebook and *.*.ebook both match the first and the third sent messages

  • sport.# and sport.*.* both match the second and the third sent messages

  • # alone matches any message sent

In the last case the topic exchange behaves exactly like a fanout exchange, except for the performance, which is inevitably higher when using the latter.
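The wildcard rules can be checked with a few lines of code. The following matcher is a straightforward recursive sketch written for illustration; TopicMatcher is a hypothetical name and this is not how the broker implements topic routing internally.

```java
// Illustrative matcher for topic binding keys: '#' = zero or more words, '*' = exactly one word.
class TopicMatcher {
    static boolean matches(String bindingKey, String routingKey) {
        return match(words(bindingKey), 0, words(routingKey), 0);
    }

    // Splits a key into its dot-separated words; the empty key has no words.
    private static String[] words(String key) {
        return key.isEmpty() ? new String[0] : key.split("\\.", -1);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length; // pattern exhausted: succeed only if no word is left
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // pattern still expects a word, but none is left
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] sent = {"technology.rabbitmq.ebook", "sport.golf.paper", "sport.tennis.ebook"};
        for (String routingKey : sent) {
            System.out.println(routingKey + " matches #.ebook: " + matches("#.ebook", routingKey));
        }
    }
}
```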

There's more…

Again, if some messages cannot be delivered to any queue, they are silently dropped.

The producer can detect when this happens and react accordingly, as shown in detail in the Handling unroutable messages recipe.

 

Guaranteeing message processing


In this example we will show how to use the explicit acknowledgment, the so-called ack, while consuming messages.

A message is stored in a queue until one consumer gets the message and sends the ack back to the broker.

The ack can be either implicit or explicit. In the previous examples we have used the implicit ack.

In order to view this example in action, you can run the publisher from the Producing messages recipe together with the consumer of this recipe, which you can find in the book archive at Chapter01/Recipe09/Java_9/.

Getting ready

To use this recipe we need to set up the Java development environment as indicated in the Introduction section.

How to do it…

In order to guarantee that the messages have been acknowledged by the consumer after processing them, you can perform the following steps:

  1. Declare a queue:

    channel.queueDeclare(myQueue, true, false, false,null);
  2. Bind the consumer to the queue, specifying false for the autoAck parameter of basicConsume():

    ActualConsumer consumer = new ActualConsumer(channel);
    boolean autoAck = false; // n.b.
    channel.basicConsume(myQueue, autoAck, consumer);
  3. Consume a message and send the ack:

    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws java.io.IOException {
    
      String message = new String(body);
      // Acknowledge only this message (multiple = false)
      this.getChannel().basicAck(envelope.getDeliveryTag(), false);
    }

How it works…

After we created the queue (step 1), we added the consumer to the queue and defined the ack behavior (step 2).

The parameter autoAck = false informs the RabbitMQ client API that we are going to send the explicit ack ourselves.

After we have got a message from the queue, we must acknowledge to RabbitMQ that we have received and properly processed the message by calling channel.basicAck() (step 3). The message will be removed from the queue only when RabbitMQ receives the ack.

Tip

If you don't send the ack back, the consumer continues to fetch subsequent messages; however, when you disconnect the consumer, all the messages will still be in the queue. Messages are not consumed until RabbitMQ receives the corresponding ack. Try commenting out the basicAck() call in the example to observe this behavior.
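The life cycle of a message under explicit acknowledgment can be sketched with an in-memory model. AckQueueModel is a hypothetical name and the model makes simplifying assumptions (a single consumer, delivery tags starting at 1); it only illustrates that a delivered message stays in the queue until it is acknowledged and becomes ready again if the consumer goes away.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of a queue consumed with explicit acks by a single consumer.
class AckQueueModel {
    private final Deque<String> ready = new ArrayDeque<>();          // waiting for delivery
    private final Map<Long, String> unacked = new LinkedHashMap<>(); // delivered, not yet acked
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Delivers the next ready message; returns its delivery tag, or -1 if none is ready.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    // Only an ack really removes the message from the queue.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // The consumer went away: everything it had not acknowledged becomes ready again,
    // in the original order and ahead of the messages that were never delivered.
    void consumerDisconnected() {
        List<String> pending = new ArrayList<>(unacked.values());
        unacked.clear();
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i));
        }
    }

    int readyCount() {
        return ready.size();
    }

    int messagesInQueue() {
        return ready.size() + unacked.size();
    }

    public static void main(String[] args) {
        AckQueueModel queue = new AckQueueModel();
        queue.publish("a");
        queue.publish("b");
        queue.deliver();              // "a" delivered but never acked
        queue.consumerDisconnected(); // "a" is ready again
        System.out.println(queue.messagesInQueue());
    }
}
```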

The method channel.basicAck() has two parameters:

  • deliveryTag

  • multiple

The deliveryTag parameter is a value assigned by the server to the message, which you can retrieve using envelope.getDeliveryTag(), as done in step 3.

If multiple is set to false the client acknowledges only the message identified by the deliveryTag parameter; otherwise the client acknowledges all the messages up to and including this one. This flag allows us to optimize consuming messages by sending the ack to RabbitMQ for a block of messages instead of for each one.

Tip

A message must be acknowledged only once; if you try to acknowledge the same message more than once, the method raises a precondition-failed exception.

Calling channel.basicAck(0, true) acknowledges all the unacknowledged messages; the 0 stands for "all the messages".

Furthermore, calling channel.basicAck(0,false) raises an exception.
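The semantics of the two parameters can be summarized with a small in-memory model of the set of unacknowledged delivery tags. MultipleAckModel is a hypothetical name; an IllegalStateException stands in for the precondition-failed error, and the handling of unknown tags with multiple set to true is simplified.

```java
import java.util.TreeSet;

// In-memory model of the delivery tags a channel has not acknowledged yet.
class MultipleAckModel {
    private final TreeSet<Long> unacked = new TreeSet<>();

    // Records that a message with this tag was delivered to the consumer.
    void delivered(long tag) {
        unacked.add(tag);
    }

    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            if (deliveryTag == 0) {
                unacked.clear();                          // 0 stands for "all the messages"
            } else {
                unacked.headSet(deliveryTag, true).clear(); // everything up to and including the tag
            }
            return;
        }
        // Single ack: the tag must be outstanding (this also rejects tag 0 and double acks).
        if (!unacked.remove(deliveryTag)) {
            throw new IllegalStateException("precondition failed: unknown delivery tag " + deliveryTag);
        }
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        MultipleAckModel channel = new MultipleAckModel();
        for (long tag = 1; tag <= 5; tag++) {
            channel.delivered(tag);
        }
        channel.ack(3, true); // acknowledges tags 1, 2 and 3 in one call
        System.out.println(channel.unackedCount());
    }
}
```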

There's more…

In the next chapter we will discuss the basicReject() method, which allows further flexibility by letting the consumer explicitly reject a message.

See also

The Distributing messages to many consumers recipe is a practical example that better explains the use of explicit acks.

 

Distributing messages to many consumers


In this example we will show how to create a dynamic load balancer, and how to distribute messages to many consumers. We are going to create a file downloader.

You can find the source at Chapter01/Recipe10/Java_10/.

Getting ready

To use this recipe we need to set up the Java development environment as indicated in the Introduction section.

How to do it…

In order to let two or more RabbitMQ clients properly balance consuming messages, you need to follow the given steps:

  1. Declare a named queue, and specify the basicQos as follows:

    channel.queueDeclare(myQueue, false, false, false,null);
    channel.basicQos(1);
  2. Bind a consumer with explicit ack:

    channel.basicConsume(myQueue, false, consumer); 
  3. Send one or more messages using channel.basicPublish().

  4. Execute two or more consumers.

How it works...

The publisher sends a message with the URL to download:

String messageUrlToDownload = "http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v3.0.2/rabbitmq-dotnet-client-3.0.2-user-guide.pdf";
channel.basicPublish("", myQueue, null, messageUrlToDownload.getBytes());

The consumer gets the message and downloads the referenced URL:

System.out.println("Url to download:" + messageURL);
downloadUrl(messageURL);

Once the download is completed, the consumer sends the ack back to the broker and is ready to download the next one:

getChannel().basicAck(envelope.getDeliveryTag(),false);
System.out.println("Ack sent!");
System.out.println("Wait for the next download...");

By default, messages are heavily prefetched. Messages are retrieved by the consumers in blocks, but are actually consumed and removed from the queue when the consumers send the ack, as already seen in the previous recipe.

On the other hand, using many consumers as in this recipe, the first one will prefetch the messages, and the other consumers started later won't find any available in the queue. In order to equally distribute the work among the active consumers, we need to use channel.basicQos(1), specifying to prefetch just one message at a time.
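The effect of the prefetch limit can be illustrated with an in-memory dispatcher. This is a sketch under simplifying assumptions, not the broker's scheduler: PrefetchModel and ConsumerState are hypothetical names, consumers are served in round-robin order, and a prefetch value of 0 means no limit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory model of a queue dispatching to consumers under a prefetch limit.
class PrefetchModel {
    static final class ConsumerState {
        final String name;
        final List<String> received = new ArrayList<>();
        int unacked = 0;

        ConsumerState(String name) {
            this.name = name;
        }
    }

    private final Deque<String> ready = new ArrayDeque<>();
    private final List<ConsumerState> consumers = new ArrayList<>();
    private final int prefetch; // 0 means "no limit"
    private int next = 0;       // round-robin cursor

    PrefetchModel(int prefetch) {
        this.prefetch = prefetch;
    }

    void publish(String message) {
        ready.addLast(message);
        dispatch();
    }

    ConsumerState subscribe(String name) {
        ConsumerState consumer = new ConsumerState(name);
        consumers.add(consumer);
        dispatch();
        return consumer;
    }

    // An ack frees one slot for the consumer, so another message can be sent to it.
    void ack(ConsumerState consumer) {
        if (consumer.unacked > 0) {
            consumer.unacked--;
            dispatch();
        }
    }

    // Hands out ready messages to consumers that are below their prefetch limit.
    private void dispatch() {
        while (!ready.isEmpty()) {
            ConsumerState target = null;
            for (int i = 0; i < consumers.size(); i++) {
                ConsumerState candidate = consumers.get((next + i) % consumers.size());
                if (prefetch == 0 || candidate.unacked < prefetch) {
                    target = candidate;
                    next = (next + i + 1) % consumers.size();
                    break;
                }
            }
            if (target == null) {
                return; // no consumer, or every consumer is at its prefetch limit
            }
            target.received.add(ready.pollFirst());
            target.unacked++;
        }
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        PrefetchModel queue = new PrefetchModel(1);
        for (int i = 1; i <= 4; i++) {
            queue.publish("url-" + i);
        }
        ConsumerState first = queue.subscribe("first");
        ConsumerState second = queue.subscribe("second");
        System.out.println(first.received + " " + second.received + " ready=" + queue.readyCount());
    }
}
```

With the limit set to 1, a slow download holds back only the consumer performing it, while the remaining messages stay available for whichever consumer acknowledges first.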

See also…

You can find more information about load balancing in Chapter 8, Performance Tuning for RabbitMQ.

 

Using message properties


In this example we will show how an AMQP message is structured, and how to use message properties.

You can find the source at Chapter01/Recipe11/Java_11/.

Getting ready

To use this recipe you will need to set up the Java development environment as indicated in the Introduction section.

How to do it…

In order to access the message properties you need to perform the following steps:

  1. Declare a queue:

    channel.queueDeclare(myQueue, false, false, false, null);
  2. Create a BasicProperties class:

    Map<String, Object> headerMap = new HashMap<String, Object>();
    headerMap.put("key1", "value1");
    headerMap.put("key2", Integer.valueOf(50));
    headerMap.put("key3", Boolean.FALSE);
    headerMap.put("key4", "value4");
    
    BasicProperties messageProperties = new BasicProperties.Builder()
    .timestamp(new Date())
    .contentType("text/plain")
    .userId("guest")
    .appId("app id: 20")
    .deliveryMode(1)
    .priority(1)
    .headers(headerMap)
    .clusterId("cluster id: 1")
    .build();
  3. Publish a message with basic properties:

    channel.basicPublish("", myQueue, messageProperties, message.getBytes());
  4. Consume a message and print the properties:

    System.out.println("Property:" + properties.toString());

How it works…

The AMQP message (also called content) is divided into two parts:

  • Content header

  • Content body (as we have already seen in previous examples)

In step 2 we create a content header using BasicProperties:

Map<String, Object> headerMap = new HashMap<String, Object>();
BasicProperties messageProperties = new BasicProperties.Builder()
.timestamp(new Date())
.userId("guest")  
.deliveryMode(1)
.priority(1)
.headers(headerMap)
.build();

With this object we have set up the following properties:

  • timestamp: This is the message time stamp.

  • userId: This is the broker user who sends the message (by default, it is "guest"). In the next chapter we'll look at user management.

  • deliveryMode: If set to 1 the message is nonpersistent, if it is 2 the message is persistent (you can see the recipe Connecting to the broker).

  • priority: This defines the message priority, which can be 0 to 9.

  • headers: This is a HashMap<String, Object> that you are free to use for your own custom fields.

Tip

The RabbitMQ BasicProperties class is an AMQP content header implementation. The attributes of BasicProperties can be set using BasicProperties.Builder().

The header is ready and we can send a message using channel.basicPublish("",myQueue, messageProperties,message.getBytes()), where messageProperties is the message header and message is the message body.

In step 4 the consumer gets a message:

public void handleDelivery(String consumerTag, Envelope envelope,
    BasicProperties properties, byte[] body) throws java.io.IOException {
  System.out.println("***********message header****************");
  System.out.println("Message sent at:" + properties.getTimestamp());
  System.out.println("Message sent by user:" + properties.getUserId());
  System.out.println("Message sent by App:" + properties.getAppId());
  System.out.println("all properties :" + properties.toString());
  System.out.println("**********message body**************");
  String message = new String(body);
  System.out.println("Message Body:" + message);
}

The parameter properties contains the message header and body contains its body.

There's more…

Using message properties we can optimize the performance. Writing audit or log information into the body is a typical error, because the consumer would have to parse the body to get it.

The message body must only contain application data (for example, a Book class), while the message properties can host other information related to the messaging mechanics or other implementation details.

For example, if the consumer wants to log when a message was sent, it can use the timestamp attribute; if the consumer needs to distinguish messages according to a custom tag, the producer can put it in the headers HashMap property.
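The separation between header and body can be sketched with a plain in-memory class. MessageModel is a hypothetical stand-in for a message and "tag" is an invented custom header name; the point is only that the audit line is built from the header, while the body is treated as opaque bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for an AMQP message: a content header plus an opaque body.
class MessageModel {
    final Date timestamp;               // messaging metadata, like the timestamp property
    final Map<String, Object> headers;  // custom fields, like the headers property
    final byte[] body;                  // application data only

    MessageModel(Date timestamp, Map<String, Object> headers, byte[] body) {
        this.timestamp = timestamp;
        this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        this.body = body.clone();
    }

    // Builds an audit line from the header alone; the body is never parsed.
    static String auditLine(MessageModel message) {
        Object tag = message.headers.get("tag"); // "tag" is a hypothetical custom header
        return "sentAt=" + message.timestamp.getTime()
                + " tag=" + (tag == null ? "none" : tag.toString())
                + " bodyBytes=" + message.body.length;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("tag", "invoice");
        byte[] body = "application data".getBytes(StandardCharsets.UTF_8);
        System.out.println(auditLine(new MessageModel(new Date(), headers, body)));
    }
}
```

The header value is read with toString() rather than a cast to String, a defensive choice in case a client library hands header values back as a different type.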

See also

The MessageProperties class contains some prebuilt BasicProperties instances for standard cases. Please check the official link at http://www.rabbitmq.com/releases//rabbitmq-java-client/current-javadoc/com/rabbitmq/client/MessageProperties.html.

In this example we have just used some of the properties. You can get more information at http://www.rabbitmq.com/releases//rabbitmq-java-client/current-javadoc/com/rabbitmq/client/AMQP.BasicProperties.html.

 

Messaging with transactions


In this example we will discuss how to use channel transactions. In the Producing messages recipe we have seen how to use a persistent message, but if the broker can't write the message to the disk, you can lose the message. With AMQP transactions you can be sure that the message won't be lost.

You can find the source at Chapter01/Recipe12/Java_12/.

Getting ready

To use this recipe you will need to set up the Java development environment as indicated in the Introduction section.

How to do it…

You can use transactional messages by performing the following steps:

  1. Create a persistent queue:

    channel.queueDeclare(myQueue, true, false, false, null);
  2. Set the channel to the transactional mode using:

    channel.txSelect();
  3. Send the message to the queue and then commit the operation:

    channel.basicPublish("", myQueue, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); 
    channel.txCommit();

How it works…

After creating a persistent queue (step 1), we have set the channel in the transaction mode using the method txSelect() (step 2). Using txCommit() the message is stored in the queue and written to the disk; the message will then be delivered to the consumer(s).

The method txSelect() must be called at least once before txCommit() or txRollback().

As in a DBMS you can use a rollback method. In the following case the message isn't stored or delivered:

channel.basicPublish("",myQueue, MessageProperties.PERSISTENT_TEXT_PLAIN ,message.getBytes());
channel.txRollback(); 
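The commit and rollback behavior can be sketched with an in-memory model of a transactional channel. TxChannelModel is a hypothetical name and the model ignores persistence and acknowledgments; it only shows that messages published in transaction mode reach the queue on commit and are discarded on rollback.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of a channel in transaction mode; no broker or disk is involved.
class TxChannelModel {
    private final List<String> queue = new ArrayList<>();   // what the queue has stored
    private final List<String> pending = new ArrayList<>(); // published but not yet committed
    private boolean transactional = false;

    void txSelect() {
        transactional = true;
    }

    // In transaction mode the message is held back until the commit.
    void basicPublish(String message) {
        if (transactional) {
            pending.add(message);
        } else {
            queue.add(message);
        }
    }

    void txCommit() {
        requireTransactionalMode();
        queue.addAll(pending);
        pending.clear();
    }

    void txRollback() {
        requireTransactionalMode();
        pending.clear();
    }

    private void requireTransactionalMode() {
        if (!transactional) {
            throw new IllegalStateException("channel is not transactional; call txSelect() first");
        }
    }

    List<String> stored() {
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        TxChannelModel channel = new TxChannelModel();
        channel.txSelect();
        channel.basicPublish("kept");
        channel.txCommit();
        channel.basicPublish("discarded");
        channel.txRollback();
        System.out.println(channel.stored());
    }
}
```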

There's more…

The transactions can reduce the application's performance, because the broker doesn't cache the messages and the tx operations are synchronous.

See also

In the next chapter we will discuss publisher confirms, a RabbitMQ extension that is a faster way to get the confirmation for the operations.

 

Handling unroutable messages


In this example we will show how to manage unroutable messages. An unroutable message is a message without a destination; for example, a message sent to an exchange without any bound queue.

Unroutable messages are not the same as dead letter messages; the former are messages sent to an exchange without any suitable queue destination. The latter, on the other hand, reach a queue but are rejected because of an explicit consumer decision, expired TTL, or exceeded queue length limit.

You can find the source at Chapter01/Recipe13/Java_13/.

Getting ready

To use this recipe you will need to set up the Java development environment as indicated in the Introduction section.

How to do it…

In order to handle unroutable messages, you need to perform the following steps:

  1. First of all we need to write a class that implements the ReturnListener interface:

    public class HandlingReturnListener implements ReturnListener
    @Override
      public void handleReturn…
  2. Register an instance of the HandlingReturnListener class as a ReturnListener on the channel:

    channel.addReturnListener(new HandlingReturnListener());
  3. Then create an exchange:

    channel.exchangeDeclare(myExchange, "direct", false, false, null);
  4. And finally publish a mandatory message to the exchange:

    boolean isMandatory = true; 
    channel.basicPublish(myExchange, "", isMandatory, null, message.getBytes());

How it works…

When we execute the publisher, the messages sent to myExchange won't reach any destination since it has no bound queues. However, these messages aren't lost; they are redirected to an internal queue. The HandlingReturnListener class will handle such messages using handleReturn().

The ReturnListener class is bound to a publisher channel, and it will trap only its own unroutable messages.

You can also find a consumer in the source code example. Try also to execute the publisher and the consumer together, and then stop the consumer.

There's more…

If you don't set the channel ReturnListener, the unroutable messages are silently dropped by the broker. In case you want to be notified about the unroutable messages, it's important to set the mandatory flag to true; if false, the unroutable messages are dropped as well.
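The interplay between the mandatory flag and the return listener can be sketched with an in-memory model. ReturnModel is a hypothetical name, and a java.util.function.Consumer receiving only the message body stands in for the real ReturnListener, whose handleReturn() callback carries more information.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// In-memory model of publishing to an exchange with or without the mandatory flag.
class ReturnModel {
    private final Set<String> boundKeys = new HashSet<>();
    private final List<Consumer<String>> returnListeners = new ArrayList<>();
    private int routed = 0;
    private int dropped = 0;

    // Models a queue bound to the exchange with the given key.
    void bind(String bindingKey) {
        boundKeys.add(bindingKey);
    }

    void addReturnListener(Consumer<String> listener) {
        returnListeners.add(listener);
    }

    void publish(String routingKey, boolean mandatory, String message) {
        if (boundKeys.contains(routingKey)) {
            routed++;
            return;
        }
        // Unroutable: returned only if mandatory is set AND someone is listening.
        if (mandatory && !returnListeners.isEmpty()) {
            for (Consumer<String> listener : returnListeners) {
                listener.accept(message);
            }
        } else {
            dropped++;
        }
    }

    int routedCount() {
        return routed;
    }

    int droppedCount() {
        return dropped;
    }

    public static void main(String[] args) {
        ReturnModel channel = new ReturnModel();
        channel.addReturnListener(message -> System.out.println("Returned: " + message));
        channel.publish("", true, "nobody is bound");
    }
}
```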

About the Authors

  • Sigismondo Boschi

    Sigismondo Boschi is a software developer currently involved in projects of messaging and networking distributed applications.

    Prior to this, he has had more than 10 years' experience working with distributed applications and message-passing paradigms. He first acquired a PhD in Computational Physical Chemistry from the University of Bologna, and then has worked in the Development of Scientific High Performance Computing Projects.

  • Gabriele Santomaggio

    Gabriele Santomaggio has worked in the IT industry for more than 15 years. He is a developer, and very keen on middleware and distributed applications. Currently, he is working on high-performance Java applications.

    He is a member of a large Italian IT community (www.indigenidigitali.com/) where he has published some posts about Amazon Web Services and message-oriented middleware (http://blog.indigenidigitali.com/tag/gabriele-santomaggio/).

    He likes running and listening to jazz music.

