Camel Kafka with Spring Boot example

This article shows how you can create a Spring Boot application which uses Camel Kafka Component to send and consume messages from a Kafka Topic

The Kafka Component allows to connect to a Kafka cluster and produce and send messages from within a Camel Route. In order to use it from a Spring Boot application.

In order to use the Kafka component, you have to include the camel-kafka-starter in your project:

<dependency>
    <groupId>org.apache.camel.springboot</groupId>
    <artifactId>camel-kafka-starter</artifactId>
</dependency>

Besides, you can configure the list of Brokers in your configuration file (f.e. application.properties) through the following properties:

camel.component.kafka.brokers =localhost:9092

In our basic example, we have used a single Kafka node. In real world project you will use a comma-separated list of nodes and port (f.e. host1:port1,host2:port2). The list can be a subset of brokers or a VIP pointing to a subset of brokers.

Starting Kafka

Before coding our sample Spring Boot application, we need to start a Kafka Cluster. Please refer to this article to learn more about it: Getting started with Apache Kafka

Firstly, start Zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties 

Next start the Kafka node:

bin/kafka-server-start.sh config/server.properties 

Make sure that the Kafka broker is running:

Coding the Camel Route

As said, our Spring Boot project will need the Kafka Starter. Within our pom.xml, we recommend including both the Spring Boot BOM and the Camel Spring Boot BOM to simplify dependencies management:

<dependencyManagement>
	<dependencies>
		<!-- Spring Boot BOM -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-dependencies</artifactId>
			<version>${spring.boot-version}</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
		<!-- Camel BOM -->
		<dependency>
			<groupId>org.apache.camel.springboot</groupId>
			<artifactId>camel-spring-boot-dependencies</artifactId>
			<version>${camel-version}</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
	</dependencies>
</dependencyManagement>
<dependencies>
	<!-- Camel -->
	<dependency>
		<groupId>org.apache.camel.springboot</groupId>
		<artifactId>camel-spring-boot-starter</artifactId>
	</dependency>
	<dependency>
		<groupId>org.apache.camel.springboot</groupId>
		<artifactId>camel-stream-starter</artifactId>
	</dependency>
	<dependency>
		<groupId>org.apache.camel.springboot</groupId>
		<artifactId>camel-kafka-starter</artifactId>
	</dependency>
	<!-- Test -->
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.camel</groupId>
		<artifactId>camel-test-spring-junit5</artifactId>
		<scope>test</scope>
	</dependency>
</dependencies>

To test the kafka component, we will add a simple Route which has both Producer and a Consumer from a Kafka Topic. Also, please note that by Kafka will create by default the Topic if that is not available.

package com.sample;

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
 
@Component
public class MySpringBootRouter extends RouteBuilder {

     @Override
    public void configure() throws Exception {

       // Kafka Producer
        from("timer://foo?period=1000")
        .to("kafka:myTopic?brokers=localhost:9092");
        
       // Kafka Consumer
        from("kafka:myTopic?brokers=localhost:9092")
        .log("Message received from Kafka : ${body}")
        .log("    on the topic ${headers[kafka.TOPIC]}")
        .log("    on the partition ${headers[kafka.PARTITION]}")
        .log("    with the offset ${headers[kafka.OFFSET]}")
        .log("    with the key ${headers[kafka.KEY]}");

    }
}

Please note, this Route is tagged as @Component. Therefore, Camel will activate it when the Spring Boot application is ready.

To complete our example, we just need to add a SpringBoot Application Class:

package com.sample;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MySpringBootApplication {

    /**
     * A main method to start this application.
     */
    public static void main(String[] args) {
        SpringApplication.run(MySpringBootApplication.class, args);
    }

}

Next, run the application:

mvn install spring-boot:run

You will see from the Console that a Message was received from Kafka:

If you switch back to the Kafka file system, you can check the list of Topics using the following shell command:

bin/kafka-topics.sh --list --bootstrap-server=localhost:9092 myTopic 
__consumer_offsets
myTopic

As you can see from the output, the Topic “myTopic” is listed among the Kafka Topics. You can also check the Partition Offset for that topic using the following shell command:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic myTopic
myTopic:0:2

Conclusion

This article was a walk through a simple Camel Spring Boot application which uses the Kafka Component to produce and consume messages from/to a Topic

Source code: https://github.com/fmarchioni/masterspringboot/tree/master/camel/camel-spring-kafka

Leave a Reply