文章目录
一、RabbitMQ接口介绍
1. basicQos预取方法参数解析
basicQos(int prefetchCount)
basicQos(int prefetchCount, boolean global)
basicQos(int prefetchSize, int prefetchCount, boolean global)
参数:
- prefetchSize:服务器向消费者预先投递的消息内容总大小上限(单位:字节),0表示不限制
- prefetchCount:服务器最多向消费者投递多少条尚未确认(unack)的消息,0表示不限制。
- global:上述限制是否作用于整个Channel。true表示该Channel上的所有消费者共享这一限制;false表示限制分别作用于该Channel上的每个消费者。(注意:AMQP 0-9-1规范中global原意是针对整个Connection,但RabbitMQ将其重新定义为针对整个Channel)
2. basicConsume消费方法参数解析
basicConsume(String queue, Consumer consumer)
basicConsume(String queue, boolean autoAck, Consumer consumer)
参数:
- queue:监听的队列名称
- autoAck:是否自动确认(ack)消息;为false时需要在消费完成后手动调用basicAck确认
- consumer:使用的消费者类
二、非Spring项目集成-失败不重试,直接确认
Consumer.java 消费者类
package com.lmc.mq.nospring;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

/**
 * Plain (non-Spring) RabbitMQ consumer: connects to the broker, limits the
 * number of unacknowledged deliveries, and hands every delivery to
 * {@code MqMessageDispatcher} for asynchronous processing and manual ack.
 *
 * @author lmc
 * @Description: TODO
 * @Create 2021-09-07 22:06
 * @version: 1.0
 */
public class Consumer {

    /** Name of the queue this consumer listens on. */
    private final static String QUEUE_NAME = "lmc-test";

    public static void main(String[] args) {
        initModule();
    }

    /**
     * Opens a connection and channel, applies the prefetch limit and starts
     * consuming {@link #QUEUE_NAME} with manual acknowledgement.
     *
     * <p>On success the connection is intentionally left open so the consumer
     * keeps receiving messages. If any setup step fails, the error is printed
     * and the connection (when one was opened) is closed so it does not leak.
     */
    public static void initModule() {
        // Create and configure the connection factory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("xx.xx.xx.xx"); // address of the rabbitmq-server
        connectionFactory.setPort(5672);          // broker port
        connectionFactory.setVirtualHost("/");    // virtual host to use
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = null;
        try {
            // The factory creates the connection; the connection creates the channel.
            connection = connectionFactory.newConnection();
            final Channel channel = connection.createChannel();

            // prefetchSize = 0 (no byte limit), prefetchCount = 3, global = true.
            channel.basicQos(0, 3, true);

            // QueueingConsumer is deprecated; DefaultConsumer is used instead.
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // Called for every message delivered from the consumed queue.
                // The default implementation is empty, so it must be overridden.
                @Override
                public void handleDelivery(java.lang.String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    MqMessageDispatcher.doDispatch(
                            new String(body, StandardCharsets.UTF_8), channel, envelope);
                }
            };

            // Start consuming. Arguments: queue name, autoAck (false = manual ack), consumer.
            channel.basicConsume(QUEUE_NAME, false, consumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            closeAfterFailure(connection);
        }
    }

    /** Closes a connection left open by a failed setup; does nothing for {@code null}. */
    private static void closeAfterFailure(Connection connection) {
        if (connection == null || !connection.isOpen()) {
            return;
        }
        try {
            connection.close();
        } catch (IOException | AlreadyClosedException closeError) {
            // Setup has already failed; report the secondary failure and move on.
            closeError.printStackTrace();
        }
    }
}
MqMessageDispatcher.java 多线程类:同时并发处理多个消息
package com.lmc.mq.nospring;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Dispatches received messages to a fixed pool of 5 worker threads so several
 * messages are handled concurrently. Each message is acknowledged once its
 * handling finishes; failures are not retried.
 *
 * @author lmc
 * @Description: TODO
 * @Create 2021-09-07 22:45
 * @version: 1.0
 */
public class MqMessageDispatcher {

    public static Logger logger = LoggerFactory.getLogger(MqMessageDispatcher.class);

    /** Worker pool that runs the message handling tasks. */
    public static ExecutorService msgHandleService = Executors.newFixedThreadPool(5);

    static {
        // Stop accepting new tasks when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                msgHandleService.shutdown();
            }
        });
    }

    /**
     * Queues one message for asynchronous handling.
     *
     * @param message  message body as text
     * @param channel  channel the message was delivered on; used for the ack
     * @param envelope delivery envelope; supplies the delivery tag for the ack
     */
    public static void doDispatch(String message, Channel channel, Envelope envelope) {
        msgHandleService.execute(new MessageHandleTask(message, channel, envelope));
    }

    /** Handles a single message on a worker thread and then acknowledges it. */
    private static class MessageHandleTask implements Runnable {

        private final String message;
        private final Channel channel;
        private final Envelope envelope;

        public MessageHandleTask(String message, Channel channel, Envelope envelope) {
            this.message = message;
            this.channel = channel;
            this.envelope = envelope;
        }

        @Override
        public void run() {
            logger.info("Received message: " + message);
            try {
                // Simulated processing time.
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the pool can observe the interruption.
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while handling message: " + message, e);
            }
            try {
                // Manual ack; not needed when the consumer uses autoAck.
                channel.basicAck(envelope.getDeliveryTag(), false);
            } catch (IOException e) {
                logger.error("fail to confirm message:" + message, e);
            }
        }
    }
}
三、非Spring项目集成-失败重试5次,再直接确认
MqMessageDispatcher.java
package com.lmc.mq.nospring;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Dispatches received messages to a fixed pool of 5 worker threads. A message
 * whose handling fails is nacked with requeue up to {@link #MAX_RETRIES} times;
 * after that it is acknowledged anyway so it stops being redelivered.
 *
 * @author lmc
 * @Description: TODO
 * @Create 2021-09-07 22:45
 * @version: 1.0
 */
public class MqMessageDispatcher {

    public static final Logger logger = LoggerFactory.getLogger(MqMessageDispatcher.class);

    /** Number of requeues allowed for one message before it is acked regardless. */
    private static final int MAX_RETRIES = 5;

    /** Worker pool that runs the message handling tasks. */
    public static ExecutorService msgHandleService = Executors.newFixedThreadPool(5);

    /**
     * Failed-attempt counter keyed by message text. It is updated from several
     * worker threads, hence a concurrent map. Keying by content only works when
     * every message body is distinct; ideally each message carries a unique id.
     */
    public static Map<String, Integer> cacheMap = new ConcurrentHashMap<>();

    static {
        // Stop accepting new tasks when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                msgHandleService.shutdown();
            }
        });
    }

    /**
     * Queues one message for asynchronous handling.
     *
     * @param message  message body as text; also the key of the retry counter
     * @param channel  channel the message was delivered on; used for ack/nack
     * @param envelope delivery envelope; supplies the delivery tag
     */
    public static void doDispatch(String message, Channel channel, Envelope envelope) {
        msgHandleService.execute(new MessageHandleTask(message, channel, envelope));
    }

    /** Handles a single message on a worker thread, then acks or requeues it. */
    private static class MessageHandleTask implements Runnable {

        private final String message;
        private final Channel channel;
        private final Envelope envelope;

        public MessageHandleTask(String message, Channel channel, Envelope envelope) {
            this.message = message;
            this.channel = channel;
            this.envelope = envelope;
        }

        @Override
        public void run() {
            logger.info("Received message: " + message);
            boolean isSuccess = handleMessage();
            try {
                if (isSuccess) {
                    logger.info("message[" + message + "] consumer success.(Ack)");
                    // Drop the counter instead of resetting it, so the map does not grow forever.
                    cacheMap.remove(message);
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    return;
                }

                // Atomically count this failed attempt.
                int failedAttempts = cacheMap.merge(message, 1, Integer::sum);
                if (failedAttempts > MAX_RETRIES) {
                    // Already requeued MAX_RETRIES times: give up and ack.
                    logger.warn("message[" + message + "] consumer fail,have retry 5 times.(Ack)");
                    cacheMap.remove(message);
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } else {
                    // Still below the limit: requeue the message for another attempt.
                    logger.warn("message[" + message + "] consumer fail,prepare to retry "
                            + failedAttempts + " times...(Nack)");
                    channel.basicNack(envelope.getDeliveryTag(), false, true);
                }
            } catch (IOException e) {
                logger.error("fail to confirm message:" + message, e);
            }
        }

        /**
         * Placeholder for the real business logic: waits 5 seconds and reports
         * failure, so the retry path is always exercised in this demo.
         *
         * @return {@code true} when the message was handled successfully
         */
        private boolean handleMessage() {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the pool can observe the interruption.
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while handling message: " + message, e);
            }
            return false;
        }
    }
}
四、SpringBoot集成
使用springboot同时处理多个消息,只需要在配置文件中,添加以下配置:
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual  # enable manual acknowledgement
        concurrency: 1            # minimum number of consumers
        max-concurrency: 3        # maximum number of consumers
        prefetch: 3               # messages handled per request (max unacked); should be >= the transaction size
监听类 LmcTestConsumer:
package com.lmc.mq.spring.consumer;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Spring listener for the {@code lmc-test} queue. It decodes each message body
 * as UTF-8 text and passes it, with the channel and delivery tag, to
 * {@code MqMessageDispatcher}.
 *
 * @author lmc
 * @Description: TODO
 * @Create 2021-09-18 19:32
 * @version: 1.0
 */
@Component
public class LmcTestConsumer {

    public static final Logger logger = LoggerFactory.getLogger(LmcTestConsumer.class);

    /**
     * Receives one message from {@code lmc-test} and dispatches it.
     *
     * @param message the raw AMQP message
     * @param channel the channel the message was delivered on
     */
    @RabbitHandler
    @RabbitListener(queues = "lmc-test")
    public void handler(@Payload Message message, Channel channel) {
        try {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            MqMessageDispatcher.doDispatch(msg, channel, message.getMessageProperties().getDeliveryTag());
        } catch (Exception e) {
            // Boundary catch: keep the listener alive and log the full stack trace.
            // NOTE(review): nothing acks or nacks the message on this path here;
            // confirm that is intended for manual acknowledge mode.
            logger.error("Failed to dispatch message from queue lmc-test", e);
        }
    }
}
其他
参考:https://gitee.com/lmchh/lmc-tools/tree/master/tools-message-queue
版权归原作者 Freedom3568 所有, 如有侵权,请联系我们删除。