0


Spring Cloud Stream的配置及使用——以RabbitMQ为例

1. 简介

https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/current/reference/html/spring-cloud-stream-binder-rabbit.html
英语好的可以直接看官方文档,文档里讲的更全面
RabbitMQ Binder

By default, the RabbitMQ Binder implementation maps each destination to a TopicExchange. For each consumer group, a Queue is bound to that TopicExchange.

上图是RabbitMQ Binder(绑定器)。默认情况下,绑定器实现将每一个destination映射到一个TopicExchange。对于每一个消费者组,都有一个队列绑定到那个TopicExchange。

2. 依赖配置

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

3. 生产者配置及消息发送

3.1 yaml配置

spring:cloud:stream:#      如果有一个binder的话,就不需要设置default-binder: rabbit
      binders:rabbit1:type: rabbit
          environment:spring:rabbitmq:host: 192.168.70.224
                port:5672username: admin
                password:444944virtual-host: GHost
        rabbit:type: rabbit
          defaultCandidate:falseenvironment:spring:rabbitmq:host: 192.168.70.167
                port:5672username: admin
                password: public
                virtual-host: /
      bindings:order:binder: rabbit
          destination: order
          producer:#            默认是trueautoStartup:truecart:binder: rabbit
          destination: cart
          routingKeyExpression: han
          producer:#            默认是trueautoStartup:true# rabbitmq发送的消息默认routingKey为destination, 如果是分区的destination, 默认值为destination-<partition>

配置说明:

  • 在Spring Cloud Stream中可以配置多个binder,也就是可以配置连接多个MQ服务器
  • 在RabbitMQ中,binding的名称对应的是output和input的名称,destination对应mq中的exchange名称。上述配置会生成order,cart两个exchange
  • rabbitmq发送的消息默认routingKey为destination, 如果是分区的destination, 默认值为destination-,分区内容这里不涉及,具体内容看官方文档

3.2 生产者声明

publicinterfaceCartSource{/**
     * Name of the output channel.
     * cart对应的是binding,
     * destination对应的是rabbitmq里的exchange,kafka中的topic.
     * 如果没有设置destination, rabbitmq会自动创建一个和binding同名的exchange
     */
    String OUTPUT ="cart";/**
     * @return output channel
     */@Output(CartSource.OUTPUT)
    MessageChannel output();}

cloud stream中output表示发送,这里的cart与配置中的binding名称是对应的

//必须添加,要不然无法注入。也可以加在启动类上,可以重复添加@EnableBinding(CartSource.class)@ComponentpublicclassCartSender{@Autowiredprivate CartSource orderSource;privatestaticfinal Logger logger= LoggerFactory.getLogger(CartSender.class);publicvoidpushMsg(Order order){
        logger.info("sending rabbitmq message:{}",order.toString());
        orderSource.output().send(MessageBuilder.withPayload(order).build());}

3.3 消息发送

@RestController@RequestMapping("mqTest1")publicclassMqTest1Controller{@Autowired
    CartSender cartSender;@GetMapping("streamPush")public String streamPush(){
        cartSender.pushMsg(newOrder());return"hehe";}}

4. 消费者配置

4.1 yaml配置

spring:cloud:stream:#      如果有一个binder的话,就不需要设置default-binder: rabbit
      binders:rabbit1:type: rabbit
          environment:spring:rabbitmq:host: 192.168.70.224
                port:5672username: admin
                password:444944virtual-host: /
        rabbit:type: rabbit
          defaultCandidate:falseenvironment:spring:rabbitmq:host: 192.168.70.167
                port:5672username: admin
                password: public
                virtual-host: /
      bindings:order:binder: rabbit
          destination: order
          group: myOrderQueue
          conumer:concurrency:3cart:binder: rabbit
          destination: cart
          group: myCartQueue
          conumer:concurrency:3# rabbit的扩展配置 RabbitExtendedBindingPropertiesrabbit:bindings:#          order:#            consumer:#              bindingRoutingKey: order-keycart:consumer:#          如果没有指定routing key,会使用默认的 #bindingRoutingKey: cart-key

配置说明:

  • 只有消费者才会创建队列,queue名称为:<bindingName><destination>.<group>
  • routingKey设置spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey 如果没有设置的话,默认为#

在这里插入图片描述

4.2 消费者声明与消息接收

publicinterfaceCartSink{

    String INPUT ="cart";/**
     * @return input channel.
     */@Input(CartSink.INPUT)
    SubscribableChannel input();}

cloud stream中input表示接收,这里的cart与配置中的binding名称是对应的

@EnableBinding(CartSink.class)publicclassCartHandler{privatestaticfinal Logger logger = LoggerFactory.getLogger(CartHandler.class);/**
     * 参数也可以是对象,会自动将消息转换为对象
     * @param headers
     * @param payload
     */@StreamListener(CartSink.INPUT)publicvoidloggerSink(@Headers MessageHeaders headers,byte[] payload){
        String cartChange=newString(payload);
        logger.info("cart change:{}",cartChange);}}

本文转载自: https://blog.csdn.net/CaptHua/article/details/123455144
版权归原作者 CaptHua 所有, 如有侵权,请联系我们删除。

“Spring Cloud Stream的配置及使用&mdash;&mdash;以RabbitMQ为例”的评论:

还没有评论