0


MQ消息队列(主要介绍RabbitMQ)

消息队列概念:是在消息的传输过程中保存消息的容器。

作用:异步处理、应用解耦、流量控制.....

RabbitMQ:

SpringBoot继承RabbitMQ步骤:

    1.加入依赖
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
     2.配置
spring:
  rabbitmq:
    host: 192.168.127.129
    virtual-host: /  # 指定虚拟主机
    port: 5672
    3.开启(如果不需要监听消息也就是不消费就不需要该注解开启)
@EnableRabbit
    4.创建队列、交换机、以及绑定它们之间的关系
@Configuration
public class MyMQConfig {

    

    /**
     * 创建队列
     * @return
     */
    @Bean
    public Queue createQueue(){
        //String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments
        Queue queue = new Queue("order.queue",true,false,false);
        return queue;
    }

    /**
     * 创建交换机
     * @return
     */
    @Bean
    public Exchange createExchange(){
        //因为这个交换机需要根据路由进行发送  所以使用TopicExchange

        //String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
        TopicExchange topicExchange = new TopicExchange("order-event-exchange",true,false);
        return  topicExchange;
    }

    /**
     * 通过路由绑定交换机和队列之间的关系
     * @return
     */
    @Bean
    public Binding createBinding(){
        //String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments
        Binding binding = new Binding("order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.route",
                null
        );
        return binding;
    }

}
    4.发送消息
    @Autowired
    RabbitTemplate rabbitTemplate;

    @ResponseBody
    @GetMapping("/sendmq")
    public String sendmq(){
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setOrderSn(UUID.randomUUID().toString());
        //发送消息  String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData
        rabbitTemplate.convertAndSend("order-event-exchange","order.route",orderEntity);
        return "ok";
    }
    5.消费消息(监听消息)
@Component
@RabbitListener(queues = "create.queue")
public class OrderCloseListener {

   
    @RabbitHandler
    public void orderClose(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
        System.out.println("消费消息");

    }
}

问题1:以上消息发送和消费中,如果传输的数据是java对象,默认使用的jdk序列化机制,我们经常需要使用json传递就需要修改传输格式json

修改方法如下:

@Configuration
public class RabbitConfig {

    //发送消息为对象的时候  使用json的格式
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

}

问题2:在消息的发送和消费还有消息储存过程中,我们需要保证消息的可靠性,避免消息的丢失保证业务数据的正确

    1.消息储存:使用持久化

    1.消息发送:开启消息投靠确认机制
spring:
  rabbitmq:
    host: 192.168.127.129
    virtual-host: /  # 指定虚拟主机
    port: 5672
#    publisher-confirms: true
    publisher-confirm-type: simple # 开启生产者消息确认模式
    publisher-returns: true
@Configuration
public class RabbitConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 定制rabbitTemplate
     * 消息发送确认
     */
    @PostConstruct //表示RabbitConfig对象创建之后执行该方法
    public void initRabbitTemplate(){
        //消息成功发送到服务器之后的成功回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData  消息的唯一id
             * @param b  消息是否成功
             * @param s  消息失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("confirm===correlationData:"+ correlationData+ "ack:"+ b);
            }
        });

        //消息发送到队列queue失败执行的回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             *
             * @param message  消息的内容
             * @param i    回复的状态码
             * @param s     回复的文本内容
             * @param s1    那个交换机
             * @param s2    那个路由key
             *
             *              最常见的就是路由key不对
             */
            @Override
            public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                System.out.println("fail====>message:"+ message+"状态码:"+i + "错误提示:"+ s+ "交换机:"+s1 + "路由:"+ s2);
            }
        });
    }
}

异常操作之后可以达到消息发送端确认机制

    3.消息消费端的确认机制
spring:
  rabbitmq:
    host: 192.168.127.129
    virtual-host: /  # 指定虚拟主机
    port: 5672
#    publisher-confirms: true
    publisher-confirm-type: simple # 开启生产者消息确认模式
    publisher-returns: true
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual # 开启消费者 手动签收消息功能
@Service
@RabbitListener(queues = "create.queue")
public class OrderCloseListener {

    @RabbitHandler
    public void orderClose(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
        System.out.println("消费消息。。。.");
        try{
            //业务逻辑
            //手动确认消息消费成功,消息不在写人队列
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            //消息消费失败(业务失败),将消息在次写到队列避免消息丢失
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }

    }
}

通过以上消息的发送和消费端都确认之后我们消息一定的是可靠的。

案例:

    在实际的开发中我们经常会有取消订单的功能,就可以使用消息队列延迟消费消息,具体实现通过个死信队列,把消息先放到死信队列,当消息到期之后转到到期队列,监听到期队列然后达到订单取消功能
标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/weixin_42223524/article/details/132453748
版权归原作者 地中海未来 所有, 如有侵权,请联系我们删除。

“MQ消息队列(主要介绍RabbitMQ)”的评论:

还没有评论