0


Springboot-RabbitMQ 消息队列使用

一、概念介绍:

RabbitMQ中几个重要的概念介绍:

  • Channels:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  • Exchanges:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。- 交换机类型主要有以下几种:- Direct Exchange(直连交换机):这种类型的交换机根据消息的Routing Key(路由键)进行精确匹配,只有绑定了相同路由键的队列才会收到消息。适用于点对点的消息传递场景。- Fanout Exchange(扇形交换机):这种类型的交换机采用广播模式,它会将消息发送给所有绑定到该交换机的队列,不管消息的路由键是什么。适用于消息需要被多个消费者处理的场景。- Topic Exchange(主题交换机):这种类型的交换机支持基于模式匹配的路由键,可以使用通配符*(匹配一个单词)和#(匹配零个或多个单词)进行匹配。适用于实现更复杂的消息路由逻辑。- Headers Exchange(头交换机):这种类型的交换机不处理路由键,而是根据发送的消息内容中的headers属性进行匹配。适用于需要在消息头中携带额外信息的场景。
  • Queues:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

二、引入依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

三、添加配置信息

spring:rabbitmq:host: 127.0.0.1
    port:5672username: guest
    password: guest
    listener:simple:acknowledge-mode: manual  # 手动提交

四、Direct Exchange(直连交换机)模式

1、新建配置文件 RabbitDirectConfig类

packagecom.example.direct;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description: 直连交换机--这种类型的交换机根据消息的Routing Key(路由键)进行精确匹配,
 * 只有绑定了相同路由键的队列才会收到消息。适用于点对点的消息传递场景
 */@ConfigurationpublicclassRabbitDirectConfig{/**
     * 队列名称
     */publicstaticfinalStringQUEUE_MESSAGE="QUEUE_MESSAGE";publicstaticfinalStringQUEUE_USER="QUEUE_USER";/**
     * 交换机
     */publicstaticfinalStringEXCHANGE="EXCHANGE_01";/**
     * 路由
     */publicstaticfinalStringROUTING_KEY="ROUTING_KEY_01";@BeanpublicQueuequeue01(){returnnewQueue(QUEUE_MESSAGE,//队列名称true,//是否持久化false,//是否排他false//是否自动删除);}@BeanpublicQueuequeue02(){returnnewQueue(QUEUE_USER,//队列名称true,//是否持久化false,//是否排他false//是否自动删除);}@BeanpublicDirectExchangeexchange01(){returnnewDirectExchange(EXCHANGE,true,//是否持久化false//是否排他);}@BeanpublicBindingdemoBinding(){returnBindingBuilder.bind(queue01()).to(exchange01()).with(ROUTING_KEY);}@BeanpublicBindingdemoBinding2(){returnBindingBuilder.bind(queue02()).to(exchange01()).with(ROUTING_KEY);}}

2、添加消息生产者 Producer类

packagecom.example.direct;importcom.example.entity.User;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */@ComponentpublicclassProducer{@ResourceRabbitTemplate rabbitTemplate;publicvoidsendMessageByExchangeANdRoute(String message){
        rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE,RabbitDirectConfig.ROUTING_KEY,message);}/**
     * 默认交换器,隐式地绑定到每个队列,路由键等于队列名称。
     * @param message
     */publicvoidsendMessageByQueue(String message){
        rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_MESSAGE,message);}publicvoidsendMessage(User user){
        rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_USER,user);}}

3、添加消息消费者

packagecom.example.direct;importcom.example.entity.User;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */@ComponentpublicclassConsumer{@RabbitListener(queues =RabbitDirectConfig.QUEUE_USER)publicvoidonMessage(User user){System.out.println("收到的实体bean消息:"+user);}@RabbitListener(queues =RabbitDirectConfig.QUEUE_MESSAGE)publicvoidonMessage2(String message){System.out.println("收到的字符串消息:"+message);}}

4、 测试

packagecom.example;importcom.example.entity.User;importcom.example.direct.Producer;importcom.example.fanout.FanoutProducer;importcom.example.topic.TopicProducer;importorg.junit.jupiter.api.Test;importorg.springframework.boot.test.context.SpringBootTest;importjavax.annotation.Resource;@SpringBootTestclassSpringbootRabbitMqApplicationTests{@ResourceProducer producer;@TestpublicvoidsendMessage()throwsInterruptedException{
        producer.sendMessageByQueue("哈哈");
        producer.sendMessage(newUser().setAge(10).setName("wasin"));}}

五、Topic Exchange(主题交换机)模式

1、新建RabbitTopicConfig类

packagecom.example.topic;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description: 主题交换机--这种类型的交换机支持基于模式匹配的路由键,
 * 可以使用通配符*(匹配一个单词)和#(匹配零个或多个单词)进行匹配。适用于实现更复杂的消息路由逻辑。
 */@ConfigurationpublicclassRabbitTopicConfig{/**
     * 交换机
     */publicstaticfinalStringEXCHANGE="EXCHANGE_TOPIC1";/**
     * 队列名称
     */publicstaticfinalStringQUEUE_TOPIC1="QUEUE_TOPIC";/**
     * 路由
     * "*" 与 "#",用于做模糊匹配。其中 "*" 用于匹配一个单词,"#" 用于匹配多个单词(可以是零个)
     * 可以匹配 aa.wasin.aa.bb  wasin.aa.bb  wasin.aa ....
     * aa.bb.wasin.cc 无法匹配
     */publicstaticfinalStringROUTING_KEY1="*.wasin.#";@BeanpublicQueuequeue(){returnnewQueue(QUEUE_TOPIC1,//队列名称true,//是否持久化false,//是否排他false//是否自动删除);}@BeanpublicTopicExchangeexchange(){returnnewTopicExchange(EXCHANGE,true,//是否持久化false//是否排他);}@BeanpublicBindingbinding(){returnBindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY1);}}

2、新建 消息生产者和发送者

  • TopicProducer类
packagecom.example.topic;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */@ComponentpublicclassTopicProducer{@ResourceRabbitTemplate rabbitTemplate;/**
     * @param routeKey 路由
     * @param message 消息
     */publicvoidsendMessageByQueue(String routeKey,String message){
        rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE,routeKey,message);}}
  • TopicConsumer类
packagecom.example.topic;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */@Slf4j@ComponentpublicclassTopicConsumer{@RabbitListener(queues =RabbitTopicConfig.QUEUE_TOPIC1)publicvoidonMessage2(String message){
        log.info("topic收到的字符串消息:{}",message);}}

六、Fanout Exchange(扇形交换机)模式

1、 新建 RabbitFanoutConfig类

packagecom.example.fanout;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description: 扇形交换机--这种类型的交换机采用广播模式,它会将消息发送给所有绑定到该交换机的队列,
 * 不管消息的路由键是什么。适用于消息需要被多个消费者处理的场景。
 */@ConfigurationpublicclassRabbitFanoutConfig{/**
     * 交换机
     */publicstaticfinalStringEXCHANGE="EXCHANGE_FANOUT";/**
     * 队列名称
     */publicstaticfinalStringQUEUE_FANOUT1="QUEUE_FANOUT";/**
     * 队列名称
     */publicstaticfinalStringQUEUE_FANOUT2="QUEUE_FANOUT2";@BeanpublicQueuequeueFanout1(){returnnewQueue(QUEUE_FANOUT1,//队列名称true,//是否持久化false,//是否排他false//是否自动删除);}@BeanpublicQueuequeueFanout2(){returnnewQueue(QUEUE_FANOUT2,//队列名称true,//是否持久化false,//是否排他false//是否自动删除);}@BeanpublicFanoutExchangeexchangeFanout(){returnnewFanoutExchange(EXCHANGE,true,//是否持久化false//是否排他);}@BeanpublicBindingbindingFanout(){returnBindingBuilder.bind(queueFanout1()).to(exchangeFanout());}@BeanpublicBindingbindingFanout2(){returnBindingBuilder.bind(queueFanout2()).to(exchangeFanout());}}

2、新建 消息生产者和发送者

  • FanoutProducer类:
packagecom.example.fanout;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */@ComponentpublicclassFanoutProducer{@ResourceRabbitTemplate rabbitTemplate;/**
     * @param message 消息
     */publicvoidsendMessageByQueue(String message){
        rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE,"", message);}}
  • FanoutConsumer类
packagecom.example.fanout;importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.amqp.support.AmqpHeaders;importorg.springframework.messaging.handler.annotation.Header;importorg.springframework.stereotype.Component;importjava.io.IOException;/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */@Slf4j@ComponentpublicclassFanoutConsumer{/**
     * 手动提交
     * @param message
     * @param channel
     * @param tag
     * @throws IOException
     */@RabbitListener(queues =RabbitFanoutConfig.QUEUE_FANOUT1)publicvoidonMessage(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long tag)throwsIOException{
        log.info("fanout1收到的字符串消息:{}",message);
        channel.basicAck(tag,false);}@RabbitListener(queues =RabbitFanoutConfig.QUEUE_FANOUT2)publicvoidonMessage2(String message){
        log.info("fanout2到的字符串消息:{}",message);}}

本文转载自: https://blog.csdn.net/weixin_38301116/article/details/139444988
版权归原作者 bj_wasin 所有, 如有侵权,请联系我们删除。

“Springboot-RabbitMQ 消息队列使用”的评论:

还没有评论