文章目录
mq作为常用的中间件,是被频繁使用的。根据技术选型有kafka,rabbitmq等等,如何进行技术选型是要根据实际情况来讲,可以参考
mq的技术选型。当前我们仅说rabbitmq。Rabbit应用在金融行业比较多,实现了
AMQP协议,是因为它的可靠性比较好。对于mq呢 我们就关注于如何接收消息,如何发送消息,消息是否会丢失。。。
MQ的优点:
1.应用解耦,在现在的开发场景中,微服务的开发限定了当前业务的范围,当你想调用别的服务的功能时可以使用MQ消息进行解耦,比如说我们的开户操作,需要扣减金额,而金额操作这个依赖于别人的api,所以我们可以通过调用rest接口来调用别人的api来执行操作,也可以把这个订单扣减信息告诉别人,让他们自己去实现扣减金额。就是我们就是把订单和扣减金额发送到mq,mq有消息了对方就会执行这个操作。两个操作是解耦的。
2.削峰限流 就是面对大的数据量来临时,我们可以指定这个消费者的速度,将消息堆积在mq中慢慢执行–针对时延性不高的产品
3异步 比如说发邮件这个操作,你可以完成以后发送给mq,消费者去执行操作,你直接进行下一步。
如何整合rabbitmq
引入starter
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
使用RabbitTemplate ,
@Component
public class Consumer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void testSend(String message){
/*
参数
1.交换机(名),String
2.routing key,String
3.内容,String
*/
rabbitTemplate.convertAndSend(MqConfig.EXCHANGE_NAME,"boot.send",message);
}
}
之前我们讲过starter的逻辑,可以看看这个mq的starter的源码。
然后找到这个配置类 配置类里面有rabbitTemplate对象可以初始化。
我写了一个demo 可以参考一下
https://gitee.com/sunweibo5/demo/tree/master/src/main/java/com/example/demo/service/rabbitMq
基本的概念
Producer:消息的生产者 一个向交换机发布消息的客户端应用程序,如下我封装了一个生产者发送消息的过程。其实就是引入RabbitTemplate发送消息。
Exchange : 下图中的EXCHANGE_NAME,这个和生产者绑定,生产者将消息发送给交换机。
public class Consumer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void testSend(String message){
/*
参数
1.交换机(名),String
2.routing key,String
3.内容,String
*/
rabbitTemplate.convertAndSend(MqConfig.EXCHANGE_NAME,"boot.send",message);
}
}
Consumer:消息的消费者 如下就是从boot_queue这个queue中获取消息。
queue :队列,消息是从由交换机根据规则发送给队列的,然后队列将消息发送给消费者。
@Component
public class rabbitListener {
//监听的队列名
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(String message){
//这里拿到的message就是队列中取出的数据,可任意操作,此处只是示范用来打印
System.out.println("这是监听 mq 得到的消息");
System.out.println(message);
}
}
Binding :绑定 消息队列和交换机之间的虚拟连接,绑定中包含路由规则,绑定信息保存到交换机的路由表中。决定了从交换机到队列的规则
Connection: 生产者/消费者和RabbitMQ服务器之间建立的TCP连接。这个比较好理解,就是我们连接服务器的方式
Channel: 是TCP里面的虚拟连接。例如:Connection相当于电缆,Channel相当于独立光纤束,一条TCP连接中可以创建多条信道,增加连接效率。无论是发布消息、接收消息、订阅队列都是通过信道完成的。Channel
我们通过分析mq的源码,上图的convertAndSend的底层就是这个。
public void send(final String exchange, final String routingKey,
final Message message, @Nullable final CorrelationData correlationData)
throws AmqpException {
execute(channel -> {
doSend(channel, exchange, routingKey, message,
(RabbitTemplate.this.returnsCallback != null
|| (correlationData != null && StringUtils.hasText(correlationData.getId())))
&& isMandatoryFor(message),
correlationData);
return null;
}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
}
/**
* Send the given message to the specified exchange.
*
* @param channel The RabbitMQ Channel to operate within. 发送的时候需要指定channel
* @param exchangeArg The name of the RabbitMQ exchange to send to.交换机名称
* @param routingKeyArg The routing key. 其实就是Binding 的实现
* @param message The Message to send. 消息体
* @param mandatory The mandatory flag. mandatory 标志用于指示当发布的消息无法被直接路由到队列时,是否将消息返回给发布者
* @param correlationData The correlation data.
* @throws IOException If thrown by RabbitMQ API methods.
*/
public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,
boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {
Broker :消息队列服务器实体。即RabbitMQ服务器
Virtual Host:虚拟主机 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换机、绑定和权限机制。一个用户下有多个host,可以给不同的host发消息
mq的工作模式
其实就是mq的收发消息的模式,比如说我们想了解一个consumer是否可以同时接收多条消息,一个消息是被接收一次还是多次?一个producer是否是否可以生产多条信息等等
特别具体的看看这个,我只是简单介绍 mq的工作模式
普通队列模式 只指定队列 一对一的形式
rabbitTemplate.convertAndSend(MqConfig.QUEUE_NAME,message);
public class EasyrabbitListener {
//监听的队列名
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(String message){
//这里拿到的message就是队列中取出的数据,可任意操作,此处只是示范用来打印
System.out.println("这是第一个监听 mq 得到的消息");
System.out.println(message);
}
}
工作队列模式
多个消费者监听同一个queue就是工作队列模式
发布/订阅模式
Routing路由模式
Topic 主题模式
附 demo
https://gitee.com/sunweibo5/demo/tree/master/src/main/java/com/example/demo/service/rabbitMq/five
消息的可靠传输?如果消息丢了怎么办?
如何保证消息的顺序执行
消息的幂等性
参考这个MQ消息的幂等
时延队列是什么?
基于我浅薄的认知 mq还要学好多东西 晚点更新
基于mq的分布式事务
版权归原作者 qq_41949481 所有, 如有侵权,请联系我们删除。