0


深入学习Kafka数据消费大致流程(如何创建并使用Kafka消费者)

异步提交

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。

见代码:

com.heima.kafka.chapter3.OffsetCommitAsyncCallback

但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

异步回调

try {

while (running.get()) {

ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {

//do some logical processing.
}
// 异步回调
consumer.commitAsync(new OffsetCommitCallback() {

@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {

if (exception == null) {

System.out.println(offsets);
} else {

log.error(“fail to commit offsets {}”, offsets, exception);
}
}
});
}
} finally {

consumer.close();
}

5.指定位移消费

到目前为止,我们知道消息的拉取是根据poll()方法中的逻辑来处理的,但是这个方法对于普通开发人员来说就是个黑盒处理,无法精确掌握其消费的起始位置。

seek()方法正好提供了这个功能,让我们得以追踪以前的消费或者回溯消费。

见代码库:

com.heima.kafka.chapter3.SeekDemo

/**

  • 指定位移消费 */ public class SeekDemo extends ConsumerClientConfig { public static void main(String[] args) { Properties props = initConfig(); KafkaConsumer<String, String> consu
标签: 学习 kafka linq

本文转载自: https://blog.csdn.net/2301_79098686/article/details/138423748
版权归原作者 2301_79098686 所有, 如有侵权,请联系我们删除。

“深入学习Kafka数据消费大致流程(如何创建并使用Kafka消费者)”的评论:

还没有评论