0


Kafka入门笔记(四) -- kafka消费者

文章目录


前言

建议对kafka还不了解的小伙伴先看:Kafka入门笔记(一) --kafka概述+kafka集群搭建 、 Kafka入门笔记(二) --kafka常用命令、Kafka入门笔记(三) – kafka生产者


一、kafka消费者

1、消费方式:

  1. consumer采用pull(拉)模式从broker中读取数据,该模式可以根据consumer的消费能力以适当的速率消费消息。- 不足:如果kafka没有数据,消费者可能会陷入循环,一直返回空数据- 解决方法:kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可消费,consumer会等待一段时间再返回,这段时长即为timeout
  2. push(推)模式很难适应消费速率不同的消费者,因为消费发送速率是由broker决定的,典型表现就是拒绝服务以及网络拥塞。

2、分区分配策略:

  1. RoundRobin(轮询)策略:根据消费者组划分 - 消费者组成员增加或减少,都会重新分配消息
  2. Range(范围)策略(默认):根据主题划分

3、offset的维护:

  1. 由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费;
  2. kafka 0.9版本之前,consumer默认将offset保持在zookeeper中,从0.9版本开始,consumer默认将offset保持在kafka一个内置的topic中,该topic为_consumer_offsets;
  3. 修改配置文件consumer.properties: exclude.internal.topics=false

二、Consumer(消费者)

1、导入kafka相关依赖

<!--kafka依赖--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.0</version></dependency><!--Json转换相关依赖--><dependency><groupId>net.sf.json-lib</groupId><artifactId>json-lib-ext-spring</artifactId><version>1.0.2</version></dependency>

2、自动提交offset的消费方式:

  1. 配置消费者相关信息:
// 消费者相关配置Properties properties =newProperties();//设置kafka集群
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test001");//是否开启自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//自动提交延时
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");//key/value的反序列化//key反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//value反序列化
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
  1. 创建消费者、订阅(topic)主题:
//创建消费者KafkaConsumer<String,String> consumer =newKafkaConsumer<>(properties);//订阅消费主题
consumer.subscribe(Arrays.asList("topic01"));
  1. 获取数据(阻塞获取数据):
//获取数据while(true){//多久拉取一次数据ConsumerRecords<String,String> consumerRecords =  consumer.poll(1000);for(ConsumerRecord<String,String> c:consumerRecords){//获取数据
        value = c.value();//具体业务逻辑...}}

3、手动提交offset的消费方式:

1)commitSync(同步提交):
  1. 消费者配置 关闭自动提交
//是否开启自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
  1. 同步提交
while(true){//多久拉取一次数据ConsumerRecords<String,String> consumerRecords =  consumer.poll(1000);for(ConsumerRecord<String,String> c:consumerRecords){//获取数据
        value = c.value();//具体业务逻辑...}//同步提交,当前线程会阻塞直到offset提交成功
    consumer.commitSync();}

2)commmitAsync(异步提交):
  1. 消费者配置 关闭自动提交:
//是否开启自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
  1. 异步提交:
while(true){//多久拉取一次数据ConsumerRecords<String,String> consumerRecords =  consumer.poll(1000);for(ConsumerRecord<String,String> c:consumerRecords){//获取数据
        value = c.value();//具体业务逻辑...}//异步提交
    consumer.commitAsync(newOffsetCommitCallback(){//异步逻辑处理@OverridepublicvoidonComplete(Map<TopicPartition,OffsetAndMetadata> map,Exception e){if(e !=null){System.out.println("commit failed for"+map);}}});}

3)同步提交与异步提交的比较:
  1. 相同点: 。都会将本次poll的一批数据最高偏移量提交;
  2. 不同点: 。commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试; 。CommitAsync没有失败重试机制,故可能提交失败。

4、如何重新消费某一个主题的数据?

  1. 更换消费者组
  2. 重置消费者offset

实现如下:

//更换消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test002");//重置消费者的offset
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
标签: kafka 笔记 分布式

本文转载自: https://blog.csdn.net/qq_34424698/article/details/136020185
版权归原作者 无题白水 所有, 如有侵权,请联系我们删除。

“Kafka入门笔记(四) -- kafka消费者”的评论:

还没有评论