SpringBoot整合实现RabbitMQ
本文大纲
一.RabbitMQ介绍
二.RabbitMQ的工作原理
2.1 RabbitMQ的基本结构
2.2 组成部分说明
2.3 生产者发送消息流程
2.4 消费者接收消息流程
三.SpringBoot 整合实现RabbitMQ
3.1创建mq-rabbitmq-producer(生产者)发送消息
3.1.1pom.xml中添加相关的依赖
3.1.2 配置application.yml
3.1.3 配置RabbitMQ常量类
3.1.4 创建RabbitMQConfig配置类
3.1.5 创建生产者用于发送消息
3.1.6 创建一个类,用于模拟测试
3.2创建mq-rabbitmq-consumer(消费者)消费消息
3.2.1pom.xml中添加相关的依赖
3.2.2 配置application.yml
3.2.3 配置RabbitMQ常量类
3.2.4 创建RabbitMQConfig配置类
3.2.5 创建消费者消息监听
3.2.6 启动项目,监听器监听到生产者发送的消息,自动消费消息
一.RabbitMQ介绍
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。支持Windows、Linux/Unix、MAC OS X操作系统和包括JAVA在内的多种编程语言。
二.RabbitMQ的工作原理
2.1 RabbitMQ的基本结构:
2.2 组成部分说明:
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
2.3 生产者发送消息流程:
1.生产者和Broker建立TCP连接。
2.生产者和Broker建立通道。
3.生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4.Exchange将消息转发到指定的Queue(队列)
2.4 消费者接收消息流程:
1.消费者和Broker建立TCP连接
2.消费者和Broker建立通道
3.消费者监听指定的Queue(队列)
4.当有消息到达Queue时Broker默认将消息推送给消费者。
三.SpringBoot 整合实现RabbitMQ
创建2个springboot项目,一个 mq-rabbitmq-producer(生产者),一个mq-rabbitmq-consumer(消费者)。
3.1创建mq-rabbitmq-producer(生产者)发送消息
3.1.1pom.xml中添加相关的依赖
<!--添加AMQP的启动器-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
3.1.2 配置application.yml
server:
port: 8080
spring:
application:
name: mq-rabbitmq-producer
#rabbitmq配置
rabbitmq:
host: localhost
port: 5672
#注意:guest用户只能链接本地服务器 比如localhost 不可以连接远程服务器
username: guest
password: guest
#虚拟主机 一台机器可能有很多虚拟主机 这里选择默认配置 / 即可
virtual-host: /
#支持发布返回
publisher-returns: true
listener:
# Routing 路由模型(交换机类型:direct)
direct:
#消息确认:手动签收
acknowledge-mode: manual
#当前监听容器数
concurrency: 1
#最大数
max-concurrency: 10
#是否支持重试
retry:
enabled: true
#重试次数5,超过5次抛出异常
max-attempts: 5
#重试间隔 3s
max-interval: 3000
采用 Routing 路由模型(交换机类型:direct)方式,实现RabbitMQ消息队列。
3.1.3 配置RabbitMQ常量类
配置直连交换机名称、消息队列名称、routingkey
package com.example.mqrabbitmqproducer.util.rabbitmq;
/**
* RabbitMQ RoutingKey 常量工具类
* @author qzz
*/
public class RabbitMQConstantUtil {
/**
* 交换机名称
*/
public static final String DIRECT_EXCHANGE = "directExchange";
/**
* 取消订单 队列名称 routingkey
*/
public static final String CANCEL_ORDER = "cancel-order";
/**
* 自动确认订单 队列名称\routingkey
*/
public static final String CONFIRM_ORDER = "confirm-order";
}
注意:这里把消息队列名称和routingkey设置为同名。
3.1.4 创建RabbitMQConfig配置类
rabbitmq配置类:配置Exchange、Queue、以及绑定交换机
package com.example.mqrabbitmqproducer.util.rabbitmq.config;
import com.example.mqrabbitmqproducer.util.rabbitmq.RabbitMQConstantUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* rabbitmq配置类:配置Exchange、Queue、以及绑定交换机
* @author qzz
*/
@Configuration
@EnableRabbit
public class RabbitMQConfig {
private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
//SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
/**
* 比较常用的 Converter 就是 Jackson2JsonMessageConverter,在发送消息时,它会先将自定义的消息类序列化成json格式,
* 再转成byte构造 Message,在接收消息时,会将接收到的 Message 再反序列化成自定义的类
*/
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//开启手动ACK
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
public AmqpTemplate amqpTemplate(){
rabbitTemplate.setEncoding("UTF-8");
rabbitTemplate.setMandatory(true);
/**
* ReturnsCallback消息没有正确到达队列时触发回调,如果正确到达队列不执行
* config : 需要开启rabbitmq发送失败回退
* yml配置publisher-returns: true
* 或rabbitTemplate.setMandatory(true);设置为true
*/
rabbitTemplate.setReturnsCallback(returnedMessage -> {
String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId();
byte[] message = returnedMessage.getMessage().getBody();
Integer replyCode = returnedMessage.getReplyCode();
String replyText = returnedMessage.getReplyText();
String exchange = returnedMessage.getExchange();
String routingKey = returnedMessage.getRoutingKey();
log.info("消息:{} 发送失败,消息ID:{} 应答码:{} 原因:{} 交换机:{} 路由键:{}",
new String(message),messageId,replyCode,replyText,exchange,routingKey);
});
return rabbitTemplate;
}
/**
* 声明直连交换机 支持持久化
* @return
*/
@Bean(RabbitMQConstantUtil.DIRECT_EXCHANGE)
public Exchange directExchange(){
return ExchangeBuilder.directExchange(RabbitMQConstantUtil.DIRECT_EXCHANGE).durable(true).build();
}
/**
* 取消订单 消息队列
* @return
*/
@Bean(RabbitMQConstantUtil.CANCEL_ORDER)
public Queue cancelOrderQueue(){
return new Queue(RabbitMQConstantUtil.CANCEL_ORDER,true,false,true);
}
/**
* 把取消订单消息队列绑定到交换机上
* @param queue
* @param directExchange
* @return
*/
@Bean
public Binding cancelOrderBinding(@Qualifier(RabbitMQConstantUtil.CANCEL_ORDER) Queue queue,
@Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange){
//RoutingKey :RabbitMQConstantUtil.CANCEL_ORDER,这里设置与消息队列 同名
return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CANCEL_ORDER).noargs();
}
/**
* 自动确认订单 消息队列
* @return
*/
@Bean(RabbitMQConstantUtil.CONFIRM_ORDER)
public Queue confirmOrderQueue(){
return new Queue(RabbitMQConstantUtil.CONFIRM_ORDER,true,false,true);
}
/**
* 把自动确认订单消息队列绑定到交换机上
* @param queue
* @param directExchange
* @return
*/
@Bean
public Binding confirmOrderBinding(@Qualifier(RabbitMQConstantUtil.CONFIRM_ORDER) Queue queue,
@Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange){
//RoutingKey :RabbitMQConstantUtil.CANCEL_ORDER,这里设置与消息队列 同名
return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CONFIRM_ORDER).noargs();
}
}
3.1.5 创建生产者用于发送消息
package com.example.mqrabbitmqproducer.util.rabbitmq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* Routing 路由模型(交换机类型:direct)
* 消息生成者
* @author qzz
*/
@Component
public class DirectSender {
private static final Logger log = LoggerFactory.getLogger(DirectSender.class);
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
* @param routingKey
* @param msg
*/
public void send (String routingKey,String msg){
Message message = MessageBuilder.withBody(msg.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("utf-8")
.setMessageId(UUID.randomUUID()+"").build();
log.info("【发送者】消息内容【{}】 交换机【{}】 路由【{}】 消息ID【{}】",msg,RabbitMQConstantUtil.DIRECT_EXCHANGE
,routingKey,message.getMessageProperties().getMessageId());
rabbitTemplate.convertAndSend(RabbitMQConstantUtil.DIRECT_EXCHANGE,routingKey,message);
}
}
3.1.6 创建一个类,用于模拟测试
package com.example.mqrabbitmqproducer.controller;
import com.alibaba.fastjson.JSONObject;
import com.example.mqrabbitmqproducer.util.rabbitmq.DirectSender;
import com.example.mqrabbitmqproducer.util.rabbitmq.RabbitMQConstantUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
* 模拟测试消息发送
* @author qzz
*/
@RestController
@RequestMapping("/order")
public class TestRabbitMQSendMsg {
/**
* rabbitMQ消息发送
*/
@Autowired
private DirectSender directSender;
/**
* 测试取消订单,发送消息
*/
@GetMapping("/cancel")
public void cancel(){
//取消订单逻辑省略
//取消订单,发送消息
Map<String, Object> map = new HashMap<>();
map.put("order_number","4364756867987989");
map.put("product_id","1");
directSender.send(RabbitMQConstantUtil.CANCEL_ORDER, JSONObject.toJSONString(map));
}
/**
* 测试自动确认订单,发送消息
*/
@GetMapping("/confirm")
public void confirm(){
//自动确认订单,发送消息
String order_number="4364756867987989";
directSender.send(RabbitMQConstantUtil.CONFIRM_ORDER, order_number);
}
}
启动项目,进行测试:
(1)在postman中输入 http://localhost:8080/order/cancel,进行测试:
(2)在postman中输入 http://localhost:8080/order/confirm,进行测试:
3.2创建mq-rabbitmq-consumer(消费者)消费消息
3.2.1pom.xml中添加相关的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--添加AMQP的启动器-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2.2 配置application.yml
server:
port: 8083
spring:
application:
name: mq-rabbitmq-consumer
#rabbitmq配置
rabbitmq:
host: localhost
port: 5672
#注意:guest用户只能链接本地服务器 比如localhost 不可以连接远程服务器
username: guest
password: guest
#虚拟主机 一台机器可能有很多虚拟主机 这里选择默认配置 / 即可
virtual-host: /
3.2.3 配置RabbitMQ常量类
配置直连交换机名称、消息队列名称、routingkey
package com.example.mqrabbitmqconsumer.util.rabbitmq;
/**
* RabbitMQ RoutingKey 常量工具类
* @author qzz
*/
public class RabbitMQConstantUtil {
/**
* 交换机名称
*/
public static final String DIRECT_EXCHANGE = "directExchange";
/**
* 取消订单 队列名称 \routingkey
*/
public static final String CANCEL_ORDER = "cancel-order";
/**
* 自动确认订单 队列名称\routingkey
*/
public static final String CONFIRM_ORDER = "confirm-order";
}
注意:这里把消息队列名称和routingkey设置为同名。
3.2.4 创建RabbitMQConfig配置类
rabbitmq配置类:配置Exchange、Queue、以及绑定交换机
package com.example.mqrabbitmqconsumer.util.rabbitmq.config;
import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* rabbitmq配置类:配置Exchange、Queue、以及绑定交换机
* @author qzz
*/
@Configuration
@EnableRabbit
public class RabbitMQConfig {
private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
//SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
/**
* 比较常用的 Converter 就是 Jackson2JsonMessageConverter,在发送消息时,它会先将自定义的消息类序列化成json格式,
* 再转成byte构造 Message,在接收消息时,会将接收到的 Message 再反序列化成自定义的类
*/
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//开启手动ACK
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
public AmqpTemplate amqpTemplate(){
rabbitTemplate.setEncoding("UTF-8");
rabbitTemplate.setMandatory(true);
/**
* ReturnsCallback消息没有正确到达队列时触发回调,如果正确到达队列不执行
* config : 需要开启rabbitmq发送失败回退
* yml配置publisher-returns: true
* 或rabbitTemplate.setMandatory(true);设置为true
*/
rabbitTemplate.setReturnsCallback(returnedMessage -> {
String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId();
byte[] message = returnedMessage.getMessage().getBody();
Integer replyCode = returnedMessage.getReplyCode();
String replyText = returnedMessage.getReplyText();
String exchange = returnedMessage.getExchange();
String routingKey = returnedMessage.getRoutingKey();
log.info("消息:{} 发送失败,消息ID:{} 应答码:{} 原因:{} 交换机:{} 路由键:{}",
new String(message),messageId,replyCode,replyText,exchange,routingKey);
});
return rabbitTemplate;
}
/**
* 声明直连交换机 支持持久化
* @return
*/
@Bean(RabbitMQConstantUtil.DIRECT_EXCHANGE)
public Exchange directExchange(){
return ExchangeBuilder.directExchange(RabbitMQConstantUtil.DIRECT_EXCHANGE).durable(true).build();
}
/**
* 取消订单 消息队列
* @return
*/
@Bean(RabbitMQConstantUtil.CANCEL_ORDER)
public Queue cancelOrderQueue(){
return new Queue(RabbitMQConstantUtil.CANCEL_ORDER,true,false,true);
}
/**
* 把取消订单消息队列绑定到交换机上
* @param queue
* @param directExchange
* @return
*/
@Bean
public Binding cancelOrderBinding(@Qualifier(RabbitMQConstantUtil.CANCEL_ORDER) Queue queue,
@Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange){
//RoutingKey :RabbitMQConstantUtil.CANCEL_ORDER,这里设置与消息队列 同名
return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CANCEL_ORDER).noargs();
}
/**
* 自动确认订单 消息队列
* @return
*/
@Bean(RabbitMQConstantUtil.CONFIRM_ORDER)
public Queue confirmOrderQueue(){
return new Queue(RabbitMQConstantUtil.CONFIRM_ORDER,true,false,true);
}
/**
* 把自动确认订单消息队列绑定到交换机上
* @param queue
* @param directExchange
* @return
*/
@Bean
public Binding confirmOrderBinding(@Qualifier(RabbitMQConstantUtil.CONFIRM_ORDER) Queue queue,
@Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange){
//RoutingKey :RabbitMQConstantUtil.CANCEL_ORDER,这里设置与消息队列 同名
return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CONFIRM_ORDER).noargs();
}
}
3.2.5 创建消费者消息监听
(1)监听取消订单
package com.example.mqrabbitmqconsumer.listener;
import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 监听取消订单
* @author qzz
*/
@Component
public class RabbitMQCancelOrderListener {
private static final Logger log = LoggerFactory.getLogger(RabbitMQCancelOrderListener.class);
/**
* 接受消息
* @param channel
* @param message
* @throws Exception
*/
@RabbitHandler
@RabbitListener(queues = RabbitMQConstantUtil.CANCEL_ORDER)
public void receiverMsg(Channel channel, Message message) throws Exception {
//body 即消息体
String msg = new String(message.getBody());
String messageId = message.getMessageProperties().getMessageId();
log.info("【消费者】 消息内容:【{}】。messageId 【{}】",msg, messageId);
try{
//如果有业务逻辑,则在这里编写
//告诉服务器收到这条消息 无需再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("消息处理出现异常:{}",e.getMessage());
//告诉消息服务器 消息处理异常,消息需要重新再次发送!
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
}
}
}
(2)监听自动确认订单
package com.example.mqrabbitmqconsumer.listener;
import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 监听自动确认订单
* @author qzz
*/
@Component
public class RabbitMQConfirmOrderListener {
private static final Logger log = LoggerFactory.getLogger(RabbitMQConfirmOrderListener.class);
/**
* 接受消息
* @param channel
* @param message
* @throws Exception
*/
@RabbitHandler
@RabbitListener(queues = RabbitMQConstantUtil.CONFIRM_ORDER)
public void receiverMsg(Channel channel, Message message) throws Exception {
//body 即消息体
String msg = new String(message.getBody());
String messageId = message.getMessageProperties().getMessageId();
log.info("【消费者】 消息内容:【{}】。messageId 【{}】",msg, messageId);
try{
//如果有业务逻辑,则在这里编写
//告诉服务器收到这条消息 无需再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("消息处理出现异常:{}",e.getMessage());
//告诉消息服务器 消息处理异常,消息需要重新再次发送!
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
}
}
}
3.2.6 启动项目,监听器监听到生产者发送的消息,自动消费消息
版权归原作者 jarenyVO 所有, 如有侵权,请联系我们删除。