0


RabbitMQ从入门到精通(详细)

消息中间件的三大核心作用

异步处理

同步方式消耗150ms

异步方式消耗(等待注册邮件和注册短信中耗时最久的服务执行完响应给用户)100ms

耗时55ms,将注册邮件和注册短信服务写入消息,无需关注这两个服务,需要的时候去消息里面拿就行。

应用解耦

如果库存系统升级,订单系统业务跟着升级

订单系统只需写入消息,库存系统去订阅就行

流量控制

中间件概述

两种消息服务的对比

JMS(Java Message Service)

AMQP(Advanced Message Queuing Protocol)

定义

Java api

网络线级协议

跨语言

跨平台

Model

提供两种消息模型:

、Peer-2-Peer

、Pub/sub

提供了五种消息模型:

、direct exchange

、fanout exchange

、topic change

、headers exchange

、system exchange

本质来讲,后四种和JMS的pub/sub模型没有太大差别 仅是在路由机制上做了更详细的划分;

支持消息类 型

多种消息类型: TextMessage MapMessage BytesMessage StreamMessage ObjectMessage

Message (只有消息头和属性)

byte[]

当实际应用时,有复杂的消息,可以将消息序列化后发 送。

综合评价

JMS 定义了JAVA API层面的标准;在java体系中, 多个client均可以通过JMS进行交互,不需要应用修 改代码,但是其对跨平台的支持较差;

AMQP定义了wire-level层的协议标准;天然具有跨平 台、跨语言特性。

RabbitMQ

概念

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。路由键是有队列设置的,用来和交换见建立绑定关系的,在点对点中必须指定路由键,发布订阅模式可以不用指定。生成者发送消息的交换机需要携带路由键,在点对点模式中携带的路由键和交换机绑定的key相同时才会在相应队列获取到消息,发布订阅模式,如果队列没有指定路由键,生产者也无需携带路由键。

核心概念

Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可 能需要持久性存储)等。

Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

Exchange有4种类型:direct(默认),fanout,topic,和headers,不同类型的Exchange转发消息的策略有所区别

Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直 在队列里面,等待消费者连接到这个队列将其取走。

Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交 换器理解成一个由绑定构成的路由表。

Exchange和Queue的绑定可以是多对多的关系。

Connection

网络连接,比如一个TCP连接。

Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道 发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都 是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。

Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加 密环境的独立服务器域。每个 vhost 本质上就是一个mini版的RabbitMQ 服务器,拥 有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时 指定,RabbitMQ 默认的vhost是/。

Broker

表示消息队列服务器实体

Docker 安装 RabbitMQ

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

4369,25672(Erlang发现&集群端口)

5672,5671(AMQP端口)

15672 (web管理后台端口)

61613,61614(STOMP协议端口)

1883,8883(MQTT协议端口)

https://www.rabbitmq.com/networking.html

RabbitMQ运行机制

生产者发送消息到交换机,消费者从队列中获取信息,交换机会绑定队列

Exchange 类型

目前headers弃用,只学习direct、fanout、topic三种交换机

direct点对点交换机:如果路由键和binding中的bindingkey一致,交换机将消息发到对应的队列中。

fonout发布订阅交换机:(可以没有路由键,因为他是广播机制,只要消息发给交换机,交换机绑定的队列全部收到)交换机绑定的所有队列都可以拿到消息

topic发布订阅:按照特定规则,交换机绑定的满足规则的队列拿到消息。

三种交换机运用

RabbitMQ整合

AmqpAdmin创建队列,交换机,绑定关系

@Test
    public void createExchange() {
        DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
        // 创建交换机
        amqpAdmin.declareExchange(directExchange);
        log.info("Exchange[{}]创建成功",directExchange.getName());
    }
    @Test
    public void createQueue() {
        Queue queue = new Queue("hello-java-queue", true, false, false);
        // 创建队列
        amqpAdmin.declareQueue(queue);
        log.info("Queue[{}]创建成功",queue.getName());
    }

    @Test
    public void createBinding() {
        // String destination 目的地, DestinationType destinationType 目的地类型
        // , String exchange 交换机, String routingKey,路由键
        //            Map<String, Object> arguments
        Binding binding = new Binding("hello-java-queue",
                Binding.DestinationType.QUEUE,
                "hello-java-exchange",
                "hello.java",
                null);
        amqpAdmin.declareBinding(binding);
        log.info("Binding[{}]创建成功",binding.getRoutingKey());
    }

RabbitTemplate发送消息

@Test
    public void sendMsg() {

        //String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor,
        //            CorrelationData correlationData
        // 1、发送的消息如果是对象,需要将对象序列化写出去,实现Serializable接口
        // 2、出了实现Serializable接口进行jdk序列化以外,对象还可以转为json进行传递(配置MessageConverter消息转换器)
        // 发送的都是同一类型消息
//        for (int i = 0; i < 10; i++) {
//            OrderEntity entity = new OrderEntity();
//            entity.setId((long) i);
//            entity.setCreateTime(new Date());
//            entity.setGrowth(99);
//            rabbitTemplate.convertAndSend("hello-java-exchange",
//                    "hello.java",
//                    entity);
//            log.info("消息发送完成[{}]",entity);
//        }
        // 发送不同类型的消息 这个时候使用@RabbitListener不能达到重载效果,需要配合@RabbitHandler使用
        for (int i = 0; i < 10; i++) {
            if(i%2==0) {
                OrderEntity entity = new OrderEntity();
                entity.setId((long) i);
                entity.setCreateTime(new Date());
                entity.setGrowth(99);
                rabbitTemplate.convertAndSend("hello-java-exchange",
                        "hello.java",
                        entity);
                log.info("消息发送完成[{}]",entity);
            }else{
                OrderReturnReasonEntity entity = new OrderReturnReasonEntity();
                entity.setId((long) i);
                entity.setCreateTime(new Date());
                rabbitTemplate.convertAndSend("hello-java-exchange",
                        "hello.java",
                        entity);
                log.info("消息发送完成[{}]",entity);
            }

        }

@RabbitListener和@RabbitHandler接收消费消息

前期准备:接收消息要在启动类上添加@EnableRabbit注解,

@RabbitListener和可以作用在类和方法上

@RabbitHandler作用在方法上

只使用@RabbitListener也可作用在方法上也可接受消息,不过只能接收同种类型消息,配合@RabbitHandler进行重载可以接受多种不同消息。

@RabbitListener(queues = {"hello-java-queue"})
@Service("orderService")
public class OrderServiceImpl extends ServiceImpl<OrderDao, OrderEntity> implements OrderService {
    //    @RabbitListener(queues = {"hello-java-queue"})
    public void receiveMsg(Message message,OrderEntity entity) {
        byte[] messageBody = message.getBody();
        System.out.println("接收到的消息"+message+"==内容是"+entity);
    }
    @RabbitHandler
    public void receiveMsg(Message message, OrderEntity entity) {
        System.out.println("接收到的消息"  + entity);
    }
    @RabbitHandler
    public void receiveMsg2(Message message, OrderReturnReasonEntity entity) {
        System.out.println("接收到的消息"  + entity);
    }
}

RabbitMQ消息确认机制-可靠抵达

图解

confirmCallback发送端消息确认

1、在application.properties中配置:

# 开启发送端确认(消息代理broker确认收到消息)
spring.rabbitmq.publisher-confirms=true

2、定制rabbitTemplate,设置confirmCallback

 /**
     * 定制 RabbitTemplate
     */
    @PostConstruct // MyRabbitmqConfig对象创建完成调用该方法设置ConfirmCallback
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 只要消息抵达消息代理broker ack即为true
             * @param correlationData 当前消息唯一关联数据(id)
             * @param ack 消息是否成功发出
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm...=>CorrelationData["+correlationData+"]=>ack["+ack+"]=>cause["+cause+"]");
            }
        });

ReturnCallback发送端消息确认

1、在application.properties中配置:

# 开启发送端消息抵达队列确认
spring.rabbitmq.publisher-returns=true
# 只要抵达队列,以异步的方式优先回调我们这个returnconfirm (可以不做配置)
spring.rabbitmq.template.mandatory=true

2、定制rabbitTemplate,设置ReturnCallback

/**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     *      1、spring.rabbitmq.publisher-confirms: true
     *      2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     *      1、spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     *
     */
    @PostConstruct // MyRabbitmqConfig对象创建完成调用该方法设置ConfirmCallback
    public void initRabbitTemplate() {
        //设置broker收到消息确认回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 只要消息抵达消息代理broker ack即为true
             * @param correlationData 当前消息唯一关联数据(id)
             * @param ack 消息是否成功发出
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm...=>CorrelationData["+correlationData+"]=>ack["+ack+"]=>cause["+cause+"]");
            }
        });

        /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText,
                                          String exchange, String routingKey)->{
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }

ack消费端消息确认

 /**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     *      1、spring.rabbitmq.publisher-confirms: true
     *      2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     *      1、spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     *      1、默认是自动确认的,只要消息接收到,客户端就会自动确认,broker服务端就会删除这个消息
     *      问题:我们收到很多消息,自动回复给服务端ack,只有一个消息处理成功,宕机了。发生消息丢失(其他消息还没消费,消息就从broker中删除了)
     *      解决方法手动确认# 开启手动确认spring.rabbitmq.listener.simple.acknowledge-mode=manual
     *      手动确认只要我们没有明确告诉mq,消息被消费,没有ack,消息就一直保持unacked状态,即使生产者服务宕机,消息也会一直保存在broker队列中,不被删除,此时为ready(可用)状态
     *      2、手动确认如何确认消费消息
     *           // 消费确认消息
     *                 channel.basicAck(deliveryTag,false); 业务执行成功,消费消息
     *            // 拒绝消费消息
     *                  channel.basicNack(deliveryTag,false,true); 业务执行失败,拒绝消费,可以重新放入队列给被别人消费,也可直接丢弃
     *                 channel.basicReject(deliveryTag,true);
     *
     */
    @PostConstruct // MyRabbitmqConfig对象创建完成调用该方法设置ConfirmCallback
    public void initRabbitTemplate() {
        //设置broker收到消息确认回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 只要消息抵达消息代理broker ack即为true
             * @param correlationData 当前消息唯一关联数据(id)
             * @param ack 消息是否成功发出
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm...=>CorrelationData["+correlationData+"]=>ack["+ack+"]=>cause["+cause+"]");
            }
        });

        /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText,
                                          String exchange, String routingKey)->{
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }

手动ack

@RabbitHandler
    public void receiveMsg(Message message, OrderEntity entity, Channel channel) {
        System.out.println("接收到的消息" + entity);
        System.out.println("消息处理完成=>"+entity.getId());
        // channel内顺序自增的
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag==>"+deliveryTag);
        // 签收消息 multiple是否批量
        try {
            if(deliveryTag%2==0) {
                // 消费确认消息
                channel.basicAck(deliveryTag,false);
                System.out.println("消费了消息"+deliveryTag);
            }else{
                // 拒绝消费消息,退回给broker消息代理服务器,requeue为true消息服务器重新入队,等待消息被消费确认
                // requeue为false,消息被丢弃
                channel.basicNack(deliveryTag,false,true);
                System.out.println("取消消费"+deliveryTag+"消息,重新入队");
                // 和basicNack类似
                channel.basicReject(deliveryTag,true);
            }
        } catch (IOException e) {
            // 网络中断
            e.printStackTrace();
        }
    }
标签: rabbitmq 中间件

本文转载自: https://blog.csdn.net/qq_43614322/article/details/124411404
版权归原作者 一只小猿i 所有, 如有侵权,请联系我们删除。

“RabbitMQ从入门到精通(详细)”的评论:

还没有评论