在讲述MQ之前我们先了解一下一些简单概念。
同步调用:比如打电话。优点:时效性很强。
支付服务要调用别的服务,调用了订单服务,在调用仓储服务,在以此调用别的,时间长。
服务流程如下:
缺点:
1.耦合度高。
2.性能下降。
3.资源浪费。
4.级联失败
异步调用:就比如微信发消息,可以和多个人发消息。
服务流程如下:
优点:1.服务解耦(比如支付之后,不发短信提醒了,不用修改代码,直接取消短信服务的订阅即可)
2.性能提高,吞吐量提高。
3.故障隔离,不担心级联失败。
4.流量削峰。
缺点:
1.对Broker的依赖性太强了。
2.架构复杂,业务没有明显的流程,不好管理
MQ:消息队列(MessageQueue),就是事件驱动架构中的Broker。主要用于分布式系统中进行异步任务调度和数据交换。
MQ产品主要有一下四种:
1.RabbitMQ
2.ActiveMQ
3.RocketMQ
4.Kafka
主要的区别如下:
RabbitMQ在Linux下的安装:
1.拉去容器
docker pull rabbitmq:management
2.创建镜像,并配置用户名和密码。(管理界面端口防火墙记得开发)
docker run -d --name my-rabbitmq \ 2 -p 5672:5672 \ # AMQP协议端口 3 -p 15672:15672 \ # 管理界面端口 4 -e RABBITMQ_DEFAULT_USER=admin \ 5 -e RABBITMQ_DEFAULT_PASS=admin \ 6 rabbitmq:management
3.进入管理页面(页面如下)
输入网址:[IP地址:管理界面端口]
成功进入之后,我们看一下上面这6列:
1.Overview:总览。主要展示的是
MQ
的概要信息 。
2.Connections:连接。无论消费者还是生产者都要建立连接。
3.Channels:通道。 消费者,生产者都要创建通道来执行事务。
4.Exchanges:交换机。消息的路由器,把消息路由到队列。
5.Queues:队列。做消息的存储。
6.Admin:管理。创建用户,创建虚拟主机。
MQ的大致架构如下:
AMQP:
(Advanced Message Queuing Protocol)是高级消息队列协议的缩写,它是一个开放标准的应用层协议,主要设计用于分布式系统之间高效、可靠地传输消息。
AMQP不是某个具体的软件产品或服务,而是一种通用的标准接口,任何遵循AMQP协议的软件系统都可以实现相互之间的互联互通,无论它们是由何种编程语言编写,运行在什么操作系统之上。
SpingAMQP:
Spring AMQP(Spring Advanced Messaging Queuing Platform)是Spring框架的一部分,它是为简化在Java应用程序中使用高级消息队列协议(AMQP)而设计的一个抽象和便捷工具集。
简而言之,Spring AMQP的目标是让开发者更易于在Spring应用程序中使用消息队列服务,降低消息驱动架构的复杂性,提高开发效率。
SpringAMQP的基本使用:
1.基于简单队列的使用
生成者模块:
1.导入依赖:
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置application.yml:(服务器防火墙端口记得开发嗷)
spring:
rabbitmq:
host: 43.138.67.251 # rabbitMQ的ip地址
port: 5672 # 端口
username: root
password: 123456
virtual-host: /
3.用RabbitTemplate发送消息:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue(){
String routingKey = "simple.queues";
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend(routingKey, message);
}
消费者模块:
也是导入依赖,配置yml。配置一个Bean监听队列,spring项目启动时,加载Bean对队列进行监听,消息阅后即焚。
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple")
public void listenSimplequeue(String msg) {
System.out.println("接收到消息:"+msg);
}
}
2.WorkQueue:工作消息队列
和上面的简单队列使用相比,添加多个消费者。
发布订阅模式:
了解一下交换机Exchange:
交换机的作用是把消息按照一定的规则路由给队列,交换机不能缓存消息,路由失败,消息丢失。
下面我们介绍几种常用的交换机:
1.Fanout Exchange:会将接受到的消息路由到每一个绑定的队列queue。
Fanout Exchange的使用:
先把交换机和队列绑定起来
@Configuration
public class FanoutConfig {
//交换机 fanout
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange");
}
//队列1 fanout1.queue
@Bean
public Queue fanoutqueue1(){
return new Queue("fanout1.queue");
}
//队列2 fanout2.queue
@Bean
public Queue fanoutqueue2(){
return new Queue("fanout2.queue");
}
//队列1绑定到交换机
@Bean
public Binding bind1(Queue fanoutqueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutqueue1).to(fanoutExchange);
}
//队列2绑定到交换机
@Bean
public Binding bind2(Queue fanoutqueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutqueue2).to(fanoutExchange);
}
}
消费者端监听消息:
//Fanout1
@RabbitListener(queues = "fanout1.queue")
public void FanoutexchengeQueue1(String msg){
System.err.println("fanout1.queue接收到消息:["+msg+"]");
}
//Fanout2
@RabbitListener(queues = "fanout2.queue")
public void FanoutexchengeQueue2(String msg){
System.err.println("fanout1.queue接收到消息:["+msg+"]");
}
生产者:
//Fanout交换机
@Test
public void senFanoutExchange() {
String exchangename = "fanout.exchange";
String msg = "Hello FanoutExchange";
rabbitTemplate.convertAndSend(exchangename,"",msg);
}
2.Direct Exchange:将接收到的消息,按照一定规则路由到指定的Queue。
Direct Exchange基本使用:
@RabbitListener里面指定参数会生成并绑定交换机和queue。每一个队列和交换机绑定时有一个BindingKye,生成者发送消息时带一个RoutingKey,交换机只把消息发到BindingKey和RoutingKey一致的queue。
//Direct1
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct1.queue"),
exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void listenerDirectExchange1(String msg) {
System.err.println("direct1.queue接收到消息:["+msg+"]");
}
//Direct2
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct2.queue"),
exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
))
public void listenerDirectExchange2(String msg) {
System.err.println("direct2.queue接收到消息:["+msg+"]");
}
生成者:
//Direct交换机
@Test
public void senDirectExchange() {
String exchangename = "direct.exchange";
String msg = "Hello DirectExchange";
rabbitTemplate.convertAndSend(exchangename,"blue",msg);
}
3.Topic Exchange:类似Direct Exchange,区别是生产者发布消息时带的RoutingKey必须是多个单词的列表,并以**. ** 分割
Topic Exchange的使用:
消费者:
//Topic1
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic1.queue"),
exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
key = {"china.#"}
))
public void listenTopicExchange1(String msg) {
System.err.println("topic1.queue接收到消息:["+msg+"]");
}
//Topic2
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic2.queue"),
exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
key = {"#.news"}
))
public void listenTopicExchange2(String msg) {
System.err.println("topic2.queue接收到消息:["+msg+"]");
}
生产者:
//Topic交换机
@Test
public void senTopicExchange() {
String exchangename = "topic.exchange";
String msg = "Hello TopicExchange";
rabbitTemplate.convertAndSend(exchangename,"aa.news",msg);
}
SpringAMQP的消息转化器:
SpringAMQP发消息到RabbitMQ,会将消息序列化,用的是JDK序列化,这种方式性能比较低下。
并且为了将Java对象转换为AMQP消息,以及将AMQP消息的内容反序列化回Java对象,通常我们会使用
所以我们要设置一个消息转换器,换一种序列化方式。
版权归原作者 花载酒779 所有, 如有侵权,请联系我们删除。