什么是RabbitMQ,有什么特点
- 消息传递模式:RabbitMQ支持多种消息传递模式,包括发布/订阅、点对点和工作队列等,使其更灵活适用于各种消息通信场景。
- 消息路由和交换机:RabbitMQ引入交换机的概念,用于将消息路由到一个或多个队列。允许根据消息的内容、标签或路由键进行灵活的消息路由,从而实现更复杂的消息传递逻辑。
- 消息确认机制:RabbitMQ支持消息确认机制,消费者可以确定已成功处理消息。确保了消息不会再传递后被重复消费,增加了消息的可靠性。
- 消息持久性:RabbitMQ允许消息和队列的持久性设置,确保消息再RabbitMQ重新启动后不会丢失。这对于关键的业务消息非常重要。
RabbitMQ和AMQP是什么关系
- AMQP:AMQP是一个协议规范,而不是一个具体的消息中间件。它是一个开放的消息产地协议,是一种应用层的标准协议,为面向消息的中间件设计。AMQP提供了一种同统一的消息服务,使得不同程序之间可以通过消息队列进行通信。SpringBoot框架默认就提供了对AMQP协议的支持。
- RabbitMQ:RabbitMQ是一种开源的消息中间件,是一个具体的软件产品。使用AMQP协议来实现消息传递的标准。并且其也支持其他消息传递协议,如STOMP和MQTT。RabbitMQ基于AMQP协议定义的消息格式和交互流程,实现了消息再生产者、交换机队列之间的传递和处理。
RabbitMQ的核心组件有哪些
- Broker:RabbitMQ服务器,负责接收和分发消息的应用。
- Virtual Host: 虚拟主机,是RabbitMQ中的逻辑容器,用于隔离不同环境或不同应用程序的信息流。每个虚拟主机都有自己的队列交换机等设置,可以理解为一个独立的RabbitMQ服务。
- Connection连接:管理和维护与RabbitMQ服务器的TCP连接,生产者、消费者通过这个连接和Broker建立物理网络连接。
- Channel通道:是在Connection内创建的轻量级通信通道,用于进行消息的传输和交互。应用程序通过Channel进行消息的发送和接收。通常一个Connection可以建立多个Channel。
- Exchange交换机:交换机是消息的中转站,负责接收来自生产者的消息,并将其路由到一个或多个队列中。RabbitMQ提供多种不同类型的交换机,每种不同类型的交换机都有不同的消息路由规则。
- Queue队列:队列是消息的存储位置。每个队列都有一个唯一的名称。消息从交换机路由到队列,然后等待消费者消费。
- Binding绑定关系:Binding是Exchange和Queue之间的关联规则,定义了消息如何从交换机路由到特定队列。
RabbitMQ的交换机的类型
- Direct Exchange:直连交换机,这种交换机根据消息的路由键(Routing Key)将消息发送到与之完全匹配的队列。只有当消息的路由键与队列绑定的路由键相同时,消息才会被路由到队列,是一种简单的路由策略,适用于点对点通信。如:当一个队列绑定到交换机要求路由键为“key”,则只会转发RoutingKey标记为“key”的消息,不会转发“key1”等等。是完全匹配、单播的模式。
//配置类importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassDirectExchangeConfig{publicstaticfinalStringDIRECT_EXCHANGE_NAME="direct-exchange";publicstaticfinalStringQUEUE_A_NAME="queue-a";publicstaticfinalStringQUEUE_B_NAME="queue-b";publicstaticfinalStringROUTING_KEY_A="key-a";publicstaticfinalStringROUTING_KEY_B="key-b";@BeanDirectExchangedirectExchange(){returnnewDirectExchange(DIRECT_EXCHANGE_NAME);}@BeanQueuequeueA(){returnnewQueue(QUEUE_A_NAME);}@BeanQueuequeueB(){returnnewQueue(QUEUE_B_NAME);}@BeanBindingbindQueueAToDirect(Queue queueA,DirectExchange directExchange){returnBindingBuilder.bind(queueA).to(directExchange).with(ROUTING_KEY_A);}@BeanBindingbindQueueBToDirect(Queue queueB,DirectExchange directExchange){returnBindingBuilder.bind(queueB).to(directExchange).with(ROUTING_KEY_B);}}//生产者importorg.springframework.amqp.core.AmqpTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;@ComponentpublicclassDirectProducer{privatefinalAmqpTemplate rabbitTemplate;@AutowiredpublicDirectProducer(AmqpTemplate rabbitTemplate){this.rabbitTemplate = rabbitTemplate;}publicvoidsend(String routingKey,String message){ rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE_NAME, routingKey, message);}}//消费者importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassDirectConsumerA{@RabbitListener(queues =DirectExchangeConfig.QUEUE_A_NAME)publicvoidreceiveA(String in){System.out.println("Queue A received: '"+ in +"'");}}@ComponentpublicclassDirectConsumerB{@RabbitListener(queues =DirectExchangeConfig.QUEUE_B_NAME)publicvoidreceiveB(String in){System.out.println("Queue B received: '"+ in +"'");}}
- Topic Exchange:主题交换机,这种交换机通过通配符匹配,根据消息的路由键与队列绑定时指定的路由模式匹配程度(#表示一个或多个词,表示一个词。),将消息路由到一个或者是多个队列。多用于发布/订阅模式和复杂的消息路由需求。1. Topic中,将routingKey通过“.”来分为多个部分2. “”:代表一个部分3. “#”:代表0个或多个部分(如果绑定的路由键为‘#’时,则接受所有消息,因为路由键所有都匹配)然后发送一条信息,routingkey为”key1.key2.key3.key4",那么根据"."将这个路由键分为了4个部分,此条路由键,将会匹配:1. key1.key2.key3.:成功匹配,因为可以代表一个部2. key1.#:成功匹配,因为#可以代表0或多个部分3. key2..key4: 成功匹配,因为第一和第三部分分别为key1和key3,且为4个部分,刚好匹配4.#.key3.key4:成功匹配,#可以代表多个部分,正好匹配中了我们的key1和key2如果发送消息routingkey为"key1",那么将只能匹配中key1.#,#可以代表0个部分
//配置类@ConfigurationpublicclassTopicExchangeConfig{publicstaticfinalStringTOPIC_EXCHANGE_NAME="topic-exchange";publicstaticfinalStringQUEUE_A_NAME="queue-a";publicstaticfinalStringQUEUE_B_NAME="queue-b";@BeanTopicExchangetopicExchange(){returnnewTopicExchange(TOPIC_EXCHANGE_NAME);}@BeanQueuequeueA(){returnnewQueue(QUEUE_A_NAME);}@BeanQueuequeueB(){returnnewQueue(QUEUE_B_NAME);}@BeanBindingbindQueueAToTopic(Queue queueA,TopicExchange topicExchange){returnBindingBuilder.bind(queueA).to(topicExchange).with("key.*");}@BeanBindingbindQueueBToTopic(Queue queueB,TopicExchange topicExchange){returnBindingBuilder.bind(queueB).to(topicExchange).with("*.key");}}//队列queue-a绑定到主题交换机上的key.*模式,这意味着任何以key.开头的routing key都会被路由到queue-a。队列queue-b绑定到*.key模式,这意味着任何以.key结尾的routing key都会被路由到queue-b//生产者importorg.springframework.amqp.core.AmqpTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;@ComponentpublicclassTopicProducer{privatefinalAmqpTemplate rabbitTemplate;@AutowiredpublicTopicProducer(AmqpTemplate rabbitTemplate){this.rabbitTemplate = rabbitTemplate;}publicvoidsend(String routingKey,String message){ rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE_NAME, routingKey, message);}}//当 routingKey="hello.key" 则会路由到queueB//消费者@ComponentpublicclassTopicConsumerA{@RabbitListener(queues =TopicExchangeConfig.QUEUE_A_NAME)publicvoidreceiveA(String in){System.out.println("Queue A received: '"+ in +"'");}}@ComponentpublicclassTopicConsumerB{@RabbitListener(queues =TopicExchangeConfig.QUEUE_B_NAME)publicvoidreceiveB(String in){System.out.println("Queue B received: '"+ in +"'");}}
- Headers Exchange:头交换机,不处理路由键, 而是根据发送的消息内容中的headers属性进行匹配 。只有当消息的标头和绑定规则完全匹配时,消息才会被路由到队列。适用于需要复杂消息匹配的场景。消费方指定的headers中必须包含一个“x-match“的键。 键"x-match"的值有2个1. x-match=all :表示所有的键值对都匹配才能接受到消息2. x-match =any:表示只要有键值对匹配就能接受到消息
//配置类@BeanHeadersExchangeheadersExchange(){returnnewHeadersExchange(HEADER_EXCHANGE_NAME);}@BeanQueuequeue(){returnnewQueue(QUEUE_NAME);}@BeanBindingbinding(Queue queue,HeadersExchange headersExchange){returnBindingBuilder.bind(queue).to(headersExchange).whereAll(newString[]{"x-match","id"}).matches("true","123");}//whereAll:必须匹配所有的键值对 whereAny:至少有一个匹配的键值对//生产者importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessageProperties;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;@ComponentpublicclassHeaderProducer{privatefinalRabbitTemplate rabbitTemplate;@AutowiredpublicHeaderProducer(RabbitTemplate rabbitTemplate){this.rabbitTemplate = rabbitTemplate;}publicvoidsend(String message){MessageProperties props =newMessageProperties(); props.setHeader("x-match","true"); props.setHeader("id","123");Message msg =newMessage(message.getBytes(), props); rabbitTemplate.convertAndSend(HeaderExchangeConfig.HEADER_EXCHANGE_NAME,"", msg);}}//消费者importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassHeaderConsumer{@RabbitListener(queues =HeaderExchangeConfig.QUEUE_NAME)publicvoidreceive(String in){System.out.println("Received: '"+ in +"'");}}
- Fanout Exchange:扇形交换机,采用广播的方式,根据绑定的交换机路由到与之对应的所有队列。用于发布/订阅模式,其中一个消息被广播给所有订阅者。
//配置类 publicstaticfinalStringFANOUT_EXCHANGE_NAME="fanout-exchange";publicstaticfinalStringQUEUE_A_NAME="queue-a";publicstaticfinalStringQUEUE_B_NAME="queue-b";@BeanFanoutExchangefanoutExchange(){returnnewFanoutExchange(FANOUT_EXCHANGE_NAME);}@BeanQueuequeueA(){returnnewQueue(QUEUE_A_NAME);}@BeanQueuequeueB(){returnnewQueue(QUEUE_B_NAME);}@BeanBindingbindQueueAToFanout(Queue queueA,FanoutExchange fanoutExchange){returnBindingBuilder.bind(queueA).to(fanoutExchange);}@BeanBindingbindQueueBToFanout(Queue queueB,FanoutExchange fanoutExchange){returnBindingBuilder.bind(queueB).to(fanoutExchange);}//生产者importorg.springframework.amqp.core.AmqpTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;@ComponentpublicclassFanoutProducer{privatefinalAmqpTemplate rabbitTemplate;@AutowiredpublicFanoutProducer(AmqpTemplate rabbitTemplate){this.rabbitTemplate = rabbitTemplate;}publicvoidsend(String message){ rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE_NAME,"", message);}}//消费者importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassFanoutConsumerA{@RabbitListener(queues =FanoutExchangeConfig.QUEUE_A_NAME)publicvoidreceiveA(String in){System.out.println("Queue A received: '"+ in +"'");}}@ComponentpublicclassFanoutConsumerB{@RabbitListener(queues =FanoutExchangeConfig.QUEUE_B_NAME)publicvoidreceiveB(String in){System.out.println("Queue B received: '"+ in +"'");}}
##### RabbitMq如何保证消息不丢失?- 消息丢失得原因分析如下图:
从图可知 生产者的消息到达消费者会经过两次网络传输,并且会在RabbitMq服务器中进行路由。
所以一般有以下三种丢失的场景:
- 生产者消息发送到RabbitMq服务器过程中出现消息丢失。可能由于网络波动,或者是服务器宕机。
- RabbitMQ 服务器消息持久化出现消息丢失。 消息发送到 RabbitMQ 之后,未能及时存储完成持久化,RabbitMQ 服务器出现宕机重启,消息出现丢失。
- 消费者拉取消息过程以及拿到消息后出现消息丢失。消费者从 RabbitMQ 服务器获取到消息过程出现网络波动等问题可能出现消息丢失;消费者拿到消息后但是消费者未能正常消费,导致丢失,可能是消费者出现处理异常又或者是消费者宕机。
针对上述三种消息丢失场景,RabbitMQ 提供了相应的解决方案,confirm 消息确认机制(生产者),消息持久化机制(RabbitMQ 服务),ACK事务机制(消费者)
confirm消息确认机制(生产者)
Confirm 模式是 RabbitMQ 提供的一种消息可靠性保障机制。当生产者通过 Confirm 模式发送消息时,它会等待 RabbitMQ 的确认,确保消息已经被正确地投递到了指定的 Exchange 中。消息正确投递到 queue 时,会返回 ack。
消息没有正确投递到 queue 时,会返回 nack。如果 exchange 没有绑定 queue,也会出现消息丢失。
application.yml中需要配置publisher-confirm-type
- **
none
**: - 这是默认设置,意味着不启用发布确认。在这种模式下,消息发送后,无论是否到达 RabbitMQ 服务器,都不会有确认信息返回。这通常用于性能优化场景,因为没有确认机制会带来更高的消息吞吐量。 - **
simple
**: - 在这种模式下,RabbitMQ 会为每个发送的消息提供一个确认。当消息被 RabbitMQ 接收并持久化后,会发送一个确认给生产者。但是,simple
模式不会关联每个消息的确认信息,也就是说,你无法知道具体是哪一个消息被确认了。这对于不需要精细控制的消息发布场景来说足够了。 - **
correlated
**: - 这是最强大的确认模式。在correlated
模式下,每个消息都有一个唯一的标识符(CorrelationData
),这样就可以跟踪每个消息的确认状态。当消息被确认时,RabbitMQ 会发送回一个包含该消息标识符的确认信息。这样,你就可以确切地知道哪些消息被成功接收,哪些可能失败了。这对于需要高可靠性和精确消息追踪的应用场景非常有用。
示例代码如下:
配置类代码如下:
@ConfigurationpublicclassRabbitConfig{@BeanpublicAmqpTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{if(ack){System.out.println("Message with id: "+ correlationData.getId()+" successfully sent");}else{System.out.println("Message with id: "+ correlationData.getId()+" failed to send due to: "+ cause);}});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{System.out.println("Returned message: "+newString(message.getBody()));System.out.println("Exchange: "+ exchange);System.out.println("Routing key: "+ routingKey);System.out.println("Reply code: "+ replyCode);System.out.println("Reply text: "+ replyText);});return rabbitTemplate;}@BeanpublicMessageConverterproducerJackson2MessageConverter(){returnnewJackson2JsonMessageConverter();}}
生产者代码如下:
importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;@ServicepublicclassMessageProducerService{privatefinalRabbitTemplate rabbitTemplate;@AutowiredpublicMessageProducerService(RabbitTemplate rabbitTemplate){this.rabbitTemplate = rabbitTemplate;}@TransactionalpublicvoidsendMessage(String message){try{// 发送消息
rabbitTemplate.convertAndSend("exchange-name","routing-key", message);// 业务逻辑doBusinessLogic();}catch(Exception e){// 处理异常thrownewRuntimeException(e);}}privatevoiddoBusinessLogic(){// 业务逻辑代码}}
RabbitTemplate
在Spring AMQP中提供了多种机制来确认消息是否成功传递给RabbitMQ以及消息是否成功路由到预期的队列。
setConfirmCallback
和
setReturnCallback
是其中的两种重要机制,它们分别在不同的情况下被触发,用于处理消息的确认和返回。
setConfirmCallback
setConfirmCallback
方法用于设置一个回调,当消息被投递到RabbitMQ的Broker后,Broker会发送一个确认信号回给生产者,表明消息已经被接收。这个确认信号包含了消息的唯一标识符,称为
CorrelationData
,通常是一个UUID或其他可识别的标识符,以及一个布尔值
ack
,表明消息是否被成功接收。
触发时机
- 消息到达Broker:当消息到达Broker并被放置在队列中或在尝试放置消息时失败,Broker会发送确认信号。
- 消息持久化:如果消息被设置为持久化的,Broker在将消息写入磁盘后才发送确认信号。
使用场景
- 确认消息投递:在高可用性或关键业务流程中,需要确认每条消息都被Broker接收。
- 性能优化:在高吞吐量的应用中,可以使用发布确认的批量确认模式以减少网络开销。
setReturnCallback
setReturnCallback
方法用于设置一个回调,当消息无法被正确路由到任何一个队列时,Broker会将消息返回给生产者。这通常发生在以下情况:
- Routing Key不存在:如果使用Direct或Fanout交换机时,没有队列与指定的Routing Key绑定。
- Routing Key不匹配:如果使用Topic或Header交换机时,消息的Routing Key与队列绑定的模式不匹配。
- Mandatory标志位:当发送消息时设置了
mandatory
标志位为true
,并且消息不能被正确路由到队列时,Broker会返回消息。 - Immediate标志位:当发送消息时设置了
immediate
标志位为true
,并且消息不能立即被消费时,Broker会返回消息。
触发时机
- 消息未被路由:当消息因为上述原因未能被正确路由到队列时,Broker会返回消息。
使用场景
- 错误检测:在开发阶段或生产环境中,可以帮助检测和调试Routing Key的错误或配置问题。
- 消息重试或记录:对于未能路由的消息,可以设计逻辑来重新发送消息或记录日志以供后续分析。
综合使用
通常,
setConfirmCallback
和
setReturnCallback
会被一起使用,以实现更全面的消息确认和错误处理策略。这确保了消息不仅被Broker接收,而且也被正确地路由到预期的队列中。如果消息未能正确路由,可以通过
setReturnCallback
回调进行处理,如重新发送消息、记录错误或采取其他补救措施。
注意事项
- 当使用
setConfirmCallback
时,需要考虑网络延迟和Broker的确认延迟,这可能影响确认信号的及时性。 - 当使用
setReturnCallback
时,需要处理好返回的消息,避免无限循环发送或不当的处理导致的问题。
消息持久化机制(RabbitMQ服务)
生产者确认可以保证消息投递到RabbitMQ的队列之中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能会导致消息丢失。
所以想要确保消息在RabbitMQ中安全保存,必须开启消息持久化机制,这种机制可以分为三种。
- 交换机持久化: 交换机持久化意味着交换机的定义在 RabbitMQ 重启后仍然存在,不会丢失 。 即使 RabbitMQ 重启,所有的绑定关系、交换机类型及其属性都会保持不变,确保消息路由规则的连续性 。
- 队列持久化: 队列持久化意味着队列的定义在 RabbitMQ 重启后仍然存在,同样不会丢失, 如果在 RabbitMQ 停止前队列中有未被消费的消息,这些消息在 RabbitMQ 重启后仍然会存在于队列中,等待被消费 。
- 消息持久化: 消息持久化意味着消息在存储到 RabbitMQ 时会被写入磁盘,而不仅仅是内存中 , 即使 RabbitMQ 服务突然关闭,持久化消息不会丢失,确保了消息的完整性。
以下示例会将三种持久化一同配置:
//Config配置类@ConfigurationpublicclassRabbitConfig{publicstaticfinalStringDURABLE_QUEUE_NAME="durable.queue";publicstaticfinalStringDURABLE_EXCHANGE_NAME="durable.exchange";publicstaticfinalStringROUTING_KEY="durable.routing.key";@BeanQueuedurableQueue(){//通过QueueBuilder.durable创建出的队列就是持久化队列returnQueueBuilder.durable(DURABLE_QUEUE_NAME).build();}@BeanDirectExchangedurableExchange(){//三个参数DURABLE_EXCHANGE_NAME表示交换机名称//true 代表是持久化 默认为false//false 当没有queue与其绑定时不自动删除returnnewDirectExchange(DURABLE_EXCHANGE_NAME,true,false);}@BeanBindingbinding(Queue durableQueue,DirectExchange durableExchange){returnBindingBuilder.bind(durableQueue).to(durableExchange).with(ROUTING_KEY);}}
//生产者代码packagecom.example.demo.demos.service;importcom.example.demo.demos.config.RabbitConfig;importorg.springframework.amqp.core.AmqpTemplate;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessageDeliveryMode;importorg.springframework.amqp.core.MessageProperties;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importjava.util.UUID;@ServicepublicclassRabbitMQProducer{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsendMessage(String message){MessageProperties messageProperties =newMessageProperties();//利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode://1:非持久化//2:持久化//如下是设置delivery-mode为2
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message persistentMessage =newMessage(message.getBytes(), messageProperties);
rabbitTemplate.convertAndSend(RabbitConfig.DURABLE_EXCHANGE_NAME,RabbitConfig.ROUTING_KEY, persistentMessage);}}
上述代码配置了交换机持久化、队列持久化以及消息的持久化。
ACK事务机制(消费者)
Ack事务机制用于确保消息被正确的消费。当消息成功被消费者处理之后,消费者发送确认ACK给RabbitMQ,告知消息可以被移除。整个过程是自动处理的,也可以关闭进行手动发送ACK。
如下是代码示例:
- yml配置文件中acknowledge-mode为manual 设置为手动确认
spring:rabbitmq:host: localhost
port:5672username: guest
password: guest
listener:simple:acknowledge-mode: manual
- Config配置类配置交换机以及队列
packagecom.example.demo.demos.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{publicstaticfinalStringQUEUE_NAME="myQueue";publicstaticfinalStringEXCHANGE_NAME="mynameExchange";@BeanQueuemyQueue(){returnnewQueue(QUEUE_NAME,true);// durable queue}@BeanDirectExchangemyExchange(){returnnewDirectExchange(EXCHANGE_NAME);}@BeanBindingbinding(Queue queue,DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);}}
- 生产者类
packagecom.example.demo.demos.product;importcom.example.demo.demos.config.RabbitMQConfig;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;@ServicepublicclassMessageProducer{privatefinalRabbitTemplate rabbitTemplate;@AutowiredpublicMessageProducer(RabbitTemplate rabbitTemplate){this.rabbitTemplate = rabbitTemplate;}publicvoidsendMessage(String message){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,RabbitMQConfig.QUEUE_NAME, message);}}
- 消费者类
packagecom.example.demo.demos.consumer;importcom.example.demo.demos.config.RabbitMQConfig;importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.messaging.handler.annotation.Payload;importorg.springframework.stereotype.Component;importjava.io.IOException;@ComponentpublicclassMessageConsumer{@RabbitListener(queues =RabbitMQConfig.QUEUE_NAME)publicvoidreceiveMessage(Message message,Channel channel)throwsException{try{byte[] body = message.getBody();String messageContent =newString(body,"UTF-8");System.out.println("Received message: "+ messageContent);// 手动确认消息long deliveryTag = message.getMessageProperties().getDeliveryTag();//可以通过主动抛异常验证手动ACK模式//throw new Exception();
channel.basicAck(deliveryTag,false);}catch(Exception e){// 如果发生异常,可以选择重新排队或拒绝消息long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicNack(deliveryTag,false,true);// requeue=true}}}
RabbitMQ的消息堆积
消息堆积原因
- 消费者处理消息的速度太慢:消费者处理速度慢于生产者生产消息的速度,就会导致消息在消息队列中堆积。
- 队列容量太小:队列容量太小也会容易导致消息在队列中堆积。
- 网络故障:如果出现网络故障可能会导致消息丢失,并且在队列中堆积。
- 消费者故障:如果消费者发生故障,消息也会在队列中堆积
- 队列配置不当:错误的配置队列(如错误的消息确认模式或队列长度限制)可能导致处理速度不佳。
- 消息大小:大型的消息处理时间过长,可能导致处理速度降低。
- 业务逻辑复杂或耗时:如果消费者在处理消费的时候需要执行复杂的业务逻辑或者是耗时操作,处理速度也会收到影响。
解决方案:
- 消费者处理消息的速度太慢- 增加消费者数量:通过水平扩展,增加消费者的数量来提高消费者的处理速度。(有个东西需要特别注意:“预取计数” 在水平扩展消费者的同时,需要根据实际情况配置这个数,不然有可能会发生RabbitMQ只往一台消费者服务器发送消息,因为RabbitMQ默认的预取计数为0,也就是 这意味着没有预取计数限制,服务器可以自由地发送尽可能多的消息给消费者,直到消费者的内存限制被达到 )
#可以在yml配置文件中如下配置spring:rabbitmq:listener:simple:prefetch:10
- 优化消费者性能: 审查和优化消费者处理消息的业务逻辑。寻找可以改进的地方,如减少数据库查询次数、使用更高效的算法或数据结构、缓存结果等 。- 异步处理消息:结合Spring的@Async注解,可以使得消费者接收的消息在另外的线程中异步处理,也就是消费者只接收,不处理,处理则交给@Async注解配置的线程池中的线程处理。@Configuration@EnableAsyncpublicclassAsyncConfig{@BeanpublicTaskExecutortaskExecutor(){ThreadPoolTaskExecutor executor =newThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(10); executor.setQueueCapacity(20); executor.initialize();return executor;}}@ComponentpublicclassMessageConsumer8080{@Async("taskExecutor")@RabbitListener(queues ="durable.queue")publicvoidreceiveMessage(Message message,Channel channel)throwsException{try{byte[] body = message.getBody();String messageContent =newString(body,"UTF-8");System.out.println("8080 Received message: "+ messageContent);Thread.sleep(1000);long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag,false);}catch(Exception e){long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicNack(deliveryTag,false,true);}}}
- 队列的容量大小:- 增加队列容量:调整队列设置,以允许更多的消息存储。
- 网络故障:- 监控和告警:通过监控网络状态并设置警告,确保在网络故障的时候快速发现并解决问题。- 持久化和高可用:确保消息和队列的持久化以避免消息丢失,并且可使用镜像队列,提高可用性。
- 消费者故障:- 使用死信队列:将无法处理的消息转移到死信队列上,防止阻塞主队列。- 容错机制:实现消费者的自动重启和错误处理逻辑。
版权归原作者 末、 所有, 如有侵权,请联系我们删除。