0


RabbitMQ

为什么要用 MQ
1.流量消峰
2.应用解耦
3.异步处理,比如后面讲到的异步的发布确认

四大核心概念
生产者、交换机、队列、消费者

工作原理
RabbitMQ的工作原理

Broker:接收和分发消息的应用,RabbitMQ Server 就是Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

代码上大体是,
配置类:定义交换机和队列,然后二者进行绑定
生产者:封装好的类,发送到交换机,交换机跟据路由键发送到消息指定队列
消费者:对消息处理

六种模式
六种模式
第一个模式的代码:

publicclassProducer{privatefinalstaticStringQUEUE_NAME="hello";publicstaticvoidmain(String[] args)throwsException{//创建一个连接工厂ConnectionFactory factory =newConnectionFactory();
        factory.setHost("182.92.234.71");
        factory.setUsername("admin");
        factory.setPassword("123");//channel 实现了自动 close 接口 自动关闭 不需要显示关闭try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){/**
            * 生成一个队列
            * 1.队列名称。队列的名称于我们来说至关重要,我们需要指定我们的消费者去消费哪个队列的消息。
            * 2.队列里面的消息是否持久化 默认消息存储在内存中
            * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
            * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
            * 5.其他参数
            */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message="hello world";/**
            * 发送一个消息
            * 1.发送到那个交换机
            * 2.路由的 key 是哪个
            * 3.其他的参数信息
            * 4.发送消息的消息体
            */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送完毕");}}}
publicclassConsumer{privatefinalstaticStringQUEUE_NAME="hello";publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("182.92.234.71");
        factory.setUsername("admin");
        factory.setPassword("123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();System.out.println("等待接收消息....");//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback=(consumerTag,delivery)->{String message=newString(delivery.getBody());System.out.println(message);};//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallback cancelCallback=(consumerTag)->{System.out.println("消息消费被中断");};/**
        * 消费者消费消息
        * 1.消费哪个队列
        * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
        * 3.消费者未成功消费的回调
        */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}}

生产者:获取channel、声明队列(queueDeclare)、发送消息(basicPublish)
消费者:获取channel、接收和拒绝的回调接口、消费消息(basicConsume)

消息应答
自动应答 (默认)
要想实现消息消费过程中不丢失,需要把自动应答改为手动应答。

手动应答的三个方法。手动应答的好处是可以【批量应答】并且减少网络拥堵。

Channel.basicAck(用于肯定确认);// RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了Channel.basicNack(用于否定确认);Channel.basicReject(用于否定确认);// 与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了

如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将【队列和消息】都标记为持久化。

将消息标记为持久化并不能完全保证不会丢失消息。如果需要
更强有力的持久化策略,参考后边课件发布确认章节。

不公平分发:

int prefetchCount =1;// 预取值
channel.basicQos(prefetchCount );// 预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下

预取值

发布确认
不要把消息应答和发布确认搞混了,
消息应答(手动)是,你的消息已经被消费了或拒绝了,一个明确的答复。
发布确认是,消息我发送到队列里了,消费与否是另外一回事。

一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出。
单个发布确认,批量发布确认(只是和单个发布确认的逻辑不同而已)

channel.confirmSelect();

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功.
异步确认

publicstaticvoidpublishMessageAsync()throwsException{try(Channel channel =RabbitMqUtils.getChannel()){String queueName =UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);//开启发布确认
        channel.confirmSelect();/**
        * 线程安全有序的一个哈希表,适用于高并发的情况
        * 1.轻松的将序号与消息进行关联
        * 2.轻松批量删除条目 只要给到序列号
        * 3.支持并发访问
        */ConcurrentSkipListMap<Long,String> outstandingConfirms =newConcurrentSkipListMap<>();/**
        * 确认收到消息的一个回调
        * 1.消息序列号
        * 2.true 可以确认小于等于当前序列号的消息
        * false 确认当前序列号消息
        */ConfirmCallback ackCallback =(sequenceNumber, multiple)->{if(multiple){//返回的是小于等于当前序列号的未确认消息 是一个 mapConcurrentNavigableMap<Long,String> confirmed = outstandingConfirms.headMap(sequenceNumber,true);//清除该部分未确认消息
                confirmed.clear();}else{//只清除当前序列号的消息
                outstandingConfirms.remove(sequenceNumber);}};ConfirmCallback nackCallback =(sequenceNumber, multiple)->{String message = outstandingConfirms.get(sequenceNumber);System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);};/**
        * 添加一个异步确认的监听器
        * 1.确认收到消息的回调
        * 2.未收到消息的回调
        */
        channel.addConfirmListener(ackCallback,null);long begin =System.currentTimeMillis();for(int i =0; i <MESSAGE_COUNT; i++){String message ="消息"+ i;/**
            * channel.getNextPublishSeqNo()获取下一个消息的序列号
            * 通过序列号与消息体进行一个关联
            * 全部都是未确认的消息体
            */
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
            channel.basicPublish("", queueName,null, message.getBytes());}long end =System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时"+(end - begin)+"ms");}}

异步的含义
异步:输出“发布1000个异步发布确认消息,耗时62ms”的时候,已经发完了,但是确认并没有完成,二者互相不耽误。

如何处理异步未确认消息
最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks(成功和失败) 与发布线程之间进行消息的传递。发消息的线程和监听器的线程。一个线程负责监听、一个线程负责发送并打印结果。两个线程如何交互:ConcurrentLinkedQueue。老师这里实际用的是:ConcurrentSkipListMap
弹幕:如果你需要【按顺序存储】键值对并且希望能够高效地进行并发读写操作,SkipListMap 是一个不错的选择。只需线程安全的队列,【不关心元素的顺序】, LinkedQueue 更适合。

1、发消息的时候把发送的,放到ConcurrentSkipListMap里,outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
2、在确认回调里,删除了已经确认的,剩下就是未确认的喽。ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
弹幕:headMap方法返回的是一个视图,对视图的操作会影响原来的 map,所以清空headMap没有问题(ConcurrentNavigableMap)这是一个支持高并发的map,headmap是生成一个映射并不是new了一个新的map
headMap方法,截取 ConcurrentSkipListMap 集合元素,将 sequenceNumber 之前的截取到 ConcurrentNavigableMap 里面。

3、未确认的回调里面处理,没有确认的消息

代码模式:
生产者:获取连接,发动到队列,只不过有两个异步确认的回调函数
消费者:

下面引入交换机,
fanout 代码就是在生产者和消费者之中,声明交换机并通过路由键绑定临时队列。
direct 代码就是在消费者之中,声明交换机并通过路由键绑定声明的队列;生产者中,声明交换机但是没有绑定。
topic 代码就是在消费者之中,声明交换机并通过路由键绑定声明的队列;生产者中,声明交换机但是没有绑定。

死信队列 代码就是在消费者之中,死信队列绑定死信交换机,正常队列绑定死信交换机,channel.queueDeclare(normalQueue, false, false, false, params);;生产者中,声明交换机但是没有绑定。

延时队列代码上大体是,
配置类:定义交换机和队列,然后二者进行绑定
生产者:封装好的类,发送到交换机,交换机跟据路由键发送到消息指定队列
消费者:对消息处理

如何处理异步未确认消息
最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

发布/订阅
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。
直接(direct),主题(topic),标题(headers) ,扇出(fanout)
fanout
扇形交换机

channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 交换机在生产者中定义

direct

direct交换机
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 black 和 green 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
多重绑定
topic
尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候就只能使用 topic 类型。发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”,这种类型的。
在这个规则列表中,其中有两个替换符是大家需要注意的
*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
topic交换机

当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

死信队列
为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
1、消息 TTL 过期
2、队列达到最大长度(队列满了,无法再添加数据到 mq 中)
3、消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
死信架构图
DLX:(dead-letter-exchange的缩写)死信队列交换机
DLK:(dead-letter-routing-key的缩写)死信队列routingKey
TTL:(time-to-live的缩写)存活时间

publicclassProducer{privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";publicstaticvoidmain(String[] argv)throwsException{try(Channel channel =RabbitMqUtils.getChannel()){
            channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);//设置消息的 TTL 时间AMQP.BasicProperties properties =newAMQP.BasicProperties().builder().expiration("10000").build();//该信息是用作演示队列个数限制for(int i =1; i <11; i++){String message="info"+i;
                channel.basicPublish(NORMAL_EXCHANGE,"zhangsan", properties, message.getBytes());System.out.println("生产者发送消息:"+message);}}}}
publicclassConsumer01{//普通交换机名称privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";//死信交换机名称privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";publicstaticvoidmain(String[] argv)throwsException{Channel channel =RabbitUtils.getChannel();//声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue ="dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);//死信队列绑定死信交换机与 routingkey
        channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");//正常队列绑定死信队列信息Map<String,Object> params =newHashMap<>();//正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange",DEAD_EXCHANGE);//正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key","lisi");String normalQueue ="normal-queue";
        channel.queueDeclare(normalQueue,false,false,false, params);
        channel.queueBind(normalQueue,NORMAL_EXCHANGE,"zhangsan");System.out.println("等待接收消息.....");DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),"UTF-8");System.out.println("Consumer01 接收到消息"+message);};
        channel.basicConsume(normalQueue,true, deliverCallback, consumerTag ->{});}}

延迟队列
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理。
1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
场景

消息设置 TTL 、队列设置 TTL 。
如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列过期的消息会被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。
延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。

死信队列、 TTL,RabbitMQ 实现延时队列的两大要素。

思考
多了一个类型
注意这里多出来的类型:x-delayed-message

先发送延时20000(20秒)的请求,再发送延时2000(2秒)的请求。
按理来说应该是2000(2秒)的先被消费,20000(20秒)的后被消费。
第一个,58分56秒到59分16秒刚好20秒,但是为啥第二个为02秒到16秒是12秒呢,因为延时队列有顺序。多个请求的情况下,你只能第一个先消费了,第二个才能消费,不管你需要延时多少秒。还记得在消息上设置ttl的隐藏缺陷吗?
在消息上设置ttl
代码架构图

@ConfigurationpublicclassDelayedQueueConfig{publicstaticfinalStringDELAYED_QUEUE_NAME="delayed.queue";publicstaticfinalStringDELAYED_EXCHANGE_NAME="delayed.exchange";publicstaticfinalStringDELAYED_ROUTING_KEY="delayed.routingkey";@BeanpublicQueuedelayedQueue(){returnnewQueue(DELAYED_QUEUE_NAME);}//自定义交换机 我们在这里定义的是一个延迟交换机【CustomExchange】@BeanpublicCustomExchangedelayedExchange(){Map<String,Object> args =newHashMap<>();//自定义交换机的类型
        args.put("x-delayed-type","direct");// 交换机名字、交换机类型(安装插件后出现的)、是否持久化、是否自动删除、参数returnnewCustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false, args);}@BeanpublicBindingbindingDelayedQueue(@Qualifier("delayedQueue")Queue queue,@Qualifier("delayedExchange")CustomExc delayedExchange){returnBindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}}

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景。

发布确认高级

不要把消息应答和发布确认搞混了,
消息应答(手动)是,你的消息已经被消费了或拒绝了,一个明确的答复(回调)。
发布确认(异步确认)是,消息我发送到队列里了,消费与否是另外一回事,用的是监听。
发布确认的高级也是通过回调。
发到队列里面需要交换机,但是交换机出问题了咋整啊?这就是高级要解决的

生产者知道消息已经正确到达目的队列了,broker就会发送一个确认给生产者(包含消息的唯一 ID);如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出
初级方案

内部的回调接口

关键代码

@Component@Slf4jpublicclassMyCallBackimplementsRabbitTemplate.ConfirmCallback{// 我们实现的是 RabbitTemplate 内部的一个接口 ConfirmCallback,但是我们的实现类MyCallBack 不在 RabbitTemplate 里面,导致 RabbitTemplate 将来调用不到 ConfirmCallback 这个自身接口,需要把我们的 MyCallBack 注入到 RabbitTemplate 的 ConfirmCallback 接口里@AutowiredRabbitTemplate rabbitTemplate;@PostConstruct// 这个注解是在@Component、@Autowired之后执行publicvoidinit(){
    rabbitTemplate.setConfirmCallback(this);// this,就是当前类喽}//当前类,注入到这个类的这个接口上,才能使用这个接口,这个实现类///**
    * 交换机不管是否收到消息的一个回调方法
    * CorrelationData,发消息我们提供的
    * 消息相关数据
    * ack
    * 交换机是否收到消息
    */@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){String id=correlationData!=null?correlationData.getId():"";if(ack){
            log.info("交换机已经收到 id 为:{}的消息",id);}else{
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);}}}

生产者,应该能知道,交换机出了问题,我的消息,没有发出去。

下图模拟交换机正常,交换机都能正确应答,但是在队列不正常的时候,并且队列失败的信息无法给到生产者
在这里插入图片描述

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由(队列出了问题),那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

@Component@Slf4jpublicclassMyCallBackimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{// 我们实现的是 RabbitTemplate 内部的一个接口 ConfirmCallback,但是我们的实现类MyCallBack 不在 RabbitTemplate 里面,导致 RabbitTemplate 将来调用不到 ConfirmCallback 这个自身接口,需要把我们的 MyCallBack 注入到 RabbitTemplate 的 ConfirmCallback 接口里@AutowiredRabbitTemplate rabbitTemplate;@PostConstruct// 这个注解是在@Component、@Autowired之后执行publicvoidinit(){
    rabbitTemplate.setConfirmCallback(this);// this,就是当前类喽,交换机
    rabbitTemplate.setReturnCallback (this);// this,就是当前类喽,队列}//当前类,注入到这个类的这个接口上,才能使用这个接口,这个实现类/**
    * 交换机不管是否收到消息的一个回调方法
    * CorrelationData
    * 消息相关数据
    * ack
    * 交换机是否收到消息
    */@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){String id=correlationData!=null?correlationData.getId():"";if(ack){
            log.info("交换机已经收到 id 为:{}的消息",id);}else{
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);}}//当消息无法路由的时候的回调方法(就是交换机路由到队列的那个时候)@OverridepublicvoidreturnedMessage(Message message,int replyCode,String replyText,String exchange,String routingKey){
        log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}",newString(message.getBody()),exchange,replyText,routingKey);}}

高级发布确认解决了消息丢失(交换机、队列)生产者无感知的问题,但是不够好。我们使用备份交换机更好地处理

备份交换机
高级发布确认的备份交换机

可以为队列设置死信交换机来存储那些 处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。

@ConfigurationpublicclassConfirmConfig{publicstaticfinalStringCONFIRM_EXCHANGE_NAME="confirm.exchange";publicstaticfinalStringCONFIRM_QUEUE_NAME="confirm.queue";publicstaticfinalStringBACKUP_EXCHANGE_NAME="backup.exchange";publicstaticfinalStringBACKUP_QUEUE_NAME="backup.queue";publicstaticfinalStringWARNING_QUEUE_NAME="warning.queue";// 声明确认队列@Bean("confirmQueue")publicQueueconfirmQueue(){returnQueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认队列绑定关系@BeanpublicBindingqueueBinding(@Qualifier("confirmQueue")Queue queue,@Qualifier("confirmExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with("key1");}//声明备份 Exchange@Bean("backupExchange")publicFanoutExchangebackupExchange(){returnnewFanoutExchange(BACKUP_EXCHANGE_NAME);}//声明确认 Exchange 交换机的备份交换机@Bean("confirmExchange")publicDirectExchangeconfirmExchange(){ExchangeBuilder exchangeBuilder =ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)//设置该交换机的备份交换机.withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME);return(DirectExchange)exchangeBuilder.build();}// 声明警告队列@Bean("warningQueue")publicQueuewarningQueue(){returnQueueBuilder.durable(WARNING_QUEUE_NAME).build();}// 声明报警队列绑定关系@BeanpublicBindingwarningBinding(@Qualifier("warningQueue")Queue queue,@Qualifier("backupExchange")FanoutExchange backupExchange){returnBindingBuilder.bind(queue).to(backupExchange);}// 声明备份队列@Bean("backQueue")publicQueuebackQueue(){returnQueueBuilder.durable(BACKUP_QUEUE_NAME).build();}// 声明备份队列绑定关系@BeanpublicBindingbackupBinding(@Qualifier("backQueue")Queue queue,@Qualifier("backupExchange")FanoutExchange backupExchange){returnBindingBuilder.bind(queue).to(backupExchange);}}

主要是

//声明确认 Exchange 交换机的备份交换机@Bean("confirmExchange")publicDirectExchangeconfirmExchange(){ExchangeBuilder exchangeBuilder =ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)//设置该交换机的备份交换机.withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME);return(DirectExchange)exchangeBuilder.build();}

需要把确认交换机和备份交换机进行产生关联。

测试结果
备份交换机,报警队列进行消费(处理)

注意:
mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高

RabbitMQ 其他知识点

幂等性、优先队列、惰性队列

幂等性:
用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。

消息重复消费:
消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单的消费者消费 MQ 中的消息,也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。

在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作
:a、唯一 ID+指纹码机制,利用数据库主键去重。优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
b、利用 redis 的原子性去实现。利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费。(setnx:只有当该key不存在的时候才加入,否则不加入)

优先级队列的例子。
而曾经我们的后端系统是使用 redis 来存放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景。所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。
设置优先级队列
当然,上班使用代码,不用页面。
要让队列实现优先级需要做的事情有如下事情:队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待【消息已经发送到队列】中才去消费因为,这样才有机会对消息进行排序。

惰性队列

RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。

队列具备两种模式:default 和 lazy。默认的为 default 模式,在 3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。
如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列(交换机也是),然后再重新声明一个新的。在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:

Map<String,Object> args =newHashMap<String,Object>();
args.put("x-queue-mode","lazy");
channel.queueDeclare("myqueue",false,false,false, args);

惰性队列的内存开销

RabbitMQ 集群

最开始我们介绍了如何安装及运行 RabbitMQ 服务,不过这些是单机版的,无法满足目前真实应用的要求。如果 RabbitMQ 服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?

镜像队列,使用镜像的原因
如果 RabbitMQ 集群中只有一个 Broker 节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过 publisherconfirm 机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。
引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。

联邦交换机:
镜像队列是集群的,这个是两个独立服务器之间

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/qq_43465706/article/details/136448659
版权归原作者 年轻人999 所有, 如有侵权,请联系我们删除。

“RabbitMQ”的评论:

还没有评论