虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。
两种手动提交方式:
- commitSync(同步提交):
必须等待offset提交完毕,再去消费下一批数据。
同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败)
- commitAsync(异步提交) :
发送完提交offset请求后,就开始消费下一批数据了。
异步提交则没有失败重试机制,有可能提交失败。
注意:
关闭自动提交
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumerByHandSync {
public static void main(String[] args) {
// 0 配置
Properties properties = new Properties();
// 连接 bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.129:9092,192.168.25.129:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
// 手动提交 记得关闭false
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
// 1 创建一个消费者
// KafkaConsumer<K, V>
// 由于消息形式是 key value 为 "", "hello"
// 所以泛型K为key为String类型 泛型V为传递消息的类型,此处发送字符串用String类型
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅主题 test
ArrayList<String> topics = new ArrayList<>();
topics.add("test");
kafkaConsumer.subscribe(topics);
// 3 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
// 手动提交offset
// 同步提交
// kafkaConsumer.commitSync();
// 异步提交
kafkaConsumer.commitAsync();
}
}
}
异常处理
同步提交处理(有自动重试)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
异步提交处理(没有自动重试)
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使用异步提交规避阻塞
}
} catch(Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
版权归原作者 程序员无羡 所有, 如有侵权,请联系我们删除。