文章目录
仲裁队列
https://www.rabbitmq.com/docs/quorum-queues
1、创建交换机
和仲裁队列绑定的交换机没有特殊,我们还是创建一个direct交换机即可
交换机名称:exchange.quorum.test
2、创建仲裁队列
队列名称:queue.quorum.test
经典队列类型为Classic
点击队列名称查看详细信息
绑定交换机
3、验证
// 测试是否成功绑定@SpringBootTestpublicclassRabbitMQTest{publicstaticfinalStringEXCHANGE_QUORUM_TEST="exchange.quorum.test";publicstaticfinalStringROUTING_KEY_QUORUM_TEST="routing.key.quorum.test";@ResourceprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSendMessageToQuorum(){
rabbitTemplate.convertAndSend(EXCHANGE_QUORUM_TEST,ROUTING_KEY_QUORUM_TEST,"message test quorum ~~~ @@@");}}
基础配置参考
@Component@Slf4jpublicclassMyProcessor{publicstaticfinalStringQUEUE_QUORUM_TEST="queue.quorum.test";@RabbitListener(queues ={QUEUE_QUORUM_TEST})publicvoidquorumMessageProcess(String data,Message message,Channel channel)throwsIOException{
log.info("消费端:"+ data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
主节点宕机不影响消息发送和接收
流式队列(不推荐,Kafka主场)
概念
- 从客户端支持角度来说,生态尚不健全
- 从使用习惯角度来说,和原有队列用法不完全兼容
- 从竞品角度来说,像Kafka但远远比不上Kafka
- 从应用场景角度来说:
- 经典队列:适用于系统内部异步通信场景
- 流式队列:适用于系统间跨平台、大流量、实时计算场景(Kafka主场)
- 使用建议:Stream队列在目前企业实际应用非常少,真有特定场景需要使用肯定会倾向于使用Kafka,而不是RabbitMQ Stream
- 未来展望:Classic Queue已经有和Quorum Queue合二为一的趋势,Stream也有加入进来整合成一种队列的趋势,但Stream内部机制决定这很 难
异地容灾
Disaster Recovery at a Different Location
- Federation插件
- Shovel插件
概述:是通过配置联邦交换机拉取上游节点消息,关于联邦交换机的配置都在下游节点配置upstream,上游节点为常规配置
一、Federation插件
官方链接:
Federation Plugin:https://www.rabbitmq.com/docs/federation
Federated-exchanges:https://www.rabbitmq.com/docs/federated-exchanges
概述
Federation插件的设计目标是使RabbitMQ在不同的Broker节点之间进行消息传递而无须建立集群。
它可以在不同的管理域中的Broker或集群间传递消息,这些管理域可能设置了不同的用户和vhost,也可能运行在不同版本的RabbitMQ和Erlang上。Federation基于AMQP 0-9-1协议在不同的Broker之间进行通信,并且设计成能够容忍不稳定的网络连接情况。
二、Federation交换机
1、总体说明
- 各节点操作:启用联邦插件
- 下游操作: - 添加上游连接端点- 创建控制策略
2、准备工作
为了执行相关测试,我们使用Docker创建两个RabbitMQ实例。
特别提示:由于Federation机制的最大特点就是跨集群同步数据,所以这两个Docker容器中的RabbitMQ实例不加入集群!!!是两个独立的broker实例。
docker run -d\--name rabbitmq-shenzhen \-p51000:5672 \-p52000:15672 \-v rabbitmq-plugin:/plugins \-eRABBITMQ_DEFAULT_USER=guest \-eRABBITMQ_DEFAULT_PASS=123456\
rabbitmq:3.13-management
docker run -d\--name rabbitmq-shanghai \-p61000:5672 \-p62000:15672 \-v rabbitmq-plugin:/plugins \-eRABBITMQ_DEFAULT_USER=guest \-eRABBITMQ_DEFAULT_PASS=123456\
rabbitmq:3.13-management
使用
dockerps
3、启用联邦插件
在上游、下游节点中都需要开启。
Docker容器中的RabbitMQ已经开启了rabbitmq_federation,还需要开启rabbitmq_federation_management
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
rabbitmq_federation_management插件启用后会在Management UI的Admin选项卡下看到:
4、添加上游连接端点
在下游节点填写上游节点的连接信息:
# #
amqp://guest:[email protected]:51000
添加后
5、创建控制策略
6、测试
测试计划
特别提示
- 普通交换机和联邦交换机名称要一致
- 交换机名称要能够和策略正则表达式匹配上
- 发送消息时,两边使用的路由键也要一致
- 队列名称不要求一致
创建组件
交换机名称和路由建键相同,队列名称可不同
所在机房交换机名称路由键队列名称深圳机房(上游)federated.exchange.demorouting.key.demo.testqueue.normal.shenzhen上海机房(下游)federated.exchange.demorouting.key.demo.testqueue.normal.shanghai
创建组件后可以查看一下联邦状态,连接成功的联邦状态如下:
查看状态
发布消息执行测试
在上游节点向交换机发布消息:
上游
下游
三、Federation队列
1、总体说明
Federation队列和Federation交换机的最核心区别就是:
- Federation Police作用在交换机上,就是Federation交换机
- Federation Police作用在队列上,就是Federation队列
2、创建控制策略
queue.federation
^fed.queue.
3、测试
测试计划
上游节点和下游节点中队列名称是相同的,只是下游队列中的节点附加了联邦策略而已
所在机房交换机路由键队列深圳机房(上游)exchange.normal.shenzhenrouting.key.normal.shenzhenfed.queue.demo上海机房(下游)————fed.queue.demo
创建组件
上游节点都是常规操作,此处省略。重点需要关注的是下游节点的联邦队列创建时需要指定相关参数:
创建组件后可以查看一下联邦状态,连接成功的联邦状态如下:
执行测试
在上游节点向交换机发布消息:
注意:
但此时发现下游节点中联邦队列并没有接收到消息,这是为什么呢?这里就体现出了联邦队列和联邦交换机工作逻辑的区别。
对联邦队列来说,如果没有监听联邦队列的消费端程序,它是不会到上游去拉取消息的!
如果有消费端监听联邦队列,那么首先消费联邦队列自身的消息;如果联邦队列为空,这时候才会到上游队列节点中拉取消息。
所以现在的测试效果需要消费端程序配合才能看到:
消费端配置
application.yml
spring:rabbitmq:host: 192.168.217.134
port:61000username: guest
password:123456virtual-host: /
listener:simple:acknowledge-mode: manual # 把消息确认模式改为手动确认
消费端
@Component@Slf4jpublicclassMyMessageListener{publicstaticfinalStringQUEUE_NAME="fed.queue.demo";@RabbitListener(queues ={QUEUE_NAME})publicvoidprocessMessage(String dataString,Message message,Channel channel)throwsIOException{
log.info("[Federation]"+ dataString);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
版权归原作者 Lucky_Turtle 所有, 如有侵权,请联系我们删除。