文章目录
安装消息中间件
Windows安装ErLang
https://github.com/erlang/otp/releases/tag/OTP-25.0
Windows安装RabbitMq
https://www.rabbitmq.com/install-windows.html
安装RabbitMq UI界面
打开RabbitMQ Command Prompt 进入命令行
# 查看mq服务状态
rabbitmqctl.bat status
# 安装ui界面
rabbitmq-plugins enable rabbitmq_management
访问http://localhost:15672/
默认账号密码guest/guest
安装延时消息插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
将 ez文件拷贝到安装目录rabbitmq_server-3.10.2\plugins下
# 安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
SpringBoot整合
这里我直接用我先前建好的微服务
order-service作为消息发送者,storage-service作为消息接收者
消息发送端order-service
添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
application.yml
spring:rabbitmq:username: guest
password: guest
host: 127.0.0.1
port:5672# 消息确认(ACK)publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)publisher-returns:true#确认消息已发送到队列(Queue)
RabbitMqConfig
packagetop.fate.config;importorg.apache.logging.log4j.LogManager;importorg.apache.logging.log4j.Logger;importorg.springframework.amqp.core.*;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;/**
* @auther:Wangxl
* @Emile:[email protected]
* @Time:2022/5/26 11:41
*/@ConfigurationpublicclassRabbitMqConfig{privatestaticfinalLogger LOG =LogManager.getLogger();publicstaticfinalString DIRECT_QUEUE ="direct_queue";//Direct队列名称publicstaticfinalString DIRECT_EXCHANGE ="direct_exchange";//交换器名称publicstaticfinalString DIRECT_ROUTING_KEY ="direct_routing_key";//路由键publicstaticfinalString DELAY_QUEUE ="delay_queue";//延时队列名称publicstaticfinalString DELAY_EXCHANGE ="delay_exchange";//交换器名称publicstaticfinalString DELAY_ROUTING_KEY ="delay_routing_key";//路由键@AutowiredprivateCachingConnectionFactory connectionFactory;@BeanpublicRabbitTemplatecreateRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);//设置Json转换器
rabbitTemplate.setMessageConverter(jsonMessageConverter());//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);//确认消息送到交换机(Exchange)回调
rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){
LOG.info("\n确认消息送到交换机(Exchange)结果:");
LOG.info("相关数据:"+ correlationData);
LOG.info("是否成功:"+ ack);
LOG.info("错误原因:"+ cause);}});//确认消息送到队列(Queue)回调
rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){
LOG.info("\n确认消息送到队列(Queue)结果:");
LOG.info("发生消息:"+ returnedMessage.getMessage());
LOG.info("回应码:"+ returnedMessage.getReplyCode());
LOG.info("回应信息:"+ returnedMessage.getReplyText());
LOG.info("交换机:"+ returnedMessage.getExchange());
LOG.info("路由键:"+ returnedMessage.getRoutingKey());}});return rabbitTemplate;}/**
* Json转换器
*/@BeanpublicJackson2JsonMessageConverterjsonMessageConverter(){returnnewJackson2JsonMessageConverter();}/**
* Direct交换器
*/@BeanpublicDirectExchangedirectExchange(){/**
* 创建交换器,参数说明:
* String name:交换器名称
* boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
* 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
* boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
*/returnnewDirectExchange(DIRECT_EXCHANGE,true,false);}/**
* 队列
*/@BeanpublicQueuedirectQueue(){/**
* 创建队列,参数说明:
* String name:队列名称。
* boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
* 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
* boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
* boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
* 当没有生产者或者消费者使用此队列,该队列会自动删除。
* Map<String, Object> arguments:设置队列的其他一些参数。
*/returnnewQueue(DIRECT_QUEUE,true,false,false,null);}/**
* 绑定
*/@BeanBindingdirectBinding(DirectExchange directExchange,Queue directQueue){//将队列和交换机绑定, 并设置用于匹配键:routingKey路由键returnBindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);}/******************************延时队列******************************/@BeanpublicCustomExchangedelayExchange(){Map<String,Object> args =newHashMap<>();
args.put("x-delayed-type","direct");returnnewCustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false, args);}@BeanpublicQueuedelayQueue(){Queue queue =newQueue(DELAY_QUEUE,true);return queue;}@BeanpublicBindingdelaybinding(Queue delayQueue,CustomExchange delayExchange){returnBindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();}}
实体对象
packagetop.fate.entity;importlombok.Data;/**
* @auther:Wangxl
* @Emile:[email protected]
* @Time:2022/5/26 14:26
*/@DatapublicclassTestEntity{privateString username;privateString password;publicTestEntity(String username,String password){this.username = username;this.password = password;}publicTestEntity(){}}
生产者服务接口
packagetop.fate.service;importjava.util.Map;/**
* @auther:Wangxl
* @Emile:[email protected]
* @Time:2022/5/26 14:29
*/publicinterfaceProducerService{/**
* 发送json格式数据
*
* @param o
*/voidsendTestJson(Object o);/**
* 延时发送map格式数据
*
* @param map
*/voidsendDelayTestMap(Map map);}
生产者服务实现类
packagetop.fate.service.impl;importorg.apache.logging.log4j.LogManager;importorg.apache.logging.log4j.Logger;importorg.springframework.amqp.AmqpException;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importtop.fate.config.RabbitMqConfig;importtop.fate.service.ProducerService;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.util.Map;/**
* @auther:Wangxl
* @Emile:[email protected]
* @Time:2022/5/26 14:30
*/@ServicepublicclassProducerServiceImplimplementsProducerService{privatestaticfinalLogger LOG =LogManager.getLogger();SimpleDateFormat formatter =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");@AutowiredprivateRabbitTemplate rabbitTemplate;/**
* 发送json格式数据
*
* @param o
*/@OverridepublicvoidsendTestJson(Object o){
rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE,RabbitMqConfig.DIRECT_ROUTING_KEY, o);
LOG.info("json格式的数据发送成功 发送时间为"+ formatter.format(newDate()));}/**
* 延时发送map格式数据
*
* @param map
*/@OverridepublicvoidsendDelayTestMap(Map map){
rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE,RabbitMqConfig.DELAY_ROUTING_KEY, map,newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{
message.getMessageProperties().setHeader("x-delay",5000);return message;}});
LOG.info("map格式的数据发送成功 发送时间为"+ formatter.format(newDate()));}}
测试Controller
packagetop.fate.controller;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importtop.fate.entity.TestEntity;importtop.fate.service.ProducerService;importjava.util.HashMap;importjava.util.Map;/**
* @auther:Wangxl
* @Emile:[email protected]
* @Time:2022/5/26 14:43
*/@RestController@RequestMapping(value ="producer")publicclassProducerController{@AutowiredprivateProducerService producerService;@GetMapping("sendObject")publicvoidsendObject(){
producerService.sendTestJson(newTestEntity("user","123456"));}@GetMapping("sendMap")publicvoidsendMap(){Map map =newHashMap();
map.put("user1",newTestEntity("user1","123"));
map.put("user2",newTestEntity("user2","123"));
map.put("user3",newTestEntity("user3","123"));
producerService.sendDelayTestMap(map);}}
消息接收端storage-service
添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
RabbitMqConfig
packagetop.fate.config;importorg.springframework.amqp.core.AcknowledgeMode;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importtop.fate.service.impl.AckReceiver;/**
* @auther:Wangxl
* @Emile:[email protected]
* @Time:2022/5/26 14:46
*/@ConfigurationpublicclassRabbitMqConfig{publicstaticfinalString DIRECT_QUEUE ="direct_queue";//Direct队列名称publicstaticfinalString DELAY_QUEUE ="delay_queue";//延时队列名称/**
* 消息接收确认处理类
*/@AutowiredprivateAckReceiver ackReceiver;@AutowiredprivateCachingConnectionFactory connectionFactory;/**
* 客户端配置
* 配置手动确认消息、消息接收确认
*/@BeanpublicSimpleMessageListenerContainersimpleMessageListenerContainer(){//消费者数量,默认10int DEFAULT_CONCURRENT =10;//每个消费者获取最大投递数量 默认50int DEFAULT_PREFETCH_COUNT =50;SimpleMessageListenerContainer container =newSimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(DEFAULT_CONCURRENT);
container.setMaxConcurrentConsumers(DEFAULT_PREFETCH_COUNT);// RabbitMQ默认是自动确认,这里改为手动确认消息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//添加队列,可添加多个队列
container.addQueues(newQueue(DIRECT_QUEUE,true));
container.addQueues(newQueue(DELAY_QUEUE,true));//设置消息处理类
container.setMessageListener(ackReceiver);return container;}}
消息接收接口
packagetop.fate.service;importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importjava.io.IOException;/**
* @auther:Wangxl
* @Emile:[email protected]
* @Time:2022/5/26 14:52
*/publicinterfaceConsumerReceiver{voidreceiverJson(Message message,Channel channel)throwsIOException;voidreceiverMap(Message message,Channel channel)throwsIOException;}
消息接收实现类
packagetop.fate.service.impl;importcom.alibaba.fastjson.JSON;importcom.rabbitmq.client.Channel;importorg.apache.logging.log4j.LogManager;importorg.apache.logging.log4j.Logger;importorg.springframework.amqp.core.Message;importorg.springframework.stereotype.Service;importtop.fate.entity.TestEntity;importtop.fate.service.ConsumerReceiver;importjava.io.IOException;importjava.util.Map;importjava.util.Set;/**
* @auther:Wangxl
* @Emile:[email protected]
* @Time:2022/5/26 14:54
*/@ServicepublicclassConsumerReceiverImplimplementsConsumerReceiver{privatestaticfinalLogger LOG =LogManager.getLogger();@OverridepublicvoidreceiverJson(Message message,Channel channel)throwsIOException{long deliveryTag = message.getMessageProperties().getDeliveryTag();try{//将JSON格式数据转换为实体对象TestEntity testEntity = JSON.parseObject(message.getBody(),TestEntity.class);
LOG.info("接收者收到JSON格式消息:");System.out.println("账号:"+ testEntity.getUsername());System.out.println("密码:"+ testEntity.getPassword());/**
* 确认消息,参数说明:
* long deliveryTag:唯一标识 ID。
* boolean multiple:是否批处理,当该参数为 true 时,
* 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
*/
channel.basicAck(deliveryTag,true);/**
* 否定消息,参数说明:
* long deliveryTag:唯一标识 ID。
* boolean multiple:是否批处理,当该参数为 true 时,
* 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
* boolean requeue:如果 requeue 参数设置为 true,
* 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
* 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
* 而不会把它发送给新的消费者。
*///channel.basicNack(deliveryTag, true, false);}catch(Exception e){/**
* 拒绝消息,参数说明:
* long deliveryTag:唯一标识 ID。
* boolean requeue:如果 requeue 参数设置为 true,
* 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
* 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
* 而不会把它发送给新的消费者。
*/
channel.basicReject(deliveryTag,false);
e.printStackTrace();}}@OverridepublicvoidreceiverMap(Message message,Channel channel)throwsIOException{long deliveryTag = message.getMessageProperties().getDeliveryTag();try{//将JSON格式数据转换为Map对象Map map = JSON.parseObject(message.getBody(),Map.class);
LOG.info("接收者收到Map格式消息:");
LOG.info(map.get("user1"));
LOG.info(map.get("user2"));
LOG.info(map.get("user3"));//确认消息
channel.basicAck(deliveryTag,true);//否定消息//channel.basicNack(deliveryTag, true, false);}catch(Exception e){//拒绝消息
channel.basicReject(deliveryTag,false);
e.printStackTrace();}}}
消息分发处理类
packagetop.fate.service.impl;importcom.rabbitmq.client.Channel;importorg.apache.logging.log4j.LogManager;importorg.apache.logging.log4j.Logger;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importtop.fate.config.RabbitMqConfig;importtop.fate.service.ConsumerReceiver;importjava.text.SimpleDateFormat;importjava.util.Date;/**
* @auther:Wangxl
* @Emile:[email protected]
* @Time:2022/5/26 14:50
*/@ServicepublicclassAckReceiverimplementsChannelAwareMessageListener{privatestaticfinalLogger LOG =LogManager.getLogger();/**
* 用户消息接收类
*/@AutowiredprivateConsumerReceiver consumerReceiver;@OverridepublicvoidonMessage(Message message,Channel channel)throwsException{//时间格式SimpleDateFormat dateFormat =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");
LOG.info("消息接收成功,接收时间:"+ dateFormat.format(newDate())+"\n");//获取队列名称String queueName = message.getMessageProperties().getConsumerQueue();//接收用户信息Json格式数据if(queueName.equals(RabbitMqConfig.DIRECT_QUEUE)){
consumerReceiver.receiverJson(message, channel);}//延时接收用户信息Map格式数据if(queueName.equals(RabbitMqConfig.DELAY_QUEUE)){
consumerReceiver.receiverMap(message, channel);}//多个队列的处理,则如上述代码,继续添加方法....}}
启动测试
项目启动的时候创建交换机绑定路由key,及创建队列
版权归原作者 fate急速出击 所有, 如有侵权,请联系我们删除。