0


【云原生进阶之PaaS中间件】第三章Kafka-4.4-消费者工作流程

1 消费者入门概述

1.1 基础概念

1.1.1 消费者群组

    Kafka 里消费者从属于消费者群组,一个群组里的消费者订阅的都是同一个主题,每个消费者接收主题一部分分区的消息。

    如上图,主题 T 有 4 个分区,群组中只有一个消费者,则该消费者将收到主题 T1 全部 4 个分区的消息。

    如上图,在群组中增加一个消费者 2 ,那么每个消费者将分别从两个分区接收消息,上图中就表现为消费者 1 接收分区 1 和分区 3 的消息,消费者 2 接收分区 2 和分区 4 的消息。

    如上图,在群组中有 4 个消费者,那么每个消费者将分别从 1 个分区接收消息。

    但是,当我们增加更多的消费者,超过了主题的分区数量,就会有一部分的消费者被闲置,不会接收到任何消息。 往消费者群组里增加消费者是进行横向伸缩能力的主要方式。所以我们有必要为主题设定合适规模的分区,在负载均衡的时候可以加入更多的消费者。但是要记住,一个群组里消费者数量超过了主题的分区数量,多出来的消费者是没有用处的。 如果是多个应用程序,需要从同一个主题中读取数据,只要保证每个应用程序有自己的消费者群组就行了。

    一般来说,建议分区数和消费者数量保持一致是最好的,当消费组的消费能力不足时,是可以通过增加分区数量来提高并行度,但是尽量避免这样情况发生,因为,增加一个topic的分区数量这个时候,kafka会进行分区再均衡,在这个期间topic是不可用的,而且一个topic可能有多个消费者组在消费他的数据,增加分区数量会影响到每一个消费者组的,所以再创建topic的时候一定要考虑好分区数。

    具体实现如图,先建立一个 2 分区的主题:

1.1.2 其他核心概念

1、订阅

    创建消费者后,使用 subscribe() 方法订阅主题,这个方法接受一个主题列表为参数,也可以接受一个正则表达式为参数;正则表达式同样也匹配多个主题。如果新创建了新主题,并且主题名字和正则表达式匹配,那么会立即触发一次再均衡,消费者就可以读取新添加的主题。比如,要订阅所有和 test 相关的主题,可以 subscribe(“tets.*”)。

2、轮询

    为了不断的获取消息,我们要在循环中不断的进行轮询,也就是不停调用 poll 方法。

    poll 方法的参数为超时时间,控制 poll 方法的阻塞时间,它会让消费者在指定的毫秒数内一直等待 broker 返回数据。 poll 方法将会返回一个记录(消息)列表,每一条记录都包含了记录所属的主题信息,记录所在分区信息,记录在分区里的偏移量,以及记录的键值对。

    poll 方法不仅仅只是获取数据,在新消费者第一次调用时,它会负责查找群组,加入群组,接受分配的分区。如果发生了再均衡,整个过程也是在轮询期间进行的。

3、提交偏移量

    当我们调用 poll 方法的时候, broker 返回的是生产者写入 Kafka 但是还没有被消费者读取过的记录,消费者可以使用 Kafka 来追踪消息在分区里的位置,我们称之为偏移量 。消费者更新自己读取到哪个消息的操作,我们称之为“提交”。

    消费者是如何提交偏移量的呢?消费者会往一个叫做 _consumer_offset 的特殊主题发送一个消息, 里面会包括每个分区的偏移量。

4、多线程安全:

    KafkaConsumer 的实现 不是 线程安全的,所以我们在多线程的环境下, 使用 KafkaConsumer 的实例要小心,应该每个消费数据的线程拥有自己的 KafkaConsumer 实例。

5、群组协调:

    消费者要加入群组时,会向 群组协调器 发送一个 JoinGroup 请求,第一个加入群主的消费者成为群主,群主会获得群组的成员列表,并负责给每一个消费者分配分区。分配完毕后,群主把分配情况发送给 群组协调器 ,协调器再把这些信息发送给所有的消费者,每个消费者只能看到自己的分配信息, 只有群主知道群组里所有消费者的分配信息。群组协调的工作会在消费者发生变化( 新加入或者掉线 ) ,主题中分区发生了变化(增加)时发生。

6、分区再均衡

    当消费者群组里的消费者发生变化,或者主题里的分区发生了变化,都会导致再均衡现象的发生。从前面的知识中,我们知道, Kafka 中,存在着消费者对分区所有权的关系,这样无论是消费者变化,比如增加了消费者,新消费者会读取原本由其他消费者读取的分区,消费者减少,原本由它负责的分区要由其他消费者来读取,增加了分区,哪个消费者来读取这个新增的分区,这些行为,都会导致分区所有权的变化,这种变化就被称为再均衡 。再均衡对 Kafka 很重要,这是消费者群组带来高可用性和伸缩性的关键所在。 不过一般情况下,尽量减少再均衡,因为再均衡期间,消费者是无法读 取消息的,会造成整个群组一小段时间的不可用 。

    消费者通过向称为群组协调器的 broker (不同的群组有不同的协调器)发送心跳来维持它和群组的从属关系以及对分区的所有权关系。如果消费者长时间不发送心跳,群组协调器认为它已经死亡,就会触发一次再均衡。 在 0.10.1 及以后的版本中,心跳由单独的线程负责,相关的控制参数为 max.poll.interval.ms 。

7、消费安全问题:

    一般情况下,我们调用 poll 方法的时候, broker 返回的是生产者写入 Kafka 同时 kafka 的消费者提交偏移量,这样可以确保消费者消息消费不丢失也 不重复,所以一般情况下 Kafka 提供的原生的消费者是安全的,但是事情会这么完美吗?答案显然不是的!

1.2 消费者重要参数

1.3 消费者配置

    消费者有很多属性可以设置,大部分都有合理的默认值,无需调整。有些参数可能对内存使用,性能和可靠性方面有较大影响。可以参考如下代码:
 public static void main(String[] args) {
        //TODO 消费者三个属性必须指定(broker地址清单、key和value的反序列化器)
        Properties properties = new Properties();
        properties.put("bootstrap.servers","127.0.0.1:9092");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        //TODO 群组并非完全必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
 
        //TODO 更多消费者配置(重要的)
        properties.put("auto.offset.reset","latest"); //消费者在读取一个没有偏移量的分区或者偏移量无效的情况下,如何处理
        properties.put("enable.auto.commit",true); // 表明消费者是否自动提交偏移 默认值true
        properties.put("max.poll.records",500); // 控制每次poll方法返回的的记录数量 默认值500
        //分区分配给消费者的策略。系统提供两种策略。默认为Range
        properties.put("partition.assignment.strategy",Collections.singletonList(RangeAssignor.class));
 
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        
        try {
            //TODO 消费者订阅主题(可以多个)
            consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));
            while(true){
                //TODO 拉取(新版本)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for(ConsumerRecord<String, String> record:records){
                    System.out.println(String.format("topic:%s,分区:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),
                            record.offset(),record.key(),record.value()));
                    //do my work
                    //打包任务投入线程池
                    // ex
                }
            }
        } finally {
            consumer.close();
        }
 
    }
  • auto.offset.reset

      消费者在读取一个没有偏移量的分区或者偏移量无效的情况下,如何处理。默认值是 latest ,从最新的记录开始读取,另一个值是 earliest ,表示消费者从起始位置读取分区的记录。
    

注意:如果是消费者在读取一个没有偏移量的分区或者偏移量无效的情况(因消费者长时间失效,包含的偏移量记录已经过时并被删除)下,默认值是 latest 的话,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录),可以先启动生产者,再启动消费者,观察到这种情况。

  • enable .auto.commit

      默认值 true ,表明消费者是否自动提交偏移。为了尽量避免重复数据和数据丢失,可以改为 false ,自行控制何时提交。
    
  • partition.assignment.strategy

      分区分配给消费者的策略。系统提供两种策略。默认为 Range 。允许自定义策略。
    
  • Range

      把主题的连续分区分配给消费者。(如果分区数量无法被消费者整除、第一个消费者会分到更多分区)
    
  • RoundRobin

      把主题的分区循环分配给消费者。
    

1.4 自定义策略

    extends 类 AbstractPartitionAssignor ,然后在消费者端增加参数:properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 类 .class.getName()); 即可。
  • max.poll.records,控制每次 poll 方法返回的的记录数量。
  • fetch.min.bytes,每次 fetch 请求时, server 应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。缺省为 1 个字节。多消费者下,可以设大这个值,以降低 broker 的工作负载。
  • fetch.wait.max.ms,如果没有足够的数据能够满足 fetch.min.bytes ,则此项配置是指在应答 fetch 请求之前, server 会阻塞的最大时间。缺省为 500 个毫秒。和上面的fetch.min.bytes 结合起来,要么满足数据的大小,要么满足时间,就看哪个条件先满足。
  • max.partition.fetch.bytes,指定了服务器从每个分区里返回给消费者的最大字节数,默认 1MB 。假设一个主题有 20 个分区和 5 个消费者,那么每个消费者至少要有 4MB 的可用内存来接收记录,而且一旦有消费者崩溃,这个内存还需更大。注意,这个参数要比服务器的 message.max.bytes 更大,否则消费者可能无法读取消息。
  • session.timeout.ms,如果 consumer 在这段时间内没有发送心跳信息,则它会被认为挂掉了。默认 3 秒。
  • client.id,当向 server 发出请求时,这个字符串会发送给 server 。目的是能够追踪请求源头,以此来允许 ip/port 许可列表之外的一些应用可以发送信息。这项应用可以设置任意字符串,因为没有任何功能性的目的,除了记录和跟踪。
  • receive.buffer.bytes 和 send.buffer.bytes,指定 TCP socket 接受和发送数据包的缓存区大小。如果它们被设置为 -1 ,则使用操作系统的默认值。如果生产者或消费者处在不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

2 kafka消费者工作原理

2.1 kafka消费者工作流程

2.2 消费者组初始化流程

  1. 确定协调器coordinator:每当我们创建一个消费者组的时候,kafka会分配一个broker作为该消费组的一个coordinator,coordinator节点的选择:groupid的hash值 % __consumer_offsets的分区数量,这个是系统给的;
  2. 注册消费者,并选出leader consumer,当有了coordinate,消费者将会开始往该coordinate上进行注册,第一个注册的消费者将成为消费组的leader,后续的作为follower;
  3. 选出leader后,leader将会从coordinate获取分区信息,并会根据分区策略给每个consumer分配分区形成一个消费策略,并将消费策略汇报给coordinate;
  4. coordinate将每一个consumer对应的分区下发给每一个consumer,对所有的follower而言,只知道自己的分区,不知道别人的,但是leader知道所有人的分区;
  5. 当发生分区再均衡的时候,leader将会重复分配过程;

2.3 消费者组详细消费流程

2.4 消费者使用示例

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
 
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
 
public class CustomConsumer {
    public static void main(String[] args) {
        // 1. 创建消费者配置对象
        Properties properties = new Properties();
        // 2. 给消费者配置对象添加参数(不同于生产者,消费者有 4个必要的配置参数)
        //  broker的ip地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // 配置  反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //配置消费者组(组名必须)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        // 3. 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 注册消费主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        consumer.subscribe(topics);
        // 4.调用方法消费数据
        // 如果kafka集群没有新数据会造成空转
        // 填写参数为时间,如果没有拉取数据,线程睡眠一会
        while (true) {
            // 设置1s中消费的一批数据
            // Duration.ofSeconds(1)不会导致空转,拉取不到的时候睡眠1s
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // 打印消费数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + consumerRecord.offset());
            }
        }
        //5.关闭资源
//        consumer.close();不使用的原因是,已关闭进程,就不会再消费数据了,进程停止就以为着JVM为断电了,不再工作
    }
}

2.5 提交偏移量导致的问题

    当我们调用 poll 方法的时候, broker 返回的是生产者写入 Kafka 但是还没有被消费者读取过的记录,消费者可以使用此记录来追踪消息在分区里的位置,我们称之为偏移量 。消费者更新自己读取到哪个消息的操作,我们称之为“提交”。

    消费者是如何提交偏移量的呢?消费者会往一个叫做 _consumer_offset 的特殊主题发送一个消息, 里面会包括每个分区的偏移量。发生了再均衡之后,消费者可能会被分配新的分区,为了能够继续工作,消费者者需要读取每个分区最后一次提交的偏移量,然后从指定的位置,继续读取消息做处理。

    1 )如果提交的偏移量小于消费者实际处理的最后一个消息的偏移量,处于两个偏移量之间的消息会被重复处理。

    2 )如果提交的偏移量大于客户端处理的最后一个消息的偏移量 , 那么处于两个偏移量之间的消息将会丢失。

    所以, 处理偏移量的方式对客户端会有很大的影响 。KafkaConsumer API 提供了很多种方式来提交偏移量 。

2.5.1 自动提交

    最简单的提交方式是让消费者自动提交偏移量。 如果 enable.auto.comnit 被设为 true ,消费者会自动把从 poll() 方法接收到的 最大 偏移量提交上去。 提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s 。自动提交是在轮询里进行的,消费者每次在进行轮询时会检査是否该提交偏移量了,如果是, 那么就会提交从上一次轮询返回的偏移量。 不过, 在使用这种简便的方式之前 , 需要知道它将会带来怎样的结果。 假设我们仍然使用默认的 5s 提交时间间隔 , 在最近一次提交之后的 3s 发生了再均衡,再均衡之后 , 消费者从最后一次提交的偏移量位置开始读取消息。 这个时候偏移量已经落后了 3s ,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量 , 减小可能出现重复消息的时间窗, 不过这种情况是无法完全避免的 。 在使用自动提交时, 每次调用轮询方法都会把上一次调用返回的最大偏移量提交上去 , 它并不知道具体哪些消息已经被处理了 , 所以在再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(enable.auto.comnit 被设为 true 时,在调用 close() 方法之前也会进行自动提交 ) 。一般情况下不会有什么问题, 不过在处理异常或提前退出轮询时要格外小心。

    自动提交虽然方便 , 但是很明显是一种基于时间提交的方式 , 不过并没有为我们留有余地来避免重复处理消息。

2.5.2 手动提交(同步)

    我们通过控制偏移量提交时间来消除丢失消息的可能性,并在发生再均衡时减少重复消息的数量。消费者 API 提供了另一种提交偏移量的方式,开发 者可以在必要的时候提交当前偏移量,而不是基于时间间隔。 把 auto.commit. offset 设为 false,自行决定何时提交偏移量。使用 commitsync()提交偏移量最简单也最可靠。这个方法会提交由 poll()方法返回的最 新偏移量,提交成功后马上返回,如果提交失败就抛出异常。

注意: commitsync() 将会提交由 poll() 返回的最新偏移量 , 所以在处理完所有记录后要确保调用了 commitsync() ,否则还是会有丢失消息的风险。如果发生了再均衡, 从最近批消息到发生再均衡之间的所有消息都将被重复处理。 只要没有发生不可恢复的错误,commitSync ()方法会阻塞,会一直尝试直至提交成功,如果失败,也只能记录异常日志。

    具体代码参考:
public static void main(String[] args) {
        /*消息消费者*/
        Properties properties = KafkaConst.consumerConfig("CommitSync",
                StringDeserializer.class,
                StringDeserializer.class);
        //TODO 取消自动提交
        /*取消自动提交*/
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
 
        KafkaConsumer<String,String> consumer
                = new KafkaConsumer<String, String>(properties);
        try {
            consumer.subscribe(Collections.singletonList(
                    BusiConst.CONSUMER_COMMIT_TOPIC));
            while(true){
                ConsumerRecords<String, String> records
                        = consumer.poll(Duration.ofMillis(500));
                for(ConsumerRecord<String, String> record:records){ //100个 100~ 200
                    System.out.println(String.format(
                            "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
                            record.topic(),record.partition(),record.offset(),
                            record.key(),record.value()));
                    //do our work
 
                }
                //开始事务
                //读业务写数据库-
                //偏移量写入数据库
                //TODO 同步提交(这个方法会阻塞)
                consumer.commitSync(); //offset =200  max
 
                consumer.commitSync(); //offset =200  max
            }
        } finally {
            consumer.close();
        }
    }

2.5.3 异步提交

    手动提交时,在 broker 对提交请求作出回应之前,应用程序会一直阻塞。这时我们可以使用异步提交 API ,我们只管发送提交请求,无需等待 broker 的响应。

    在成功提交或碰到无法恢复的错误之前 , commitsync() 会一直重试 , 但是 commitAsync 不会。它之所以不进行重试 , 是因为在它收到服务器响应的时候 , 可能有一个更大的偏移量已经提交成功。

    假设我们发出一个请求用于提交偏移量 2000,, 这个时候发生了短暂的通信问题 , 服务器收不到请求 , 自然也不会作出任何响应。与此同时 , 我们处理了另外一批消息, 并成功提交了偏移量 3000 。如果 commitAsync() 重新尝试提交偏移量 2000, 它有可能在偏移量 3000 之后提交成功。这个时候如果发生再均衡 , 就会出现重复消息。

    commitAsync() 也支持回调 , 在 broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标。 回调具体代码参考:
public static void main(String[] args) {
        /*消息消费者*/
        Properties properties = KafkaConst.consumerConfig(
                "CommitAsync",
                StringDeserializer.class,
                StringDeserializer.class);
        //TODO  取消自动提交
        /*取消自动提交*/
        properties.put("enable.auto.commit",false);
 
        KafkaConsumer<String,String> consumer
                = new KafkaConsumer<String, String>(properties);
        try {
            consumer.subscribe(Collections.singletonList(
                    BusiConst.CONSUMER_COMMIT_TOPIC));
            while(true){
                ConsumerRecords<String, String> records
                        = consumer.poll(Duration.ofMillis(500));
                for(ConsumerRecord<String, String> record:records){
                    System.out.println(String.format(
                            "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
                            record.topic(),record.partition(),record.offset(),
                            record.key(),record.value()));
                    //do our work
                }
                //TODO 异步提交偏移量
                consumer.commitAsync();
                /*允许执行回调*/
                consumer.commitAsync(new OffsetCommitCallback() {
                    public void onComplete(
                            Map<TopicPartition, OffsetAndMetadata> offsets,
                            Exception exception) {
                        if(exception!=null){
                            System.out.print("Commmit failed for offsets ");
                            System.out.println(offsets);
                            exception.printStackTrace();
                        }
                    }
                });
 
            }
        } finally {
            consumer.close();
        }
    }

2.5.4 同步异步组合

    因为同步提交一定会成功、异步可能会失败,所以一般的场景是同步和异步一起来做。 一般情况下, 针对偶尔出现的提交失败 , 不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的 , 那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交, 就要确保能够提交成功。 因此, 在消费者关闭前一般会组合使用 commitAsync() 和 commitsync() 。具体使用,参见代码如下:
public static void main(String[] args) {
        /*消息消费者*/
        Properties properties = KafkaConst.consumerConfig("SyncAndAsync",
                StringDeserializer.class,
                StringDeserializer.class);
        //TODO 取消自动提交
        /*取消自动提交*/
        properties.put("enable.auto.commit",false);
 
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        try {
            consumer.subscribe(Collections.singletonList(
                    BusiConst.CONSUMER_COMMIT_TOPIC));
            while(true){
                ConsumerRecords<String, String> records
                        = consumer.poll(Duration.ofMillis(500));
                for(ConsumerRecord<String, String> record:records){
                    System.out.println(String.format(
                            "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
                            record.topic(),record.partition(),record.offset(),
                            record.key(),record.value()));
                    //do our work
                }
                //TODO 异步提交
                consumer.commitAsync();
            }
        } catch (CommitFailedException e) {
            System.out.println("Commit failed:");
            e.printStackTrace();
        } finally {
            try {
                //TODO 为了万不一失,需要同步提交下
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }

2.5.5 特定提交

    在我们前面的提交中,提交偏移量的频率与处理消息批次的频率是一样的。但如果想要更频繁地提交该怎么办 ?

    如果 poll() 方法返回一大批数据 , 为了避免因再均衡引起的重复处理整批消息 , 想要在批次中间提交偏移量该怎么办 ? 这种情况无法通过调用 commitSync()或 commitAsync() 来实现,因为它们只会提交最后一个偏移量 , 而此时该批次里的消息还没有处理完。 消费者 API 允许在调用 commitsync() 和 commitAsync() 方法时传进去希望提交的分区和偏移量的 map 。假设我们处理了半个批次的消息 , 最后一个来自 主题“customers ”,分区 3 的消息的偏移量是 5000 ,你可以调用 commitsync() 方法来提交它。不过,因为消费者可能不只读取一个分区 , 因为我们需要跟踪所有分区的偏移量, 所以在这个层面上控制偏移量的提交会让代码变复杂。

    具体使用,参见代码如下:
public static void main(String[] args) {
        /*消息消费者*/
        Properties properties = KafkaConst.consumerConfig(
                "CommitSpecial",
                StringDeserializer.class,
                StringDeserializer.class);
        //TODO 必须做
        /*取消自动提交*/
        properties.put("enable.auto.commit",false);
 
        KafkaConsumer<String,String> consumer
                = new KafkaConsumer<String, String>(properties);
        Map<TopicPartition, OffsetAndMetadata> currOffsets
                = new HashMap<TopicPartition, OffsetAndMetadata>();
        int count = 0;
        try {
            consumer.subscribe(Collections.singletonList(
                    BusiConst.CONSUMER_COMMIT_TOPIC));
            while(true){
                ConsumerRecords<String, String> records
                        = consumer.poll(Duration.ofMillis(500));
                for(ConsumerRecord<String, String> record:records){
                    System.out.println(String.format(
                            "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
                            record.topic(),record.partition(),record.offset(),
                            record.key(),record.value()));
                    currOffsets.put(new TopicPartition(record.topic(),record.partition()),
                            new OffsetAndMetadata(record.offset()+1,"no meta"));
                    if(count%11==0){
                        //TODO 这里特定提交(异步方式,加入偏移量),每11条提交一次
                        consumer.commitAsync(currOffsets,null);
                    }
                    count++;
                }
            }
        } finally {
            //TODO 在关闭前最好同步提交一次偏移量
            consumer.commitSync();
            consumer.close();
        }
    }

2.6 分区再均衡

2.6.1 再均衡监听器

    在提交偏移量一节中提到过 , 消费者在退出和进行分区再均衡之前 , 会做一些清理工作比如,提交偏移量、关闭文件句柄、数据库连接等。 在为消费者分配新分区或移除旧分区时, 可以通过消费者 API 执行一些应用程序代码,在调用 subscribe() 方法时传进去一个 ConsumerRebalancelistener实例就可以了。

ConsumerRebalancelistener 有两个需要实现的方法。

  1. public void onPartitionsRevoked( Collection< TopicPartition> partitions) 方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了;
  2. public void onPartitionsAssigned( Collection< TopicPartition> partitions) 方法会在重新分配分区之后和消费者开始读取消息之前被调用。

2.6.2 从特定偏移量开始记录

    到目前为止 , 我们知道了如何使用 poll() 方法从各个分区的最新偏移量处开始处理消息。 不过, 有时候我们也需要从特定的偏移量处开始读取消息。 如果想从分区的起始位置开始读取消息, 或者直接跳到分区的末尾开始读取消息 , 可以使 seekToBeginning(Collection tp) 和seekToEnd( Collectiontp) 这两个方法。 不过,Kafka 也为我们提供了用于查找特定偏移量的 API 。它有很多用途 , 比如向后回退几个消息或者向前跳过几个消息 ( 对时间比较敏感的应用程序在处理滞后的情况下希望能够向前跳过若干个消息) 。在使用 Kafka 以外的系统来存储偏移量时 , 它将给我们带来更大的惊喜 -- 让消息的业务处理和偏移量的提交变得一致。

    试想一下这样的场景: 应用程序从 Kafka 读取事件 ( 可能是网站的用户点击事件流 ), 对它们进行处理 ( 可能是使用自动程序清理点击操作并添加会话信息 ), 然后把结果保存到数据库。假设我们真的不想丢失任何数据, 也不想在数据库里多次保存相同的结果。 我们可能会,毎处理一条记录就提交一次偏移量。尽管如此, 在记录被保存到数据库之后以及偏移量被提交之前 , 应用程序仍然有可能发生崩溃 , 导致重复处理数据, 数据库里就会出现重复记录。 如果保存记录和偏移量可以在一个原子操作里完成 , 就可以避免出现上述情况。记录和偏移量要么都被成功提交 , 要么都不提交。如果记录是保存在数据库里而偏移量是提交到Kafka上 , 那么就无法实现原子操作不过 , 如果在同一个事务里把记录和偏移量都写到数据库里会怎样呢 ? 那么我们就会知道记录和偏移量要么都成功提交, 要么都没有 , 然后重新处理记录。 现在的问题是: 如果偏移量是保存在数据库里而不是 Kafka 里 , 那么消费者在得到新分区时怎么知道该从哪里开始读取 ? 这个时候可以使用 seek() 方法。 在消费者启动或分配到新分区时, 可以使用 seck() 方法查找保存在数据库里的偏移量。我们可以使用使用 Consumer Rebalancelistener 和 seek() 方法确保我们是从数据库里保存的偏移量所指定的位置开始处理消息的。

2.7 独立消费者

    到目前为止 , 我们讨论了消费者群组 , 分区被自动分配给群组里的消费者 , 在群组里新增或移除消费者时自动触发再均衡。不过有时候可能只需要一个消费者从一个主题的所有分区或者某个特定的分区读取数据。这个时候就不需要消费者群组和再均衡了, 只需要把主题或者分区分配给消费者 , 然后开始读取消息并提交偏移量。 如果是这样的话, 就不需要订阅主题 , 取而代之的是为自己分配分区。一个消费者可以订阅主题 ( 并加入消费者群组 ), 或者为自己分配分区 , 但不能同时做这两件事情。 独立消费者相当于自己来分配分区,但是这样做的好处是自己控制,但是就没有动态特性的支持了,包括加入消费者(分区再均衡之类的),新增分区,这些都需要代码中去解决,所以一般情况下不推荐使用。

2.8 优雅退出

    如果确定要退出循环 , 需要通过另一个线程调用 consumer. wakeup() 方法。如果循环运行在主线程里 , 可以在 ShutdownHook 里调用该方法。要记住 , consumer. wakeup()是消费者唯一一个可以从其他线程里安全调用的方法。调用 consumer. wakeup() 可以退出 poll(), 并抛出 WakeupException 异常。我们不需要处理 Wakeup Exception, 因为它只是用于跳出循环的一种方式。不过 , 在退出线程之前调用 consumer.close() 是很有必要的 , 它会提交任何还没有提交的东西, 并向群组协调器发送消息 , 告知自己要离开群组 , 接下来就会触发再均衡 , 而不需要等待会话超时。

参考链接

Kafka基本原理详解-CSDN博客

这是最详细的Kafka应用教程了 - 掘金

Kafka : Kafka入门教程和JAVA客户端使用-CSDN博客

简易教程 | Kafka从搭建到使用 - 知乎

kafka简介-CSDN博客

Kafka 架构及基本原理简析

kafka是什么

再过半小时,你就能明白kafka的工作原理了(推荐阅读)

Kafka 设计与原理详解

Kafka【入门】就这一篇! - 知乎

kafka简介_kafka_唏噗-华为云开发者联盟

kafka详解

Kafka 设计与原理详解_kafka的设计初衷不包括-CSDN博客

kafka学习知识点总结(三)

Kafka知识总结之Broker原理总结_kafka broker-CSDN博客

深度解析kafka broker网络模型运行原理_kafka broker原理-CSDN博客

Kafka源码分析及图解原理之Broker端

kafka——消费者原理解析

深入分析kafka的消费者配置原理_kafkaconsumer consumer;-CSDN博客

一探究竟,详解Kafka生产者和消费者的工作原理! - 简书

kafka消费原理_kafka消费逻辑-CSDN博客

标签: linq c#

本文转载自: https://blog.csdn.net/junbaozi/article/details/136076553
版权归原作者 江中散人 所有, 如有侵权,请联系我们删除。

“【云原生进阶之PaaS中间件】第三章Kafka-4.4-消费者工作流程”的评论:

还没有评论