Commit in Kafka
By default, Kafka has enable.auto.commit=true, which commits offsets every five seconds (the interval is controlled by auto.commit.interval.ms). Apart from this, there are two other offset commit strategies. Let's discuss all the commit strategies:
Auto Commit - Default
This is the simplest way to commit offsets. Kafka, by default, uses auto-commit: every five seconds it commits the largest offset returned by the poll() method. In the code below, poll() waits up to 10 seconds for messages:
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<Long, String> message : messages) {
    // process the message
}
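These snippets assume a props object configured elsewhere. As a point of reference, here is a minimal sketch of what it might contain; the broker address and group id are placeholders, not part of the original example:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Auto-commit is on by default; shown explicitly with its five-second interval
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");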
The problem with auto-commit is the risk of data loss if the application fails. Once poll() returns a batch of messages, Kafka may commit the largest offset before the application has finished processing them.
Let's say poll() returns 100 messages, and the consumer has processed 60 of them when the auto-commit happens. Then, due to some failure, the consumer crashes. When a new consumer comes up to read messages, it starts reading from offset 101, resulting in the loss of messages 61 through 100.
Thus, we need a commit strategy that doesn't have this drawback. The answer is manual commit.
Manual Sync Commit
For manual commits, whether sync or async, it's necessary to disable auto-commit by setting enable.auto.commit to false:

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
After disabling auto-commit, let's look at how commitSync() is used:

KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
// process the messages
consumer.commitSync();
This method prevents data loss by committing the offset only after the messages have been processed. However, it doesn't prevent duplicate reads: if the consumer crashes after processing but before committing, the messages are delivered again. It also impacts application performance.
The commitSync() call blocks the code until it completes. Also, in case of a retriable error, it keeps on retrying. This decreases the throughput of the application, which we don't want.
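One practical consequence worth knowing: commitSync() throws CommitFailedException when a commit can no longer be retried, typically because the consumer group has rebalanced. A minimal sketch of guarding against that, assuming the consumer from the snippets above:

import org.apache.kafka.clients.consumer.CommitFailedException;

try {
    consumer.commitSync();
} catch (CommitFailedException e) {
    // The commit cannot be retried (e.g. the group rebalanced); messages
    // processed since the last successful commit may be redelivered
    System.err.println("Offset commit failed: " + e.getMessage());
}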
So, Kafka provides another solution, async commit, that deals with these drawbacks.

Async Commit
Kafka provides commitAsync() to commit offsets asynchronously. It overcomes the performance overhead of manual sync commits by sending the commit request and continuing without waiting for the broker's response. Let's implement an async commit to understand this:

KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
// process the messages
consumer.commitAsync();
The problem with the async commit is that it doesn't retry in case of failure. It relies on the next call of commitAsync(), which will commit the latest offset.

Suppose 300 is the largest offset we want to commit, but our commitAsync() fails due to some issue. Because everything is asynchronous, another commitAsync() call might commit a larger offset of 400 before the failed one could be retried. If the failed commit of 300 were then retried and succeeded, it would overwrite the commit of 400, resulting in duplicate reads. That is why commitAsync() doesn't retry.
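To at least observe failed commits, commitAsync() accepts an OffsetCommitCallback that is invoked when the broker responds. A minimal sketch, assuming the consumer from the snippets above:

consumer.commitAsync((offsets, exception) -> {
    // exception is null on success; on failure we log but deliberately
    // don't retry, for the ordering reason described above
    if (exception != null) {
        System.err.println("Async commit failed for " + offsets + ": " + exception);
    }
});

A common pattern is to use commitAsync() inside the poll loop for throughput and call a final commitSync() before closing the consumer, so the last offsets are committed reliably on shutdown.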
Commit Specific Offset
Sometimes, we need to take more control over offsets. Let's say we're processing the messages in small batches and want to commit the offsets as soon as the messages are processed. We can use the overloaded versions of commitSync() and commitAsync() that take a map argument to commit specific offsets:

KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int messageProcessed = 0;
while (true) {
    ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
    for (ConsumerRecord<Long, String> message : messages) {
        // process one message
        messageProcessed++;
        currentOffsets.put(
            new TopicPartition(message.topic(), message.partition()),
            new OffsetAndMetadata(message.offset() + 1));
        if (messageProcessed % 50 == 0) {
            consumer.commitSync(currentOffsets);
        }
    }
}
In this code, we maintain a currentOffsets map, which takes TopicPartition as the key and OffsetAndMetadata as the value. During message processing, we insert the TopicPartition and OffsetAndMetadata of each processed message into the map. Note that we commit message.offset() + 1, because the committed offset is the position of the next message to be read. When the number of processed messages reaches fifty, we call commitSync() with the currentOffsets map to mark these messages as committed.
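The same map also works with the non-blocking overload; a sketch, again assuming the setup above:

consumer.commitAsync(currentOffsets, (offsets, exception) -> {
    // Callback behaves as in the plain async commit: null exception means success
    if (exception != null) {
        System.err.println("Async commit of specific offsets failed: " + exception);
    }
});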
Otherwise, the behavior is the same as the plain sync and async commits. The only difference is that here we decide which offsets to commit, not Kafka.