0


【应用】SpringBoot 整合 Kafka

SpringBoot 整合 Kafka

基本使用-简单的生产消费

项目的基本构建

新建一个 maven 项目,引入 kafka 依赖,pom 文件内容如下

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.3</version><relativePath/></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies>

编写配置文件 application.yml,进行 kafka 的基本配置

spring:# Kafka 基本配置kafka:# ===============Kafka 服务器配置===============bootstrap-servers: localhost:9092listener:# 消费者监听的topic不存在时,项目会报错,设置为falsemissing-topics-fatal:false# 设置批量消费消息(不批量处理时需要注释掉)#type: batch# ===============消费者配置===============consumer:# 是否自动提交偏移量offsetenable-auto-commit:true# 提交offset延时auto:commit:intervals:ms:1000# 当kafka中没有初始offset或offset超出范围时将自动重置offset# (1) earliest:重置为分区中最小的offset;# (2) latest:重置为分区中最新的offset(消费分区中新产生的数据);# (3) none:只要有一个分区不存在已提交的offset,就抛出异常;auto-offset-reset: latest
      # 消息的反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 设置批量消费的消息数max-poll-records:10properties:# 消费会话超时时间session:timeout:ms:120000# 消费请求超时时间request:timeout:ms:180000# 默认的消费组idgroup:id: defaultConsumerGroup
    # ===============生产者配置===============producer:# 重试次数retries:1# 应答级别:多少个分区副本完成备份后向生产者发送应答消息(可选0、1、all/-1)acks:-1# 批量大小batch-size:16384# 生产端缓冲区大小buffer-memory:33554432# 消息的序列化key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 设置生成事务的前缀(使用事务时开启,不使用则注释掉)#transaction-id-prefix: transaction_properties:# 提交延时# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka# linger.ms为0表示每接收到一条消息就提交给kafka,此时batch-size失效linger:ms:0# 自定义分区器#partitioner:#class: com.zqf.config.MyPartitioner # 配置自定义分区策略后所有消息都在 partition 0

创建启动类

@SpringBootApplicationpublicclassKafkaDemoApplication{publicstaticvoidmain(String[] args){SpringApplication.run(KafkaDemoApplication.class);}}

可以编写一个配置类,创建一个名为 testTopic 的主题,并将其分区数设置为 8(当我们发送消息到主题时,Kafka 会自动创建该主题,但此时仅存在一个分区)

@ConfigurationpublicclassKafkaInitialConfiguration{// 创建一个名为 testTopic 的 topic 并设置分区数为 8,分区副本数为 2@BeanpublicNewTopicinitialTopic(){returnnewNewTopic("testTopic",8,(short)2);}// 如果要修改分区数,只需修改配置值重启项目即可// 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小@BeanpublicNewTopicupdateTopic(){returnnewNewTopic("testTopic",10,(short)2);}}

编写生产者 Controller 控制类模拟消息的生产,引入 KafkaTemplate

@RestController@RequestMapping("/kafka/producer")publicclassProducerController{@ResourceprivateKafkaTemplate<Object,Object> kafkaTemplate;}

简单的生产消费

在 ProducerController 中模拟消息的生产

/**
     * 简单生成消息
     * @param topic 主题
     * @param msg 消息
     * @return 结果
     */@GetMapping("/simpleSend/{topic}/{msg}")publicStringsimpleSend(@PathVariable("topic")String topic,@PathVariable("msg")String msg){
        kafkaTemplate.send(topic, msg);return"Kafka 生产消息成功~";}

创建 ConsumerService 作为消费者进行消息的消费,

@KafkaListener

topics 

属性指定了监听的主题,可同时监听多个,用逗号隔开即可

/**
     * 简单监听
     * @param record 消息
     */@KafkaListener(topics ={"testTopic"})publicvoidsimpleGetMsg(ConsumerRecord<Object,Object>record){// 方法内定义消息的处理逻辑...System.out.println("topic:"+record.topic());System.out.println("partition:"+record.partition());System.out.println("msg:"+(String)record.value());}

使用 postman 测试结果如下
在这里插入图片描述
消费结果
在这里插入图片描述

进阶使用-生产者

带回调的生产者

/**
     * 带回调的发送消息
     * @param topic 主题
     * @param msg 消息
     * @return 结果
     */@GetMapping("/callbackSend/{topic}/{msg}")publicStringcallbackSend(@PathVariable("topic")String topic,@PathVariable("msg")String msg){try{// 发送消息并设置成功和失败的逻辑
            kafkaTemplate.send(topic, msg).addCallback(newListenableFutureCallback<SendResult<Object,Object>>(){// 发送失败处理逻辑@OverridepublicvoidonFailure(Throwable e){System.out.println("消息发送失败..."+ e.getMessage());}// 发送成功处理逻辑@OverridepublicvoidonSuccess(SendResult<Object,Object> result){System.out.println("消息发送成功..."+ result.getRecordMetadata().topic()+"-"+
                            result.getRecordMetadata().partition()+"-"+ result.getRecordMetadata().offset());}});}catch(Throwable throwable){return"kafka 发送消息失败~";}return"kafka 发送消息成功~";}

测试结果
在这里插入图片描述
发送消息成功回调打印信息
在这里插入图片描述

事务提交消息

/**
     * kafka 事务发送消息
     * @param topic 主题
     * @param msg 消息
     * @return 结果
     */@GetMapping("/transactionSend/{topic}/{msg}")publicStringtransactionSend(@PathVariable("topic")String topic,@PathVariable("msg")String msg){finalBoolean[] result ={true};
        kafkaTemplate.executeInTransaction(newKafkaOperations.OperationsCallback<Object,Object,Object>(){@OverridepublicObjectdoInOperations(KafkaOperations<Object,Object> kafkaOperations){try{if("HelloKafka_transaction".equals(msg)){thrownewRuntimeException("消息异常,启动 Kafka 事务,不生产对应消息~");}try{
                        kafkaTemplate.send(topic, msg).get();}catch(ExecutionException e){
                        e.printStackTrace();}return"生产消息无异常,生产成功~";}catch(Exception e){
                    e.printStackTrace();
                    result[0]=false;return"生产消息有异常,生产失败~";}}});if(!result[0]){return"kafka 发送消息失败~";}return"kafka 发送消息成功~";}

使用事务发送消息时,需要在配置文件中配置

transaction-id-prefix

属性,即事务前缀,详见开篇的配置文件,打开相应的注释即可
在这里插入图片描述
生产异常处理,当有异常产生时,消息不会被发送,故消费者没有监听到信息
在这里插入图片描述

自定义分区器

Kafka 中 每一个 Topic 都可以划分成多个分区,而消息将被 append 到哪一个分区,则有对应的分区策略:

  • 若我们在发送消息时制定了分区策略,则将消息按照策略 append 到相应分区;
  • 若我们在发送消息时没有指定分区,但消息携带了 key,此时 kafka 将根据 key 将消息划分到对应分区,该策略将保证 key 相同的消息被 append 到同一分区;
  • 若分区和 key 都没有指定,则使用 kafka 的默认分区策略,轮询得到一个分区;

编写自定义分区器

publicclassMyPartitionerimplementsPartitioner{@OverridepublicvoidonNewBatch(String topic,Cluster cluster,int prevPartition){Partitioner.super.onNewBatch(topic, cluster, prevPartition);}// 在此处自定义分区策略,目前该策略默认使用 partition 0@Overridepublicintpartition(String s,Object o,byte[] bytes,Object o1,byte[] bytes1,Cluster cluster){return0;}@Overridepublicvoidclose(){}@Overridepublicvoidconfigure(Map<String,?> map){}}

配置好自定义的分区逻辑后,需要在配置文件中配置自定义分区策略

spring.kafka.producer.properties.partitioner.class=com.zqf.config.MyPartitioner

详见开篇配置文件末尾,将注释去掉即可

测试发现,所有消息都会被分发进入分区 0
在这里插入图片描述

进阶使用-消费者

指定消费者监听主题、分区、偏移量

配置

@KafkaListener

的属性可指定消费者监听的主题、分区以及偏移量

/**
     * 监听特定主题、特定分区
     * @param record 消息
     */@KafkaListener(id ="consumer01", groupId ="testGroup", topicPartitions ={@TopicPartition(topic ="topic1", partitions ="0"),@TopicPartition(topic ="testTopic", partitions ={"1","3","5","7","9"})})publicvoidtargetGetMsg1(ConsumerRecord<Object,Object>record){// 方法内定义消息的处理逻辑...System.out.println("=======consumer01收到消息=======");System.out.println("topic:"+record.topic());System.out.println("partition:"+record.partition());System.out.println("msg:"+(String)record.value());}/**
     * 监听特定主题、特定分区
     * @param record 消息
     */@KafkaListener(id ="consumer02", groupId ="testGroup", topicPartitions ={@TopicPartition(topic ="topic2", partitions ="0"),@TopicPartition(topic ="testTopic", partitions ={"0","2","4","6","8"})})publicvoidtargetGetMsg2(ConsumerRecord<Object,Object>record){// 方法内定义消息的处理逻辑...System.out.println("=======consumer02收到消息=======");System.out.println("topic:"+record.topic());System.out.println("partition:"+record.partition());System.out.println("msg:"+(String)record.value());}

测试结果
在这里插入图片描述

消费者批量消费

修改配置文件,开启批量消费模式,并指定每一次消费的消息数量

# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50

详见开篇配置文件,将对应属性的注释去掉即可

编写生产者方法批量生产数据并发送

/**
     * 批量消息处理
     * @param topic 主题
     * @param msg 消息
     * @return 结果
     */@GetMapping("/multiAccept/{topic}/{msg}")publicStringmultiAccept(@PathVariable("topic")String topic,@PathVariable("msg")String msg){for(int i =1; i <=30; i++){
            kafkaTemplate.send(topic, msg + i);}return"Kafka 发送消息成功~";}

消费者批量进行消息的消费

/**
     * 批量消息处理
     * @param records 消息
     */@KafkaListener(topics ={"multiAcceptTopic"})publicvoidmultiAccept(List<ConsumerRecord<Object,Object>> records){System.out.println("======= Kafka 开始批量消费 =======");System.out.println("消息数量 >>> "+ records.size());
        records.forEach(consumer ->{System.out.println("msg:"+(String) consumer.value());});}

测试结果
在这里插入图片描述

消费者异常处理

消费者异常处理需要编写异常处理器,并配置到监听方法中

/**
 * 消费异常处理器
 */@ConfigurationpublicclassMyConsumerAwareErrorHandler{@BeanpublicConsumerAwareListenerErrorHandlerconsumerAwareListenerErrorHandler(){returnnewConsumerAwareListenerErrorHandler(){@OverridepublicObjecthandleError(Message<?> message,ListenerExecutionFailedException e,Consumer<?,?> consumer){// 编写消费者异常处理逻辑System.out.println("消费发生异常...");System.out.println("异常信息:"+ e.getMessage());returnnull;}};}}

模拟消费者消费异常

/**
     * 消费异常处理
     * @param record 消息
     */@KafkaListener(topics ={"errorHandler"}, errorHandler ="consumerAwareListenerErrorHandler")publicvoiderrorHandlerListener(ConsumerRecord<Object,Object>record){thrownewRuntimeException("模拟接收消息异常...");}

测试结果
在这里插入图片描述

消费者消息过滤

消费者进行消息的过滤需要编写过滤器

/**
 * 消息过滤器
 */@ComponentpublicclassMyKafkaFilter{@AutowiredConsumerFactory<Object,Object> consumerFactory;@BeanpublicConcurrentKafkaListenerContainerFactory<Object,Object>listenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<Object,Object> factory =newConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);// 设置被过滤的消息将被丢弃
        factory.setAckDiscarded(true);// 设置消息过滤的策略
        factory.setRecordFilterStrategy(consumerRecord ->{if(consumerRecord.value().toString().length()<5){// 返回 false 则消息不被过滤returnfalse;}// 返回 true 则消息被过滤System.out.println("消息不合格~被过滤丢弃~");returntrue;});return factory;}}

编写消费者消费方法

/**
     * 消费者消息过滤
     * @param record 消息
     */@KafkaListener(topics ={"msgFilter"}, containerFactory ="listenerContainerFactory")publicvoidmsgFilterListener(ConsumerRecord<Object,Object>record){// 方法内定义消息的处理逻辑...System.out.println("topic:"+record.topic());System.out.println("partition:"+record.partition());System.out.println("msg:"+(String)record.value());}

测试结果
根据过滤逻辑,未被过滤的消息,正常被消费
在这里插入图片描述
被过滤掉的消息,执行过滤逻辑,消息未被消费
在这里插入图片描述

消费者消息转发

消费者可以在监听到消息后,接受消息并对消息进行处理,并将处理后的消息转发到另外的主题,只需要添加注解

@sendTo

即可

/**
     * 消费者消息转发
     * @param record 消息
     */@KafkaListener(topics ={"sendTo"})@SendTo("testTopic")publicStringsendToListener(ConsumerRecord<Object,Object>record){// 此处编写消息处理逻辑,处理完成后将处理后的消息转发至目标主题returnrecord.value()+"--我被处理了~";}

进行测试,发送消息内容为:“HiKafka
在这里插入图片描述
消费者进行监听,接收到消息后进行处理并转发到主题

testTopic

,并被对应的消费者处理
在这里插入图片描述

标签: kafka spring boot java

本文转载自: https://blog.csdn.net/zqf787351070/article/details/126858773
版权归原作者 情绪大瓜皮丶 所有, 如有侵权,请联系我们删除。

“【应用】SpringBoot 整合 Kafka”的评论:

还没有评论