Basic Coding with HornetQ: Creating and Consuming Messages

Exclusive offer: get 50% off this eBook here
HornetQ Messaging Developer’s Guide

HornetQ Messaging Developer’s Guide — Save 50%

Avoid being stung by JBoss HornetQ messaging service whether you're an existing user or a newcomer with this agile, fast-paced, example-rich guide with this book and ebook

$26.99    $13.50
by Piero Giacomelli | November 2012 | JBoss Java Open Source

Exchanging information in the form of short messages is becoming more and more important, so frameworks for doing this will be a key factor in software development. Messages and information can be exchanged at exponential speed with JBoss HornetQ asynchronous messaging middleware.

In this article by Piero Giacomelli, author of HornetQ Messaging Developer’s Guide, we will cover the following topics::

  • Installing Eclipse and NetBeans for developing with HornetQ on both Windows and Ubuntu
  • Setting up a development environment for working with HornetQ core API in Eclipse and NetBeans
  • Creating an example for producing and consuming messages in HornetQ in both a synchronous and an asynchronous way
  • Implementing some classes using the performance practice for managing core API connections, sessions, and clients

(For more resources related to this topic, see here.)

Installing Eclipse on Windows

You can download the Eclipse IDE for Java EE developers (in our case the ZIP file eclipse-jee-indigo-SR1-win32.zip) from http://www.eclipse.org/downloads/. Once downloaded, you have to unzip the eclipse folder inside the archive to the destination folder so that you have a folder structure like the one illustrated in the following screenshot:

Now a double-click on the eclipse.exe file will fire the first run of Eclipse.

Installing NetBeans on Windows

NetBeans is one of the most frequently used IDE for Java development purposes. It mimics the Eclipse plugin module's installation, so you could download the J2EE version from the URL http://netbeans.org/downloads/.But remember that this version also comes with an integrated GlassFish application server and a Tomcat server. Even in this case you only need to download the .exe file (java_ee_sdk-6u3-jdk7-windows.exe, in our case) and launch the installer. Once finished, you should be able to run the IDE by clicking on the NetBeans icon in your Windows Start menu.

Installing NetBeans on Linux

If you are using a Debian-based version of Linux like Ubuntu, installing both NetBeans and Eclipse is nothing more than typing a command from the bash shell and waiting for the installation process to finish.

As we are using Ubuntu Version 11, we will type the following command from a non-root user account to install Eclipse:

sudo apt-get install eclipse

The NetBeans installation procedure is slightly different due to the fact that the Ubuntu repositories do not have a package for a NetBeans installation.

So, for installing NetBeans you have to download a script and then run it. If you are using a non-root user account, you need to type the following commands on a terminal:

sudo wget http://download.netbeans.org/netbeans/7.1.1/final/bundles/ netbeans-7.1.1-ml-javaee-linux.sh sudo chmod +x netbeans-7.1.1-ml-javaee-linux.sh ./netbeans-7.1.1-ml-javaee-linux.sh

During the first run of the IDE, Eclipse will ask which default workspace the new projects should be stored in. Choose the one suggested, and in case you are not planning to change it, check the Use this as the default and do not ask again checkbox for not re-proposing the question, as shown in the following screenshot:

The same happens with NetBeans, but during the installation procedure.

Post installation

Both Eclipse and NetBeans have an integrated system for upgrading them to the latest version, so when you have correctly launched the first-time run, keep your IDE updated.

For Eclipse, you can access the Update window by using the menu Help | Check for updates. This will pop up the window, as shown in this screenshot:

NetBeans has the same functionality, which can be launched from the menu.

A 10,000 foot view of HornetQ

Before moving on with the coding phase, it is time to recover some concepts to allow the user and the coder to better understand how HornetQ manages messages.

HornetQ is only a set of Plain Old Java Objects (POJOs) compiled and grouped into JAR files. The software developer could easily grasp that this characteristic leads to HornetQ having no dependency on third-party libraries. It is possible to use and even start HornetQ from any Java class; this is a great advantage over other frameworks.

HornetQ deals internally only with its own set of classes, called the HornetQ core, avoiding any dependency on JMS dialect and specifications. Nevertheless, the client that connects with the HornetQ server can speak the JMS language.

So the HornetQ server also uses a JMS to core HornetQ API translator. This means that when you send a JMS message to a HornetQ server, it is received as JMS and then translated into the core API dialect to be managed internally by HornetQ. The following figure illustrates this concept:

The core messaging concepts of HornetQ are somewhat simpler than those of JMS:

  • Message: This is a unit of data that can be sent/delivered from a consumer to a producer. Messages have various possibilities. But only to cite them, a message can have: durability, priority, expiry time, time, and dimension.
  • Address: HornetQ maintains an association between an address (IP address of the server) and the queues available at that address. So the message is bound to the address.
  • Queue: This is nothing more than a set of messages. Like messages, queues have attributes such as durability, temporary, and filtering expressions.
HornetQ Messaging Developer’s Guide Avoid being stung by JBoss HornetQ messaging service whether you're an existing user or a newcomer with this agile, fast-paced, example-rich guide with this book and ebook
Published: October 2012
eBook Price: $26.99
Book Price: $44.99
See more
Select your format and quantity:

Thinking, then coding

HornetQ can handle messages in a very efficient way, but, as usual, lots of improvements can be managed by coding the message consumer and the message producer in a way that avoids any anti-pattern in performance and tuning. Remembering the HornetQ user manual, if you work on the code side of things, you need to avoid instantiating the producer, consumer, connections, or sessions for one single message. This is the most common performance anti-pattern, so you should try to avoid it.

We also need to alert the user that it's possible to improve performance by considering the following server-side check list:

  • Tuning the journal
  • Tuning JMS
  • Tuning settings
  • Tuning JVM
  • Tuning your code

The HornetQ core API example

We are now ready to move on the example, which used JMS to create and consume messages. We will re-code the same example using the core API. We will then code a message producer and a message consumer that will push/read ECG signals using core API messages to a HornetQ standalone non-clustered server.

We will try to get only one connection object that is charged with connecting to the HornetQ standalone non-clustered server that is running on our test environment. Even the session that is shared between the consumer client and the producer client will be created with only one object.

Finally, we will detail how to code the message producer and the message consumer.If you use the core API objects, you will see that the message object has many methods to integrate objects into it. So we will take a look at some possible implementations and we will suggest some others to the user.

Preparing your development environment

This step is pretty easy to be arranged; if you use the HornetQ core API, you only need to add to your application classpath the following JAR files that can be found in the HORNETQ_ROOT\lib folder:

  • HornetQ-core.jar
  • HornetQ-core-client.jar
  • Netty.jar

A HornetQ server can be created and used directly from Java code; from this it follows that the queues can also be created at runtime, but in this case we will detail the queue into the configuration file of HornetQ. Before going on we need to understand the difference between the JMS queue and the core API queue. In the JMS implementation there exists the idea of a message topic, which is absent in the core API. This means that in core API you have an address where the server is running on one or more queues bounded to this address. With core API it is possible to create a queue at runtime in your code or to use a queue mapped to your address. In our case, we will use a queue mapped to the localhost address.

So, we will create a durable queue that will be used by both the producer client and the consumer client to deliver and use messages. The difference between durable and non-durable queue is pretty simple—non durable queues do not survive after a server restart, meaning that the messages that are not consumed are lost if the server crashes.

To configure such queues simply open, using your preferred text editor, the hornetq-configuration.xml file that you can find in the HORNETQ_ROOT\config\standalone\non-clustered\ folder. Inside the <configuration></configuration> XML element, add the following element:

<queues> <queue name="jms.queue.mytestqueue"> <address>jms.queue.mytestqueue</address> <durable>true</durable> </queue> </queues>

This will create a predefined core API queue that is durable, meaning that all the messages bound to this queue will survive a server restart. By convention, all the predefined core queues have the jms.queue prefix before the name of the queue you want to assign.

To see for yourself how HornetQ works, at runtime you can change the hornetqconfiguration.xml file and you will see that HornetQ will get the change and display something like the following screenshot:

Now we can move to our IDE to prepare the development environment.

Using the New Java Project wizard, create a new Java project called Chapter03 with five classes in it:

  • MyCoreClientFactory: This will initialize the CoreClientFactory class
  • MyCoreSession: This is in charge of returning a valid CoreSession class
  • MyCoreMessageProducer: This is same for the CoreMessageProducer class
  • MyCoreMessageConsumer: This is same for the CoreMessageConsumer class
  • MyCoreTest: This is the class that will test the previous ones

So, in Eclipse, you will have the following configuration for the project:

While in NetBeans (Ubuntu), you should have something like the following screenshot:

We are now ready to move on to the coding phase.

For the reader who would like to use a more object-oriented approach, we suggest, apart from the MyCoreTest class, the creation of an interface for the other classes in order to have every class as an implementation of a known interface.

Creating a shared core API connection

We are now ready to code the connection class that will be used by the session class to connect to a standalone HornetQ server using the core API. Basically, the MyCoreConnectionFactory class has the following static methods:

Public static ClientSessionFactory getConnectionFactory(string host, int port) Private static HashMap createSettings(String host,int port) Public static void close();

The aim of this class is to manage one ClientSessionFactory HornetQ object, so that it will be possible for multiple session objects to use it. The public main method is declared as static, so we need the following static objects:

private static ClientSessionFactory factory = null; static HashMap map = null; private static TransportConfiguration configuration; private static ServerLocator locator;

The code for the main method is:

public static ClientSessionFactory getConnectionFactory(string host, int port) { if (factory == null){ configuration = new TransportConfiguration(NettyConne ctorFactory.class.getName(), createSettings(host, port)); locator = HornetQClient.createServerLocatorWithoutHA( configuration); factory = locator.createSessionFactory(); } return factory; }

Basically, the first time this method is called, the factory object, which is of type ClientSessionFactory, is null (because we choose to declare it as a static property of the class itself) so it will be instantiated and configured according to the IP address and the port where the core server is running using the NettyConnector. Then, the next time we need to access the connection, it will be re-used without any changes. We underline that we need to create a ClientSessionFactory object using a ServerLocator object. The ServerLocator object uses a TransportConfiguration object, which is an object used by a client to specify a connection to a server and its backup, if one exists.

Typically, its constructors take the class name and parameters needed to create the connection. These will be different and will depend on which connector is being used, netty, InVM, or something similar. The difference between the IVM (In Virtual Machine) connector and the netty connector is that the netty connector allows the client and the server to run on different virtual machines. Nevertheless, we strongly encourage using the netty connector, because it is the most confi gurable one.

The netty transport can be used in one of these ways; to use old (blocking) Java IO, NIO (non-blocking), to use straightforward TCP sockets like SSL, or to tunnel over HTTP or HTTPS. In addition to this we also provide a servlet transport.

This is why, even if we do not need the netty.jar library at compile time, we will need it at runtime, to allow the netty connector to work properly.

Having said that the ClientSessionFactory class can be closed, we implement a method named close():

public static void close() { if (factory != null){ factory.close(); factory = null; } }

This method will close the connection and set the factory object to null so that if some other resources need to access the ClientSessionFactory object, it will be reinitialized from scratch.

Creating and sharing a HornetQ session

Now that we have a connection factory at our disposal, it is time to move to a shared session that will be used by both the consumer and the producer of the message.

We follow the same methodology we used for the ConnectionFactory class. We will have only one method that will initialize a ClientSession object and return it after starting. The methods will be as follows:

public static void setSessionParameters(String host, int port) public static ClientSession getSession() public static void start() public static void close()

So we have only one static ClientSession object that is mapped from the ClientSessionFactory class we have just coded. We have some methods to set up the connection and to create a consumer object on a queue defined by the user.

We first need to declare a static org.hornetq.api.core.client.ClientSession object that will be available and used every time we will call a method of the class in the following way:

private static ClientSession session = null;

The first method to be implemented is the setSessionParameters(String host, int port) method, which will be the one responsible for creating a connection within the MyCoreClientFactory object we just coded. The code is very simple:

if (session == null) { System.out.println("creating session at " + new java. util.Date()); session = MyCoreClientFactory. getClientSessionFactory(host, port).createSession(false, true, true); }

So we call the getClientSessionFactory method of the first MyCoreClientFactory class with the host and the IP address thus creating a ClientSession object. The three respective Boolean parameters are:

  • xa: This parameter shows whether the session supports XA transaction semantics or not
  • autoCommitSends: This parameter is set to true for automatically commiting message sends, and set to false for commiting manually
  • autoCommitAcks: This parameter is set to true for automatically commiting message acknowledgement, and set to false for commiting manually

So, in this case our connection will support the transaction semantic that will automatically commit messages once the send method of the consumer is called, and will automatically acknowledge every message.

This is a simple case, but in a high-frequency message environment you will need to fine-tune these parameters. However, this will require more coding on the producerconsumer layer for effectively managing the commits and acknowledgements.

We also add a console output to show that the session will be created only once during our test even if we share it between the producer and the consumer.

The next simple method is the getSession() method—one that is in charge of returning to every call the ClientSession object just initialized. The following code is self-explanatory:

public static ClientSession getSession() { return session; }

We also need two more methods to start and stop the instantiated session. Here is the first one:

public static start() { if (session != null) { session.start(); } } This is the second one: public static close() { if (session != null) { session.close(); session = null; } }

To avoid a null object reference, the two methods control—in a very simple way—if the session object is first initialized. Armed with these two classes, we are now ready to move to the interesting part. We will now start to code an object that acts like a MessageProducer class

Coding a client consumer

Now that we have the two static classes for managing the connection and the session, we are now ready to code a reusable object that can be used to send messages to a HornetQ queue.

The aim of this object, once created, is to send a message to the selected HornetQ queue by passing only three parameters—the IP address of the host where HornetQ is running, the port where the netty service is listening, and the name of the queue where we need to put the messages.

We need to instantiate two objects that will be delivering the messages:

private org.hornetq.api.core.client.ClientProducer producer; private String queuename; private org.hornetq.api.core.client.ClientMessage message;

The core ClientMessage object is equipped with lots of methods that can be used both for defining message properties, such as durability and timeout (among others), and for defining various ways to deliver a text message. We invite the reader to refer to the Javadoc API, provided athttp://docs.jboss.org/hornetq/<version>.Final/api/index.html, to start training with the various possibilities offered by the core API implementation, which offers lots of functionalities/methods that surpass the JMS message implementation.

Clearly, we also need a producer that will be in charge of sending the message to the selected queue. In this case we will explicitly declare the queue name at this level.

Apart from the constructor of the class, we only have two methods:

public void setQueueName(String queuename) throws Exception public void send(String _message) throws HornetQException, Exception

The setQueueName(String queuename) method is in charge of defining the bounded queue for the MessageProducer class.

The send method somehow mimics the send method of the JMS MessageProducer class.

The code for setQueueName is shown as follows:

if (producer == null){ producer = MyCoreSession.getSession(). createProducer(queuename); }

So, this method is in charge of both creating the MessageProducer class from the session we have instantiated before and to bind it to the queue specified by the queuename string.

So, using a cascade method when you, for the first time, use the MyCoreMessageProducer class and you call the setQueueName method you will have the following backward calls to the previous static classes:

  • A MyCoreSession.getSession() call to obtain the CoreSession object
  • A backward call to the getClientSessionFactory method of the MyCoreClientFactory class

So, the first time the MyCoreMessageProducer class is called, all the core API objects are initialized correctly and then reused without creating new instances; this greatly reduces the creation of new objects and helps in controlling the information flow.

Once we have a CoreProducer object correctly set and bound to a queue, we only need to send the message using the send method.

The code is shown as follows:

message = MyCoreSession.getSession().createMessage(false); message.putStringProperty("ecg", _message); producer.send(message); System.out.println("message sent successfully");

First the message needs to be assigned to a session, so we need to re-use our static getSession method of the MyCoreSession class. The createMessage method that we call at the end accepts at least a Boolean type that specifies if the message is durable or not. In our case we will not need the message to stay in the queue until a consumer parses it. Other parameters of the createMessage method can be durability, priority, and so on, but again, a good look at Javadoc can give the coder all the possibilities.

On the other hand, we know that it is time for the message to be filled with the text we want to transmit. As mentioned earlier, the ClientMessage core object comes with various methods to pass simple types as messages. In fact it is also possible to deliver the so-called large messages that accept an InputStream as the body of the message. This makes it possible to directly send the files as messages. HornetQ uses the HORNETQ_ROOT\data folder to physically store the queue; in this case we have a queue named large-message where it is possible to deliver InputStream. When using this technique, the only limit to the dimension of the message delivered is the amount of disk space available to the HORNETQ_ROOT\data folder.

The HornetQ user manual claims to have tested the delivery of a message that was 8 GB in size on a machine running with 50 MB of RAM. We never tested a stressful situation and we alert the reader that other parameters should be fine-tuned for performances like this.

Considering such performances and considering that it is possible to use InputStream for storing the body of HTML pages via URLs, why not consider HornetQ for creating a fast, multithreaded, and high-performance web spider?

In our case we will use the possibility to store in a message and retrieve in a second moment a key/value pair, like in a Java hashtable structure. In our example the message will be a single three signal ECG measurement and our test message will be a string like the following one:

String theECG = "1;02/20/2012 14:01:59.010;1020,1021,1022";

We will assign this string as a key/value couple as follows:

"ecg"/"1;02/20/2012 14:01:59.010;1020,1021,1022";

Only as a reference, it is also possible to write directly to the body of the message using the getBodyBuffer().writeString(String value) method.

For large messages we invite the reader to take a look at the setBodyInputStream method of the CoreMessage class.

Testing the first message sent

Before coding the consumer class, let's test the classes we coded.

So before moving on, open the MyCoreTest class with your IDE and add the following code to the main method:

public static void main(String[] argvs) throws Exception{ try { MyCoreSession.setSessionParameters("localhost", 5445); MyCoreMessageProducer m = new MyCoreMessageProducer(); m.setQueuename("jms.queue.myqueue"); m.send("1;02/20/2012 14:01:59.010;1020,1021,1022"); } catch (Exception e) { e.printStackTrace(); } finally { MyCoreSession.close(); MyCoreClientFactory.close(); } System.exit(0); } }

So, as you can see, we first set in the MyCoreSession class the host and the port where HornetQ is running; this will create the ClientSessionFactory and the ClientSession objects in cascade.

Having done that, we create a MyCoreMessageProducer object and bind it to a specific queue. Then we will send the message and close the session and connection. I suggest you close the session and the connection in a finally block because if you have a runtime exception the next time you re-run the program, the pending session will affect the way you can consume messages.

Our efforts are shown in the following screenshot:

Only to show that the objects are re-used correctly according to the code performance practice we underlined at the beginning, if you change the code to send, for example, five messages in the following way:

try { MyCoreSession.setSessionParameters("localhost", 5445); MyCoreMessageProducer m; m = new MyCoreMessageProducer(); m.setQueuename("jms.queue.myqueue"); for (int i = 1; i < 5; i++){ m.send("1;02/20/2012 14:01:59.010;1020,1021,1022"); } MyCoreSession.close(); MyCoreClientFactory.close(); } catch(Exception ex) { e.printStackTrace(); }

You'll see the following output:

This confirms that the connection and the session have been created only once, from the inner object to the outermost one.

HornetQ Messaging Developer’s Guide Avoid being stung by JBoss HornetQ messaging service whether you're an existing user or a newcomer with this agile, fast-paced, example-rich guide with this book and ebook
Published: October 2012
eBook Price: $26.99
Book Price: $44.99
See more
Select your format and quantity:

Coding the HornetQ core API consumer

Let's move to the end of our coding example, consuming the message we have just sent to the queue.

Our final class is the MyCoreMessageConsumer class, which is in charge of connecting to the queue and parsing the message that we pushed with the MessageProducer class.

In this case we will reuse the same MyCoreSession class and hence the MyCoreClientFactory class, too.

The class only has the following attributes:

private org.hornetq.api.core.client.ClientConsumer consumer; private org.hornetq.api.core.client.ClientMessage message;

The consumer is the one that we will create to receive the messages and ClientMessage will be the object where the body of the message will be stored, containing the key/value for the ECG string that we used in the MessageProducer class.

The only methods we will implement are the following two:

public void getMessages(String queuename) throws HornetQException, Exception public void describeMessage() throws IllegalArgumentException, IllegalAccessException

The getMessage method accepts the queuename and initializes a MessageConsumer object on that queue so that we can receive the message. The method is implemented as follows:

public void getMessages(String queuename) throws HornetQException, Exception{ consumer = MyCoreSession.getSession(). createConsumer(queuename); message = consumer.receive(); System.out.println("Received TextMessage:" + message. getStringProperty("ecg")); }

As you can see, we call the same static method that we used in the MyCoreProducer class; so, according to the previous code the session is initialized within the same class and we get the same reference that we have for the message consumer class. With this session we create a consumer bound on the queuename queue that is passed as a parameter. While the MyCoreProducer class is able to work without exception, even for a queue not defined on the server, the consumer needs to be bound correctly with a queue name.

This behavior could seem strange, but HornetQ bounds queues to the address, so if you send the message to a queue that does not exist, the messages simply disappear. In case you are using the JMS implementation when you try to send a message to a queue that is not defined, you will get an InvalidDestinationException exception.

The CoreClientConsumer class is equipped with a receive method that can be called without a parameter or passing a millisecond integer parameter that will block the receiving of a message until the millisecond timeout has passed. In case the MessageConsumer class is blocked to the queue until something happens, the process is destroyed or the network is down.

So, for example, if you want to receive a message and set up a timeout of one second for each message, simply use the following lines along with the code:

MyCoreSession.getSession().createConsumer(queuename); message = consumer.receive(1000);

Once received, we need to parse the message to retrieve the correct key/value pair. This is done by using the getStringProperty method and recalling the correct key identifier, which in our case is the ECG string.

If you try to access a key not stored in the message object you will have a null value returned, so a better way to read a property of a message is to use the containsProperty method.

A more correct code follows:

if (message.containsProperty("ecg")) System.out.println("Received TextMessage:" + message.getStringProperty("ecg"));

We also add a method to describe all the fields of the message so that we have a fast method for logging some important attributes of the message to the console.

This method uses reflection to find the fields of the ClientMessage object inside our MyCoreMessageConsumer class. We will call the method on the superclass, after setting the visibility of every field to avoid problems in accessing protected/private attributes.

Reflection is one of the great possibilities that Java offers to the developer, so we will only give a simple example and leave the reader any possible future implementation that they want to make.

The code for the describeMessage() method is the following:

Class<?> parentClass = this.message.getClass(). getSuperclass(); Field[] fields = parentClass.getDeclaredFields(); for (Field field : fields) { field.setAccessible(true); if (field.get(this.message) != null) System.out.println("field: " + field.getName() + " : " + field.get(this.message).toString() ); else System.out.println("field: " + field.getName() + " : null"); }

Some fields can have a null value, so we need to control them to avoid null reference exceptions at runtime.

Considering the field property that we can set on a CoreMessage object, we need to point out the most useful ones in a transactional environment:

  • messageID: This is a unique identifier that is automatically assigned when the message is produced
  • servertimestamp: This is the long timestamp value corresponding to the time this message was handled by a HornetQ server
  • UserID: This is an optional user-specified long value that can be set to identify the message and will be passed around with the message
  • Priority: This is a byte-valued field that ranges from 0 (less priority and default value) to 9 (more priority), inclusive of both

Putting everything together

Now that we have also coded the MyCoreMessageConsumer class, we are ready to put everything together to have a fully working example. So we reopen our MyCoreTest class to add the functionality of the MyCoreMessageConsumer class.

The complete code for the main method is shown as follows:

public static void main(String[] argvs) throws Exception{ try { MyCoreSession.setSessionParameters("localhost", 5445); MyCoreMessageProducer m = new MyCoreMessageProducer(); m.setQueuename("jms.queue.myqueue"); m.send("1;02/20/2012 14:01:59.010;1020,1021,1022"); MyCoreSession.start(); MyCoreMessageConsumer c = new MyCoreMessageConsumer(); c.getMessages("jms.queue.myqueue"); c.describeMessage(); } catch (Exception e) { e.printStackTrace(); } finally { MyCoreSession.close(); MyCoreClientFactory.close(); } System.exit(0); }

The following are the steps to follow:

  1. First we start the ClientSession object bind inside the MyCoreSession class.
  2. Then we create the MyCoreMessageConsumer object (so that it will be possible to use) and call the getMessages method on the queue we used to produce the messages.
  3. Lastly, we describe the message received and close the ClientSession and ClientFactory objects.

The output of our efforts is the following console:

Our example using only HornetQ core API is over, but we need to underline some remarks.

Final considerations

There are some remarks that we should note to prevent you from possibly misunderstanding how the core messages work.

First we need to point out that, in the implementation done in this article, we assume implicitly that the messages are produced in a synchronous fashion by the consumer. This means that even if we produce messages in a series, only the last one is consumed.

To arrange a MessageConsumer class that works in an asynchronous way, we need to detail how the message is consumed in a better way.

If a MessageConsumer class needs to access a queue in an asynchronous way, the solution is to have a Java EventHandler class that somehow fires up when a new message is read.

So the MessageConsumer class has the setMessageHandler method. This is the method that accepts as a parameter a class that implements the MessageHandler interface. The MessageHandler interface has only one method that needs to be implemented and that will be fired up once a new message is consumed asynchronously.

To set the asynchronicity for a generic consumer, you can refer to the following code:

ClientConsumer consumer = session.createConsumer("jms.queue.myqueue"); consumer.setMessageHandler(new myMessageHandler());

As you can see, we created a new class that implements the MessageHandler interface. One possible implementation is:

import java.util.Date; import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.MessageHandler; public class myMessageHandler implements MessageHandler { @Override public void onMessage(ClientMessage message) { // TODO Auto-generated method stub System.out.println("received message " + message. getMessageID() + " at " + new Date(message.getTimestamp())); } }

Once a new message is consumed in the onMessage method, it is fired by passing the new ClientMessage object. In our implementation, we have to only write to the console, the messageID, and the timestamp of the message received by the HornetQ server.

Only to display some information, we convert to a Date format, but you can do anything you need for implementing the logic layer of your application.

Have you done your math?

Lastly, we would like to suggest some possible improvements on the code, hoping that fighting against some first-use difficulties could help with a deeper understanding of the layer interaction in HornetQ.

First, an obvious implementation of the code done previously is to add the possibility in the MyCoreMessageConsumer class to consume messages asynchronously by adding a new method and a new MessageHandler implementation. Using the class we coded, it would be nice to create a client producer that sends messages and a client consumer that runs on a separate JVM that reads messages asynchronously.

Another good exercise is to add to the consumer the MongoDB interaction. This can be done by implementing the getMessages method of the MyCoreMessageConsumer class, both synchronously and asynchronously.

The code we had can be improved in several ways. We invite the reader to change it in the spirit of the open source community.

JMS messaging is implemented by HornetQ but core API is much more efficient because it works with low level machine capacity but has some limitations; for example, core messages are just able to send byte messages with aggregated properties. So you need to think which will be the best solution for your project.

Summary

In this article we learned how to install a development environment, create a set of reusable classes for managing messages using the core API, and saw some specific properties of a core API message.

Resources for Article :


Further resources on this subject:


About the Author :


Piero Giacomelli

Piero Giacomelli started playing with computers back in 1986 when he received his first PC (a commodore 64). Despite his love for computers, he graduated in Mathematics, entered the professional software industry in 1997, and started using Java.

He has been involved in a lot of software projects using Java, .NET, and PHP. He is not only a great fan of JBoss and Apache technologies, but also uses Microsoft technologies without moral issues.

He has worked in many different industrial sectors, such as aerospace, ISP, textile and plastic manufacturing, and e-health association, both as a software developer and as an IT manager. He has also been involved in many EU research-funded projects in FP7 EU programs, such as CHRONIOUS, I-DONT-FALL, FEARLESS, and CHROMED.

In recent years, he has published some papers on scientific journals and has been awarded two best paper awards by the International Academy, Research and Industry Association (IARIA).

In 2012, he published HornetQ Messaging Developer's Guide, Packt Publishing, which is a standard reference book for the Apache HornetQ Framework.

He is married with two kids, and in his spare time, he regresses to his infancy ages to play with toys and his kids.

Books From Packt


jBPM 5 Developer Guide
jBPM 5 Developer Guide

JBoss AS 5 Development
JBoss AS 5 Development

Scalix:   Linux Administrator's Guide
Scalix: Linux Administrator's Guide

Web Services Testing with soapUI
Web Services Testing with soapUI

Joomla! 1.5 Top Extensions Cookbook
Joomla! 1.5 Top Extensions Cookbook

JBoss ESB Beginner’s Guide
JBoss ESB Beginner’s Guide

JBoss AS 7   Configuration, Deployment and Administration
JBoss AS 7 Configuration, Deployment and Administration

WebSphere Application Server 7.0 Administration Guide
WebSphere Application Server 7.0 Administration Guide


No votes yet

Post new comment

CAPTCHA
This question is for testing whether you are a human visitor and to prevent automated spam submissions.
g
Z
P
Z
P
W
Enter the code without spaces and pay attention to upper/lower case.
Code Download and Errata
Packt Anytime, Anywhere
Register Books
Print Upgrades
eBook Downloads
Video Support
Contact Us
Awards Voting Nominations Previous Winners
Judges Open Source CMS Hall Of Fame CMS Most Promising Open Source Project Open Source E-Commerce Applications Open Source JavaScript Library Open Source Graphics Software
Resources
Open Source CMS Hall Of Fame CMS Most Promising Open Source Project Open Source E-Commerce Applications Open Source JavaScript Library Open Source Graphics Software