Working with AMQP


Broadcasting messages

This example shows how to send the same message to a possibly large number of consumers.

This is a typical messaging scenario: broadcasting to a huge number of clients, for example when updating the scoreboard in a massively multiplayer game or when publishing news in a social network application.

This article discusses both the producer and the consumer implementations.

Since consumers are very often written with different technologies and programming languages, we use Java, Python, and Ruby to show how AMQP lets them interoperate.

Along the way we will see the benefits of keeping exchanges and queues separate in AMQP.

Getting ready

To follow this article you will need working Java, Python, and Ruby environments.

How to do it…

For this article we prepare four different programs:

  • The Java publisher
  • The Java consumer
  • The Python consumer
  • The Ruby consumer

To prepare a Java publisher:

  1. Declare a fanout exchange:

    channel.exchangeDeclare(myExchange, "fanout");

  2. Send one message to the exchange:

    channel.basicPublish(myExchange, "", null, jsonmessage.getBytes());
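The two publisher steps can be put into a complete program roughly as follows. This is only a sketch under stated assumptions: the broker is taken to run on localhost with default credentials, the exchange name "myExchange" and the JSON payload are placeholders (the article only refers to them through the variables myExchange and jsonmessage), and the try-with-resources form assumes a recent RabbitMQ Java client in which Connection and Channel are AutoCloseable.

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class FanoutPublisher {
    // Assumed exchange name; the article only uses the variable myExchange.
    private static final String MY_EXCHANGE = "myExchange";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumption: broker on the local machine

        // Assumes a client version where Connection and Channel are AutoCloseable.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // Step 1: declare the fanout exchange; no effect if it already
            // exists with the same settings.
            channel.exchangeDeclare(MY_EXCHANGE, "fanout");

            // Step 2: publish one message. The routing key is empty because
            // a fanout exchange ignores it; null means no message properties.
            String jsonmessage = "{\"news\":\"hello\"}"; // placeholder payload
            channel.basicPublish(MY_EXCHANGE, "", null,
                    jsonmessage.getBytes(StandardCharsets.UTF_8));
        }
    }
}
```

The sketch sends a single message and exits; a publisher that keeps broadcasting would wrap the basicPublish() call in a loop.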

Then to prepare a Java consumer:

  1. Declare the same fanout exchange declared by the producer:

    channel.exchangeDeclare(myExchange, "fanout");

  2. Autocreate a new temporary queue:

    String queueName = channel.queueDeclare().getQueue();

  3. Bind the queue to the exchange:

    channel.queueBind(queueName, myExchange, "");

  4. Define a custom, non-blocking consumer.
  5. Consume messages by invoking channel.basicConsume().
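The five consumer steps can be sketched as one program. The article does not show how the custom consumer is defined, so the DefaultConsumer subclass below is an assumption about one reasonable way to do it, not the author's code; the host, the exchange name "myExchange", and the automatic acknowledgement mode are likewise assumptions.

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class FanoutConsumer {
    // Assumed exchange name; must match the one declared by the publisher.
    private static final String MY_EXCHANGE = "myExchange";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumption: broker on the local machine
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Step 1: declare the same fanout exchange as the producer.
        channel.exchangeDeclare(MY_EXCHANGE, "fanout");

        // Step 2: let the broker create a temporary queue and name it.
        String queueName = channel.queueDeclare().getQueue();

        // Step 3: bind the queue to the exchange (routing key unused).
        channel.queueBind(queueName, MY_EXCHANGE, "");

        // Step 4: a callback-based consumer; handleDelivery() is invoked by
        // the client library for each message, so main() is not blocked.
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("Received: "
                        + new String(body, StandardCharsets.UTF_8));
            }
        };

        // Step 5: start consuming; true enables automatic acknowledgement.
        channel.basicConsume(queueName, true, consumer);

        System.out.println("Waiting for messages; press Enter to exit.");
        System.in.read();
        channel.close();
        connection.close();
    }
}
```

Closing the connection at the end is what removes the temporary queue on the broker.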

The source code of the Python consumer is very similar to that of the Java consumer, so there is no need to repeat the steps.

In the Ruby consumer you need require "bunny" and then a connection opened from a URI.

We are now ready to put everything together and see the example in action:

  1. Start one instance of the Java producer; messages start getting published immediately.
  2. Start one or more instances of the Java/Python/Ruby consumer; the consumers receive only the messages sent while they are running.
  3. Stop one of the consumers and start it again; see that the consumer has lost the messages sent while it was down.

How it works…

Both the producer and the consumers are connected to RabbitMQ with a single connection, but the logical path of the messages is depicted in the following figure:

In step 1 we have declared the exchange that we are using. The logic is the same as in the queue declaration: if the specified exchange doesn't exist, create it; otherwise, do nothing.

The second argument of exchangeDeclare() is a string, specifying the type of the exchange, fanout in this case.

In step 2 the producer sends one message to the exchange. You can view this exchange, along with the other defined exchanges, by issuing the following command on the RabbitMQ command shell:

rabbitmqctl list_exchanges

The second argument in the call to channel.basicPublish() is the routing key, which is always ignored when used with a fanout exchange. The third argument, set to null, is the optional message property. The fourth argument is just the message itself.

When we started one consumer, it created its own temporary queue (step 2 of the consumer). By using the empty overload of channel.queueDeclare(), we create a nondurable, exclusive, autodelete queue with an autogenerated name.

Launching a couple of consumers and issuing rabbitmqctl list_queues, we can see two queues, one per consumer, with their odd names, along with the persistent myFirstQueue as shown in the following screenshot:

In step 3 of the consumer we bound the queues to myExchange. These bindings can be monitored too, by issuing the following command:

rabbitmqctl list_bindings

Monitoring is a very important aspect of working with AMQP: messages are routed by exchanges to the bound queues, and buffered in the queues.

Exchanges do not buffer messages; they are just logical elements.

The fanout exchange routes messages by simply placing a copy of each one in every bound queue. So, if no queues are bound, the messages are not buffered anywhere and no consumer receives them.
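The routing rule just described can be illustrated with a small in-memory model. This is not RabbitMQ code: FanoutModel and FanoutExchange are hypothetical names, and the sketch only mimics the behaviour stated in the text (one copy per bound queue, routing key ignored, nothing kept by the exchange itself).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class FanoutModel {
    /** A toy exchange: it holds bindings, never messages. */
    public static class FanoutExchange {
        private final List<Queue<String>> boundQueues = new ArrayList<>();

        public void bind(Queue<String> queue) {
            boundQueues.add(queue);
        }

        /** Copies the message into every bound queue; returns the copy count. */
        public int publish(String routingKey, String message) {
            // routingKey is deliberately unused, as with a fanout exchange.
            for (Queue<String> queue : boundQueues) {
                queue.add(message);
            }
            return boundQueues.size();
        }
    }

    public static void main(String[] args) {
        FanoutExchange exchange = new FanoutExchange();

        // No queue is bound yet: the message is copied nowhere.
        System.out.println(exchange.publish("", "lost")); // prints 0

        Queue<String> first = new ArrayDeque<>();
        Queue<String> second = new ArrayDeque<>();
        exchange.bind(first);
        exchange.bind(second);

        // Both queues get their own copy, whatever the routing key is.
        System.out.println(exchange.publish("ignored.key", "news")); // prints 2
        System.out.println(first + " " + second); // prints [news] [news]
    }
}
```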

As soon as we close one consumer, we implicitly destroy its private temporary queue (that's why the queues are autodelete; otherwise, these queues would be left behind unused, and the number of queues on the broker would increase indefinitely), and messages are not buffered to it anymore.

When we restart the consumer, it creates a new, independent queue, and as soon as we bind it to myExchange, messages sent by the publisher are buffered into this queue and delivered to the consumer.
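Why a restarted consumer misses messages can be seen in a toy model of the queue life cycle. TemporaryQueueModel and its methods are hypothetical names used for illustration only, and the generated "tmp-N" queue names are an assumption that stands in for the broker's autogenerated names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TemporaryQueueModel {
    // Queue name -> buffered messages; stands in for the queues currently
    // bound to a single fanout exchange.
    private final Map<String, List<String>> boundQueues = new HashMap<>();
    private int nextId = 0;

    /** Consumer starts: a fresh queue with a generated name is created and bound. */
    public String connect() {
        String name = "tmp-" + (nextId++);
        boundQueues.put(name, new ArrayList<>());
        return name;
    }

    /** Consumer stops: its autodelete queue disappears with its content. */
    public void disconnect(String queueName) {
        boundQueues.remove(queueName);
    }

    /** Publishing copies the message into every queue that exists right now. */
    public void publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    public List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        TemporaryQueueModel broker = new TemporaryQueueModel();

        String first = broker.connect();
        broker.publish("m1");
        broker.disconnect(first);   // the consumer goes down

        broker.publish("m2");       // nothing is bound: m2 is buffered nowhere

        String second = broker.connect(); // restart: a new, independent queue
        broker.publish("m3");

        System.out.println(broker.messagesIn(second)); // prints [m3]
    }
}
```

The restarted consumer sees only m3: m2 was published while no queue existed for it, so there was nowhere to buffer it.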

There's more…

When RabbitMQ is started for the first time, it creates some predefined exchanges. Issuing rabbitmqctl list_exchanges we can observe many existing exchanges, in addition to the one that we have defined in this article:

All the amq.* exchanges listed here are already defined by all the AMQP-compliant brokers and can be used instead of defining your own exchanges; they do not need to be declared at all.

We could have used amq.fanout in place of myLastnews.fanout_6, and this is a good choice for very simple applications. However, applications generally declare and use their own exchanges.

See also

With the overload used in the article, the exchange is non-autodelete (won't be deleted as soon as the last client detaches it) and non-durable (won't survive server restarts). You can find more available options and overloads at


In this article, we are mainly using Java since this language is widely used in enterprise software development, integration, and distribution. RabbitMQ is a perfect fit in this environment.

You've been reading an excerpt of:

RabbitMQ Cookbook
