Kafka Tutorial: Creating a Java Producer and Consumer

In this tutorial we will learn how to set up a Maven project to run a Kafka Java Consumer and Producer.

This project is composed of the following Classes:

  • SampleKafkaProducer: A standalone Java class which sends messages to a Kafka topic.
  • SampleKafkaConsumer: A standalone Java class which consumes Kafka messages from to a Topic

It is required that you start a single Node Kafka cluster as discussed in this tutorial: Getting started with Apache Kafka

Let’s start from the SampleKafkaProducer class:

package com.masteringintegration.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

// Example class sending text Messages to Kafka cluster

public class SampleKafkaProducer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final String CLIENT_ID = "SampleProducer";
    public static final String TOPIC = "testTopic";
    public static final int MESSAGES = 100;

    public SampleKafkaProducer(String topic, Boolean isAsync) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
        properties.put("client.id", CLIENT_ID);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer(properties);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    public static void main( String[] args )
    {

        boolean isAsync = false;
        SampleKafkaProducer producer = new SampleKafkaProducer(TOPIC, isAsync);
        // start the producer
        producer.start();
    }

    public void run() {
        int messageNo = 1;
        while (messageNo < MESSAGES) {
            String messageStr = "This is Message number:" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new MessageCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                    // handle the exception
                }
            }
            ++messageNo;
        }
    }
}

class MessageCallBack implements Callback {

    private final long startTime;
    private final int key;
    private final String message;

    public MessageCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }


    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                    "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                            "), " +
                            "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}

This class extends Thread so that each message is sent asynchronously to a Topic named “testTopic”.

Let’s check now the Kafka Consumer class:

package com.masteringintegration.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class SampleKafkaConsumer {
  public static void main(String[] args) {
    String server = "127.0.0.1:9092";
    String groupId = "SampleKafkaConsumer";
    String topic = "testTopic";
    new SampleKafkaConsumer(server, groupId, topic).run();
  }

  // Variables      

  private final Logger mLogger = LoggerFactory.getLogger(SampleKafkaConsumer.class.getName());
  private final String mBootstrapServer;
  private final String mGroupId;
  private final String mTopic;

  // Constructor      

  SampleKafkaConsumer(String bootstrapServer, String groupId, String topic) {
    mBootstrapServer = bootstrapServer;
    mGroupId = groupId;
    mTopic = topic;
  }

  // Public      

  void run() {
    mLogger.info("Creating consumer thread");
    CountDownLatch latch = new CountDownLatch(1);
    ConsumerRunnable consumerRunnable = new ConsumerRunnable(mBootstrapServer, mGroupId, mTopic, latch);
    Thread thread = new Thread(consumerRunnable);
    thread.start();
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      mLogger.info("Caught shutdown hook");consumerRunnable.shutdown();await (latch);mLogger.info("Application has exited");
    }));
    await (latch);
  }

  // Private      

  void await (CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      mLogger.error("Application got interrupted", e);
    } finally {
      mLogger.info("Application is closing");
    }
  }

  // Inner classes      

  private class ConsumerRunnable implements Runnable {
    private CountDownLatch mLatch;
    private KafkaConsumer < String, String > mConsumer;
    ConsumerRunnable(String bootstrapServer, String groupId, String topic, CountDownLatch latch) {
      mLatch = latch;
      Properties props = consumerProps(bootstrapServer, groupId);
      mConsumer = new KafkaConsumer < > (props);
      mConsumer.subscribe(Collections.singletonList(topic));
    }
    @Override public void run() {
      try {
        while (true) {
          ConsumerRecords < String, String > records = mConsumer.poll(Duration.ofMillis(100));
          for (ConsumerRecord < String, String > record: records) {
            mLogger.info("Key: " + record.key() + ", Value: " + record.value());
            mLogger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
          }
        }
      } catch (WakeupException e) {
        mLogger.info("Received shutdown signal!");
      } finally {
        mConsumer.close();
        mLatch.countDown();
      }
    }
    void shutdown() {
      mConsumer.wakeup();
    }
    private Properties consumerProps(String bootstrapServer, String groupId) {
      String deserializer = StringDeserializer.class.getName();
      Properties properties = new Properties();
      properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
      properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
      properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
      properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      return properties;
    }
  }
}

The consumer class implements Runnable so that messages can be consumed asynchronously as well.

In order to build the project we have included the following pom.xml file:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      
   <modelVersion>4.0.0</modelVersion>
       
   <groupId>com.masteringintegration.kafka</groupId>
      
   <artifactId>kafka-demo</artifactId>
      
   <version>1.0-SNAPSHOT</version>
       
   <name>kafka-demo</name>
      
   <!-- FIXME change it to the project's website -->
      
   <url>http://www.example.com</url>
       
   <properties>
           
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
           
      <maven.compiler.source>1.8</maven.compiler.source>
           
      <maven.compiler.target>1.8</maven.compiler.target>
         
   </properties>
       
   <dependencies>
           
      <dependency>
                
         <groupId>org.apache.kafka</groupId>
                
         <artifactId>kafka-clients</artifactId>
                
         <version>2.5.0</version>
              
      </dependency>
             
      <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
           
      <dependency>
                
         <groupId>org.slf4j</groupId>
                
         <artifactId>slf4j-simple</artifactId>
                
         <version>1.7.25</version>
              
      </dependency>
          
   </dependencies>
       
   <build>
           
      <plugins>
                
         <plugin>
                     
            <groupId>org.codehaus.mojo</groupId>
                     
            <artifactId>exec-maven-plugin</artifactId>
                     
            <version>1.6.0</version>
                     
            <executions>
                          
               <execution>
                               
                  <id>consumer</id>
                               
                  <configuration>
                                    
                     <mainClass>com.masteringintegration.kafka.SampleKafkaConsumer</mainClass>
                                  
                  </configuration>
                             
               </execution>
                          
               <execution>
                               
                  <id>producer</id>
                               
                  <configuration>
                                    
                     <mainClass>com.masteringintegration.kafka.SampleKafkaProducer</mainClass>
                                  
                  </configuration>
                             
               </execution>
                        
            </executions>
                   
         </plugin>
              
      </plugins>
         
   </build>
    
</project>

As you can see, we have included two different executions so that you can run both java standalone applications. You can run the Producer as follows:

mvn exec:java@producer

The messages will be sent to the destination:

Sent message: (1, This is Message number:1) Sent message: (2, This is Message number:2) Sent message: (3, This is Message number:3) Sent message: (4, This is Message number:4) Sent message: (5, This is Message number:5) Sent message: (6, This is Message number:6) Sent message: (7, This is Message number:7) Sent message: (8, This is Message number:8) Sent message: (9, This is Message number:9) Sent message: (10, This is Message number:10)  . . . . .

Now start the Consumer application:

mvn exec:java@consumer

You will see that messages are consumed from the Topic:

[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:1 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24276 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:2 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24277 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:3 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24278 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:4 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24279 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:5 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24280 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:6 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24281 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:7 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24282 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key:, Value: This is Message number:8 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24283 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: 	, Value: This is Message number:9 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24284 [Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key:  , Value: This is Message number:10  . . . . .

Congratulations! You have just managed to connect to Kafka cluster using a Java Producer and a Consumer. Continue your Kafka learning path with the following tutorial: Kafka Tutorial: Creating a Java Producer and Consumer using Serializers and Deserializers

Source code for this tutorial: https://github.com/fmarchioni/masterspringboot/tree/master/camel/kafka/java