Committing Offsets in Kafka

By default, Kafka has enable.auto.commit=true. With this setting, the consumer commits the latest offsets every five seconds. Apart from this, there are several other offset commit strategies. Let's discuss all of them:

  1. Auto Commit (Default) This is the simplest way to commit offsets. Kafka uses auto-commit by default: every five seconds it commits the largest offset returned by the poll() method. The Duration passed to poll() is the maximum time it waits for records, 10 seconds in our case, as we can see in the code:

    KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(KafkaConfigProperties.getTopic());
    
    ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
    
    for (ConsumerRecord<Long, String> message : messages) {
      // process the message
    }
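
    For completeness, here's a sketch of the props such a consumer might be created with; the broker address and group id are placeholder values, and auto.commit.interval.ms is shown only to make the five-second default explicit:

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group"); // placeholder group id
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // the default
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // the default interval: 5 seconds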

    The problem with auto-commit is that it creates a real risk of data loss if the application fails. Once poll() returns a batch of messages, Kafka may commit the batch's largest offset before the application has finished processing the messages.

    Let’s say poll() returns 100 messages, and the consumer has processed only 60 of them when the auto-commit fires. Then, due to some failure, the consumer crashes. When a new consumer takes over, it starts reading from offset 101, so messages 61 through 100 are lost.

    Thus, we need a commit strategy that avoids this drawback. The answer is manual commit.

  2. Manual Sync Commit In manual commits, whether sync or async, we must first disable auto-commit by setting enable.auto.commit to false:

    Properties props = new Properties();
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    After disabling auto-commit, let’s now understand the use of commitSync():

    KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(KafkaConfigProperties.getTopic());
    ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
    // process the messages
    consumer.commitSync();

    This method prevents data loss by committing the offset only after the messages are processed. However, it doesn’t prevent duplicate reading: if the consumer crashes after processing some messages but before committing their offsets, a new consumer will read them again. Besides this, it also impacts application performance.

    commitSync() blocks the calling thread until the commit completes, and in case of a retriable error it keeps on retrying. This decreases the throughput of the application, which we don’t want. So, Kafka provides another solution, async commit, that deals with these drawbacks.
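
    To make this concrete, here's a sketch of a defensive commitSync() loop, reusing the consumer from above. The catch clause is one possible way to handle CommitFailedException, which Kafka throws when the commit can no longer succeed (for example, after a group rebalance):

    try {
      while (true) {
        ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
        for (ConsumerRecord<Long, String> message : messages) {
          // process the message
        }
        try {
          consumer.commitSync(); // blocks and retries retriable errors internally
        } catch (CommitFailedException e) {
          // unrecoverable: our partitions were likely reassigned during a rebalance
        }
      }
    } finally {
      consumer.close();
    }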

  3. Async Commit Kafka provides commitAsync() to commit offsets asynchronously. It avoids the performance penalty of the manual sync commit by sending the commit request and continuing immediately, without waiting for the broker’s response. Let’s implement an async commit to understand this:

    KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(KafkaConfigProperties.getTopic());
    ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
    // process the messages
    consumer.commitAsync();

    The problem with async commit is that it doesn’t retry in case of failure. Instead, it relies on the next call of commitAsync() to commit the latest offset.

    Suppose 300 is the largest offset we want to commit, but our commitAsync() fails due to some issue. Because the calls are asynchronous, another commitAsync() might meanwhile commit a larger offset, say 400, successfully. If the failed commit were then retried and committed offset 300, it would overwrite the previous commit of 400, resulting in duplicate reading. That is why commitAsync() doesn’t retry.
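
    If we still want visibility into failed commits, commitAsync() has an overload that accepts an OffsetCommitCallback, which Kafka invokes once the commit completes. A minimal sketch; the logging shown is our choice, not prescribed by the API:

    consumer.commitAsync((offsets, exception) -> {
      if (exception != null) {
        // the commit failed; log it for inspection, but don't retry (see the ordering problem above)
        System.err.println("Commit failed for " + offsets + ": " + exception.getMessage());
      }
    });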

  4. Commit Specific Offset Sometimes, we need more control over offsets. Let’s say we’re processing the messages in small batches and want to commit the offsets as soon as each batch is processed. We can use the overloaded versions of commitSync() and commitAsync() that take a map argument to commit specific offsets:

    KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(KafkaConfigProperties.getTopic());
    Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    int messageProcessed = 0;
    while (true) {
      ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
      for (ConsumerRecord<Long, String> message : messages) {
        // process one message
        messageProcessed++;
        // track the offset of the next message to read for this partition
        currentOffsets.put(
            new TopicPartition(message.topic(), message.partition()),
            new OffsetAndMetadata(message.offset() + 1));
        // commit the tracked offsets after every 50 processed messages
        if (messageProcessed % 50 == 0) {
          consumer.commitSync(currentOffsets);
        }
      }
    }

    In this code, we manage a currentOffsets map, which takes TopicPartition as the key and OffsetAndMetadata as the value. During message processing, we insert the TopicPartition and OffsetAndMetadata of each processed message into the currentOffsets map; note the offset + 1, since the committed offset should point to the next message to be read. When the number of processed messages reaches fifty, we call commitSync() with the currentOffsets map to mark these messages as committed.

    This approach behaves the same way as the sync and async commits. The only difference is that here we decide which offsets to commit, rather than letting Kafka commit the latest ones.
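
    The same batch-level control is available asynchronously too: commitAsync() has an overload that takes both the offsets map and an OffsetCommitCallback. A minimal sketch, reusing the currentOffsets map from above:

    consumer.commitAsync(currentOffsets, (offsets, exception) -> {
      if (exception != null) {
        // the commit failed; log it, and let a later commit supersede these offsets
        System.err.println("Async commit failed for " + offsets + ": " + exception.getMessage());
      }
    });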
