Kafka topics can be consumed using a single-threaded Consumer or using a multi-threaded Consumer. In this tutorial we will learn the differences between them.
Let’s start with the simplest use case: a Consumer that processes messages serially in a single thread:
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("Enter topic name");
            return;
        }
        String topicName = args[0];

        // Kafka consumer configuration settings
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // The Consumer subscribes to a list of topics here.
        consumer.subscribe(Arrays.asList(topicName));
        System.out.println("Subscribed to topic " + topicName);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // Print the offset, key and value for each consumer record.
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
A Consumer group, on the other hand, enables multi-threaded or multi-machine consumption from Kafka topics.
- Consumers join a group by using the same "group.id".
- The maximum parallelism of a group is reached when the number of consumers in the group equals the number of partitions.
- Kafka assigns the partitions of a topic to the consumers in a group, so that each partition is consumed by exactly one consumer in the group.
- Kafka also guarantees that a message is consumed by only a single consumer in the group.
- Consumers see messages in the order in which they were stored in the log.
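To make the parallelism rule above concrete, here is a small, self-contained sketch that simulates round-robin partition assignment in plain Java. It is not Kafka's actual assignor (the class name, method, and consumer ids are all hypothetical), but it shows the one-partition-per-consumer guarantee when the consumer count equals the partition count:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of round-robin partition assignment.
// NOT Kafka's real assignor implementation, just an illustration
// of how partitions spread across the consumers in a group.
public class GroupAssignmentSketch {

    // Assign each partition index to exactly one consumer id, round-robin:
    // partition p goes to consumer number p % N.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // With 3 consumers and 3 partitions the group reaches maximum
        // parallelism: every consumer owns exactly one partition.
        System.out.println(assign(List.of("c1", "c2", "c3"), 3));
    }
}
```

Adding a fourth consumer to this 3-partition group would leave it idle: a partition is never shared, so extra consumers beyond the partition count receive nothing.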
Here is an example of a Consumer group:
package com.masteringintegration.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class SampleKafkaConsumer {

    public static void main(String[] args) {
        String server = "127.0.0.1:9092";
        String groupId = "SampleKafkaConsumer";
        String topic = "testTopic";

        new SampleKafkaConsumer(server, groupId, topic).run();
    }

    // Variables
    private final Logger mLogger = LoggerFactory.getLogger(SampleKafkaConsumer.class.getName());
    private final String mBootstrapServer;
    private final String mGroupId;
    private final String mTopic;

    // Constructor
    SampleKafkaConsumer(String bootstrapServer, String groupId, String topic) {
        mBootstrapServer = bootstrapServer;
        mGroupId = groupId;
        mTopic = topic;
    }

    // Public
    void run() {
        mLogger.info("Creating consumer thread");

        CountDownLatch latch = new CountDownLatch(1);

        ConsumerRunnable consumerRunnable = new ConsumerRunnable(mBootstrapServer, mGroupId, mTopic, latch);
        Thread thread = new Thread(consumerRunnable);
        thread.start();

        // Wake the consumer up and wait for it to close cleanly on shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            mLogger.info("Caught shutdown hook");
            consumerRunnable.shutdown();
            await(latch);
            mLogger.info("Application has exited");
        }));

        await(latch);
    }

    // Private
    void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            mLogger.error("Application got interrupted", e);
        } finally {
            mLogger.info("Application is closing");
        }
    }

    // Inner classes
    private class ConsumerRunnable implements Runnable {

        private CountDownLatch mLatch;
        private KafkaConsumer<String, String> mConsumer;

        ConsumerRunnable(String bootstrapServer, String groupId, String topic, CountDownLatch latch) {
            mLatch = latch;

            Properties props = consumerProps(bootstrapServer, groupId);
            mConsumer = new KafkaConsumer<>(props);
            mConsumer.subscribe(Collections.singletonList(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = mConsumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        mLogger.info("Key: " + record.key() + ", Value: " + record.value());
                        mLogger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
                    }
                }
            } catch (WakeupException e) {
                // wakeup() makes the blocking poll() throw this exception.
                mLogger.info("Received shutdown signal!");
            } finally {
                mConsumer.close();
                mLatch.countDown();
            }
        }

        void shutdown() {
            // Safe to call from another thread; interrupts poll().
            mConsumer.wakeup();
        }

        private Properties consumerProps(String bootstrapServer, String groupId) {
            String deserializer = StringDeserializer.class.getName();

            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return properties;
        }
    }
}
Adding more processes/threads causes Kafka to re-balance the group. Likewise, if any consumer or broker fails to send its heartbeat, the group is re-configured through the Kafka cluster. During this re-balance, Kafka assigns the available partitions to the available threads, possibly moving a partition to another process.
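The effect of such a re-balance can be illustrated with the same kind of round-robin simulation as before. This is a hedged sketch in plain Java (the class, method, and consumer ids are invented for illustration, and the real protocol is driven by Kafka's group coordinator): re-running the assignment after a third consumer joins shows why a partition can move to another process.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Kafka's real rebalance protocol: recomputing a
// round-robin style assignment after group membership changes shows how
// partition ownership can move between processes.
public class RebalanceSketch {

    // Round-robin: partition p goes to consumer number p % N.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Before: two consumers own three partitions each.
        System.out.println("before: " + assign(List.of("c1", "c2"), 6));
        // After c3 joins, the group re-balances: partition 2 moves from c1
        // to c3, partition 3 moves from c2 to c1, and so on.
        System.out.println("after:  " + assign(List.of("c1", "c2", "c3"), 6));
    }
}
```

Note that every re-balance pauses consumption briefly while ownership is re-negotiated, which is one reason to avoid churning group membership unnecessarily.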