RabbitMQ Essentials - Second Edition

By Lovisa Johansson, David Dossot

About this book

RabbitMQ is an open source message queuing software that acts as a message broker using the Advanced Message Queuing Protocol (AMQP). This book will help you to get to grips with RabbitMQ to build your own applications with a message queue architecture. You’ll learn from the experts from CloudAMQP as they share what they've learned while managing the largest fleet of RabbitMQ clusters in the world.

Following the case study of Complete Car, you’ll discover how you can use RabbitMQ to provide exceptional customer service and user experience, and see how a message queue architecture makes it easy to upgrade the app and add features as the company grows. From implementing simple synchronous operations through to advanced message routing and tracking, you’ll explore how RabbitMQ streamlines scalable operations for fast distribution. This book will help you understand the advantages of message queue architecture, including application scalability, resource efficiency, and user reliability. Finally, you’ll learn best practices for working with RabbitMQ and be able to use this book as a reference guide for your future app development projects.

By the end of this book, you’ll have learned how to use message queuing software to streamline the development of your distributed and scalable applications.

Publication date:
August 2020
Publisher
Packt
Pages
154
ISBN
9781789131666

 
Creating a Taxi Application

In everyday conversation, people greet each other, exchange banter, and then eventually end the conversation and continue on their way. Low-level TCP connections function in the same way over lightweight channels in RabbitMQ. Applications that are going to exchange messages over RabbitMQ need to establish a permanent connection to the message broker. When this connection is established, a channel needs to be created so that message-oriented interactions, such as publishing and consuming messages, can be performed.

After demonstrating these fundamentals, this chapter will cover how a broker uses exchanges to determine where each message should be delivered. An exchange is like a mailman: it delivers messages to their proper queues (mailboxes) for consumers to find at a later time.

The basic RabbitMQ concepts are shown in the following diagram:

Fig 2.1: Basic RabbitMQ concepts

By the end of this chapter, you will have a solid understanding of the application architecture behind the Complete Car (CC) platform and how they sent the first message through RabbitMQ. This requires an introduction to two different types of exchanges: direct exchange, which delivers messages to a single queue, and topic exchange, which delivers messages to multiple queues based on pattern-matching routing keys.

To get the best start possible, the following topics are covered:

  • The application architecture behind CC
  • Establishing a connection to RabbitMQ
  • Sending the first messages
  • Adding topic messages

Let's get started!

 

Technical requirements

 

The application architecture behind CC

CC needs one application that is used by the taxi drivers and one that is used by the customer. The customer should be able to request a taxi via the application, and the taxi driver should be able to accept a request (the ride):

Fig 2.2: The customer requests a taxi via the CC application

The customer should be able to enter information about the starting point and the endpoint of the trip. Active drivers receive the requests and are able to accept them. The customer should, in the end, be able to follow the location of the taxi during the trip.

The following diagram shows the messaging architecture that CC wants to achieve:

Fig 2.3: CC's main application architecture

This flow can be explained in 10 steps, as highlighted in the preceding diagram:

  1. A customer uses CC's mobile application to book a taxi. A request is now sent from the mobile application to the Application Service. This request includes information about the trip that the customer wants to book.
  2. The Application Service stores the request in a database.
  3. The Application Service adds a message with information about the trip to a queue in RabbitMQ.
  4. Connected taxi cars subscribe to the message (the booking request).
  5. A taxi responds to the customer by sending a message back to RabbitMQ.
  6. The Application Service subscribes to the messages.
  7. Again, the Application Service stores the information in a database.
  8. The Application Service forwards the information to the customer.
  9. The taxi app starts to automatically send the taxi's geographical location at a given interval to RabbitMQ.
  10. The location of the taxi is then passed straight to the customer's mobile application, via WebSockets, so that they know when the taxi arrives.

Let's begin by taking a closer look at steps 1, 2, 3, and 4, as shown in the preceding diagram, where a customer requests a taxi (a message is published to RabbitMQ) and a taxi driver receives the request (a message is consumed from RabbitMQ).

 

Establishing a solid connection to RabbitMQ

As mentioned in Chapter 1, A Rabbit Springs to Life, a physical network connection must be established between the application servers and RabbitMQ. An Advanced Message Queuing Protocol (AMQP) connection is a link between the client and the broker that performs underlying networking tasks, including initial authentication, IP resolution, and networking:

Fig 2.4: AMQP connection between the application and RabbitMQ

Each AMQP connection maintains a set of underlying channels. A channel reuses a connection, forgoing the need to reauthorize and open a new TCP stream, making it more resource-efficient.

The following diagram illustrates a channel within a connection between an application and RabbitMQ:

Fig 2.5: Channels allow you to use resources more efficiently

Unlike creating channels, creating connections is a costly operation, very much like it is with database connections. Typically, database connections are pooled, where each instance of the pool is used by a single execution thread. AMQP is different in the sense that a single connection can be used by many threads through many multiplexed channels.

The handshake process for an AMQP connection requires at least seven TCP packets, and even more when using TLS. Channels can be opened and closed more frequently if needed:

  • AMQP connection: 7 TCP packets
  • AMQP channel: 2 TCP packets
  • AMQP publish: 1 TCP packet (more for larger messages)
  • AMQP close channel: 2 TCP packets
  • AMQP close connection: 2 TCP packets
  • Total: 14-19 packets (plus ACKs)
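To make the cost difference concrete, the packet counts listed above can be added up for two strategies: opening a fresh connection and channel for every message, versus reusing one long-lived connection and channel. This is only a back-of-the-envelope sketch; it uses the minimum figures from the list and ignores TLS, ACKs, and large messages, and the method names are made up for illustration.

```ruby
# Minimum TCP packet counts taken from the list above (no TLS, no ACKs).
PACKETS = {
  open_connection: 7,
  open_channel: 2,
  publish: 1,
  close_channel: 2,
  close_connection: 2
}.freeze

# Every message pays for the full open/publish/close cycle.
def packets_with_new_connection_per_message(messages)
  messages * PACKETS.values.sum
end

# The open/close overhead is paid once; each message only costs a publish.
def packets_with_long_lived_connection(messages)
  overhead = PACKETS.values.sum - PACKETS[:publish]
  overhead + messages * PACKETS[:publish]
end

puts packets_with_new_connection_per_message(1000)
puts packets_with_long_lived_connection(1000)
```

Under these assumptions, the fixed overhead of 13 packets is spread over all messages in the long-lived case, while it is repeated for every single message otherwise.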

The following diagram illustrates an overview of the information that's sent to connections and channels:

Fig 2.6: The handshake process for an AMQP connection

Establishing a single long-lived connection between the Application Service and RabbitMQ is a good start.

A decision must be made regarding which programming language and client library to use. The first few examples in this book are written in Ruby, and the client library Bunny (https://github.com/ruby-amqp/bunny) is used to publish and consume messages. Ruby is an easy language to read and understand, even if it is unfamiliar to you.

The application must be configured to use a certain connection endpoint, often referred to as a connection string; for example, a host and port. The connection string contains the information needed to be able to establish a connection. AMQP's assigned port number is 5672. TLS/SSL-encrypted AMQP can be used via AMQPS; it's a secure version of the AMQP protocol that's assigned port 5671.

The library is the element that opens the TCP connection to the target IP address and port. The connection parameters have been stored as a URI string in an environment variable called RABBITMQ_URI, which the code reads. There is no official standard for AMQP URIs, but this format is widely used:

 RABBITMQ_URI="amqp://user:[email protected]/vhost"
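To see which pieces of information such a URI carries, it can be taken apart with Ruby's standard URI library. The following is a rough sketch, not what Bunny does internally: the helper name parse_amqp_uri, the example host broker.example.com, and the credentials are all invented for illustration, and percent-encoded vhosts are not handled. The default ports are the ones mentioned above (5672 for AMQP, 5671 for AMQPS).

```ruby
require "uri"

# Default ports as stated in the text: plain AMQP and TLS-secured AMQPS.
DEFAULT_AMQP_PORTS = { "amqp" => 5672, "amqps" => 5671 }.freeze

# Hypothetical helper: split an AMQP URI into its connection settings.
def parse_amqp_uri(string)
  uri = URI.parse(string)
  {
    tls: uri.scheme == "amqps",
    user: uri.user,
    password: uri.password,
    host: uri.host,
    # Fall back to the protocol's default port when none is given.
    port: uri.port || DEFAULT_AMQP_PORTS.fetch(uri.scheme),
    # The vhost is the URI path without its leading slash.
    vhost: uri.path.to_s.delete_prefix("/")
  }
end

# Made-up example values, not the ones used by CC.
settings = parse_amqp_uri("amqp://cc-dev:secret@broker.example.com/cc-dev-vhost")
puts settings.inspect
```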

According to the Ruby (Bunny) documentation, connecting to RabbitMQ is simple. The code for this is divided into code blocks, and can be found later in this chapter:

  1. Add the username, the password, and the vhost that were set up in Chapter 1, A Rabbit Springs to Life, and then add the string to an environment variable on the machine:
RABBITMQ_URI="amqp://cc-dev:[email protected]/cc-dev-vhost"
  2. Require the bunny client library:
# Require client library
require "bunny"
  3. Read the connection URI from the environment variable and start a connection:
connection = Bunny.new ENV['RABBITMQ_URI']
# Start a session with RabbitMQ
connection.start

This seems straightforward so far, but CC requires production-grade code that can gracefully handle failures. What if RabbitMQ is not running? Clearly, it is bad if the whole application is down. What if RabbitMQ needs to be restarted? CC wants its application to recover gracefully if any issues occur. In fact, CC wants its application to keep functioning, regardless of whether the whole messaging subsystem is working or not. The user experience must be smooth and easy to understand, as well as reliable.

In summary, the behavior CC wishes to achieve is as follows:

  • If the connection to RabbitMQ is lost, it should reconnect by itself.
  • If the connection is down, sending or fetching messages should fail gracefully.

When the application connects to the broker, it needs to handle connection failures. No network is reliable all the time and misconfigurations and mistakes happen; the broker might be down, and so on. While not automatic, in this case, error detection should happen early in the process.

To handle TCP connection failures in Bunny, it is necessary to catch the exception:

begin
  connection = Bunny.new ENV['RABBITMQ_URI']
  connection.start
rescue Bunny::TCPConnectionFailed => e
  puts "Connection to server failed"
end

Detecting network connection failures is nearly useless if an application cannot recover from them. Recovery is an important part of error handling.
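One simple form of recovery is a bounded retry loop around the connection attempt. The sketch below is generic Ruby rather than anything Bunny provides: the helper name with_retries and its parameters are assumptions made for this example, and the error class is passed in so that the code can run without a broker. With Bunny, the class to pass would be Bunny::TCPConnectionFailed, as in the rescue clause above.

```ruby
# Hypothetical helper: run the block, retrying when error_class is raised.
# Gives up and re-raises after max_attempts tries.
def with_retries(max_attempts: 3, wait: 0.0, error_class: StandardError)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue error_class
    raise if attempts >= max_attempts
    sleep wait
    retry
  end
end

# Simulated connection attempt that fails twice before succeeding.
result = with_retries(max_attempts: 5, error_class: IOError) do |attempt|
  raise IOError, "broker unreachable" if attempt < 3
  "connected on attempt #{attempt}"
end
puts result

# Hypothetical usage with Bunny (not run here; needs the gem and a broker):
#   connection = with_retries(error_class: Bunny::TCPConnectionFailed, wait: 5) do
#     Bunny.new(ENV["RABBITMQ_URI"]).tap(&:start)
#   end
```

Capping the number of attempts keeps a misconfigured connection string from looping forever, at the price of having to decide what the application does once the helper gives up.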

Some client libraries offer automatic connection recovery features that include consumer recovery. Any operation that's attempted on a closed channel will fail with an exception. If Bunny detects a TCP connection failure, it will try to reconnect every 5 seconds with no limit regarding the number of reconnection attempts. It is possible to disable automatic connection recovery by passing automatically_recover: false to Bunny.new. This setting should only be used if you're reconnecting in some other way, or when testing the connection string.

Messages can be sent across languages, platforms, and operating systems. You can choose from a number of different client libraries for different languages. There are lots of client libraries out there, but here are some that are recommended:

  • Python: Pika
  • Node.js: amqplib
  • PHP: php-amqplib
  • Java: amqp-client
  • Clojure: Langohr

This section has shown how CC manages to establish a connection to RabbitMQ. We demonstrated why a long-lived connection is recommended and how to handle some common errors. Now, it's time to create a channel inside the connection.

Working with channels

Every AMQP protocol-related operation occurs over a channel. The channel instances are created by the connection instance. As described, a channel is a virtual (AMQP) connection inside the (TCP) connection. All operations performed by a client happen on a channel, queues are declared on channels, and messages are sent over channels.

A channel never exists on its own; it's always in the context of a connection:

# Declare a channel
channel = connection.create_channel

Channels in a connection are closed once the connection is closed or when a channel error occurs. Client libraries allow us to observe and react to channel exceptions.

More exceptions are usually thrown at the channel level than at the connection level. Channel-level exceptions often indicate errors the application can recover from, such as when it lacks permissions or attempts to consume from a deleted queue. Any attempted operation on a closed channel will also fail with an exception.

Even though channel instances are technically thread-safe, it is strongly recommended to avoid having several threads that are using the same channel concurrently.
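One common way to follow this advice is to give each thread its own channel on the shared connection. The sketch below illustrates the idea with a stand-in object: FakeConnection and channel_for_current_thread are invented for this example so that it runs without a broker. With Bunny, the real connection object would be passed instead, since it offers the same create_channel method.

```ruby
# Stand-in for a real connection; it only counts the channels it hands out.
class FakeConnection
  attr_reader :channels_opened

  def initialize
    @channels_opened = 0
    @lock = Mutex.new
  end

  def create_channel
    @lock.synchronize do
      @channels_opened += 1
      "channel-#{@channels_opened}"
    end
  end
end

# Hypothetical helper: lazily create one channel per thread and connection,
# then keep returning that same channel to the thread that created it.
def channel_for_current_thread(connection)
  channels = (Thread.current[:amqp_channels] ||= {})
  channels[connection.object_id] ||= connection.create_channel
end

connection = FakeConnection.new
workers = 3.times.map do
  Thread.new { channel_for_current_thread(connection) }
end
puts workers.map(&:value).sort.inspect
```

The connection is still shared, so the cost of the TCP handshake is paid once, while no two threads ever touch the same channel.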

CC is now able to connect to a RabbitMQ broker, open a channel, and issue a series of commands, all in a thread-safe and exception-safe manner. It's now time to build on this foundation!

Building the taxi request tool

Now, it's time to build the message flow.

First, the customer will send a simple HTTP request from the mobile application to the Application Service. This message will contain meta-information such as a timestamp, sender and receiver IDs, and the destination and requested taxi ID.

The message flow will look something like this:

Fig 2.7: The frontend/backend interactions of CC's main application

The Application Service stores the information in a database so that all the data becomes visible for the data analysis scripts at a later stage.

How the data is stored in the database is not handled in these examples since that's not the main case being followed in this chapter. The easiest method would be to allow the Application Service to add the information to the database. Another option is to offload the Application Service and put new messages into a message queue between the database and the Application Service and let another service subscribe to those messages and handle them; that is, store them in the database.

The flow between the mobile device, the Application Service, and RabbitMQ is illustrated in the following diagram:

Fig 2.8: The flow between the mobile device, the Application Service, and RabbitMQ

Regarding our main flow, the discussion about AMQP in Chapter 1, A Rabbit Springs to Life, detailed how messages are published to exchanges and then routed to queues to be consumed.

A routing strategy determines which queue (or queues) the message will be routed to. The routing strategy bases its decision on a routing key (a free-form string) and potentially on message meta-information. Think of the routing key as an address that the exchange uses to decide how the message should be routed. There also needs to be a binding between an exchange and a queue to enable a message to flow from the former to the latter.

Now, let's explore the direct exchange.

The direct exchange

A direct exchange delivers messages to queues based on a message routing key. A message goes to the queue(s) whose binding's routing key matches the routing key of the message.

CC only has two cars, so it starts out with a simple communication system where one customer can request a taxi from one driver. In this case, one message needs to be routed to the queue acting as the inbox of that driver. Therefore, the exchange-routing strategy that will be used is a direct one, matching the destination queue name with the routing key used when the message is produced, as illustrated in the following diagram:

Fig 2.9: The direct exchange routes messages to specific queues

An example use case of direct exchange could be as follows:

  1. The customer orders the taxi named taxi.1. An HTTP request is sent from the customer's mobile application to the Application Service.
  2. The Application Service sends a message to RabbitMQ with a routing key, taxi.1. The message routing key matches the name of the queue, so the message ends up in the taxi.1 queue.

The following diagram demonstrates how the direct exchange message routing would happen:

Fig 2.10: The direct exchange routing messages to specific queues based on the routing key
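The routing rule of a direct exchange is small enough to model in a few lines of plain Ruby. The class below is a toy that only mimics the matching logic in memory; ToyDirectExchange and its methods are invented for this illustration and have nothing to do with how RabbitMQ or Bunny are implemented.

```ruby
# Toy in-memory model of direct exchange routing (no broker involved).
class ToyDirectExchange
  def initialize
    # routing key => names of the queues bound with that key
    @bindings = Hash.new { |hash, key| hash[key] = [] }
  end

  # Bind a queue to this exchange under the given routing key.
  def bind(queue_name, routing_key:)
    queues = @bindings[routing_key]
    queues << queue_name unless queues.include?(queue_name)
  end

  # Return the queues a message with this routing key would be delivered to.
  def route(routing_key)
    @bindings.fetch(routing_key, []).dup
  end
end

exchange = ToyDirectExchange.new
exchange.bind("taxi.1", routing_key: "taxi.1")
exchange.bind("taxi.2", routing_key: "taxi.2")

puts exchange.route("taxi.1").inspect
puts exchange.route("taxi.3").inspect
```

A routing key with no matching binding yields an empty list, which corresponds to a message that cannot be routed to any queue.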

This may not be the most efficient approach to scale. In fact, it will be reviewed as soon as CC has more cars, but it's the easiest way to get started and launch the application fast.

Let's follow the first code CC creates as the initial application and learn about the different concepts at the same time. The code at the beginning of the code block has been taken from the connection and channel section:

  1. Require the bunny client library.
  2. Read the URI connection from the environment variable and start a connection.
  3. Start a communication session with RabbitMQ.
  4. Declare the taxi.1 queue.
  5. Declare the taxi-direct direct exchange.
  6. Bind the taxi.1 queue to the taxi-direct exchange with the taxi.1 routing key:
# 1. Require client library
require "bunny"

# 2. Read RABBITMQ_URI from ENV
connection = Bunny.new ENV["RABBITMQ_URI"]

# 3. Start a communication session with RabbitMQ
connection.start
channel = connection.create_channel

def on_start(channel)
  # 4. Declare a queue for a given taxi
  queue = channel.queue("taxi.1", durable: true)
  # 5. Declare a direct exchange, taxi-direct
  exchange = channel.direct("taxi-direct", durable: true, auto_delete: true)

  # 6. Bind the queue to the exchange
  queue.bind(exchange, routing_key: "taxi.1")

  # 7. Return the exchange
  exchange
end

exchange = on_start(channel)

It's a bit of an overkill and unnecessary to declare queues and exchanges for every message that's sent, so it's highly recommended to create a method that handles the setup of the application. This should be a method that creates the connection and declares queues, exchanges, and so on. The method in this example is simply called on_start, which declares the queue and binds an exchange to the queue.

Publishing to an exchange that doesn't exist raises an exception. Declaring an exchange, on the other hand, does nothing if the exchange already exists with the same properties; otherwise, it creates it. This is why it's safe to declare exchanges and queues every time the application starts or before publishing a message.

Channels are killed by exceptions. In CC's case, sending to a nonexistent exchange would not only raise an exception, but it would also terminate the channel where the error occurred. Any subsequent code that tries to use the terminated channel will fail as well.

In addition to using the direct type, CC has configured the durable and auto_delete properties of the exchange. Because durable is true, the exchange does not go away after a restart of RabbitMQ. Note that auto_delete: true, as used in the listing, allows the broker to remove the exchange once its last queue is unbound; set it to false if the exchange must also remain when it's unused.

An exchange declaration is only idempotent if the exchange properties are the same. Trying to declare an already-existing exchange with different properties will fail. Always use consistent properties in an exchange declaration. If you're making a change to the properties, delete the exchange before declaring it with the new properties. The same rule applies to a queue declaration.
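The declaration rule can be pictured with a small in-memory model. ToyBroker and PropertyMismatch below are invented for this sketch and only imitate the behavior described here; a real RabbitMQ broker reports the mismatch as a PRECONDITION_FAILED channel error instead.

```ruby
# Raised by the toy model when a redeclaration uses different properties.
class PropertyMismatch < StandardError; end

# Toy registry that mimics idempotent exchange declaration.
class ToyBroker
  def initialize
    @exchanges = {}
  end

  def declare_exchange(name, **properties)
    existing = @exchanges[name]
    if existing.nil?
      @exchanges[name] = properties
      :created
    elsif existing == properties
      :unchanged # same properties: declaring again is a no-op
    else
      raise PropertyMismatch, "#{name} already exists with #{existing}"
    end
  end

  def delete_exchange(name)
    @exchanges.delete(name)
  end
end

broker = ToyBroker.new
puts broker.declare_exchange("taxi-direct", type: :direct, durable: true)
puts broker.declare_exchange("taxi-direct", type: :direct, durable: true)

begin
  broker.declare_exchange("taxi-direct", type: :direct, durable: false)
rescue PropertyMismatch => e
  puts "Declaration failed: #{e.message}"
end
```

Deleting the exchange first and then declaring it with the new properties succeeds, which is the sequence recommended above.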

After creating the exchange, the taxi queue is created and bound to it.

The queue is declared with a similar approach to an exchange, but with slightly different properties, as follows:

  • durable: true. The queue must stay declared, even after a broker restart.
  • autoDelete: false. Keep the queue, even if it's not being consumed anymore.
  • exclusive: false. This queue should be able to be consumed by other connections (several application servers can be connected to RabbitMQ and accessed from different connections).
  • arguments: null. There is no need to custom configure the queue.

The queue is bound to the exchange using its own name as the routing key so that the direct routing strategy can route messages to it. When this is done, publishing messages to the taxi-direct exchange will actually deliver messages to the taxi queue whose name matches the published routing key.

If no queue is bound to an exchange, or if the routing strategy can't find a matching destination queue, the message that was published to the exchange will be discarded silently. As an option, it is possible to be notified when unroutable messages are discarded, as shown in subsequent chapters.

Again, when the same properties are used, these operations are idempotent, so the queue can safely be declared and bound to the exchange, again and again.

Although only the direct exchange has been covered in this chapter, AMQP 0-9-1 brokers provide four different types of exchanges. Depending on the bindings set up between exchanges and queues, and on their parameters, these exchanges route messages differently. The upcoming chapters look closer at the other types of exchanges. For now, here is a short explanation of each of the other three:

  • Fanout: Messages are routed to all queues bound to the fanout exchange.
  • Topic: Messages are routed to the queues whose binding pattern, which may contain wildcards, matches the routing key of the message.
  • Headers: Use the message header attributes for routing.

Now, it's time to send our first message to RabbitMQ!

 

Sending the first messages

The basic concepts and initial setup have already been covered, so let's jump in and send the first messages!

First, let's take a look at the order_taxi method, which is in charge of sending messages for the initial car request:

def order_taxi(taxi, exchange)
  payload = "example-message"
  # message_id is a string property, so the random number is converted
  message_id = rand.to_s
  exchange.publish(payload,
                   routing_key: taxi,
                   content_type: "application/json",
                   content_encoding: "UTF-8",
                   persistent: true,
                   message_id: message_id)
end

exchange = on_start(channel)
order_taxi("taxi.1", exchange)
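The payload in the listing is a placeholder string. A body that actually matches the application/json content type could be assembled as in the sketch below. The helper name build_order_message and the field names (taxi, customer_id, destination, requested_at) are assumptions chosen to echo the meta-information described earlier in this chapter; they are not CC's actual message format. The sketch only builds the payload and the property hash, so it runs without a broker.

```ruby
require "json"
require "securerandom"
require "time"

# Hypothetical helper: build a JSON payload plus the publish options for it.
def build_order_message(taxi:, customer_id:, destination:)
  payload = JSON.generate(
    taxi: taxi,
    customer_id: customer_id,
    destination: destination,
    requested_at: Time.now.utc.iso8601
  )
  properties = {
    routing_key: taxi,
    content_type: "application/json",
    content_encoding: "UTF-8",
    persistent: true,
    message_id: SecureRandom.uuid # unique string identifier for tracing
  }
  [payload, properties]
end

payload, properties = build_order_message(
  taxi: "taxi.1", customer_id: "customer-42", destination: "Central Station"
)
puts payload
puts properties.inspect

# With a Bunny exchange at hand, the pair could then be published like this:
#   exchange.publish(payload, **properties)
```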

order_taxi is going to be called every time a user wants to order a taxi. There is no guarantee that the addressee has ever logged into the system, so as far as the sender is concerned, it's impossible to be sure the destination queue exists. The safest path is to declare the queue on every message sent, bearing in mind that this declare operation is idempotent, so it will not do anything if the queue already exists. This may seem strange at first, but it's the sender's responsibility to ensure the addressee's queue exists if they want to be sure the message will not get lost.

This is a common pattern with AMQP when there is no strong happens-before relationship between events. Re-declaration is the way to go. Conversely, the check-then-act pattern is discouraged; trying to check the pre-existence of an exchange or a queue does not guarantee success in the typical distributed environment where AMQP is used.

The method for publishing a message is very simple; call publish toward the exchange. Then, use the queue name as the routing key (as per the direct routing) and an array of bytes that represent the actual message payload. It's possible to add some optional message properties, which could include the following:

  • content_type (string): A message is published and consumed as a byte array, but nothing really says what these bytes represent. In the current situation, both publishers and consumers are in the same system, so it could be assumed that the content type is expected. That being said, always specify the content type so that messages are self-contained; whichever system ends up receiving or introspecting a message will know for sure what the byte array it contains represents.
  • content_encoding (string): A specific encoding (UTF-8) is used when serializing string messages into byte arrays so that they can be published. Again, in order for the messages to be self-explicit, provide all the necessary meta-information to allow them to be read.
  • message_id (string): As demonstrated later in this book, message identifiers are an important aspect of traceability in messaging and distributed applications. In the example, a random message ID is generated.
  • persistent (boolean): Specifies if the message should be persisted to disk or not.
Do not confuse exchange and queue durability with message persistence; non-persistent messages stored in a durable queue will be gone after a broker restart, leaving you with an empty queue.

Additionally, persistent messages in a non-durable queue will be gone after a broker restart, because the queue itself does not survive the restart.

Ensure that messages are not lost by declaring a queue as durable and setting the message delivery mode to persistent.
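The combinations described in the last few paragraphs boil down to a simple rule, which the following sketch spells out. The method name survives_broker_restart? is made up for this illustration, and the rule only restates what the text says; it does not query a broker.

```ruby
# A message survives a broker restart only if the queue is durable
# AND the message itself was published as persistent.
def survives_broker_restart?(queue_durable:, message_persistent:)
  queue_durable && message_persistent
end

[true, false].product([true, false]).each do |durable, persistent|
  outcome = survives_broker_restart?(queue_durable: durable,
                                     message_persistent: persistent)
  puts "durable queue: #{durable}, persistent message: #{persistent} " \
       "=> survives restart: #{outcome}"
end
```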

But what would happen if sending the message fails, such as when the connection with RabbitMQ is broken?

Why would you ever use a non-persistent delivery mode? Isn't the whole point of a message broker such as RabbitMQ to guarantee that messages aren't lost? This is true, but there are circumstances where this guarantee can be relaxed. Consider a scenario where a publisher bombards the broker with a lot of non-critical messages. Using a non-persistent delivery here would mean that RabbitMQ wouldn't need to constantly access the disk, thus providing better performance in this case.

Before going any further, let's take a look at the structure of an AMQP message.

AMQP message structure

The following screenshot illustrates the structure of an AMQP message and includes the four AMQP message properties just used, plus a few new ones. Note that this diagram uses the specification name of the fields and that each language implementation renames them slightly so that they can be valid names. For example, content-type becomes contentType in Java, and content_type in Ruby:

Fig 2.11: Properties of an AMQP message

Except for reserved, all these properties are free to use and, unless otherwise specified, are ignored by the AMQP broker. In the case of RabbitMQ, the only field that is supported by the broker is the user-id field, which is validated to ensure it matches the name of the broker user that established the connection. Notice how the headers property allows us to add extra key-value pairs in case none of the standard properties fit the requirements.

The next section explains how messages are consumed.

Consuming messages

Now, let's turn our attention to the method in charge of retrieving messages, which is step 4 in the main architecture of CC, described in the section The application architecture behind CC.

Here, the taxi application can check the queue for new messages at a regular interval. This is a so-called synchronous approach. This would mean holding the application thread in charge of dealing with the poll requests until all pending messages have been removed from the queue, as illustrated in the following diagram:

Fig 2.12: A client asking for new messages in the broker

A frontend regularly polling the backend for messages would soon start to take its toll in terms of load, meaning that the overall solution would begin to suffer from performance degradation.

Instead, CC wisely decides to build the solution in favor of a server-push approach. The idea is to server-push messages to the clients from the broker. Luckily, RabbitMQ offers two ways to receive messages: there's the polling-based basic.get method and the push-based basic.consume method. As illustrated in the following diagram, messages are pushed to the consumer:

Fig 2.13: Consumer subscribing to messages from the broker

The subscribe method adds a consumer to the queue, which then subscribes to receive message deliveries.

Make sure that the consumer subscribes to the queue instead of polling it with basic.get. The basic.get command is comparatively expensive when it comes to resources.

With subscribe, the messages are delivered to the client from the broker when new messages are ready and the client has availability. This allows, in general, the smooth processing of messages. Additionally, using subscribe means that a consumer is connected as long as the channel it was declared on is available or until the client cancels it.

The message process is running smoothly and effortlessly, almost as if nothing is happening! That is, of course, until alerts are set in motion to acknowledge and/or negative acknowledge whether a part of the process has run as expected, or not as planned.

Acknowledgment and negative acknowledgment

RabbitMQ needs to know when a message can be considered successful in terms of being sent to the consumer as expected. The broker should then delete messages from the queue once the broker receives the response; otherwise, the queue would overflow. The client can reply to the broker by acking (acknowledging) the message either when it receives it or when the consumer has completely processed the message. In either situation, once the message has been acked, it's removed from the queue.

Therefore, it's up to the consumer to acknowledge a message if and only if it is done with processing, or if it is certain that there is no risk of losing the message if it is processed asynchronously.

To avoid a situation where a message could be forever lost (for example, worker crashed, exceptions, and so on), the consuming application should not acknowledge a message until it is completely finished with it.

A message is rejected by an application when the application indicates to the broker that processing has failed or cannot be accomplished at the time. Nack, or negative-acknowledge, tells RabbitMQ that a message was not handled as instructed. Nacked messages can be sent back to the queue for another try by setting the requeue flag.
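The life cycle of a delivery can be sketched with a toy queue that keeps track of unacknowledged messages. ToyQueue and its methods are invented for this illustration and only imitate the behavior described above in memory; they are not part of Bunny or RabbitMQ.

```ruby
# Toy in-memory queue that tracks deliveries awaiting acknowledgment.
class ToyQueue
  def initialize
    @ready = []     # messages waiting to be delivered
    @unacked = {}   # delivery tag => message awaiting ack or nack
    @next_tag = 0
  end

  def publish(payload)
    @ready << payload
  end

  # Hand the next message to a consumer and remember it until it is settled.
  def deliver
    return nil if @ready.empty?

    @next_tag += 1
    payload = @ready.shift
    @unacked[@next_tag] = payload
    [@next_tag, payload]
  end

  # Acknowledge: the message is removed for good.
  def ack(tag)
    @unacked.delete(tag)
  end

  # Negative acknowledge: optionally put the message back for another try.
  def nack(tag, requeue:)
    payload = @unacked.delete(tag)
    @ready.unshift(payload) if requeue && payload
  end

  def size
    @ready.size + @unacked.size
  end
end

queue = ToyQueue.new
queue.publish("order for taxi.1")

tag, payload = queue.deliver
queue.nack(tag, requeue: true)  # processing failed, try again later

tag, payload = queue.deliver
queue.ack(tag)                  # processing succeeded
puts "messages left: #{queue.size}"
```

Until a delivery is acked or nacked, the toy queue still counts it, which mirrors why a consumer should only acknowledge once it is completely finished with a message.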

Acknowledgments will be covered in detail in Chapter 3, Sending Messages to Multiple Taxi Drivers.

Ready? Set? Time to RUN, Rabbit!

Running the code

Now, it's time to set up some code for the consumer. You'll be able to recognize most of this code from the previous section, Sending the first messages:

  1. Require client libraries.
  2. Read RABBITMQ_URI from ENV.
  3. Start a communication session with RabbitMQ.
  4. Declare a queue for a given taxi.
  5. Declare a direct exchange, taxi-direct.
  6. Bind the queue to the exchange.
  7. Subscribe to the queue.

What follows is the code that's required for the initial consumer setup:

# example_consumer.rb
# 1. Require client library
require "bunny"

# 2. Read RABBITMQ_URI from ENV
connection = Bunny.new ENV["RABBITMQ_URI"]

# 3. Start a communication session with RabbitMQ
connection.start
channel = connection.create_channel

# Method for the processing
def process_order(info)
  puts "Handling taxi order"
  puts info
  sleep 5.0
  puts "Processing done"
end

def taxi_subscribe(channel, taxi)
  # 4. Declare a queue for a given taxi
  queue = channel.queue(taxi, durable: true)

  # 5. Declare a direct exchange, taxi-direct
  exchange = channel.direct("taxi-direct", durable: true, auto_delete: true)

  # 6. Bind the queue to the exchange
  queue.bind(exchange, routing_key: taxi)

  # 7. Subscribe from the queue
  queue.subscribe(block: true, manual_ack: false) do |delivery_info, properties, payload|
    process_order(payload)
  end
end

taxi = "taxi.1"
taxi_subscribe(channel, taxi)

Here, two flags were added to the subscribe method that need to be explained. Let's look at them in detail:

  • block (Boolean, default false): Should the call block the calling thread? This option can be useful for keeping the main thread of a script alive. It is incompatible with automatic connection recovery and is not generally recommended.
  • manual_ack (Boolean, default false): In CC's case, since the risk of losing a message is acceptable during this phase, the system does not manually acknowledge messages. Instead, it informs the broker to consider them as acknowledged as soon as it fetches them (more on manual acknowledgment later in this book).

And that's it! CC now has a working order request inbox ready to be tested. Next, we'll look at the management console when activated taxis are running.

Running the application

With the application running and a server connected to RabbitMQ, the following established connections can be seen from the management console:

Fig 2.14: The management console provides connection information

Notice how the upstream and downstream network throughputs are clearly represented, and that the channels that get opened and closed very quickly are hard to see from the management console. So, let's look at the following exchanges:

Fig 2.15: The taxi-direct direct exchange showing up in the management console

The taxi-direct exchange and the rate of messages coming in and out are shown in the management console. The fact that messages are being consumed as fast as they come in is a good sign that the current architecture is sufficient for CC's needs and that messages are not piling up. But what are all these other exchanges that haven't been created by code, and where are they coming from? The nameless exchange represented as (AMQP default) and all the exchanges with names that start with amq. are defined by the AMQP specification and, as such, must be provided by default by RabbitMQ. Now, what about queues? Let's have a look:

Fig 2.16: Each client-to-taxi inbox queue is visible in the management console

As expected, there is one queue per taxi and some nifty usage statistics. Notice how the ack column is empty, which is no surprise, given how message acknowledgment works. The queue is receiving messages while letting RabbitMQ know it won't be acknowledging them, so there is no activity related to acknowledging messages.


With enough RAM, RabbitMQ can deal with hundreds of queues and bindings without a problem, so multiple queues are not an issue.

Confident about its architecture and implementation, CC rolls out the client-to-taxi ordering subsystem. The client can send the request and the taxi can handle the request.

CC quickly expands the company with two new environmentally friendly cars. As in the previous solution, a client needs to send an order request message to a certain driver. Now, a new feature has been requested: the capacity to send a message to a group of taxi cars. It should be possible for clients to select a normal taxi or an environmentally friendly taxi. Let's see how CC will implement this new feature through the power of RabbitMQ.

 

Adding topic messages

CC's application allows its taxis to organize themselves into groups by registering their topics of interest. The new feature to roll out will allow clients to send an order request to all taxis within a particular topic. It turns out that this feature matches a specific exchange routing rule, not surprisingly called topic! This type of exchange routes a message to every queue whose binding key matches the routing key of the message. A direct exchange only matches routing keys exactly, which in CC's setup means one queue per taxi, whereas a topic exchange can also match patterns, so a single message can easily reach several queues. Two other examples of where topic-based routing could be applied are location-specific data, such as traffic warning broadcasts, and trip price updates.

A routing pattern consists of several words separated by dots. A best practice to follow is to structure routing keys from the most general element to the most specific one, such as news.economy.usa or europe.sweden.stockholm.
The topic exchange supports strict routing key matching and will also perform wildcard matching using * and # as placeholders for exactly one word and zero or more words, respectively.
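
The wildcard rules can be illustrated without a broker. The following is a minimal sketch in plain Ruby, not RabbitMQ's actual implementation. The names topic_match?, match_words, and route, as well as the sample bindings (including the audit queue), are hypothetical and chosen only for illustration.

```ruby
# Returns true when a binding pattern matches a routing key under the
# topic rules: "*" stands for exactly one word, "#" for zero or more words.
def topic_match?(pattern, routing_key)
  match_words(pattern.split("."), routing_key.split("."))
end

def match_words(pattern_words, key_words)
  # An exhausted pattern only matches an exhausted routing key
  return key_words.empty? if pattern_words.empty?

  head, *rest = pattern_words
  case head
  when "#"
    # "#" may absorb zero, one, or many words of the routing key
    (0..key_words.length).any? { |n| match_words(rest, key_words.drop(n)) }
  when "*"
    # "*" must consume exactly one word
    !key_words.empty? && match_words(rest, key_words.drop(1))
  else
    # A literal word must be equal to the next word of the routing key
    key_words.first == head && match_words(rest, key_words.drop(1))
  end
end

# Hypothetical in-memory routing table: queue name => binding patterns.
# Returns the names of all queues that would receive the message.
def route(bindings, routing_key)
  bindings.select do |_queue, patterns|
    patterns.any? { |pattern| topic_match?(pattern, routing_key) }
  end.keys
end

bindings = {
  "taxi.1" => ["taxi"],
  "taxi.3" => ["taxi", "taxi.eco"],
  "audit"  => ["taxi.#"] # hypothetical queue that wants every taxi message
}

p route(bindings, "taxi.eco") # => ["taxi.3", "audit"]
p route(bindings, "taxi")     # => ["taxi.1", "taxi.3", "audit"]
p route(bindings, "bus")      # => []
```

Note that a message whose routing key matches no binding, such as bus in this sketch, is routed to no queue at all.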

The following diagram illustrates how the topic exchange will be used in CC's application. Notice how the single inbox queue remains unchanged and simply gets connected to the topic exchange via extra bindings, each of them reflecting an interest of the taxi:

Fig 2.17: The topic exchange sending thematic messages to eco queues

Because the same inbox is used for everything, the code that's already in place for fetching messages doesn't need to be changed. In fact, this whole feature can be implemented with only a few additions. The first of these additions takes care of declaring the topic exchange in the existing on_start method, as follows:

def on_start(channel)
  # Declare and return the topic exchange, taxi-topic
  channel.topic("taxi-topic", durable: true, auto_delete: true)
end

There's nothing really new or fancy here; the main difference is that this exchange is called taxi-topic and is a topic type of exchange. Sending a message is even simpler than with the client-to-taxi feature because there is no attempt to create the addressee's queue. It wouldn't make sense for the sender to iterate through all the taxis to create and bind their queues, as only taxis already subscribed to the target topic at the time of sending will get the message, which is exactly the expected functionality. The order_taxi method is listed here:

# Publishing an order to the exchange
def order_taxi(type, exchange)
  payload = "example-message"
  message_id = rand
  exchange.publish(payload,
                   routing_key: type,
                   content_type: "application/json",
                   content_encoding: "UTF-8",
                   persistent: true,
                   message_id: message_id)
end

exchange = on_start(channel)
# Order will go to any eco taxi
order_taxi('taxi.eco', exchange)
# Order will go to any eco taxi
order_taxi('taxi.eco', exchange)
# Order will go to any taxi
order_taxi('taxi', exchange)
# Order will go to any taxi
order_taxi('taxi', exchange)

The difference is that messages are now published to the taxi-topic exchange. The rest of the code that creates and publishes the message is exactly the same as for client-to-taxi messaging. Lastly, bindings need to be added when a taxi subscribes to certain topics:

# example_consumer.rb

def taxi_topic_subscribe(channel, taxi, type)
  # Declare a queue for a given taxi
  queue = channel.queue(taxi, durable: true)

  # Declare a topic exchange
  exchange = channel.topic('taxi-topic', durable: true, auto_delete: true)

  # Bind the queue to the exchange
  queue.bind(exchange, routing_key: type)

  # Bind the queue to the exchange to make sure the taxi will get any order
  queue.bind(exchange, routing_key: 'taxi')

  # Subscribe from the queue
  queue.subscribe(block: true, manual_ack: false) do |delivery_info, properties, payload|
    process_order(payload)
  end
end

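taxi = "taxi.3"
# The binding key must match the routing key used by order_taxi ('taxi.eco');
# a binding key of "taxi.eco.3" would not match orders published to 'taxi.eco'
taxi_topic_subscribe(channel, taxi, "taxi.eco")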

taxi.3 is the new environmentally friendly taxi, now ready to receive orders from clients that want an environmentally friendly car.

The AMQP specification does not provide any means to introspect the current bindings of a queue, so it is not possible to iterate them and remove the ones not needed anymore in order to reflect a change in a taxi's topics of interest. This is not a terrible concern because the application is required to maintain this state anyway.
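
Since the application has to track bindings itself, one option is to remember the routing keys currently bound for each taxi and compute the difference whenever a taxi's topics change. This is a minimal sketch under that assumption; binding_changes is a hypothetical helper, and how the state is stored is left open.

```ruby
require "set"

# Hypothetical helper: compare the routing keys a taxi queue is currently
# bound with against the ones it should have, and report what to change.
def binding_changes(current_keys, desired_keys)
  current = current_keys.to_set
  desired = desired_keys.to_set
  {
    bind:   (desired - current).to_a.sort, # keys to add
    unbind: (current - desired).to_a.sort  # keys no longer wanted
  }
end

# "taxi.xl" is a made-up topic used only for this example
changes = binding_changes(["taxi", "taxi.eco"], ["taxi", "taxi.xl"])
p changes # => {:bind=>["taxi.xl"], :unbind=>["taxi.eco"]} (exact formatting varies by Ruby version)
```

The resulting keys could then be applied with Bunny's queue.bind(exchange, routing_key: key) and queue.unbind(exchange, routing_key: key).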

The RabbitMQ management console exposes a REST API that can be used to perform queue binding introspection, among many other features not covered by the AMQP specification. More about that in upcoming chapters.
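
As a rough sketch of that introspection, the management plugin's HTTP API lists the bindings of a queue under /api/queues/{vhost}/{queue}/bindings. The helper names, the localhost host, port 15672, and the guest credentials below are assumptions for a default local installation, not values taken from CC's setup.

```ruby
require "net/http"
require "json"
require "uri"

# Build the management API URL for a queue's bindings.
# The vhost and queue name are percent-encoded ("/" becomes "%2F").
# Assumes names without spaces, which this encoder would turn into "+".
def bindings_uri(queue, vhost: "/", host: "localhost", port: 15672)
  encoded_vhost = URI.encode_www_form_component(vhost)
  encoded_queue = URI.encode_www_form_component(queue)
  URI("http://#{host}:#{port}/api/queues/#{encoded_vhost}/#{encoded_queue}/bindings")
end

# Fetch the bindings and return the routing keys the queue is bound with.
# Assumes the default guest/guest user, which only works from localhost.
def fetch_routing_keys(queue, user: "guest", password: "guest")
  uri = bindings_uri(queue)
  request = Net::HTTP::Get.new(uri)
  request.basic_auth(user, password)
  response = Net::HTTP.start(uri.hostname, uri.port) { |http| http.request(request) }
  JSON.parse(response.body).map { |entry| entry["routing_key"] }
end

puts bindings_uri("taxi.3")
# prints http://localhost:15672/api/queues/%2F/taxi.3/bindings
```

Calling fetch_routing_keys("taxi.3") requires a running broker with the management plugin enabled, so it is not invoked here.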

With this new code in place, everything works as expected. No code changes are needed to retrieve the new client-to-taxi orders because they arrive in the same inbox queue as the previous messages. Topical messages are sent and received correctly by the taxi cars, and all this happens with a minimal change and no increase in the number of queues. When connected to the management console, click on the Exchanges tab; the only visible difference is the new topic exchange, taxi-topic.

 

Summary

This chapter covered how to connect to RabbitMQ and how to send and receive order messages. The car order system was successfully created, and direct and topic exchanges were put in motion in the context of CC's client-to-taxi and client-to-taxis features.

As Complete Car grows, so does the demand for new features in the taxi application. What's next for CC as it meets user demand? The next chapter explains how to work with channels and queues to expand the features of the app.

About the Authors

  • Lovisa Johansson

    Lovisa Johansson has been working daily with RabbitMQ for many years. Through CloudAMQP, the company 84codes offers managed RabbitMQ clusters. With 50,000+ running instances, they are considered to be the largest provider of RabbitMQ as a Service in the world. Lovisa is an experienced developer with a Master's degree in Computer Science and Engineering. Through her work, she continues to write the most popular and widespread educational content about RabbitMQ, and occasionally shares this knowledge as a speaker at various conferences.

  • David Dossot

    David Dossot has worked as a software engineer and architect for more than 18 years. He has been using RabbitMQ since 2009 in a variety of different contexts. He is the main contributor to the AMQP transport for Mule. His focus is on building distributed and scalable server-side applications for the JVM and the Erlang VM. He is a member of IEEE, the Computer Society, and AOPA, and holds a diploma in Production Systems Engineering from ESSTIN. He is a Mule champion and a DZone Most Valuable Blogger. He commits on multiple open source projects and likes to help people on Stack Overflow. He's also a judge for the annual Jolt Awards software competition.
