0


RabbitMQ 中保证消息不被重复消费

RabbitMQ 如何保证消息不被重复消费及消费幂等性

简介

在分布式系统中,消息的可靠传输和处理至关重要。RabbitMQ 作为一个常用的消息队列中间件,提供了丰富的功能来保证消息的可靠传输。但是,由于网络故障、服务宕机等原因,消息有可能被重复消费。如果没有处理好重复消费问题,可能会导致系统产生不一致性。为了解决这个问题,RabbitMQ 提供了一些机制来防止消息被重复消费,同时,在设计消费者时也需要考虑消费的幂等性。

消息重复消费的场景

在 RabbitMQ 中,消息重复消费可能发生在以下场景:

  1. 网络故障:消费者成功处理消息后,在向 RabbitMQ 发送确认(ACK)时,网络故障导致 RabbitMQ 没有收到确认,RabbitMQ 认为消息未被处理,会重新投递该消息。
  2. 消费者故障:消费者在处理消息后,在发送 ACK 之前发生宕机或崩溃,导致消息未被确认,RabbitMQ 会重新投递该消息。
  3. 消息重试:某些场景下,消息消费失败时需要重试,这也可能导致消息被多次处理。

为了防止这些情况导致的数据不一致,必须确保消息消费的幂等性,即多次处理相同的消息,最终结果保持一致。

RabbitMQ 保证消息不被重复消费的方法
1. 手动确认消息

默认情况下,RabbitMQ 消费者会自动确认消息,即当消费者收到并处理消息后,RabbitMQ 认为消息已经成功处理,并将其从队列中删除。然而,这种机制无法防止消费者在处理消息过程中发生故障。

为了解决这个问题,可以使用 手动确认模式(Manual Acknowledgment)。在手动确认模式下,消费者在成功处理完消息后显式地向 RabbitMQ 发送 ACK,RabbitMQ 收到 ACK 后才会将消息从队列中删除。如果消费者未发送 ACK 或发送 NACK,RabbitMQ 会重新投递该消息。

importcom.rabbitmq.client.*;publicclassManualAckConsumer{privatefinalstaticString QUEUE_NAME ="manual_ack_queue";publicstaticvoidmain(String[] argv)throwsException{Connection connection =RabbitMQConnectionUtil.getConnection();Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        channel.basicConsume(QUEUE_NAME,false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsjava.io.IOException{String message =newString(body,"UTF-8");try{// 处理消息System.out.println(" [x] Received '"+ message +"'");// 手动确认消息
                    channel.basicAck(envelope.getDeliveryTag(),false);}catch(Exception e){// 如果处理消息时发生异常,拒绝该消息,重新入队
                    channel.basicNack(envelope.getDeliveryTag(),false,true);}}});}}

在上述代码中:

  • basicAck 用于手动确认消息处理成功。
  • basicNack 用于拒绝消息,并可选择是否重新入队处理。
2. 消息去重

为确保消息不被重复处理,可以在消费者端实现消息去重机制。常见的方法是在消息中携带唯一标识(如 UUID 或者业务 ID),然后在消费端检查这个标识是否已经被处理过。如果已经处理过,则跳过处理;如果没有处理过,则处理消息并记录这个标识。

importjava.util.HashSet;importjava.util.Set;importcom.rabbitmq.client.*;publicclassIdempotentConsumer{privatefinalstaticString QUEUE_NAME ="idempotent_queue";privatestaticSet<String> processedMessageIds =newHashSet<>();publicstaticvoidmain(String[] argv)throwsException{Connection connection =RabbitMQConnectionUtil.getConnection();Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        channel.basicConsume(QUEUE_NAME,false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsjava.io.IOException{String message =newString(body,"UTF-8");String messageId = properties.getMessageId();// 假设消息中带有唯一的MessageIdif(!processedMessageIds.contains(messageId)){try{// 处理消息System.out.println(" [x] Processing message '"+ message +"'");// 将消息ID添加到已处理集合中
                        processedMessageIds.add(messageId);// 手动确认消息
                        channel.basicAck(envelope.getDeliveryTag(),false);}catch(Exception e){// 如果处理消息时发生异常,拒绝该消息,重新入队
                        channel.basicNack(envelope.getDeliveryTag(),false,true);}}else{System.out.println(" [x] Duplicate message '"+ message +"' skipped.");// 已经处理过的消息直接确认
                    channel.basicAck(envelope.getDeliveryTag(),false);}}});}}

在这个例子中:

  • 消费者在处理消息前,检查消息的唯一标识 MessageId 是否已经处理过。
  • 如果未处理过,处理消息并将 MessageId 存入 processedMessageIds 集合。
  • 如果处理过,跳过处理并确认消息。
3. 消费者幂等性设计

实现幂等性的方法取决于业务逻辑和数据处理方式。以下是一些常见的设计策略:

  1. 数据库唯一约束:在数据库中通过唯一约束防止重复插入数据。例如,如果一条消息的处理结果会导致数据库插入一条记录,可以在数据库中对插入的关键字段设置唯一约束。
java    try {        // 插入数据库,若存在唯一约束冲突会抛出异常        String sql = "INSERT INTO orders (order_id, product, quantity) VALUES (?, ?, ?)";        jdbcTemplate.update(sql, orderId, product, quantity);    } catch (DuplicateKeyException e) {        // 处理唯一约束异常        System.out.println("Order already processed: " + orderId);    }    
  1. 乐观锁:使用乐观锁机制确保同一数据只会被更新一次。可以通过数据库中的版本号字段或 CAS(Compare And Swap)机制实现。
java    String sql = "UPDATE orders SET status = ? WHERE order_id = ? AND version = ?";    int rowsAffected = jdbcTemplate.update(sql, newStatus, orderId, currentVersion);    if (rowsAffected == 0) {        // 更新失败,说明该订单已经被处理过        System.out.println("Order already processed or version mismatch: " + orderId);    }    
  1. 外部存储(如 Redis):使用 Redis 等外部存储来记录已经处理过的消息 ID,并设置过期时间,防止重复消费。
java    String redisKey = "processed:message:" + messageId;    if (redisTemplate.opsForValue().setIfAbsent(redisKey, "1", 1, TimeUnit.HOURS)) {        // 若插入成功,说明此消息未被处理过        // 处理消息逻辑    } else {        // 已经处理过的消息        System.out.println("Message already processed: " + messageId);    }    
4. 使用事务保证消息一致性

为了确保消息的处理和数据操作的一致性,RabbitMQ 可以结合事务或数据库的事务机制。通过将消息的消费和数据库操作放在一个事务中,确保在事务提交之前,消息不会被确认。

importorg.springframework.transaction.annotation.Transactional;@TransactionalpublicvoidprocessMessage(String message){// 执行业务逻辑,包含数据库操作// 如果数据库操作成功
    channel.basicAck(deliveryTag,false);}

在这种情况下,如果事务提交失败,消息不会被确认,RabbitMQ 会重新投递该消息,保证数据一致性。

总结

在 RabbitMQ 中,防止消息重复消费和保证消费幂等性是保证系统数据一致性的重要手段。通过手动确认消息、实现消息去重机制、设计幂等性消费逻辑以及结合事务控制,可以有效防止消息的重复消费,确保系统的稳定性和可靠性。

这些机制和策略在实际应用中可以根据具体的业务场景进行组合和优化,确保在复杂分布式系统中,消息传递的可靠性和数据处理的一致性得以保障。

标签: rabbitmq ruby 分布式

本文转载自: https://blog.csdn.net/Flying_Fish_roe/article/details/143843845
版权归原作者 Flying_Fish_Xuan 所有, 如有侵权,请联系我们删除。

“RabbitMQ 中保证消息不被重复消费”的评论:

还没有评论