How Kafka commits messages



Every message your producers send to a Kafka partition has an offset—a sequential index number that identifies each message. To keep track of which messages have already been processed, your consumer needs to commit the offsets of the messages that were processed.

There are different ways to commit offsets, and each has its own pros and cons. Let's look at them in detail:

Auto-committing messages

By default, the consumer auto-commits offsets. It commits the offset of the latest messages it has read at a configured interval. If we set enable.auto.commit=true and auto.commit.interval.ms=2000, the consumer commits the offset every two seconds. This option carries some risk. For example, suppose you set the interval to 10 seconds and the consumer starts consuming data. If the consumer fails at the seventh second, it has not yet committed the offsets it read, so when it restarts it resumes from the last committed offset and processes those messages again, which leads to duplicates.
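The settings above can be sketched in Java as plain consumer properties. This is a minimal sketch, not a complete consumer setup: the broker address and group name are assumed placeholders, a real consumer also needs key and value deserializers, and `replayedOnRestart` is a hypothetical helper that only illustrates the duplicate arithmetic.

```java
import java.util.Properties;

final class AutoCommitSketch {

    // Builds consumer properties with auto-commit enabled at the given interval.
    static Properties autoCommitProps(int intervalMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "demo-group");              // assumed group name
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", Integer.toString(intervalMs));
        return props;
    }

    // Toy model: every offset read after the last commit is read again on restart.
    static long replayedOnRestart(long lastCommittedOffset, long nextOffsetAtCrash) {
        return nextOffsetAtCrash - lastCommittedOffset;
    }

    public static void main(String[] args) {
        Properties props = autoCommitProps(2000);
        System.out.println(props.getProperty("auto.commit.interval.ms")); // prints 2000
        // Last commit at offset 100, crash after reading up to offset 169
        System.out.println(replayedOnRestart(100, 170)); // prints 70
    }
}
```

A shorter interval narrows the window of messages that can be replayed, at the cost of more frequent commit requests.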

Using current offset commit

Most of the time, we want control over when an offset is committed. Kafka provides an API for this. We first set enable.auto.commit=false and then call the commitSync() method from the consumer thread. This commits the latest offset returned by the last poll. Call it only after processing all the ConsumerRecord instances from that poll; otherwise, if the consumer fails partway through, the unprocessed records are already marked as committed and will be lost.

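while (true) {
	// poll(long) is deprecated in newer clients; this overload needs java.time.Duration
	ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2));
	for (ConsumerRecord<String, String> record : records) {
		System.out.printf("offset = %d, key = %s, value = %s%n",
				record.offset(), record.key(), record.value());
	}
	try {
		// Commit only after every record from this poll has been processed
		consumer.commitSync();
	} catch (CommitFailedException ex) {
		// Logger or code to handle failed commit
	}
}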

Asynchronous commit

The problem with a synchronous commit is that the consumer is blocked until it receives an acknowledgment for the commit request from the Kafka server, which lowers throughput. This can be avoided by committing asynchronously. However, an asynchronous commit has its own problem: it may lead to duplicate message processing in the few cases where commit requests are applied out of order. For example, suppose the offset of message 10 is committed before the offset of message 5. The latest offset, 10, is then overridden by 5, so Kafka will serve messages 5-10 to the consumer again.

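while (true) {
	ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2));
	for (ConsumerRecord<String, String> record : records) {
		System.out.printf("offset = %d, key = %s, value = %s%n",
				record.offset(), record.key(), record.value());
	}

	// Does not block; the callback is invoked once the commit request completes
	consumer.commitAsync(new OffsetCommitCallback() {
		@Override
		public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
			if (e != null) {
				System.err.println("Commit failed for " + offsets + ": " + e);
			}
		}
	});
}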
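
The out-of-order scenario described above can be illustrated with a toy in-memory model. This is only a sketch under stated assumptions: `OutOfOrderCommitSketch`, `applyCommit`, and `servedAgain` are hypothetical names invented for illustration and are not part of the Kafka client API. The model simply keeps whichever commit arrives last.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of one partition's committed offset; not the Kafka client API.
final class OutOfOrderCommitSketch {

    private long committedOffset = 0;

    // The model keeps whichever commit arrives last, even if it is older.
    void applyCommit(long offset) {
        committedOffset = offset;
    }

    // Offsets served again if the consumer resumes from the committed offset
    // after having already processed everything up to lastProcessed.
    List<Long> servedAgain(long lastProcessed) {
        List<Long> offsets = new ArrayList<>();
        for (long o = committedOffset; o <= lastProcessed; o++) {
            offsets.add(o);
        }
        return offsets;
    }

    public static void main(String[] args) {
        OutOfOrderCommitSketch partition = new OutOfOrderCommitSketch();
        partition.applyCommit(10); // commit for message 10 lands first
        partition.applyCommit(5);  // late commit for message 5 overrides it
        System.out.println(partition.servedAgain(10)); // prints [5, 6, 7, 8, 9, 10]
    }
}
```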