一、快速入门
1.1安装
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=root \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-p 15692:15692 \
--network emer \
-d \
rabbitmq:3.8-management
首先创建queue,再创建exchange交换机,再建立交换机和队列之间的绑定管理。
交换机只负责路由消息,不负责存储。
1.2快速入门
先创建两个队列hello.q1和hello.q。
然后将两个队列加入到amq.fanout交换机中。
进入队列中,可以看到发布的消息。
1.3数据隔离
- 创建用户
- 为用户创建virtual host
- 测试不同virtual host之间的数据隔离现象
先创建一个用户,默认没有分配虚拟主机。
然后分配虚拟主机
发现交换机多了/hello的虚拟地址,因此我们可以选择正在不同的虚拟主机发送不同的消息,从而达到了数据隔离的的效果。
1.4Java客户端
1.4.1 快速入门
项目结构如下
#两个yml配置
spring:
rabbitmq:
host: 192.168.88.129
port: 5672
virtual-host: /hello
username: zhangsan
password: 123456
//测试
@SpringBootTest
class AmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testQueue(){
//队列名
String queueName = "simple.queue";
//消息
String msg = "hello,spring amqp";
//发送
rabbitTemplate.convertAndSend(queueName,msg);
}
}
记得在/hello的虚拟主机添加simple.queue的队列。
发送后
接收消息
@Component
@Slf4j
public class RabbitListener {
@org.springframework.amqp.rabbit.annotation.RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg){
log.info("监听到simple.queue的消息:{}",msg);
}
}
1.4.2 WorkQueues
一个队列多个消费者。
@Component
@Slf4j
public class RabbitListener {
@org.springframework.amqp.rabbit.annotation.RabbitListener(queues = "work.queue")
public void listenSimpleQueue1(String msg){
log.info("{}:消费者1:{}", LocalDateTime.now(),msg);
}
@org.springframework.amqp.rabbit.annotation.RabbitListener(queues = "work.queue")
public void listenSimpleQueue2(String msg){
log.info("{}:消费者2:{}", LocalDateTime.now(),msg);
}
}
发布50条消息到work.queue后。
观察可得:1.一条消息只能被一个消费者使用。2.消息被均匀分配。
消息推送限制
默认情况下,MQ会将消息依次轮询投递给消费者,并未考虑消费者是否处理完消息,可能出现消息堆积。
修改消费者yml,设置prefetch为1,确保同一时刻最多投递给消费者1条消息。
配置后:不再轮询,将会能者多劳。
1.4.3 Fanout交换机
交换机:Fanout广播,Direct定向,Topic话题
Fanout会把消息路由到每一个绑定的Queue。
演示:
1.声明队列fanout.q1,fanout.q2。
2.声明交换机hello.fanout,绑定两个队列。
3.编写消费者。
4.向交换机发布消息。
此处省略1和2。
@Test
public void testFanoutQueue(){
String exchangeName = "hello.fanout";
for (int i = 0; i < 50; i++) {
String msg ="hello_"+ i;
rabbitTemplate.convertAndSend(exchangeName,null,msg);
}
}
结果:
每个消费者都消费所有消息。
1.4.4 Direct交换机
Direct Exchange会将消息根据规则路由到指定的Queue,因此成为定向路由。
1.每一个Queue都与Exchange设置一个BindingKey。
2.发布消息时指定RoutingKey。
3.交换机将消息路由到BindingKey与消息RoutingKey一致的队列。
业务场景:支付成功->加积分、修改交易状态;取消支付->修改交易状态
绑定时指定RoutingKey
//给666发,两个队列都能收到
@Test
public void testFanoutQueue(){
String exchangeName = "hello.direct";
for (int i = 0; i < 10; i++) {
String msg ="hello_"+ i;
rabbitTemplate.convertAndSend(exchangeName,"666",msg);
}
}
给888发,只有1能收到。
1.4.5 Topic交换机
TopicExchange也是基于RoutingKey做消息路由,但是RoutingKey通过多个单词连接,以.分隔
通配符:
- #:指0个或多个单词
- *:指一个单词
1.声明队列topic.queue1和topic.queue2
2.声明交换机hello.topic并绑定队列。
3.编写消费者。
4.发布消息。
@Test
public void testTopicQueue(){
String exchangeName = "hello.topic";
for (int i = 0; i < 10; i++) {
String msg ="hello_"+ i;
rabbitTemplate.convertAndSend(exchangeName,"china.news",msg);
}
}
1和2都能收到。
1.4.6 声明队列和交换机
1.基于Bean
@Configuration
public class FanoutConfiguration {
@Bean
public FanoutExchange fanoutExchange(){
return ExchangeBuilder.fanoutExchange("hello.fanout").build();
}
@Bean
public Queue fanoutQueue1(){
return QueueBuilder.durable("fanout.queue1").build();
}
@Bean
public Binding fanoutQueue1Binding(Queue fanoutQueue1,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
}
直接启动就能生完成队列和交换机的声明。
2.基于注解
@org.springframework.amqp.rabbit.annotation.RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "hello.direct",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void listenSimpleQueue1(String msg){
log.info("{}:消费者1:{}", LocalDateTime.now(),msg);
}
1.4.7 消息转换器
建议手动指定消息转换器。
父工程引入依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
队列再次拿到消息的大小明显减小且具有可读性。
二、基本使用
2.1 发送者可靠性
2.1.1 发送者重连
spring:
rabbitmq:
connection-timeout: 1s
template:
retry:
enabled: true #开启超时重试
initial-interval: 1000ms #失败后的初始等待时间
multiplier: 1 #失败后下次等待时长的倍数
max-attempts: 3 #最大重试次数
注意:
SpringAMQP的重试机制是阻塞式的,影响业务性能。可以考虑异步线程来执行消息发送的代码。
2.1.2 发送者确认
SpringAMQP提供了Publisher Confirm和Publisher Return两种确认机制。
- 消息投递到了MQ,但是路由失败,会通过Publisher Return返回路由异常原因,然后返回ACK,告知投递成功。
- 临时消息(non durable)投递成功,入队成功,返回ACK,告知成功。
- 持久消息(durable)投递成功,入队成功,返回ACK,告知成功。
1.yml配置开启确认机制
spring:
rabbitmq:
publisher-confirm-type: correlated #开启publisher confirm机制,设置确认类型
publisher-returns: true #开启publisher return
确认类型
none:关闭confirm,simple:同步阻塞等待MQ的回执消息,correlated:MQ异步回调方式返回回执消息。
2.添加ReturnCallback
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MQConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(returnedMessage -> {
log.error("监听到了消息return callback");
log.debug("exchange:{}",returnedMessage.getExchange());
log.debug("routingKey:{}",returnedMessage.getRoutingKey());
log.debug("message:{}",returnedMessage.getMessage());
log.debug("replyCode:{}",returnedMessage.getReplyCode());
log.debug("replyText:{}",returnedMessage.getReplyText());
});
}
}
3.发送消息,指定消息ID,消息ConfirmCallback
@Test
public void testConfirmCallback() throws InterruptedException {
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
log.info("AMQP处理确认结果异常:{}", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
//拿到结果了,判断是否成功
if (result.isAck()) {
log.info("收到ConfirmCallback ACK,消息发送成功");
} else {
log.info("收到ConfirmCallback NACK,消息发送失败:{}", result.getReason());
}
}
});
String exchangeName = "hello.direct";
String message = "Hello MQ";
rabbitTemplate.convertAndSend(exchangeName, "red", message,cd);
Thread.sleep(5000);
}
2.2 数据持久化
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:
1.一旦MQ宕机,内存中的消息会丢失。
2.内存空间有限,会导致消息积压,引发MQ阻塞。
- 交换机持久化 Durable
- 队列持久化 Durable
- 消息持久化 Delivery mode = Persistent
2.2.1 Lazy Queue
RabbitMQ3.6增加Lazy Queue惰性队列。
- 接收到消息(临时和持久)都直接写入磁盘,不再存到内存。
- 消费者消费消息时才从磁盘中读取加载到内存(可提前缓存部分消息到内存,最多2048条)。
3.12之后所有队列都是Lazy Queue模式,无法更改。
可见LazyQueue的效率根本更好一些。
2.3 消费者可靠性
消费者确认机制(Consumer Acknowledgement)是为了确认消费者是否成功处理消息。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态:
- ack:成功处理消息,RabbitMQ从队列中删除该消息。
- nack:消息处理失败,RabbitMQ需要再次投递消息。
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息。
spring:
rabbitmq:
listener:
simple:
prefetch:
acknowledge-mode: none # none,关闭ack;manual,手动ack;auto:自动ack
- none:不处理。即消息投递给消费者后立刻ack,立刻删除。非常不安全,不建议使用。
- manual:手动模式。需要自己在业务代码中调用api发送ack或reject,存在业务入侵,但更灵活。
- auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack。当业务出现异常时,根据异常判断返回不同结果:
如果是业务异常,会自动返回nack。
如果是消息处理或校验异常,自动返回reject。
2.4 失败重试机制
在消费者出现异常时利用本地重试,而不是无限queue到MQ,在yml文件中配置。
spring:
rabbitmq:
listener:
simple:
prefetch: 1
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初始的失败等待时长为1秒
multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态(事务)。如果业务中包含事务,这里改为false
失败消息处理策略
根据上面的配置,如果重试三次后依然失败,默认直接reject,丢弃消息。因此需要配置最终失败后的处理策略。
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队(频率比无限重试小一点)。
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。
RepublishMessageRecoverer策略
1.定义失败消息的交换机、队列、绑定关系。
2.定义RepublishMessageRecoverer。
@Configuration
public class ErrorMessageConfiguration {
@Bean
public DirectExchange errorExchange() {
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue() {
return new Queue("error.queue");
}
@Bean
public Binding errorQueueBinding(Queue errorQueue, DirectExchange errorExchange) {
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
}
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
手动抛出异常后。
可在错误消息队列中看到报错信息。
2.5 业务幂等性
业务幂等性是指同一个业务,执行一次和执行N次对业务的影响是一致的。如查询、删除业务。
方案一:唯一消息ID
①每一条消息都生成一个唯一的id,与消息一起投递给消费者。
②消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
③如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
接收消息用Message
public void listenSimpleQueue1(Message message){
log.info("消息ID:{}",message.getMessageProperties().getMessageId());
log.info("{}:消费者1:{}", LocalDateTime.now(),message.getBody());
}
问题:影响原来业务的健壮性和性能,要记录ID,不推荐。
方案二:业务判断
在交易服务内宕机了,重新连接之后先判断订单是否支付,在执行后续操作。
2.7 延迟消息
应用场景:限制15分钟的支付时间。15分钟后查询是否完成了支付。
2.7.1 死信交换机
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费。
- 要投递的队列消息堆积满了,最早的消息可能成为死信。
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机 。
关键就是给普通队列指定一个私信交换机。
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("normal.queue").deadLetterExchange("dlx.direct").build();
}
//发送延迟消息
@Test
public void sendDelayMessage() {
rabbitTemplate.convertAndSend("normal.direct", "key", "hello", message -> {
message.getMessageProperties().setExpiration("10000");
return message;
});
}
指定一个message的后置处理器,在处理器中添加过期时间,单位默认毫秒。
2.7.2 延迟消息插件
插件可以将普通交换机改造为支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
安装插件
在安装时已经把plugins目录挂载出来了,查看命令
docker volume inspect mq-plugins
把插件放到此目录下重新加载即可。
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
//监听
@org.springframework.amqp.rabbit.annotation.RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue",durable = "true"),
exchange = @Exchange(name = "delay.direct",type = ExchangeTypes.DIRECT,delayed = "true"),
key = {"key"}
))
public void listenSimpleQueue1(Message message){
log.info("{}:消费者1:{}", LocalDateTime.now(),message.getBody());
}
//发送
@Test
public void sendDelayMessage() {
rabbitTemplate.convertAndSend("delay.direct", "key", "hello", message -> {
message.getMessageProperties().setDelay(10000);
return message;
});
}
注意:应避免同时使用大量延迟消息,会消耗CPU计时,缩短延迟时间。
三、RabbitMQ优点
3.1.为什么使用RabbitMQ
1.流量削峰
使用消息队列做缓冲,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。
2.异步调用
A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样B 服务也不用做这些操作。A 服务还能及时的得到异步处理成功的消息。
3.业务解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在 这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,下单用户感受不到物流系统的故障,提升系统的可用性。
版权归原作者 C和弦与炊烟 所有, 如有侵权,请联系我们删除。