0


springboot项目使用RabbitMQ

springboot项目使用RabbitMQ

1.引入依赖,配置yml文件

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
spring:rabbitmq:host: 192.168.19.3
    port:5672username: admin
    password: admin
    virtualHost: my_vhost #vhost名称

2.添加配置类(direct交换机为例)

生产者(一个交换机绑定两个队列)
importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importstaticcom.rabbit.constant.RabbitConstant.*;@ConfigurationpublicclassRabbitMQConfig{privatefinalStringDIRECT_EXCHANGE_NAME="topicExchange";//交换机名称privatefinalStringDIRECT_ROUTE_KEY="topicRoute";//路由键privatefinalStringDIRECT_QUEUE_NAME="topicQueue";//队列名称/**
     * 交换机
     * @return direct交换机
     * 名称,是否持久化,无队列自动删除
     */@BeanpublicDirectExchangegetDirectExchange(){returnnewDirectExchange(DIRECT_EXCHANGE_NAME,true,false);}/**
     * 队列
     * @return 队列
     * 名称,是否持久化,是否独占,是否自动删除
     */@BeanpublicQueuedirectQueue(){returnnewQueue(DIRECT_QUEUE_NAME,true,false,false);}/**
     * 绑定交换机和队列
     * @param exchange 交换机
     * @param queue 队列
     * @return
     */@BeanpublicBindingbindingExchangeWithQueue(DirectExchange exchange,@Qualifier("directQueue")Queue queue){returnBindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTE_KEY);}}
消费者
importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{privatefinalStringDIRECT_EXCHANGE_NAME="topicExchange";//交换机名称privatefinalStringDIRECT_ROUTE_KEY="topicRoute";//路由键privatefinalStringDIRECT_QUEUE_NAME="topicQueue";//队列名称/**
     * 交换机
     * @return direct交换机
     * 名称,是否持久化,无队列自动删除
     */@BeanpublicDirectExchangegetDirectExchange(){returnnewDirectExchange(DIRECT_EXCHANGE_NAME,true,false);}/**
     * 队列
     * @return 队列
     * 名称,是否持久化,是否独占,是否自动删除
     */@BeanpublicQueuedirectQueue(){returnnewQueue(DIRECT_QUEUE_NAME,true,false,false);}/**
     * 绑定交换机和队列
     * @param exchange 交换机
     * @param queue 队列
     * @return
     */@BeanpublicBindingbindingExchangeWithQueue(DirectExchange exchange,@Qualifier("directQueue")Queue queue){returnBindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTE_KEY);}}

3.消息发送

Logger logger =LoggerFactory.getLogger(ProducerServiceImpl.class);@AutowiredprivateRabbitTemplate rabbitTemplate;@OverridepublicStringsendMessage(){String message =UUID.randomUUID().toString();
        rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME,DIRECT_ROUTE_KEY,message);//routeKey相等
        logger.info(DIRECT_ROUTE_KEY+":{}",message);
        rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME,DIRECT_ROUTE_KEY+"test",message);//routeKey不相等
        logger.info(DIRECT_ROUTE_KEY+"test"+":{}",message);return"消息发送成功!"+ message;}

在这里插入图片描述

消息发送后控制台可以看到消息(发送两条但队列中只有一条),因为第二条消息routeKey不匹配,无法路由到队列。

在这里插入图片描述

4.消息消费

4.1 自动ACK
importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.util.concurrent.TimeUnit;@ComponentpublicclassTopicConsumer{@RabbitListener(queues ="directQueue")publicvoidspendMessage(String msg,Channel channel,Message message){String bodyMessage =newString(message.getBody());System.out.println("bodyMessage = "+ bodyMessage);System.out.println("消费消息:"+ msg);}}

运行结果

在这里插入图片描述

控制台效果

在这里插入图片描述

4.2 手动ACK

添加yml配置

spring:rabbitmq:listener:simple:acknowledge-mode: manual #关闭自动ack

关闭自动ack后若不手动ack,消息消费后为Unacked状态

在这里插入图片描述

调用basicAck(long deliveryTag,boolean multiple)方法
参数名称作用deliveryTag消息的唯一标识。每条消息都有自己的ID号,用于标识该消息在channel中的顺序。当消费者接收到消息后,需要调用channel.basicAck方法并传递deliveryTag来确认消息的处理。multiple是否批量确认消息,当传false时,只确认当前 deliveryTag对应的消息;当传true时,会确认当前及之前所有未确认的消息。

@RabbitListener(queues ="directQueue")publicvoidspendMessage(String msg,Channel channel,Message message)throwsIOException{String bodyMessage =newString(message.getBody());System.out.println("bodyMessage = "+ bodyMessage);System.out.println("消费消息:"+ msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}

重启消费者服务后发现该条消息被再次接收,手动应答后队列中无消息。

在这里插入图片描述

5.消息可靠性

5.1 publisher---->exchange(发布者–>交换机) Confirm机制

Confirm机制用于向生产者确认消息是否成功送达交换机。当生产者向RabbitMQ发送消息时,RabbitMQ会返回一个唯一的交付标签(Delivery Tag),生产者可以通过这个标签来查询消息的投递状态。

要使用Confirm机制,需要在连接RabbitMQ时启用Confirm模式,并在生产者端使用相应的回调函数来接收确认消息。当消息成功送达队列时,RabbitMQ会向生产者发送一个确认消息;如果消息未能送达队列,则不会发送确认消息。
通过使用Confirm机制,生产者可以确保消息已被成功处理,并在必要时采取相应的措施,例如重试或记录错误。

启用confirm模式

spring:rabbitmq:#开启confirmpublisher-confirm:truepublisher-confirm-type: correlated

publisher-confirm-type 存在三种值
值说明none禁用发布确认模式,是默认值correlated发布消息成功到交换机触发回调方法simple1.发布消息成功到交换机触发回调方法。2.发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker
回调配置

importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;@ComponentpublicclassConfirmBackConfigimplementsRabbitTemplate.ConfirmCallback{@AutowiredprivateRabbitTemplate rabbitTemplate;//在类加载的时候会执行这个方法@PostConstructpublicvoidinitMethod(){
        rabbitTemplate.setConfirmCallback(this);}/**
     * confirm机制回调 携带消息发送结果和失败原因
     * @param correlationData 相关数据
     * @param isSuccess 是否发送成功
     * @param cause 失败原因
     */@Overridepublicvoidconfirm(CorrelationData correlationData,boolean isSuccess,String cause){System.out.println("confirm回调成功!");if(isSuccess){System.out.println("correlationData = "+ correlationData);}else{System.out.println("消息发送失败:"+ cause);}}}

修改消息发送代码测试

publicStringsendMessage(){String message =UUID.randomUUID().toString();CorrelationData correlationData1 =newCorrelationData();
        correlationData1.setId(message);
        rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME,DIRECT_ROUTE_KEY,message,correlationData1);//routeKey相等
        logger.info(DIRECT_ROUTE_KEY+"发送:{}",message);

        message =UUID.randomUUID().toString();CorrelationData correlationData2 =newCorrelationData();
        correlationData2.setId(message);
        rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME,DIRECT_ROUTE_KEY+"xxx",message,correlationData2);//routeKey不相等
        logger.info(DIRECT_ROUTE_KEY+"test"+"发送:{}",message);return"消息发送成功!";}

在这里插入图片描述

可见两条消息都成功发送到交换机

5.2 exchange—>queue(交换机—>队列) Return机制

Return机制用于处理无法路由的消息。当生产者向RabbitMQ发送一条消息时,如果消息无法被正确路由(例如由于队列不存在或交换机配置错误),RabbitMQ会将该消息返回给生产者,并附带一个错误信息。

使用Return机制,需要在生产者端配置一个回调函数来处理返回的消息。根据消息进行重试、记录错误等操作。

启用Return机制

spring:rabbitmq:#开启returnpublisher-returns:true

回调配置

importorg.springframework.amqp.core.ReturnedMessage;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;@ComponentpublicclassReturnBackConfigimplementsRabbitTemplate.ReturnsCallback{@AutowiredprivateRabbitTemplate rabbitTemplate;//在类加载的时候会执行这个方法@PostConstructpublicvoidinitMethod(){
        rabbitTemplate.setReturnsCallback(this);}/**
     * return机制回调 消息分发到队列失败时才会回调
     * @param returnedMessage 回调信息
     */@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){System.out.println("returnedMessage = "+ returnedMessage);}}
publicStringsendMessage(){String message =UUID.randomUUID().toString();CorrelationData correlationData1 =newCorrelationData();
        correlationData1.setId(message);
        rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME,DIRECT_ROUTE_KEY,message,msg ->{
            msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return msg;},correlationData1);//routeKey相等
        logger.info(DIRECT_ROUTE_KEY+"发送:{}",message);

        message =UUID.randomUUID().toString();CorrelationData correlationData2 =newCorrelationData();
        correlationData2.setId(message);
        rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME,DIRECT_ROUTE_KEY+"xxx",message,msg ->{
            msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return msg;},correlationData2);//routeKey不相等
        logger.info(DIRECT_ROUTE_KEY+"test"+"发送:{}",message);return"消息发送成功!";}

测试结果:错误的routeKey发送消息时回调

returnedMessage =ReturnedMessage[message=(Body:'be4937b0-f570-4047-a0a3-8b8949eb7221' MessageProperties[headers={spring_returned_message_correlation=be4937b0-f570-4047-a0a3-8b8949eb7221}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=directExchange, routingKey=directRoutexxx]
5.3 queue—>consumer(队列—>消费者) 手动ACK

参数意义deliveryTag消息标识multiple是否一次处理多条消息requeue消息是否重新入队
确认消息(确认后消息丢弃)
public void basicAck(long deliveryTag, boolean multiple);

不确认消息
public void basicNack(long deliveryTag, boolean multiple, boolean requeue);

拒绝消息(只能处理单条消息)
public void basicReject(long deliveryTag, boolean requeue);


本文转载自: https://blog.csdn.net/weixin_53231767/article/details/140122727
版权归原作者 weixin_53231767 所有, 如有侵权,请联系我们删除。

“springboot项目使用RabbitMQ”的评论:

还没有评论