In this tutorial we will learn how to set up a Maven project that runs a Kafka Java Producer and Consumer.
The project is composed of the following classes:
- SampleKafkaProducer: a standalone Java class which sends messages to a Kafka topic.
- SampleKafkaConsumer: a standalone Java class which consumes messages from a Kafka topic.
Before running the examples, you need a single-node Kafka cluster up and running, as discussed in this tutorial: Getting started with Apache Kafka
Let’s start from the SampleKafkaProducer class:
package com.masteringintegration.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

// Example class sending text messages to a Kafka cluster
public class SampleKafkaProducer extends Thread {

    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final String CLIENT_ID = "SampleProducer";
    public static final String TOPIC = "testTopic";
    public static final int MESSAGES = 100;

    public SampleKafkaProducer(String topic, Boolean isAsync) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
        properties.put("client.id", CLIENT_ID);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(properties);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    public static void main(String[] args) {
        boolean isAsync = false;
        SampleKafkaProducer producer = new SampleKafkaProducer(TOPIC, isAsync);
        // start the producer thread
        producer.start();
    }

    public void run() {
        int messageNo = 1;
        while (messageNo <= MESSAGES) {   // send exactly MESSAGES messages
            String messageStr = "This is Message number:" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) {
                // Send asynchronously: the callback is invoked when the broker acknowledges
                producer.send(new ProducerRecord<>(topic, messageNo, messageStr),
                        new MessageCallBack(startTime, messageNo, messageStr));
            } else {
                // Send synchronously: get() blocks until the broker acknowledges
                try {
                    producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace(); // handle the exception
                }
            }
            ++messageNo;
        }
    }
}

class MessageCallBack implements Callback {

    private final long startTime;
    private final int key;
    private final String message;

    public MessageCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println("message(" + key + ", " + message + ") sent to partition(" +
                    metadata.partition() + "), offset(" + metadata.offset() + ") in " +
                    elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}
This class extends Thread so that the producer runs in its own thread. Messages are sent to a topic named "testTopic": when isAsync is true they are sent asynchronously with a Callback that logs the acknowledged partition and offset; otherwise (the default in main) each send blocks on get() until the broker acknowledges it.
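The blocking-versus-callback trade-off can be sketched with plain java.util.concurrent types. This is a stand-in for the pattern, not the Kafka API: the send method and its "simulated offset" result are hypothetical, simulating a broker acknowledgment that arrives on a background thread.

```java
import java.util.concurrent.*;

public class SendModes {

    // Hypothetical "send": completes on a background thread, like a broker ack
    static CompletableFuture<Long> send(String msg, Executor pool) {
        return CompletableFuture.supplyAsync(() -> (long) msg.length(), pool);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Synchronous style: get() blocks the caller until the "ack" arrives
        long offset = send("This is Message number:1", pool).get();
        System.out.println("sync ack, simulated offset " + offset);

        // Asynchronous style: a callback runs on ack; the caller does not block
        send("This is Message number:2", pool)
                .thenAccept(o -> System.out.println("async ack, simulated offset " + o));

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

The synchronous style gives per-message error handling at the cost of throughput; the asynchronous style keeps the sender loop running while acknowledgments arrive later, which is why the Kafka sample passes a Callback in the isAsync branch.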
Let’s check now the Kafka Consumer class:
package com.masteringintegration.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class SampleKafkaConsumer {

    public static void main(String[] args) {
        String server = "127.0.0.1:9092";
        String groupId = "SampleKafkaConsumer";
        String topic = "testTopic";
        new SampleKafkaConsumer(server, groupId, topic).run();
    }

    // Variables
    private final Logger mLogger = LoggerFactory.getLogger(SampleKafkaConsumer.class.getName());
    private final String mBootstrapServer;
    private final String mGroupId;
    private final String mTopic;

    // Constructor
    SampleKafkaConsumer(String bootstrapServer, String groupId, String topic) {
        mBootstrapServer = bootstrapServer;
        mGroupId = groupId;
        mTopic = topic;
    }

    // Public
    void run() {
        mLogger.info("Creating consumer thread");
        CountDownLatch latch = new CountDownLatch(1);
        ConsumerRunnable consumerRunnable = new ConsumerRunnable(mBootstrapServer, mGroupId, mTopic, latch);
        Thread thread = new Thread(consumerRunnable);
        thread.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            mLogger.info("Caught shutdown hook");
            consumerRunnable.shutdown();
            await(latch);
            mLogger.info("Application has exited");
        }));
        await(latch);
    }

    // Private
    void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            mLogger.error("Application got interrupted", e);
        } finally {
            mLogger.info("Application is closing");
        }
    }

    // Inner classes
    private class ConsumerRunnable implements Runnable {

        private final CountDownLatch mLatch;
        private final KafkaConsumer<String, String> mConsumer;

        ConsumerRunnable(String bootstrapServer, String groupId, String topic, CountDownLatch latch) {
            mLatch = latch;
            Properties props = consumerProps(bootstrapServer, groupId);
            mConsumer = new KafkaConsumer<>(props);
            mConsumer.subscribe(Collections.singletonList(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = mConsumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        mLogger.info("Key: " + record.key() + ", Value: " + record.value());
                        mLogger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
                    }
                }
            } catch (WakeupException e) {
                mLogger.info("Received shutdown signal!");
            } finally {
                mConsumer.close();
                mLatch.countDown();
            }
        }

        void shutdown() {
            mConsumer.wakeup();
        }

        private Properties consumerProps(String bootstrapServer, String groupId) {
            String deserializer = StringDeserializer.class.getName();
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return properties;
        }
    }
}
The consumer logic lives in an inner ConsumerRunnable class which implements Runnable, so messages are consumed in a separate thread. A CountDownLatch combined with a JVM shutdown hook allows the application to stop the poll loop cleanly: the hook calls KafkaConsumer.wakeup(), which makes poll() throw a WakeupException so the consumer can close and release its resources.
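The latch-plus-shutdown pattern can be sketched without Kafka at all, using only JDK classes. The worker loop below is a hypothetical stand-in for the mConsumer.poll(...) loop, and the flag flip stands in for the wakeup() call made from the shutdown hook:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class LatchShutdownSketch {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicBoolean running = new AtomicBoolean(true);

        Thread worker = new Thread(() -> {
            try {
                while (running.get()) {
                    Thread.sleep(50);    // stand-in for mConsumer.poll(...)
                }
            } catch (InterruptedException ignored) {
                // a wakeup/interrupt also ends the loop
            } finally {
                latch.countDown();       // signal: resources have been released
            }
        });
        worker.start();

        running.set(false);              // stand-in for consumerRunnable.shutdown()
        latch.await();                   // block until the worker has cleaned up
        System.out.println("worker finished cleanly");
    }
}
```

The latch guarantees that whoever requested the shutdown does not proceed until the worker's finally block (in the Kafka version, mConsumer.close()) has actually run.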
To build and run the project we have included the following pom.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.masteringintegration.kafka</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>kafka-demo</name>
    <url>http://www.example.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.6.0</version>
                <executions>
                    <execution>
                        <id>consumer</id>
                        <configuration>
                            <mainClass>com.masteringintegration.kafka.SampleKafkaConsumer</mainClass>
                        </configuration>
                    </execution>
                    <execution>
                        <id>producer</id>
                        <configuration>
                            <mainClass>com.masteringintegration.kafka.SampleKafkaProducer</mainClass>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
As you can see, we have included two exec-maven-plugin executions (identified by the ids "producer" and "consumer") so that you can run both standalone Java applications. You can run the Producer as follows:
mvn exec:java@producer
The messages will be sent to the destination topic:
Sent message: (1, This is Message number:1)
Sent message: (2, This is Message number:2)
Sent message: (3, This is Message number:3)
Sent message: (4, This is Message number:4)
Sent message: (5, This is Message number:5)
Sent message: (6, This is Message number:6)
Sent message: (7, This is Message number:7)
Sent message: (8, This is Message number:8)
Sent message: (9, This is Message number:9)
Sent message: (10, This is Message number:10)
. . . . .
Now start the Consumer application:
mvn exec:java@consumer
You will see that messages are consumed from the Topic:
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:1
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24276
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:2
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24277
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:3
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24278
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:4
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24279
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:5
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24280
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:6
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24281
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:7
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24282
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:8
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24283
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:9
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24284
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:10
. . . . .
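Notice that the Key column appears empty. The producer serializes keys with IntegerSerializer (which, as far as I know, writes the int as 4 raw big-endian bytes), while the consumer deserializes both key and value with StringDeserializer, so the key bytes come back as unprintable characters rather than the number that was sent. A small JDK-only sketch of why this happens:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class KeyBytesDemo {

    public static void main(String[] args) {
        // IntegerSerializer encodes an int key (here: 1) as 4 big-endian bytes
        byte[] keyBytes = ByteBuffer.allocate(4).putInt(1).array();

        // StringDeserializer would decode those same bytes as UTF-8 text
        String decoded = new String(keyBytes, StandardCharsets.UTF_8);

        // Count characters at or above the printable ASCII range
        long printable = decoded.chars().filter(c -> c >= 32).count();
        System.out.println("bytes: " + keyBytes.length + ", printable chars: " + printable);
    }
}
```

Matching the deserializer to the producer's serializer (IntegerDeserializer for the key in this example) fixes the blank keys; the follow-up tutorial on serializers and deserializers covers this in depth.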
Congratulations! You have just connected to a Kafka cluster using a Java Producer and a Consumer. Continue your Kafka learning path with the following tutorial: Kafka Tutorial: Creating a Java Producer and Consumer using Serializers and Deserializers
Source code for this tutorial: https://github.com/fmarchioni/masterspringboot/tree/master/camel/kafka/java