Kafka Simple Consumer and Consumer Group

Kafka topics can be consumed by a single-threaded Consumer or by a multi-threaded Consumer Group. In this tutorial we will learn the differences between the two approaches.

Let’s start from the simplest use case of Consumer, which consumes messages serially in a single thread:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
  public static void main(String[] args) {
    if (args.length == 0) {
      System.out.println("Enter topic name");
      return;
    }
    String topicName = args[0];

    // Kafka consumer configuration settings
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // The Kafka Consumer subscribes to a list of topics here.
    consumer.subscribe(Arrays.asList(topicName));
    System.out.println("Subscribed to topic " + topicName);

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
      // Print the offset, key and value of each consumer record.
      for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
            record.offset(), record.key(), record.value());
      }
    }
  }
}
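To see the consumer print anything, the topic needs messages on it. The following is a minimal producer sketch for generating test data; it assumes a broker on localhost:9092, and the class name TestProducer and the sample payloads are illustrative, not part of the original example:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
  public static void main(String[] args) {
    String topicName = args[0];

    // Minimal producer configuration; the serializers mirror the consumer's deserializers.
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // Send a handful of test messages; the key determines the target partition.
      for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<>(topicName, "key-" + i, "message-" + i));
      }
    }
  }
}

Run SimpleConsumer in one terminal and TestProducer in another, passing the same topic name to both, and the consumed records appear with their offsets, keys and values.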

A Consumer Group, on the other hand, enables multi-threaded or multi-machine consumption from Kafka topics.

  • Consumers can join a group by using the same "group.id".
  • The maximum parallelism of a group is achieved when the number of consumers in the group equals the number of partitions (a quick way to check a topic's partition count is sketched after this list).
  • Kafka assigns the partitions of a topic to the consumers in a group, so that each partition is consumed by exactly one consumer in the group.
  • Kafka also guarantees that a message is consumed by only one consumer in the group.
  • Within each partition, consumers see messages in the order they were stored in the log.
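Since parallelism is capped by the partition count, it helps to know how many partitions a topic has before sizing the group. Here is a minimal sketch using the consumer's partitionsFor() method; the broker address and the topic name testTopic are illustrative assumptions, and the class PartitionCount is not part of the original example:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PartitionCount {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      // partitionsFor() returns metadata for each partition of the topic;
      // its size is the maximum useful number of consumers in the group.
      int partitions = consumer.partitionsFor("testTopic").size();
      System.out.println("testTopic has " + partitions + " partitions");
    }
  }
}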

Here is an example of a Consumer Group:

package com.masteringintegration.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class SampleKafkaConsumer {
  public static void main(String[] args) {
    String server = "127.0.0.1:9092";
    String groupId = "SampleKafkaConsumer";
    String topic = "testTopic";
    new SampleKafkaConsumer(server, groupId, topic).run();
  }

  // Variables     

  private final Logger mLogger = LoggerFactory.getLogger(SampleKafkaConsumer.class.getName());
  private final String mBootstrapServer;
  private final String mGroupId;
  private final String mTopic;

  // Constructor

  SampleKafkaConsumer(String bootstrapServer, String groupId, String topic) {
    mBootstrapServer = bootstrapServer;
    mGroupId = groupId;
    mTopic = topic;
  }

  // Public      

  void run() {
    mLogger.info("Creating consumer thread");
    CountDownLatch latch = new CountDownLatch(1);
    ConsumerRunnable consumerRunnable = new ConsumerRunnable(mBootstrapServer, mGroupId, mTopic, latch);
    Thread thread = new Thread(consumerRunnable);
    thread.start();
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      mLogger.info("Caught shutdown hook");consumerRunnable.shutdown();await (latch);mLogger.info("Application has exited");
    }));
    await(latch);
  }

  // Private      

  private void await(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      mLogger.error("Application got interrupted", e);
    } finally {
      mLogger.info("Application is closing");
    }
  }

  // Inner classes      

  private class ConsumerRunnable implements Runnable {
    private final CountDownLatch mLatch;
    private final KafkaConsumer<String, String> mConsumer;

    ConsumerRunnable(String bootstrapServer, String groupId, String topic, CountDownLatch latch) {
      mLatch = latch;
      Properties props = consumerProps(bootstrapServer, groupId);
      mConsumer = new KafkaConsumer<>(props);
      mConsumer.subscribe(Collections.singletonList(topic));
    }
    }
    @Override
    public void run() {
      try {
        while (true) {
          ConsumerRecords<String, String> records = mConsumer.poll(Duration.ofMillis(100));
          for (ConsumerRecord<String, String> record : records) {
            mLogger.info("Key: " + record.key() + ", Value: " + record.value());
            mLogger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
          }
        }
      } catch (WakeupException e) {
        mLogger.info("Received shutdown signal!");
      } finally {
        mConsumer.close();
        mLatch.countDown();
      }
    }
    void shutdown() {
      mConsumer.wakeup();
    }
    private Properties consumerProps(String bootstrapServer, String groupId) {
      String deserializer = StringDeserializer.class.getName();
      Properties properties = new Properties();
      properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
      properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
      properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
      properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      return properties;
    }
  }
}

Adding more processes/threads causes Kafka to rebalance the group. The same happens on failure: if a consumer stops sending heartbeats to the group coordinator (or, with the legacy high-level consumer, to ZooKeeper), it is considered dead and the group is rebalanced. During this rebalance, Kafka assigns the available partitions to the available threads, possibly moving a partition to another process. These reassignments can be observed with the sketch below.
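A minimal sketch of watching rebalances with a ConsumerRebalanceListener; it reuses the broker address, group id and topic name from the example above, and the class name RebalanceWatcher is illustrative:

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceWatcher {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "127.0.0.1:9092");
    props.put("group.id", "SampleKafkaConsumer");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // The listener is invoked on the polling thread whenever the group rebalances.
    consumer.subscribe(Collections.singletonList("testTopic"), new ConsumerRebalanceListener() {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Revoked: " + partitions);
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned: " + partitions);
      }
    });

    // Poll in a loop; start or stop other instances to watch partitions move.
    while (true) {
      consumer.poll(Duration.ofMillis(100));
    }
  }
}

Start two instances of this class against a multi-partition topic and then stop one of them; the surviving instance logs the partitions it picks up during the rebalance.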