MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。
几种常见MQ的对比:
RabbitMQActiveMQRocketMQ****Kafka公司/社区RabbitApache阿里Apache开发语言ErlangJavaJavaScala&Java协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议可用性高一般高高单机吞吐量一般差高非常高消息延迟微秒级毫秒级毫秒级毫秒以内消息可靠性高一般高一般
MQ解决什么问题
MQ是一直存在,不过随着微服务架构的流行,成了解决微服务之间问题的常用工具,有以下几种主要的作用:
异步处理:用户注册后,发送注册邮件和注册短信。用户注册完成后,提交任务到 MQ,发送模块并行获取 MQ 中的任务。
系统解耦:比如用注册完成,再加一个发送微信通知。只需要新增发送微信消息模块,从 MQ 中读取任务,发送消息即可。无需改动注册模块的代码,这样注册模块与发送模块通过 MQ 解耦。
流量削峰:秒杀和抢购等场景经常使用 MQ 进行流量削峰。活动开始时流量暴增,用户的请求写入MQ,超过 MQ 最大长度丢弃请求,业务系统接收 MQ 中的消息进行处理,达到流量削峰、保证系统可用性的目的。
日志处理:日志采集方收集日志写入 kafka 的消息队列中,处理方订阅并消费 kafka 队列中的日志数据。
消息通讯:点对点或者订阅发布模式,通过消息进行通讯。如微信的消息发送与接收、聊天室等。
今天介绍RabbitMQ
RabbitMQ是MQ消息队列的一种,我们一般使用的是Spring集合后的SpringAMQP.
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
AMQP 是一种高级消息队列协议.而SpringAMQ是基于AMQP协议制订的一套api规范,提供了模范来发送和接收消息.
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系 (绑关系)
- 基于注解的监听器模式,异步接收消息 (接收消息)
- 封装了RabbitTemplate工具,用于发送消息 (发送消息)
SpringAMQP的简单使用步骤:
1.在父工程中引入依赖,(依赖中包含了RabbitMQ的依赖)
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2.编写消息发送方和消息接收方的配置文件.这里是让rabbitMQ服务端和消息的发送方和接收方建立了联系
spring:rabbitmq:host: 192.168.150.100 # 主机名port:5672# 端口virtual-host: / # 虚拟主机username: username # 用户名password:1234# 密码
3.在消息的接收方,新建一个类用来监听发送方发出的消息.
@ComponentpublicclassSpringRabbitListener{@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueueMessage(String msg)throwsInterruptedException{System.out.println("spring 消费者接收到消息:【"+ msg +"】");}}
注意这个类是要交给Spring管理的所以加上@Component.监听的动作交给Spring,如果监听到这个队列中有消息,就会接收到,不用我们自己进行任何的操作
4.在消息的发送方,使用RabbitTemplet来发送消息到消息队列
@RunWith(SpringRunner.class)@SpringBootTestpublicclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){// 队列名称String queueName ="simple.queue";// 消息String message ="hello, spring amqp!";// 发送消息
rabbitTemplate.convertAndSend(queueName, message);}}
这是个简单的发送和接收的例子.
完整的消息的发送涉及到的对象及步骤,消息生产者Producer首先和RabbitMQ服务器建立连接,获取通道channel, 然后生产者发送消息给指定的虚拟机中的交换机,交换机交换机根据消息的routingKey将消息路由(转发)给指定的队列.然后消费者Consumer首先也是和Rabbit建立连接,获取通道channel,然后消费者监听指定的队列,如果监听的队列Queue中有消息了,就可以从消息队列中获取到Producer发送的的\消息了.
完整架构如下所示:
发布/订阅
发布订阅的模型如图:
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型: - Fanout:广播,将消息交给所有绑定到交换机的队列- Direct:定向,把消息交给符合指定routing key 的队列- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- Consumer:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
具体的实现流程:
- 导入依赖 作用:引入一个Rabbit服务器,
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
- 编写消息发送者和消息接收者(消费者)的yml配置文件 作用: 和引入的Rabbit服务器建立联系,获取通道channel (Spring自动完成)
spring:rabbitmq:host: 192.168.100.100 # 主机名port:5672# 端口virtual-host: / # 虚拟主机username: usname # 用户名password:123456# 密码
- 在消息的接收方这边创建监听. (通过注解@RabbitListener 声明出队列和交换机)
@RabbitListener(bindings =@QueueBinding( value =@Queue(name ="direct.queue2"), exchange =@Exchange(name ="itcast.direct", type =ExchangeTypes.DIRECT), key ={"red","blue"}//队列的标签key 与交换机的key做匹配))
- 在消息生产者这边引用RabbitTemplate对象发送消息(Spring管理着这个RabbitTemplate的Bean,自动注入就可以使用)给交换机
- 交换机是消费者的接收方声明的.发送发也可以声明,这里有常用的几种类型的交换机Exchange:常用以下3种类型:- Fanout:广播,将消息交给所有绑定到交换机的队列- Direct:定向,把消息交给符合指定routing key 的队列- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- 交换机根据不同的routingkey把消息转发给匹配的消息队列,消费者监听者监听的通道中有了交换机转发的消息就获取消息.描述下Direct交换机与Topic交换机的差异?- Topic交换机接收的消息RoutingKey必须是多个单词,以
**.**
分割- Topic交换机与队列绑定时的bindingKey可以指定通配符-#
:代表0个或多个词-*
:代表1个词这里边消息的发送存在一个消息转化的问题,Spring使用的是jdk的消息序列化器,会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。DK序列化存在下列问题:- 数据体积过大- 有安全漏洞- 可读性差JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。具体的实现 就是导入依赖在消息接收的和发送的都需要导入,一个序列化,一个反序列化<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version></dependency>
在消息接收的和发送的启动类中添加一个Bean即可,Spring启动的时候,发现有这个bean就不创建这个了,直接管理这个bean@BeanpublicMessageConverterjsonMessageConverter(){returnnewJackson2JsonMessageConverter();}
在spring中有一个简单的配置,可以解决多个消息接收器处理能力不同的的情况下,我们想让能力强的多处理,也就是处理完了消息就从消息对列中拿消息去处理,能力少的根据能力处理多少.默认是多个消息处理器轮询分发消息,平均每个处理器获取到的消息是一样的,但是我们可以通过设置prefetch来控制消费者预取的消息数量,从而达到能者多劳的效果rabbitmq:listener:simple:prefetch:1# 每次只能获取一条消息,处理完成才能获取下一个消息
学习随笔记录>>>>有不对.欢迎指教
版权归原作者 洛水|天依 所有, 如有侵权,请联系我们删除。