0


Rabbitmq消费者保证幂等性

如图:

示例代码:

pom文件添加依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml配置rabbitmq连接

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /guoVirtualHost

    listener:
      simple:
        retry:
#          开启消费者(出现异常)会进行重试
          enabled: true
#          默认重试无限次,所以需指定重试次数
          max-attempts: 5
#          重试间隔时间
          initial-interval: 3000
#          保证消费者会消费消息,手动确认
        acknowledge-mode: manual

#定义交换机、路由、队列名称
guoguo:
  order:
    exchange: guoguo_order_exchange
    routingKey: guoguo.order
    queue: guoguo_order_queue

定义RabbitmqConfig配置类,Exchange,Routingkey,Queue

@Component
public class OrderConfig {

     
    @Value("${guoguo.order.exchange}")
    private String orderExchange;

    @Value("${guoguo.order.routingKey}")
    private String orderRoutingKey;

    @Value("${guoguo.order.queue}")
    private String orderQueue;

    @Bean
    public DirectExchange orderExchange(){
       return new DirectExchange(orderExchange);
    }
    @Bean
    public Queue orderQueue(){
        return new Queue(orderQueue,true,false,false);
    }
    @Bean
    public Binding orderBinding(Queue orderQueue,DirectExchange orderExchange){
        //方式1:Queue orderQueue,DirectExchange orderExchange 会去ioc容器中找到Bean
        return BindingBuilder.bind(orderQueue).to(orderExchange).with(orderRoutingKey);
        //方式2:直接调用方法
   //return BindingBuilder.bind(orderQueue()).to( orderExchange()).with(orderRoutingKey);
    }

定义发送消息工具类:

@Component
@Slf4j
public class SendMessageUtlis {

    @Autowired
    RabbitTemplate amqpTemplate;
    //订单队列和交换机
    @Value("${guoguo.order.exchange}")
    private String orderExchange;

    @Value("${guoguo.order.routingKey}")
    private String orderRoutingKey;

    public void sendMessage(Order order){
        amqpTemplate.convertAndSend(orderExchange,orderRoutingKey,order,message -> {
            return message;
        });
    }

发送消息:

  @RequestMapping("/addOrder")
    public String addOrder(){
        String quanjuId = System.currentTimeMillis()+ "";
        //生产者投递消息时携带全局id发送到mq服务器端
        Order order = new Order(1, "mq演示", 66, "男",quanjuId);
        log.info("全局id:"+quanjuId);
        sendMessage.sendMessage(order );
        //返回全局id给客户端
        return quanjuId;
    }

消费者监听消息:

    @Autowired
    OrderDao orderDao;

    @RabbitListener(queues = "guoguo_order_queue")
    public void process(Order order, Message message,Channel channel) throws IOException {
        if (StringUtils.isEmpty(order.getQuanjuId())){
            return;
        }
        //拿到全局id
        Order orders = orderDao.getQuanjuId(order.getQuanjuId());
        if (order!= null){
            log.info("已经被消费");
            //如果不为null,则已经被消费过,也需要告诉mq服务器端,将该消息删除
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            return;
        }
        int result = orderDao.saveOrder(order);
        log.info("插入成功");
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
标签: rabbitmq

本文转载自: https://blog.csdn.net/qq_56808014/article/details/126741088
版权归原作者 WontCode 所有, 如有侵权,请联系我们删除。

“Rabbitmq消费者保证幂等性”的评论:

还没有评论