0


spring cloud stream使用

技术兴起的原因:为了解决系统中不同中间件的适配问题,出现了cloud stream,采用适配绑定的方式,自动给不同的MQ之间进行切换。

屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
在这里插入图片描述
inputs是消费者,outputs是生产者

Stream中的消息通信方式遵循了发布-订阅模式,用 Topic 主题进行广播(在RabbitMQ就是Exchange,在Kafka中就是Topic)

其主要流程如下图
在这里插入图片描述
Binder:很方便的连接中间件,屏蔽差异。

Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置。

Source和Sink:简单理解为消息的生产者和消费者。

消息分组

可以让每个消息组都获取消息,而每个组内,只能有一个消费者消费这条消息。
Spring Cloud Stream 其实是发布订阅模型,如果一个topic有多个订阅实例 ,消息就会被这些消息消费者接收到,这样就会带来一个问题,那就是消息的重复消费,这种问题在很多业务场景下是不允许的,我们这时候需要给消息消费者加个分组信息,这样多个消费者实例在一个组下面就不会再出现消息重复消费。
多个消息消费者只有其中一个可以消费(同一个group,多个消费者之间是竞争的关系)。
简单说,生产者发送10条,其中一个消费者收到4条,另一个消费者可能收到6条

Spring Cloud Stream 当使用的消息中间件为Rabbit MQ的时候,配置input的时候,如果没有指定消息分组,那么生成的队列名称就是匿名的,并且当连接断开的时候会自动删除对应的队列。

在Rabbit MQ可以看到对应队列如下,第一个队列就是没有指定消息分组属性group生成的队列,可以看到生成的队列特性auto-delete:true,exclusive:true,也就是队列是排他性的,只有当前连接可见,并且当连接断开的时候队列会自动删除。
在这里插入图片描述
在这里插入图片描述
如果指定的分组,那么生成的队列名称就是destination+group,例如如下配置生成的队列名字就是css_test.css_test_queue,队列是配置:

spring:
  cloud:
    stream:
      bindings:
        css_test:
          destination: css_test
          binder: rabbit1
          group: css_test_queue      

在Rabbit MQ可以看到对应队列如下,队列是durable: true,队列是持久的,不会自动删除。
在这里插入图片描述
在这里插入图片描述
1、队列持久化的概念
队列的声明默认是存放到内存中的,若是rabbitmq重启会丢失,若是想重启以后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启以后会读取该数据库。数据库

2、排他
简单理解就是在链接关闭时是否会删除队列(不管队列中有没有消息) 性能

3、自动删除
当队列中有消息时,不管是否排他,关闭链接都不会删除队列,此时消费者消费完消息后再断开消费者,队列会被自动删除。(这里若是有多个消费者消费同一个队列,则须要全部消费者都断开后才能自动删除)

Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动配置,引用了发布、订阅、消费、分区的三个核心概念。

官方版本目前仅仅支持RabbitMQ和Kafka。

MQ相关术语
Message:生产者/消费者之间靠消息媒介传递信息内容

MessageChannel:消息必须走特定的通道

消息通道的子接口SubscribableChannel,由MessageHandle消息处理器所订阅。

相关注解
Middleware:中间件,目前只支持RabbitMQ和Kafka
Binder:应用层和消息中间件之间的封装,实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型,这些可以通过配置文件修改。
Input:表示输入通道,消息进入该通道传到应用程序。
Output:注解标识输出通道,发布的消息将通过该通道离开应用程序。
StreamListener:监听队列,用于消费者的队列的消息接收。
EnableBinding:将信道channel和exchange绑定在一起。

如果多次重复消费依然不成功,总要有个兜底的,那就给它丢到死信队列(Dead Letter Queue, DLQ)里就好了。

在死信队列中的消息,不会主动向消费者发送消息,需要我们人工处理,比如将它们取出再次消费等。

在测试时,可以抛出异常试一下,会发现超过阈值(本地重试max-attempts=2)后会抛异常,并将消息放入死信队列中。如下图:
在这里插入图片描述
dlq-topic.dlq-group只是正常的队列。死信队列名字的格式为topicName.queueName.dlq。也就是说这里实际的死信队列是dlq-topic.dlq-group.dlq。

而死信队列也可以通过move message把消息重新移入正常队列中进行重新消费。(其实正常队列也可以move,不过没必要)
在这里插入图片描述
在这里插入图片描述
Spring Cloud Stream核心架构1

两个比较重要的地方:inputs(输入)消息接收端、outputs(输出)消息发送端

一个 Spring Cloud Stream 应用以消息中间件为核心,应用通过Spring Cloud Stream注入的输入/输出通道 channels 与外部进行通信。channels 通过特定的Binder实现与外部消息中间件进行通信。
在这里插入图片描述
Spring Cloud Stream核心架构2

黄色:表示RabbitMQ

绿色:插件,消息的输入输出都套了一层插件,插件可以用于各种各样不同的消息,也可以用于消息中间件的替换。

核心概念:

Barista接口:Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称,通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。

通道接口如何定义:

@Output:输出注解,用于定义发送消息接口

@Input:输入注解,用于定义消息的消费者接口

@StreamListener:用于定义监听方法的注解

使用Spring Cloud Stream 非常简单,只需要使用好这3个注解即可,在实现高性能消息的生产和消费的场景非常合适,但是使用SpringCloudStream框架有一个非常大的问题,就是不能实现可靠性的投递,也就是没法保证消息的100%可靠性,会存在少量消息丢失的问题。目前SpringCloudStream整合了RabbitMQ与Kafka,我们都知道Kafka是无法进行消息可靠性投递的,这个原因是因为SpringCloudStream框架为了和Kafka兼顾所以在实际工作中使用它的目的就是针对高性能的消息通信的!这点就是在当前版本SpringCloudStream的定位。

因此在实际的工作中,可以采用SpringCloudStream,如果需要保证可靠性投递,也可以单独采用RabbitMQ,也是可以的。

yl-jts-edi-tiktok项目发送消息的yl-jts-edi-tiktok-rabbitmq.yaml配置

spring:
  cloud:
    stream:
      bindings:
        test-trace-push-input:
          destination: test-trace-push-phl
          binder: mq-edi
          group: jts-edi-tiktok
          consumer:
            maxAttempts: 3
            backOffInitialInterval: 10000
            backOffMaxInterval: 200000
            backOffMultiplier: 3.0

        tiktok-trace-push-input:
          destination: tiktok-trace-push
          binder: mq-edi
          group: jts-edi-tiktok
          consumer:
            maxAttempts: 3
            backOffInitialInterval: 10000
            backOffMaxInterval: 200000
            backOffMultiplier: 3.0# 测试轨迹推送
        test-trace-push-output:
          destination: test-trace-push-phl
          binder: mq-edi
        #下单
        tiktok-order-create-output:
          destination: tiktok-order-create
          binder: mq-order

      rabbit:
        bindings:
          test-trace-push-input:
            consumer:
              concurrency: 4
              max-concurrency: 8
              prefetch: 10#这个参数为true的时候会自动为当前的队列创建一个死信队列,以dlq结尾
              auto-bind-dlq: true
              republish-to-dlq: true

          tiktok-trace-push-input:
            consumer:
              concurrency: 4
              max-concurrency: 8
              prefetch: 10#这个参数为true的时候会自动为当前的队列创建一个死信队列,以dlq结尾
              auto-bind-dlq: true
              republish-to-dlq: true#消费者开启延时队列支持
              delayed-exchange: true#生产者开启延时队列支持
            producer:
              delayed-exchange: true
              
      defaultBinder: mq-edi
      binders:
        mq-edi:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 172.31.44.118
                port: 5672
                username: admin
                password: 123456
                virtual-host: /jts-phl-job

        mq-order:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 172.31.44.118
                port: 5672
                username: admin
                password: 123456
                virtual-host: /jts-phl-order

jts-phl-openplatform-consumer项目消费的jts-phl-common-mq.yaml配置

spring:
  rabbitmq:
    addresses: 172.31.44.118:5672,172.31.44.119:5672,172.31.44.120:5672
    username: admin
    password: 123456
    virtual-host: /jts-phl-order
    connection-timeout: 3000
  cloud:
    stream:
      bindings:
        #创建订单输出通道
        create-order-out-put:
          destination: createOrder
          group: order_create_group
          content-type: application/json
        #创建订单输入通道
        create-order-in-put:
          destination: createOrder
          group: order_create_group
          content-type: application/json
          consumer:
            #最大重试次数,默认为3
            maxAttempts: 3#初始/最少/空闲时 消费者数量。默认1
            concurrency: 1#取消订单输出通道
        cancel-order-out-put:
          destination: cancelOrder
          group: order_cancel_group
          content-type: application/json
        #取消订单输入通道
        cancel-order-in-put:
          destination: cancelOrder
          group: order_cancel_group
          content-type: application/json
          consumer:
            #最大重试次数,默认为3
            maxAttempts: 3#初始/最少/空闲时 消费者数量。默认1
            concurrency: 1#订单日志输出通道
        order-log-out-put:
          destination: orderLog
          group: order_log_group
          content-type: application/json
        #订单日志输入通道
        order-log-in-put:
          destination: orderLog
          group: order_log_group
          content-type: application/json
          consumer:
            #最大重试次数,默认为3
            maxAttempts: 3#初始/最少/空闲时 消费者数量。默认1
            concurrency: 1#取消包裹状态输出通道
        cancel-package-status-out-put:
          destination: cancelPackageStatus
          group: cancel_package_status_group
          content-type: application/json
        #取消包裹状态输入通道
        cancel-package-status-in-put:
          destination: cancelPackageStatus
          group: cancel_package_status_group
          content-type: application/json
          consumer:
            #最大重试次数,默认为3
            maxAttempts: 3#初始/最少/空闲时 消费者数量。默认1
            concurrency: 1#tiktok创建订单
        tiktok-order-create-input:
          destination: tiktok-order-create
          group: tiktok-order-create
          content-type: application/json
          consumer:
            #最大重试次数,默认为3
            maxAttempts: 3#初始/最少/空闲时 消费者数量。默认1
            concurrency: 5                                    
      rabbit:
        bindings:
          #创建订单输入通道 启用死信队列
          create-order-in-put: 
            consumer:
              #限制consumer在消费消息时,一次能同时获取的消息数量,默认:1
              prefetch: 10#默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量.
              maxConcurrency: 10#是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
              autoBindDlq: true
              republishToDlq: true#取消订单输入通道 启用死信队列
          cancel-order-in-put: 
            consumer:
              #限制consumer在消费消息时,一次能同时获取的消息数量,默认:1
              prefetch: 10#默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量.
              maxConcurrency: 10#是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
              autoBindDlq: true
              republishToDlq: true#订单日志输入通道
          order-log-in-put:
            consumer:
              #限制consumer在消费消息时,一次能同时获取的消息数量,默认:1
              prefetch: 10#默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量.
              maxConcurrency: 10#是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
              autoBindDlq: true
              republishToDlq: true#取消包裹状态输入通道
          cancel-package-status-in-put:
            consumer:
              #限制consumer在消费消息时,一次能同时获取的消息数量,默认:1
              prefetch: 10#默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量.
              maxConcurrency: 10#是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
              autoBindDlq: true
              republishToDlq: true#tiktok创建订单 启用死信队列
          tiktok-order-create-input:
            consumer:
              #限制consumer在消费消息时,一次能同时获取的消息数量,默认:1
              prefetch: 10#默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量.
              maxConcurrency: 10#是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
              autoBindDlq: true
              republishToDlq: true

springCloud使用stream配置rabbitMq实现延时消息

先安装rabbitMq延时插件
参考我另一篇文章

https://blog.csdn.net/weixin_43944305/article/details/120828003

上依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

消息通道

package com.fchan.springcloudstream.service;import org.springframework.cloud.stream.annotation.Input;import org.springframework.cloud.stream.annotation.Output;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.SubscribableChannel;

public interface MyMessageChannel {

    String out ="out";
    String in="in";
    @Output(out)
    MessageChannel out();
    @Input(in)
    SubscribableChannel in();}

// 发送延迟消息

@PostMapping("/delayed")
public String sendDelayedMessage(@RequestParam("body") String body,
                                 @RequestParam("seconds") Integer seconds){

    Map<String,Object> message = new HashMap<>();
    message.put("body", body);
    myMessageChannel.out().send(
            MessageBuilder.withPayload(message)
                    .setHeader("x-delay", seconds * 1000)
                    .build());
    log.info("发送延迟消息成功");return"SUCCESS";}

延时消息接收

package com.fchan.springcloudstream.service;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.messaging.Message;import org.springframework.stereotype.Component;import java.util.Map;

@Component
@EnableBinding({MyMessageChannel.class})
public class MyConsumer {
    Logger log = LoggerFactory.getLogger(MyConsumer.class);
    @StreamListener(MyMessageChannel.in)
    public void input(Message<Map<String,Object>> message){
        log.info("收到消息:{}", message.getPayload());}}

yml配置

spring:
  rabbitmq:
    host: 110.40.181.73
    port: 35672
    username: root
    password: 10086
    virtual-host: /fchan
  cloud:
    stream:
      rabbit:
        bindings:
          #消费者开启延时队列支持
          in:
            consumer:
              delayed-exchange: true#生产者开启延时队列支持
          out:
            producer:
              delayed-exchange: true
      bindings:
        in:
           #指定消息所属exchange
          destination: test#指定消费者分组,在多实例的时候必需指定,防止重复消费
          group: myIn
        out:
          destination: test

再展示一个例子配合 Spring Cloud Stream 使用延迟交换机

首先来看一下延迟交换机如何配置:

spring:
  cloud:
    stream:
      bindings:
        delayedQueueOutput:
          destination: delayedQueueTopic
          content-type: application/json
          binder: rabbit

        delayedQueueInput:
          destination: delayedQueueTopic
          content-type: application/json
          group: ${spring.application.name}
          binder: rabbit

      rabbit:
        bindings:
          delayedQueueOutput:
            producer:
              delayedExchange: true# 是否将目标exchange声明为一个延迟消息交换机,默认false。即消息productor发布消息到延迟exchange后,延迟n长时间后才将消息推送到指定的queue中。 -RabbitMQ需要安装/启用插件: rabbitmq-delayed-message-exchange

          delayedQueueInput:
            consumer:
              delayedExchange: true# 是否将目标exchange声明为一个延迟消息交换机,默认false。即消息productor发布消息到延迟exchange后,延迟n长时间后才将消息推送到指定的queue中。 -RabbitMQ需要安装/启用插件: rabbitmq-delayed-message-exchange

重点关注2个配置:spring.cloud.stream.rabbit.bindings.ChannelName.producer.delayedExchange 和 spring.cloud.stream.rabbit.bindings.ChannelName.consumer.delayedExchange。

这2个配置分别属于生产者和消费者的配置,但都是用于告诉 Spring Cloud Stream 是否将交换机声明为一个延迟消息交换机。这2个是成对出现,如果少配置了一个,服务启动时会报一个警告.


本文转载自: https://blog.csdn.net/blueheartstone/article/details/124897487
版权归原作者 磊哥 低调 所有, 如有侵权,请联系我们删除。

“spring cloud stream使用”的评论:

还没有评论