0


RabbitMQ顺序消费

一、需要保证MQ顺序消费的场景

实际项目中,比如订单系统要同步订单表的数据到大数据部门的MySQL库中,通常做法是通过Canal这样的中间件去监听binlog,然后再把这些binlog 发送到MQ中, 然后消费者从MQ中获取binlog数据落地到大数据部门的MySQL中。

在这个过程,可能有订单的增删改操作, 需要保证binlog数据一定是有序的,比如 binlog 执行顺序是 增加、修改。但是消费者可能拿到的顺序是修改、增加,这就导致数据发生异常

二、发生消息乱序的原因

1、多个消费者消费一个队列中的消息

1)原因
一个queue,但是有多个consumer去消费, 因为我们无法保证先读到消息的 consumer 一定先完成操作,所以可能会导致消息顺序错乱
在这里插入图片描述

** 2) 解决:**
出现这个问题的主要原因是,不同消息都发送到了一个queue 中,然后多个消费者消费同一个queue的消息。所以我们可以给 RabbitMQ 创建多个queue, 每个消费者只消费一个queue, 生产者根据订单号,把订单号相同的消息放入一个同一个queue。这样同一个订单号的消息就只会被同一个消费者顺序消费。

2、一个消费者,但是以多线程的方式进行消费

1)原因
一个queue,一个consumer去消费, 但是 consumer 里面进行了多线程消费, 无法保证哪个线程先执行完,可能导致顺序错乱

2)demo演示

生产者

publicclassTask1{privatestaticfinalString QUEUE_NAME ="hello";publicstaticvoidmain(String[] args)throwsException{try(Channel channel =RabbitMqUtils.getChannel();){
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台当中接受信息Scanner scanner =newScanner(System.in);while(scanner.hasNext()){String message = scanner.next();
                channel.basicPublish("", QUEUE_NAME,null, message.getBytes());System.out.println("发送消息完成:"+ message);}}}}

消费者多线程进行消费

publicclassWork01{privatestaticfinalString QUEUE_NAME ="hello";privatestaticLogger log =LoggerFactory.getLogger(Work01.class);publicstaticvoidmain(String[] args)throwsException{Thread thread1 =newThread(newRunnable(){@Overridepublicvoidrun(){try{Channel channel =RabbitMqUtils.getChannel();DeliverCallback deliverCallback =(consumerTag, delivery)->{String receivedMessage =newString(delivery.getBody());System.out.println(Thread.currentThread().getName()+"接收到消息:"+ receivedMessage);};CancelCallback cancelCallback =(consumerTag)->{System.out.println(consumerTag +"消费者取消消费接口回调逻辑");};System.out.println(Thread.currentThread().getName()+"准备消费");
                    channel.basicConsume(QUEUE_NAME,true, deliverCallback, cancelCallback);}catch(Exception e){thrownewRuntimeException(e);}}});Thread thread2 =newThread(newRunnable(){@Overridepublicvoidrun(){try{Channel channel =RabbitMqUtils.getChannel();DeliverCallback deliverCallback =(consumerTag, delivery)->{try{Thread.sleep(30000);}catch(InterruptedException e){
                            e.printStackTrace();}String receivedMessage =newString(delivery.getBody());System.out.println(Thread.currentThread().getName()+"接收到消息:"+ receivedMessage);};CancelCallback cancelCallback =(consumerTag)->{System.out.println(consumerTag +"消费者取消消费接口回调逻辑");};System.out.println(Thread.currentThread().getName()+"准备消费");
                    channel.basicConsume(QUEUE_NAME,true, deliverCallback, cancelCallback);}catch(Exception e){thrownewRuntimeException(e);}}});Thread thread3 =newThread(newRunnable(){@Overridepublicvoidrun(){try{Channel channel =RabbitMqUtils.getChannel();DeliverCallback deliverCallback =(consumerTag, delivery)->{try{Thread.sleep(30000);}catch(InterruptedException e){
                            e.printStackTrace();}String receivedMessage =newString(delivery.getBody());System.out.println(Thread.currentThread().getName()+"接收到消息:"+ receivedMessage);};CancelCallback cancelCallback =(consumerTag)->{System.out.println(consumerTag +"消费者取消消费接口回调逻辑");};System.out.println(Thread.currentThread().getName()+"准备消费");
                    channel.basicConsume(QUEUE_NAME,true, deliverCallback, cancelCallback);}catch(Exception e){thrownewRuntimeException(e);}}});

        thread1.start();
        thread2.start();
        thread3.start();}}

发送消息:
在这里插入图片描述
消费者消费消息的顺序
在这里插入图片描述

3)解决方式
针对这种情况可以引入多个内存队列,同一个订单号的消息放入一个队列中(也就是消费消息没有速度差异),线程不直接消费消息,而是从队列中取出消息去消费

3、网络延时

生产者到MQ中间,消息由于网络延迟或者出现重试,导致原本 binlog 顺序是 1 2 3,发送到 MQ 的 queue 中变成了 1 3 2

标签: rabbitmq

本文转载自: https://blog.csdn.net/hc1285653662/article/details/129665173
版权归原作者 坚持每天学习 所有, 如有侵权,请联系我们删除。

“RabbitMQ顺序消费”的评论:

还没有评论