1、RabbitMQ 介绍
MQ全称为Message Queue,即消息队列. 它也是一个队列,遵循FIFO原则 。RabbitMQ则是一个开源的消息中间件,是一个具体的软件产品。RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue Protocol高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
1. 消息中间件
- 中间件是一种独立的系统软件服务程序,位于客户机服务器的操作系统之上,管理着计算资源和网络通信。 分布式应用系统可以借助中间件这种软件在不同的技术之间共享资源。
- 消息中间件是支持在分布式系统之间发送和接收消息的软件。
2. AMQP 协议
AMQP 不是一个具体的消息中间件产品,而是一个协议规范。他是一个开放的
消息产地协议
,是一种应用层的标准协议,为面向消息的中间件设计。AMQP 提供了一种统一的消息服务,使得不同程序之间可以通过消息队列进行通信。 SpringBoot 框架默认就提供了对 AMQP 协议的支持。
- AMQP协议概念
概念描述Broker接收和分发消息的应用,RabbitMQ Server就是Message Broker。Virtual Host为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP 提供了一个虚拟主机(virtual hosts)的概念,当多个不同的用户使用同一个RabbitMQ时,可以划分出多个vhost,每个用户在自己的vhost创建exchange、queue等。Connection生产者、消费者和Broker之间的TCP连接。Channel是在Connection内部建立的逻辑连接,它作为轻量级的Connection,极大减少了操作系统建立TCP连接的开销。Exchange交换机,用于接收消息,根据分发规则,匹配Routing Key,分发消息到队列中去。Queue队列,消息最终被送到这里等待消费者取走。Binding用于描述消息队列与交换机之间的关系。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则。因此可以将交换器看成一个由绑定构成的路由表Routing Key路由规则,可用来确定如何路由一个特定消息。Message消息Publisher消息发布者Consumer消费者3. 应用场景
- 应用解耦
用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口。当库存系统出现故障时,订单就会失败。库存服务和订单服务高度耦合。
引入消息队列,用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。库存服务从消息队列中消费消息,消息队列做持久化处理,就算库存服务出现故障,也能保证消息的不丢失。
- 异步消息通信
用户注册后,需要发注册邮件和注册短信,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回。引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理。
- 流量削峰
秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面。等系统压力小了在执行消息队列中的消息。
2、RabbitMQ 工作原理
1. RabbitMQ 消息模型
基本消息队列-
P
是生产者。-C
是消费者。- 中间的Queue
区域是一个队列,表示消息缓冲区。工作消息队列在使用消息系统时,一般情况下生产者往队列里插入数据的速度是比较快的,但是消费者消费数据往往涉及到一些业务逻辑处理导致速度跟不上生产者生产数据的速度。因此,如果一个队列只有一个消费者的话,很容易导致大量的消息堆积在队列里,这时,就可以使用工作队列,这样一个队列可以有多个消费者同时消费数据。当队列有多个消费者时,消息会被哪个消费者消费呢?这里主要有两种模式:1. 轮询分发:一个消费者消费一条;2. 公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;
2. RabbitMQ 交换机
Direct exchange(直连交换机)- 直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的,步骤如下:- 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)- 当一个携带着路由值为R的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R的队列。- 交换机和队列之间产生关联需要使用binding,可理解为交换机只向与其绑定的队列中发送消息,这里的绑定关系需要使用Routing Key;相对比fanout交换机,direct交换机在其基础上,多加了一层秘钥(Routing Key)。
Fanout exchange(扇型交换机)- 扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列。不同于直连交换机,路由键在此类型上不启任务作用。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的发送给这所有的N个队列- 在RabbitMQ中,一个队列上的一条消息只能被一个消费者消费,要实现一条消息被多个消费者同时消费,需要借助多个队列(也就是需要将这一条消息发送到多个队列)。fanout交换非常简单,它仅将收到的所有消息广播到所有队列。fanout交换机不设置路由键,我们只需要将队列绑定到交换机上,生产者发送到fanout交换机的消息都会被转发到与该交换机绑定的所有队列上,很像子网广播,每台子网内的主机都能获得了一份消息。
Topic exchange(主题交换机)- 主题交换机(topic exchanges)中,队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列。- topic交换机是在direct交换机的基础上,支持了对routing key的通配符匹配(
*
号和#
号),以满足更加复杂的消息分发场景。*
:精确匹配一个词(一个字母、一个单词)#
:匹配零个或多个词(多个字母、多个单词,都以**"点"分割)注:**- 如果队列绑定的路由键是#
,那么这个队列可以接收所有数据,就类似于fanout交换机了- 如果队列绑定的路由键键当中没有#
和*
,那么是direct交换机了
3、RabbitMQ 使用
- 配置Maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><!-- 整合RabbitMQ --><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
- 编写
application.ym
配置文件server: port: 8080spring: #配置rabbitMq 服务器 rabbitmq: # IP 地址 host: 192.168.136.128 # 端口号 port: 5672 # 虚拟主机 virtual-host: / # 用户名 username: et # 密码 password: et
1. 扇型交换机(Fanout Exchang)
fanout交换机不设置路由键,我们只需要将队列绑定到交换机上,生产者发送到fanout交换机的消息都会被转发到与该交换机绑定的所有队列上,很像子网广播,每台子网内的主机都能获得了一份消息。
配置交换机和队列
import jakarta.servlet.http.PushBuilder;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class FanoutConfig { public static final String EXCHANGE = "etoak"; public static final String QUEUE_ET2402 = "ET2402"; public static final String QUEUE_ET2403 = "ET2403"; @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(EXCHANGE); } @Bean public Queue et2402(){ //默认持久化 return new Queue(QUEUE_ET2402); } @Bean public Queue et2403(){ return new Queue(QUEUE_ET2403); } @Bean public Binding et2402Binding(){ return BindingBuilder.bind(et2402()).to(fanoutExchange()); } @Bean public Binding et2403Binding(Queue et2403,FanoutExchange fanoutExchange){ return BindingBuilder.bind(et2403).to(fanoutExchange); }}
发送消息
import com.etoak.config.FanoutConfig;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/fanout")public class FanoutController { @Autowired RabbitTemplate rabbitTemplate; @RequestMapping("/send") public String send(@RequestParam String msg){ rabbitTemplate.convertAndSend(FanoutConfig.EXCHANGE,"",msg); return "success"; }}
执行结果
消费消息
import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class ConsumerService { @RabbitListener(queues = "ET2402") public void consume(String msg){ System.out.println("消费ET2402消息:"+msg); } @RabbitListener(queues = "ET2403") public void consume2(String msg){ System.out.println("消费ET2403消息:"+msg); }}
2. 直连交换机(direct exchange)
配置交换机和队列
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class DirectConfig { public static final String EXCHANGE = "amqp.color"; public static final String QUEUE_1 = "Q1"; public static final String QUEUE_2 = "Q2"; public static final String ORANGE = "orange"; public static final String BLACK = "black"; public static final String GREEN = "green"; @Bean public DirectExchange colorExchange(){ return new DirectExchange(EXCHANGE); } @Bean public Queue queue1(){ return new Queue(QUEUE_1); } @Bean public Queue queue2(){ return new Queue(QUEUE_2); } // 指定的routing key 为 orange 时 路由到Q1队列 @Bean public Binding orangeBinding(Queue queue1,DirectExchange colorExchange){ return BindingBuilder.bind(queue1).to(colorExchange).with(ORANGE); } // 指定的routing key 为 black 时 路由到Q2队列 @Bean public Binding blackBinding(Queue queue2,DirectExchange colorExchange){ return BindingBuilder.bind(queue2).to(colorExchange).with(BLACK); } // 指定的routing key 为 green 时 路由到Q2队列 @Bean public Binding greenBinding(Queue queue2,DirectExchange colorExchange){ return BindingBuilder.bind(queue2).to(colorExchange).with(GREEN); }}
发送消息
import com.etoak.config.DirectConfig;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/direct")public class DirectController { @Autowired RabbitTemplate rabbitTemplate; @RequestMapping("/send") public String send(@RequestParam String key,@RequestParam String msg){ rabbitTemplate.convertAndSend(DirectConfig.EXCHANGE,key,msg); return "success"; }}
执行结果
3. 主题交换机(Topic Exchange)
topic交换机是在direct交换机的基础上,支持了对routing key的通配符匹配(
*
号和
#
号)
配置交换机和队列
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class TopicConfig { public static final String EXCHANGE = "topic"; public static final String QUEUE_3 = "Q3"; public static final String QUEUE_4 = "Q4"; public static final String ORANGE = "*.orange.*"; public static final String RABBIT = "*.*.rabbit"; public static final String LAZY = "lazy.#"; @Bean public TopicExchange topicExchange(){ return new TopicExchange(EXCHANGE); } @Bean public Queue queue3(){ return new Queue(QUEUE_3); } @Bean public Queue queue4(){ return new Queue(QUEUE_4); } @Bean public Binding queue3Binding(Queue queue3,TopicExchange topicExchange){ return BindingBuilder.bind(queue3).to(topicExchange).with(ORANGE); } @Bean public Binding rabbitBinding(Queue queue4,TopicExchange topicExchange){ return BindingBuilder.bind(queue4).to(topicExchange).with(RABBIT); } @Bean public Binding lazyBinding(Queue queue4,TopicExchange topicExchange){ return BindingBuilder.bind(queue4).to(topicExchange).with(LAZY); }}
发送消息
import com.etoak.config.TopicConfig;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/topic")public class TopicController { @Autowired RabbitTemplate rabbitTemplate; @RequestMapping("/send") public String send(){ // 匹配 Q3 rabbitTemplate.convertAndSend(TopicConfig.EXCHANGE,"an.orange.juice","橙汁"); //匹配 Q4 rabbitTemplate.convertAndSend(TopicConfig.EXCHANGE,"big.white.rabbit","大白兔"); // 匹配 Q4 Q3 rabbitTemplate.convertAndSend(TopicConfig.EXCHANGE,"an.orange.rabbit","橙色兔子"); // 匹配 Q4 rabbitTemplate.convertAndSend(TopicConfig.EXCHANGE,"lazy.rabbit","懒兔子"); return "success"; }}
4、延迟队列
延迟队列存储的消息一般都是 延时消息 ,所谓 延时消息 是指当消息被发送以后,并不想让消费者立即消费消息,而是等待指定时间 后,才允许消费者来消费这条消息;
1. 应用场景
- 用户下单后30分钟未支付,使用延迟队列功能取消超时的订单
- 注册一个网站,24小时(或几天)内未登录,发送邮件或短信通知
2.安装延迟交换机插件
- 插件安装目录: /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.22/plugins
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.22/plugin
- 将 rabbitmq_delayed_message_exchange-3.8.0.ez 插件上传到插件安装目录
- 安装命令,在plugins目录下执行:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 安装完成之后,RabbitMQ的Web控制台Exchanges选项卡下会出现一个交换机 ,如下图所示
配置延迟交换机和队列
package com.etoak.config;
import com.rabbitmq.client.BuiltinExchangeType;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedConfig {
public static final String EXCHANGE = "amqp.delayed";
public static final String QUEUE = "queue.delayed";
public static final String KEY = "delay";
/**
* RabbitMQ 没有一个类型是表示延迟交换机
* 交换机只有四个类型:fanout、direct、topic、headers
* 可以使用四个中的其中一个来模拟延迟交换机
* 使用CustomExchange定制延迟交换机
*/
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
// 使用哪种交换机模拟延迟交换机
args.put("x-delayed-type", "direct");
return new CustomExchange(EXCHANGE,
"x-delayed-message", // 类型 - 延迟交换机
true,
false,
args);
}
@Bean
public Queue delayedQueue() {
return new Queue(QUEUE);
}
@Bean
public Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(KEY).noargs();
}
}
消息生产者
import cn.hutool.core.date.DateUtil;
import com.etoak.config.DelayedConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@RestController
public class TestController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public String send(int second, @RequestParam String msg) {
rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE,
DelayedConfig.KEY,
msg,
message -> {
// 设置延迟时间(不是过期时间), 单位ms
message.getMessageProperties().setDelay(1000 * second);
return message;
});
System.out.println(msg + "--发送时间--" + DateUtil.now());
return "success";
}
}
消息消费者
import cn.hutool.core.date.DateUtil;
import com.etoak.config.DelayedConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class ConsumerService {
@RabbitListener(queues = DelayedConfig.QUEUE)
public void consume(Channel channel, Message message) throws IOException {
String msg = new String(message.getBody());
System.out.println(msg + "--处理时间--" + DateUtil.now());
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
}
}
版权归原作者 ✎ℳ₯㎕.baby℡825 所有, 如有侵权,请联系我们删除。