Commit in Kafka
// The examples below are independent snippets: each one declares its own
// `consumer` (and most declare `messages`), so they are not meant to share a
// single scope. Every snippet reads its settings from a `props` object.

// --- Example 1: poll once and process the records, no commit call ---------
// `props` is not declared in this snippet; it is assumed to be built elsewhere.
// NOTE(review): with no commit call here, this presumably relies on the
// consumer's auto-commit setting in `props` — confirm.
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<Long, String> message : messages) {
    // processed message
}

// --- Example 2: turn auto-commit off ---------------------------------------
// Sets ENABLE_AUTO_COMMIT_CONFIG to "false" so that commits are issued
// explicitly, as in the examples that follow.
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// --- Example 3: explicit commit with commitSync() ---------------------------
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
// process the messages
// Commit only after processing, using the synchronous commit call.
consumer.commitSync();

// --- Example 4: explicit commit with commitAsync() --------------------------
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
// process the messages
// Same flow as Example 3, but using the asynchronous commit call.
// No callback is passed, so the outcome of the commit is not inspected here.
consumer.commitAsync();

// --- Example 5: commit specific offsets every 50 processed messages --------
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
// Latest offset to commit for each partition seen so far.
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int messageProcessed = 0;
while (true) {
    ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
    for (ConsumerRecord<Long, String> message : messages) {
        // processed one message
        messageProcessed++;
        // NOTE(review): offset + 1 is stored, presumably because a committed
        // offset denotes the next record to read — confirm against the
        // KafkaConsumer documentation.
        currentOffsets.put(
            new TopicPartition(message.topic(), message.partition()),
            new OffsetAndMetadata(message.offset() + 1));
        // Commit the tracked offsets once per 50 processed messages rather
        // than once per poll.
        if (messageProcessed % 50 == 0) {
            consumer.commitSync(currentOffsets);
        }
    }
}
Last updated