0


实战:Spring Cloud Stream集成兼容多消息中间件kafka、rabbitmq

文章目录

前言

前面的博文我们介绍并实战演示了Spring Cloud Stream整合rabbitmq,其中主要介绍了如何使用和配置完成消息中间件的集成。但是,在实际的生产环境中可能会用到多个消息中间件,又或者是由于业务改变需要更换消息中间件,在这些情况下我们的Spring Cloud Stream框架可以完全兼容多个消息中间件和多种消息中间件的替换。今天,我们就在一个项目中用Spring Cloud Stream 集成两个消息中间件kafka和rabbitmq。

实战要点

1、完美集成并兼容kafka和rabbitmq
2、增加消费组概念,直接保证消息唯一消费
3、增加重试机制,重试条件满足后自动加入死信
4、增加死信消费者,可以直接移植生产
5、消费者手动ack、offset
6、rabbitmq、kafka配置,保证消息不丢失

技术积累

Spring Cloud Stream简介

Spring Cloud Stream是用于构建微服务具有消息驱动能力的框架,应用程序通过inputs、outputs通道与binder进行交互,binder与消息中间件进行通信。

binder的作用是将消息中间件进行粘合,相当于对第三方中间件进行封装整合,让开发人员不用关心底层消息中间件如何运行。
在这里插入图片描述

inputs是消息输入通道,类似于消息中间件的consumer消费者;outputs是消息输出通道,类似于消息中间件的producer生产者。应用程序收发消息不再直接调用消息中间件的接口或者逻辑代码,直接使用Spring Cloud Stream 的OUTPUT与INPUT通道进行处理。

可以通过binder绑定选用各种消息中间件,用binding进行中间件的相关参数配置,让应用程序达到灵活配置和切换消息中间件的目的。

集成kafka要点

1、修改server.properties文件,将#listeners=PLAINTEXT://:9092这一句注释放开,改为listeners=PLAINTEXT://kafka服务器ip:9092
如果此处不改SpringBoot在启动时会报错:
Error connecting to node devops-01:9092 (id: 0 rack: null)
2、kafka 2.8版本开始自带zk,建议使用2.8版本以上的版本不用安装zk
3、spring-boot-starter-paren与spring-cloud-starter-stream-kafk版本号一定要对应上,特别是springboot2之后的版本。如果没有特殊要求,需严格按照本文的版本号进行配置和实战
4、kafka本身、生产者、消费者保证消息不丢失,注意必须使用kafka HA配合修改配置

集成rabbitmq要点

1、rabbitmq比kafka的限制条件就少很多,基本上不用考虑spring版本号兼容
2、rabbimq本身、生产者、消费者保证消息不丢失,注意必须使用rabbitmq HA

实战演示

本次实战直接采用从0到1的策略进行演示,适合小白直接入手,可直接接入生产
本次实战MQ组件全部采用单机进行测试,生产环境请更换为HA

本次实战提供:
1、Kafka、Rabbitmq消息中间件信道注册
2、Kafka、Rabbitm消息中间件消息发送、接收消息监听、死信消息监听

Maven依赖版本号选择

  1. <parent>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-parent</artifactId>
  4. <version>2.3.12.RELEASE</version>
  5. <relativePath/>
  6. </parent>
  7. <properties>
  8. <java.version>1.8</java.version>
  9. <spring-cloud.version>Hoxton.SR10</spring-cloud.version>
  10. </properties>
  11. <dependencies>
  12. <dependency>
  13. <groupId>org.springframework.cloud</groupId>
  14. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.springframework.cloud</groupId>
  18. <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  19. <version>3.0.3.RELEASE</version>
  20. </dependency>
  21. </dependencies>
  22. <dependencyManagement>
  23. <dependencies>
  24. <dependency>
  25. <groupId>org.springframework.cloud</groupId>
  26. <artifactId>spring-cloud-dependencies</artifactId>
  27. <version>${spring-cloud.version}</version>
  28. <type>pom</type>
  29. <scope>import</scope>
  30. </dependency>
  31. </dependencies>
  32. </dependencyManagement>

Spring及MQ主要配置

  1. server:
  2. port: 9999
  3. spring:
  4. rabbitmq:
  5. host: 10.10.22.187
  6. port: 5672
  7. username: admin
  8. password: admin
  9. virtual-host: /
  10. kafka:
  11. bootstrap-servers: 10.10.22.174:9092
  12. cloud:
  13. stream:
  14. default-binder: myRabbit #默认绑定的mq
  15. binders: #stream框架粘接的mq
  16. myRabbit: #自定义个人mq名称
  17. type: rabbit
  18. environment:
  19. spring: ${spring.rabbitmq}
  20. myKafka:
  21. type: kafka
  22. environment:
  23. spring:
  24. cloud:
  25. stream:
  26. kafka: ${spring.cloud.stream.kafka.binder}
  27. bindings: #stream绑定信道
  28. output_channel: #自定义发送信道名称
  29. destination: assExchange #目的地 交换机/主题
  30. content-type: application/json
  31. binder: myRabbit #粘接到的mq
  32. input_channel: #自定义接收信道
  33. destination: assExchange #目的地 交换机/主题
  34. content-type: application/json
  35. binder: myRabbit #粘接到的mq
  36. group: assGroup
  37. consumer:
  38. maxAttempts: 3 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3
  39. backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
  40. backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2
  41. backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s
  42. output_kafka_channel: #自定义发送信道名称
  43. destination: assTopic #目的地 交换机/主题
  44. content-type: text/plain
  45. binder: myKafka #粘接到的mq
  46. producer:
  47. partition-count: 2 #分区数目
  48. input_kafka_channel: #自定义接收信道
  49. destination: assTopic #目的地 交换机/主题
  50. content-type: text/plain
  51. binder: myKafka #粘接到的mq
  52. group: assGroup
  53. consumer:
  54. maxAttempts: 3 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3
  55. backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
  56. backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2
  57. backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s
  58. rabbit: #stream mq配置
  59. bindings:
  60. out_channel:
  61. producer:
  62. delivery-mode: persistent #消息持久化 non-persistent
  63. useConfirmHeader: true #Future<Confirm>获取异常投递,与confirmAckChannel互斥
  64. input_channel:
  65. consumer:
  66. concurrency: 1 #消费者数量
  67. max-concurrency: 5 #最大消费者数量
  68. durable-subscription: true #持久化队列
  69. recovery-interval: 3000 #3s 重连
  70. acknowledge-mode: MANUAL #手动
  71. requeue-rejected: false #是否重新放入队列
  72. auto-bind-dlq: true #开启死信队列
  73. requeueRejected: true #异常放入死信
  74. kafka:
  75. binder:
  76. brokers: ${spring.kafka.bootstrap-servers}
  77. auto-add-partitions: true #自动分区
  78. auto-create-topics: true #自动创建主题
  79. replication-factor: 1 #两个副本
  80. min-partition-count: 1 #最小分区
  81. bindings:
  82. out_kafka_channel:
  83. producer:
  84. # 无限制重发不产生消息丢失
  85. retries: Integer.MAX_VALUE
  86. #acks =0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,可靠性最低
  87. #acks =1:producer等待broker的ack,partition的leader刷盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据,可靠性中
  88. #acks = all 、 -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,可靠性高,但延迟时间长
  89. #可以设置的值为:all, -1, 0, 1
  90. acks: all
  91. min:
  92. insync:
  93. replicas: 1 #感知副本数
  94. input_kafka_channel:
  95. consumer:
  96. concurrency: 1 #消费者数量
  97. max-concurrency: 5 #最大消费者数量
  98. recovery-interval: 3000 #3s 重连
  99. auto-rebalance-enabled: true #主题分区消费者组成员自动平衡
  100. auto-commit-offset: false #手动提交偏移量
  101. enable-dlq: true # 开启 dlq队列
  102. dlq-name: assTopic.dlq
  103. deserializationExceptionHandler: sendToDlq #异常加入死信

基础信道

  1. /**
  2. * MqChannel
  3. * @author senfel
  4. * @version 1.0
  5. * @date 2023/6/2 15:46
  6. */
  7. public interface MqChannel {
  8. /**
  9. * 消息目的地
  10. * RabbitMQ中为交换机名称
  11. * kafka topic
  12. */
  13. String DESTINATION = "assExchange";
  14. String DESTINATIONBYGROUP = "assGroup";
  15. String DESTINATIONBYTOPIC = "assTopic";
  16. /**
  17. * 输出信道
  18. */
  19. String OUTPUT_CHANNEL = "output_channel";
  20. String OUTPUT_KAFKA_CHANNEL = "output_kafka_channel";
  21. /**
  22. * 输入信道
  23. */
  24. String INPUT_CHANNEL = "input_channel";
  25. String INPUT_KAFKA_CHANNEL = "input_kafka_channel";
  26. String INPUT_KAFKA_CHANNEL_ERROR = "assTopic.dlq";
  27. /**
  28. * 死信队列
  29. */
  30. String INPUT_CHANNEL_DLQ = "assExchange.assGroup.dlq";
  31. @Output(MqChannel.OUTPUT_CHANNEL)
  32. MessageChannel output();
  33. @Output(MqChannel.OUTPUT_KAFKA_CHANNEL)
  34. MessageChannel outputByKafka();
  35. @Input(MqChannel.INPUT_CHANNEL)
  36. SubscribableChannel input();
  37. @Input(MqChannel.INPUT_KAFKA_CHANNEL)
  38. SubscribableChannel inputByKafka();
  39. @Input(MqChannel.INPUT_KAFKA_CHANNEL_ERROR)
  40. SubscribableChannel inputByKafkaError();
  41. }

绑定信道消息发送

提供绑定信道,增加rabbitmq、kafka发消息逻辑

1、启动类增加绑定mq注解@EnableBinding(MqChannel.class)

  1. @SpringBootApplication
  2. @EnableBinding(MqChannel.class)
  3. public class TestDemoApplication {
  4. public static void main(String[] args) {
  5. SpringApplication.run(TestDemoApplication.class, args);
  6. }
  7. }

2、增加发送消息接口

  1. /**
  2. * TestMQService
  3. * @author senfel
  4. * @version 1.0
  5. * @date 2023/6/2 15:47
  6. */
  7. public interface TestMQService {
  8. /**
  9. * rabbitmq发送消息
  10. */
  11. void send(String str);
  12. /**
  13. * kafka发送消息
  14. */
  15. void sendByKafka(String str);
  16. }

3、实现发送消息接口

  1. /**
  2. * TestMQServiceImpl
  3. * @author senfel
  4. * @version 1.0
  5. * @date 2023/6/2 15:49
  6. */
  7. @Service
  8. @Slf4j
  9. public class TestMQServiceImpl implements TestMQService {
  10. @Resource
  11. private MqChannel mqChannel;
  12. @Override
  13. public void send(String str) {
  14. mqChannel.output().send(MessageBuilder.withPayload("rabbitmq测试:"+str).build());
  15. }
  16. @Override
  17. public void sendByKafka(String str) {
  18. mqChannel.outputByKafka().send(MessageBuilder.withPayload("kafka测试:"+str).build());
  19. }
  20. }

4、提供接口层

  1. /**
  2. * @author senfel
  3. * @version 1.0
  4. * @date 2023/6/2 17:27
  5. */
  6. @RestController
  7. public class TestController{
  8. @Resource
  9. private TestMQService testMQService;
  10. /**
  11. * testRabbitmq
  12. * @param str
  13. * @author senfel
  14. * @date 2023/6/8 11:27
  15. * @return java.lang.String
  16. */
  17. @GetMapping("/test")
  18. public String testMq(String str){
  19. testMQService.send(str);
  20. return str;
  21. }
  22. /**
  23. * testKafka
  24. * @param str
  25. * @author senfel
  26. * @date 2023/6/8 11:27
  27. * @return java.lang.String
  28. */
  29. @GetMapping("/testKafka")
  30. public String testKafka(String str){
  31. testMQService.sendByKafka(str);
  32. return str;
  33. }
  34. }

集成兼容多mq演示

Rabbitmq演示

1、TestMQServiceImpl增加mq消息监听和私信监听

  1. /**
  2. * 接收消息监听
  3. * @param message 消息体
  4. * @param channel 信道
  5. * @param tag 标签
  6. * @author senfel
  7. * @date 2023/6/5 9:25
  8. * @return void
  9. */
  10. @StreamListener(MqChannel.INPUT_CHANNEL)
  11. public void process(String message,
  12. @Header(AmqpHeaders.CHANNEL) Channel channel,
  13. @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
  14. log.info("message : "+message);
  15. if(message.contains("9")){
  16. // 参数1为消息的tag 参数2为是否多条处理 参数3为是否重发
  17. //channel.basicNack(tag,false,false);
  18. System.err.println("--------------rabbitmq消费者消费异常--------------------------------------");
  19. System.err.println(message);
  20. throw new RuntimeException("消费异常");
  21. }else{
  22. System.err.println("--------------rabbitmq消费者--------------------------------------");
  23. System.err.println(message);
  24. channel.basicAck(tag,false);
  25. }
  26. }
  27. /**
  28. * 死信监听
  29. * @param message 消息体
  30. * @param channel 信道
  31. * @param tag 标签
  32. * @author senfel
  33. * @date 2023/6/5 14:30
  34. * @return void
  35. */
  36. @RabbitListener(
  37. bindings = @QueueBinding(
  38. value = @Queue(MqChannel.INPUT_CHANNEL_DLQ)
  39. , exchange = @Exchange(MqChannel.DESTINATION)
  40. ),
  41. concurrency = "1-5"
  42. )
  43. public void processByDlq(String message,
  44. @Header(AmqpHeaders.CHANNEL) Channel channel,
  45. @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
  46. log.info("message : "+message);
  47. System.err.println("---------------rabbitmq死信消费者------------------------------------");
  48. System.err.println(message);
  49. }

2、测试正常消息投递

在这里插入图片描述

--------------rabbitmq消费者--------------------------------------
rabbitmq测试:777777777777777

3、测试异常消息投递,投递规则3次消费失败直接进入死信

在这里插入图片描述

--------------rabbitmq消费者消费异常--------------------------------------
rabbitmq测试:7777777777777779
--------------rabbitmq消费者消费异常--------------------------------------
rabbitmq测试:7777777777777779
--------------rabbitmq消费者消费异常--------------------------------------
rabbitmq测试:7777777777777779
---------------rabbitmq死信消费者------------------------------------
rabbitmq测试:7777777777777779

Kafka演示

1、TestMQServiceImpl增加mq消息监听和私信监听

  1. /**
  2. * kafka消费者
  3. * @param message 消息体
  4. * @param acknowledgment ack
  5. * @param receivedTopic topic
  6. * @param groupId 消费者group
  7. * @author senfel
  8. * @date 2023/6/7 15:59
  9. * @return void
  10. */
  11. @StreamListener(MqChannel.INPUT_KAFKA_CHANNEL)
  12. public void processByKafka(String message,
  13. @Header(value = KafkaHeaders.ACKNOWLEDGMENT,required = false) Acknowledgment acknowledgment,
  14. @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic,
  15. @Header(value = KafkaHeaders.GROUP_ID,required = false) String groupId,
  16. @Header(value = KafkaHeaders.PARTITION_ID,required = false) String partitionId) throws Exception {
  17. System.err.println("-------进入kafka消费者---------------");
  18. System.err.println(message);
  19. System.err.println(receivedTopic);
  20. if(message.contains("9")){
  21. log.error("kafka消费异常:{}",message);
  22. System.err.println("kafka1消费异常"+message);
  23. throw new RuntimeException("kafka消费异常");
  24. }
  25. System.err.println("kafka接受的数据为"+message);
  26. acknowledgment.acknowledge();
  27. }
  28. /**
  29. * kafka死信消费
  30. * @param message 消息体
  31. * @param receivedTopic topic
  32. * @author senfel
  33. * @date 2023/6/7 15:58
  34. * @return void
  35. */
  36. @KafkaListener(topics = {MqChannel.INPUT_KAFKA_CHANNEL_ERROR},
  37. groupId = MqChannel.DESTINATIONBYGROUP)
  38. public void processByKafkaError(String message,
  39. @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) throws Exception {
  40. System.err.println("-------进入死信消费者---------------");
  41. System.err.println(message);
  42. System.err.println(receivedTopic);
  43. System.err.println("kafka死信接受的数据为"+message);
  44. System.err.println(message);
  45. }

2、测试正常消息投递

在这里插入图片描述

-------进入kafka消费者---------------
kafka测试:7777777777777777
assTopic
kafka接受的数据为kafka测试:7777777777777777

3、测试异常消息投递,投递规则3次消费失败直接进入死信

在这里插入图片描述

-------进入kafka消费者---------------
kafka测试:7777777777777779
assTopic
kafka1消费异常kafka测试:7777777777777779
-------进入kafka消费者---------------
kafka测试:7777777777777779
assTopic
kafka1消费异常kafka测试:7777777777777779
-------进入kafka消费者---------------
kafka测试:7777777777777779
assTopic
kafka1消费异常kafka测试:7777777777777779
-------进入死信消费者---------------
kafka测试:7777777777777779
assTopic.dlq
kafka死信接受的数据为kafka测试:7777777777777779
kafka测试:7777777777777779

写在最后

Spring Cloud Stream集成多消息中间件kafka、rabbitmq较为简单,直接省去了原生中间的的操作与处理,开发人员可以直接任意切换和混用多种消息中间件,大大增加架构的可用性与可移植性。本实战案例提供重试、私信、手动ack、消费者分组和负载等高可用方案,直接可接入生产使用。

⭐️路漫漫其修远兮,吾将上下而求索 🔍


本文转载自: https://blog.csdn.net/weixin_39970883/article/details/131108033
版权归原作者 小沈同学呀 所有, 如有侵权,请联系我们删除。

“实战:Spring Cloud Stream集成兼容多消息中间件kafka、rabbitmq”的评论:

还没有评论