0


SpringBoot整合RabbitMQ实现消息的发送与接收,确认消息,延时消息

文章目录

安装消息中间件

Windows安装ErLang

https://github.com/erlang/otp/releases/tag/OTP-25.0
在这里插入图片描述

在这里插入图片描述

Windows安装RabbitMq

https://www.rabbitmq.com/install-windows.html
在这里插入图片描述
在这里插入图片描述

安装RabbitMq UI界面

在这里插入图片描述
打开RabbitMQ Command Prompt 进入命令行

# 查看mq服务状态
rabbitmqctl.bat status

在这里插入图片描述

# 安装ui界面
rabbitmq-plugins enable rabbitmq_management

在这里插入图片描述

访问http://localhost:15672/
默认账号密码guest/guest
在这里插入图片描述

安装延时消息插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
在这里插入图片描述
将 ez文件拷贝到安装目录rabbitmq_server-3.10.2\plugins下

# 安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述

SpringBoot整合

这里我直接用我先前建好的微服务
order-service作为消息发送者,storage-service作为消息接收者
在这里插入图片描述
在这里插入图片描述

消息发送端order-service

添加依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

application.yml

spring:rabbitmq:username: guest
    password: guest
    host: 127.0.0.1
    port:5672# 消息确认(ACK)publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)publisher-returns:true#确认消息已发送到队列(Queue)

RabbitMqConfig

packagetop.fate.config;importorg.apache.logging.log4j.LogManager;importorg.apache.logging.log4j.Logger;importorg.springframework.amqp.core.*;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;/**
 * @auther:Wangxl
 * @Emile:[email protected]
 * @Time:2022/5/26 11:41
 */@ConfigurationpublicclassRabbitMqConfig{privatestaticfinalLogger LOG =LogManager.getLogger();publicstaticfinalString DIRECT_QUEUE ="direct_queue";//Direct队列名称publicstaticfinalString DIRECT_EXCHANGE ="direct_exchange";//交换器名称publicstaticfinalString DIRECT_ROUTING_KEY ="direct_routing_key";//路由键publicstaticfinalString DELAY_QUEUE ="delay_queue";//延时队列名称publicstaticfinalString DELAY_EXCHANGE ="delay_exchange";//交换器名称publicstaticfinalString DELAY_ROUTING_KEY ="delay_routing_key";//路由键@AutowiredprivateCachingConnectionFactory connectionFactory;@BeanpublicRabbitTemplatecreateRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);//设置Json转换器
        rabbitTemplate.setMessageConverter(jsonMessageConverter());//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);//确认消息送到交换机(Exchange)回调
        rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){
                LOG.info("\n确认消息送到交换机(Exchange)结果:");
                LOG.info("相关数据:"+ correlationData);
                LOG.info("是否成功:"+ ack);
                LOG.info("错误原因:"+ cause);}});//确认消息送到队列(Queue)回调
        rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){
                LOG.info("\n确认消息送到队列(Queue)结果:");
                LOG.info("发生消息:"+ returnedMessage.getMessage());
                LOG.info("回应码:"+ returnedMessage.getReplyCode());
                LOG.info("回应信息:"+ returnedMessage.getReplyText());
                LOG.info("交换机:"+ returnedMessage.getExchange());
                LOG.info("路由键:"+ returnedMessage.getRoutingKey());}});return rabbitTemplate;}/**
     * Json转换器
     */@BeanpublicJackson2JsonMessageConverterjsonMessageConverter(){returnnewJackson2JsonMessageConverter();}/**
     * Direct交换器
     */@BeanpublicDirectExchangedirectExchange(){/**
         * 创建交换器,参数说明:
         * String name:交换器名称
         * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
         * 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
         * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
         */returnnewDirectExchange(DIRECT_EXCHANGE,true,false);}/**
     * 队列
     */@BeanpublicQueuedirectQueue(){/**
         * 创建队列,参数说明:
         * String name:队列名称。
         * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
         * 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
         * boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
         * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
         * 当没有生产者或者消费者使用此队列,该队列会自动删除。
         * Map<String, Object> arguments:设置队列的其他一些参数。
         */returnnewQueue(DIRECT_QUEUE,true,false,false,null);}/**
     * 绑定
     */@BeanBindingdirectBinding(DirectExchange directExchange,Queue directQueue){//将队列和交换机绑定, 并设置用于匹配键:routingKey路由键returnBindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);}/******************************延时队列******************************/@BeanpublicCustomExchangedelayExchange(){Map<String,Object> args =newHashMap<>();
        args.put("x-delayed-type","direct");returnnewCustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false, args);}@BeanpublicQueuedelayQueue(){Queue queue =newQueue(DELAY_QUEUE,true);return queue;}@BeanpublicBindingdelaybinding(Queue delayQueue,CustomExchange delayExchange){returnBindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();}}

实体对象

packagetop.fate.entity;importlombok.Data;/**
 * @auther:Wangxl
 * @Emile:[email protected]
 * @Time:2022/5/26 14:26
 */@DatapublicclassTestEntity{privateString username;privateString password;publicTestEntity(String username,String password){this.username = username;this.password = password;}publicTestEntity(){}}

生产者服务接口

packagetop.fate.service;importjava.util.Map;/**
 * @auther:Wangxl
 * @Emile:[email protected]
 * @Time:2022/5/26 14:29
 */publicinterfaceProducerService{/**
     * 发送json格式数据
     *
     * @param o
     */voidsendTestJson(Object o);/**
     * 延时发送map格式数据
     *
     * @param map
     */voidsendDelayTestMap(Map map);}

生产者服务实现类

packagetop.fate.service.impl;importorg.apache.logging.log4j.LogManager;importorg.apache.logging.log4j.Logger;importorg.springframework.amqp.AmqpException;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importtop.fate.config.RabbitMqConfig;importtop.fate.service.ProducerService;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.util.Map;/**
 * @auther:Wangxl
 * @Emile:[email protected]
 * @Time:2022/5/26 14:30
 */@ServicepublicclassProducerServiceImplimplementsProducerService{privatestaticfinalLogger LOG =LogManager.getLogger();SimpleDateFormat formatter =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");@AutowiredprivateRabbitTemplate rabbitTemplate;/**
     * 发送json格式数据
     *
     * @param o
     */@OverridepublicvoidsendTestJson(Object o){

        rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE,RabbitMqConfig.DIRECT_ROUTING_KEY, o);
        LOG.info("json格式的数据发送成功 发送时间为"+ formatter.format(newDate()));}/**
     * 延时发送map格式数据
     *
     * @param map
     */@OverridepublicvoidsendDelayTestMap(Map map){
        rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE,RabbitMqConfig.DELAY_ROUTING_KEY, map,newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{

                message.getMessageProperties().setHeader("x-delay",5000);return message;}});
        LOG.info("map格式的数据发送成功 发送时间为"+ formatter.format(newDate()));}}

测试Controller

packagetop.fate.controller;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importtop.fate.entity.TestEntity;importtop.fate.service.ProducerService;importjava.util.HashMap;importjava.util.Map;/**
 * @auther:Wangxl
 * @Emile:[email protected]
 * @Time:2022/5/26 14:43
 */@RestController@RequestMapping(value ="producer")publicclassProducerController{@AutowiredprivateProducerService producerService;@GetMapping("sendObject")publicvoidsendObject(){
        producerService.sendTestJson(newTestEntity("user","123456"));}@GetMapping("sendMap")publicvoidsendMap(){Map map =newHashMap();
        map.put("user1",newTestEntity("user1","123"));
        map.put("user2",newTestEntity("user2","123"));
        map.put("user3",newTestEntity("user3","123"));
        producerService.sendDelayTestMap(map);}}

消息接收端storage-service

添加依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

RabbitMqConfig

packagetop.fate.config;importorg.springframework.amqp.core.AcknowledgeMode;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importtop.fate.service.impl.AckReceiver;/**
 * @auther:Wangxl
 * @Emile:[email protected]
 * @Time:2022/5/26 14:46
 */@ConfigurationpublicclassRabbitMqConfig{publicstaticfinalString DIRECT_QUEUE ="direct_queue";//Direct队列名称publicstaticfinalString DELAY_QUEUE ="delay_queue";//延时队列名称/**
     * 消息接收确认处理类
     */@AutowiredprivateAckReceiver ackReceiver;@AutowiredprivateCachingConnectionFactory connectionFactory;/**
     * 客户端配置
     * 配置手动确认消息、消息接收确认
     */@BeanpublicSimpleMessageListenerContainersimpleMessageListenerContainer(){//消费者数量,默认10int DEFAULT_CONCURRENT =10;//每个消费者获取最大投递数量 默认50int DEFAULT_PREFETCH_COUNT =50;SimpleMessageListenerContainer container =newSimpleMessageListenerContainer(connectionFactory);
        container.setConcurrentConsumers(DEFAULT_CONCURRENT);
        container.setMaxConcurrentConsumers(DEFAULT_PREFETCH_COUNT);// RabbitMQ默认是自动确认,这里改为手动确认消息
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//添加队列,可添加多个队列
        container.addQueues(newQueue(DIRECT_QUEUE,true));
        container.addQueues(newQueue(DELAY_QUEUE,true));//设置消息处理类
        container.setMessageListener(ackReceiver);return container;}}

消息接收接口

packagetop.fate.service;importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importjava.io.IOException;/**
 * @auther:Wangxl
 * @Emile:[email protected]
 * @Time:2022/5/26 14:52
 */publicinterfaceConsumerReceiver{voidreceiverJson(Message message,Channel channel)throwsIOException;voidreceiverMap(Message message,Channel channel)throwsIOException;}

消息接收实现类

packagetop.fate.service.impl;importcom.alibaba.fastjson.JSON;importcom.rabbitmq.client.Channel;importorg.apache.logging.log4j.LogManager;importorg.apache.logging.log4j.Logger;importorg.springframework.amqp.core.Message;importorg.springframework.stereotype.Service;importtop.fate.entity.TestEntity;importtop.fate.service.ConsumerReceiver;importjava.io.IOException;importjava.util.Map;importjava.util.Set;/**
 * @auther:Wangxl
 * @Emile:[email protected]
 * @Time:2022/5/26 14:54
 */@ServicepublicclassConsumerReceiverImplimplementsConsumerReceiver{privatestaticfinalLogger LOG =LogManager.getLogger();@OverridepublicvoidreceiverJson(Message message,Channel channel)throwsIOException{long deliveryTag = message.getMessageProperties().getDeliveryTag();try{//将JSON格式数据转换为实体对象TestEntity testEntity = JSON.parseObject(message.getBody(),TestEntity.class);

            LOG.info("接收者收到JSON格式消息:");System.out.println("账号:"+ testEntity.getUsername());System.out.println("密码:"+ testEntity.getPassword());/**
             * 确认消息,参数说明:
             * long deliveryTag:唯一标识 ID。
             * boolean multiple:是否批处理,当该参数为 true 时,
             * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
             */
            channel.basicAck(deliveryTag,true);/**
             * 否定消息,参数说明:
             * long deliveryTag:唯一标识 ID。
             * boolean multiple:是否批处理,当该参数为 true 时,
             * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
             * boolean requeue:如果 requeue 参数设置为 true,
             * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
             * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
             * 而不会把它发送给新的消费者。
             *///channel.basicNack(deliveryTag, true, false);}catch(Exception e){/**
             * 拒绝消息,参数说明:
             * long deliveryTag:唯一标识 ID。
             * boolean requeue:如果 requeue 参数设置为 true,
             * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
             * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
             * 而不会把它发送给新的消费者。
             */
            channel.basicReject(deliveryTag,false);

            e.printStackTrace();}}@OverridepublicvoidreceiverMap(Message message,Channel channel)throwsIOException{long deliveryTag = message.getMessageProperties().getDeliveryTag();try{//将JSON格式数据转换为Map对象Map map = JSON.parseObject(message.getBody(),Map.class);

            LOG.info("接收者收到Map格式消息:");

            LOG.info(map.get("user1"));
            LOG.info(map.get("user2"));
            LOG.info(map.get("user3"));//确认消息
            channel.basicAck(deliveryTag,true);//否定消息//channel.basicNack(deliveryTag, true, false);}catch(Exception e){//拒绝消息
            channel.basicReject(deliveryTag,false);

            e.printStackTrace();}}}

消息分发处理类

packagetop.fate.service.impl;importcom.rabbitmq.client.Channel;importorg.apache.logging.log4j.LogManager;importorg.apache.logging.log4j.Logger;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importtop.fate.config.RabbitMqConfig;importtop.fate.service.ConsumerReceiver;importjava.text.SimpleDateFormat;importjava.util.Date;/**
 * @auther:Wangxl
 * @Emile:[email protected]
 * @Time:2022/5/26 14:50
 */@ServicepublicclassAckReceiverimplementsChannelAwareMessageListener{privatestaticfinalLogger LOG =LogManager.getLogger();/**
     * 用户消息接收类
     */@AutowiredprivateConsumerReceiver consumerReceiver;@OverridepublicvoidonMessage(Message message,Channel channel)throwsException{//时间格式SimpleDateFormat dateFormat =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        LOG.info("消息接收成功,接收时间:"+ dateFormat.format(newDate())+"\n");//获取队列名称String queueName = message.getMessageProperties().getConsumerQueue();//接收用户信息Json格式数据if(queueName.equals(RabbitMqConfig.DIRECT_QUEUE)){
            consumerReceiver.receiverJson(message, channel);}//延时接收用户信息Map格式数据if(queueName.equals(RabbitMqConfig.DELAY_QUEUE)){
            consumerReceiver.receiverMap(message, channel);}//多个队列的处理,则如上述代码,继续添加方法....}}

启动测试

在这里插入图片描述
项目启动的时候创建交换机绑定路由key,及创建队列
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述


本文转载自: https://blog.csdn.net/weixin_43627706/article/details/124981001
版权归原作者 fate急速出击 所有, 如有侵权,请联系我们删除。

“SpringBoot整合RabbitMQ实现消息的发送与接收,确认消息,延时消息”的评论:

还没有评论