0


SpringBoot: RabbitMQ消息队列之同时消费多条消息

文章目录


一、RabbitMQ接口介绍

1. basicQos预取方法参数解析

basicQos(int prefetchCount)
basicQos(int prefetchCount, boolean global)
basicQos(int prefetchSize, int prefetchCount, boolean global)

参数:

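  • prefetchSize:服务器预先投递的消息内容的最大总大小(单位:字节),0 表示不限制
  • prefetchCount:允许同时存在的未确认(unack)消息的最大数量,0 表示不限制。
  • global:限制的作用范围。在 RabbitMQ 中,true 表示该限制由当前 Channel 上的所有消费者共享;false 表示该限制分别作用于此 Channel 上之后新建的每个消费者(AMQP 0-9-1 规范中 global 原本指整个 Connection,RabbitMQ 对其含义做了调整)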

2. basicConsume消费方法参数解析

basicConsume(String queue, Consumer consumer)
basicConsume(String queue, boolean autoAck, Consumer consumer)

参数:

  • queue:监听的队列名称
  • autoAck:是否自动确认(ack)消息;为 false 时需要在处理完成后手动调用 basicAck / basicNack
  • consumer:使用的消费者类

二、非Spring项目集成-失败不重试,直接确认

Consumer.java 消费者类

packagecom.lmc.mq.nospring;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.TimeoutException;/**
 * @author lmc
 * @Description: TODO
 * @Create 2021-09-07 22:06
 * @version: 1.0
 */publicclassConsumer{privatefinalstaticStringQUEUE_NAME="lmc-test";//队列名称publicstaticvoidmain(String[] args){initModule();}publicstaticvoidinitModule(){//创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();
        connectionFactory.setHost("xx.xx.xx.xx");//设置rabbitmq-server的地址
        connectionFactory.setPort(5672);//使用的端口号
        connectionFactory.setVirtualHost("/");//使用的虚拟主机
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");//由连接工厂创建连接Connection connection =null;try{
            connection = connectionFactory.newConnection();//通过连接创建信道finalChannel channel = connection.createChannel();
            channel.basicQos(0,3,true);//创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替DefaultConsumer consumer =newDefaultConsumer(channel){//监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写@OverridepublicvoidhandleDelivery(java.lang.String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{MqMessageDispatcher.doDispatch(newString(body,"UTF-8"), channel, envelope);}};//监听指定的queue。会一直监听。//参数:要监听的queue、是否自动确认消息、使用的Consumer
            channel.basicConsume(QUEUE_NAME,false, consumer);}catch(IOException e){
            e.printStackTrace();}catch(TimeoutException e){
            e.printStackTrace();}}}

MqMessageDispatcher.java 多线程类:同时并发处理多个消息

packagecom.lmc.mq.nospring;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Envelope;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.io.IOException;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;/**
 * @author lmc
 * @Description: TODO
 * @Create 2021-09-07 22:45
 * @version: 1.0
 */publicclassMqMessageDispatcher{publicstaticLogger logger =LoggerFactory.getLogger(MqMessageDispatcher.class);publicstaticExecutorService msgHandleService =Executors.newFixedThreadPool(5);static{Runtime.getRuntime().addShutdownHook(newThread(){@Overridepublicvoidrun(){
                msgHandleService.shutdown();}});}publicstaticvoiddoDispatch(String message,Channel channel,Envelope envelope){
        msgHandleService.execute(newMessageHandleTask(message, channel, envelope));}privatestaticclassMessageHandleTaskimplementsRunnable{String message;Channel channel;Envelope envelope;publicMessageHandleTask(String message,Channel channel,Envelope envelope){this.message = message;this.channel = channel;this.envelope = envelope;}@Overridepublicvoidrun(){long start =System.currentTimeMillis();
            logger.info("Received message: "+ message);try{Thread.sleep(5000);}catch(InterruptedException e){// TODO Auto-generated catch block
                e.printStackTrace();}try{// 手动确认消息,若自动确认则不需要写以下该行
                channel.basicAck(envelope.getDeliveryTag(),false);}catch(IOException e){System.err.println("fail to confirm message:"+ message);}}}}

三、非Spring项目集成-失败重试5次,再直接确认

MqMessageDispatcher.java

packagecom.lmc.mq.nospring;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Envelope;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.io.IOException;importjava.util.HashMap;importjava.util.Map;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;/**
 * @author lmc
 * @Description: TODO
 * @Create 2021-09-07 22:45
 * @version: 1.0
 */publicclassMqMessageDispatcher{publicstaticfinalLogger logger =LoggerFactory.getLogger(MqMessageDispatcher.class);publicstaticExecutorService msgHandleService =Executors.newFixedThreadPool(5);publicstaticMap<String,Integer> cacheMap =newHashMap(5);static{Runtime.getRuntime().addShutdownHook(newThread(){@Overridepublicvoidrun(){
                msgHandleService.shutdown();}});}publicstaticvoiddoDispatch(String message,Channel channel,Envelope envelope){
        msgHandleService.execute(newMessageHandleTask(message, channel, envelope));}privatestaticclassMessageHandleTaskimplementsRunnable{String message;Channel channel;Envelope envelope;publicMessageHandleTask(String message,Channel channel,Envelope envelope){this.message = message;this.channel = channel;this.envelope = envelope;}@Overridepublicvoidrun(){int currentTimes =0;// 当前重试次数boolean isSuccess =false;// 消息是否处理成功// 获取当前消息重试次数,(这种情况适合每条消息内容不一样,最好每条消息都有唯一标识)if(cacheMap.containsKey(message)){
                currentTimes = cacheMap.get(message);}else{
                cacheMap.put(message,0);}long start =System.currentTimeMillis();
            logger.info("Received message: "+ message);try{Thread.sleep(5000);}catch(InterruptedException e){// TODO Auto-generated catch block
                e.printStackTrace();}try{if(isSuccess){// 手动确认消息
                    logger.info("message["+ message +"] consumer success.(Ack)");
                    cacheMap.put(message,0);
                    channel.basicAck(envelope.getDeliveryTag(),false);}else{if(currentTimes >=5){// 手动确认消息,若自动确认则不需要写以下该行
                        logger.warn("message["+ message +"] consumer fail,have retry 5 times.(Ack)");
                        cacheMap.put(message,0);
                        channel.basicAck(envelope.getDeliveryTag(),false);}else{// 处理失败,重试未5次,重新处理
                        cacheMap.put(message,++currentTimes);
                        logger.warn("message["+ message +"] consumer fail,prepare to retry "+ currentTimes +" times...(Nack)");
                        channel.basicNack(envelope.getDeliveryTag(),false,true);}}}catch(IOException e){System.err.println("fail to confirm message:"+ message);}}}}

四、SpringBoot集成

使用springboot同时处理多个消息,只需要在配置文件中,添加以下配置:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual  # enable manual acknowledgement
        concurrency: 1            # minimum number of consumers
        max-concurrency: 3        # maximum number of consumers
        prefetch: 3               # messages fetched per request; should be >= the transaction size (max number of unacked messages)

监听类 LmcTestConsumer:

packagecom.lmc.mq.spring.consumer;importcom.rabbitmq.client.Channel;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.messaging.handler.annotation.Payload;importorg.springframework.stereotype.Component;importjava.io.IOException;/**
 * @author lmc
 * @Description: TODO
 * @Create 2021-09-18 19:32
 * @version: 1.0
 */@ComponentpublicclassLmcTestConsumer{publicstaticfinalLogger logger =LoggerFactory.getLogger(LmcTestConsumer.class);@RabbitHandler@RabbitListener(queues ="lmc-test")publicvoidhandler(@PayloadMessage message,Channel channel){try{String msg =newString(message.getBody(),"UTF-8");MqMessageDispatcher.doDispatch(msg, channel, message.getMessageProperties().getDeliveryTag());}catch(IOException e){
            logger.error(e.getMessage());}catch(NullPointerException e1){
            logger.error(e1.getMessage());}catch(Exception e){
            logger.error(e.getMessage());}}}

其他

参考:https://gitee.com/lmchh/lmc-tools/tree/master/tools-message-queue


本文转载自: https://blog.csdn.net/zhanggqianglovec/article/details/128938967
版权归原作者 Freedom3568 所有, 如有侵权,请联系我们删除。

“SpringBoot: RabbitMQ消息队列之同时消费多条消息”的评论:

还没有评论