0


Kafka之Consumer使用与基本原理

前置内容

Kafka生产者:juejin.cn/post/709417…

Kafka Consumer基本概念:juejin.cn/post/709641…

Java SDK 基本使用

Consumer就是负责从Kafka集群中消费消息数据的应用程序,自 Kafka 0.9 版本提供了Java版本的Consumer SDK供用户使用,

Kafka官方支持的语言SDK较少,更多都是由第三方社区维护的SDK,如果需要使用对应语言的SDK,需要额外下载,

第三方库信息地址:docs.confluent.io/platform/cu…

使用Consumer消费消息的完整代码如下:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerApp {

    public static void main(String[] args) {
        String topicName = "test-group";
        Properties props = new Properties();
        // 必须指定
        props.put("bootstrap.servers", "localhost:9092");
        // 必须指定
        props.put("group.id", "test-group");
        // 必须指定
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 必须指定
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 从最早的消息开始读取
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("key: %s, value: %s", record.key(), record.value()));
                }
            }
        } finally {
            consumer.close();
        }
    }
}
复制代码

创建一个Consumer实例需要以下几个步骤:

  1. 创建一个 java.util.Properties对象,并至少指定下面几个参数:- bootstrap.servers与Java版的Producer类似,该参数用于指定borker服务器地址,多个地址之间用,分隔,如果broker集群很多,也不用全部都指定,producer会根据配置的borker发现全部的broker,之所以要指定多个,是方便故障转移使用,即使 bootstrap.servers 中的某一台挂了,consumer 也可通过其他的地址接入 kafka 集群,因为kafka内部采用FQDN(Fully Qualified Domain Name), 因此如果broker端没有显式配置 listeners 使用IP地址,最好 bootstrap.servers 参数中的地址配置为主机名,而非IP地址。- key.deserializer与Producer对应,因为Producer发送到broker中的就是字节数组,因此每个消息被读取到时也是字节数组,所以需要指定将字节数组反序列化为原来对象格式的解码器,该参数的值必须是实现 org.apache.kafka.common.serialization.Deserializer 接口的类,并且是全类名,用于将消息的key序列化为原本的值。- value.deserializer与上面的 key.deserializer 类似,只是这里是 value 的反序列化。- group.id用于指定消费者实例所属的consumer group,也就是消费组,通常取名为一个有业务意义的名字就可以了。
  2. 使用上一步的Properties实例构造KafkaConsumer对象将上一步创建好的对象通过构造函数入参传递给 KafkaConsumer 类即可。KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);复制代码
  3. 调用KafkaConsumer.subscribe方法订阅topic一个Consumer可以订阅多个topic,并且要注意,执行多次 subscribe 方法,只会以最后一次为准,即覆盖式,订阅topic的语法如下:consumer.subscribe(Arrays.asList("topic-A", "topic-B"));复制代码也可以使用手动订阅topic和相应的分区,但这种方式不推荐。consumer.assign(Arrays.asList(new TopicPartition("topic-a", 0),new TopicPartition("topic-a", 1)));复制代码consumer的订阅是延迟生效的,订阅信息只有在下次poll调用时开始生效,如果在poll之前打印订阅信息,会发现是空的,因为并未生效。consumer也可以通过正则表达式的形式配置topic订阅,即动态订阅,当系统中出现符合正则条件的,将一起进行读取处理:consumer.subscribe(Pattern.compile("kafka-.*"), new ConsumerRebalanceListener() { // 在均衡开始之前和消费者停止读取消息之后调用,一般用来提交偏移量 @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { } // 在重新分配分区之后和消费者开始读取消息之前调用,一般用来指定消费偏移量 @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { } });复制代码使用正则表达式的订阅必须指定 ConsumerRebalanceListener ,该类是一个回调接口,用于编写处理consumer分区分配方案变更时的逻辑,如果用户配置的是自动提交位移(enable.auto.commit=true),可不用理会该类,直接设置为:consumer.subscribe(Pattern.compile("kafka-.*"), new NoOpConsumerRebalanceListener());复制代码但如果是手动提交,起码要在 onPartitionsRevoked 方法中处理分区分配方案变更时的位移提交。
  4. 循环调用KafkaConsumer.poll方法读取消息poll方法使用了类似linux的selectI/O机制,所有相关的事件(rebalance、获取消息)都发生在一个事件循环(event loop)中,一个常见的event loop 获取消息的写法如下:while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("key: %s, value: %s", record.key(), record.value())); }}复制代码上面的poll方法中还传递了一个 Duration.ofSeconds(1) 参数,这个参数表示超时时间为1秒钟。通常情况下,consumer拿到了足够多的数据将立即返回,但是如果数据不够多的话,consumer将处于阻塞状态,为了防止阻塞时间过长,上面的参数意思就是,即使没有太多的数据,最多也只阻塞1秒钟就立即返回。
  5. 处理获取到的消息对象 ConsumerRecord使用poll方法拿到消息集合后,需要对消息进行相应的业务处理,需要注意的是,从kafka Consumer的角度,poll方法返回后,就算是消费成功了,但是从业务角度,拿到消息后还需要进行一系列的消息处理,处理完毕后才算得上是消费成功,当业务处理逻辑比较重的时候,应当考虑使用新的线程去处理消息,避免时间循环中业务逻辑过重导致消息消费缓慢,如果是poll的参数配置不当,导致消费缓慢,应当适当调整poll的参数,例如超时时间。
  6. 关闭KafkaConsumerconsumer程序结束后要执行close操作,用于释放运行过程中占用的系统资源,例如线程、内存 socket等,关闭方式有如下两种:- KafkaConsumer.close():关闭consumer,并最多等待30秒- KafkaConsumer.close(timeout):关闭consumer,并最多等待给定的时间

Consumer脚本命令消费

上面说的是使用Java语言进行消费,不过kafka程序自身也提供了控制台脚本,用于验证调试消费者,脚本名为

kafka-console-consumer

在kafka安装目录下的bin目录下(windows中在bin/windows下),脚本启动参数常见有:

  1. --bootstrap-servers与SDK中的意思一样,指定broker的地址,多个采用逗号分隔
  2. --topic指定要消费的topic名称
  3. --from-beginning指定是否从头消费,指定该参数与Java SDK中的 props.put("auto.offset.reset", "earliest"); 效果一致

使用demo如下:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
复制代码

Consumer主要参数

  1. session.timeout.ms最初该参数有两种含义:第一种是判断consumer group成员是否崩溃的会话超时时间,如果该值设置的过长,那么消费者组协调者(group coordinator)也需要对应的时间才能判断出consumer崩溃,第二种是consumer消息处理逻辑的最大时间(即单个consumer两次poll的间隔时间),如果超出这个时间,coordinator会将该消费者踢出group,该消费者负责的分区也将重新分配给其他消费者,这会引发两种问题,一是不必要的rebalance,因为被踢出的consumer需要重新加入group,二是consumer被踢出group后处理的消息,无法提交位移,意味着该消息在rebalance后会被重复消费,如果消息处理总是需要很久的时间,那么将引起恶性循环,consumer将无法执行新消息的消费,除非调整参数值。这种定义,让用户在实际使用中很不方便,可能消息处理逻辑本身就比较久,但是又希望快速检测到consumer的失败,于是在 0.10.1 版本后,该参数仅包含一种意思,即上述的第一种,coordinator检测consumer失效的时间,该值默认是10秒,可根据实际情况调整,建议值设置的小一些,可让coordinator更快的检测到consumer的情况,从而更快的开启rebalance,避免造成更大的消息滞后(consumer lag)。
  2. max.poll.interval.ms上述所说的第二种情况,被剥离成了 max.poll.interval.ms 参数,对于消息处理比较久的情况,单独设置该值即可,这样保证了两种逻辑的分离,不会相互影响。
  3. auto.offset.reset该参数用于指定当consumer要消费的位移信息不在消息日志的合理范围内时,kafka的应对策略。什么是不合理范围?即无位移信息或位移信息越界,简单的说就是consumer要消费的信息位置并不存在。应对策略有下面几种:- earliest:从最早的位移开始消费- latest:从最新位移处进行消费- none:抛出异常该参数值的效果触发必须要符合 无位移信息或位移信息越界 才行,例如首次运行一个consumer group,并指定从头消费,那么group必然会从头开始消费,因为此时group没有任何位移信息,但是当group一旦提交了位移信息后,重启该group后,该group并不会再从头消费,因为kafka保存了该group的位移信息。
  4. enable.auto.commit该参数用于指定consumer是否自动提交位移,如果设置为true,consumer将在后台自动提交位移,否则需要用户手动提交位移,对于不允许消息丢失的情况下,可以设置为false,由用于手动提交。
  5. fetch.max.bytes用于指定consumer端单次获取数据的最大字节数,如果实际业务场景下消息很大,该参数也需要调整,否则consumer将无法消费这些消息。
  6. max.poll.records用于指定每次poll调用返回的最大消息数,可根据实际情况调整该参数的值。
  7. heartbeat.interval.ms该参数用于通知consumer group中的成员要进行新一轮的rebalance时的间隔,假设group coordinator决定开启新一轮rebalance时,它会将该决定以 REBALANCE_IN_PROGRESS 异常的形式放入 consumer 心跳请求的 response 中,consumer在收到该类型的response后,即知晓自己需要重新加入组了,该值设置的越小,当需要rebalance时,rebalance的也越快,且该值必须小于 session.timeout.ms,因为如果consumer在 session.timeout.ms 这段时间内都不发送心跳,coordinator 将认为他已经失效了,所以也没必要通知他了。
  8. connections.max.idle.ms该参数用于指定kafka定期关闭空闲socket的时长,默认值是9分钟,该参数可能会导致consumer处理下次请求时需要重新申请socket资源,造成速度下降,如果不在乎socket资源空闲的开销,可以设置为-1,即不关闭socket。

位移管理

consumer需要为自己订阅的分区进行消费进度的保存,即处理到了哪里,并且要定期向kafka broker提交当前的消费位置,进行持久化,这个消费位置被称为位移,

位移也表示下一条待消费消息的位置,假设consumer已经读取了某分区中第N条消息,那么它应该提交位移值为N,因为位移从0开始,位移为N的消息是第N+1条消息,即下次要消费的消息。

offset的提交时间,是对消息交付语义(message delivery semantic)保证的基石,常见的消息语义有三种:

  1. 最多一次 (at most once):消息可能会丢失,但不会重复处理
  2. 最少一次 (at least once):消息不会丢失,但可能会重复处理多次
  3. 精确一次 (exactly once):消息一定会被处理且仅被处理一次

consumer在消费进行业务处理前就提交位移,则可实现第一种语义,因为即使崩溃,恢复后也是去消费下一条,之前的不会被消费,

相反,如果提交位移在业务处理后,则可实现第二种语义,因为正常情况下,无法保证业务处理和位移提交符合原子性,所以仅保证不丢失,但不保证业务处理完毕后崩溃没有提交位移,导致恢复后的重复处理,

kafka自0.11版本开始支持事务,有了事务则可以实现第三种语义。

除了offset外,还有一些与consumer相关的位置信息,一共如下:

  1. 上次提交位移 (last committed offset):consumer最近一次提交的offset值,也就是上面提到的offset。
  2. 当前位置(current position):consumer已读取但尚未提交的位置
  3. 水位(watermark):也被称为高水位(high watermark),在水位下的所有消息都是consumer可以读取的(即图的左边),水位之上(即图的右边)的都无法被consumer读取
  4. 日志终端位移(Log End Offset,LEO):表示当前分区的最大位移值,正常情况下LEO都比水位要大,当分区的所有副本都保存了某条消息,分区的副本leader才会向上移动水位值

之所以有水位的存在,是因为kafka要保证消息要被所有的副本写入成功后,再允许消息被处理。

consumer会在broker列表中选择一个broker作为consumer group的coordinator,用于实现组成员管理、消费分配方案制定以及提交位移等功能。

当consumer group首次启动时,由于其没有位移信息,所以 auto.offset.reset 的作用就体现出来了,通常情况下,要么从最新的地方开始读取,要么从最早的唯一开始读取,

当consumer运行一段时间后,必须要提交自己的位移值,如果consumer崩溃或被关闭,其负责的分区将被分配给其他consumer,因此在其他consumer读取这些分区前要做好位移提交工作,否则就会出现消息重复消费,

consumer提交位移的主要机制是通过向其所属的 coordinator 发送位移提交请求实现,每次提交都会向 __consumer_offsets 对应的分区上追加一条消息。

自动提交与手动提交

默认情况下,consumer自动进行位移提交,自动提交间隔是5秒钟,通过参数

auto.commit.interval.ms

参数可以控制自动提交的间隔,

一般推荐对消息手动进行提交,因为自动提交虽然不用额外做处理,但是极有可能出现消息丢失,在构建KafkaConsumer时,通过将配置参数

enable.auto.commit=false

将提交改为手动,

并在代码中使用

consumer.commitSync()

consumer.commitAsync()

进行提交,在业务处理完毕后进行提交可保证消息不会丢失,但不保证不重复消费,

两个方法前者是同步提交,会阻塞用户线程继续运行,后者是异步提交,不阻塞用户线程,但是还会在poll方法中轮询异步提交的结果。

提交方法还提供了带参数的重载方法,可以对提交做出更细粒度的控制,如下,每处理一条记录进行一次提交:

while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        // 获取分区
        for (TopicPartition partition : records.partitions()) {
            // 获取分区下的每条记录
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                // 处理分区下的每条记录
                System.out.println(partitionRecord.value());
                // 获取该记录的offset
                long lastOffset = partitionRecord.offset();
                // 提交offset
                // 因为提交的位移必须是下一条待消费的消息位置,因此要 + 1
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
复制代码

重平衡 (rebalance)

rebalance的作用

rebalance是一组协议,其定义了一个consumer group中的所有consumer如何达成一致均匀的订阅topic中的所有分区,

好比一共有一百块砖头,5个工人,那么这5个工人得达成协议怎么搬,不能两个或者五个人同时搬一块砖,这样会造成不必要的资源浪费,

例如一个名为A的topic,其有100个分区,现在有一个group来订阅A topic,该group中有5个consumer,默认情况下,kafka会为每个consumer分配不同的20个分区进行订阅消费,该分配过程就叫rebalance,

当consumer成功执行rebalance后, 组订阅的topic的每个分区只会分配给组内的一个consumer实例。

group coordinator(组协调者)

kafka内置了一个组协调协议 (group coordinator protocol),对于每个消费者组,kafka集群中的某个broker会被选举为其组协调者 (group coordinator),

coordinator负责对组的状态进行管理,其主要职责就是当组内有新成员来时,对该组进行 rebalance 操作,即协调topic分区订阅重新分配。

rebalance的触发条件

组rebalance的触发条件有下面3个:

  1. 组成员变更例如新 consumer 加入组,已有consumer离开组,或者已有consumer崩溃。
  2. 组订阅topic数变更基于正则表达式的订阅,当有符合正则表达式的新topic被创建时。
  3. 组订阅topic分区数变更被订阅的topic的分区数发生更改,例如使用命令行脚本增加了topic的分区数。

一般应用比较常见的触发条件是第一种,即 consumer 崩溃,这里的崩溃并不一定指的是consumer进程挂掉,或者consumer进程所在的机器宕机,

而是指的是,当consumer无法在指定时间内完成消息的处理,coordinator将认为consumer已经崩溃,从而触发新一轮的rebalance,

在实际的业务处理中,一定要避免在poll主线程中执行较重的逻辑处理,这会导致处理时间过长而被coordinator认为崩溃执行rebalance,

频繁的rebalance会极大的降低consumer的吞吐量,在生产环境中需要结合业务配置好consumer的几个参数:

request.timeout.ms

max.poll.records

max.poll.interval.ms

,避免不必要的rebalance出现。

rebalance分区分配策略

分区分配策略决定topic的分区使用何种方式分配给consumer订阅。

consumer默认有三种分区分配策略:

  1. rangerange策略基于范围的思想,将单个topic的分区按照顺序排列,然后将这些分区划分成固定大小的分区段,并依次分配给每个consumer。假设我们有个名为 T1 的主题,其包含了10个分区,然后我们有两个消费者(C1,C2)来消费这10个分区里面的数据。range策略的分配过程大概:首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。在我们的例子里面,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者排完序将会是C1, C2;然后将partitions的个数除以消费者的总数来决定每个消费者消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。几个例子:情况1: 有10个分区,2个消费者, 10 / 2 = 5,那么消费者 C1和消费者C2 将会消费同样多的分区,所以最后分区分配的结果:- C1 将消费 0, 1, 2, 3, 4 分区- C2 将消费 5, 6, 7, 8, 9 分区情况2: 有11个分区,那么最后分区分配的结果:- C1 将消费 0, 1, 2, 3, 4, 5 分区- C2 将消费 6, 7, 8, 9, 10分区情况3: 有2个主题(T1和T2),分别有11个分区(0,1,2,…10),那么最后分区分配的结果:- C1 将消费 T1主题的 0, 1, 2, 3, 4, 5 分区以及 T2主题的 0, 1, 2, 3, 4, 5分区,加起来一共- C2 将消费 T1主题的 6, 7, 8, 9, 10 分区以及 T2主题的 5, 6, 7, 8, 9, 10分区可以看出,C1 消费者C2 消费者多消费了2个分区,这是Range策略的一个弊端。
  2. round-robinround-robin策略将所有topic的分区顺序摆开,然后轮询式的分配给每个consumer。假设我们有个名为 T1 的主题,其包含了10个分区,然后我们有两个消费者(C1,C2)来消费这10个分区里面的数据。那么最终分配给C1的是:0,2,4,6,8分配给C2的是:1,3,5,7,9
  3. stickysticky策略即黏性策略,rebalance会最大限度的按照之前的分配方案分配给各个consumer。假设我们有两个名为 T1 、T2 的主题,每个主题各3个分区,然后我们有三个消费者(C1,C2,C3)来消费这6个分区里面的数据。现分配如下C1: T1-0、T2-0C2: T1-1、T2-1C3: T1-2、T2-2突然 C2 崩溃,那么rebalance后,重新分配为:C1: T1-0、T1-1、T2-0C3:T1-2、T2-1、T2-2即在保持分配均匀的情况下,将原本属于某consumer的分区还分配给该consumer。

kafka consumer默认的分配策略是range,如果group下的所有consumer订阅的主题都是一样的,那么使用round-robin策略分配的会更均匀,

通过

partition.assignment.strategy

对consumer进行分配策略的设置,除了kafka自带的分配策略,用户也可以自定义分配器(assignor)。

rebalance generation

一个consumer group可以执行多次rebalance,generation 的引入是为了保护consumer group的offset无效提交,

generation 表示rebalance的分代,起初是0,当进行一次rebalance后,该值就会增加,假设上一届的consumer成员由于某些原因延迟提交了offset,

由于其提交offset时,携带的是旧的 generation 信息,因此该提交会被consumer group拒绝,很多时候 consumer 抛出 ILLEGAL_GENERATION 异常就是这个原因。

rebalance 协议

rebalance 本质是一组协议,group 与 coordinator 共同使用这组协议完成 group 的 coordinator,协议有如下几个:

  1. JoinGroup:consumer请求加入组
  2. SyncGroup:group leader 将分配方案同步更新到所有组内成员中
  3. Heartbeat:consumer定期向coordinator汇报心跳表明自己存活
  4. LeaveGroup:consumer主动通知coordinator自己即将离组
  5. DescribeGroup:查看组的所有信息(成员信息、协议信息、分配方案以及订阅信息),该类型主要供管理员使用

在rebalance过程中,coordinator主要处理consumer发过来的JoinGroup和SyncGroup请求,当consumer主动离组时发送LeaveGroup请求给coordinator。

在rebalance成功之后,组内所有consumer定期向coordinator发送Heartbeat请求,每个consumer根据Heartbeat请求的响应中是否包含REBALANCE_IN_PROCESS判断是否开启新一轮的rebalance。

rebalance 流程

consumer group在执行 rebalance 之前必须确定 coordinator 所在的broker,并创建与该 broker 相互通信的 socket 连接,

确定 coordinator 的算法与确定 offset 被提交到 __consumer_offsets 目标分区的算法相同,如下:

  • 计算 Math.abs(groupID.hashCode) % offsets.topic.num.partitions 参数值(默认50),假设得出结果 10
  • 寻找 __consumer_offsets 分区 10 的 leader 副本所在的broker,该broker即为这个 group 的 coordinator

成功连接 coordinator 后,即可进行 rebalance 操作,rebalance 主要分为两步:

  1. 加入组这一步组内所有 consumer 向 coordinator 发送 JoinGroup 请求,当收集全 JoinGroup 请求后,coordinator 从中选择一个 consumer 担任 group 的 leader,并将所有的成员信息以及它们的订阅信息发送给leader,group 的 leader 与 coordinator 并非同一种概念,leader是某个consumer实例,coordinator是kafka集群中的一个broker,分配方案由leader给出,而非coordinator,之所以将分配方案交给consumer leader执行,是因为这样做有更好的灵活性,在这种机制下,用户可以自行实现类似Hadoop 机架感知(rack-aware)分配方案,同一机架上的分区分配给相同机架上的consumer,可减少网络传输的开销;同时,当consumer的分区策略发生改变后,重启consumer即可,无需broker进行介入。
  2. 同步更新分配方案这一步 group leader 开始制定分配方案,即根据分配策略决定group中的consumer分别负责topic中的哪些分区, 一旦分配完成,leader会将分配方案分装进 SyncGroup 请求并发送给 coordinator,组内所有的consumer都会发送 SyncGroup 请求,但只有leader发送的SyncGroup请求中包含了分配方案,coordinator收到分配方案后将属于各自consumer的分配方案作为SyncGroup请求的response返还给各自的consumer。

rebalance监听器

consumer默认将位移提交到 __consumer_offsets 中,其实 kafka 也支持用户将位移提交到外部存储中,例如数据库,

如果要实现这个功能,用户必须使用 rebalance 监听器,使用 rebalance 监听器的前提是用户使用 consumer group,如果使用的是独立consumer或者直接手动分配分区,那么 rebalance 监听器将不会生效,

rebalance监听器主要是一个接口回调类

ConsumerRebalanceListener

,有两个方法需要实现

onPartitionsRevoked

onPartitionsAssigned

,前者在开启新一轮的rebalance前调用,后者在rebalance完成后调用。

rebalance监听器最常见的用法是手动提交位移到第三方存储库,以及在rebalance前后执行一些审计操作,demo如下:

大致思路:

  1. 使用 joinStart 保存本次rebalance的开始时间、totalRebalanceTimeMs统计所有rebalance的时长总和
  2. 在 onPartitionsRevoked 记录本次rebalance的开始时间
  3. 在 onPartitionsRevoked 将每个分区的offset用自定义方法 saveOffsetInExternalStore 保存到外部存储中
  4. 在 onPartitionsAssigned 将每个分区的offset从外部存储中读取出来,并使用seek设置consumer从该位置读取
  5. 累加 totalRebalanceTimeMs 总时长
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// rebalance开始时间
final AtomicLong joinStart = new AtomicLong(0L);
// rebalance完成总时长
final AtomicLong totalRebalanceTimeMs = new AtomicLong(0L);
consumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 设置新一轮的rebalance开始时间
        joinStart.set(System.currentTimeMillis());
        for (TopicPartition partition : partitions) {
            // consumer.position(partition) 读取当前offset
            // saveOffsetInExternalStore 将分区 offset 保存到外部存储
            saveOffsetInExternalStore(consumer.position(partition));
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            // readOffsetFromExternalStore 从外部存储读取提交位移的值
            // seek方法指定消费者从该位移处消费
            consumer.seek(partition, readOffsetFromExternalStore(partition));
        }
        // 更新rebalance完成总时长
        totalRebalanceTimeMs.addAndGet(System.currentTimeMillis() - joinStart.get());
    }
});
复制代码

如果启用了启动提交位移,用户可以不在

ConsumerRebalanceListener

监听器中手动提交唯一,consumer每次rebalance时,会检查用户是否启用了自动提交位移,如果是,它会自动帮用户提交,因此无需显式提交。

consumer要求rebalance在很短的时间内完成,因此在rebalance中不要放执行时间很长的逻辑,特别是一些阻塞方法。

解码序列化

解码序列化与Producer发送者发送时的序列化是互逆操作,即将对方序列化后的字节数组再恢复成原样子。

默认解序列化器

与Producer的序列化呼应,常用的deserializer如下:

  • StringDeserializer:序列化String类型
  • ByteBufferDeserializer:序列化ByteBuffer类型
  • BytesDeserializer:序列化Kafka自定义的Bytes类
  • DoubleDeserializer:序列化Double类型
  • IntegerDeserializer:序列化Integer类型
  • LongDeserializer:序列化Long类型

如果用户有更复杂的解序列化需求,可自行定义 deserializer 。

在构造Consumer对象时,指定相应的序列化值即可使用序列化:

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
复制代码

自定义解序列化器

先定义一个实现 Deserializer 接口的类:

public class UserDeserializer implements Deserializer<User> {

    private ObjectMapper objectMapper;

    @Override
    public void configure(Map configs, boolean isKey) {
        this.objectMapper = new ObjectMapper();
    }

    @Override
    public User deserialize(String topic, byte[] data) {
        try {
            return objectMapper.readValue(data, User.class);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        this.objectMapper = null;
    }
}
复制代码

指定consumer的value解码序列化为刚刚创建的类:

Properties props = new Properties();
// 必须指定
props.put("bootstrap.servers", "localhost:9092");
// 必须指定
props.put("group.id", "test-group");
// 必须指定
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 必须指定
props.put("value.deserializer", "com.kafka.producer.UserDeserializer");
// 从最早的消息开始读取
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true");
KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-user-topic"));
try {
    while (true) {
        ConsumerRecords<String, User> records = consumer.poll(Duration.ofSeconds(1));
        // 获取分区
        for (TopicPartition partition : records.partitions()) {
            // 获取分区下的每条记录
            List<ConsumerRecord<String, User>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, User> partitionRecord : partitionRecords) {
                System.out.println(partitionRecord.value());
            }
        }
    }
} finally {
    consumer.close();
}
复制代码

与之前的Producer程序配合使用,得到消费结果:

多线程消费

与KafkaProducer不同,KafkaConsumer是非线程安全的,因此在实践过程中,推荐KafkaProducer单实例供多线程使用,

对于KafkaConsumer非线程安全有两种实践方式推荐:

1. 每个线程单独建立自己的KafkaConsumer

既然KafkaConsumer实例是非线程安全的,那么每个线程创建时,都各自创建一个仅自己使用的KafkaConsumer就可以避免问题了。

样例设计,先定义三个类:

  1. ConsumerRunnable 类:消费线程类,执行真正的消费任务
  2. ConsumerGroup 类:消费线程管理类,创建多个线程类执行消费任务
  3. ConsumerMain 类:测试主方法类
public class ConsumerMain {

    static class ConsumerRunnable implements Runnable {

        private final KafkaConsumer<String, String> consumer;

        public ConsumerRunnable(String brokerList, String groupId, String topic) {
            Properties props = new Properties();
            // 必须指定
            props.put("bootstrap.servers", brokerList);
            // 必须指定
            props.put("group.id", groupId);
            // 必须指定
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // 必须指定
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // 从最早的消息开始读取
            props.put("auto.offset.reset", "earliest");
            props.put("enable.auto.commit", "true");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    // 获取分区
                    for (TopicPartition partition : records.partitions()) {
                        // 获取分区下的每条记录
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                            System.out.println(partitionRecord.value());
                        }
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }

    static class ConsumerGroup {

        private List<ConsumerRunnable> consumers;

        public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
            // 创建好相应的Consumer,等待执行
            consumers = new ArrayList<>(consumerNum);
            for (int i = 0; i < consumerNum; i++) {
                consumers.add(new ConsumerRunnable(brokerList, groupId, topic));
            }
        }

        public void execute() {
            // 放到不同的线程中执行消费
            for (ConsumerRunnable consumer : consumers) {
                new Thread(consumer).start();
            }
        }

    }

    public static void main(String[] args) {
        String brokerList = "localhost:9092";
        String groupId = "testGroup1";
        String topic = "test-topic";
        int consumerNum = 3;
        ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
        consumerGroup.execute();
    }

}
复制代码

2. 单KafkaConsumer实例+多worker线程

大量的线程和对应的KafkaConsumer创建占用的资源相对也会较多,因此可以选择消息的获取与消息的处理逻辑进行解耦,在全局维护一个或若干个消费者实例进行消息获取,然后将消息的处理逻辑放入单独的工作者线程中进行就好。

样例设计,先定义三个类:

  1. ConsumerThreadHandler:consumer多线程管理类,用于创建线程池以及为每个线程分配消息集合,consumer位移提交也在这里进行。
  2. ConsumerWorker:本质是一个Runnable,执行真正的业务逻辑处理,并上报位移信息给ConsumerThreadHandler。
  3. Main类:测试主方法类。
public class Main {

    static class ConsumerWorker<K, V> implements Runnable {

        private final ConsumerRecords<K, V> records;
        private final Map<TopicPartition, OffsetAndMetadata> offsets;

        // 得到消息和位移map(用于上报位移)
        public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.records = records;
            this.offsets = offsets;
        }

        @Override
        public void run() {
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
                // 消费消息
                for (ConsumerRecord<K, V> record : partitionRecords) {
                    System.out.println(record.value());
                }
                // 上报位移
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                synchronized (offsets) {
                    if (!offsets.containsKey(partition)) {
                        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    } else {
                        long currentOffset = offsets.get(partition).offset();
                        if (currentOffset <= lastOffset + 1) {
                            offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                        }
                    }
                }

            }
        }
    }

    static class ConsumerThreadHandler<K, V> {
        private final KafkaConsumer<K, V> consumer;
        private ExecutorService executorService;
        private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
            Properties props = new Properties();
            // 必须指定
            props.put("bootstrap.servers", brokerList);
            // 必须指定
            props.put("group.id", groupId);
            // 必须指定
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // 必须指定
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // 从最早的消息开始读取
            props.put("auto.offset.reset", "earliest");
            // 关闭自动提交
            props.put("enable.auto.commit", "false");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // 提交位移
                    consumer.commitSync(offsets);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // 清空位移
                    offsets.clear();
                }
            });
        }

        // 提交任务给worker运行
        public void consumer(int threadNumber) {
            executorService = new ThreadPoolExecutor(threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
            try {
                while (true) {
                    ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));
                    if (!records.isEmpty()) {
                        // 读取到消息直接提交
                        executorService.submit(new ConsumerWorker<>(records, offsets));
                    }
                    // commit
                    commitOffsets();
                }
            } finally {
                commitOffsets();
                consumer.close();
            }
        }

        // 提交唯位移
        private void commitOffsets() {
            Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;
            synchronized (offsets) {
                if (offsets.isEmpty()) {
                    return;
                }
                unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
                offsets.clear();
            }
            consumer.commitSync(unmodfiedMap);
        }

        public void close() {
            // 终止消费者
            consumer.wakeup();
            // 停止线程池
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        String brokerList = "localhost:9092";
        String groupId = "testGroup1";
        String topic = "test-topic";
        // 创建1个消费者读取消息
        ConsumerThreadHandler<String, String> handler = new ConsumerThreadHandler<>(brokerList, groupId, topic);
        // 开启4个worker运行业务处理
        new Thread(() -> handler.consumer(Runtime.getRuntime().availableProcessors())).start();
        try {
            // 主线程休眠20秒
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 20秒后关闭所有的消费者和线程池
        handler.close();
    }

}
复制代码

3. 两种方法对比

两种方式各有利弊,用户可根据实际业务场景进行相应的选择:
优点缺点方法1(每个线程维护专属KafkaConsumer)实现简单;无线程间交互开销,速度较快;方便位移管理;易维护分区间的消息消费顺序Socket连接开销大;consumer数量受限于topic分区数,扩展性差;因为socket连接多,发送的请求也会多,所以broker端负载相对较高;rebalance可能性增大方法2(全局consumer + 多worker线程)消息获取与处理解耦;可独立拓展consumer数量和worker数量,伸缩性较好需要实现负载;分区间的消息消费顺序难以维护;处理链路变长,位移管理困难;worker线程异常可能导致消费数据丢失

独立consumer

consumer group 会自动帮用户执行分区分配和rebalance,对于需要多个consumer共同消费某topic的场景,使用group是最合适的,

如果用户需要严格控制某个consumer固定消费某些分区,场景如下:

  1. 由进程自己维护分区状态
  2. 进程自身保证高可用(可自行重启恢复错误,例如YARN、Mesos等容器调度框架),无需kafka完成错误检测和恢复

在这种情况下,consumer group则不适用,需要应用独立消费者(standalone consumer),standalone consumer 之间彼此独立工作,任意一个consumer崩溃不会影响其他的consumer。

独立消费者Demo

使用 consumer group 进行消息的消费时,我们使用

KafkaConsumer.subscribe

直接订阅topic,独立消费者使用

KafkaConsumer.assign

方法进行消费,

如果发生多次

KafkaConsumer.assign

调用,只有最后一次会生效,之前的会被覆盖,同时 assign 和 subscribe 不可以在同一个 consumer 中混用。

assign

方法接收一个分区列表,直接赋予 consumer 访问这些分区的权力,代码如下:

    KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
    // 获取 my-user-topic 主题的分区列表
    List<PartitionInfo> partitions = consumer.partitionsFor("my-user-topic");
    // 创建一个list用于保存 TopicPartition 对象, 即consumer需要订阅的主题分区
    List<TopicPartition> topicPartitions = new ArrayList<>();
    for (PartitionInfo partition : partitions) {
        topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
    }
    try {
        // 分区不为空
        if (!topicPartitions.isEmpty()) {
            // 使用 assign 进行订阅
            consumer.assign(topicPartitions);
                while (true) {
                    ConsumerRecords<String, User> records = consumer.poll(Duration.ofSeconds(1));
                    // 获取分区
                    for (TopicPartition partition : records.partitions()) {
                        // 获取分区下的每条记录
                        List<ConsumerRecord<String, User>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, User> partitionRecord : partitionRecords) {
                            System.out.println(partitionRecord.value());
                        }
                    }
                }

        }
    } finally {
        consumer.close();
    }
}
复制代码

新/旧consumer对比

  1. 旧版本Consumer使用scala语言编写,新版本使用Java
  2. 旧版本Consumer依赖Zookeeper提交位移,新版本直接使用borker提供的topic,ZK本质只是一个协调服务组件,并不适合高并发的读写操作
  3. 旧版本的Consumer读取消息需要为每个分区都新建一个线程,新版本不需要
  4. 旧版本区分low-level 和 high-level 两个版本,前者没有consumer group的概念,而后者支持,新版本可通过不同方法的使用来实现是否支持group

总之旧版本已经不推荐使用,请在生产环境中使用新版本sdk。

标签: kafka java 分布式

本文转载自: https://blog.csdn.net/cainiao1412/article/details/124839608
版权归原作者 多动手,勤思考 所有, 如有侵权,请联系我们删除。

“Kafka之Consumer使用与基本原理”的评论:

还没有评论