RabbitMQ
一、RabbitMQ是什么?为什么要使用它?
1.Rabbitmq属于消息队列中的一种,常用的消息队列技术还有kafka,RockerMq等等。Rabbitmq的稳定性比较强。
2.消息队列主要帮我们解决了系统的高并发问题,可以提高系统的性能。
(1)异步
消息队列中有两个重要的概念,一个是生产者,负责生产消息到MQ,一个是消费者,负责消费消息。当生产者生产完消息之后,可以放到MQ中,而不用等待消费者的回应,进而生产者可以继续做其他的事情。消费者只需监听这个MQ,就可以完成消息的异步消费,这样可以大大提升系统的效率。
(2)解耦
MQ可以实现系统与系统之间的解耦。例如现在有一个订单系统,一个快递系统,一个短信系统。用户在下单之后,需要通知快递系统补全发货信息,以及短信系统通知用户下单成功的信息。
假如刚开始只有订单系统和快递系统,这两个系统采用同步的方式传递信息,这样也问题不大,但是系统的性能就降低了一些。
但是现在增加一个需求,需要新增一个短信系统向用户发送信息。如果这个短信系统继续采用同步的方法,接在订单系统的后面,那么会导致两个问题:新增短信系统之后,订单系统的代码要进行修改;三个系统之间的耦合度很高,未来如果订单系统出现问题,则剩下的两个系统都要进行修改。这样不符合软件设计的开闭原则。
而如果加上MQ就能实现系统之间解耦,以及系统之间消息的异步传输。
在订单系统与其他两个系统之间添加了一层MQ,则解决了上述问题。订单系统只需将用户下单成功的消息发送到MQ后就可以向用户返回结果,提高了系统的效率;其他两个系统监听这个MQ,一旦有消息,则会进行消费。新增的短信系统则只需要面向MQ进行代码的编写和修改,原有的两个系统不需要进行代码的改动
,符合开闭原则。
(3)削峰填谷
再考虑一种情况,假如现在正值双十一,用户下单的数量激增,订单系统向MQ发送了巨量消息,进而导致快递系统和短信系统的崩溃,这种情况应该如何使用MQ进行解决?
MQ可以使用限流的方案来进行上述情况的削峰填谷。首先,我们可以在生产者这边做文章,在订单系统发送信息到MQ时,我们可以设定只允许其一次发送1000条信息,这样剩下的两个系统就没那么大的消费压力了;其次,我们可以限定消费者端一次性只能接收1000条消息,只有这1000条消息消费完了之后才能进行下一批消息的获取。这样就能实现系统的削峰填谷,缓解系统的压力。
二、RabbitMQ的框架图
(1)producer:生产者负责发送消息,有可能是系统、队列等等;
(2)Connection:连接。一个连接中包含多个信道,生产者通过信道向交换机发送消息(一般生产者在发送完消息之后就和信道断开连接,以节约资源);
(2)broker:一个MQ服务器为一个broker;
(3)exchange:交换机,根据路由规则负责转发消息到相应的队列。常见的交换机类型有fanout、direct、topic等等;
(4)queue:暂时存储消息的队列;
(5)consumer:消费者负责消费消息;
三、RabbitMQ的工作模式
官网提供了七种模式,常用的只有五种。
官网模式说明
(1)简单模式
简单模式中一个队列对应一个消费者,没有其他的消费者绑定到队列。生产者发送消息到默认交换机(也可以理解为没有交换机),默认交换机转发到队列,再由单一的消费者进行消费。
(2)工作模式
work模式与简单模式最大的区别就是有多个消费者绑定到队列上,而且多个消费者之间是竞争关系,谁先抢到消息谁先消费。
(3)发布订阅模式
发布订阅模式中有一个交换机X,这个交换机的类型是fanout(扇形交换机)。生产者发送消息到交换机,交换机为与它绑定的所有队列都转发一份相同的消息,在消费者消费这些队列的信息之前,这多个队列存储的消息是完全一样的。
(4)路由模式
路由模式中存在一个direct类型的交换机,交换机与各个队列之间通过不同的路由key值相绑定。生产者在发送消息到交换机的时候,需要携带不同的路由key,以便交换机知道要将这条消息转发到哪个队列。
(5)topic模式
主题模式是特殊的路由模式,它的交换机类型是topic,交换机与队列之间绑定的key值可以使用通配符:* ** 表示可以恰好代替一个单词,# 表示可以替代0个或者多个单词。**
例如生产发送消息带上路由 a.orange.b ,则这条消息被路由到Q1;带上路由 a.b.rabbit 或者 lazy 或者 lazy.a 或者lazy.a.b ,这条消息会被路由到Q2;
四、RabbitMQ消息的可靠性
(1)从生产者的角度考虑这件事:生产者首先将消息发给交换机,然后交换机将消息转发给队列。在这个过程中,我们如何保证消息一定到达了交换机呢?就算到达了交换机之后,又如何保证消息一定到达了队列呢?万一过程中出现了网络波动或者其他因素,这个消息不就传递不到了吗?
针对上述的情况,Rabbitmq为我们提供了解决办法,来保证生产者的消息可靠性。
首先对于第一步:生产者到交换机,Rabbitmq提供了退回模式。即如果消息没有到达交换机,则Rabbitmq会返回一个false的ack,以及原因。来表示交换机没有接收到消息,你可以进行后续的重发处理或者别的什么操作。
第二步:交换机到队列,Rabbitmq提供了返回模式。即如果消息没有到达队列,Rabbitmq也会返回响应码,响应消息等等来表示队列没接收到消息,你也可以根据响应消息进行后续的重发处理或者别的什么操作。
在springboot项目中的Rabbitmq的配置文件中进行如下设置可以实现:
@BeanpublicRabbitTemplatecreateRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMandatory(true);/*
* 设置 生产者------->交换机 的确认模式回调
* 当生产者发送的消息成功到达交换机时,此回调函数的参数2返回true
* 否则返回false,以及原因*/
rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean b,String s){/*
* 参数1:生产者发送消息的时候携带的相关信息
* 参数2:ack确认
* 参数3:原因*/if(!b){System.out.println("确认模式 生产者------->交换机 发送失败");System.out.println("生产者发送消息时携带的相关数据 "+ correlationData);System.out.println("ack "+ b);System.out.println("cause "+ s);}/*System.out.println("确认模式 生产者------->交换机 ");
System.out.println(correlationData);
System.out.println("ack " + b);
System.out.println("cause " + s);
System.out.println();*/}});/*
* 设置 交换机-------->队列 的返回模式
* 只有当交换机接收到信息无法发送到队列时,才会触发这个回调函数
* !!!!!!注意是 ReturnCallback */
rabbitTemplate.setReturnCallback(newRabbitTemplate.ReturnCallback(){@OverridepublicvoidreturnedMessage(Message message,int i,String s,String s1,String s2){System.out.println("返回模式 交换机-------->队列 发送失败");System.out.println("消息 "message);System.out.println("响应码 "+ i);System.out.println("响应消息 "+ s);System.out.println("交换机 "+ s1);System.out.println("路由key "+ s2);}});return rabbitTemplate;}
(2)再考虑一件事,就算消息到达了队列,生产者如何知道消息是否被成功消费了呢?
同样的,Rabbitmq也为我们提供了消费者端的确认模式,来保证生产者知道消息是否被成功消费。
在springboot的消费者项目中的yml配置文件中添加这样的配置:
//simple是简单队列,工作队列模式下的消费者开启ack配置//direct是扇形,路由,主题等模式下的消费者开启ack配置//mode的取值 (1)none表示不确认 (2)manual表示开启手动确认 (3)Auto表示自动确认
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
并且在消费者消费完消息后,要添加手动确认的代码,例如:
@Component@RabbitListener(queues ="spring-boot-fanout-queue02")publicclass fanoutConsumer02 {/**
*
* @param message
* @param channel
* @throws IOException
* @throws InterruptedException
*/@RabbitHandler(isDefault =true)publicvoidreceiveMessage(Message message,Channel channel)throwsIOException,InterruptedException{//接收Message对象中的String字符串System.out.println("consumer02 "+newString(message.getBody()));//手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
这样生产者就能根据ack应答知道消费者是否成功消费,如果没有成功消费,由用户决定下一步操作,重发、丢弃等等。
五、RabbitMQ消费端限流
在消费者端的系统的yml配置文件中添加如下配置,则可以让消费者一次性只能从队列中取两条消息:
//simple是简单队列,工作队列模式下的消费者开启限流模式//direct是扇形,路由,主题等模式下的消费者开启限流模式
spring.rabbitmq.listener.direct.prefetch=2
spring.rabbitmq.listener.simple.prefetch=2
六、消息存活时间、死信队列与延迟队列
(1)TTL
考虑这样一个问题:如果生产者发送到队列中消息一直无人消费,最终导致队列的消息溢出,这种情况怎么办?
Rabbitmq也为我们提供了解决方案,我们可以为每个队列中所有的消息都设置一个过期时间ttl,或者在生产者发送消息的时候为单条消息设置过期时间ttl,当ttl时间过了之后,可以把这条消息重发、丢弃或者转发到别的队列等等。这样的话就算消息无人消费,也不会造成队列的拥堵。
为单条消息设置ttl:
发送消息时带上MessagePostProcessor 对象
publicvoidsendMessage(){String message ="hello";Message msg =newMessage(message.getBytes());/*
* 参数1:交换机名字
* 参数2:路由key
* 参数3:信息
* *///CorrelationData correlationData = new CorrelationData("111");MessagePostProcessor messagePostProcessor =newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{//设置消息的过期时间为五秒
message.getMessageProperties().setExpiration("5000");return message;}};//发送消息
rabbitTemplate.convertAndSend("spring-boot-fanout-exchange","",msg,messagePostProcessor)}
为整个队列中所有的消息设置过期时间:
需要在创建队列时配置
@Bean(name ="queue01")publicQueuegetQueue01(){//返回一个队列,名字为 spring-boot-fanout-queuereturnQueueBuilder.durable("spring-boot-fanout-queue01")//设置队列中所有消息的过期时间为五秒.ttl(5000).build();}
(2)死信队列
首先什么是死信,死信就是死了的信息。以下是死信的三种来源:
过期的消息、被消费者拒绝消费且不重新入队的消息 以及 队列已满之后继续发往队列的消息
当一个消息成为死信之后,如果有需求要将死信转发到一个队列中,那么这个队列就是死信队列。转发过程中经过的交换机称为死信交换机。
死信队列架构图如下:
在创建正常队列时对其进行死信交换机的绑定:
@Bean(name ="normalQueue")publicQueuegetNormalQueue(){returnQueueBuilder.durable("spring-boot-normal-queue").ttl(5000).deadLetterExchange("spring-boot-fanout-delay-exchange").deadLetterRoutingKey("delayMessage").build();}
(3)延迟队列
顾名思义,延迟队列就是队列中的消息不要被立马消费,而是延迟一段时间后再被消费。然而Rabbitmq没有提供现成的延迟队列,需要我们使用TTL+死信队列进行实现。
延迟队列的经典使用场景就是订票的时候,当你下单了之后但是没有付款,你的下单信息会被发送到一个过期时间为30分钟的队列;30分钟过了之后,这条消息变成了死信,会被转发到死信队列中;库存系统充当消费者来消费这条死信,如果你在30分钟内支付了订单,则什么也不做,如果你未支付,则取消订单,回滚库存。
七、RabbitMQ高可用集群搭建
我们可以在一台主机上创建多个Rabbitmq节点(或者在多台主机上创建),使这些节点之间相互同步信息,以实现Rabbitmq的高可用。就算一个节点挂掉,客户端也可以从另外的节点获取信息,因为所有的节点存储的信息都是相同的。
版权归原作者 chaozhouchao 所有, 如有侵权,请联系我们删除。