How to Aggregate Messages in Camel

In Apache Camel, message aggregation involves combining multiple messages into a single message based on certain criteria. This tutorial will guide you through the process of aggregating messages using a custom aggregation strategy.

Prerequisites

Before starting, ensure you have the following:

  • Basic understanding of Apache Camel
  • Apache Camel project set up and running

Step 1: Create a Custom Aggregation Strategy

  1. Create a new Java class for your custom aggregation strategy. In this tutorial it is named MyOwnAggregationStrategy.
  2. Implement the org.apache.camel.AggregationStrategy interface. This interface requires you to implement the aggregate method, which defines the logic for combining messages.
  3. Customize the aggregate method to meet your requirements. You can access and modify the incoming and old exchanges to build the aggregated message. Here’s an example implementation:
package com.sample.aggregation;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;

public class MyOwnAggregationStrategy implements AggregationStrategy {

    private static final String SEPARATOR = System.lineSeparator();

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // First message of a group: there is nothing to merge with yet.
        if (oldExchange == null) {
            return newExchange;
        }
        // Append the new body to the accumulated body, one message per line.
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(oldBody + SEPARATOR + newBody);
        return oldExchange;
    }
}

Our custom aggregation strategy overrides the aggregate method, which receives two Exchange objects: oldExchange, which holds the result aggregated so far, and newExchange, which is the message that has just arrived.

The condition if (oldExchange == null) checks if this is the first message being processed. If so, it simply returns the newExchange as is, since there is no previous message to aggregate with.

If there is an existing oldExchange, it concatenates the old and new bodies using the line separator.

Finally, it sets the aggregated body back to the oldExchange using getIn().setBody(...). The original oldExchange is modified in place to store the aggregated result.
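To make the calling pattern concrete, here is a minimal sketch in plain Java, without Camel on the classpath. It applies the same rule as MyOwnAggregationStrategy to plain strings and feeds a list of bodies through it one at a time. The class AggregateFoldSketch and its methods are hypothetical names used only for illustration; this is a simplified model of the behaviour described above, not Camel's actual implementation.

```java
import java.util.List;

public class AggregateFoldSketch {

    // Same rule as MyOwnAggregationStrategy, applied to plain strings:
    // a null "old" value means this is the first message of the group.
    public static String aggregate(String oldBody, String newBody) {
        if (oldBody == null) {
            return newBody;
        }
        return oldBody + System.lineSeparator() + newBody;
    }

    // Feed the bodies through aggregate() one by one, carrying the result forward.
    public static String aggregateAll(List<String> bodies) {
        String current = null;
        for (String body : bodies) {
            current = aggregate(current, body);
        }
        return current;
    }

    public static void main(String[] args) {
        // Prints the three bodies, one per line.
        System.out.println(aggregateAll(List.of("Message A1", "Message A2", "Message A3")));
    }
}
```

The first call receives null as the old value, which mirrors the oldExchange == null check in the strategy. Every later call receives the result of the previous one.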

Step 2: Configure the Aggregation Route

  1. Open the class that defines your Camel routes.
  2. Add the necessary route configuration to aggregate messages using your custom aggregation strategy. Here’s an example route configuration:
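import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

import com.sample.aggregation.MyOwnAggregationStrategy;

CamelContext context = new DefaultCamelContext();

context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        // Group messages by the "myHeader" header and merge them with the custom strategy.
        from("direct:start")
                .aggregate(header("myHeader"), new MyOwnAggregationStrategy())
                .completionTimeout(3000)
                .completionSize(10)
                .to("direct:b");

        // Log each completed group, choosing the log line by header value.
        from("direct:b")
                .choice()
                    .when(header("myHeader").isEqualTo("groupA"))
                        .log("Aggregated messages for Group A: ${body}")
                    .otherwise()
                        .log("Aggregated messages for Group B: ${body}");
    }
});

context.start();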

As this example shows, messages are aggregated based on the value of the header “myHeader”. Each message passes through our custom AggregationStrategy.

We also set two completion conditions: a completionTimeout of 3000 ms and a completionSize of 10. A group is completed as soon as it holds 10 messages, or when no new message has arrived for it within 3000 ms, whichever happens first.

The aggregated message then goes to the second route (direct:b), where we take different actions depending on the value of the header. In our case, we simply log the aggregated message.
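The grouping by header value and the size-based completion can be pictured with a small plain-Java model. The sketch below keeps one pending list per correlation key and releases a group once it reaches the configured size. The class CorrelationSketch and its method names are hypothetical and exist only for illustration. It deliberately leaves out the timeout condition and is not how Camel implements its aggregator internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CorrelationSketch {

    private final int completionSize;
    // One pending group per correlation key (the header value in the route).
    private final Map<String, List<String>> pending = new HashMap<>();
    private final List<String> completed = new ArrayList<>();

    public CorrelationSketch(int completionSize) {
        this.completionSize = completionSize;
    }

    // Add one message to the group identified by key; release the group once it is full.
    public void accept(String key, String body) {
        List<String> group = pending.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(body);
        if (group.size() == completionSize) {
            completed.add(key + ": " + String.join(", ", group));
            pending.remove(key);
        }
    }

    public List<String> completed() {
        return completed;
    }

    public int pendingGroups() {
        return pending.size();
    }

    public static void main(String[] args) {
        // A completion size of 3 is used here only to keep the output short.
        CorrelationSketch sketch = new CorrelationSketch(3);
        for (int i = 1; i <= 3; i++) {
            sketch.accept("groupA", "Message A" + i);
            sketch.accept("groupB", "Message B" + i);
        }
        sketch.completed().forEach(System.out::println);
    }
}
```

Even though messages for the two keys arrive interleaved, each key accumulates its own group, which is why the route can hand one aggregated message per header value to direct:b.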

Step 3: Test the Aggregation

To test the aggregation, we can use a ProducerTemplate to send a batch of messages with different header values in a loop. For example:

ProducerTemplate producerTemplate = context.createProducerTemplate();

for (int i = 1; i <= 10; i++) {
    // Send messages for Group A
    producerTemplate.sendBodyAndHeader("direct:start", "Message A" + i, "myHeader", "groupA");

    // Send messages for Group B
    producerTemplate.sendBodyAndHeader("direct:start", "Message B" + i, "myHeader", "groupB");
}

The console output shows the aggregated messages for each group:

[Image: camel aggregation example]

Conclusion

Congratulations! You have successfully learned how to aggregate messages with Apache Camel using a custom aggregation strategy. By implementing and configuring your own strategy, you can tailor the aggregation logic to match your specific requirements.

Apache Camel provides a flexible and powerful framework for message aggregation, allowing you to combine, transform, and process messages effectively.
