RabbitMQ= Message Queue 消息队列
素材来源(支持原创):
源码链接: link
官网链接: link
简介
RabbitMq是一个广泛使用的消息服务器,采用Erlang语言编写,是一种开源的实现AMQP(高级消息队列协议)的消息中间件
What is RabbitMQ?
常用的消息中间件
MQ的应用场景
异步处理
按照正常流程下订单操作需要700ms, 通过MQ异步执行 只需要200ms 向MQ发送完消息即可
系统解耦
以前系统之间是 A发送B、然后在发送C、D系统 如果再加一个E系统 A系统还需要再发送E系统... 麻烦
添加MQ后
1.A系统只需要发送消息到MQ上 谁需要消息 谁去MQ上拿去即可
2.系统A可以是Java编写 B系统可以使用Go编写 C系统使用Python编写
流量削峰(限流)
通过添加MQ实现限流的操作, 如果没有MQ,QPS请求全部打到数据库上,会造成DB服务器的压力,有可能宕机,现在将请求发送到MQ上 通过系统B来匀速消费请求
日志处理
在微服务环境中,系统可能有成千上百个,每个服务器都会记录大量日志 logger.info() 当我们查看日志的时候 ,需要到每一台服务器上去查看, 现在我们的请求全都发送到MQ上 查看起日志更加方便
主要通过ELK(elastic search +logstash +kabana)日志收集系统在kabana上查看
RabbitMQ的运行环境搭建
- Erlang以及RabbitMq的版本选择(要配套) 版本兼容说明地址:
下载安装Erlang(采用25.1.1版本) 下载好后上传到linux上
因为RabbitMQ是Erlang语言开发的 要先下载安装Erlang
[ErLang下载地址:](https://www.erlang.org/patches/otp-25.1.1)
- 由于Erlang需要其他依赖 所以安装Erlang需要先安装其他依赖
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
**说明:yum -y install 是linux安装依赖的命令 -y代表自动确认下载**
- 解压Erlang文件
tar -zxvf opt-src-25.1.1.tar.gz -C/usr/local
**说明:-C 是指定目录 不写就是当前目录**
- 配置 因为下载的Erlang是源代码包,需要把它编译成可以运行的
./configure
- 编译
make
说明:执行make命令 把源代码编译成可以运行的程序
- 安装
make install
说明: 执行make install 就是将编译后能运行的程序 添加到path下 类似配置java_home 执行后就可以在任何地方都可以运行Erlang了
- 验证Erlang是否安装成功 输入
erl
代表成功
下载安装RabbitMQ(3.10.11)
- 下载RabbitMq安装包 上传到linux上 RabbitMQ下载地址:
- 展开下载包
tar -xvf rabbitmq-server-generic-unix-3.10.11.tar.xz -C/usr/local
说明: 不需要解压 tar -zxvf 直接展开就好 tar -xvf 添加 -C 就是展开在指定的目录
- 进入展开的位置 启动rabbitMQ
cd /usr/local
cd rabbitmq_server-3.10.11/
cd sbin/./rabbitmq-server //启动rabbitMq
- 启动、关闭RabbitMq 以及查看RabbitMQ状态
./rabbitmq-server -detached //后端启动
ps -ef|grep rabbit // 查看rabbitMQ进程./rabbitmqctl status // 查看rabbitMQ当前状态./rabbitmqctl shutdown //关闭rabbitMQ
- 配置RabbitMQ的环境变量 - 打开配置文件
vi +$ /etc/profile // 打开配置文件 添加+$ 表示光标选中文件的最后一行
- 添加环境变量
RABBIT_HOME=/usr/local/rabbitmq_server-3.10.11PATH=$PATH:$RABBIT_HOME/sbin
export RABBIT_HOMEPATH
- 保存退出 :wq!
- 刷新配置文件
source /etc/profile
- 然后就可以在任何地方启动rabbitMQ了
rabbitmq-server -detached //启动rabbitMQ
rabbitmqctl status //查看rabbitMQ状态
添加rabbitMQ用户 赋予角色+权限
rabbitmqctl add_user admin 123456// 添加用户:admin 密码:123456
rabbitmqctl list_users //查看添加用户列表
rabbitmqctl set_user_tags admin administrator // 设置用户:admin 角色:administrator
rabbitmqctl set_permissions ".*"".*"".*"//设置用户admin权限为:可读 可写 可配置
rabbitmqctl list_permissions //查看用户权限
启动RabbitMQ的web后台 需要添加插件
cd /usr/local/rabbitmq_server-3.10.11/sbin //切换到sbin下
rabbitmq-plugins list //查看rabbitMQ下的插件
rabbitmq-plugins enable rabbitmq_management //启动rabbitmq_management 插件
- 然后就可以执行 http://192.168.110.178:15672/ 来访问rabbitMQ了 这个时候不行 需要打开防火墙
systemctl status firewalld // 查看防火墙状态
systemctl disable firewalld //设置防火墙开启不自动启动
systemctl stop firewalld //关闭防火墙
出现下图表示成功
添加虚拟主机步骤如图
RabbitMQ的工作模型
交换机的类型
- 扇形交换机
- 群发策略 提供者发送消息到 交换机 上 交换机绑定几个队列 就将消息发送几遍
- 配置类
@ConfigurationpublicclassRabbitConfig{// rabbitMQ 三部曲// 1.定义交换机@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("exchange.fanout");}// 2.定义队列@BeanpublicQueuequeueA(){returnnewQueue("queue.fanout.a");}@BeanpublicQueuequeueB(){returnnewQueue("queue.fanout.b");}// 3.绑定交换机和队列 扇形交换机不需要路由 直接绑定交换机就行了@BeanpublicBindingbingingA(FanoutExchange fanoutExchange,Queue queueA){returnBindingBuilder.bind(queueA).to(fanoutExchange);}@BeanpublicBindingbingingB(FanoutExchange fanoutExchange,Queue queueB){returnBindingBuilder.bind(queueB).to(fanoutExchange);}}
- 业务类
@Component@Slf4jpublicclassMessageService{@ResourceprivateRabbitTemplate rabbitTemplate;publicvoidsendMessage(String exchange,String routingKey,String message){Message msg =newMessage(message.getBytes());
rabbitTemplate.convertAndSend(exchange,"",msg);
log.info("消息发送成功");}}
- 直连交换机
-一对一精确策略 根据路由精确发送消息
- 配置类
@Component@Slf4jpublicclassDirectConfig{@Value("${local.exchangeName}")privateString exchangeName;@Value("${local.queueA}")privateString queueA;@Value("${local.queueB}")privateString queueB;// 定义直连交换机@BeanpublicDirectExchangedirectExchange(){// 使用建造者模式创建 交换机returnExchangeBuilder.directExchange(exchangeName).build();}// 定义队列@BeanpublicQueuequeueA(){returnQueueBuilder.durable(queueA).build();}@BeanpublicQueuequeueB(){returnQueueBuilder.durable(queueB).build();}// 定义绑定关系@BeanpublicBindingbingingA(DirectExchange directExchange,Queue queueA){returnBindingBuilder.bind(queueA).to(directExchange).with("error");}@BeanpublicBindingbingingB1(DirectExchange directExchange,Queue queueB){returnBindingBuilder.bind(queueB).to(directExchange).with("info");}@BeanpublicBindingbingingB2(DirectExchange directExchange,Queue queueB){returnBindingBuilder.bind(queueB).to(directExchange).with("error");}@BeanpublicBindingbingingB3(DirectExchange directExchange,Queue queueB){returnBindingBuilder.bind(queueB).to(directExchange).with("warning");}}
- 直连交换机发送消息业务
@Service@Slf4jpublicclassMessageService{@ResourceprivateRabbitTemplate rabbitTemplate;publicvoidsendMessage(String message){// 使用建造者模式 创建消息Message msg =MessageBuilder.withBody(message.getBytes()).build();// 直连交换机 必须指定路由key
rabbitTemplate.convertAndSend("exchange.direct","error",msg);
log.info("发送消息:{}",message);}}
- 主题交换机
- 模糊策略 根据路由模糊匹配
# 匹配多个单词 例如: beijing.# == beijing.queue, beijing.a.b
* 匹配单个单词 例如: beijing.* == beijing.a, beijing.b, beijing.c
- 创建主题配置类
@Configuration@Slf4jpublicclassTopicConfig{@Value("${local.exchangeName}")privateString exchangeName;@Value("${local.queueA}")privateString queueA;@Value("${local.queueB}")privateString queueB;// 定义交换机@BeanpublicTopicExchangetopicExchange(){returnExchangeBuilder.topicExchange(exchangeName).build();}// 定义队列@BeanpublicQueuequeueA(){// .durable() 方法设置队列持久化, 默认为false rabbitMQ重新启动 队列不会丢失(因为把队列从内存保存到硬盘上了)returnQueueBuilder.durable(queueA).build();}@BeanpublicQueuequeueB(){// .durable() 方法设置队列持久化, 默认为false rabbitMQ重新启动 队列不会丢失(因为把队列从内存保存到硬盘上了)returnQueueBuilder.durable(queueB).build();}// 定义绑定关系 topic绑定路由时 类似与 模糊匹配@BeanpublicBindingbingingA(TopicExchange topicExchange,Queue queueA){// *.orange.* 一个词或多个词都可以匹配returnBindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*");}@BeanpublicBindingbingingB1(TopicExchange topicExchange,Queue queueB){// *.*.rabbit 一个词或多个词都可以匹配returnBindingBuilder.bind(queueB).to(topicExchange).with("*.*.rabbit");}@BeanpublicBindingbingingB2(TopicExchange topicExchange,Queue queueB){// lazy.# 一个词或多个词都可以匹配returnBindingBuilder.bind(queueB).to(topicExchange).with("lazy.#");}}
- 发送消息业务类
@Service@Slf4jpublicclassMessageService{@ResourceprivateAmqpTemplate amqpTemplate;publicvoidsend(String message){Message msg =MessageBuilder.withBody(message.getBytes()).build();
amqpTemplate.convertAndSend("exchange.topic","lazy.orange.a",msg);}}
- 头部交换机(不太经常使用) 代码如下- 创建配置类
@Component@Slf4jpublicclassHeaderConfig{@Value("${local.exchangeName}")privateString exchangeName;@Value("${local.queueA}")privateString queueA;@Value("${local.queueB}")privateString queueB;// 定义交换机@BeanpublicHeadersExchangeheaderExchange(){returnExchangeBuilder.headersExchange(exchangeName).build();}// 定义队列@BeanpublicQueuequeueA(){returnQueueBuilder.durable(queueA).build();}@BeanpublicQueuequeueB(){returnQueueBuilder.durable(queueB).build();}// 定义绑定关系@BeanpublicBindingbingingA(HeadersExchange headerExchange,Queue queueA){Map<String,Object> map =newHashMap<>();
map.put("type","m");
map.put("status",1);returnBindingBuilder.bind(queueA).to(headerExchange).whereAll(map).match();}@BeanpublicBindingbingingB(HeadersExchange headerExchange,Queue queueB){Map<String,Object> map =newHashMap<>();
map.put("type","s");
map.put("status",0);returnBindingBuilder.bind(queueB).to(headerExchange).whereAll(map).match();}}
- 发送消息业务类
@Service@Slf4jpublicclassMessageService{@ResourceprivateAmqpTemplate amqpTemplate;publicvoidsend(String message){// 创建消息属性MessageProperties messageProperties =newMessageProperties();Map<String,Object> headerMap =newHashMap<>();
headerMap.put("type","s");
headerMap.put("status",0);// 设置消息头
messageProperties.setHeaders(headerMap);Message msg =MessageBuilder.withBody(message.getBytes()).andProperties(messageProperties).build();// 发送消息
amqpTemplate.convertAndSend("exchange.headers","",msg);
log.info("send message success");}}
设置消息的过期时间 ttl
方式一:单条消息设置过期时间
publicvoidsendMessage(String exchange,String routingKey,String message){MessageProperties messageProperties =newMessageProperties();
messageProperties.setExpiration("10000");//设置秒10秒自动过期Message msg =MessageBuilder.withBody(message.getBytes()).andProperties(messageProperties).build();
rabbitTemplate.convertAndSend(exchange,"",msg);
log.info("消息发送成功");}
方式二:在队列中设置过期时间
// 2.定义队列@BeanpublicQueuequeueA(){Map<String,Object> arguments =newHashMap<>();
arguments.put("x-message-ttl",60000);// 设置队列的过期时间60秒returnnewQueue("queue.fanout.a",true,false,false,arguments);}
说明:如果两种方式同时设置过期时间 则按照时间小的为准
死信队列 (DLX: Deal-Letter-Exchange)
举例: 携程给用户打电话说 抢到票了 请在10分钟后下单
当携程给用户打电话的同时,订单信息存放到4.队列中 用来供旅客支付,10分钟后消息转到死信交换机提交给死信队列 这个时候监听器监听到死信队列有消息,然后去DB判断旅客是否下单,下单直接不处理,没下单(取消订单、放票等等操作)
一:队列过期
@Configuration@Slf4jpublicclassDlxConfig{@Value("${local.exchangeNormalName}")privateString exchangeNormalName;@Value("${local.queueNormalA}")privateString queueNormalA;@Value("${local.exchangeDlxName}")privateString exchangeDlxName;@Value("${local.queueDlxA}")privateString queueDlxA;//定义直连正常交换机@BeanpublicDirectExchangeexchangeNormalName(){returnExchangeBuilder.directExchange(exchangeNormalName).build();}//定义正常队列@BeanpublicQueuequeueNormalA(){Map<String,Object> arguments =newHashMap<>();// 重点三个参数设置
arguments.put("x-message-ttl",20000);// 设置过期时间
arguments.put("x-dead-letter-exchange",exchangeDlxName);// 设置该队列的 死信交换机
arguments.put("x-dead-letter-routing-key","order");// 设置该队列的 死信路由keyreturnQueueBuilder.durable(queueNormalA).withArguments(arguments).build();}// 绑定正常交换机和正常队列@BeanpublicBindingbingingA(DirectExchange exchangeNormalName,Queue queueNormalA){returnBindingBuilder.bind(queueNormalA).to(exchangeNormalName).with("order");}//定义死信交换机@BeanpublicDirectExchangeexchangeDlxName(){returnExchangeBuilder.directExchange(exchangeDlxName).build();}//定义死信队列@BeanpublicQueuequeueDlxA(){returnQueueBuilder.durable(queueDlxA).build();}// 绑定死信交换机和死信队列@BeanpublicBindingbingingB(DirectExchange exchangeDlxName,Queue queueDlxA){returnBindingBuilder.bind(queueDlxA).to(exchangeDlxName).with("order");}}
二:单条消息过期
publicvoidsend(String message){//重点MessageProperties messageProperties =newMessageProperties();
messageProperties.setExpiration("10000");// andProperties(messageProperties)Message msg =MessageBuilder.withBody(message.getBytes()).andProperties(messageProperties).build();
amqpTemplate.convertAndSend("normal.exchange.a","order",msg);
log.info("send message success");}
三: 队列达到最大长度时(先入队的消息会被发送到死信队列)
设置长度为10 假如有12个消息 第1个和第2个消息会被送到死信
@BeanpublicQueuequeueNormalA(){Map<String,Object> arguments =newHashMap<>();// TODO 重点三个参数 设置长度为10 假如有12个消息 第1个和第2个消息会被送到死信队列
arguments.put("x-max-length",10);// 设置队列的最大长度
arguments.put("x-dead-letter-exchange", exchangeDlxName);// 设置该队列的 死信交换机
arguments.put("x-dead-letter-routing-key","order");// 设置该队列的 死信路由keyreturnQueueBuilder.durable(queueNormalA).withArguments(arguments).build();}
消费者手动确认删除消息
一般情况:消费者消费完消息 MQ就直接删除了 如果消费者在往DB数据库中执行更新操作 没有成功,不想MQ把消息删除 就需要消费者手动删除消息了
- 首先在pom.xml中设置手动确认消息 manual
spring:application:name: dlx-receiver-exchange
rabbitmq:host: 192.168.110.178
username: admin
password:123456port:5672virtual-host: powernode #配置虚拟主机listener:simple:acknowledge-mode: manual #重点 :开启消费者手动确认删除MQ消息
通过Channel 确认消息
// 消费者手动确认删除消息 告诉MQ可以删除 参数二:false:只确认当前消息 true:批量确认 把之前的消息都确认了可以删除
channel.basicAck(deliveryTag,false);
// 消费者执行业务操作出现问题 告诉MQ的信息不要删 参数3 true:重新入队 false:不重新入队
channel.basicNack(deliveryTag,false,false);
// 消费者拒绝策略 告诉MQ我不需要此消息了 参数2: false 不可以重新入队
channel.basicReject(deliveryTag,false);
@Component@Slf4jpublicclassMessageReceive{@RabbitListener(queues ={"normal.queue.tow"})publicvoidreceive(Message message,Channel channel)throwsIOException{MessageProperties messageProperties = message.getMessageProperties();// 消息唯一标识long deliveryTag = messageProperties.getDeliveryTag();try{byte[] bytes = message.getBody();String s =newString(bytes);//TODO : 消费者业务逻辑处理 业务异常会抛出异常 业务异常会被消费者捕获�int a =1/0;
log.info("接收到的消息:{},时间:{}",s,newDate());// 消费者手动确认删除消息 false:只确认当前消息 true:批量确认 把之前的消息都确认了可以删除
channel.basicAck(deliveryTag,false);}catch(Exception e){// 消费者 出现问题 将MQ的信息不要删 参数2:可以批量处理 参数3 true:重新入队 false:不重新入队
channel.basicNack(deliveryTag,false,false);// 消费者拒绝策略 参数2: false 不可以重新入队
channel.basicReject(deliveryTag,false);
log.error("消费者出现问题:{}",e.getMessage());thrownewRuntimeException(e);}}}
版权归原作者 走楠闯北 所有, 如有侵权,请联系我们删除。