0


SpringBoot整合RabbitMq广播模式动态队列名称

优雅的实现多实例广播

  • 在业务场景中我们需要广播通知一个服务的所有实例,如多实例内存缓存数据准实时同步等
  • RabbitMq有Exchange的概念,一个Exchange可以绑定多个队列,它的广播模式是依靠广播交换机FanoutExchange实现的,投递消息时我们将消息投递给FanoutExchange, FanoutExchange 再将消息发送给每一个与之绑定的队列中,也就是说我们在实际场景中同一个服务的多个实例需要用不同的队列名绑定到同一个FanoutExchange上,从而实现消息广播。
  • 那么问题来了,在使用@RabbitListener注解时,一个服务多个实例如何使用不同的队列名呢?
  • @RabbitListener支持使用配置,可以从配置中获取队列名,但是这又需要我们在启动不同实例的时候修改不同的配置来启动,有没有更好的方式解决这个问题呢?有,本文将介绍一个优雅的方式来解决这个问题。

注意!!容器部署场景不推荐使用,容器ip不固定,会出现每次启动就注册新队列问题,导致MQ队列过多

版本说明

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>1.7.3.RELEASE</version>
</dependency>

项目配置

spring:rabbitmq:# mq ip或hosthost: 127.0.0.1
    # mq 端口,默认5672port:5672

配置类编写

  • 定义一个QueueExchangeBindConfig 类用来初始化队列、交换机以及绑定队列和交换机。
  • 声明队列时在队列名称末尾拼上当前机器IP地址,以保证每个实例的队列名称不重复(队列名称重复的话仅会有一台实例消费)。
  • 当前机器IP获取用的InetUtils,一般你有用到注册中心整合都会在spring容器中有这个类,如果项目里没有的话可以参考spring-cloud-commons包里的实现自己写一个(注意排除回环ip),或者引入这个包。
  • 队列名称以#dynamic#开头是为了使用@RabbitListener监听的时候拦截做处理(监听的时候也需要将当前机器IP地址拼上),我们后面还需要一个后置处理器处理监听队列名。
importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.cloud.commons.util.InetUtils;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@Configuration@Slf4jpublicclassQueueExchangeBindConfig{publicstaticfinalStringDYNAMIC_HOST_NAME_EXCHANGE="dynamic_host_name_exchange";//交换器publicstaticfinalStringDYNAMIC_HOST_NAME_ROUTING="dynamic_host_name_routing";//路由publicstaticfinalStringDYNAMIC_HOST_NAME_QUEUE="#dynamic#_dynamic_host_name_queue_";// 消息队列名@AutowiredInetUtils inetUtils;@Bean("dynamicHostNameQueue")publicQueueDynamicHostNameQueue(){InetUtils.HostInfo firstNonLoopbackHostInfo = inetUtils.findFirstNonLoopbackHostInfo();String ipAddress = firstNonLoopbackHostInfo.getIpAddress();String queueName =DYNAMIC_HOST_NAME_QUEUE.concat(ipAddress);Queue queue =newQueue(queueName,true);
        log.info("DynamicHostNameQueue init success queueName:{}", queueName);return queue;}@Bean("dynamicHostNameFanoutExchange")publicFanoutExchangeDynamicHostNameFanoutExchange(){FanoutExchange fanoutExchange =newFanoutExchange(DYNAMIC_HOST_NAME_ROUTING);
        log.info("DynamicHostNameFanoutExchange init success:{}", fanoutExchange.getName());return fanoutExchange;}@BeanpublicBindingDynamicHostNameBinding(@Qualifier("dynamicHostNameQueue")Queue queue,@Qualifier("dynamicHostNameFanoutExchange")FanoutExchange fanoutExchange){Binding binding =BindingBuilder.bind(queue).to(fanoutExchange);
        log.info("DynamicHostNameBinding bing {} to {} success.", queue.getName(), fanoutExchange.getName());return binding;}}

队列监听类编写(消费者)

  • 注意,@RabbitListener注解中填的队列名称跟上面配置类的名称一致,以#dynamic#开头, 后面我们会解析这个开头标记拼接当前机器IP
  • 这里我们用的是自动ack,请根据实际场景需要切换手动模式
importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;@Component@RabbitListener(queues ={QueueExchangeBindConfig.DYNAMIC_HOST_NAME_QUEUE})@Slf4jpublicclassDynamicHostNameQueueListener{@RabbitHandlerpublicvoidprocess(String msg){
        log.info("[DynamicHostNameQueueListener] receive sync msg:{}", msg);if(StringUtils.isRealEmpty(msg)){return;}try{// process the msg}catch(Exception e){
            log.error("[DynamicHostNameQueueListener] process failed, msg:{}", msg ,e);}}}

后置处理监听队列名称

  • 使用BeanPostProcessor拦截所有使用了@RabbitListener的bean,反射获取队列名,当队列名以#dynamic#开头时,拼入当前机器IP
  • 注意,这个类的order为 2147483646,是因为spring整合rabbitMq时,注册监听队列用的后置处理类RabbitListenerAnnotationBeanPostProcessor 的order为2147483647,我们必须在注册之前将队列名称修改
importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.amqp.rabbit.annotation.RabbitListeners;importorg.springframework.aop.support.AopUtils;importorg.springframework.beans.BeansException;importorg.springframework.beans.factory.BeanFactory;importorg.springframework.beans.factory.BeanFactoryAware;importorg.springframework.beans.factory.config.BeanPostProcessor;importorg.springframework.cloud.commons.util.InetUtils;importorg.springframework.core.Ordered;importorg.springframework.core.annotation.AnnotationUtils;importorg.springframework.stereotype.Component;importorg.springframework.util.CollectionUtils;importjava.lang.reflect.Field;importjava.lang.reflect.InvocationHandler;importjava.lang.reflect.Proxy;importjava.util.*;@Component@Slf4jpublicclassRabbitListenerDynamicQueueBeanPostProcessorimplementsBeanPostProcessor,Ordered,BeanFactoryAware{privateBeanFactory beanFactory;@OverridepublicObjectpostProcessBeforeInitialization(Object bean,String beanName)throwsBeansException{Class<?> targetClass =AopUtils.getTargetClass(bean);Collection<RabbitListener> listenerAnnotations =findListenerAnnotations(targetClass);if(!CollectionUtils.isEmpty(listenerAnnotations)){for(RabbitListener rabbitListener : listenerAnnotations){boolean isDynamic =false;String[] queues = rabbitListener.queues();if(queues.length >0){for(int i =0; i < queues.length;++i){String queue = queues[i];if(queue.startsWith("#dynamic#")){
                            queues[i]=resolveDynamicQueueName(queues[i]);
                            isDynamic =true;}}}if(isDynamic){try{InvocationHandler invocationHandler =Proxy.getInvocationHandler(rabbitListener);Field memberValues = invocationHandler.getClass().getDeclaredField("memberValues");
                        memberValues.setAccessible(true);HashMap memberValuesValue =(HashMap)memberValues.get(invocationHandler);
                        memberValuesValue.put("queues", queues);}catch(Exception e){
                        log.error("RabbitListenerDynamicQueueBeanPostProcessor can't process dynamic queue.", e);}}}}return bean;}privateStringresolveDynamicQueueName(String queue){InetUtils inetUtils =this.beanFactory.getBean(InetUtils.class);InetUtils.HostInfo firstNonLoopbackHostInfo = inetUtils.findFirstNonLoopbackHostInfo();String ipAddress = firstNonLoopbackHostInfo.getIpAddress();
        queue = queue.concat(ipAddress);
        log.info("RabbitListenerDynamicQueueBeanPostProcessor modify queue:{}", queue);return queue;}privateCollection<RabbitListener>findListenerAnnotations(Class<?> clazz){Set<RabbitListener> listeners =newHashSet<>();RabbitListener ann =AnnotationUtils.findAnnotation(clazz,RabbitListener.class);if(ann !=null){
            listeners.add(ann);}RabbitListeners anns =AnnotationUtils.findAnnotation(clazz,RabbitListeners.class);if(anns !=null){Collections.addAll(listeners, anns.value());}return listeners;}@OverridepublicObjectpostProcessAfterInitialization(Object bean,String beanName)throwsBeansException{return bean;}@OverridepublicintgetOrder(){return2147483646;}@OverridepublicvoidsetBeanFactory(BeanFactory beanFactory)throwsBeansException{this.beanFactory = beanFactory;}}

编写消息生产者类

生产者发送消息时只需要指定广播交换机名称即可

importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;@Component@Slf4jpublicclassMsgFanoutProvider{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidfanoutMsg(){String msg ="hello world.";try{
            rabbitTemplate.convertAndSend(QueueExchangeBindConfig.DYNAMIC_HOST_NAME_EXCHANGE,QueueExchangeBindConfig.DYNAMIC_HOST_NAME_ROUTING, msg);}catch(Exception e){
            log.error("MsgFanoutProvider failed to publish fanout msg:{}", msg, e);}}}

至此,一个基于rabbitMq优雅的多实例广播方案就实现啦。


本文转载自: https://blog.csdn.net/v_lquanlin/article/details/128728711
版权归原作者 铨✌� 所有, 如有侵权,请联系我们删除。

“SpringBoot整合RabbitMq广播模式动态队列名称”的评论:

还没有评论