0


Spring RabbitMQ那些事(1-交换机配置&消息发送订阅实操)

目录

一、序言

在上一节 RabbitMQ中的核心概念和交换机类型 中我们介绍了RabbitMQ中的一些核心概念,尤其是各种交换机的类型,接下来我们将具体讲解各种交换机的配置和消息订阅实操。


二、配置文件application.yml

我们先上应用启动配置文件

application.yml

,如下:

server:port:8080spring:rabbitmq:addresses: localhost:5672username: admin
    password: admin
    virtual-host: /
    listener:type: simple
      simple:acknowledge-mode: auto
        concurrency:5max-concurrency:20prefetch:5

备注:这里我们指定了

RabbitListenerContainerFactory

的类型为

SimpleRabbitListenerContainerFactory

,并且指定消息确认模式为自动确认

三、RabbitMQ交换机和队列配置

Spring官方提供了一套 流式API 来定义队列交换机绑定关系,非常的方便,接下来我们定义4种类型的交换机和相应队列的绑定关系。

1、定义4个队列

/**
 * 定义4个队列
 */@ConfigurationprotectedstaticclassQueueConfig{@BeanpublicQueuequeue1(){returnQueueBuilder.durable("queue-1").build();}@BeanpublicQueuequeue2(){returnQueueBuilder.durable("queue-2").build();}@BeanpublicQueuequeue3(){returnQueueBuilder.durable("queue-3").build();}@BeanpublicQueuequeue4(){returnQueueBuilder.durable("queue-4").build();}}

2、定义Fanout交换机和队列绑定关系

/**
 * 定义Fanout交换机和对应的绑定关系
 */@ConfigurationprotectedstaticclassFanoutExchangeBindingConfig{@BeanpublicFanoutExchangefanoutExchange(){returnExchangeBuilder.fanoutExchange("fanout-exchange").build();}/**
     * 定义多个Fanout交换机和队列的绑定关系
     * @param fanoutExchange
     * @param queue1
     * @param queue2
     * @param queue3
     * @param queue4
     * @return
     */@BeanpublicDeclarablesbindQueueToFanoutExchange(FanoutExchange fanoutExchange,Queue queue1,Queue queue2,Queue queue3,Queue queue4){Binding queue1Binding =BindingBuilder.bind(queue1).to(fanoutExchange);Binding queue2Binding =BindingBuilder.bind(queue2).to(fanoutExchange);Binding queue3Binding =BindingBuilder.bind(queue3).to(fanoutExchange);Binding queue4Binding =BindingBuilder.bind(queue4).to(fanoutExchange);returnnewDeclarables(queue1Binding, queue2Binding, queue3Binding, queue4Binding);}}

备注:这里我们将4个队列绑定到了名为

fanout-exchange

的交换机上。

2、定义Direct交换机和队列绑定关系

@ConfigurationprotectedstaticclassDirectExchangeBindingConfig{@BeanpublicDirectExchangedirectExchange(){returnExchangeBuilder.directExchange("direct-exchange").build();}@BeanpublicBindingbindingQueue3ToDirectExchange(DirectExchange directExchange,Queue queue3){returnBindingBuilder.bind(queue3).to(directExchange).with("queue3-route-key");}}

备注:这里我们定义了名为

direct-exchange

的交换机并通过路由key

queue3-route-key

queue-3

绑定到了该交换机上。


3、定义Topic交换机和队列绑定关系

@ConfigurationprotectedstaticclassTopicExchangeBindingConfig{@BeanpublicTopicExchangetopicExchange(){returnExchangeBuilder.topicExchange("topic-exchange").build();}@BeanpublicDeclarablesbindQueueToTopicExchange(TopicExchange topicExchange,Queue queue1,Queue queue2){Binding queue1Binding =BindingBuilder.bind(queue1).to(topicExchange).with("com.order.*");Binding queue2Binding =BindingBuilder.bind(queue2).to(topicExchange).with("com.#");returnnewDeclarables(queue1Binding, queue2Binding);}}

这里我们定义了名为

topic-exchange

类型的交换机,该类型交换机支持路由key通配符匹配,

*

代表一个任意字符,

#

代表一个或多个任意字符。

备注:

  1. 通过路由keycom.order.*queue-1绑定到了该交换机上。
  2. 通过路由key com.#queue-2也绑定到了该交换机上。

4、定义Header交换机和队列绑定关系

@ConfigurationprotectedstaticclassHeaderExchangeBinding{@BeanpublicHeadersExchangeheadersExchange(){returnExchangeBuilder.headersExchange("headers-exchange").build();}@BeanpublicBindingbindQueueToHeadersExchange(HeadersExchange headersExchange,Queue queue4){returnBindingBuilder.bind(queue4).to(headersExchange).where("function").matches("logging");}}

备注:这里我们定义了名为

headers-exchange

类型的交换机,并通过参数

function=logging

queue-4

绑定到了该交换机上。


四、RabbitMQ消费者配置

Spring RabbitMQ中支持注解式监听端点配置,用于异步接收消息,如下:

@Slf4j@ComponentpublicclassRabbitMqConsumer{@RabbitListener(queues ="queue-1")publicvoidhandleMsgFromQueue1(String msg){
        log.info("Message received from queue-1, message body: {}", msg);}@RabbitListener(queues ="queue-2")publicvoidhandleMsgFromQueue2(String msg){
        log.info("Message received from queue-2, message body: {}", msg);}@RabbitListener(queues ="queue-3")publicvoidhandleMsgFromQueue3(String msg){
        log.info("Message received from queue-3, message body: {}", msg);}@RabbitListener(queues ="queue-4")publicvoidhandleMsgFromQueue4(String msg){
        log.info("Message received from queue-4, message body: {}", msg);}}

备注:这里我们分别定义了4个消费者,分别用来接受4个队列的消息。

五、RabbitMQ生产者

@Slf4j@Component@RequiredArgsConstructorpublicclassRabbitMqProducer{privatefinalRabbitTemplate rabbitTemplate;publicvoidsendMsgToFanoutExchange(String body){
        log.info("开始发送消息到fanout-exchange, 消息体:{}", body);Message message =MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
        rabbitTemplate.send("fanout-exchange",StringUtils.EMPTY, message);}publicvoidsendMsgToDirectExchange(String body){
        log.info("开始发送消息到direct-exchange, 消息体:{}", body);Message message =MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
        rabbitTemplate.send("direct-exchange","queue3-route-key", message);}publicvoidsendMsgToTopicExchange(String routingKey,String body){
        log.info("开始发送消息到topic-exchange, 消息体:{}", body);Message message =MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
        rabbitTemplate.send("topic-exchange", routingKey, message);}publicvoidsendMsgToHeadersExchange(String body){
        log.info("开始发送消息到headers-exchange, 消息体:{}", body);MessageProperties messageProperties =MessagePropertiesBuilder.newInstance().setHeader("function","logging").build();Message message =MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
        rabbitTemplate.send("headers-exchange",StringUtils.EMPTY, message);}}

六、测试用例

这里写了个简单的Controller用来测试,如下:

@RestController@RequiredArgsConstructorpublicclassRabbitMsgController{privatefinalRabbitMqProducer rabbitMqProducer;@RequestMapping("/exchange/fanout")publicResponseEntity<String>sendMsgToFanoutExchange(String body){
        rabbitMqProducer.sendMsgToFanoutExchange(body);returnResponseEntity.ok("广播消息发送成功");}@RequestMapping("/exchange/direct")publicResponseEntity<String>sendMsgToDirectExchange(String body){
        rabbitMqProducer.sendMsgToDirectExchange(body);returnResponseEntity.ok("消息发送到Direct交换成功");}@RequestMapping("/exchange/topic")publicResponseEntity<String>sendMsgToTopicExchange(String routingKey,String body){
        rabbitMqProducer.sendMsgToTopicExchange(routingKey, body);returnResponseEntity.ok("消息发送到Topic交换机成功");}@RequestMapping("/exchange/headers")publicResponseEntity<String>sendMsgToHeadersExchange(String body){
        rabbitMqProducer.sendMsgToHeadersExchange(body);returnResponseEntity.ok("消息发送到Headers交换机成功");}}

1、发送到FanoutExchage

直接访问

http://localhost:8080/exchange/fanout?body=hello

,可以看到该消息广播到了4个队列上。

2023-11-0717:41:12.959INFO39460---[nio-8080-exec-9]c.u.r.i.producer.RabbitMqProducer: 开始发送消息到fanout-exchange, 消息体:hello
2023-11-0717:41:12.972INFO39460---[ntContainer#1-5]c.u.r.i.consumer.RabbitMqConsumer:Message received from queue-1, message body: hello
2023-11-0717:41:12.972INFO39460---[ntContainer#0-4]c.u.r.i.consumer.RabbitMqConsumer:Message received from queue-4, message body: hello
2023-11-0717:41:12.972INFO39460---[ntContainer#3-3]c.u.r.i.consumer.RabbitMqConsumer:Message received from queue-3, message body: hello
2023-11-0717:41:12.972INFO39460---[ntContainer#2-4]c.u.r.i.consumer.RabbitMqConsumer:Message received from queue-2, message body: hello

2、发送到DirectExchage

访问

http://localhost:8080/exchange/direct?body=hello

,可以看到消息通过路由key

queue3-route-key

发送到了

queue-3

上。

2023-11-0717:43:26.804INFO39460---[nio-8080-exec-1]c.u.r.i.producer.RabbitMqProducer: 开始发送消息到direct-exchange, 消息体:hello
2023-11-0717:43:26.822INFO39460---[ntContainer#3-5]c.u.r.i.consumer.RabbitMqConsumer:Message received from queue-3, message body: hello

3、发送到TopicExchange

访问

http://localhost:8080/exchange/topic?body=hello&routingKey=com.order.create

,路由key为

com.order.create

的消息分别发送到了

queue-1

queue-2

上。

2023-11-0717:44:45.301INFO39460---[nio-8080-exec-4]c.u.r.i.producer.RabbitMqProducer: 开始发送消息到topic-exchange, 消息体:hello
2023-11-0717:44:45.312INFO39460---[ntContainer#1-3]c.u.r.i.consumer.RabbitMqConsumer:Message received from queue-1, message body: hello
2023-11-0717:44:45.312INFO39460---[ntContainer#2-3]c.u.r.i.consumer.RabbitMqConsumer:Message received from queue-2, message body: hello

4、发动到HeadersExchage

访问

http://localhost:8080/exchange/headers?body=hello

,消息通过头部信息

function=logging

发送到了

headers-exchange

上。

2023-11-0717:47:21.736INFO39460---[nio-8080-exec-9]c.u.r.i.producer.RabbitMqProducer: 开始发送消息到headers-exchange, 消息体:hello
2023-11-0717:47:21.749INFO39460---[ntContainer#0-3]c.u.r.i.consumer.RabbitMqConsumer:Message received from queue-4, message body: hello

七、结语

下一节我们将会介绍通过两种方式借由RabbitMQ实现延迟消息发送和订阅,敬请期待。
在这里插入图片描述


本文转载自: https://blog.csdn.net/lingbomanbu_lyl/article/details/134249252
版权归原作者 凌波漫步& 所有, 如有侵权,请联系我们删除。

“Spring RabbitMQ那些事(1-交换机配置&消息发送订阅实操)”的评论:

还没有评论