

[Hands-On] Integrating Kafka with Spring Cloud Stream 3.1+


Preface

We previously integrated Spring Cloud Stream 3.0 with Kafka and RabbitMQ, and it was a pleasure to use: we no longer had to care how the underlying message broker was wired up or how messages were sent and received. Starting with the Spring Cloud 2020 release train, however, Spring Cloud Stream moved to version 3.1.0+, and from that version @StreamListener carries a @Deprecated annotation; its use is discouraged and it may be removed in an upcoming release. The stated motivation is to favor the event-stream abstractions provided by Project Reactor (such as Flux and Mono): an imperative function is invoked once per individual event, whereas a reactive function is invoked only once for the whole stream. So in this post we walk through integrating Kafka with Spring Cloud Stream 3.1+.
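To make the shift concrete, here is a minimal sketch contrasting the two styles; the class and bean names are illustrative, not taken from the original project:

// Pre-3.1 style, now deprecated: annotation-driven listener
@EnableBinding(Sink.class)
public class LegacyListener {
    @StreamListener(Sink.INPUT)
    public void handle(String payload) {
        System.out.println("received: " + payload);
    }
}

// 3.1+ style: a plain functional bean replaces the annotated listener
@Bean
public Consumer<String> inputChannel() {
    return payload -> System.out.println("received: " + payload);
}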


Advantages of the New Version

The new version advocates a functional style for producing and consuming messages.

Beans whose type is Supplier, Function, or Consumer provide message production and consumption. The binding names follow this naming convention:

input:  <functionName> + -in- + <index>
output: <functionName> + -out- + <index>

Once the bean names are listed under spring.cloud.function.definition (or spring.cloud.stream.function.definition) in the configuration file, each bean is bound to its corresponding consumer or producer binding.
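As an illustration (the bean names source, process, and sink are hypothetical, chosen only to show the rule), each functional type maps to bindings as follows:

@Bean
public Supplier<String> source() {               // binds to source-out-0
    return () -> "tick";
}

@Bean
public Function<String, String> process() {      // binds to process-in-0 and process-out-0
    return String::toUpperCase;
}

@Bean
public Consumer<String> sink() {                 // binds to sink-in-0
    return payload -> System.out.println(payload);
}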

For example, the inputChannel bean below is bound to the inputChannel-in-0 channel, and the outputChannel bean to the outputChannel-out-0 channel:

spring:
  kafka:
    bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092
  cloud:
    stream:
      kafka:
        binder:
          brokers: ${spring.kafka.bootstrap-servers}
      binders:
        kafkahub:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka: ${spring.cloud.stream.kafka.binder}
      default-binder: kafkahub
      function:
        definition: inputChannel,outputChannel
      bindings:
        inputChannel-in-0:
          binder: kafkahub
          destination: test-kafka-topic
          group: test-kafka-group
          content-type: text/plain
        outputChannel-out-0:
          binder: kafkahub
          destination: test-kafka-topic
          content-type: text/plain
          producer:
            partition-count: 3 # number of partitions
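In the configuration above, outputChannel is declared in function.definition. The binding can be fed on demand through StreamBridge (shown next), or by a polled Supplier bean; a minimal sketch of the Supplier variant, assuming the framework's default poller of one invocation per second:

// Hypothetical alternative producer: the framework polls this Supplier
// (by default once per second) and publishes each returned value
// to the outputChannel-out-0 binding -- no StreamBridge required.
@Bean
public Supplier<String> outputChannel() {
    return () -> "heartbeat " + java.time.Instant.now();
}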

The message producer then looks like this:

@Resource
private StreamBridge streamBridge;

@GetMapping("/send")
public Boolean sendMessageToKafka(String msg) {
    boolean send = streamBridge.send("outputChannel-out-0",
            MessageBuilder.withPayload("kafka test: " + msg).build());
    return send;
}

And the message consumer:

@Configuration
public class KafkaChannel {

    @Resource
    private StreamBridge streamBridge;
    /**
     * inputChannel consumer
     * @author senfel
     * @date 2024/6/18 15:26
     * @return java.util.function.Consumer<org.springframework.messaging.Message<java.lang.String>>
     */
    @Bean
    public Consumer<Message<String>> inputChannel() {
        return message -> {
            System.out.println("Received message payload: " + message.getPayload());
            System.out.println("Received message headers: " + message.getHeaders());
        };
    }

}

Hands-On Demo

We will keep the demonstration simple; see my earlier posts for setting up the Kafka environment.
The demo covers two scenarios:

Normal path: the producer sends a message to Kafka, and the consumer receives and processes it successfully.
Failure path: the consumer fails to process a message and immediately forwards it to another topic, where a fallback consumer on that topic picks it up.

Auto-ack mode is used throughout. If you need manual acknowledgment, configure it as in my earlier posts and add the manual-ack logic on the consumer side, as sketched below.
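For reference, a minimal sketch of what that manual-ack logic could look like, assuming auto-commit-offset: false (as in the configuration that follows) so that the binder exposes the acknowledgment header:

import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;

@Bean
public Consumer<Message<String>> inputChannel() {
    return message -> {
        System.out.println("Received: " + message.getPayload());
        // With auto-commit disabled, the binder puts an Acknowledgment
        // object into the headers; calling acknowledge() commits the offset.
        Acknowledgment ack = message.getHeaders()
                .get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (ack != null) {
            ack.acknowledge();
        }
    };
}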

Add the Maven dependencies

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.12.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>cce-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>seata-demo-order</name>
<description>Demo project for Spring Boot</description>
<properties>
    <java.version>8</java.version>
    <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        <version>3.2.4</version>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Add the application.yaml configuration

spring:
  # kafka
  kafka:
    bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092
  cloud:
    stream:
      kafka:  # Kafka binder configuration
        binder:
          brokers: ${spring.kafka.bootstrap-servers}
          auto-add-partitions: true  # add partitions automatically
          auto-create-topics: true   # create topics automatically
          replication-factor: 3      # replication factor
          min-partition-count: 3     # minimum partition count
        bindings:
          outputChannel-out-0:
            producer:
              # Retry indefinitely (Integer.MAX_VALUE) so messages are not lost
              retries: 2147483647
              # acks=0: the producer does not wait for the broker's ack; the broker
              #   returns before anything is written to disk -- lowest reliability
              # acks=1: the producer waits for the partition leader to flush to disk;
              #   if the leader fails before followers sync, data is lost -- medium reliability
              # acks=all / -1: the producer waits until the leader and all followers
              #   have persisted the record -- highest reliability, highest latency
              # Valid values: all, -1, 0, 1
              acks: all
              min:
                insync:
                  replicas: 3 # in-sync replica count
          inputChannel-in-0:
            consumer:
              concurrency: 1                # number of consumers
              max-concurrency: 5            # maximum number of consumers
              recovery-interval: 3000       # reconnect after 3s
              auto-rebalance-enabled: true  # rebalance topic partitions across group members automatically
              auto-commit-offset: false     # commit offsets manually
              enable-dlq: true              # enable the dead-letter queue
              dlq-name: test-kafka-topic.dlq
              deserializationExceptionHandler: sendToDlq # route deserialization failures to the DLQ
      binders: # bindings to external message brokers
        kafkahub:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka: ${spring.cloud.stream.kafka.binder}

      default-binder: kafkahub # default binder
      function: # declare the channel names; each channel can consume (in) and/or produce (out)
        definition: inputChannel;outputChannel;dlqChannel
      bindings: # channel bindings
        inputChannel-in-0:
          binder: kafkahub
          destination: test-kafka-topic
          group: test-kafka-group
          content-type: text/plain
          consumer:
            maxAttempts: 1                # max delivery attempts per message (default 3)
            backOffInitialInterval: 1000  # initial retry interval; default 1s, so the first retry happens after 1s
            backOffMultiplier: 2          # multiplier between consecutive retry intervals; default 2
            backOffMaxInterval: 10000     # maximum retry interval; default 10000ms (10s)
        outputChannel-out-0:
          binder: kafkahub
          destination: test-kafka-topic
          content-type: text/plain
          producer:
            partition-count: 3 # number of partitions
        dlqChannel-in-0:
          binder: kafkahub
          destination: test-kafka-topic.dlq
          group: test-kafka-group
          content-type: text/plain
          consumer:
            maxAttempts: 1                # max delivery attempts per message (default 3)
            backOffInitialInterval: 1000  # initial retry interval; default 1s, so the first retry happens after 1s
            backOffMultiplier: 2          # multiplier between consecutive retry intervals; default 2
            backOffMaxInterval: 10000     # maximum retry interval; default 10000ms (10s)
        dlqChannel-out-0:
          binder: kafkahub
          destination: test-kafka-topic.dlq
          content-type: text/plain
          producer:
            partition-count: 3 # number of partitions
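A side note on partitioning: partition-count only controls how many partitions the producer bindings use. If you also want records routed deterministically by a key, the producer binding accepts a partition-key-expression property; a sketch (keying on the whole payload, purely an assumption for illustration):

        outputChannel-out-0:
          binder: kafkahub
          destination: test-kafka-topic
          content-type: text/plain
          producer:
            partition-count: 3
            partition-key-expression: payload  # hypothetical: derive the partition key from the message payload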

Add the Kafka channel consumers

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.function.Consumer;

/**
 * KafkaChannel
 * @author senfel
 * @version 1.0
 * @date 2024/6/18 15:22
 */
@Configuration
public class KafkaChannel {

    @Resource
    private StreamBridge streamBridge;
    /**
     * inputChannel consumer
     * @author senfel
     * @date 2024/6/18 15:26
     * @return java.util.function.Consumer<org.springframework.messaging.Message<java.lang.String>>
     */
    @Bean
    public Consumer<Message<String>> inputChannel() {
        return message -> {
            System.out.println("Received payload: " + message.getPayload());
            System.out.println("Received headers: " + message.getHeaders());
            // Simulated failure: any payload containing "9" is treated as
            // unprocessable and is forwarded to the dead-letter channel
            if (message.getPayload().contains("9")) {
                boolean send = streamBridge.send("dlqChannel-out-0",
                        MessageBuilder.withPayload("kafka failed message sent to dlq: " + message).build());
                System.err.println("Sent message to dlqChannel: " + send);
            }
        };
    }

    /**
     * dlqChannel dead-letter consumer
     * @author senfel
     * @date 2024/6/18 15:26
     * @return java.util.function.Consumer<org.springframework.messaging.Message<java.lang.String>>
     */
    @Bean
    public Consumer<Message<String>> dlqChannel() {
        return message -> {
            System.out.println("dlqChannel received payload: " + message.getPayload());
            System.out.println("dlqChannel received headers: " + message.getHeaders());
        };
    }
}

Add an endpoint for sending messages

@Resource
private StreamBridge streamBridge;

@GetMapping("/send")
public Boolean sendMessageToKafka(String msg) {
    boolean send = streamBridge.send("outputChannel-out-0",
            MessageBuilder.withPayload("kafka test: " + msg).build());
    return send;
}
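To exercise the endpoint, requests like these work (port 8080 is the Spring Boot default and is assumed here):

GET http://localhost:8080/send?msg=hello     -> normal message, consumed from test-kafka-topic
GET http://localhost:8080/send?msg=order-9   -> payload contains "9", so the consumer forwards it to test-kafka-topic.dlq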

Testing

Send a normal message with Postman


Send a failing message with Postman


Reposted from: https://blog.csdn.net/weixin_39970883/article/details/139805324
Copyright belongs to the original author, 小沈同学呀. In case of infringement, please contact us for removal.
