学习目录
一、消费者工作流程
消费者Consumer采用从broker中主动拉取数据,Kafka采用这种方式
- 生产者Producer向每一个分区的leader发送数据,follower主动跟leader同步数据保证数据的可靠性
- 消费者Consumer消费某一个分区的数据,一个消费者可以消费多个分区的数据
- 每个分区的数据只能有一个消费者组中的一个消费者消费,即同一个分区不能有消费者组中的两个消费者同时消费
- 每个消费者的offset(分区中数据的偏移量),由消费者保存在主题中。如果某台消费者宕机了(挂了)重启的之后通过offset得到以前消费数据的位置
二、消费者组
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同
特点:
- 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费
- 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
- 如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息
- 消费者组之间互不影响。所有的消费者都属于某个消费者 组,即消费者组是逻辑上的一个订阅者
1.消费者组初始化流程
coordinator:辅助实现消费者组的初始化和分区的分配
coordinator节点选择 = groupid的hashcode值 % __consumer_offsets的分区数量
例如: groupid的hashcode值 = 1,__consumer_offsets为50,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset
- 所有者的消费者都会主动的向消费者发送请求加入消费者组当中
- coordinator从多个消费者中选择一个消费者作为leader(老大)
- coordinator将之前所有消费者信息发给leader,其中包括主题情况等
- 消费者leader会负责制度消费方案
- 消费者leader把消费方案发给coordinator
- coordinator把消费者方案分别发给各个消费者consumer
2.特殊情况☆☆☆☆☆
每3秒 每个消费者都会和coordinator保持心跳(默认3s),一旦超时超过45s(session.timeout.ms=45s),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms5分钟)超过了五分钟,也会触发再平衡
再平衡:把挂掉的消费者的任务分配给其他消费者
3.消费者组详细消费流程
- 首先创建一个消费者网络连接的客户端去跟kafka集群进行交互,然后调用sendFetches方法用来抓取数据,进行初始化,需要设置以下参数 (1)参数一:Fetch.min.bytes 每批次最小抓取数据大小 默认1字节,可以设置 (2)参数二:fetch.max.wait.ms 每批数据未到最小抓取数据的大小的超时时间,默认为500ms (3)参数三:Fetch.max.bytes 每批次最 大抓取大小,默认50M
- 再调用send方法发送请求,通过onSuccess回调方法,把对应的数据拉去回来,将一批一批的数据 放入消息队列queue中
- 消费者从队列中拉去数据,Max.poll.records一次拉取数据返回消息的最大条数,默认500条
- 生产者对数据进行序列号,那么消费者则对数据进行反序列化,通过拦截器,最后进行数据处理
三、快速入门
在 IDEA 中编写生产者和消费者程序,生产者往主题为first3中发送数据,消费者从主题为first3中拉去数据
注意:运行程序之前,需要启动zk和kafka集群
生产者 CustomProducer01 类
packagecom.kafka.producer;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.common.serialization.StringSerializer;importjava.util.Properties;/**
* @author wangbo
* @version 1.0
*//**
* 异步发送,创建不带回调函数的API代码
*/publicclassCustomProducer01{publicstaticvoidmain(String[] args){//配置Properties properties =newProperties();//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");//写两个节点是为了防止客户挂掉,另一个能够正常工作//指定对应的key和value的序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 1.创建kafka生成对象// <String,String> 表示 k的数据类型,和v的数据类型KafkaProducer<String,String> kafkaProducer =newKafkaProducer<String,String>(properties);// 2.发送数据for(int i =0; i<5;i++){//第一个参数为生产者的主题名,第二个生产者生产的数据value。还有其他配置选项
kafkaProducer.send(newProducerRecord<String,String>("first3","kafka"));}// 3.关闭资源
kafkaProducer.close();}}
消费者 CustomConsumer_01 类
packagecom.kafka.consumer;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.time.Duration;importjava.util.ArrayList;importjava.util.Properties;/**
* @author wangbo
* @version 1.0
*//**
* 1. 启动集群的zk和kafka
* 2. 运行CustomConsumer_01消费者消费数据
* 3. 运行CustomProducer01生产者生产数据 注意主题要对上
*/publicclassCustomConsumer_01{publicstaticvoidmain(String[] args){//配置Properties properties =newProperties();//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");//多写一个,避免其中一台挂掉,保证数据的可靠性//反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组ID 可以任意起
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//1.创建一个消费者 "","hello"KafkaConsumer<String,String> kafkaConsumer =newKafkaConsumer<>(properties);//2.订阅主题 first3ArrayList<String> topics =newArrayList<String>();
topics.add("first3");
kafkaConsumer.subscribe(topics);//3.消费数据while(true){ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据//循环打印消费的数据 consumerRecords.forfor(ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}}}
版权归原作者 王博1999 所有, 如有侵权,请联系我们删除。