0


Rabbitmq的使用

文章目录

mq作为常用的中间件,是被频繁使用的。根据技术选型有kafka,rabbitmq等等,如何进行技术选型是要根据实际情况来讲,可以参考
mq的技术选型。当前我们仅说rabbitmq。Rabbit应用在金融行业比较多,实现了
AMQP协议,是因为它的可靠性比较好。对于mq呢 我们就关注于如何接收消息,如何发送消息,消息是否会丢失。。。

MQ的优点:

1.应用解耦,在现在的开发场景中,微服务的开发限定了当前业务的范围,当你想调用别的服务的功能时可以使用MQ消息进行解耦,比如说我们的开户操作,需要扣减金额,而金额操作这个依赖于别人的api,所以我们可以通过调用rest接口来调用别人的api来执行操作,也可以把这个订单扣减信息告诉别人,让他们自己去实现扣减金额。就是我们就是把订单和扣减金额发送到mq,mq有消息了对方就会执行这个操作。两个操作是解耦的。
2.削峰限流 就是面对大的数据量来临时,我们可以指定这个消费者的速度,将消息堆积在mq中慢慢执行–针对时延性不高的产品
3异步 比如说发邮件这个操作,你可以完成以后发送给mq,消费者去执行操作,你直接进行下一步。

如何整合rabbitmq

引入starter

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.16.0</version>
        </dependency>

使用RabbitTemplate ,

@Component
public class Consumer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void testSend(String message){
       /*
       参数
           1.交换机(名),String
           2.routing key,String
           3.内容,String
       */
        rabbitTemplate.convertAndSend(MqConfig.EXCHANGE_NAME,"boot.send",message);
    }
}

之前我们讲过starter的逻辑,可以看看这个mq的starter的源码。

首先是spring的自动装配找到这个配置
然后找到这个配置类 配置类里面有rabbitTemplate对象可以初始化。
在这里插入图片描述

我写了一个demo 可以参考一下

https://gitee.com/sunweibo5/demo/tree/master/src/main/java/com/example/demo/service/rabbitMq

基本的概念

Producer:消息的生产者 一个向交换机发布消息的客户端应用程序,如下我封装了一个生产者发送消息的过程。其实就是引入RabbitTemplate发送消息。
Exchange : 下图中的EXCHANGE_NAME,这个和生产者绑定,生产者将消息发送给交换机。

public class Consumer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void testSend(String message){
       /*
       参数
           1.交换机(名),String
           2.routing key,String
           3.内容,String
       */
        rabbitTemplate.convertAndSend(MqConfig.EXCHANGE_NAME,"boot.send",message);
        
    }
}

Consumer:消息的消费者 如下就是从boot_queue这个queue中获取消息。
queue :队列,消息是从由交换机根据规则发送给队列的,然后队列将消息发送给消费者。

@Component
public class rabbitListener {
    //监听的队列名
    @RabbitListener(queues = "boot_queue")
    public void ListenerQueue(String message){
        //这里拿到的message就是队列中取出的数据,可任意操作,此处只是示范用来打印
        System.out.println("这是监听 mq 得到的消息");
        System.out.println(message);
    }
}

Binding :绑定 消息队列和交换机之间的虚拟连接,绑定中包含路由规则,绑定信息保存到交换机的路由表中。决定了从交换机到队列的规则

Connection: 生产者/消费者和RabbitMQ服务器之间建立的TCP连接。这个比较好理解,就是我们连接服务器的方式
Channel: 是TCP里面的虚拟连接。例如:Connection相当于电缆,Channel相当于独立光纤束,一条TCP连接中可以创建多条信道,增加连接效率。无论是发布消息、接收消息、订阅队列都是通过信道完成的。Channel

我们通过分析mq的源码,上图的convertAndSend的底层就是这个。

public void send(final String exchange, final String routingKey,
            final Message message, @Nullable final CorrelationData correlationData)
            throws AmqpException {
        execute(channel -> {
            doSend(channel, exchange, routingKey, message,
                    (RabbitTemplate.this.returnsCallback != null
                            || (correlationData != null && StringUtils.hasText(correlationData.getId())))
                            && isMandatoryFor(message),
                    correlationData);
            return null;
        }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
    }

    /**
     * Send the given message to the specified exchange.
     *
     * @param channel The RabbitMQ Channel to operate within.  发送的时候需要指定channel 
     * @param exchangeArg The name of the RabbitMQ exchange to send to.交换机名称
     * @param routingKeyArg The routing key.  其实就是Binding 的实现
     * @param message The Message to send.  消息体
     * @param mandatory The mandatory flag. mandatory 标志用于指示当发布的消息无法被直接路由到队列时,是否将消息返回给发布者
     * @param correlationData The correlation data.
     * @throws IOException If thrown by RabbitMQ API methods.
     */
    public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,
            boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {

Broker :消息队列服务器实体。即RabbitMQ服务器
Virtual Host:虚拟主机 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换机、绑定和权限机制。一个用户下有多个host,可以给不同的host发消息
在这里插入图片描述

mq的工作模式

其实就是mq的收发消息的模式,比如说我们想了解一个consumer是否可以同时接收多条消息,一个消息是被接收一次还是多次?一个producer是否是否可以生产多条信息等等
特别具体的看看这个,我只是简单介绍 mq的工作模式
普通队列模式 只指定队列 一对一的形式

rabbitTemplate.convertAndSend(MqConfig.QUEUE_NAME,message);

public class EasyrabbitListener {
    //监听的队列名
    @RabbitListener(queues = "boot_queue")
    public void ListenerQueue(String message){
        //这里拿到的message就是队列中取出的数据,可任意操作,此处只是示范用来打印
        System.out.println("这是第一个监听 mq 得到的消息");
        System.out.println(message);
    }
}

工作队列模式
多个消费者监听同一个queue就是工作队列模式

发布/订阅模式
Routing路由模式
Topic 主题模式

附 demo

https://gitee.com/sunweibo5/demo/tree/master/src/main/java/com/example/demo/service/rabbitMq/five

消息的可靠传输?如果消息丢了怎么办?

如何保证消息的顺序执行

消息的幂等性

参考这个MQ消息的幂等

时延队列是什么?

基于我浅薄的认知 mq还要学好多东西 晚点更新

基于mq的分布式事务

标签: rabbitmq ruby 分布式

本文转载自: https://blog.csdn.net/qq_41949481/article/details/135619429
版权归原作者 qq_41949481 所有, 如有侵权,请联系我们删除。

“Rabbitmq的使用”的评论:

还没有评论