0


spring cloud stream使用

技术兴起的原因:为了解决系统中不同中间件的适配问题,出现了cloud stream,采用适配绑定的方式,自动给不同的MQ之间进行切换。

屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
在这里插入图片描述
inputs是消费者,outputs是生产者

Stream中的消息通信方式遵循了发布-订阅模式,用 Topic 主题进行广播(在RabbitMQ就是Exchange,在Kafka中就是Topic)

其主要流程如下图
在这里插入图片描述
Binder:很方便的连接中间件,屏蔽差异。

Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置。

Source和Sink:简单理解为消息的生产者和消费者。

消息分组

可以让每个消息组都获取消息,而每个组内,只能有一个消费者消费这条消息。
Spring Cloud Stream 其实是发布订阅模型,如果一个topic有多个订阅实例 ,消息就会被这些消息消费者接收到,这样就会带来一个问题,那就是消息的重复消费,这种问题在很多业务场景下是不允许的,我们这时候需要给消息消费者加个分组信息,这样多个消费者实例在一个组下面就不会再出现消息重复消费。
多个消息消费者只有其中一个可以消费(同一个group,多个消费者之间是竞争的关系)。
简单说,生产者发送10条,其中一个消费者收到4条,另一个消费者可能收到6条

Spring Cloud Stream 当使用的消息中间件为Rabbit MQ的时候,配置input的时候,如果没有指定消息分组,那么生成的队列名称就是匿名的,并且当连接断开的时候会自动删除对应的队列。

在Rabbit MQ可以看到对应队列如下,第一个队列就是没有指定消息分组属性group生成的队列,可以看到生成的队列特性auto-delete:true,exclusive:true,也就是队列是排他性的,只有当前连接可见,并且当连接断开的时候队列会自动删除。
在这里插入图片描述
在这里插入图片描述
如果指定的分组,那么生成的队列名称就是destination+group,例如如下配置生成的队列名字就是css_test.css_test_queue,队列是配置:

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. css_test:
  6. destination: css_test
  7. binder: rabbit1
  8. group: css_test_queue

在Rabbit MQ可以看到对应队列如下,队列是durable: true,队列是持久的,不会自动删除。
在这里插入图片描述
在这里插入图片描述
1、队列持久化的概念
队列的声明默认是存放到内存中的,若是rabbitmq重启会丢失,若是想重启以后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启以后会读取该数据库。数据库

2、排他
简单理解就是在链接关闭时是否会删除队列(不管队列中有没有消息) 性能

3、自动删除
当队列中有消息时,不管是否排他,关闭链接都不会删除队列,此时消费者消费完消息后再断开消费者,队列会被自动删除。(这里若是有多个消费者消费同一个队列,则须要全部消费者都断开后才能自动删除)

Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动配置,引用了发布、订阅、消费、分区的三个核心概念。

官方版本目前仅仅支持RabbitMQ和Kafka。

MQ相关术语
Message:生产者/消费者之间靠消息媒介传递信息内容

MessageChannel:消息必须走特定的通道

消息通道的子接口SubscribableChannel,由MessageHandle消息处理器所订阅。

相关注解
Middleware:中间件,目前只支持RabbitMQ和Kafka
Binder:应用层和消息中间件之间的封装,实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型,这些可以通过配置文件修改。
Input:表示输入通道,消息进入该通道传到应用程序。
Output:注解标识输出通道,发布的消息将通过该通道离开应用程序。
StreamListener:监听队列,用于消费者的队列的消息接收。
EnableBinding:将信道channel和exchange绑定在一起。

如果多次重复消费依然不成功,总要有个兜底的,那就给它丢到死信队列(Dead Letter Queue, DLQ)里就好了。

在死信队列中的消息,不会主动向消费者发送消息,需要我们人工处理,比如将它们取出再次消费等。

在测试时,可以抛出异常试一下,会发现超过阈值(本地重试max-attempts=2)后会抛异常,并将消息放入死信队列中。如下图:
在这里插入图片描述
dlq-topic.dlq-group只是正常的队列。死信队列名字的格式为topicName.queueName.dlq。也就是说这里实际的死信队列是dlq-topic.dlq-group.dlq。

而死信队列也可以通过move message把消息重新移入正常队列中进行重新消费。(其实正常队列也可以move,不过没必要)
在这里插入图片描述
在这里插入图片描述
Spring Cloud Stream核心架构1

两个比较重要的地方:inputs(输入)消息接收端、outputs(输出)消息发送端

一个 Spring Cloud Stream 应用以消息中间件为核心,应用通过Spring Cloud Stream注入的输入/输出通道 channels 与外部进行通信。channels 通过特定的Binder实现与外部消息中间件进行通信。
在这里插入图片描述
Spring Cloud Stream核心架构2

黄色:表示RabbitMQ

绿色:插件,消息的输入输出都套了一层插件,插件可以用于各种各样不同的消息,也可以用于消息中间件的替换。

核心概念:

Barista接口:Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称,通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。

通道接口如何定义:

@Output:输出注解,用于定义发送消息接口

@Input:输入注解,用于定义消息的消费者接口

@StreamListener:用于定义监听方法的注解

使用Spring Cloud Stream 非常简单,只需要使用好这3个注解即可,在实现高性能消息的生产和消费的场景非常合适,但是使用SpringCloudStream框架有一个非常大的问题,就是不能实现可靠性的投递,也就是没法保证消息的100%可靠性,会存在少量消息丢失的问题。目前SpringCloudStream整合了RabbitMQ与Kafka,我们都知道Kafka是无法进行消息可靠性投递的,这个原因是因为SpringCloudStream框架为了和Kafka兼顾所以在实际工作中使用它的目的就是针对高性能的消息通信的!这点就是在当前版本SpringCloudStream的定位。

因此在实际的工作中,可以采用SpringCloudStream,如果需要保证可靠性投递,也可以单独采用RabbitMQ,也是可以的。

yl-jts-edi-tiktok项目发送消息的yl-jts-edi-tiktok-rabbitmq.yaml配置

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. test-trace-push-input:
  6. destination: test-trace-push-phl
  7. binder: mq-edi
  8. group: jts-edi-tiktok
  9. consumer:
  10. maxAttempts: 3
  11. backOffInitialInterval: 10000
  12. backOffMaxInterval: 200000
  13. backOffMultiplier: 3.0
  14. tiktok-trace-push-input:
  15. destination: tiktok-trace-push
  16. binder: mq-edi
  17. group: jts-edi-tiktok
  18. consumer:
  19. maxAttempts: 3
  20. backOffInitialInterval: 10000
  21. backOffMaxInterval: 200000
  22. backOffMultiplier: 3.0# 测试轨迹推送
  23. test-trace-push-output:
  24. destination: test-trace-push-phl
  25. binder: mq-edi
  26. #下单
  27. tiktok-order-create-output:
  28. destination: tiktok-order-create
  29. binder: mq-order
  30. rabbit:
  31. bindings:
  32. test-trace-push-input:
  33. consumer:
  34. concurrency: 4
  35. max-concurrency: 8
  36. prefetch: 10#这个参数为true的时候会自动为当前的队列创建一个死信队列,以dlq结尾
  37. auto-bind-dlq: true
  38. republish-to-dlq: true
  39. tiktok-trace-push-input:
  40. consumer:
  41. concurrency: 4
  42. max-concurrency: 8
  43. prefetch: 10#这个参数为true的时候会自动为当前的队列创建一个死信队列,以dlq结尾
  44. auto-bind-dlq: true
  45. republish-to-dlq: true#消费者开启延时队列支持
  46. delayed-exchange: true#生产者开启延时队列支持
  47. producer:
  48. delayed-exchange: true
  49. defaultBinder: mq-edi
  50. binders:
  51. mq-edi:
  52. type: rabbit
  53. environment:
  54. spring:
  55. rabbitmq:
  56. host: 172.31.44.118
  57. port: 5672
  58. username: admin
  59. password: 123456
  60. virtual-host: /jts-phl-job
  61. mq-order:
  62. type: rabbit
  63. environment:
  64. spring:
  65. rabbitmq:
  66. host: 172.31.44.118
  67. port: 5672
  68. username: admin
  69. password: 123456
  70. virtual-host: /jts-phl-order

jts-phl-openplatform-consumer项目消费的jts-phl-common-mq.yaml配置

  1. spring:
  2. rabbitmq:
  3. addresses: 172.31.44.118:5672,172.31.44.119:5672,172.31.44.120:5672
  4. username: admin
  5. password: 123456
  6. virtual-host: /jts-phl-order
  7. connection-timeout: 3000
  8. cloud:
  9. stream:
  10. bindings:
  11. #创建订单输出通道
  12. create-order-out-put:
  13. destination: createOrder
  14. group: order_create_group
  15. content-type: application/json
  16. #创建订单输入通道
  17. create-order-in-put:
  18. destination: createOrder
  19. group: order_create_group
  20. content-type: application/json
  21. consumer:
  22. #最大重试次数,默认为3
  23. maxAttempts: 3#初始/最少/空闲时 消费者数量。默认1
  24. concurrency: 1#取消订单输出通道
  25. cancel-order-out-put:
  26. destination: cancelOrder
  27. group: order_cancel_group
  28. content-type: application/json
  29. #取消订单输入通道
  30. cancel-order-in-put:
  31. destination: cancelOrder
  32. group: order_cancel_group
  33. content-type: application/json
  34. consumer:
  35. #最大重试次数,默认为3
  36. maxAttempts: 3#初始/最少/空闲时 消费者数量。默认1
  37. concurrency: 1#订单日志输出通道
  38. order-log-out-put:
  39. destination: orderLog
  40. group: order_log_group
  41. content-type: application/json
  42. #订单日志输入通道
  43. order-log-in-put:
  44. destination: orderLog
  45. group: order_log_group
  46. content-type: application/json
  47. consumer:
  48. #最大重试次数,默认为3
  49. maxAttempts: 3#初始/最少/空闲时 消费者数量。默认1
  50. concurrency: 1#取消包裹状态输出通道
  51. cancel-package-status-out-put:
  52. destination: cancelPackageStatus
  53. group: cancel_package_status_group
  54. content-type: application/json
  55. #取消包裹状态输入通道
  56. cancel-package-status-in-put:
  57. destination: cancelPackageStatus
  58. group: cancel_package_status_group
  59. content-type: application/json
  60. consumer:
  61. #最大重试次数,默认为3
  62. maxAttempts: 3#初始/最少/空闲时 消费者数量。默认1
  63. concurrency: 1#tiktok创建订单
  64. tiktok-order-create-input:
  65. destination: tiktok-order-create
  66. group: tiktok-order-create
  67. content-type: application/json
  68. consumer:
  69. #最大重试次数,默认为3
  70. maxAttempts: 3#初始/最少/空闲时 消费者数量。默认1
  71. concurrency: 5
  72. rabbit:
  73. bindings:
  74. #创建订单输入通道 启用死信队列
  75. create-order-in-put:
  76. consumer:
  77. #限制consumer在消费消息时,一次能同时获取的消息数量,默认:1
  78. prefetch: 10#默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量.
  79. maxConcurrency: 10#是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
  80. autoBindDlq: true
  81. republishToDlq: true#取消订单输入通道 启用死信队列
  82. cancel-order-in-put:
  83. consumer:
  84. #限制consumer在消费消息时,一次能同时获取的消息数量,默认:1
  85. prefetch: 10#默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量.
  86. maxConcurrency: 10#是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
  87. autoBindDlq: true
  88. republishToDlq: true#订单日志输入通道
  89. order-log-in-put:
  90. consumer:
  91. #限制consumer在消费消息时,一次能同时获取的消息数量,默认:1
  92. prefetch: 10#默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量.
  93. maxConcurrency: 10#是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
  94. autoBindDlq: true
  95. republishToDlq: true#取消包裹状态输入通道
  96. cancel-package-status-in-put:
  97. consumer:
  98. #限制consumer在消费消息时,一次能同时获取的消息数量,默认:1
  99. prefetch: 10#默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量.
  100. maxConcurrency: 10#是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
  101. autoBindDlq: true
  102. republishToDlq: true#tiktok创建订单 启用死信队列
  103. tiktok-order-create-input:
  104. consumer:
  105. #限制consumer在消费消息时,一次能同时获取的消息数量,默认:1
  106. prefetch: 10#默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量.
  107. maxConcurrency: 10#是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
  108. autoBindDlq: true
  109. republishToDlq: true

springCloud使用stream配置rabbitMq实现延时消息

先安装rabbitMq延时插件
参考我另一篇文章

https://blog.csdn.net/weixin_43944305/article/details/120828003

上依赖

  1. <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

消息通道

  1. package com.fchan.springcloudstream.service;import org.springframework.cloud.stream.annotation.Input;import org.springframework.cloud.stream.annotation.Output;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.SubscribableChannel;
  2. public interface MyMessageChannel {
  3. String out ="out";
  4. String in="in";
  5. @Output(out)
  6. MessageChannel out();
  7. @Input(in)
  8. SubscribableChannel in();}

// 发送延迟消息

  1. @PostMapping("/delayed")
  2. public String sendDelayedMessage(@RequestParam("body") String body,
  3. @RequestParam("seconds") Integer seconds){
  4. Map<String,Object> message = new HashMap<>();
  5. message.put("body", body);
  6. myMessageChannel.out().send(
  7. MessageBuilder.withPayload(message)
  8. .setHeader("x-delay", seconds * 1000)
  9. .build());
  10. log.info("发送延迟消息成功");return"SUCCESS";}

延时消息接收

  1. package com.fchan.springcloudstream.service;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.messaging.Message;import org.springframework.stereotype.Component;import java.util.Map;
  2. @Component
  3. @EnableBinding({MyMessageChannel.class})
  4. public class MyConsumer {
  5. Logger log = LoggerFactory.getLogger(MyConsumer.class);
  6. @StreamListener(MyMessageChannel.in)
  7. public void input(Message<Map<String,Object>> message){
  8. log.info("收到消息:{}", message.getPayload());}}

yml配置

  1. spring:
  2. rabbitmq:
  3. host: 110.40.181.73
  4. port: 35672
  5. username: root
  6. password: 10086
  7. virtual-host: /fchan
  8. cloud:
  9. stream:
  10. rabbit:
  11. bindings:
  12. #消费者开启延时队列支持
  13. in:
  14. consumer:
  15. delayed-exchange: true#生产者开启延时队列支持
  16. out:
  17. producer:
  18. delayed-exchange: true
  19. bindings:
  20. in:
  21. #指定消息所属exchange
  22. destination: test#指定消费者分组,在多实例的时候必需指定,防止重复消费
  23. group: myIn
  24. out:
  25. destination: test

再展示一个例子配合 Spring Cloud Stream 使用延迟交换机

首先来看一下延迟交换机如何配置:

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. delayedQueueOutput:
  6. destination: delayedQueueTopic
  7. content-type: application/json
  8. binder: rabbit
  9. delayedQueueInput:
  10. destination: delayedQueueTopic
  11. content-type: application/json
  12. group: ${spring.application.name}
  13. binder: rabbit
  14. rabbit:
  15. bindings:
  16. delayedQueueOutput:
  17. producer:
  18. delayedExchange: true# 是否将目标exchange声明为一个延迟消息交换机,默认false。即消息productor发布消息到延迟exchange后,延迟n长时间后才将消息推送到指定的queue中。 -RabbitMQ需要安装/启用插件: rabbitmq-delayed-message-exchange
  19. delayedQueueInput:
  20. consumer:
  21. delayedExchange: true# 是否将目标exchange声明为一个延迟消息交换机,默认false。即消息productor发布消息到延迟exchange后,延迟n长时间后才将消息推送到指定的queue中。 -RabbitMQ需要安装/启用插件: rabbitmq-delayed-message-exchange

重点关注2个配置:spring.cloud.stream.rabbit.bindings.ChannelName.producer.delayedExchange 和 spring.cloud.stream.rabbit.bindings.ChannelName.consumer.delayedExchange。

这2个配置分别属于生产者和消费者的配置,但都是用于告诉 Spring Cloud Stream 是否将交换机声明为一个延迟消息交换机。这2个是成对出现,如果少配置了一个,服务启动时会报一个警告.


本文转载自: https://blog.csdn.net/blueheartstone/article/details/124897487
版权归原作者 磊哥 低调 所有, 如有侵权,请联系我们删除。

“spring cloud stream使用”的评论:

还没有评论