Author Archives: admin

How to use Camel Kafka Component by example

In this article we will learn how to connect to Apache Kafka cluster from a Camel route using the Camel Kafka Consumer and Producer.

Start the Zookeeper Server

Pre-requisites: You need an available Apache Kafka server. To learn how to install it, you can check this article: Getting started with Apache Kafka

The first thing we need to do, is bootstraping the ZooKeeper server.

Zookeeper is a service which maintains naming and configuration data to provide flexible and robust synchronization within distributed systems. Zookeeper keeps track of status of the Kafka cluster nodes and it also keeps track of Kafka topics, partitions etc.

Kafka provides a simple Zookeeper configuration file to launch a single Zookeeper instance. To start the Zookeeper instance, use the following command:

$ bin/zookeeper-server-start.sh config/zookeeper.properties 

Check from the logs that Zookeeper is up and running:

[2022-05-10 19:33:16,616] INFO Configuring NIO connection handler with 10s sessionless connection timeout, 2 selector thread(s), 24 worker threads, and 64 kB direct buffers. (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2022-05-10 19:33:16,620] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2022-05-10 19:33:16,637] INFO Using org.apache.zookeeper.server.watch.WatchManager as watch manager (org.apache.zookeeper.server.watch.WatchManagerFactory)
[2022-05-10 19:33:16,637] INFO Using org.apache.zookeeper

Start Apache Kafka

Next, start the Kafka broker with the following command:

$ bin/kafka-server-start.sh config/server.properties 

Then, we will create a Topic that will be using in our application:

$ bin/kafka-topics.sh --bootstrap-server=localhost:9092 --create --topic myTopic --partitions 1 --replication-factor 1 --if-not-exists

Create the Camel Project

You can create a Camel 3 project using its archetype:

$ mvn archetype:generate \ 
-DarchetypeGroupId=org.apache.camel.archetypes \ 
-DarchetypeArtifactId=camel-archetype-java \ 
-DarchetypeVersion=3.10.0 \ 
-DgroupId=com.sample.camel \ 
-DartifactId=camel-kafka \ 
-Dversion=1.0-SNAPSHOT

The archetype will create for you a simple Camel standalone project. Firstly, replace the RouteBuilder class with the following one, which sends the messages from the data folder to the Kafka topic named ‘myTopic‘.

Here is the Main class:

package com.sample.camel;

import org.apache.camel.main.Main;

/**
 * A Camel Application
 */
public class MainApp {

    /**
     * A main() so we can easily run these routing rules in our IDE
     */
    public static void main(String... args) throws Exception {
        Main main = new Main();
        main.configure().addRoutesBuilder(new MyRouteBuilder());
        main.run(args);
    }

}

Next, let’s code the RouteBuilder Class which will consume messages from Kafka:

package com.sample.camel;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;

/**
 * A Camel Java DSL Router
 */
public class MyRouteBuilder extends RouteBuilder {

    /**
     * Let's configure the Camel routing rules using Java code 
     */
    @Override
    public void configure() throws Exception {

        // Kafka Producer using Message key
        from("file:src/data?noop=true")
                 .setHeader(KafkaConstants.KEY, constant("Camel")) 
                .to("kafka:myTopic?brokers=localhost:9092");

        // Kafka Consumer
        from("kafka:myTopic?brokers=localhost:9092")
                .log("Message received from Kafka : ${body}")
                .log("    on the topic ${headers[kafka.TOPIC]}")
                .log("    on the partition ${headers[kafka.PARTITION]}")
                .log("    with the offset ${headers[kafka.OFFSET]}")
                .log("    with the key ${headers[kafka.KEY]}");
    }

}

That’s all. run the project. Run the main class:

$ mvn clean install exec:java

Next, verify that the messages are logged on the Console:

Besides, you can check the Partition Offset for the topic from the Apache Kafka Console:

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic myTopic

myTopic:0:2

Conclusion

In this article we walked through a simple example of Camel standalone application which connects to an Apache Kafka cluster and starts producing and consuming messages to/from a topic

Source code for this tutorial: https://github.com/fmarchioni/masterspringboot/tree/master/camel/camel-kafka