Aggregators, File exchange Over FTP/FTPS, Social Integration, and Enterprise Messaging

Chandan Pandey

February 2015

In this article by Chandan Pandey, the author of Spring Integration Essentials, we will explore the out-of-the-box capabilities that the Spring Integration framework provides for a seamless flow of messages across heterogeneous components and see what Spring Integration has in the box when it comes to real-world integration challenges. We will cover Spring Integration's support for external components and we will cover the following topics in detail:

  • Aggregators
  • File exchange over FTP/FTPS
  • Social integration
  • Enterprise messaging

(For more resources related to this topic, see here.)


The aggregators are the opposite of splitters - they combine multiple messages and present them as a single message to the next endpoint. This is a very complex operation, so let's start by a real life scenario. A news channel might have many correspondents who can upload articles and related images. It might happen that the text of the articles arrives much sooner than the associated images - but the article must be sent for publishing only when all relevant images have also arrived. This scenario throws up a lot of challenges; partial articles should be stored somewhere, there should be a way to correlate incoming components with existing ones, and also there should be a way to identify the completion of a message. Aggregators are there to handle all of these aspects - some of the relevant concepts that are used are MessageStore, CorrelationStrategy, and ReleaseStrategy. Let's start with a code sample and then we will dive down to explore each of these concepts in detail:

  message-store="feedsMySqlStore "
  <int:poller fixed-rate="1000"></int:poller>

Hmm, a pretty big declaration! And why not—a lot of things combine together to act as an aggregator. Let's quickly glance at all the tags used:

  • int:aggregator: This is used to specify the Spring framework's namespace for the aggregator.
  • input-channel: This is the channel from which messages will be consumed.
  • output-channel: This is the channel to which messages will be dropped after aggregation.
  • ref: This is used to specify the bean having the method that is called on the release of messages.
  • method: This is used to specify the method that is invoked when messages are released.
  • release-strategy: This is used to specify the bean having the method that decides whether aggregation is complete or not.
  • release-strategy-method: This is the method having the logic to check for completeness of the message.
  • correlation-strategy: This is used to specify the bean having the method to correlate the messages.
  • correlation-strategy-method: This is the method having the actual logic to correlate the messages.
  • message-store: This is used to specify the message store, where messages are temporarily stored until they have been correlated and are ready to release. This can be in memory (which is default) or can be a persistence store. If a persistence store is configured, message delivery will be resumed across a server crash.

Java class can be defined as an aggregator and, as described in the previous bullet points, the method and ref parameters decide which method of bean (referred by ref) should be invoked when messages have been aggregated as per CorrelationStrategy and released after fulfilment of ReleaseStrategy. In the following example, we are just printing the messages before passing them on to the next consumer in the chain:

public class SoFeedAggregator {
  public List<SyndEntry> aggregateAndPublish(List<SyndEntry> 
    messages) {     //Do some pre-processing before passing on to next channel     return messages;   } }

Let's get to the details of the three most important components that complete the aggregator.

Correlation strategy

Aggregator needs to group the messages—but how will it decide the groups? In simple words, CorrelationStrategy decides how to correlate the messages. The default is based on a header named CORRELATION_ID. All messages having the same value for the CORRELATION_ID header will be put in one bracket. Alternatively, we can designate any Java class and its method to define a custom correlation strategy or can extend Spring Integration framework's CorrelationStrategy interface to define it. If the CorrelationStrategy interface is implemented, then the getCorrelationKey() method should be implemented. Let's see our correlation strategy in the feeds example:

public class CorrelationStrategy {
  public Object groupFeedsBasedOnCategory(Message<?> message) {
      SyndEntry entry = (SyndEntry)message.getPayload();
      List<SyndCategoryImpl> categories=entry.getCategories();
        for (SyndCategoryImpl category: categories) {
          //for simplicity, lets consider the first category
          return category.getName();
    return null;

So how are we correlating our messages? We are correlating the feeds based on the category name. The method must return an object that can be used for correlating the messages. If a user-defined object is returned, it must satisfy the requirements for a key in a map such as defining hashcode() and equals(). The return value must not be null.

Alternatively, if we would have wanted to implement it by extending framework support, then it would have looked like this:

public class CorrelationStrategy implements CorrelationStrategy {
  public Object getCorrelationKey(Message<?> message) {
            return category.getName();
      return null;

Release strategy

We have been grouping messages based on correlation strategy—but when will we release it for the next component? This is decided by the release strategy. Similar to the correlation strategy, any Java POJO can define the release strategy or we can extend framework support. Here is the example of using the Java POJO class:

public class CompletionStrategy {
  public boolean checkCompleteness(List<SyndEntry> messages) {
        return true;
    return false;

The argument of a message must be of type collection and it must return a Boolean indication whether to release the accumulated messages or not. For simplicity, we have just checked for the number of messages from the same category—if it's greater than two, we release the messages.

Message store

Until an aggregated message fulfils the release criteria, the aggregator needs to store them temporarily. This is where message stores come into the picture. Message stores can be of two types: in-memory and persistence store. Default is in memory, and if this is to be used, then there is no need to declare this attribute at all. If a persistent message store needs to be used, then it must be declared and its reference should be given to the message- store attribute. A mysql message store can be declared and referenced as follows:

<bean id=" feedsMySqlStore "
  <property name="dataSource" ref="feedsSqlDataSource"/>

Data source is Spring framework's standard JDBC data source. The greatest advantage of using persistence store is recoverability—if the system recovers from a crash, all in-memory aggregated messages will not be lost. Another advantage is capacity—memory is limited, which can accommodate a limited number of messages for aggregation, but the database can have a much bigger space.


FTP, or File Transfer Protocol, is used to transfer files across networks. FTP communications consist of two parts: server and client. The client establishes a session with the server, after which it can download or upload files. Spring Integration provides components that act as a client and connect to the FTP server to communicate with it. What about the server—which server will it connect to? If you have access to any public or hosted FTP server, use it. Else, the easiest way for trying out the example in this section is to set up a local instance of the FTP server. FTP setup is out of the scope of this article.


To use Spring Integration components for FTP/FTPS, we need to add a namespace to our configuration file and then add the Maven dependency entry in the pom.xml file. The following entries should be made:

  • Namespace support can be added by using the following code snippet:
  • Maven entry can be added by using the following code snippet:

Once namespace is available and the JAR has been downloaded, we are ready to use the components. As mentioned earlier, client components of Spring Integration need a session to establish with the FTP server. The details of the session is encapsulated in the session factory; let's look at a sample session factory configuration:

<bean id="ftpClientSessionFactory"
    ftp.session.DefaultFtpSessionFactory">   <property name="host" value="localhost"/>   <property name="port" value="21"/>   <property name="username" value="testuser"/>   <property name="password" value="testuser"/> </bean>

The DefaultFtpSessionFactory class is at work here, and it takes the following parameters:

  • Host that is running the FTP server
  • Port at which it's running the server
  • Username
  • Password for the server

A session pool for the factory is maintained and an instance is returned when required. Spring takes care of validating that a stale session is never returned.

Downloading files from the FTP server

Inbound adapters can be used to read the files from the server. The most important aspect is the session factory that we just discussed in the preceding section. The following code snippet configures an FTP inbound adapter that downloads a file from a remote directory and makes it available for processing:

  "C:\\Chandan\\Projects\\siexample\\ftp\\ftplocalfolder"   auto-create-local-directory="true"   delete-remote-files="true"   filename-pattern="*.txt"   local-filename-generator-expression=
  "#this.toLowerCase() + '.trns'">   <int:poller fixed-rate="1000"/> </int-ftp:inbound-channel-adapter>

Let's quickly go through the tags used in this code:

  • int-ftp:inbound-channel-adapter: This is the namespace support for the FTP inbound adapter.
  • channel: This is the channel on which the downloaded files will be put as a message.
  • session-factory: This is a factory instance that encapsulates details for connecting to a server.
  • remote-directory: This is the directory on the server where the adapter should listen for the new arrival of files.
  • local-directory: This is the local directory where the downloaded files should be dumped.
  • auto-create-local-directory: If enabled, this will create the local directory structure if it's missing.
  • delete-remote-files: If enabled, this will delete the files on the remote directory after it has been downloaded successfully. This will help in avoiding duplicate processing.
  • filename-pattern: This can be used as a filter, but only files matching the specified pattern will be downloaded.
  • local-filename-generator-expression: This can be used to generate a local filename.

An inbound adapter is a special listener that listens for events on the remote directory, for example, an event fired on the creation of a new file. At this point, it will initiate the file transfer. It creates a payload of type Message<File> and puts it on the output channel. By default, the filename is retained and a file with the same name as the remote file is created in the local directory. This can be overridden by using local- filename-generator-expression.

Incomplete files

On the remote server, there could be files that are still in the process of being written. Typically, there the extension is different, for example, filename.actualext.writing. The best way to avoid reading incomplete files is to use the filename pattern that will copy only those files that have been written completely.

Uploading files to the FTP server

Outbound adapters can be used to write files to the server. The following code snippet reads a message from a specified channel and writes it inside the FTP server's remote directory. The remote server session is determined as usual by the session factory. Make sure the username configured in the session object has the necessary permission to write to the remote directory. The following configuration sets up a FTP adapter that can upload files in the specified directory:

  <int-ftp:outbound-channel-adapter channel="ftpOutputChannel"

Here is a brief description of the tags used:

  • int-ftp:outbound-channel-adapter: This is the namespace support for the FTP outbound adapter.
  • channel: This is the name of the channel whose payload will be written to the remote server.
  • remote-directory: This is the remote directory where files will be put. The user configured in the session factory must have appropriate permission.
  • session-factory: This encapsulates details for connecting to the FTP server.
  • auto-create-directory: If enabled, this will automatically create a remote directory if it's missing, and the given user should have sufficient permission.

The payload on the channel need not necessarily be a file type; it can be one of the following:

  • A Java file object
  • byte[]: This is a byte array that represents the file contents
  • java.lang.String: This is the text that represents the file contents

Avoiding partially written files

Files on the remote server must be made available only when they have been written completely and not when they are still partial. Spring uses a mechanism of writing the files to a temporary location and its availability is published only when it has been completely written. By default, the suffix is written, but it can be changed using the temporary-file-suffix property. This can be completely disabled by setting use-temporary-file- name to false.

FTP outbound gateway

Gateway, by definition, is a two-way component: it accepts input and provides a result for further processing. So what is the input and output in the case of FTP? It issues commands to the FTP server and returns the result of the command. The following command will issue an ls command with the option –l to the server. The result is a list of string objects containing the filename of each file that will be put on the reply- channel. The code is as follows:

<int-ftp:outbound-gateway id="ftpGateway"

The tags are pretty simple:

  • int-ftp:outbound-gateway: This is the namespace support for the FTP outbound gateway
  • session-factory: This is the wrapper for details needed to connect to the FTP server
  • command: This is the command to be issued
  • command-options: This is the option for the command
  • reply-channel: This is the response of the command that is put on this channel

FTPS support

For FTPS support, all that is needed is to change the factory class—an instance of org.springframework.integration.ftp.session.DefaultFtpsSessionFactory should be used. Note the s in DefaultFtpsSessionFactory. Once the session is created with this factory, it's ready to communicate over a secure channel. Here is an example of a secure session factory configuration:

<bean id="ftpSClientFactory"
  DefaultFtpsSessionFactory">   <property name="host" value="localhost"/>   <property name="port" value="22"/>   <property name="username" value="testuser"/>   <property name="password" value="testuser"/> </bean>

Although it is obvious, I would remind you that the FTP server must be configured to support a secure connection and open the appropriate port.

Social integration

Any application in today's context is incomplete if it does not provide support for social messaging. Spring Integration provides in-built support for many social interfaces such as e-mails, Twitter feeds, and so on. Let's discuss the implementation of Twitter in this section. Prior to Version 2.1, Spring Integration was dependent on the Twitter4J API for Twitter support, but now it leverages Spring's social module for Twitter integration. Spring Integration provides an interface for receiving and sending tweets as well as searching and publishing the search results in messages. Twitter uses oauth for authentication purposes. An app must be registered before we start Twitter development on it.


Let's look at the steps that need to be completed before we can use a Twitter component in our Spring Integration example:

  • Twitter account setup: A Twitter account is needed. Perform the following steps to get the keys that will allow the user to use Twitter using the API:
    1. Visit
    2. Sign in to your account.
    3. Click on Create New App.
      Spring Integration Essentials
    4. Enter the details such as Application name, Description, Website, and so on. All fields are self-explanatory and appropriate help has also been provided. The value for the field Website need not be a valid one—put an arbitrary website name in the correct format.
      Spring Integration Essentials
    5. Click on the Create your application button. If the application is created successfully, a confirmation message will be shown and the Application Management page will appear, as shown here:
      Spring Integration Essentials
    6. Go to the Keys and Access Tokens tab and note the details for Consumer Key (API Key) and Consumer Secret (API Secret) under Application Settings, as shown in the following screenshot:
      Spring Integration Essentials
    7. You need additional access tokens so that applications can use Twitter using APIs. Click on Create my access token; it takes a while to generate these tokens. Once it is generated, note down the value of Access Token and Access Token Secret.
      Spring Integration Essentials
    8. Go to the Permissions tab and provide permission to Read, Write and Access direct messages.
      Spring Integration Essentials

    After performing all these steps, and with the required keys and access token, we are ready to use Twitter. Let's store these in the property file:

    twitter.oauth.apiKey= lnrDlMXSDnJumKLFRym02kHsy
    twitter.oauth.apiSecret= 6wlriIX9ay6w2f6at6XGQ7oNugk6dqNQEAArTsFsAU6RU8F2Td
    twitter.oauth.accessToken= 158239940-FGZHcbIDtdEqkIA77HPcv3uosfFRnUM30hRix9TI
    twitter.oauth.accessTokenSecret= H1oIeiQOlvCtJUiAZaachDEbLRq5m91IbP4bhg1QPRDeh

    The next step towards Twitter integration is the creation of a Twitter template. This is similar to the datasource or connection factory for databases, JMS, and so on. It encapsulates details to connect to a social platform. Here is the code snippet:

    <context:property-placeholder location="classpath: "/>
    <bean id="twitterTemplate" class="
      twitter.api.impl.TwitterTemplate ">   <constructor-arg value="${twitter.oauth.apiKey}"/>   <constructor-arg value="${twitter.oauth.apiSecret}"/>   <constructor-arg value="${twitter.oauth.accessToken}"/>   <constructor-arg value="${twitter.oauth.accessTokenSecret}"/> </bean>

As I mentioned, the template encapsulates all the values. Here is the order of the arguments:

  • apiKey
  • apiSecret
  • accessToken
  • accessTokenSecret

With all the setup in place, let's now do some real work:

Receiving tweets

Spring Integration has exposed inbound adapters for receiving tweets. Twitter tweets are of different types, and Spring provides support for receiving tweets as Timeline Updates, Direct Messages, Mention Messages, as well as Search Results.

Spring's inbound adapter is a polling-based mechanism that polls the Twitter site for updates at defined intervals. Be aware of Twitter's Rate Limiting; it limits the rate at which an application can poll for the updates. Factor this when setting the polling interval so that it's in compliance with the Twitter policies. Let's look at a working code snippet:

<int-twitter:inbound-channel-adapter id="testTweet"

The components in this code are covered in the following bullet points:

  • int-twitter:inbound-channel-adapter: This is the namespace support for Twitter's inbound channel adapter.
  • twitter-template: This is the most important aspect. The Twitter template encapsulates which account to use to poll the Twitter site. The details given in the preceding code snippet are fake; it should be replaced with real connection parameters.
  • channel: Messages are dumped on this channel.

These adapters are further used for other applications, such as for searching messages, retrieving direct messages, and retrieving tweets that mention your account, and so on. Let's have a quick look at the code snippets for these adapters. I will not go into detail for each one; they are almost similar to what have been discussed previously.

  • Search: This adapter helps to search the tweets for the parameter configured in the query tag. The code is as follows:
    <int-twitter:search-inbound-channel-adapter id="testSearch"
  • Retrieving Direct Messages: This adapter allows us to receive the direct message for the account in use (account configured in Twitter template). The code is as follows:
      id="testdirectMessage"   twitter-template="twiterTemplate"   channel="twitterDirectMessageChannel"> </int-twitter:dm-inbound-channel-adapter>
  • Retrieving Mention Messages: This adapter allows us to receive messages that mention the configured account via the @user tag (account configured in the Twitter template). The code is as follows:
      id="testmentionMessage"   twitter-template="twiterTemplate"   channel="twitterMentionMessageChannel"> </int-twitter:mentions-inbound-channel-adapter>

Sending tweets

Twitter exposes outbound adapters to send messages. Here is a sample code:


Whatever message is put on the twitterSendMessageChannel channel is tweeted by this adapter. Similar to an inbound gateway, the outbound gateway provides support for sending direct messages. Here is a simple example of an outbound adapter:


Any message that is put on the twitterSendDirectMessage channel is sent to the user directly. But where is the name of the user to whom the message will be sent? It is decided by a header in the message TwitterHeaders.DM_TARGET_USER_ID. This must be populated either programmatically, or by using enrichers or SpEL. For example, it can be programmatically added as follows:

Message message = MessageBuilder.withPayload("Chandan")

Alternatively, it can be populated by using a header enricher, as follows:

<int:header-enricher input-channel="twitterIn"
  <int:header name="twitter_dmTargetUserId" value=" test_id "/>

Twitter search outbound gateway

As gateways provide a two-way window, the search outbound gateway can be used to issue dynamic search commands and receive the results as a collection. If no result is found, the collection is empty. Let's configure a search outbound gateway, as follows:

  <int-twitter:search-outbound-gateway id="twitterSearch"

And here is what the tags covered in this code mean:

  • int-twitter:search-outbound-gateway: This is the namespace for the Twitter search outbound gateway
  • request-channel: This is the channel that is used to send search requests to this gateway
  • twitter-template: This is the Twitter template reference
  • search-args-expression: This is used as arguments for the search
  • reply-channel: This is the channel on which searched results are populated

This gives us enough to get started with the social integration aspects of the spring framework.

Enterprise messaging

Enterprise landscape is incomplete without JMS—it is one of the most commonly used mediums of enterprise integration. Spring provides very good support for this. Spring Integration builds over that support and provides adapter and gateways to receive and consume messages from many middleware brokers such as ActiveMQ, RabbitMQ, Rediss, and so on.

Spring Integration provides inbound and outbound adapters to send and receive messages along with gateways that can be used in a request/reply scenario. Let's walk through these implementations in a little more detail. A basic understanding of the JMS mechanism and its concepts is expected. It is not possible to cover even the introduction of JMS here. Let's start with the prerequisites.


To use Spring Integration messaging components, namespaces, and relevant Maven the following dependency should be added:

  • Namespace support can be added by using the following code snippet:
    xmlns: int-jms=
  • Maven entry can be provided using the following code snippet:

After adding these two dependencies, we are ready to use the components. But before we can use an adapter, we must configure an underlying message broker. Let's configure ActiveMQ. Add the following in pom.xml:


After this, we are ready to create a connection factory and JMS queue that will be used by the adapters to communicate. First, create a session factory. As you will notice, this is wrapped in Spring's CachingConnectionFactory, but the underlying provider is ActiveMQ:

<bean id="connectionFactory" class="org.springframework.
  jms.connection.CachingConnectionFactory">   <property name="targetConnectionFactory">     <bean class="org.apache.activemq.ActiveMQConnectionFactory">       <property name="brokerURL" value="vm://localhost"/>     </bean>   </property> </bean>

Let's create a queue that can be used to retrieve and put messages:

  <constructor-arg value="queue.input"/>

Now, we are ready to send and retrieve messages from the queue. Let's look into each message one by one.

Receiving messages – the inbound adapter

Spring Integration provides two ways of receiving messages: polling and event listener. Both of them are based on the underlying Spring framework's comprehensive support for JMS. JmsTemplate is used by the polling adapter, while MessageListener is used by the event-driven adapter. As the name suggests, a polling adapter keeps polling the queue for the arrival of new messages and puts the message on the configured channel if it finds one. On the other hand, in the case of the event-driven adapter, it's the responsibility of the server to notify the configured adapter.

The polling adapter

Let's start with a code sample:

  <int:poller fixed-rate="1000" />

This code snippet contains the following components:

  • int-jms:inbound-channel-adapter: This is the namespace support for the JMS inbound adapter
  • connection-factory: This is the encapsulation for the underlying JMS provider setup, such as ActiveMQ
  • destination: This is the JMS queue where the adapter is listening for incoming messages
  • channel: This is the channel on which incoming messages should be put

There is a poller element, so it's obvious that it is a polling-based adapter. It can be configured in one of two ways: by providing a JMS template or using a connection factory along with a destination. I have used the latter approach. The preceding adapter has a polling queue mentioned in the destination and once it gets any message, it puts the message on the channel configured in the channel attribute.

The event-driven adapter

Similar to polling adapters, event-driven adapters also need a reference either to an implementation of the interface AbstractMessageListenerContainer or need a connection factory and destination. Again, I will use the latter approach. Here is a sample configuration:


There is no poller sub-element here. As soon as a message arrives at its destination, the adapter is invoked, which puts it on the configured channel.

Sending messages – the outbound adapter

Outbound adapters convert messages on the channel to JMS messages and put them on the configured queue. To convert Spring Integration messages to JMS messages, the outbound adapter uses JmsSendingMessageHandler. This is is an implementation of MessageHandler. Outbound adapters should be configured with either JmsTemplate or with a connection factory and destination queue. Keeping in sync with the preceding examples, we will take the latter approach, as follows:


This adapter receives the Spring Integration message from jmsChannel, converts it to a JMS message, and puts it on the destination.


Gateway provides a request/reply behavior instead of a one-way send or receive. For example, after sending a message, we might expect a reply or we may want to send an acknowledgement after receiving a message.

The inbound gateway

Inbound gateways provide an alternative to inbound adapters when request-reply capabilities are expected. An inbound gateway is an event-based implementation that listens for a message on the queue, converts it to Spring Message, and puts it on the channel. Here is a sample code:


However, this is what an inbound adapter does—even the configuration is similar, except the namespace. So, what is the difference? The difference lies in replying back to the reply destination. Once the message is put on the channel, it will be propagated down the line and at some stage a reply would be generated and sent back as an acknowledgement. The inbound gateway, on receiving this reply, will create a JMS message and put it back on the reply destination queue. Then, where is the reply destination? The reply destination is decided in one of the following ways:

    1. Original message has a property JMSReplyTo, if it's present it has the highest precedence.
    2. The inbound gateway looks for a configured, default-reply-destination which can be configured either as a name or as a direct reference of a channel. For specifying channel as direct reference default-reply-destination tag should be used.

An exception will be thrown by the gateway if it does not find either of the preceding two ways.

The outbound gateway

Outbound gateways should be used in scenarios where a reply is expected for the send messages. Let's start with an example:

  reply-channel="jmsProcessedChannel" />

The preceding configuration will send messages to request-destination. When an acknowledgement is received, it can be fetched from the configured reply-destination. If reply-destination has not been configured, JMS TemporaryQueues will be created.


In this article, we covered out-of-the-box component provided by the Spring Integration framework such as aggregator. This article also showcased the simplicity and abstraction that Spring Integration provides when it comes to handling complicated integrations, be it file-based, HTTP, JMS, or any other integration mechanism.

Resources for Article:

Further resources on this subject:

You've been reading an excerpt of:

Spring Integration Essentials

Explore Title