前置内容
Kafka生产者:juejin.cn/post/709417…
Kafka Consumer基本概念:juejin.cn/post/709641…
Java SDK 基本使用
Consumer就是负责从Kafka集群中消费消息数据的应用程序,自 Kafka 0.9 版本提供了Java版本的Consumer SDK供用户使用,
Kafka官方支持的语言SDK较少,更多都是由第三方社区维护的SDK,如果需要使用对应语言的SDK,需要额外下载,
第三方库信息地址:docs.confluent.io/platform/cu…
使用Consumer消费消息的完整代码如下:
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaConsumerApp {
public static void main(String[] args) {
String topicName = "test-group";
Properties props = new Properties();
// 必须指定
props.put("bootstrap.servers", "localhost:9092");
// 必须指定
props.put("group.id", "test-group");
// 必须指定
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 必须指定
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 从最早的消息开始读取
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("key: %s, value: %s", record.key(), record.value()));
}
}
} finally {
consumer.close();
}
}
}
复制代码
创建一个Consumer实例需要以下几个步骤:
- 创建一个
java.util.Properties
对象,并至少指定下面几个参数:- bootstrap.servers与Java版的Producer类似,该参数用于指定borker服务器地址,多个地址之间用,分隔,如果broker集群很多,也不用全部都指定,producer会根据配置的borker发现全部的broker,之所以要指定多个,是方便故障转移使用,即使 bootstrap.servers 中的某一台挂了,consumer 也可通过其他的地址接入 kafka 集群,因为kafka内部采用FQDN(Fully Qualified Domain Name), 因此如果broker端没有显式配置 listeners 使用IP地址,最好 bootstrap.servers 参数中的地址配置为主机名,而非IP地址。- key.deserializer与Producer对应,因为Producer发送到broker中的就是字节数组,因此每个消息被读取到时也是字节数组,所以需要指定将字节数组反序列化为原来对象格式的解码器,该参数的值必须是实现org.apache.kafka.common.serialization.Deserializer
接口的类,并且是全类名,用于将消息的key序列化为原本的值。- value.deserializer与上面的 key.deserializer 类似,只是这里是 value 的反序列化。- group.id用于指定消费者实例所属的consumer group,也就是消费组,通常取名为一个有业务意义的名字就可以了。 - 使用上一步的Properties实例构造KafkaConsumer对象将上一步创建好的对象通过构造函数入参传递给 KafkaConsumer 类即可。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);复制代码
- 调用KafkaConsumer.subscribe方法订阅topic一个Consumer可以订阅多个topic,并且要注意,执行多次 subscribe 方法,只会以最后一次为准,即覆盖式,订阅topic的语法如下:
consumer.subscribe(Arrays.asList("topic-A", "topic-B"));复制代码
也可以使用手动订阅topic和相应的分区,但这种方式不推荐。consumer.assign(Arrays.asList(new TopicPartition("topic-a", 0),new TopicPartition("topic-a", 1)));复制代码
consumer的订阅是延迟生效的,订阅信息只有在下次poll调用时开始生效,如果在poll之前打印订阅信息,会发现是空的,因为并未生效。consumer也可以通过正则表达式的形式配置topic订阅,即动态订阅,当系统中出现符合正则条件的,将一起进行读取处理:consumer.subscribe(Pattern.compile("kafka-.*"), new ConsumerRebalanceListener() { // 在均衡开始之前和消费者停止读取消息之后调用,一般用来提交偏移量 @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { } // 在重新分配分区之后和消费者开始读取消息之前调用,一般用来指定消费偏移量 @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { } });复制代码
使用正则表达式的订阅必须指定 ConsumerRebalanceListener ,该类是一个回调接口,用于编写处理consumer分区分配方案变更时的逻辑,如果用户配置的是自动提交位移(enable.auto.commit=true),可不用理会该类,直接设置为:consumer.subscribe(Pattern.compile("kafka-.*"), new NoOpConsumerRebalanceListener());复制代码
但如果是手动提交,起码要在 onPartitionsRevoked 方法中处理分区分配方案变更时的位移提交。 - 循环调用KafkaConsumer.poll方法读取消息poll方法使用了类似linux的selectI/O机制,所有相关的事件(rebalance、获取消息)都发生在一个事件循环(event loop)中,一个常见的event loop 获取消息的写法如下:
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("key: %s, value: %s", record.key(), record.value())); }}复制代码
上面的poll方法中还传递了一个 Duration.ofSeconds(1) 参数,这个参数表示超时时间为1秒钟。通常情况下,consumer拿到了足够多的数据将立即返回,但是如果数据不够多的话,consumer将处于阻塞状态,为了防止阻塞时间过长,上面的参数意思就是,即使没有太多的数据,最多也只阻塞1秒钟就立即返回。 - 处理获取到的消息对象 ConsumerRecord使用poll方法拿到消息集合后,需要对消息进行相应的业务处理,需要注意的是,从kafka Consumer的角度,poll方法返回后,就算是消费成功了,但是从业务角度,拿到消息后还需要进行一系列的消息处理,处理完毕后才算得上是消费成功,当业务处理逻辑比较重的时候,应当考虑使用新的线程去处理消息,避免时间循环中业务逻辑过重导致消息消费缓慢,如果是poll的参数配置不当,导致消费缓慢,应当适当调整poll的参数,例如超时时间。
- 关闭KafkaConsumerconsumer程序结束后要执行close操作,用于释放运行过程中占用的系统资源,例如线程、内存 socket等,关闭方式有如下两种:- KafkaConsumer.close():关闭consumer,并最多等待30秒- KafkaConsumer.close(timeout):关闭consumer,并最多等待给定的时间
Consumer脚本命令消费
上面说的是使用Java语言进行消费,不过kafka程序自身也提供了控制台脚本,用于验证调试消费者,脚本名为
kafka-console-consumer
,
在kafka安装目录下的bin目录下(windows中在bin/windows下),脚本启动参数常见有:
- --bootstrap-servers与SDK中的意思一样,指定broker的地址,多个采用逗号分隔
- --topic指定要消费的topic名称
- --from-beginning指定是否从头消费,指定该参数与Java SDK中的
props.put("auto.offset.reset", "earliest");
效果一致
使用demo如下:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
复制代码
Consumer主要参数
- session.timeout.ms最初该参数有两种含义:第一种是判断consumer group成员是否崩溃的会话超时时间,如果该值设置的过长,那么消费者组协调者(group coordinator)也需要对应的时间才能判断出consumer崩溃,第二种是consumer消息处理逻辑的最大时间(即单个consumer两次poll的间隔时间),如果超出这个时间,coordinator会将该消费者踢出group,该消费者负责的分区也将重新分配给其他消费者,这会引发两种问题,一是不必要的rebalance,因为被踢出的consumer需要重新加入group,二是consumer被踢出group后处理的消息,无法提交位移,意味着该消息在rebalance后会被重复消费,如果消息处理总是需要很久的时间,那么将引起恶性循环,consumer将无法执行新消息的消费,除非调整参数值。这种定义,让用户在实际使用中很不方便,可能消息处理逻辑本身就比较久,但是又希望快速检测到consumer的失败,于是在 0.10.1 版本后,该参数仅包含一种意思,即上述的第一种,coordinator检测consumer失效的时间,该值默认是10秒,可根据实际情况调整,建议值设置的小一些,可让coordinator更快的检测到consumer的情况,从而更快的开启rebalance,避免造成更大的消息滞后(consumer lag)。
- max.poll.interval.ms上述所说的第二种情况,被剥离成了 max.poll.interval.ms 参数,对于消息处理比较久的情况,单独设置该值即可,这样保证了两种逻辑的分离,不会相互影响。
- auto.offset.reset该参数用于指定当consumer要消费的位移信息不在消息日志的合理范围内时,kafka的应对策略。什么是不合理范围?即无位移信息或位移信息越界,简单的说就是consumer要消费的信息位置并不存在。应对策略有下面几种:- earliest:从最早的位移开始消费- latest:从最新位移处进行消费- none:抛出异常该参数值的效果触发必须要符合 无位移信息或位移信息越界 才行,例如首次运行一个consumer group,并指定从头消费,那么group必然会从头开始消费,因为此时group没有任何位移信息,但是当group一旦提交了位移信息后,重启该group后,该group并不会再从头消费,因为kafka保存了该group的位移信息。
- enable.auto.commit该参数用于指定consumer是否自动提交位移,如果设置为true,consumer将在后台自动提交位移,否则需要用户手动提交位移,对于不允许消息丢失的情况下,可以设置为false,由用于手动提交。
- fetch.max.bytes用于指定consumer端单次获取数据的最大字节数,如果实际业务场景下消息很大,该参数也需要调整,否则consumer将无法消费这些消息。
- max.poll.records用于指定每次poll调用返回的最大消息数,可根据实际情况调整该参数的值。
- heartbeat.interval.ms该参数用于通知consumer group中的成员要进行新一轮的rebalance时的间隔,假设group coordinator决定开启新一轮rebalance时,它会将该决定以 REBALANCE_IN_PROGRESS 异常的形式放入 consumer 心跳请求的 response 中,consumer在收到该类型的response后,即知晓自己需要重新加入组了,该值设置的越小,当需要rebalance时,rebalance的也越快,且该值必须小于 session.timeout.ms,因为如果consumer在 session.timeout.ms 这段时间内都不发送心跳,coordinator 将认为他已经失效了,所以也没必要通知他了。
- connections.max.idle.ms该参数用于指定kafka定期关闭空闲socket的时长,默认值是9分钟,该参数可能会导致consumer处理下次请求时需要重新申请socket资源,造成速度下降,如果不在乎socket资源空闲的开销,可以设置为-1,即不关闭socket。
位移管理
consumer需要为自己订阅的分区进行消费进度的保存,即处理到了哪里,并且要定期向kafka broker提交当前的消费位置,进行持久化,这个消费位置被称为位移,
位移也表示下一条待消费消息的位置,假设consumer已经读取了某分区中第N条消息,那么它应该提交位移值为N,因为位移从0开始,位移为N的消息是第N+1条消息,即下次要消费的消息。
offset的提交时间,是对消息交付语义(message delivery semantic)保证的基石,常见的消息语义有三种:
- 最多一次 (at most once):消息可能会丢失,但不会重复处理
- 最少一次 (at least once):消息不会丢失,但可能会重复处理多次
- 精确一次 (exactly once):消息一定会被处理且仅被处理一次
consumer在消费进行业务处理前就提交位移,则可实现第一种语义,因为即使崩溃,恢复后也是去消费下一条,之前的不会被消费,
相反,如果提交位移在业务处理后,则可实现第二种语义,因为正常情况下,无法保证业务处理和位移提交符合原子性,所以仅保证不丢失,但不保证业务处理完毕后崩溃没有提交位移,导致恢复后的重复处理,
kafka自0.11版本开始支持事务,有了事务则可以实现第三种语义。
除了offset外,还有一些与consumer相关的位置信息,一共如下:
- 上次提交位移 (last committed offset):consumer最近一次提交的offset值,也就是上面提到的offset。
- 当前位置(current position):consumer已读取但尚未提交的位置
- 水位(watermark):也被称为高水位(high watermark),在水位下的所有消息都是consumer可以读取的(即图的左边),水位之上(即图的右边)的都无法被consumer读取
- 日志终端位移(Log End Offset,LEO):表示当前分区的最大位移值,正常情况下LEO都比水位要大,当分区的所有副本都保存了某条消息,分区的副本leader才会向上移动水位值
之所以有水位的存在,是因为kafka要保证消息要被所有的副本写入成功后,再允许消息被处理。
consumer会在broker列表中选择一个broker作为consumer group的coordinator,用于实现组成员管理、消费分配方案制定以及提交位移等功能。
当consumer group首次启动时,由于其没有位移信息,所以 auto.offset.reset 的作用就体现出来了,通常情况下,要么从最新的地方开始读取,要么从最早的唯一开始读取,
当consumer运行一段时间后,必须要提交自己的位移值,如果consumer崩溃或被关闭,其负责的分区将被分配给其他consumer,因此在其他consumer读取这些分区前要做好位移提交工作,否则就会出现消息重复消费,
consumer提交位移的主要机制是通过向其所属的 coordinator 发送位移提交请求实现,每次提交都会向 __consumer_offsets 对应的分区上追加一条消息。
自动提交与手动提交
默认情况下,consumer自动进行位移提交,自动提交间隔是5秒钟,通过参数
auto.commit.interval.ms
参数可以控制自动提交的间隔,
一般推荐对消息手动进行提交,因为自动提交虽然不用额外做处理,但是极有可能出现消息丢失,在构建KafkaConsumer时,通过将配置参数
enable.auto.commit=false
将提交改为手动,
并在代码中使用
consumer.commitSync()
或
consumer.commitAsync()
进行提交,在业务处理完毕后进行提交可保证消息不会丢失,但不保证不重复消费,
两个方法前者是同步提交,会阻塞用户线程继续运行,后者是异步提交,不阻塞用户线程,但是还会在poll方法中轮询异步提交的结果。
提交方法还提供了带参数的重载方法,可以对提交做出更细粒度的控制,如下,每处理一条记录进行一次提交:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
// 获取分区
for (TopicPartition partition : records.partitions()) {
// 获取分区下的每条记录
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
// 处理分区下的每条记录
System.out.println(partitionRecord.value());
// 获取该记录的offset
long lastOffset = partitionRecord.offset();
// 提交offset
// 因为提交的位移必须是下一条待消费的消息位置,因此要 + 1
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
}
复制代码
重平衡 (rebalance)
rebalance的作用
rebalance是一组协议,其定义了一个consumer group中的所有consumer如何达成一致均匀的订阅topic中的所有分区,
好比一共有一百块砖头,5个工人,那么这5个工人得达成协议怎么搬,不能两个或者五个人同时搬一块砖,这样会造成不必要的资源浪费,
例如一个名为A的topic,其有100个分区,现在有一个group来订阅A topic,该group中有5个consumer,默认情况下,kafka会为每个consumer分配不同的20个分区进行订阅消费,该分配过程就叫rebalance,
当consumer成功执行rebalance后, 组订阅的topic的每个分区只会分配给组内的一个consumer实例。
group coordinator(组协调者)
kafka内置了一个组协调协议 (group coordinator protocol),对于每个消费者组,kafka集群中的某个broker会被选举为其组协调者 (group coordinator),
coordinator负责对组的状态进行管理,其主要职责就是当组内有新成员来时,对该组进行 rebalance 操作,即协调topic分区订阅重新分配。
rebalance的触发条件
组rebalance的触发条件有下面3个:
- 组成员变更例如新 consumer 加入组,已有consumer离开组,或者已有consumer崩溃。
- 组订阅topic数变更基于正则表达式的订阅,当有符合正则表达式的新topic被创建时。
- 组订阅topic分区数变更被订阅的topic的分区数发生更改,例如使用命令行脚本增加了topic的分区数。
一般应用比较常见的触发条件是第一种,即 consumer 崩溃,这里的崩溃并不一定指的是consumer进程挂掉,或者consumer进程所在的机器宕机,
而是指的是,当consumer无法在指定时间内完成消息的处理,coordinator将认为consumer已经崩溃,从而触发新一轮的rebalance,
在实际的业务处理中,一定要避免在poll主线程中执行较重的逻辑处理,这会导致处理时间过长而被coordinator认为崩溃执行rebalance,
频繁的rebalance会极大的降低consumer的吞吐量,在生产环境中需要结合业务配置好consumer的几个参数:
request.timeout.ms
、
max.poll.records
、
max.poll.interval.ms
,避免不必要的rebalance出现。
rebalance分区分配策略
分区分配策略决定topic的分区使用何种方式分配给consumer订阅。
consumer默认有三种分区分配策略:
- rangerange策略基于范围的思想,将单个topic的分区按照顺序排列,然后将这些分区划分成固定大小的分区段,并依次分配给每个consumer。假设我们有个名为 T1 的主题,其包含了10个分区,然后我们有两个消费者(C1,C2)来消费这10个分区里面的数据。range策略的分配过程大概:首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。在我们的例子里面,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者排完序将会是C1, C2;然后将partitions的个数除以消费者的总数来决定每个消费者消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。几个例子:情况1: 有10个分区,2个消费者, 10 / 2 = 5,那么消费者 C1和消费者C2 将会消费同样多的分区,所以最后分区分配的结果:- C1 将消费 0, 1, 2, 3, 4 分区- C2 将消费 5, 6, 7, 8, 9 分区情况2: 有11个分区,那么最后分区分配的结果:- C1 将消费 0, 1, 2, 3, 4, 5 分区- C2 将消费 6, 7, 8, 9, 10分区情况3: 有2个主题(T1和T2),分别有11个分区(0,1,2,…10),那么最后分区分配的结果:- C1 将消费 T1主题的 0, 1, 2, 3, 4, 5 分区以及 T2主题的 0, 1, 2, 3, 4, 5分区,加起来一共- C2 将消费 T1主题的 6, 7, 8, 9, 10 分区以及 T2主题的 5, 6, 7, 8, 9, 10分区可以看出,C1 消费者比C2 消费者多消费了2个分区,这是Range策略的一个弊端。
- round-robinround-robin策略将所有topic的分区顺序摆开,然后轮询式的分配给每个consumer。假设我们有个名为 T1 的主题,其包含了10个分区,然后我们有两个消费者(C1,C2)来消费这10个分区里面的数据。那么最终分配给C1的是:0,2,4,6,8分配给C2的是:1,3,5,7,9
- stickysticky策略即黏性策略,rebalance会最大限度的按照之前的分配方案分配给各个consumer。假设我们有两个名为 T1 、T2 的主题,每个主题各3个分区,然后我们有三个消费者(C1,C2,C3)来消费这6个分区里面的数据。现分配如下C1: T1-0、T2-0C2: T1-1、T2-1C3: T1-2、T2-2突然 C2 崩溃,那么rebalance后,重新分配为:C1: T1-0、T1-1、T2-0C3:T1-2、T2-1、T2-2即在保持分配均匀的情况下,将原本属于某consumer的分区还分配给该consumer。
kafka consumer默认的分配策略是range,如果group下的所有consumer订阅的主题都是一样的,那么使用round-robin策略分配的会更均匀,
通过
partition.assignment.strategy
对consumer进行分配策略的设置,除了kafka自带的分配策略,用户也可以自定义分配器(assignor)。
rebalance generation
一个consumer group可以执行多次rebalance,generation 的引入是为了保护consumer group的offset无效提交,
generation 表示rebalance的分代,起初是0,当进行一次rebalance后,该值就会增加,假设上一届的consumer成员由于某些原因延迟提交了offset,
由于其提交offset时,携带的是旧的 generation 信息,因此该提交会被consumer group拒绝,很多时候 consumer 抛出 ILLEGAL_GENERATION 异常就是这个原因。
rebalance 协议
rebalance 本质是一组协议,group 与 coordinator 共同使用这组协议完成 group 的 coordinator,协议有如下几个:
- JoinGroup:consumer请求加入组
- SyncGroup:group leader 将分配方案同步更新到所有组内成员中
- Heartbeat:consumer定期向coordinator汇报心跳表明自己存活
- LeaveGroup:consumer主动通知coordinator自己即将离组
- DescribeGroup:查看组的所有信息(成员信息、协议信息、分配方案以及订阅信息),该类型主要供管理员使用
在rebalance过程中,coordinator主要处理consumer发过来的JoinGroup和SyncGroup请求,当consumer主动离组时发送LeaveGroup请求给coordinator。
在rebalance成功之后,组内所有consumer定期向coordinator发送Heartbeat请求,每个consumer根据Heartbeat请求的响应中是否包含REBALANCE_IN_PROCESS判断是否开启新一轮的rebalance。
rebalance 流程
consumer group在执行 rebalance 之前必须确定 coordinator 所在的broker,并创建与该 broker 相互通信的 socket 连接,
确定 coordinator 的算法与确定 offset 被提交到 __consumer_offsets 目标分区的算法相同,如下:
- 计算 Math.abs(groupID.hashCode) % offsets.topic.num.partitions 参数值(默认50),假设得出结果 10
- 寻找 __consumer_offsets 分区 10 的 leader 副本所在的broker,该broker即为这个 group 的 coordinator
成功连接 coordinator 后,即可进行 rebalance 操作,rebalance 主要分为两步:
- 加入组这一步组内所有 consumer 向 coordinator 发送 JoinGroup 请求,当收集全 JoinGroup 请求后,coordinator 从中选择一个 consumer 担任 group 的 leader,并将所有的成员信息以及它们的订阅信息发送给leader,group 的 leader 与 coordinator 并非同一种概念,leader是某个consumer实例,coordinator是kafka集群中的一个broker,分配方案由leader给出,而非coordinator,之所以将分配方案交给consumer leader执行,是因为这样做有更好的灵活性,在这种机制下,用户可以自行实现类似Hadoop 机架感知(rack-aware)分配方案,同一机架上的分区分配给相同机架上的consumer,可减少网络传输的开销;同时,当consumer的分区策略发生改变后,重启consumer即可,无需broker进行介入。
- 同步更新分配方案这一步 group leader 开始制定分配方案,即根据分配策略决定group中的consumer分别负责topic中的哪些分区, 一旦分配完成,leader会将分配方案分装进 SyncGroup 请求并发送给 coordinator,组内所有的consumer都会发送 SyncGroup 请求,但只有leader发送的SyncGroup请求中包含了分配方案,coordinator收到分配方案后将属于各自consumer的分配方案作为SyncGroup请求的response返还给各自的consumer。
rebalance监听器
consumer默认将位移提交到 __consumer_offsets 中,其实 kafka 也支持用户将位移提交到外部存储中,例如数据库,
如果要实现这个功能,用户必须使用 rebalance 监听器,使用 rebalance 监听器的前提是用户使用 consumer group,如果使用的是独立consumer或者直接手动分配分区,那么 rebalance 监听器将不会生效,
rebalance监听器主要是一个接口回调类
ConsumerRebalanceListener
,有两个方法需要实现
onPartitionsRevoked
、
onPartitionsAssigned
,前者在开启新一轮的rebalance前调用,后者在rebalance完成后调用。
rebalance监听器最常见的用法是手动提交位移到第三方存储库,以及在rebalance前后执行一些审计操作,demo如下:
大致思路:
- 使用 joinStart 保存本次rebalance的开始时间、totalRebalanceTimeMs统计所有rebalance的时长总和
- 在 onPartitionsRevoked 记录本次rebalance的开始时间
- 在 onPartitionsRevoked 将每个分区的offset用自定义方法 saveOffsetInExternalStore 保存到外部存储中
- 在 onPartitionsAssigned 将每个分区的offset从外部存储中读取出来,并使用seek设置consumer从该位置读取
- 累加 totalRebalanceTimeMs 总时长
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// rebalance开始时间
final AtomicLong joinStart = new AtomicLong(0L);
// rebalance完成总时长
final AtomicLong totalRebalanceTimeMs = new AtomicLong(0L);
consumer.subscribe(topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 设置新一轮的rebalance开始时间
joinStart.set(System.currentTimeMillis());
for (TopicPartition partition : partitions) {
// consumer.position(partition) 读取当前offset
// saveOffsetInExternalStore 将分区 offset 保存到外部存储
saveOffsetInExternalStore(consumer.position(partition));
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
// readOffsetFromExternalStore 从外部存储读取提交位移的值
// seek方法指定消费者从该位移处消费
consumer.seek(partition, readOffsetFromExternalStore(partition));
}
// 更新rebalance完成总时长
totalRebalanceTimeMs.addAndGet(System.currentTimeMillis() - joinStart.get());
}
});
复制代码
如果启用了启动提交位移,用户可以不在
ConsumerRebalanceListener
监听器中手动提交唯一,consumer每次rebalance时,会检查用户是否启用了自动提交位移,如果是,它会自动帮用户提交,因此无需显式提交。
consumer要求rebalance在很短的时间内完成,因此在rebalance中不要放执行时间很长的逻辑,特别是一些阻塞方法。
解码序列化
解码序列化与Producer发送者发送时的序列化是互逆操作,即将对方序列化后的字节数组再恢复成原样子。
默认解序列化器
与Producer的序列化呼应,常用的deserializer如下:
- StringDeserializer:序列化String类型
- ByteBufferDeserializer:序列化ByteBuffer类型
- BytesDeserializer:序列化Kafka自定义的Bytes类
- DoubleDeserializer:序列化Double类型
- IntegerDeserializer:序列化Integer类型
- LongDeserializer:序列化Long类型
如果用户有更复杂的解序列化需求,可自行定义 deserializer 。
在构造Consumer对象时,指定相应的序列化值即可使用序列化:
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
复制代码
自定义解序列化器
先定义一个实现 Deserializer 接口的类:
public class UserDeserializer implements Deserializer<User> {
private ObjectMapper objectMapper;
@Override
public void configure(Map configs, boolean isKey) {
this.objectMapper = new ObjectMapper();
}
@Override
public User deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(data, User.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() {
this.objectMapper = null;
}
}
复制代码
指定consumer的value解码序列化为刚刚创建的类:
Properties props = new Properties();
// 必须指定
props.put("bootstrap.servers", "localhost:9092");
// 必须指定
props.put("group.id", "test-group");
// 必须指定
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 必须指定
props.put("value.deserializer", "com.kafka.producer.UserDeserializer");
// 从最早的消息开始读取
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true");
KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-user-topic"));
try {
while (true) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofSeconds(1));
// 获取分区
for (TopicPartition partition : records.partitions()) {
// 获取分区下的每条记录
List<ConsumerRecord<String, User>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, User> partitionRecord : partitionRecords) {
System.out.println(partitionRecord.value());
}
}
}
} finally {
consumer.close();
}
复制代码
与之前的Producer程序配合使用,得到消费结果:
多线程消费
与KafkaProducer不同,KafkaConsumer是非线程安全的,因此在实践过程中,推荐KafkaProducer单实例供多线程使用,
对于KafkaConsumer非线程安全有两种实践方式推荐:
1. 每个线程单独建立自己的KafkaConsumer
既然KafkaConsumer实例是非线程安全的,那么每个线程创建时,都各自创建一个仅自己使用的KafkaConsumer就可以避免问题了。
样例设计,先定义三个类:
- ConsumerRunnable 类:消费线程类,执行真正的消费任务
- ConsumerGroup 类:消费线程管理类,创建多个线程类执行消费任务
- ConsumerMain 类:测试主方法类
public class ConsumerMain {
static class ConsumerRunnable implements Runnable {
private final KafkaConsumer<String, String> consumer;
public ConsumerRunnable(String brokerList, String groupId, String topic) {
Properties props = new Properties();
// 必须指定
props.put("bootstrap.servers", brokerList);
// 必须指定
props.put("group.id", groupId);
// 必须指定
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 必须指定
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 从最早的消息开始读取
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
// 获取分区
for (TopicPartition partition : records.partitions()) {
// 获取分区下的每条记录
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
System.out.println(partitionRecord.value());
}
}
}
} finally {
consumer.close();
}
}
}
static class ConsumerGroup {
private List<ConsumerRunnable> consumers;
public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
// 创建好相应的Consumer,等待执行
consumers = new ArrayList<>(consumerNum);
for (int i = 0; i < consumerNum; i++) {
consumers.add(new ConsumerRunnable(brokerList, groupId, topic));
}
}
public void execute() {
// 放到不同的线程中执行消费
for (ConsumerRunnable consumer : consumers) {
new Thread(consumer).start();
}
}
}
public static void main(String[] args) {
String brokerList = "localhost:9092";
String groupId = "testGroup1";
String topic = "test-topic";
int consumerNum = 3;
ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
consumerGroup.execute();
}
}
复制代码
2. 单KafkaConsumer实例+多worker线程
大量的线程和对应的KafkaConsumer创建占用的资源相对也会较多,因此可以选择消息的获取与消息的处理逻辑进行解耦,在全局维护一个或若干个消费者实例进行消息获取,然后将消息的处理逻辑放入单独的工作者线程中进行就好。
样例设计,先定义三个类:
- ConsumerThreadHandler:consumer多线程管理类,用于创建线程池以及为每个线程分配消息集合,consumer位移提交也在这里进行。
- ConsumerWorker:本质是一个Runnable,执行真正的业务逻辑处理,并上报位移信息给ConsumerThreadHandler。
- Main类:测试主方法类。
public class Main {
static class ConsumerWorker<K, V> implements Runnable {
private final ConsumerRecords<K, V> records;
private final Map<TopicPartition, OffsetAndMetadata> offsets;
// 得到消息和位移map(用于上报位移)
public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
this.records = records;
this.offsets = offsets;
}
@Override
public void run() {
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
// 消费消息
for (ConsumerRecord<K, V> record : partitionRecords) {
System.out.println(record.value());
}
// 上报位移
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
synchronized (offsets) {
if (!offsets.containsKey(partition)) {
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
} else {
long currentOffset = offsets.get(partition).offset();
if (currentOffset <= lastOffset + 1) {
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
}
}
}
}
}
static class ConsumerThreadHandler<K, V> {
private final KafkaConsumer<K, V> consumer;
private ExecutorService executorService;
private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
Properties props = new Properties();
// 必须指定
props.put("bootstrap.servers", brokerList);
// 必须指定
props.put("group.id", groupId);
// 必须指定
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 必须指定
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 从最早的消息开始读取
props.put("auto.offset.reset", "earliest");
// 关闭自动提交
props.put("enable.auto.commit", "false");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 提交位移
consumer.commitSync(offsets);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 清空位移
offsets.clear();
}
});
}
// 提交任务给worker运行
public void consumer(int threadNumber) {
executorService = new ThreadPoolExecutor(threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
try {
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));
if (!records.isEmpty()) {
// 读取到消息直接提交
executorService.submit(new ConsumerWorker<>(records, offsets));
}
// commit
commitOffsets();
}
} finally {
commitOffsets();
consumer.close();
}
}
// 提交唯位移
private void commitOffsets() {
Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;
synchronized (offsets) {
if (offsets.isEmpty()) {
return;
}
unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
offsets.clear();
}
consumer.commitSync(unmodfiedMap);
}
public void close() {
// 终止消费者
consumer.wakeup();
// 停止线程池
executorService.shutdown();
}
}
public static void main(String[] args) {
String brokerList = "localhost:9092";
String groupId = "testGroup1";
String topic = "test-topic";
// 创建1个消费者读取消息
ConsumerThreadHandler<String, String> handler = new ConsumerThreadHandler<>(brokerList, groupId, topic);
// 开启4个worker运行业务处理
new Thread(() -> handler.consumer(Runtime.getRuntime().availableProcessors())).start();
try {
// 主线程休眠20秒
Thread.sleep(20000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 20秒后关闭所有的消费者和线程池
handler.close();
}
}
复制代码
3. 两种方法对比
两种方式各有利弊,用户可根据实际业务场景进行相应的选择:
优点缺点方法1(每个线程维护专属KafkaConsumer)实现简单;无线程间交互开销,速度较快;方便位移管理;易维护分区间的消息消费顺序Socket连接开销大;consumer数量受限于topic分区数,扩展性差;因为socket连接多,发送的请求也会多,所以broker端负载相对较高;rebalance可能性增大方法2(全局consumer + 多worker线程)消息获取与处理解耦;可独立拓展consumer数量和worker数量,伸缩性较好需要实现负载;分区间的消息消费顺序难以维护;处理链路变长,位移管理困难;worker线程异常可能导致消费数据丢失
独立consumer
consumer group 会自动帮用户执行分区分配和rebalance,对于需要多个consumer共同消费某topic的场景,使用group是最合适的,
如果用户需要严格控制某个consumer固定消费某些分区,场景如下:
- 由进程自己维护分区状态
- 进程自身保证高可用(可自行重启恢复错误,例如YARN、Mesos等容器调度框架),无需kafka完成错误检测和恢复
在这种情况下,consumer group则不适用,需要应用独立消费者(standalone consumer),standalone consumer 之间彼此独立工作,任意一个consumer崩溃不会影响其他的consumer。
独立消费者Demo
使用 consumer group 进行消息的消费时,我们使用
KafkaConsumer.subscribe
直接订阅topic,独立消费者使用
KafkaConsumer.assign
方法进行消费,
如果发生多次
KafkaConsumer.assign
调用,只有最后一次会生效,之前的会被覆盖,同时 assign 和 subscribe 不可以在同一个 consumer 中混用。
assign
方法接收一个分区列表,直接赋予 consumer 访问这些分区的权力,代码如下:
KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
// 获取 my-user-topic 主题的分区列表
List<PartitionInfo> partitions = consumer.partitionsFor("my-user-topic");
// 创建一个list用于保存 TopicPartition 对象, 即consumer需要订阅的主题分区
List<TopicPartition> topicPartitions = new ArrayList<>();
for (PartitionInfo partition : partitions) {
topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
}
try {
// 分区不为空
if (!topicPartitions.isEmpty()) {
// 使用 assign 进行订阅
consumer.assign(topicPartitions);
while (true) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofSeconds(1));
// 获取分区
for (TopicPartition partition : records.partitions()) {
// 获取分区下的每条记录
List<ConsumerRecord<String, User>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, User> partitionRecord : partitionRecords) {
System.out.println(partitionRecord.value());
}
}
}
}
} finally {
consumer.close();
}
}
复制代码
新/旧consumer对比
- 旧版本Consumer使用scala语言编写,新版本使用Java
- 旧版本Consumer依赖Zookeeper提交位移,新版本直接使用borker提供的topic,ZK本质只是一个协调服务组件,并不适合高并发的读写操作
- 旧版本的Consumer读取消息需要为每个分区都新建一个线程,新版本不需要
- 旧版本区分low-level 和 high-level 两个版本,前者没有consumer group的概念,而后者支持,新版本可通过不同方法的使用来实现是否支持group
总之旧版本已经不推荐使用,请在生产环境中使用新版本sdk。
版权归原作者 多动手,勤思考 所有, 如有侵权,请联系我们删除。