0


07. RabbitMQ消息成功确认机制

07. RabbitMQ消息成功确认机制

  • 在实际场景下,有的生产者发送的消息是必须保证成功发送到消息队列中,那么如何保证成功投递呢?
    • 事务机制- 发布确认机制

1.事务机制

  • AMQP协议提供的一种保证消息成功投递的方式,通过信道开启 transactional 模式
  • 并利用信道 的三个方法来实现以事务方式 发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递
    • channel.txSelect(): 开启事务- channel.txCommit() :提交事务
    • channel.txRollback() :回滚事务
  • Spring已经对上面三个方法进行了封装,所以我们只能使用原始的代码演示

2.生产者代码

packagetrascation;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importutil.ConnectionUtil;/**
 * @author WeiHong
 * @date 2021 -  09 - 15 19:59
 */publicclassSender{publicstaticvoidmain(String[] args)throwsException{//1.获取连接Connection connection =ConnectionUtil.getConnection();//2.在连接中创建信道Channel channel = connection.createChannel();//3.声明路由
        channel.exchangeDeclare("trascation_exchange_topic","topic");//4.发送消息
        channel.txSelect();//开启事务try{
            channel.basicPublish("trascation_exchange_topic","user.weihong",null,"商品1-降价".getBytes());System.out.println(3/0);//模拟异常
            channel.basicPublish("trascation_exchange_topic","user.libai",null,"商品2-降价".getBytes());
            channel.basicPublish("trascation_exchange_topic","users123.wangwu",null,"商品3-降价".getBytes());System.out.println("生产者已发送!");
            channel.txCommit();//事务提交}catch(Exception e){System.out.println("由于系统异常,消息全部撤回!");
            channel.txRollback();//事务回滚
            e.printStackTrace();}finally{
            channel.close();
            connection.close();}}}

3.消费者代码

packagetrascation;importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;/**
 * @author WeiHong
 * @date 2021 -  09 - 15 20:10
 */publicclassRecer1{publicstaticvoidmain(String[] args)throwsException{//1.创建连接Connection connection =ConnectionUtil.getConnection();//2.在连接中创建信道Channel channel = connection.createChannel();//3.声明队列
        channel.queueDeclare("trscation_queue_topic1",false,false,false,null);//4.绑定路由
        channel.queueBind("trscation_queue_topic1","trascation_exchange_topic","user.#");//5.定义内部类接收消息DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body);System.out.println("消费者1="+msg);}};
        channel.basicConsume("trscation_queue_topic1",true,consumer);}}

4.实验结果

img

标签: RabbitMQ MQ

本文转载自: https://blog.csdn.net/qq_41239465/article/details/123676629
版权归原作者 程序员阿红 所有, 如有侵权,请联系我们删除。

“07. RabbitMQ消息成功确认机制”的评论:

还没有评论