同步通讯: 类似于两个人用手机打电话,需要实时响应(实时)
优点:
- 实时性强,可以立即得到结果
缺点:
- 耦合度高
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败的问题
异步通讯:类似于两个人发微信消息、发短信,不需要马上回复(不是实时的)
好处:
- 吞吐量提升
- 故障隔离
- 调用时不会阻塞,不会造成无效的资源占用
- 耦合度极低
- 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
坏处:
- 结构变复杂,业务没有明显流程, 不好管理
- 需要依赖于Broker的可靠、安全、性能
注:同步与异步通讯有各自的使用场景
几种常见MQ的对比:
ActiveMQRabbitMQRocketMQ****Kafka公司/社区
Apache
Rabbit阿里Apache开发语言JavaErlangJavaScala&Java协议支持OpenWire,STOMP,REST,XMPP,AMQPAMQP,XMPP,SMTP,STOMP自定义私有协议自定义私有协议可用性一般高高高单机吞吐量差一般高非常高消息延迟毫秒级微秒级毫秒级毫秒以内消息可靠性一般高高一般
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
下载镜像
#在线拉取
docker pull rabbitmq:3.8-management
安装RabbitMQ:
#举例 运行MQ容器:
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins9:/plugins \
--name mq9 \
--hostname mq9 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
运行启动后:访问 页面 如192.168.136.128:15672 进入RabbitMQ控制页面:
MQ的基本结构:
RabbitMQ中的一些角色:
- publisher:生产者
- consumer:消费者
- exchange个:交换机,负责消息路由
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
RabbitMQ消息模型(5种):
简单模型
- 基本消息队列 BasicQueue :有一个消费者
- 工作消息队列 WorkQueue : 有多个消费者
发布订阅模型
- 广播(Fanout Exchange) : 交换机把消息发送给所有队列
- 路由(Direct Exchange) : 交换机根据路由匹配将消息发送给匹配的队列
- 主题(Topic Exchange) : 支持通配符的路由
什么是SpringAMQP?
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAmqp 的官方地址:Spring AMQP
SpringAMQP的功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
SpringAMQP的实现:
1、BasicQueue简单队列模型:
在pom文件中导入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
消息发送:
首先在yml配置文件中添加配置:
spring:
rabbitmq:
host: 192.168.150.101 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast # 用户名
password: 123321 # 密码
在发送服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
消息接收:
在配置文件中添加配置:
spring:
rabbitmq:
host: 192.168.150.101 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast # 用户名
password: 123321 # 密码
在消费者服务中,新建一个类:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
最后运行开启消费者服务,调用消息发送的测试类发送消息,进行测试;
2、WorkQueue
消息发送:
我们在测试类中编写一个测试方法,通过循环向队列不停的发送消息,模拟消息堆积:
/**
* workQueue
* 向队列中不停发送消息,模拟消息堆积。
*/
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
消息接收:
要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20); //模拟任务耗时
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200); //模拟任务耗时
}
这种默认的情况 交换机会将消息平均分配给每个队列,消费者处理能力不统一,导致资源浪费;
我们在配置文件中添加配置 实现“能者多劳”(通过设置prefetch来控制消费者预取的消息数量)
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息 消费者处理完一条消息后交换机才会发送下一条数据
3、什么是发布订阅模型?
- Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:- Fanout:广播,将消息交给所有绑定到交换机的队列- Direct:定向,把消息交给符合指定routing key 的队列- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- Consumer:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息。
注:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
4、Fanout广播模式:
- 可以有多个队列
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 交换机把消息发送给绑定过的所有队列
- 订阅队列的消费者都能拿到消息
Fanout的实现:
实现如图效果:一台交换机绑定两个消费队列
消息发送:
@Test
public void testFanoutExchange() {
// 队列名称
String exchangeName = "itcast.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
消息接收: 添加两个消费者
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.queue1"),
exchange = @Exchange(name = "itcast.fanout", type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.queue2"),
exchange = @Exchange(name = "itcast.fanout", type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
5、Direct
在Fanout的基础上需要指定一个
RoutingKey
(路由key)
消息发送:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
消息发送:
@Test
public void testFanoutExchange() {
// 队列名称
String exchangeName = "itcast.direct";
// 消息
String message = "hello, direct";
rabbitTemplate.convertAndSend(exchangeName, "red", message); //两个队列都有red的key所有都能接收到消息
//rabbitTemplate.convertAndSend(exchangeName, "blue", message); //只有队列1能接收到消息
//rabbitTemplate.convertAndSend(exchangeName, "yellow", message); //队列2接收到消息
}
6、Topic
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如:
item.insert
通配符规则:
#
:匹配一个或多个单词
*
:匹配不多不少恰好1个单词
举例:
item.#
:能够匹配
item.spu.insert
或者
item.spu
item.*
:只能匹配
item.spu
如图:
消息接收样例:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
消息转换器
默认情况下Spring采用的序列化方式是JDK序列化。
JDK序列化存在的问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
通过配置JSON转换器可以让消息的体积更小,可读性更高;
1)配置消息转换器 --在publisher和consumer两个服务中都引入依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
2)在启动类中添加一个Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
注: 配置完JSON转换器之后,JDK序列化就不会生效了
版权归原作者 小小莴苣 所有, 如有侵权,请联系我们删除。