RabbitMQ的作用
**Message queue ****释义 **
服务之间最常见的通信方式是直接调用彼此来通信,消息从一端发出后立即就可以达到另一端,称为即时消息通讯(同步通信)
消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端,称为延迟消息通讯(异步通信)
问题思考
假设我们在淘宝下了一笔订单后,淘宝后台需要做这些事情:
- 消息通知系统:通知商家,你有一笔新的订单,请及时发货
- 推荐系统:更新用户画像,重新给用户推荐他可能感兴趣的商品
- 会员系统:更新用户的积分和等级信息
createOrder(...) {
// 完成订单服务
doCreateOrder(...);
// 调用其他服务接口
sendMsg(...);
updateUserInterestedGoods(...);
updateMemberCreditInfo(...);
}
存在的问题
- 过度耦合:如果后面创建订单时,需要触发新的动作,那就得去改代码,在原有的创建订单函数末尾,再追加一行代码
- 缺少缓冲:如果创建订单时,会员系统恰好处于非常忙碌或者宕机的状态,那这时更新会员信息就会失败,我们需要一个地方,来暂时存放无法被消费的消息
优化方案
我们需要一个消息中间件,来实现解耦和缓冲的功能.
Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程.
Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue.
Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue.
ExchangeType:交换机类型决定了路由消息行为,RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic.
Message Queue:消息队列,用于存储还未被消费者消费的消息.
Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等.body是真正需要发送的数据内
容.
BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来.
案例分析
小红希望小明多读书,常寻找好书给小明看,之前的方式是这样:小红问小明什么时候有空,把书给小明送去,并亲眼监督小明读完书才走.久而久之,两人都觉得麻烦.
后来的方式改成了:小红对小明说「我放到书架上的书你都要看」,然后小红每次发现不错的书都放到书架上,小明则看到书架上有书就拿下来看.
书架就是一个消息队列,小红是生产者,小明是消费者
带来的好处
小红想给小明书的时候,不必问小明什么时候有空,亲手把书交给他了,小红只把书放到书架上就行了.这样小红小明的时间都更自由.
小红相信小明的读书自觉和读书能力,不必亲眼观察小明的读书过程,小红只要做一个放书的动作,很节省时间.
当明天有另一个爱读书的小伙伴小强加入,小红仍旧只需要把书放到书架上,小明和小强从书架上取书即可
书架上的书放在那里,小明阅读速度快就早点看完,阅读速度慢就晚点看完,没关系,比起小红把书递给小明并监督小明读完的方式,小明的压力会小一些.
消息队列特点
解耦:每个成员不必受其他成员影响,可以更独立自主,只通过一个简单的容器来联系.
提速:小红选只要做一个放书的动作,为自己节省了大量时间.
广播:小红只需要劳动一次,就可以让多个小伙伴有书可读,这大大地节省了她的时间,也让新的小伙伴的加入成本很低.
错峰与流控:小红给书的频率不稳定,如果今明两天连给了五本,之后隔三个月才又给一本,那小明只要在三个月内从书架上陆续取走五本书读完就行了,压力就不那么大了.
Email邮件案例分析
有大量用户注册你的软件,再高并发情况下注册请求开始出现一些问题.
例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况.
导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了.面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请
求,不会影响用户使用其他功能
Docker安装部署RabbitMQ
1.下拉镜像
docker pull rabbitmq:management
注意获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面
2.运行RabbitMQ
docker run -itd \
--name my-rabbitmq \
-p 5672:5672 -p 15672:15672 \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--restart=always \
rabbitmq:management
--hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)
-e:指定环境变量:
RABBITMQ_DEFAULT_VHOST:默认虚拟机名
RABBITMQ_DEFAULT_USER:默认的用户名
RABBITMQ_DEFAULT_PASS:默认用户名的密码
3.打开防火墙端口号并重新运行防火墙
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload
4.容器启动后,可以通过 docker logs 容器 查看日志
docker logs my-rabbitmq
5.通过主机网址进入管理后台
虚拟机ip地址/15672
6.通过刚才填写的用户名和密码登录 admin
springboot连接配置
1.配置spring账号
切记需要授权
2.创建两个springboot项目publisher consumer 选择依赖
3.配置yml文件
#publisher
server:
port: 8888
spring:
rabbitmq:
host: 192.168.241.130
username: spring
password: 123456
port: 5672
virtual-host: my_vhost
#consumer
server:
port: 9999
spring:
rabbitmq:
host: 192.168.241.130
username: spring
password: 123456
port: 5672
virtual-host: my_vhost
4.生产者配置类
@Configuration
@SuppressWarnings("all")
public class RabbitConfig {
@Bean
public Queue firstQueue() {
return new Queue("firstQueue");
}
@Bean
public Queue secondQueue() {
return new Queue("secondQueue");
}
}
5.生成者测试类
public class TestController {
@Autowired
private AmqpTemplate template;
@Autowired
private ObjectMapper objectMapper;
@RequestMapping("/send1")
public String send1(){
//向消息队列发送消息
template.convertAndSend("firstQueue","hello world");
return "🤣";
}
@RequestMapping("/send2")
public String send2() throws Exception{
User jack = new User("jack", "123");
String json = objectMapper.writeValueAsString(jack);
//向消息队列发送消息
template.convertAndSend("secondQueue",jack);
return "🤣";
}
}
因为消息队列支持的对象传参必须consumer 和 publisher 两个项目的pojo包路径完全一致所以使用:
@Autowired
private ObjectMapper objectMapper;
User jack = new User("jack", "123");
记得抛出异常 throws Exception 不然会报错
6.消费者接受信息
@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "firstQueue")
public class Receiver {
@RabbitHandler
public void process(String msg) {
log.warn("接收到:" + msg);
}
}
@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "secondQueue")
public class PojoReceiver {
@Autowired
private ObjectMapper objectMapper;
@RabbitHandler
public void process(String json) throws Exception{
User user=objectMapper.readValue(json,User.class);
log.warn("接收到:" + json);
}
}
版权归原作者 暴躁小段额 所有, 如有侵权,请联系我们删除。