0


RabbitMQ讲解与整合

RabbitMq安装

类型概念

租户
RabbitMQ 中有一个概念叫做多租户,每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器,这些虚拟的消息服务器就是我们所说的虚拟主机(virtual host),一般简称为 vhost。
每一个 vhost 都是一个独立的小型 RabbitMQ 服务器,这个 vhost 中会有自己的消息队列、消息交换机以及相应的绑定关系等等,并且拥有自己独立的权限,不同的 vhost 中的队列和交换机不能互相绑定,这样技能保证运行安全又能避免命名冲突
交换机
在这里插入图片描述
交换机属性意义值意义type类型direct默认的直接交换机
根据交换机下队列绑定的routingKey直接匹配fanout扇形交换机
简单来说就是发布订阅
队列直接绑定在交换机下,统一发布消息headers头部交换机,通过message header头部信息进行比对
可以根据定义全匹配、部分匹配等规则topic主题交换机
通过绑定routingKey进行模糊匹配Durability耐用
(持久化)durable持久化,数据存放于硬盘transient瞬态,数据存放于内存Auto delete自动删除Yes没有绑定队列时自动删除,针对的是曾经有过但后来没有的事物No不自动删除Internal内部使用Yes该路由绑定的队列不会被用户消费No不自动删除

队列

在这里插入图片描述
队列属性意义值意义type类型Default for virtual host租户配置的默认选项,下列三种其一
默认Classic无需设置Classic传统的队列类型
数据存储在单个节点上
不具备quorum队列的高可用性和数据保护特性
ps:单机时使用Quorum高可用性队列
数据会被复制到多个节点
提供更好的数据可靠性和持久性
ps:部署多节点时使用Stream特殊类型的队列
用于支持事件流处理(event streaming)
具有类似于Kafka的流式处理特性
ps:听说不成熟,暂时用不上Durability耐用
(持久化)durable持久化,数据存放于硬盘transient瞬态,数据存放于内存
参数:
显示参数实际参数作用Auto expirex-expires设置队列的过期时间,单位为毫秒。当队列在指定时间内未被使用,将会被自动删除Message TTLx-message-ttl设置队列中消息的过期时间(Time-To-Live),单位为毫秒。消息在队列中存放的时间超过设定的过期时间后会被自动删除Overflow behaviourx-overflow设置队列溢出行为,可选值为 drop-head(删除最旧的消息)或 reject-publish(拒绝发布新消息)Single active consumerx-single-active-consumer配置队列是否只允许单个消费者消费消息。当设置了x-single-active-consumer参数时,表示队列只允许有一个消费者活跃地消费消息,其他消费者将被阻塞,直到当前的消费者停止消费或断开连接Dead letter exchangex-dead-letter-exchange设置队列中的死信消息转发到的交换机名称。当消息成为死信时,将会被转发到指定的交换机Dead letter routing keyx-dead-letter-routing-key设置死信消息转发时的路由键。死信消息将通过指定的路由键转发到目标交换机Max lengthx-max-length设置队列的最大长度,即队列中消息的最大数量。当队列中消息数量达到设定的最大长度后,新消息将无法入队Max length bytesx-max-length-bytes设置队列消息的最大总字节数。当队列中消息的总字节数达到设定的最大值后,新消息将无法入队Leader locatorx-queue-leader-locator配置队列的领导者(Leader)定位器,集群中使用

SpringBoot整合

引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.6.3</version></dependency>

配置数据源

spring:rabbitmq:addresses: xxx.xxx.xx.xx:5672username: admin
    password: xxxxxx
    virtual-host: /

配置交换机和队列

@ComponentpublicclassRabbitMqConfig{// 定义交换机名称publicstaticfinalStringFANOUT_EXCHANGE="fanout.test";@Bean(name =FANOUT_EXCHANGE)publicFanoutExchangefanoutExchange(){// 交换机类型按需创建,这里用的是Fanout,发布订阅,绑定在该交换机下的队列都会收到消息// 参数2:是否持久化// 参数3:是否自动删除returnnewFanoutExchange(FANOUT_EXCHANGE,true,false);}//  定义队列publicstaticfinalStringFANOUT_QUEUE1="queue1";@Bean(name =FANOUT_QUEUE1)publicQueuefanoutQueue1(){// 后三个不写也行,这是默认值// 参数2:是否持久化数据到磁盘(防止意外关闭数据丢失)// 参数3:是否具有排他性// 参数4:队列不再使用时是否自动删除returnnewQueue(FANOUT_QUEUE1,true,false,false);}publicstaticfinalStringFANOUT_QUEUE2="queue2";@Bean(name =FANOUT_QUEUE2)publicQueuefanoutQueue2(){returnnewQueue(FANOUT_QUEUE2,true,false,false);}@BeanpublicBindingbindingSimpleQueue1(@Qualifier(FANOUT_QUEUE1)Queue fanoutQueue1,@Qualifier(FANOUT_EXCHANGE)FanoutExchange fanoutExchange){// 将交换机和队列绑定returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@BeanpublicBindingbindingSimpleQueue2(@Qualifier(FANOUT_QUEUE2)Queue fanoutQueue2,@Qualifier(FANOUT_EXCHANGE)FanoutExchange fanoutExchange){// 将交换机和队列绑定returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}

测试发一条消息到队列

@SpringBootTest(classes =TemplateApplication.class)publicclassRabbitMQTest{@AutowiredRabbitMessagingTemplate rabbitMessagingTemplate;@TestpublicvoidtestSent(){//指定交换机->指定队列(因为创建的交换机是FanoutExchange,所以绑定该交换机的队列都会收到一条消息)
        rabbitMessagingTemplate.convertAndSend("fanout.test","发送数据到FanoutExchange");// 如果创建队列不绑定交换机和路由键,那么实际上会有默认的交换机和路由键,均为空,直接将消息发送给队列,队列名则和路由键保持一致,仍然可以成功发送消息。}}

测试接收队列消息

写个监听类接收消息:

@ComponentpublicclassRabbitMqListenter{@RabbitListener(queues ={RabbitMqConfig.FANOUT_QUEUE1,RabbitMqConfig.FANOUT_QUEUE2})publicvoidreciveLogAll(String msg)throwsException{System.out.println("消费到数据:"+ msg);}}

-------------基础的使用到这里就结束了-------------

拓展事项

rabbitMqPusher

自己封装一个更加方便使用的发送工具,可有可无,其中可以使用RabbitMessagingTemplate和RabbitTemplate,RabbitMessagingTemplate和RabbitTemplate都是Spring AMQP提供的用于与RabbitMQ进行交互的工具类如果只是简单使用,那么RabbitMessagingTemplate就够用了,如果需要更精细的控制,可以选择使用RabbitTemplate

,但它们在使用方式和功能上有一些不同点:

RabbitMessagingTemplate:

RabbitMessagingTemplate是MessagingTemplate的子类,用于在Spring应用程序中发送和接收消息。
它提供了一种更高级别的抽象,使得在Spring框架中更容易使用消息发送和接收的功能。
可以直接与Spring的消息通道(MessageChannel)集成,方便进行消息的发送和接收。

RabbitTemplate:

RabbitTemplate是Spring AMQP提供的用于与RabbitMQ进行交互的核心类,提供了丰富的方法来发送和接收消息。
它是一个强大而灵活的工具,可以直接与RabbitMQ的交互进行细粒度的控制。
可以设置消息的属性、监听发送确认、接收确认等功能,更加灵活地处理消息发送和接收的细节。

publicinterfaceRabbitMqPublish{voidsend(String quene,String message);voidsend(String exchange,String routingKey,String message);voidsend(String quene,String message,Integer expiration);voidsend(String exchange,String routingKey,String message,Integer expiration);}
packagecom.template.rabbitmq.producer.impl;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.MessageBuilder;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;@Component@Slf4jpublicclassRabbitMqPublishImplimplementsRabbitMqPublish{@AutowiredprivateRabbitTemplate rabbitTemplate;/**
     * 发送消息
     * @param quene   队列名称 或 交换机名称
     * @param message 消息内容
     */publicvoidsend(String quene,String message){
        rabbitTemplate.send(quene,MessageBuilder.withBody(message.getBytes()).build());
        log.info("发送消息---> quene:{} ---> message:{}", message, quene);}/**
     * 直接发送消息到队列
     * 超过有效期丢弃
     *
     * @param quene      队列名称
     * @param message    消息内容
     * @param expiration 有效期(毫秒)
     */publicvoidsend(String quene,String message,Integer expiration){
        rabbitTemplate.send(quene,MessageBuilder.withBody(message.getBytes()).setExpiration(String.valueOf(expiration)).build());
        log.info("发送消息---> quene:{} ---> message:{} ---> expiration:{}", quene, message, expiration);}/**
     * 发送消息
     * 超过有效期丢弃
     *
     * @param exchange   交换机名称
     * @param routingKey 路由键
     * @param message    消息内容
     * @param expiration 有效期(毫秒)
     */publicvoidsend(String exchange,String routingKey,String message,Integer expiration){
        rabbitTemplate.send(exchange, routingKey,MessageBuilder.withBody(message.getBytes()).setExpiration(String.valueOf(expiration)).build());
        log.info("发送消息---> exchange:{} ---> routingKey:{} ---> message:{} ---> expiration:{}", exchange, routingKey, message, expiration);}/**
     * 发送消息
     *
     * @param exchange   交换机名称
     * @param routingKey 路由键
     * @param message    消息内容
     */publicvoidsend(String exchange,String routingKey,String message){
        rabbitTemplate.send(exchange, routingKey,MessageBuilder.withBody(message.getBytes()).build());
        log.info("发送消息---> exchange:{} ---> routingKey:{} ---> message:{}", exchange, routingKey, message);}}
  • 在RabbitMQ中,如果队列没有设置过期时间(即没有声明x-message-ttl属性),那么即使在发送消息时设置了消息的过期时间也会失效。消息的过期时间只有在队列设置了过期时间的情况下才会生效。*

实测以上列代码的方式直接对消息设置有效期是生效的。

死信队列

和普通队列一样,只不过是对其他队列进行配置,将过期的消息路由到死信队列中。
创建死信交换机和死信路由

// 配置交换机的文件中继续增加配置publicstaticfinalStringDIRECT_GP_DEAD_LETTER_EXCHANGE="DIRECT_GP_DEAD_LETTER_EXCHANGE";publicstaticfinalStringDIRECT_GP_DEAD_LETTER_QUEUE="DIRECT_GP_DEAD_LETTER_QUEUE";@Bean(DIRECT_GP_DEAD_LETTER_EXCHANGE)publicDirectExchangedirectDeadLetterExchange(){returnnewDirectExchange(DIRECT_GP_DEAD_LETTER_EXCHANGE,true,false,newHashMap<>());}@Bean(DIRECT_GP_DEAD_LETTER_QUEUE)publicQueuedirectDeadLetterQueue(){returnnewQueue(DIRECT_GP_DEAD_LETTER_QUEUE,true,false,false,newHashMap<>());}

设置队列消息有效期并绑定死信队列

@Bean(name =DIRECT_QUEUE1)publicQueuedirectQueue1(){HashMap<String,Object> headers =newHashMap<>();// 配置消息有效期,消息发送到队列10秒后如果未被消费者消费,则过期
        headers.put("x-message-ttl",10000);// 配置超期交换机,消息过期后会发送到此交换机
        headers.put("x-dead-letter-exchange",DIRECT_GP_DEAD_LETTER_EXCHANGE);// 配置超期routingKey,消息过期后转移消息时指定的routingKey
        headers.put("x-dead-letter-routing-key",DIRECT_GP_DEAD_LETTER_QUEUE);// 如果只配置了有效期,未配置交换机和routingKey,则消息会被直接丢弃returnnewQueue(DIRECT_QUEUE1,true,false,false,headers);}

配置完成后,尝试向DIRECT_QUEUE1发送一条消息,不启动消费者,10秒后消息会自动转移到死信队列中,可在可视化管理界面进行验证。

延时队列
延时队列场景举例:

预定一个会议室,两个小时后开始,要求提前十分钟通知参会人员进行开会。
如果不使用延时队列,那么就需要不断轮询,查看是否到达需要通知的时间,进行消息通知。

延时队列的实现方式:

死信队列+消息有效期
预定时间到提前十分钟通知中间有110分钟,那么创建一条通知消息,设置有效期110分钟丢入队列,不用消费者去监听,等待消息过期后路由到指定的死信队列,再去消费死信队列中的消息即可。
所以延时队列实际上是一种实现方案,而不是一种特定的队列类型。

标签: rabbitmq

本文转载自: https://blog.csdn.net/qwe1047978302/article/details/136226731
版权归原作者 _多说无益 所有, 如有侵权,请联系我们删除。

“RabbitMQ讲解与整合”的评论:

还没有评论