Consuming and Producing Kafka Messages from Camel

In this article we will learn how to connect to Apache Kafka cluster from a Camel route using the Camel Kafka Consumer and Producer.

Perform the following preliminary actions:

Start the Zookeeper Server

Start the ZooKeeper server. Kafka provides a simple ZooKeeper configuration file to launch a single ZooKeeper instance. To install the ZooKeeper instance, use the following command:

$ bin/zookeeper-server-start.sh config/zookeeper.properties 

Check from the logs that Zookeeper started:

[2020-04-14 19:42:12,456] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2020-04-14 19:42:12,470] INFO zookeeper.snapshotSizeFactor = 0.33 (org.apache.zookeeper.server.ZKDatabase)
[2020-04-14 19:42:12,475] INFO Snapshotting: 0x0 to /tmp/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2020-04-14 19:42:12,480] INFO Snapshotting: 0x0 to /tmp/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2020-04-14 19:42:12,496] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)

Start Apache Kafka

Now, start the Kafka broker with the following command:

$ bin/kafka-server-start.sh config/server.properties 

Now that Apache Kafka is started, create a Topic that will be using in our application:

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myTopic

Create the Camel Project

You can create a Camel 3 project using its archetype:

$ mvn archetype:generate \ 
-DarchetypeGroupId=org.apache.camel.archetypes \ 
-DarchetypeArtifactId=camel-archetype-java \ 
-DarchetypeVersion=3.0.0 \ 
-DgroupId=com.sample.camel \ 
-DartifactId=camel-kafka \ 
-Dversion=1.0-SNAPSHOT

A basic project will be created. Now replace the RouteBuilder class with the following one, which sends the messages contained in the data folder to the Kafka topic named ‘myTopic’.

Here is the Main class:

package com.sample.camel;

import org.apache.camel.main.Main;

/**
 * A Camel Application
 */
public class MainApp {

    /**
     * A main() so we can easily run these routing rules in our IDE
     */
    public static void main(String... args) throws Exception {
        Main main = new Main();
        main.configure().addRoutesBuilder(new MyRouteBuilder());
        main.run(args);
    }

}

At the same time, we will be consuming messages with a simple Kafka Consumer:

package com.sample.camel;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;

/**
 * A Camel Java DSL Router
 */
public class MyRouteBuilder extends RouteBuilder {

    /**
     * Let's configure the Camel routing rules using Java code 
     */
    @Override
    public void configure() throws Exception {

        // Kafka Producer using Message key
        from("file:src/data?noop=true")
                 .setHeader(KafkaConstants.KEY, constant("Camel")) 
                .to("kafka:myTopic?brokers=localhost:9092");

        // Kafka Consumer
        from("kafka:myTopic?brokers=localhost:9092")
                .log("Message received from Kafka : ${body}")
                .log("    on the topic ${headers[kafka.TOPIC]}")
                .log("    on the partition ${headers[kafka.PARTITION]}")
                .log("    with the offset ${headers[kafka.OFFSET]}")
                .log("    with the key ${headers[kafka.KEY]}");
    }

}

That’s all. run the project. Run the main class:

$ mvn clean install exec:java

and verify that the messages are logged on the Console:

[ad #1 - KafkaConsumer[myTopic]] route2 INFO Message received from Kafka : <?xml version="1.0" encoding="UTF-8"?>
<person user="james">
<firstName>James</firstName>
<lastName>Strachan</lastName>
<city>London</city>
</person>
[ad #1 - KafkaConsumer[myTopic]] route2 INFO on the topic myTopic
[ad #1 - KafkaConsumer[myTopic]] route2 INFO on the partition 0
[ad #1 - KafkaConsumer[myTopic]] route2 INFO with the offset 2
[ad #1 - KafkaConsumer[myTopic]] route2 INFO with the key Camel
[ad #1 - KafkaConsumer[myTopic]] route2 INFO Message received from Kafka : <?xml version="1.0" encoding="UTF-8"?>
<person user="hiram">
<firstName>Hiram</firstName>
<lastName>Chirino</lastName>
<city>Tampa</city>
</person>
[ad #1 - KafkaConsumer[myTopic]] route2 INFO on the topic myTopic
[ad #1 - KafkaConsumer[myTopic]] route2 INFO on the partition 0
[ad #1 - KafkaConsumer[myTopic]] route2 INFO with the offset 3
[ad #1 - KafkaConsumer[myTopic]] route2 INFO with the key Camel

Source code for this tutorial: https://github.com/fmarchioni/masterspringboot/tree/master/camel/camel-kafka