前言
我们为什么要用消息队列技术?
有些复杂的业务系统,一次用户请求可能会同步调用N个系统的接口,需要等待所有的接口都返回了,才能真正的获取执行结果。
这样就导致了:
1.系统之间耦合性太高,如果调用的任何一个子系统出现异常,整个请求都会异常,对系统的稳定性非常不利
2.这种同步接口调用的方式总耗时比较长,非常影响用户的体验,特别是在网络不稳定的情况下,极容易出现接口超时问题
3.果用户突增,一时间所有的请求都到数据库,可能会导致数据库无法承受这么大的压力,响应变慢或者直接挂掉。
一、都有什么消息队列,各有什么特色?
1.ActiveMQ:它是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎!
2.Kafka:主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
3.RocketMQ:RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
4.RabbitMQ:RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
二、安装RabbitMQ(Linux环境)
官方教程,由此进入
1.使用docker引入RabbitMQ镜像
docker pull rabbitmq:3-management
:3-management 是指定的版本,如果不写默认引入最新版
我们已经成功引入了镜像
2.启动rabbitmq容器,
镜像只是相当于硬盘上的应用文件,容器则是相当于运行的应用程序
执行如下代码:
docker run \
-e RABBITMQ_DEFAULT_USER=whr \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
解释下各项配置参数:
RABBITMQ_DEFAULT_USER=whr 是设置rabbitmq的默认用户名为whr
RABBITMQ_DEFAULT_PASS=123456 是设置abbitmq的默认用户密码为123456
--name mq 容器名字
--hostname mq1 主机名,集群部署要用
-p 15672:15672 UI界面端口
-p 5672:5672 连接端口 这两个都是开放端口-d 后台运行
(ps: / 是换行符)
3.访问UI界面
上面我们完成了启动,可以去UI界面看看 访问方式就是 ip:端口号
输入上面配置的用户名,密码 即可登录
原版网页是英文的,我这里翻译为了中文
这样我们的Rabbit就已经完成安装和启动了
三、springbot整合RabbitMq
1. 引入所需依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.编写配置文件
在application.yml配置文件中 载入如下配置:
spring:
rabbitmq:
host: 47.96.73.173 #ip
port: 5672 #连接端口
username: whr #用户名
password: 123456 #密码
virtual-host: / #虚拟主机默认是 '/'
listener:
simple:
prefetch: 1 #设置预取消息上限
这样整合就完成了,在正式开始使用之前我们先来了解下rabbitmq的组件和模式
四、RabbitMq的组成
1.Queue
队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列
2.Exchange
交换机,用来实现消息的路由
3.channels
通道,建立连接后,会形成通道,消息的投递获取依赖通道
4.connections
无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费
五、RabbitMq常用模式
1.简单模式
1个生产者 + 1个队列 + 1个消费者;
生产者代码:
@Autowired
private RabbitTemplate rabbitTemplate;
//simple
@Test
public void sendMsg_1(){
String queueName = "simple.queue";
String msg = "spring amqp 666";
rabbitTemplate.convertAndSend(queueName, msg);
}
消费者代码:
@Component
@Slf4j
public class SoringRabbitListener {
//simple
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue (String msg){
log.info("收到消息:{}", msg);
}
}
运行结果:
2.工作者模式
1个生产者 + 1个队列 + 多个消费者,一条消息只能被消费一次;
生产者代码:
@Test
public void sendMsg_2() throws InterruptedException {
String queueName = "simple.queue";
String msg = "spring amqp 666 ";
int i = 0;
while (i < 50){
rabbitTemplate.convertAndSend(queueName, "", msg + i);
Thread.sleep(20);
i++;
}
}
消费者代码:
// work
@Component
@Slf4j
public class SoringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue_1 (String msg) throws InterruptedException {
log.info("消费者1收到消息:{} 时间是:{}", msg, LocalDateTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue_2 (String msg) throws InterruptedException {
log.error("消费者2收到消息:{} 时间是:{}", msg, LocalDateTime.now());
Thread.sleep(100);
}
}
运行结果:
3.订阅消息模型之 fanout
个生产者 + 1个交换机 + 多个队列 + 多个消费者,一条消息可以被多个消费者消费;
生产者:
@Test
public void sendToFanoutExchange(){
String fanoutExchangeName = "itcast.fanout";
String msg = "spring amqp 666";
rabbitTemplate.convertAndSend(fanoutExchangeName,"", msg);
}
消费者:
@Component
@Slf4j
public class SoringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.queue1"),
exchange = @Exchange(name = "whr.fanout", type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue1 (String msg){
log.info("消费者1收到消息:{} 时间是:{}", msg, LocalDateTime.now());
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.queue2"),
exchange = @Exchange(name = "whr.fanout", type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue2 (String msg){
log.info("消费者2收到消息:{} 时间是:{}", msg, LocalDateTime.now());
}
}
运行结果:
4.订阅消息模型之durect
1个生产者 + 1个交换机 + 多个队列 + 多个消费者 ,routingKey ,一条消息发送给符合 routingKey 的队列
生产者:
@Test
public void sendToDirectExchange(){
String fanoutExchangeName = "whr.direct";
String msg1 = "此消息发给category";
String msg2 = "此消息发给goods";
String msg3 = "此消息发给flavor";
rabbitTemplate.convertAndSend(fanoutExchangeName,"category", msg1);
rabbitTemplate.convertAndSend(fanoutExchangeName,"goods", msg2);
rabbitTemplate.convertAndSend(fanoutExchangeName,"flavor", msg3);
}
消费者:
@Component
@Slf4j
public class SoringRabbitListener {
//路由 direct
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "whr.direct"),
key = {"goods", "category"}
))
public void listenDirectQueue1 (String msg){
log.info("消费者1收到消息:{} 时间是:{}", msg, LocalDateTime.now());
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "whr.direct"),
key = {"flavor", "category"}
))
public void listenDirectQueue2 (String msg){
log.info("消费者2收到消息:{} 时间是:{}", msg, LocalDateTime.now());
}
}
结果:
5.订阅消息模型之topic
通配符,
#
:匹配一个或者多个
*
:一个词;
生产者:
@Test
public void sendToTopicExchange(){
String fanoutExchangeName = "whr.topic";
String msg1 = "此消息发给至:com.kk";
rabbitTemplate.convertAndSend(fanoutExchangeName,"com.kk", msg1);
}
消费者:
@Component
@Slf4j
public class SoringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "whr.topic", type = ExchangeTypes.TOPIC),
key = "com.#"
))
public void listenTopicQueue1 (String msg){
log.info("消费者1收到消息:{} 时间是:{}", msg, LocalDateTime.now());
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "whr.topic", type = ExchangeTypes.TOPIC),
key = "#.kk"
))
public void listenTopicQueue2 (String msg){
log.info("消费者2收到消息:{} 时间是:{}", msg, LocalDateTime.now());
}
}
结果:
结束语
以上就是RabbbitMq的基本使用了,新人作者,还有不足,欢迎大佬多多补充,多多关照哈~拜拜↑拜拜↓拜拜↑
版权归原作者 老公525 所有, 如有侵权,请联系我们删除。