springboot项目使用RabbitMQ
1.引入依赖,配置yml文件
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
spring:rabbitmq:host: 192.168.19.3
port:5672username: admin
password: admin
virtualHost: my_vhost #vhost名称
2.添加配置类(direct交换机为例)
生产者(一个交换机绑定两个队列)
importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importstaticcom.rabbit.constant.RabbitConstant.*;@ConfigurationpublicclassRabbitMQConfig{privatefinalStringDIRECT_EXCHANGE_NAME="topicExchange";//交换机名称privatefinalStringDIRECT_ROUTE_KEY="topicRoute";//路由键privatefinalStringDIRECT_QUEUE_NAME="topicQueue";//队列名称/**
* 交换机
* @return direct交换机
* 名称,是否持久化,无队列自动删除
*/@BeanpublicDirectExchangegetDirectExchange(){returnnewDirectExchange(DIRECT_EXCHANGE_NAME,true,false);}/**
* 队列
* @return 队列
* 名称,是否持久化,是否独占,是否自动删除
*/@BeanpublicQueuedirectQueue(){returnnewQueue(DIRECT_QUEUE_NAME,true,false,false);}/**
* 绑定交换机和队列
* @param exchange 交换机
* @param queue 队列
* @return
*/@BeanpublicBindingbindingExchangeWithQueue(DirectExchange exchange,@Qualifier("directQueue")Queue queue){returnBindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTE_KEY);}}
消费者
importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{privatefinalStringDIRECT_EXCHANGE_NAME="topicExchange";//交换机名称privatefinalStringDIRECT_ROUTE_KEY="topicRoute";//路由键privatefinalStringDIRECT_QUEUE_NAME="topicQueue";//队列名称/**
* 交换机
* @return direct交换机
* 名称,是否持久化,无队列自动删除
*/@BeanpublicDirectExchangegetDirectExchange(){returnnewDirectExchange(DIRECT_EXCHANGE_NAME,true,false);}/**
* 队列
* @return 队列
* 名称,是否持久化,是否独占,是否自动删除
*/@BeanpublicQueuedirectQueue(){returnnewQueue(DIRECT_QUEUE_NAME,true,false,false);}/**
* 绑定交换机和队列
* @param exchange 交换机
* @param queue 队列
* @return
*/@BeanpublicBindingbindingExchangeWithQueue(DirectExchange exchange,@Qualifier("directQueue")Queue queue){returnBindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTE_KEY);}}
3.消息发送
Logger logger =LoggerFactory.getLogger(ProducerServiceImpl.class);@AutowiredprivateRabbitTemplate rabbitTemplate;@OverridepublicStringsendMessage(){String message =UUID.randomUUID().toString();
rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME,DIRECT_ROUTE_KEY,message);//routeKey相等
logger.info(DIRECT_ROUTE_KEY+":{}",message);
rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME,DIRECT_ROUTE_KEY+"test",message);//routeKey不相等
logger.info(DIRECT_ROUTE_KEY+"test"+":{}",message);return"消息发送成功!"+ message;}
消息发送后控制台可以看到消息(发送两条但队列中只有一条),因为第二条消息routeKey不匹配,无法路由到队列。
4.消息消费
4.1 自动ACK
importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.util.concurrent.TimeUnit;@ComponentpublicclassTopicConsumer{@RabbitListener(queues ="directQueue")publicvoidspendMessage(String msg,Channel channel,Message message){String bodyMessage =newString(message.getBody());System.out.println("bodyMessage = "+ bodyMessage);System.out.println("消费消息:"+ msg);}}
运行结果
控制台效果
4.2 手动ACK
添加yml配置
spring:rabbitmq:listener:simple:acknowledge-mode: manual #关闭自动ack
关闭自动ack后若不手动ack,消息消费后为Unacked状态
调用basicAck(long deliveryTag,boolean multiple)方法
参数名称作用deliveryTag消息的唯一标识。每条消息都有自己的ID号,用于标识该消息在channel中的顺序。当消费者接收到消息后,需要调用channel.basicAck方法并传递deliveryTag来确认消息的处理。multiple是否批量确认消息,当传false时,只确认当前 deliveryTag对应的消息;当传true时,会确认当前及之前所有未确认的消息。
@RabbitListener(queues ="directQueue")publicvoidspendMessage(String msg,Channel channel,Message message)throwsIOException{String bodyMessage =newString(message.getBody());System.out.println("bodyMessage = "+ bodyMessage);System.out.println("消费消息:"+ msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
重启消费者服务后发现该条消息被再次接收,手动应答后队列中无消息。
5.消息可靠性
5.1 publisher---->exchange(发布者–>交换机) Confirm机制
Confirm机制用于向生产者确认消息是否成功送达交换机。当生产者向RabbitMQ发送消息时,RabbitMQ会返回一个唯一的交付标签(Delivery Tag),生产者可以通过这个标签来查询消息的投递状态。
要使用Confirm机制,需要在连接RabbitMQ时启用Confirm模式,并在生产者端使用相应的回调函数来接收确认消息。当消息成功送达队列时,RabbitMQ会向生产者发送一个确认消息;如果消息未能送达队列,则不会发送确认消息。
通过使用Confirm机制,生产者可以确保消息已被成功处理,并在必要时采取相应的措施,例如重试或记录错误。
启用confirm模式
spring:rabbitmq:#开启confirmpublisher-confirm:truepublisher-confirm-type: correlated
publisher-confirm-type 存在三种值
值说明none禁用发布确认模式,是默认值correlated发布消息成功到交换机触发回调方法simple1.发布消息成功到交换机触发回调方法。2.发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker
回调配置
importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;@ComponentpublicclassConfirmBackConfigimplementsRabbitTemplate.ConfirmCallback{@AutowiredprivateRabbitTemplate rabbitTemplate;//在类加载的时候会执行这个方法@PostConstructpublicvoidinitMethod(){
rabbitTemplate.setConfirmCallback(this);}/**
* confirm机制回调 携带消息发送结果和失败原因
* @param correlationData 相关数据
* @param isSuccess 是否发送成功
* @param cause 失败原因
*/@Overridepublicvoidconfirm(CorrelationData correlationData,boolean isSuccess,String cause){System.out.println("confirm回调成功!");if(isSuccess){System.out.println("correlationData = "+ correlationData);}else{System.out.println("消息发送失败:"+ cause);}}}
修改消息发送代码测试
publicStringsendMessage(){String message =UUID.randomUUID().toString();CorrelationData correlationData1 =newCorrelationData();
correlationData1.setId(message);
rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME,DIRECT_ROUTE_KEY,message,correlationData1);//routeKey相等
logger.info(DIRECT_ROUTE_KEY+"发送:{}",message);
message =UUID.randomUUID().toString();CorrelationData correlationData2 =newCorrelationData();
correlationData2.setId(message);
rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME,DIRECT_ROUTE_KEY+"xxx",message,correlationData2);//routeKey不相等
logger.info(DIRECT_ROUTE_KEY+"test"+"发送:{}",message);return"消息发送成功!";}
可见两条消息都成功发送到交换机
5.2 exchange—>queue(交换机—>队列) Return机制
Return机制用于处理无法路由的消息。当生产者向RabbitMQ发送一条消息时,如果消息无法被正确路由(例如由于队列不存在或交换机配置错误),RabbitMQ会将该消息返回给生产者,并附带一个错误信息。
使用Return机制,需要在生产者端配置一个回调函数来处理返回的消息。根据消息进行重试、记录错误等操作。
启用Return机制
spring:rabbitmq:#开启returnpublisher-returns:true
回调配置
importorg.springframework.amqp.core.ReturnedMessage;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;@ComponentpublicclassReturnBackConfigimplementsRabbitTemplate.ReturnsCallback{@AutowiredprivateRabbitTemplate rabbitTemplate;//在类加载的时候会执行这个方法@PostConstructpublicvoidinitMethod(){
rabbitTemplate.setReturnsCallback(this);}/**
* return机制回调 消息分发到队列失败时才会回调
* @param returnedMessage 回调信息
*/@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){System.out.println("returnedMessage = "+ returnedMessage);}}
publicStringsendMessage(){String message =UUID.randomUUID().toString();CorrelationData correlationData1 =newCorrelationData();
correlationData1.setId(message);
rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME,DIRECT_ROUTE_KEY,message,msg ->{
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return msg;},correlationData1);//routeKey相等
logger.info(DIRECT_ROUTE_KEY+"发送:{}",message);
message =UUID.randomUUID().toString();CorrelationData correlationData2 =newCorrelationData();
correlationData2.setId(message);
rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME,DIRECT_ROUTE_KEY+"xxx",message,msg ->{
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return msg;},correlationData2);//routeKey不相等
logger.info(DIRECT_ROUTE_KEY+"test"+"发送:{}",message);return"消息发送成功!";}
测试结果:错误的routeKey发送消息时回调
returnedMessage =ReturnedMessage[message=(Body:'be4937b0-f570-4047-a0a3-8b8949eb7221' MessageProperties[headers={spring_returned_message_correlation=be4937b0-f570-4047-a0a3-8b8949eb7221}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=directExchange, routingKey=directRoutexxx]
5.3 queue—>consumer(队列—>消费者) 手动ACK
参数意义deliveryTag消息标识multiple是否一次处理多条消息requeue消息是否重新入队
确认消息(确认后消息丢弃)
public void basicAck(long deliveryTag, boolean multiple);
不确认消息
public void basicNack(long deliveryTag, boolean multiple, boolean requeue);
拒绝消息(只能处理单条消息)
public void basicReject(long deliveryTag, boolean requeue);
版权归原作者 weixin_53231767 所有, 如有侵权,请联系我们删除。