1.1 概述
Kafka
是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展能力。
1.2 基础架构及术语
通过上面一张图,可能有的术语还不太清楚,下面我们一个一个的解释:
Producer
:
Producer
即生产者,消息的产生者,是消息的入口。
kafka cluster
:
Broker
:
Broker
是
kafka
实例,每个服务器上有一个或多个
kafka
的实例,我们姑且认为每个
broker
对应一台服务器。每个
kafka
集群内的
broker
都有一个不重复的编号,如图中的
broker-0
、
broker-1
等……
Topic
:消息的主题,可以理解为消息的分类,
kafka
的数据就保存在
topic
。在每个
broker
上都可以创建多个
topic
。
Partition
:
Topic
的分区,每个
topic
可以有多个分区,分区的作用是做负载,提高
kafka
的吞吐量。同一个
topic
在不同的分区的数据是不重复的,
partition
的表现形式就是一个一个的文件夹!
Replication
:每一个分区都有多个副本,副本的作用是做备胎。当主分区(
Leader
)故障的时候会选择一个备胎(
Follower
)上位,成为
Leader
。在
kafka
中默认副本的最大数量是
10
个,且副本的数量不能大于
Broker
的数量,
follower
和
leader
绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Message
:每一条发送的消息主体。
Consumer
:消费者,即消息的消费方,是消息的出口。
Consumer Group
:我们可以将多个消费者组成一个消费者组,在
kafka
的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个
topic
的不同分区的数据,这也是为了提高
kafka
的吞吐量!
Zookeeper
:
kafka
集群依赖
zookeeper
来保存集群的的元信息,来保证系统的可用性。
1.3 工作流程分析
上面介绍了
kafka
的基础架构及基本概念,不知道大家看完有没有对
kafka
有个大致印象,如果对还比较懵也没关系!我们接下来再结合上面的结构图分析
kafka
的工作流程,最后再回来整个梳理一遍我相信你会更有收获!
1.3.1 发送数据
我们看上面的架构图中,
producer
就是生产者,是数据的入口。注意看图中的红色箭头,
Producer
在写入数据的时候永远的找
leader
,不会直接将数据写入
follower
!那
leader
怎么找呢?写入的流程又是什么样的呢?我们看下图:
发送的流程就在图中已经说明了,就不单独在文字列出来了!需要注意的一点是,消息写入
leader
后,
follower
是主动的去
leader
进行同步的!
producer
采用
push
模式将数据发布到
broker
,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!写入示意图如下:
上面说到数据会写入到不同的分区,那
kafka
为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:
1、 方便扩展。因为一个
topic
可以有多个
partition
,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
2、 提高并发。以
partition
为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。
熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在
kafka
中,如果某个
topic
有多个
partition
,
producer
又怎么知道该将数据发往哪个
partition
呢?
kafka
中有几个原则:
1、
partition
在写入的时候可以指定需要写入的
partition
,如果有指定,则写入对应的
partition
。
2、 如果没有指定
partition
,但是设置了数据的
key
,则会根据
key
的值
hash
出一个
partition
。
3、 如果既没指定
partitio
n,又没有设置
key
,则会轮询选出一个
partition
。
保证消息不丢失是一个消息队列中间件的基本保证,那
producer
在向
kafka
写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过
ACK
应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认
kafka
接收到数据,这个参数可设置的值为
0、1、all
。
0
代表
producer
往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
1
代表
producer
往集群发送数据只要
leader
应答就可以发送下一条,只确保
leader
发送成功。
all
代表
producer
往集群发送数据需要所有的
follower
都完成从
leader
的同步才会发送下一条,确保
leader
发送成功和所有的副本都完成备份。安全性最高,但是效率最低。
最后要注意的是,如果往不存在的
topic
写数据,能不能写入成功呢?
kafka
会自动创建
topic
,分区和副本的数量根据默认配置都是
1
。
1.3.2 保存数据
Producer
将数据写入
kafka
后,集群就需要对数据进行保存了!
kafka
将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。
Kafka
初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。
1.3.2.1
Partition
结构
前面说过了每个
topic
都可以分为一个或多个
partition
,如果你觉得
topic
比较抽象,那
partition
就是比较具体的东西了!
Partition
在服务器上的表现形式就是一个一个的文件夹,每个
partition
的文件夹下面会有多组
segment
文件,每组
segment
文件又包含
.index
文件、
.log
文件、
.timeindex
文件(早期版本中没有)三个文件,
log
文件就实际是存储
message
的地方,而
index
和
timeindex
文件为索引文件,用于检索消息。
如上图,这个
partition
有三组
segment
文件,每个
log
文件的大小是一样的,但是存储的
message
数量是不一定相等的(每条的
message
大小不一致)。文件的命名是以该
segment
最小
offset
来命名的,如
000.index
存储
offset为0~368795
的消息,
kafka
就是利用分段+索引的方式来解决查找效率的问题。
1.3.2.2
Message
结构
上面说到
log
文件就实际是存储
message
的地方,我们在
producer
往
kafka
写入的也是一条一条的
message
,那存储在
log
中的
message
是什么样子的呢?消息主要包含消息体、消息大小、
offset
、压缩类型……等等!我们重点需要知道的是下面三个:
1、
offset
:
offset
是一个占
8byte
的有序
id
号,它可以唯一确定每条消息在
parition
内的位置!
2、 消息大小:消息大小占用
4byte
,用于描述消息的大小。
3、 消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。
1.3.2.3 存储策略
无论消息是否被消费,
kafka
都会保存所有的消息。那对于旧数据有什么删除策略呢?
1、 基于时间,默认配置是
168
小时(
7
天)。
2、 基于大小,默认配置是
1073741824
。
需要注意的是,
kafka
读取特定消息的时间复杂度是
O(1)
,所以这里删除过期的文件并不会提高
kafka
的性能!
1.3.3 消费数据
消息存储在
log
文件后,消费者就可以进行消费了。与生产消息相同的是,消费者在拉取消息的时候也是找
leader
去拉取。
多个消费者可以组成一个消费者组(
consumer group
),每个消费者组都有一个组
id
!同一个消费组者的消费者可以消费同一
topic
下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!是不是有点绕。我们看下图:
图示是消费者组内的消费者小于
partition
数量的情况,所以会出现某个消费者消费多个
partition
数据的情况,消费的速度也就不及只处理一个
partition
的消费者的处理速度!如果是消费者组的消费者多于
partition
的数量,那会不会出现多个消费者消费同一个
partition
的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何
partition
的数据。所以在实际的应用中,建议消费者组的
consumer``的数量与partition
的数量一致!
在保存数据的小节里面,我们聊到了
partition
划分为多组
segment
,每个
segment
又包含
.log、.index、.timeindex
文件,存放的每条
message
包含
offset
、消息大小、消息体……我们多次提到
segment
和
offset
,查找消息的时候是怎么利用
segment+offset
配合查找的呢?假如现在需要查找一个
offset
为
368801
的
message
是什么样的过程呢?我们先看看下面的图:
1、先找到
offset
的
368801message
所在的
segment
文件(利用二分法查找),这里找到的就是在第二个
segment
文件。
2、打开找到的
segment
中的
.index
文件(也就是
368796.index
文件,该文件起始偏移量为
368796+1
,我们要查找的
offset
为
368801
的
message
在该
index
内的偏移量为
368796+5=368801
,所以这里要查找的相对
offset
为
5
)。由于该文件采用的是稀疏索引的方式存储着相对
offse
t及对应
message
物理偏移量的关系,所以直接找相对
offset
为
5
的索引找不到,这里同样利用二分法查找相对
offset
小于或者等于指定的相对
offset
的索引条目中最大的那个相对
offset
,所以找到的是相对
offset
为
4
的这个索引。
3、根据找到的相对
offset
为
4
的索引确定
message
存储的物理偏移位置为
256
。打开数据文件,从位置为
256
的那个地方开始顺序扫描直到找到
offset
为
368801
的那条
Message
。
这套机制是建立在
offset
为有序的基础上,利用
segment
+有序
offset
+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据!至此,消费者就能拿到需要处理的数据进行处理了。那每个消费者又是怎么记录自己消费的位置呢?在早期的版本中,消费者将消费到的
offset
维护
zookeeper
中,
consumer
每间隔一段时间上报一次,这里容易导致重复消费,且性能不好!在新的版本中消费者消费到的
offset
已经直接维护在
kafka
集群的
__consumer_offsets
这个
topic
中!
1.4
Kafka
安装
说明:教程使用
4
台服务器,
1
台用作
zookeeper
,
3
台用
kafka
必备条件:
jdk
1.4.1
zookeeper
下载安装
1、首先去官网下载
zookeeper
:
https://downloads.apache.org/zookeeper/
进入目录下载
2、下载好放到一台
zookeeper
服务器上面解压
tar zxvf apache-zookeeper-3.5.7-bin.tar.gz -C ./
3、进入
conf
目录下,复制
zoo_sample.cfg
文件,名字为
zoo.cfg
(不然启动找不到该文件)
cd ./apache-zookeeper-3.5.7-bin/conf
cp zoo_sample.cfg ./zoo.cfg
4、启动服务
#启动
sh zkServer.sh start
#查看启动状态
sh ./zkServer.sh status
启动成功
1.4.2
kafka
下载安装
1、首先去官网下载
kafka
:
http://kafka.apache.org/downloads
2、下载好放到
kafka
服务器上面解压
tar zxvf kafka_2.12-2.3.0.tgz -C ./
3、在
config
目录下配置集群
vi server.properties
1、把
broker.id
改
0,1,2
三台机器不一样
broker.id=0
2、把listeners
生效,并加上本机
ip
listeners=PLAINTEXT://本机ip:9092
3、指定zookeeper
连接地址,改为
zookeeper
服务器地址
zookeeper.connect=192.168.88.137:2181
4、启动kafka
,三台机器启动
sh kafka-server-start.sh -daemon ../config/server.properties
#查看zookeeper
ps -ef|grep zookeeper
可以看
kafka
日志文件是否有报错
启动完成后,查看
zookeeper
集群连接情况
- 进入
zookeeper
的bin
目录下执行sh zkCil.sh#查看 执行ls /#查看连接情况ls /brokers/ids
5、任意一台机器 新建
topic
sh kafka-topics.sh --create --zookeeper 192.168.88.137:2181--replication-factor 1--partitions 1--topic test
#说明
#192.168.88.137:2181 ###这是zookeeper服务ip+端口号
#test ###这是topic
6、使用任意一台
kafka
服务器做生产者
sh kafka-console-producer.sh --broker-list 192.168.88.132:9092--topic test
7、使用三台
kafka
消费
./kafka-console-consumer.sh --bootstrap-server 192.168.88.132:9092--topic test --from-beginning
动态图如下所示:
如果也需要再搭建
zookeeper
集群可以参考以下文章,这里不做阐述:
https://www.cnblogs.com/panwenbin-logs/p/10369402.html
1.5
Kafka
实际应用
本节主要讲述在
Springboot
中如何正确的使用
Kafka
1.5.1 准备工作
1.5.1.1 网络配置
在项目中连接
kafka
,因为是外网,首先要开放
kafka
配置文件中的如下配置(其中IP为公网IP)
advertised.listeners=PLAINTEXT://112.126.74.249:9092
1.5.1.2
topic
主题创建准备
在开始前我们先创建两个
topic
:
topic1
、
topic2
,其分区和副本数都设置为
2
,用来测试
[root@iZ2zegzlkedbo3e64vkbefZ~]# cd /usr/local/kafka-cluster/kafka1/bin/[root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181--replication-factor 2--partitions 2--topic topic1
Created topic topic1.[root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181--replication-factor 2--partitions 2--topic topic2
Created topic topic2.
当然我们也可以不手动创建
topic
,在执行代码
kafkaTemplate.send("topic1", normalMessage)
发送消息时,
kafka
会帮我们自动完成
topic
的创建工作,但这种情况下创建的
topic
默认只有一个分区,分区也没有副本。所以,我们可以在项目中新建一个配置类专门用来初始化
topic
,如下所示:
@ConfigurationpublicclassKafkaInitialConfiguration{// 创建一个名为testtopic的Topic并设置分区数为8,分区副本数为2@Beanpublic NewTopic initialTopic(){returnnewNewTopic("testtopic",8,(short)2);}
// 如果要修改分区数,只需修改配置值重启项目即可// 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小@Beanpublic NewTopic updateTopic(){returnnewNewTopic("testtopic",10,(short)2);}}
1.5.1.3 导入依赖修改配置文件
1、 引入
pom
依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
2、
application.propertise
配置(本文用到的配置项这里全列了出来)
###########【Kafka集群】###########
spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50
1.5.3
Hello Kafka
简单示例
1.5.3.1 简单生产示例
@RestControllerpublicclassKafkaProducer{@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息@GetMapping("/kafka/normal/{message}")publicvoidsendMessage1(@PathVariable("message") String normalMessage){
kafkaTemplate.send("topic1", normalMessage);}}
1.5.3.2 简单消费示例
@ComponentpublicclassKafkaConsumer{// 消费监听@KafkaListener(topics ={"topic1"})publicvoidonMessage1(ConsumerRecord<?,?> record){// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());}}
上面示例创建了一个生产者,发送消息到
topic1
,消费者监听
topic1
消费消息。监听器用
@KafkaListener
注解,
topics
表示监听的
topic
,支持同时监听多个,用英文逗号分隔。启动项目,
postman
调接口触发生产者发送消息:
可以看到监听器消费成功:
1.5.4 详解生产者
1.5.4.1 带回调的生产者
kafkaTemplate
提供了一个回调方法
addCallback
,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法。
@GetMapping("/kafka/callbackOne/{message}")publicvoidsendMessage2(@PathVariable("message") String callbackMessage){
kafkaTemplate.send("topic1", callbackMessage).addCallback(success ->{// 消息发送到的topic
String topic = success.getRecordMetadata().topic();// 消息发送到的分区int partition = success.getRecordMetadata().partition();// 消息在分区内的offsetlong offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:"+ topic +"-"+ partition +"-"+ offset);}, failure ->{
System.out.println("发送消息失败:"+ failure.getMessage());});}@GetMapping("/kafka/callbackTwo/{message}")publicvoidsendMessage3(@PathVariable("message") String callbackMessage){
kafkaTemplate.send("topic1", callbackMessage).addCallback(newListenableFutureCallback<SendResult<String, Object>>(){@OverridepublicvoidonFailure(Throwable ex){
System.out.println("发送消息失败:"+ex.getMessage());}@OverridepublicvoidonSuccess(SendResult<String, Object> result){
System.out.println("发送消息成功:"+ result.getRecordMetadata().topic()+"-"+ result.getRecordMetadata().partition()+"-"+ result.getRecordMetadata().offset());}});}
1.5.4.2 自定义分区器
我们知道,
kafka
中每个
topic
被划分为多个分区,那么生产者将消息发送到
topic
时,具体追加到哪个分区呢?这就是所谓的分区策略,
Kafka
为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:
1、若发送消息时指定了分区(即自定义分区策略),则直接将消息
append
到指定分区;
2、若发送消息时未指定
patition
,但指定了
key
(
kafka
允许为每条消息设置一个
key
),则对
key
值进行
hash
计算,根据计算结果路由到指定分区,这种情况下可以保证同一个
Key
的所有消息都进入到相同的分区;
3、
patition
和
key
都未指定,则使用
kafka
默认的分区策略,轮询选出一个
patition
;
我们来**自定义一个分区策略,将消息发送到我们指定的
partition
,首先新建一个分区器类实现
Partitioner
接口,重写方法,其中
partition
方法的返回值就表示将消息发送到几号分区**。
publicclassCustomizePartitionerimplementsPartitioner{@Overridepublicintpartition(String topic, Object key,byte[] keyBytes, Object value,byte[] valueBytes, Cluster cluster){// 自定义分区规则(这里假设全部发到0号分区)// ......return0;}
@Overridepublicvoidclose(){
}
@Overridepublicvoidconfigure(Map<String,?> configs){
}}
在
application.propertise
中配置自定义分区器,配置的值就是分区器类的全路径名:
# 自定义分区器
spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
1.5.4.3
kafka
事务提交
如果在发送消息时需要创建事务,可以使用
KafkaTemplate
的
executeInTransaction
方法来声明事务:
@GetMapping("/kafka/transaction")publicvoidsendMessage7(){// 声明事务:后面报错消息不会发出去
kafkaTemplate.executeInTransaction(operations ->{
operations.send("topic1","test executeInTransaction");thrownewRuntimeException("fail");});
// 不声明事务:后面报错但前面消息已经发送成功了
kafkaTemplate.send("topic1","test executeInTransaction");thrownewRuntimeException("fail");}
**
Springboot kafka
事务注意事项**
采用
kafkatemplate
发送事务消息,需要配置地方
1、
spring.kafka.producer.transaction-id-prefix=kafka-tran
2、
spring.kafka.producer.retries=1
—这个必须大于
0
,可以配置
1
或者
all
这个头不为空,会在默认的
producerfactory
及
kafkatemplate
初始化中用到
DefaultKafkaProducerFactory
publicbooleantransactionCapable(){returnthis.transactionIdPrefix != null;}
KafkaTemplate
publicKafkaTemplate(ProducerFactory<K, V> producerFactory,boolean autoFlush){this.producerFactory = producerFactory;this.autoFlush = autoFlush;this.transactional = producerFactory.transactionCapable();}//这样就可以发送带事务的消息了,不需要@Transtractional注解,//且kafkaTemplate.send()等不带事务的消息是无法发送的,直接报异常publicvoidsendMessageTransactional(){
String jsonMessage =buildMessage();//局部开启事务
kafkaTemplate.executeInTransaction(operations ->{
operations.send(topic,1,"key2", jsonMessage);returntrue;});
logger.info("已发送事务消息。。。。");}
1.5.5 详解消费者
1.5.5.1 指定
topic、partition、offset
消费
前面我们在监听消费
topic1
的时候,监听的是
topic1
上所有的消息,如果我们想指定
topic
、指定
partition
、指定
offset
来消费呢?也很简单,
@KafkaListener
注解已全部为我们提供,
/**
* @Title 指定topic、partition、offset消费
* @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
* @Param [record]
* @return void
**/@KafkaListener(id ="consumer1",groupId ="felix-group",topicPartitions ={@TopicPartition(topic ="topic1", partitions ={"0"}),@TopicPartition(topic ="topic2", partitions ="0", partitionOffsets =@PartitionOffset(partition ="1", initialOffset ="8"))})publicvoidonMessage2(ConsumerRecord<?,?> record){
System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());}
属性解释:
id
:消费者
ID
;
groupId
:消费组
ID
;
topics
:监听的
topic
,可监听多个;
topicPartitions
:可配置更加详细的监听信息,可指定
topic
、
parition
、
offset
监听。
上面
onMessage2
监听的含义:监听
topic1
的
0
号分区,同时监听
topic2
的
0
号分区和
topic2
的
1
号分区里面
offset
从
8
开始的消息。
注意:
topics
和
topicPartitions
不能同时使用;
1.5.5.2 批量消费
设置
application.prpertise
开启批量消费即可,
# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
接收消息时用
List
来接收,监听代码如下:
@KafkaListener(id ="consumer2",groupId ="felix-group", topics ="topic1")publicvoidonMessage3(List<ConsumerRecord<?,?>> records){
System.out.println(">>>批量消费一次,records.size()="+records.size());for(ConsumerRecord<?,?> record : records){
System.out.println(record.value());}}
1.5.5.3
ConsumerAwareListenerErrorHandler
异常处理器
通过异常处理器,我们可以处理
consumer
在消费时发生的异常。
新建一个
ConsumerAwareListenerErrorHandler
类型的异常处理方法,用
@Bean
注入,
BeanName
默认就是方法名,然后我们将这个异常处理器的
BeanName
放到
@KafkaListener
注解的
errorHandler
属性里面,当监听抛出异常的时候,则会自动调用异常处理器。
// 新建一个异常处理器,用@Bean注入@Beanpublic ConsumerAwareListenerErrorHandler consumerAwareErrorHandler(){return(message, exception, consumer)->{
System.out.println("消费异常:"+message.getPayload());return null;};}
// 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面@KafkaListener(topics ={"topic1"},errorHandler ="consumerAwareErrorHandler")publicvoidonMessage4(ConsumerRecord<?,?> record)throws Exception {thrownewException("简单消费-模拟异常");}
// 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息@KafkaListener(topics ="topic1",errorHandler="consumerAwareErrorHandler")publicvoidonMessage5(List<ConsumerRecord<?,?>> records)throws Exception {
System.out.println("批量消费一次...");thrownewException("批量消费-模拟异常");}
执行看一下效果:
1.5.5.4 消息过滤器
消息过滤器可以在消息抵达
consumer
之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由
KafkaListener
处理,不需要的消息则过滤掉。
配置消息过滤只需要为 监听器工厂 配置一个
RecordFilterStrategy
(消息过滤策略),返回
true
的时候消息将会被抛弃,返回
false
时,消息能正常抵达监听容器。
@ComponentpublicclassKafkaConsumer{@Autowired
ConsumerFactory consumerFactory;
// 消息过滤器@Beanpublic ConcurrentKafkaListenerContainerFactory filterContainerFactory(){
ConcurrentKafkaListenerContainerFactory factory =newConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);// 被过滤的消息将被丢弃
factory.setAckDiscarded(true);// 消息过滤策略
factory.setRecordFilterStrategy(consumerRecord ->{if(Integer.parseInt(consumerRecord.value().toString())%2==0){returnfalse;}//返回true消息则被过滤returntrue;});return factory;}
// 消息过滤监听@KafkaListener(topics ={"topic1"},containerFactory ="filterContainerFactory")publicvoidonMessage6(ConsumerRecord<?,?> record){
System.out.println(record.value());}}
上面实现了一个"过滤奇数、接收偶数"的过滤策略,我们向
topic1
发送
0-99
总共
100
条消息,看一下监听器的消费情况,可以看到监听器只消费了偶数。
1.5.5.5 消息转发
在实际开发中,我们可能有这样的需求,应用
A
从
TopicA
获取到消息,经过处理后转发到
TopicB
,再由应用
B
监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。
在
SpringBoot
集成
Kafka
实现消息的转发也很简单,只需要通过一个
@SendTo
注解,被注解方法的
return
值即转发的消息内容,如下,
/**
* @Title 消息转发
* @Description 从topic1接收到的消息经过处理后转发到topic2
* @Param [record]
* @return void
**/@KafkaListener(topics ={"topic1"})@SendTo("topic2")public String onMessage7(ConsumerRecord<?,?> record){return record.value()+"-forward message";}
1.5.5.6 定时启动、停止监听器
默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定
topic
的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢——使用
KafkaListenerEndpointRegistry
,下面我们就来实现:
1、禁止监听器自启动;
2、创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;
新建一个定时任务类,用注解
@EnableScheduling
声明,
KafkaListenerEndpointRegistry
在
SpringIO
中已经被注册为
Bean
,直接注入,设置禁止
KafkaListener
自启动,
@EnableScheduling@ComponentpublicclassCronTimer{
/**
* @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
* 而是会被注册在KafkaListenerEndpointRegistry中,
* 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
**/@Autowiredprivate KafkaListenerEndpointRegistry registry;
@Autowiredprivate ConsumerFactory consumerFactory;
// 监听器容器工厂(设置禁止KafkaListener自启动)@Beanpublic ConcurrentKafkaListenerContainerFactory delayContainerFactory(){
ConcurrentKafkaListenerContainerFactory container =newConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(consumerFactory);//禁止KafkaListener自启动
container.setAutoStartup(false);return container;}
// 监听器@KafkaListener(id="timingConsumer",topics ="topic1",containerFactory ="delayContainerFactory")publicvoidonMessage1(ConsumerRecord<?,?> record){
System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());}
// 定时启动监听器@Scheduled(cron ="0 42 11 * * ? ")publicvoidstartListener(){
System.out.println("启动监听器...");// "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器if(!registry.getListenerContainer("timingConsumer").isRunning()){
registry.getListenerContainer("timingConsumer").start();}//重新开始//registry.getListenerContainer("timingConsumer").resume();}
// 定时停止监听器@Scheduled(cron ="0 45 11 * * ? ")publicvoidshutDownListener(){
System.out.println("关闭监听器...");
registry.getListenerContainer("timingConsumer").pause();}}
启动项目,触发生产者向
topic1
发送消息,可以看到
consumer
没有消费,因为这时监听器还没有开始工作。
11:42分监听器启动开始工作,消费消息
11:45分监听器停止工作
版权归原作者 __奋斗的卡卡 所有, 如有侵权,请联系我们删除。