一、应用场景:
- 根据实际业务动态创建队列发送消息并消费,队列名称无法提前确定
- 业务处理完成后删除对应队列,避免资源占用
- 分布式部署时,需要自动拉起其它节点的消费队列进行业务处理
二、功能设计及实现:
(1)设计流程
开始创建队列时首先判断队列是否存在,后续将通知发送至Notice队列通知其它节点此队列已创建开启
监听,消费完毕后删除队列,并发送队列删除信息至其它节点,停止监听。对于整个生产、监听过程中
产生的失败消息通过死信队列进行处理,后续对失败消息进行再次处理或入库记录
具体项目代码目录结构可按如下结构进行设计,描述信息如下:
- config:配置RabbitMq相关配置信息(预取数量、自动重连时间等等)
- controller:接口层(提供对外API接口)
- model:模型类(业务定义模型类)
- sender:mq发送消息业务(发送消息)
- handler:mq消费处理消息业务
- service:业务类
(2)核心方法
RabbitMqConfig完成消息队列相关信息配置。其主要配置项如下:
(1)设置预取数量、自动重连时间间隔
@ConfigurationpublicclassRabbitMqConfig{@Bean@PrimarypublicSimpleRabbitListenerContainerFactoryproxyRabbitListenerContainerFactory(ConnectionFactory connectionFactory,SimpleRabbitListenerContainerFactoryConfigurer configurer){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setPrefetchCount(5);// 设置预取数量
factory.setRecoveryInterval(5000L);// 自动重连间隔时间return factory;}}
(2)设置队列与消费处理类绑定
@BeanpublicSimpleMessageListenerContaineruserMessageListenerContainer(ConnectionFactory connectionFactory,MessageListenerAdapter userListenerAdapter){SimpleMessageListenerContainer container =newSimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(USER_QUEUE_NAME);// 设置要监听的队列名称
container.setMessageListener(userListenerAdapter);
container.setPrefetchCount(5);
container.setRecoveryInterval(5000L);return container;}@BeanpublicMessageListenerAdapteruserListenerAdapter(UserHandlerWithMessage handler){returnnewMessageListenerAdapter(handler,"handleMessage");}
(3)配置死信队列
// 定义死信交换机@BeanpublicDirectExchangedeadLetterExchange(){returnnewDirectExchange(DEAD_LETTER_EXCHANGE);}// 定义死信队列@BeanpublicQueuedeadLetterQueue(){returnnewQueue(DEAD_LETTER_QUEUE);}// 绑定死信队列到死信交换机@BeanpublicBindingdeadLetterBinding(){returnBindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_LETTER_ROUTING_KEY);}// 配置普通队列,绑定到死信交换机@BeanpublicQueueuserQueue(){returnQueueBuilder.durable(USER_QUEUE_NAME).withArgument("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE)// 设置死信交换机.withArgument("x-dead-letter-routing-key",DEAD_LETTER_ROUTING_KEY)// 设置死信路由键.build();}
(4)死信队列消息处理,此处不详细给出错误消息的代码逻辑,可根据业务需求进行处理,如:存入关系数据库进行记录,或创建消费者对队列信息进行消费都可
(5)配置并启动通知队列,确保分布式环境下所有节点都能进行消息消费,在每次队列创建或删除时通过Notice主题广播消息,通知其它节点开始消费或停止消费
publicvoidbroadcastQueue(String queueName,String routingKey,boolean isCreated){String message = queueName +":"+ routingKey +":"+(isCreated ?CommonConstants.PROXY_CREATED:CommonConstants.PROXY_DELETED);
rabbitTemplate.convertAndSend(broadcastExchange.getName(), routingKey, message);LogUtils.logInfo("Broadcasted new queue creation: "+ message);}
(6)队列管控业务类,此类主要提供队列的创建、删除、开启监听等操作,这里的RabbitAdmin、SimpleRabbitListenerContainerFactory等参数可根据业务需求自定义配置,使用时需要设置为自定义的参数,如:proxyRabbitListenerContainerFactory
publicDynamicQueueService(ConnectionFactory connectionFactory,@Qualifier("proxyRabbitAdmin")RabbitAdmin rabbitAdmin,@Qualifier("proxyRabbitListenerContainerFactory")SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory,RabbitTemplate rabbitTemplate,TopicExchange exchange,FanoutExchange broadcastExchange){this.connectionFactory = connectionFactory;this.rabbitAdmin = rabbitAdmin;this.exchange = exchange;this.broadcastExchange = broadcastExchange;this.rabbitListenerContainerFactory = rabbitListenerContainerFactory;this.rabbitTemplate = rabbitTemplate;}/**
* 判断队列是否存在
* @param queueName 队列名称
* @return
*/publicbooleanisQueueExists(String queueName){returnBoolean.TRUE.equals(rabbitTemplate.execute(channel ->{try{
channel.queueDeclarePassive(queueName);returntrue;}catch(Exception e){returnfalse;}}));}/**
* 创建队列并开启监听
* @param queueName 队列名称
* @param routingKey 路由键
* @param handlerBeanName 消费beanName
*/publicvoidcreateQueueAndStartListener(String queueName,String routingKey,String handlerBeanName){// 创建持久化队列,如果队列已经存在则不会重复创建Queue queue =newQueue(queueName,true);if(!isQueueExists(queueName)){
rabbitAdmin.declareQueue(queue);
log.info("Declared queue: "+ queueName);// 绑定队列到交换器并指定路由键Binding binding =BindingBuilder.bind(queue).to(exchange).with(routingKey);
rabbitAdmin.declareBinding(binding);
log.info("Bound queue: "+ queueName +" to exchange: "+ exchange.getName()+" with routing key: "+ routingKey);}else{
log.info("Queue "+ queueName +" already exists.");}// 若队列名称不是接收广播队列,则启动监听if(!queueName.equals(CommonConstants.BROADCAST_EXCHANGE)){// 启动监听容器startListener(queueName, handlerBeanName);}}/**
* 开启监听
* @param queueName 队列名称
* @param handlerBeanName 消费处理bean名称
*/publicvoidstartListener(String queueName,String handlerBeanName){// 从 Spring 容器中获取 MessageHandler 实例Object messageHandler = applicationContext.getBean(handlerBeanName);// 创建消息监听容器SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(newMessageListenerAdapter(messageHandler));// 设置其他参数(例如并发消费者数量,预取数量等)
container.setConcurrentConsumers(concurrency);
container.setPrefetchCount(5);
container.setRecoveryInterval(5000L);// 自动重连间隔时间// 启动监听容器try{
container.start();
log.info("Started listener container for queue: "+ queueName);// 检查容器是否正在运行if(container.isRunning()){
log.info("Listener container is running.");}else{
log.warn("Listener container did not start properly.");}}catch(Exception e){
log.error("Failed to start listener container for queue: "+ queueName, e);// 处理启动失败的情况}
log.info("Started listener container for queue: "+ queueName);// 存储监听容器,以便后续管理
listenerContainers.put(queueName, container);}/**
* 删除队列停止监听
* @param queueName 队列名称
*/publicvoiddeleteQueueAndStopListener(String queueName){// 停止监听容器SimpleMessageListenerContainer container = listenerContainers.get(queueName);if(container !=null){
container.stop();
listenerContainers.remove(queueName);}// 删除队列
rabbitAdmin.deleteQueue(queueName);}/**
* 广播消息
* @param queueName 队列名
* @param routingKey 路由键
* @param isCreated 创建/删除 (true/false)
*/publicvoidbroadcastQueue(String queueName,String routingKey,boolean isCreated){String message = queueName +":"+ routingKey +":"+(isCreated ?CommonConstants.QUEUE_CREATE:CommonConstants.QUEUE_DELETE);
rabbitTemplate.convertAndSend(broadcastExchange.getName(), routingKey, message);
log.info("Broadcasted new queue creation: "+ message);}
注意事项:
上文所介绍的消费处理业务与队列绑定是通过在配置类中进行实现,针对当前队列设置单独container,
给container设置MessageListenerAdapter进行消费处理类绑定,还有另一种直接在队列创建启动时通
过借助spring的上下文进行实现,底层逻辑相同。
使用applicationContext,在启动队列监听时将具体处理消息的bean名称传入,通过spring容器获取实
例,调用SimpleMessageListenerContainer的setMessageListener方法来配置消息处理类
publicvoidstartListener(String queueName,String handlerBeanName){// 从 Spring 容器中获取 MessageHandler 实例Object messageHandler = applicationContext.getBean(handlerBeanName);// 创建消息监听容器SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(newMessageListenerAdapter(messageHandler));// 设置其他参数(例如并发消费者数量,预取数量等)
container.setConcurrentConsumers(concurrency);
container.setPrefetchCount(5);
container.setRecoveryInterval(5000L);// 自动重连间隔时间// 启动监听容器try{
container.start();
log.info("Started listener container for queue: "+ queueName);// 检查容器是否正在运行if(container.isRunning()){
log.info("Listener container is running.");}else{
log.warn("Listener container did not start properly.");}}catch(Exception e){
log.error("Failed to start listener container for queue: "+ queueName, e);// 处理启动失败的情况}
log.info("Started listener container for queue: "+ queueName);}
三、常见问题及解决方式
(1)如何保证消息不丢失
问题描述
从生产者到 mq,再从 mq 到消费者,都有可能因为网络,服务宕机等原因丢失消息。
解决方案
发布确认:保证生产者到交换机,交换机到队列。这里可以使用消息入库方案,将要发送的消息保存
到数据库中。
发送消息前先将消息保存到数据库中,有一个状态字段status=0,表示消息发送了但还没收到确
认;收到确认后将status设为1,表示RabbitMQ已收到消息。使用定时检索消息表,将status=0并且超
过固定时间后(可能消息刚发出去还没来得及确认,这边定时器刚好检索到这条status=0的消息,所以
要设置时间)还没收到确认的消息取出重发 (消息重发可能会造成幂等性问题,这里消费端要做幂等性
处理),可能重发还会失败,所以还要添加一个最大重发次数字段retry_count,超过就做另外的处理。
持久化:交换机、队列、消息持久化,确保mq上消息不会因为宕机丢失。
消息确认:ack模式,每条消息都要确认,保证消费端不丢失。
(2)保证消息幂等性
问题描述
MQ消息的幂等性问题是指在消息传输和处理过程中,确保对同一消息的重复处理不会导致不同的
结果或副作用。幂等性在分布式系统中非常重要,因为消息传递过程中可能会发生重复发送或处理
的情况。
产生原因
消息重复发送:由于网络故障或重试机制,生产者可能会发送相同的消息多次。
消息重复消费:消费者在处理消息时,如果在处理过程中发生故障,消息可能会被重新消费。
网络分区:在网络分区恢复后,可能会有重复的消息传递。
解决方案
使用唯一标识符(ID):给每条消息分配一个唯一的标识符(ID),并在消费时记录已处理过的
ID。对于重复的ID,直接跳过处理。
使用去重表:使用数据库的去重表(如MySQL的唯一索引)来记录已处理的消息ID,在插入消息ID
时,如果已经存在则忽略。
使用乐观锁:在更新数据时,确保数据未被修改过。如果数据被修改过,则重试或跳过处理。
使用分布式锁:使用分布式锁(如Redis的分布式锁)来保证同一时刻只有一个消费者在处理相同的
消息。
(3)保证消息有序性(部分业务需求)
使用单一消费者:在队列上只配置一个消费者,这样可以保证消息按顺序被消费
使用事务:使用事务来确保消息的顺序消费。确保每个消息处理操作在一个事务中完成,只有在事
务成功提交后,消息才被确认。
消息分区:将消息分区,每个分区由一个消费者处理,从而在每个分区内部保持顺序,但允许并行
处理不同分区的消息。
publicvoidsendMessage(String message,String partitionKey){String routingKey =getRoutingKey(partitionKey);
rabbitTemplate.convertAndSend("partitionExchange", routingKey, message);}privateStringgetRoutingKey(String partitionKey){// 根据分区键计算路由键return"partition"+(partitionKey.hashCode()%2+1);}
注意:对于rabbitmq实现消息有序性往往需要使用事务、单一消费者,会丧失很多消费性能,建议如果
业务要求消息必须有序可以使用kafka来进行替换
详细demo示例路径:
https://github.com/Itsuming/spring-rabbitmq-demo
版权归原作者 AI_Frank 所有, 如有侵权,请联系我们删除。