1.Kafka消费方式
2.Kafka消费者工作流程
(1)总体工作流程
(2)消费者组工作流程
3.消费者API
(1)单个消费者消费
实现代码
packagecom.zrclass.kafka.consumer;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importjava.time.Duration;importjava.util.ArrayList;importjava.util.Properties;publicclassCustomConsumer{publicstaticvoidmain(String[] args){// 1.创建消费者的配置对象Properties properties =newProperties();// 2.给消费者配置对象添加参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");// 创建消费者对象KafkaConsumer<String,String> kafkaConsumer =newKafkaConsumer<String,String>(properties);// 注册要消费的主题(可以消费多个主题)ArrayList<String> topics =newArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);// 拉取数据打印while(true){// 设置 1s 中消费一批数据ConsumerRecords<String,String> consumerRecords =
kafkaConsumer.poll(Duration.ofSeconds(1));// 打印消费到的数据for(ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}}}
(2)单个消费者指定分区消费
代码实现:
KafkaConsumer<String,String> kafkaConsumer =newKafkaConsumer<>(properties);// 消费某个主题的某个分区数据ArrayList<TopicPartition> topicPartitions =newArrayList<>();
topicPartitions.add(newTopicPartition("first",0));
kafkaConsumer.assign(topicPartitions);
(3)消费者组消费
复制上面CustomConsumer三个,同时去订阅统一个主题,消费数据,发现一个分区只能被一个消费者消费
4.kafka的分区分配及再平衡策略
(1)kafka的分区分配策略(默认策略是Range + CooperativeSticky)
- 一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个 partition的数据。
- Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。 可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用 多个分区分配策略。
1)每个consumer都发送JoinGroup请求
2)选出一个consumer作为leader
3)coordinator把要消费的topic情况发送给leader消费者
4)leader消费者会负责制定消费方案
5)leader消费者把消费方案发给coordinator
6)Coordinator把leader消费者的消费方案下发给各个consumer
7)每个消费者都会和coordinator保持心跳(默认3s),一旦超时 (session.timeout.ms=45s),该消费者会被移除,并触发再平衡; 或者消费者处理消息的过长(max.poll.interval.ms5分钟),也会触发再 平衡
(2)range分区策略及再平衡
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IxCP3AOK-1682405725356)(分布式消息队列Kafka.assets/image-20230425112937754.png)]
分区策略测试:
1)修改主题 first 为 7 个分区。
2)复制 CustomConsumer 类,创建由三个消费者 CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”, 同时启动 3 个消费者。
3)启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区。
packagecom.zrclass.kafka.producer;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importjava.util.Properties;publicclassCustomProducer{publicstaticvoidmain(String[] args)throwsInterruptedException{Properties properties =newProperties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());KafkaProducer<String,String> kafkaProducer =newKafkaProducer<>(properties);for(int i =0; i <7; i++){
kafkaProducer.send(newProducerRecord<>("first", i,"test","java"));}
kafkaProducer.close();}}
4)观看 3 个消费者分别消费哪些分区的数据
CustomConsumer消费0,1,2号分区数据;
CustomConsumer1消费3,4分区数据;
CustomConsumer2消费5,6分区数据;
分区分配再平衡策略:
1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 3、4 号分区数据。
2 号消费者:消费到 5、6 号分区数据。
0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 0、1、2、3 号分区数据。
2 号消费者:消费到 4、5、6 号分区数据。
说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。
(3)RoundRobin 分区分配再平衡
轮询的方式
(4)Sticky****以及再平衡
粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
5.kafka offset偏移量
(1)kafaka内置主题__consumer_offsets
kafka内置主题__consumer_offsets里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。
(2)消费内置主题__consumer_offsets
0)思想:__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。
1)在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,
默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。
2)采用命令行方式,创建一个新的 topic。
[zrclass@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic first--partitions 2 --replication-factor 2
3)启动生产者往 first生产数据。
[zrclass@hadoop102 kafka]$ bin/kafka-console-producer.sh --topic first --bootstrap-server hadoop102:9092
4)启动消费者消费 first数据。
[zrclass@hadoop104 kafka]$ bin/kafka-console-consumer.sh -- bootstrap-server hadoop102:9092 --topic first --group test
注意:指定消费者组名称,更好观察数据存储位置(key 是 group.id+topic+分区号)。
5)查看消费者消费主题__consumer_offsets。
[zrclass@hadoop102 kafka]$ bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageForm
atter" --from-beginning
[offset,first,1]::OffsetAndMetadata(offset=7, leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, expireTimestamp=None)[offset,first,0]::OffsetAndMetadata(offset=8, leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, expireTimestamp=None)
(3)kafka默认自动提交偏移量
// 配置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);// 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);//3. 创建 kafka 消费者KafkaConsumer<String,String> consumer =newKafkaConsumer<>(properties);//4. 设置消费主题 形参是列表
consumer.subscribe(Arrays.asList("first"));
(4)手动提交offset
1)同步提交offset
由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。以下为同步提交 offset 的示例。
packagecom.zrclass.kafka.consumer;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importjava.util.Arrays;importjava.util.Properties;publicclassCustomConsumerByHandSync{publicstaticvoidmain(String[] args){// 1. 创建 kafka 消费者配置类Properties properties =newProperties();// 2. 添加配置参数// 添加连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");// 配置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//3. 创建 kafka 消费者KafkaConsumer<String,String> consumer =newKafkaConsumer<>(properties);//4. 设置消费主题 形参是列表
consumer.subscribe(Arrays.asList("first"));//5. 消费数据while(true){// 读取消息ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 输出消息for(ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord.value());}// 同步提交 offset
consumer.commitSync();}}}
2)异步提交offset
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
// 异步提交 offset
consumer.commitAsync();
(5)指定offset消费
auto.offset.reset = earliest | latest | none 默认是 latest。
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning。
2)latest(默认值):自动将偏移量重置为最新偏移量。
3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
(6)指定消费时间重新消费
需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?
packagecom.zrclass.kafka.consumer;importorg.apache.kafka.clients.consumer.*;importorg.apache.kafka.common.TopicPartition;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.time.Duration;importjava.util.*;publicclassCustomConsumerForTime{publicstaticvoidmain(String[] args){// 0 配置信息Properties properties =newProperties();// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key value 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");// 1 创建一个消费者KafkaConsumer<String,String> kafkaConsumer =newKafkaConsumer<>(properties);// 2 订阅一个主题ArrayList<String> topics =newArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);Set<TopicPartition> assignment =newHashSet<>();while(assignment.size()==0){
kafkaConsumer.poll(Duration.ofSeconds(1));// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();}HashMap<TopicPartition,Long> timestampToSearch =newHashMap<>();// 封装集合存储,每个分区对应一天前的数据for(TopicPartition topicPartition : assignment){
timestampToSearch.put(topicPartition,System.currentTimeMillis()-1*24*3600*1000);}// 获取从 1 天前开始消费的每个分区的 offsetMap<TopicPartition,OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);// 遍历每个分区,对每个分区设置消费时间。for(TopicPartition topicPartition : assignment){OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);// 根据时间指定开始消费的位置if(offsetAndTimestamp !=null){
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());}}// 3 消费该主题数据while(true){ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}}}
(7)重复消费和漏消费
重复消费:已经消费了数据,但是 offset 没提交。
漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。
怎么能做到既不漏消费也不重复消费呢?使用消费者事务。
如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset 过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比 如 MySQL)。
(8)消费者调优(提高吞吐量)
数据积压(消费者如何提高吞吐量)
版权归原作者 weixin_42232931 所有, 如有侵权,请联系我们删除。