0


Kafka:消费者手动提交

虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。

两种手动提交方式:

  • commitSync(同步提交):

必须等待offset提交完毕,再去消费下一批数据。

同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败)

  • commitAsync(异步提交) :

发送完提交offset请求后,就开始消费下一批数据了。

异步提交则没有失败重试机制,有可能提交失败。

注意:

关闭自动提交

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerByHandSync {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 连接 bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.129:9092,192.168.25.129:9092");

        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        // 手动提交 记得关闭false
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 1 创建一个消费者  
        // KafkaConsumer<K, V> 
        // 由于消息形式是 key value 为 "", "hello"
        // 所以泛型K为key为String类型 泛型V为传递消息的类型,此处发送字符串用String类型
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2 订阅主题 test
        ArrayList<String> topics = new ArrayList<>();
        topics.add("test");
        kafkaConsumer.subscribe(topics);

        // 3 消费数据
        while (true){

            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }

            // 手动提交offset
            // 同步提交
            // kafkaConsumer.commitSync();
            // 异步提交
            kafkaConsumer.commitAsync();
        }
    }
}

异常处理

同步提交处理(有自动重试)

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
    consumer.commitSync();
} catch (CommitFailedException e) {
    handle(e); // 处理提交失败异常
}

异步提交处理(没有自动重试)

try {
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        process(records); // 处理消息
        commitAysnc(); // 使用异步提交规避阻塞
    }
} catch(Exception e) {
    handle(e); // 处理异常
} finally {
    try {
        consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
    } finally {
        consumer.close();
    }
}

本文转载自: https://blog.csdn.net/weixin_45427648/article/details/129698546
版权归原作者 程序员无羡 所有, 如有侵权,请联系我们删除。

“Kafka:消费者手动提交”的评论:

还没有评论