Kafka Tutorial: Creating a Java Producer and Consumer



In this tutorial we will learn how to set up a Maven project to run a Kafka Java Consumer and Producer.

This project is composed of the following Classes:

  • SampleKafkaProducer: A standalone Java class which sends messages to a Kafka topic.
  • SampleKafkaConsumer: A standalone Java class which consumes Kafka messages from to a Topic

It is required that you start a single Node Kafka cluster as discussed in this tutorial: Getting started with Apache Kafka

Let's start from the SampleKafkaProducer class:

package com.masteringintegration.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

// Example class sending text Messages to Kafka cluster

public class SampleKafkaProducer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final String CLIENT_ID = "SampleProducer";
    public static final String TOPIC = "testTopic";
    public static final int MESSAGES = 100;

    public SampleKafkaProducer(String topic, Boolean isAsync) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
        properties.put("client.id", CLIENT_ID);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer(properties);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    public static void main( String[] args )
    {

        boolean isAsync = false;
        SampleKafkaProducer producer = new SampleKafkaProducer(TOPIC, isAsync);
        // start the producer
        producer.start();
    }

    public void run() {
        int messageNo = 1;
        while (messageNo < MESSAGES) {
            String messageStr = "This is Message number:" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new MessageCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                    // handle the exception
                }
            }
            ++messageNo;
        }
    }
}

class MessageCallBack implements Callback {

    private final long startTime;
    private final int key;
    private final String message;

    public MessageCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }


    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                    "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                            "), " +
                            "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}

This class extends Thread so that each message is sent asynchronously to a Topic named "testTopic".

Let's check now the Kafka Consumer class:

package com.masteringintegration.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class SampleKafkaConsumer {

    public static void main(String[] args) {
        String server = "127.0.0.1:9092";
        String groupId = "SampleKafkaConsumer";
        String topic = "testTopic";

        new SampleKafkaConsumer(server, groupId, topic).run();
    }

    // Variables

    private final Logger mLogger = LoggerFactory.getLogger(SampleKafkaConsumer.class.getName());
    private final String mBootstrapServer;
    private final String mGroupId;
    private final String mTopic;

    // Constructor

    SampleKafkaConsumer(String bootstrapServer, String groupId, String topic) {
        mBootstrapServer = bootstrapServer;
        mGroupId = groupId;
        mTopic = topic;
    }

    // Public

    void run() {
        mLogger.info("Creating consumer thread");

        CountDownLatch latch = new CountDownLatch(1);

        ConsumerRunnable consumerRunnable = new ConsumerRunnable(mBootstrapServer, mGroupId, mTopic, latch);
        Thread thread = new Thread(consumerRunnable);
        thread.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            mLogger.info("Caught shutdown hook");
            consumerRunnable.shutdown();
            await(latch);

            mLogger.info("Application has exited");
        }));

        await(latch);
    }

    // Private

    void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            mLogger.error("Application got interrupted", e);
        } finally {
            mLogger.info("Application is closing");
        }
    }

    // Inner classes

    private class ConsumerRunnable implements Runnable {

        private CountDownLatch mLatch;
        private KafkaConsumer<String, String> mConsumer;

        ConsumerRunnable(String bootstrapServer, String groupId, String topic, CountDownLatch latch) {
            mLatch = latch;

            Properties props = consumerProps(bootstrapServer, groupId);
            mConsumer = new KafkaConsumer<>(props);
            mConsumer.subscribe(Collections.singletonList(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = mConsumer.poll(Duration.ofMillis(100));

                    for (ConsumerRecord<String, String> record : records) {
                        mLogger.info("Key: " + record.key() + ", Value: " + record.value());
                        mLogger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
                    }
                }
            } catch (WakeupException e) {
                mLogger.info("Received shutdown signal!");
            } finally {
                mConsumer.close();
                mLatch.countDown();
            }
        }

        void shutdown() {
            mConsumer.wakeup();
        }

        private Properties consumerProps(String bootstrapServer, String groupId) {
            String deserializer = StringDeserializer.class.getName();
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            return properties;
        }
    }
}

The consumer class implements Runnable so that messages can be consumed asynchronously as well.

In order to build the project we have included the following pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.masteringintegration.kafka</groupId>
  <artifactId>kafka-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>kafka-demo</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.5.0</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.25</version>
    </dependency>

  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.6.0</version>
        <executions>
          <execution>
            <id>consumer</id>
            <configuration>
              <mainClass>com.masteringintegration.kafka.SampleKafkaConsumer</mainClass>
            </configuration>
          </execution>
          <execution>
            <id>producer</id>
            <configuration>
              <mainClass>com.masteringintegration.kafka.SampleKafkaProducer</mainClass>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

As you can see, we have included two different executions so that you can run both java standalone applications. You can run the Producer as follows:

mvn exec:java@producer

The messages will be sent to the destination:

Sent message: (1, This is Message number:1)
Sent message: (2, This is Message number:2)
Sent message: (3, This is Message number:3)
Sent message: (4, This is Message number:4)
Sent message: (5, This is Message number:5)
Sent message: (6, This is Message number:6)
Sent message: (7, This is Message number:7)
Sent message: (8, This is Message number:8)
Sent message: (9, This is Message number:9)
Sent message: (10, This is Message number:10)
 . . . . .

Now start the Consumer application:

mvn exec:java@consumer

You will see that messages are consumed from the Topic:

[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:1
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24276
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:2
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24277
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:3
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24278
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:4
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24279
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:5
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24280
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:6
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24281
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:7
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24282
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key:, Value: This is Message number:8
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24283
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: 	, Value: This is Message number:9
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24284
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: 
, Value: This is Message number:10
 . . . . .

Congratulations! You have just managed to connect to Kafka cluster using a Java Producer and a Consumer. Continue your Kafka learning path with the following tutorial: Kafka Tutorial: Creating a Java Producer and Consumer using Serializers and Deserializers

Source code for this tutorial: https://github.com/fmarchioni/masteringintegration/tree/master/kafka/java

 

FREE WildFly Application Server - JBoss - Quarkus - Drools Tutorials
Cookie Policy

Cookie Policy This website uses cookies that are necessary to its functioning and required to achieve the purposes illustrated in the privacy policy. By accepting this OR scrolling this page OR continuing to browse, you agree to our privacy policy.

© 2020 Your Company. All Rights Reserved. Designed By JoomShaper

Please publish modules in offcanvas position.