0


Kafka原理及应用实践,用心看这篇就够了【重点】

1.1 概述

Kafka

是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展能力。

1.2 基础架构及术语

在这里插入图片描述
通过上面一张图,可能有的术语还不太清楚,下面我们一个一个的解释:

Producer

Producer

即生产者,消息的产生者,是消息的入口。

kafka cluster

Broker

Broker

kafka

实例,每个服务器上有一个或多个

kafka

的实例,我们姑且认为每个

broker

对应一台服务器。每个

kafka

集群内的

broker

都有一个不重复的编号,如图中的

broker-0

broker-1

等……

Topic

:消息的主题,可以理解为消息的分类,

kafka

的数据就保存在

topic

。在每个

broker

上都可以创建多个

topic

Partition

Topic

的分区,每个

topic

可以有多个分区,分区的作用是做负载,提高

kafka

的吞吐量。同一个

topic

在不同的分区的数据是不重复的,

partition

的表现形式就是一个一个的文件夹!

Replication

:每一个分区都有多个副本,副本的作用是做备胎。当主分区(

Leader

)故障的时候会选择一个备胎(

Follower

)上位,成为

Leader

。在

kafka

中默认副本的最大数量是

10

个,且副本的数量不能大于

Broker

的数量,

follower

leader

绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。

Message

:每一条发送的消息主体。

Consumer

:消费者,即消息的消费方,是消息的出口。

Consumer Group

:我们可以将多个消费者组成一个消费者组,在

kafka

的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个

topic

的不同分区的数据,这也是为了提高

kafka

的吞吐量!

Zookeeper

kafka

集群依赖

zookeeper

来保存集群的的元信息,来保证系统的可用性。

1.3 工作流程分析

上面介绍了

kafka

的基础架构及基本概念,不知道大家看完有没有对

kafka

有个大致印象,如果对还比较懵也没关系!我们接下来再结合上面的结构图分析

kafka

的工作流程,最后再回来整个梳理一遍我相信你会更有收获!

1.3.1 发送数据

我们看上面的架构图中,

producer

就是生产者,是数据的入口。注意看图中的红色箭头,

Producer

在写入数据的时候永远的找

leader

,不会直接将数据写入

follower

!那

leader

怎么找呢?写入的流程又是什么样的呢?我们看下图:

在这里插入图片描述
发送的流程就在图中已经说明了,就不单独在文字列出来了!需要注意的一点是,消息写入

leader

后,

follower

是主动的去

leader

进行同步的!

producer

采用

push

模式将数据发布到

broker

,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!写入示意图如下:

在这里插入图片描述
上面说到数据会写入到不同的分区,那

kafka

为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:
  1、 方便扩展。因为一个

topic

可以有多个

partition

,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
  2、 提高并发。以

partition

为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。

  熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在

kafka

中,如果某个

topic

有多个

partition

producer

又怎么知道该将数据发往哪个

partition

呢?

kafka

中有几个原则:
  1、

partition

在写入的时候可以指定需要写入的

partition

,如果有指定,则写入对应的

partition


  2、 如果没有指定

partition

,但是设置了数据的

key

,则会根据

key

的值

hash

出一个

partition


  3、 如果既没指定

partitio

n,又没有设置

key

,则会轮询选出一个

partition

保证消息不丢失是一个消息队列中间件的基本保证,那

producer

在向

kafka

写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过

ACK

应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认

kafka

接收到数据,这个参数可设置的值为

0、1、all

0

代表

producer

往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。

1

代表

producer

往集群发送数据只要

leader

应答就可以发送下一条,只确保

leader

发送成功。

all

代表

producer

往集群发送数据需要所有的

follower

都完成从

leader

的同步才会发送下一条,确保

leader

发送成功和所有的副本都完成备份。安全性最高,但是效率最低。

最后要注意的是,如果往不存在的

topic

写数据,能不能写入成功呢?

kafka

会自动创建

topic

,分区和副本的数量根据默认配置都是

1

1.3.2 保存数据

Producer

将数据写入

kafka

后,集群就需要对数据进行保存了!

kafka

将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。

Kafka

初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。

1.3.2.1

Partition

结构

前面说过了每个

topic

都可以分为一个或多个

partition

,如果你觉得

topic

比较抽象,那

partition

就是比较具体的东西了!

Partition

在服务器上的表现形式就是一个一个的文件夹,每个

partition

的文件夹下面会有多组

segment

文件,每组

segment

文件又包含

.index

文件、

.log

文件、

.timeindex

文件(早期版本中没有)三个文件,

log

文件就实际是存储

message

的地方,而

index

timeindex

文件为索引文件,用于检索消息。

在这里插入图片描述
如上图,这个

partition

有三组

segment

文件,每个

log

文件的大小是一样的,但是存储的

message

数量是不一定相等的(每条的

message

大小不一致)。文件的命名是以该

segment

最小

offset

来命名的,如

000.index

存储

offset为0~368795

的消息,

kafka

就是利用分段+索引的方式来解决查找效率的问题。

1.3.2.2

Message

结构

上面说到

log

文件就实际是存储

message

的地方,我们在

producer

kafka

写入的也是一条一条的

message

,那存储在

log

中的

message

是什么样子的呢?消息主要包含消息体、消息大小、

offset

、压缩类型……等等!我们重点需要知道的是下面三个:
  1、

offset

offset

是一个占

8byte

的有序

id

号,它可以唯一确定每条消息在

parition

内的位置!
  2、 消息大小:消息大小占用

4byte

,用于描述消息的大小。
  3、 消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。

1.3.2.3 存储策略

无论消息是否被消费,

kafka

都会保存所有的消息。那对于旧数据有什么删除策略呢?
  1、 基于时间,默认配置是

168

小时(

7

天)。
  2、 基于大小,默认配置是

1073741824


  需要注意的是,

kafka

读取特定消息的时间复杂度是

O(1)

,所以这里删除过期的文件并不会提高

kafka

的性能!

1.3.3 消费数据

消息存储在

log

文件后,消费者就可以进行消费了。与生产消息相同的是,消费者在拉取消息的时候也是找

leader

去拉取。

多个消费者可以组成一个消费者组(

consumer group

),每个消费者组都有一个组

id

!同一个消费组者的消费者可以消费同一

topic

下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!是不是有点绕。我们看下图:

在这里插入图片描述
图示是消费者组内的消费者小于

partition

数量的情况,所以会出现某个消费者消费多个

partition

数据的情况,消费的速度也就不及只处理一个

partition

的消费者的处理速度!如果是消费者组的消费者多于

partition

的数量,那会不会出现多个消费者消费同一个

partition

的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何

partition

的数据。所以在实际的应用中,建议消费者组的

consumer``的数量与partition

的数量一致!
  在保存数据的小节里面,我们聊到了

partition

划分为多组

segment

,每个

segment

又包含

.log、.index、.timeindex

文件,存放的每条

message

包含

offset

、消息大小、消息体……我们多次提到

segment

offset

,查找消息的时候是怎么利用

segment+offset

配合查找的呢?假如现在需要查找一个

offset

368801

message

是什么样的过程呢?我们先看看下面的图:

在这里插入图片描述
1、先找到

offset

368801message

所在的

segment

文件(利用二分法查找),这里找到的就是在第二个

segment

文件。
2、打开找到的

segment

中的

.index

文件(也就是

368796.index

文件,该文件起始偏移量为

368796+1

,我们要查找的

offset

368801

message

在该

index

内的偏移量为

368796+5=368801

,所以这里要查找的相对

offset

5

)。由于该文件采用的是稀疏索引的方式存储着相对

offse

t及对应

message

物理偏移量的关系,所以直接找相对

offset

5

的索引找不到,这里同样利用二分法查找相对

offset

小于或者等于指定的相对

offset

的索引条目中最大的那个相对

offset

,所以找到的是相对

offset

4

的这个索引。
3、根据找到的相对

offset

4

的索引确定

message

存储的物理偏移位置为

256

。打开数据文件,从位置为

256

的那个地方开始顺序扫描直到找到

offset

368801

的那条

Message

这套机制是建立在

offset

为有序的基础上,利用

segment

+有序

offset

+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据!至此,消费者就能拿到需要处理的数据进行处理了。那每个消费者又是怎么记录自己消费的位置呢?在早期的版本中,消费者将消费到的

offset

维护

zookeeper

中,

consumer

每间隔一段时间上报一次,这里容易导致重复消费,且性能不好!在新的版本中消费者消费到的

offset

已经直接维护在

kafka

集群的

__consumer_offsets

这个

topic

中!

1.4

Kafka

安装

说明:教程使用

4

台服务器,

1

台用作

zookeeper

3

台用

kafka

必备条件:

jdk

1.4.1

zookeeper

下载安装

1、首先去官网下载

zookeeper

https://downloads.apache.org/zookeeper/

在这里插入图片描述
进入目录下载

在这里插入图片描述
2、下载好放到一台

zookeeper

服务器上面解压

tar zxvf apache-zookeeper-3.5.7-bin.tar.gz -C ./

在这里插入图片描述
3、进入

conf

目录下,复制

zoo_sample.cfg

文件,名字为

zoo.cfg

(不然启动找不到该文件)

cd ./apache-zookeeper-3.5.7-bin/conf
cp zoo_sample.cfg ./zoo.cfg

4、启动服务

#启动
sh zkServer.sh start
#查看启动状态
sh ./zkServer.sh status

启动成功

在这里插入图片描述
在这里插入图片描述

1.4.2

kafka

下载安装

1、首先去官网下载

kafka

http://kafka.apache.org/downloads

在这里插入图片描述
在这里插入图片描述
2、下载好放到

kafka

服务器上面解压

tar zxvf kafka_2.12-2.3.0.tgz -C ./

在这里插入图片描述
3、在

config

目录下配置集群

vi server.properties

1、把

broker.id

0,1,2

三台机器不一样

broker.id=0

在这里插入图片描述
2、把

listeners

生效,并加上本机

ip
listeners=PLAINTEXT://本机ip:9092

在这里插入图片描述
3、指定

zookeeper

连接地址,改为

zookeeper

服务器地址

zookeeper.connect=192.168.88.137:2181

在这里插入图片描述
4、启动

kafka

,三台机器启动

sh kafka-server-start.sh -daemon ../config/server.properties
#查看zookeeper
ps -ef|grep zookeeper

在这里插入图片描述

可以看

kafka

日志文件是否有报错
在这里插入图片描述
启动完成后,查看

zookeeper

集群连接情况

  • 进入zookeeperbin目录下执行sh zkCil.sh#查看 执行ls /#查看连接情况ls /brokers/ids

在这里插入图片描述
5、任意一台机器 新建

topic
sh kafka-topics.sh --create --zookeeper 192.168.88.137:2181--replication-factor 1--partitions 1--topic test
 
 #说明
 #192.168.88.137:2181  ###这是zookeeper服务ip+端口号
 #test ###这是topic

在这里插入图片描述
6、使用任意一台

kafka

服务器做生产者

sh kafka-console-producer.sh --broker-list 192.168.88.132:9092--topic test

7、使用三台

kafka

消费

./kafka-console-consumer.sh --bootstrap-server 192.168.88.132:9092--topic test --from-beginning

在这里插入图片描述
动态图如下所示:

在这里插入图片描述
如果也需要再搭建

zookeeper

集群可以参考以下文章,这里不做阐述:
https://www.cnblogs.com/panwenbin-logs/p/10369402.html

1.5

Kafka

实际应用

本节主要讲述在

Springboot

中如何正确的使用

Kafka

1.5.1 准备工作

1.5.1.1 网络配置

在项目中连接

kafka

,因为是外网,首先要开放

kafka

配置文件中的如下配置(其中IP为公网IP)

advertised.listeners=PLAINTEXT://112.126.74.249:9092

1.5.1.2

topic

主题创建准备

在开始前我们先创建两个

topic

topic1

topic2

,其分区和副本数都设置为

2

,用来测试

[root@iZ2zegzlkedbo3e64vkbefZ~]#  cd /usr/local/kafka-cluster/kafka1/bin/[root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181--replication-factor 2--partitions 2--topic topic1
Created topic topic1.[root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181--replication-factor 2--partitions 2--topic topic2
Created topic topic2.

当然我们也可以不手动创建

topic

,在执行代码

kafkaTemplate.send("topic1", normalMessage)

发送消息时,

kafka

会帮我们自动完成

topic

的创建工作,但这种情况下创建的

topic

默认只有一个分区,分区也没有副本。所以,我们可以在项目中新建一个配置类专门用来初始化

topic

,如下所示:

@ConfigurationpublicclassKafkaInitialConfiguration{// 创建一个名为testtopic的Topic并设置分区数为8,分区副本数为2@Beanpublic NewTopic initialTopic(){returnnewNewTopic("testtopic",8,(short)2);}
​
     // 如果要修改分区数,只需修改配置值重启项目即可// 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小@Beanpublic NewTopic updateTopic(){returnnewNewTopic("testtopic",10,(short)2);}}

1.5.1.3 导入依赖修改配置文件

1、 引入

pom

依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2、

application.propertise

配置(本文用到的配置项这里全列了出来)

###########【Kafka集群】###########
spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
​
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
​
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50

1.5.3

Hello Kafka

简单示例

1.5.3.1 简单生产示例

@RestControllerpublicclassKafkaProducer{@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;
​
    // 发送消息@GetMapping("/kafka/normal/{message}")publicvoidsendMessage1(@PathVariable("message") String normalMessage){
        kafkaTemplate.send("topic1", normalMessage);}}

1.5.3.2 简单消费示例

@ComponentpublicclassKafkaConsumer{// 消费监听@KafkaListener(topics ={"topic1"})publicvoidonMessage1(ConsumerRecord<?,?> record){// 消费的哪个topic、partition的消息,打印出消息内容
        System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());}}

上面示例创建了一个生产者,发送消息到

topic1

,消费者监听

topic1

消费消息。监听器用

@KafkaListener

注解,

topics

表示监听的

topic

,支持同时监听多个,用英文逗号分隔。启动项目,

postman

调接口触发生产者发送消息:

在这里插入图片描述
可以看到监听器消费成功:

在这里插入图片描述

1.5.4 详解生产者

1.5.4.1 带回调的生产者

kafkaTemplate

提供了一个回调方法

addCallback

,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法。

@GetMapping("/kafka/callbackOne/{message}")publicvoidsendMessage2(@PathVariable("message") String callbackMessage){
    kafkaTemplate.send("topic1", callbackMessage).addCallback(success ->{// 消息发送到的topic
        String topic = success.getRecordMetadata().topic();// 消息发送到的分区int partition = success.getRecordMetadata().partition();// 消息在分区内的offsetlong offset = success.getRecordMetadata().offset();
        System.out.println("发送消息成功:"+ topic +"-"+ partition +"-"+ offset);}, failure ->{
        System.out.println("发送消息失败:"+ failure.getMessage());});}@GetMapping("/kafka/callbackTwo/{message}")publicvoidsendMessage3(@PathVariable("message") String callbackMessage){
    kafkaTemplate.send("topic1", callbackMessage).addCallback(newListenableFutureCallback<SendResult<String, Object>>(){@OverridepublicvoidonFailure(Throwable ex){
            System.out.println("发送消息失败:"+ex.getMessage());}@OverridepublicvoidonSuccess(SendResult<String, Object> result){
            System.out.println("发送消息成功:"+ result.getRecordMetadata().topic()+"-"+ result.getRecordMetadata().partition()+"-"+ result.getRecordMetadata().offset());}});}

1.5.4.2 自定义分区器

我们知道,

kafka

中每个

topic

被划分为多个分区,那么生产者将消息发送到

topic

时,具体追加到哪个分区呢?这就是所谓的分区策略,

Kafka

为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

1、若发送消息时指定了分区(即自定义分区策略),则直接将消息

append

到指定分区;
2、若发送消息时未指定

patition

,但指定了

key

kafka

允许为每条消息设置一个

key

),则对

key

值进行

hash

计算,根据计算结果路由到指定分区,这种情况下可以保证同一个

Key

的所有消息都进入到相同的分区;
3、

patition

key

都未指定,则使用

kafka

默认的分区策略,轮询选出一个

patition

我们来**自定义一个分区策略,将消息发送到我们指定的

partition

,首先新建一个分区器类实现

Partitioner

接口,重写方法,其中

partition

方法的返回值就表示将消息发送到几号分区**。

publicclassCustomizePartitionerimplementsPartitioner{@Overridepublicintpartition(String topic, Object key,byte[] keyBytes, Object value,byte[] valueBytes, Cluster cluster){// 自定义分区规则(这里假设全部发到0号分区)// ......return0;}
​
    @Overridepublicvoidclose(){
​
    }
​
    @Overridepublicvoidconfigure(Map<String,?> configs){
​
    }}

application.propertise

中配置自定义分区器,配置的值就是分区器类的全路径名:

# 自定义分区器
spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

1.5.4.3

kafka

事务提交

如果在发送消息时需要创建事务,可以使用

KafkaTemplate

executeInTransaction

方法来声明事务:

@GetMapping("/kafka/transaction")publicvoidsendMessage7(){// 声明事务:后面报错消息不会发出去
    kafkaTemplate.executeInTransaction(operations ->{
        operations.send("topic1","test executeInTransaction");thrownewRuntimeException("fail");});
​
    // 不声明事务:后面报错但前面消息已经发送成功了
   kafkaTemplate.send("topic1","test executeInTransaction");thrownewRuntimeException("fail");}

**

Springboot kafka

事务注意事项**
采用

kafkatemplate

发送事务消息,需要配置地方
1、

spring.kafka.producer.transaction-id-prefix=kafka-tran

2、

spring.kafka.producer.retries=1

—这个必须大于

0

,可以配置

1

或者

all

这个头不为空,会在默认的

producerfactory

kafkatemplate

初始化中用到

DefaultKafkaProducerFactory
 
publicbooleantransactionCapable(){returnthis.transactionIdPrefix != null;}
 
KafkaTemplate
 
publicKafkaTemplate(ProducerFactory<K, V> producerFactory,boolean autoFlush){this.producerFactory = producerFactory;this.autoFlush = autoFlush;this.transactional = producerFactory.transactionCapable();}//这样就可以发送带事务的消息了,不需要@Transtractional注解,//且kafkaTemplate.send()等不带事务的消息是无法发送的,直接报异常publicvoidsendMessageTransactional(){
    String jsonMessage =buildMessage();//局部开启事务
    kafkaTemplate.executeInTransaction(operations ->{
        operations.send(topic,1,"key2", jsonMessage);returntrue;});
    logger.info("已发送事务消息。。。。");}

1.5.5 详解消费者

1.5.5.1 指定

topic、partition、offset

消费

前面我们在监听消费

topic1

的时候,监听的是

topic1

上所有的消息,如果我们想指定

topic

、指定

partition

、指定

offset

来消费呢?也很简单,

@KafkaListener

注解已全部为我们提供,

/**
 * @Title 指定topic、partition、offset消费
 * @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
 * @Param [record]
 * @return void
 **/@KafkaListener(id ="consumer1",groupId ="felix-group",topicPartitions ={@TopicPartition(topic ="topic1", partitions ={"0"}),@TopicPartition(topic ="topic2", partitions ="0", partitionOffsets =@PartitionOffset(partition ="1", initialOffset ="8"))})publicvoidonMessage2(ConsumerRecord<?,?> record){
    System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());}

属性解释:

id

:消费者

ID

groupId

:消费组

ID

topics

:监听的

topic

,可监听多个;

topicPartitions

:可配置更加详细的监听信息,可指定

topic

parition

offset

监听。

上面

onMessage2

监听的含义:监听

topic1

0

号分区,同时监听

topic2

0

号分区和

topic2

1

号分区里面

offset

8

开始的消息。

注意:

topics

topicPartitions

不能同时使用;

1.5.5.2 批量消费

设置

application.prpertise

开启批量消费即可,

# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50

接收消息时用

List

来接收,监听代码如下:

@KafkaListener(id ="consumer2",groupId ="felix-group", topics ="topic1")publicvoidonMessage3(List<ConsumerRecord<?,?>> records){
    System.out.println(">>>批量消费一次,records.size()="+records.size());for(ConsumerRecord<?,?> record : records){
        System.out.println(record.value());}}

1.5.5.3

ConsumerAwareListenerErrorHandler

异常处理器

通过异常处理器,我们可以处理

consumer

在消费时发生的异常。

新建一个

ConsumerAwareListenerErrorHandler

类型的异常处理方法,用

@Bean

注入,

BeanName

默认就是方法名,然后我们将这个异常处理器的

BeanName

放到

@KafkaListener

注解的

errorHandler

属性里面,当监听抛出异常的时候,则会自动调用异常处理器。

// 新建一个异常处理器,用@Bean注入@Beanpublic ConsumerAwareListenerErrorHandler consumerAwareErrorHandler(){return(message, exception, consumer)->{
        System.out.println("消费异常:"+message.getPayload());return null;};}
​
// 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面@KafkaListener(topics ={"topic1"},errorHandler ="consumerAwareErrorHandler")publicvoidonMessage4(ConsumerRecord<?,?> record)throws Exception {thrownewException("简单消费-模拟异常");}
​
// 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息@KafkaListener(topics ="topic1",errorHandler="consumerAwareErrorHandler")publicvoidonMessage5(List<ConsumerRecord<?,?>> records)throws Exception {
    System.out.println("批量消费一次...");thrownewException("批量消费-模拟异常");}

执行看一下效果:

在这里插入图片描述

1.5.5.4 消息过滤器

消息过滤器可以在消息抵达

consumer

之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由

KafkaListener

处理,不需要的消息则过滤掉。

配置消息过滤只需要为 监听器工厂 配置一个

RecordFilterStrategy

(消息过滤策略),返回

true

的时候消息将会被抛弃,返回

false

时,消息能正常抵达监听容器。

@ComponentpublicclassKafkaConsumer{@Autowired
    ConsumerFactory consumerFactory;
​
    // 消息过滤器@Beanpublic ConcurrentKafkaListenerContainerFactory filterContainerFactory(){
        ConcurrentKafkaListenerContainerFactory factory =newConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);// 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);// 消息过滤策略
        factory.setRecordFilterStrategy(consumerRecord ->{if(Integer.parseInt(consumerRecord.value().toString())%2==0){returnfalse;}//返回true消息则被过滤returntrue;});return factory;}
​
    // 消息过滤监听@KafkaListener(topics ={"topic1"},containerFactory ="filterContainerFactory")publicvoidonMessage6(ConsumerRecord<?,?> record){
        System.out.println(record.value());}}

上面实现了一个"过滤奇数、接收偶数"的过滤策略,我们向

topic1

发送

0-99

总共

100

条消息,看一下监听器的消费情况,可以看到监听器只消费了偶数。

在这里插入图片描述

1.5.5.5 消息转发

在实际开发中,我们可能有这样的需求,应用

A

TopicA

获取到消息,经过处理后转发到

TopicB

,再由应用

B

监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。

SpringBoot

集成

Kafka

实现消息的转发也很简单,只需要通过一个

@SendTo

注解,被注解方法的

return

值即转发的消息内容,如下,

/**
 * @Title 消息转发
 * @Description 从topic1接收到的消息经过处理后转发到topic2
 * @Param [record]
 * @return void
 **/@KafkaListener(topics ={"topic1"})@SendTo("topic2")public String onMessage7(ConsumerRecord<?,?> record){return record.value()+"-forward message";}

1.5.5.6 定时启动、停止监听器

默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定

topic

的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢——使用

KafkaListenerEndpointRegistry

,下面我们就来实现:

1、禁止监听器自启动;
2、创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;

新建一个定时任务类,用注解

@EnableScheduling

声明,

KafkaListenerEndpointRegistry

SpringIO

中已经被注册为

Bean

,直接注入,设置禁止

KafkaListener

自启动,

@EnableScheduling@ComponentpublicclassCronTimer{
​
    /**
     * @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
     * 而是会被注册在KafkaListenerEndpointRegistry中,
     * 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
     **/@Autowiredprivate KafkaListenerEndpointRegistry registry;
​
    @Autowiredprivate ConsumerFactory consumerFactory;
​
    // 监听器容器工厂(设置禁止KafkaListener自启动)@Beanpublic ConcurrentKafkaListenerContainerFactory delayContainerFactory(){
        ConcurrentKafkaListenerContainerFactory container =newConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);//禁止KafkaListener自启动
        container.setAutoStartup(false);return container;}
​
    // 监听器@KafkaListener(id="timingConsumer",topics ="topic1",containerFactory ="delayContainerFactory")publicvoidonMessage1(ConsumerRecord<?,?> record){
        System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());}
​
    // 定时启动监听器@Scheduled(cron ="0 42 11 * * ? ")publicvoidstartListener(){
        System.out.println("启动监听器...");// "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器if(!registry.getListenerContainer("timingConsumer").isRunning()){
            registry.getListenerContainer("timingConsumer").start();}//重新开始//registry.getListenerContainer("timingConsumer").resume();}
​
    // 定时停止监听器@Scheduled(cron ="0 45 11 * * ? ")publicvoidshutDownListener(){
        System.out.println("关闭监听器...");
        registry.getListenerContainer("timingConsumer").pause();}}

启动项目,触发生产者向

topic1

发送消息,可以看到

consumer

没有消费,因为这时监听器还没有开始工作。

在这里插入图片描述
11:42分监听器启动开始工作,消费消息

在这里插入图片描述
在这里插入图片描述

11:45分监听器停止工作

在这里插入图片描述

标签: kafka 分布式 java

本文转载自: https://blog.csdn.net/weixin_42039228/article/details/123542494
版权归原作者 __奋斗的卡卡 所有, 如有侵权,请联系我们删除。

“Kafka原理及应用实践,用心看这篇就够了【重点】”的评论:

还没有评论