消费位移
Kafka 中的位移(offset)是用来记录消息在分区中的位置的标志,简单说就是
记录消费者的消费进度
,每次消息消费后需要更新消费进度,也就是**
位移提交
**
由此可见一旦位移提交发生异常,会导致消费进度不正确,就必然发生**
消息丢失
或者
重复消费
**
消息位移存储内部主题
__consumer_offsets
消息消费后需要执行位移的提交
消息位移提交几种方式
自动提交
enable.auto.commit
配置为true 默认每5s 提交一次 (auto.commit.interval.ms)拉取消息之前也会检查 是否可以进行位移提交
消息重复消费例子
消费者拉取了一批消息,消费后,消费位移自动提交前应用崩溃了,下次应用恢复,又从上次位移提交的地方消费
通过减小位移提交的时间间隔,能减少消息重复消费的可能,但会使消息位移提交频繁
消息丢失例子
消费者拉取到消息,此时消息位移刚好自动提交,但消息还没来及处理,然后应用崩溃了,下次应该恢复了,由于位移已经提交, 未处理的几条消息,就丢失了。
除了极端情况下消息可能存在丢失或重复消费,重复消息业务可以通过幂等性保证, 但消息丢失是可怕的,我们甚至都不知道
手动提交
对于业务来说消息拉取后,正确处理完才算消费了,自动提交可以更加灵活精准控制消息位移的提交
使用方式 设置**
enable.auto.commit
** 配置为false
同步提交
它会阻塞当前线程,直到提交成功或发生错误。同步提交确保位移提交的可靠性,但会增加延迟。
ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord<String,String> record:records){
// 消息处理逻辑
}
kafkaConsumer.commitSync();
异步提交
它不会阻塞当前线程,提交过程在后台进行。异步提交提高了性能,但需要处理可能的提交失败情况。
kafkaConsumer.commitASync()
KafkaConsumer API 还为手动提交提供了带参数的方法
commitSync(Map<TopicPartition, OffsetAndMetadata>;
commitAsync(Map<TopicPartition, OffsetAndMetadata>)
总结
一般情况我们消息位移自动提交就可以满足我们大部分场景,当然也有场景需要控制消息位移提交,需要我们在
可靠性与性能
之间做取舍,自动位移提交代码稍微复杂点,需要处理好位移提交失败的情况。
版权归原作者 happycao123 所有, 如有侵权,请联系我们删除。