Hand-writing a really handy rabbitmq-spring-boot-start starter
1. Preface
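The default RabbitMQ auto-configuration that Spring Boot provides is not very convenient: a project can only configure one RabbitMQ server, and queues have to be defined in code. That is tedious, and every new use means another round of hard-coding. I had wondered whether the official auto-configuration could support a many-to-many setup with multiple RabbitMQ servers, multiple queues, and several kinds of delay queue. After thinking it over, I concluded that it could not meet my needs.
So I let the idea brew for a long time and read the source of the official auto-configuration again and again. Having already hand-written several starters, I decided to write one for RabbitMQ. A little configuration is enough to get the features above, and the starter comes with an easy-to-use API. You only need to add it to your project. It saves a lot of hard-coding, the configuration is flexible, and a configuration change only needs a restart. It is friendly to business code, so integrating RabbitMQ is no longer a worry and we can put our energy into the business rather than into integration code. That improves development efficiency and saves time that is better spent on life than on overtime.
With simple configuration, exchanges, queues, and bindings are assembled automatically from the settings. The starter can then send ordinary messages, and delayed messages through three kinds of delay queue. The three delay queue implementations are: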
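1: Delay queue implemented with the delayed message plugin
The exchange type must be CustomExchange
2: Delay queue implemented with TTL + dead-letter queue/delayed exchange
3: Delayed exchange + setHeader("x-delay", xxx) on the message
You can also configure different virtual hosts on the same RabbitMQ server. Each one is configured separately, as long as the indexes increase and do not repeat.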
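The three delay approaches above differ mainly in the arguments used when declaring the exchange or queue and in the header set on each message. The sketch below collects the standard RabbitMQ argument and header names involved, using plain Java maps only. The class and method names (`DelayQueueArgsSketch`, `delayedExchangeArgs`, `ttlQueueArgs`, `delayHeader`) and the sample exchange and key names are hypothetical and not part of the starter. How the starter's own `RabbitService` applies such arguments is not shown in this article, so treat the mapping as an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: gathers the RabbitMQ argument names behind the three delay approaches.
final class DelayQueueArgsSketch {

    // Approaches 1 and 3: an exchange of type "x-delayed-message" (delayed message plugin)
    // needs "x-delayed-type" to say how it routes once the delay has elapsed.
    static Map<String, Object> delayedExchangeArgs(String routingType) {
        Map<String, Object> args = new LinkedHashMap<>();
        args.put("x-delayed-type", routingType); // e.g. "direct" or "topic"
        return args;
    }

    // Approach 2: a queue with a TTL whose expired messages are dead-lettered
    // to another exchange, where the real consumer listens.
    static Map<String, Object> ttlQueueArgs(int ttlMillis, String dlxExchange, String dlxRoutingKey) {
        Map<String, Object> args = new LinkedHashMap<>();
        args.put("x-message-ttl", ttlMillis);
        args.put("x-dead-letter-exchange", dlxExchange);
        args.put("x-dead-letter-routing-key", dlxRoutingKey);
        return args;
    }

    // Approaches 1 and 3: the per-message delay travels in the "x-delay" header (milliseconds).
    static Map<String, Object> delayHeader(int delayMillis) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("x-delay", delayMillis);
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(delayedExchangeArgs("direct"));
        System.out.println(ttlQueueArgs(5000, "dlx.exchange", "dlx.key"));
        System.out.println(delayHeader(3000));
    }
}
```

Maps of this shape are what one would expect to supply for the `exchangeArgs` and `queueArgs` fields that the registrar later reads from `ExchangeQueueDto`.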
The implementation approach is as follows.
2. Project directory structure
3. Main implementation principles
3.1 spring.factories configuration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.zlf.config.RabbitConfig,\
com.zlf.config.ExchangeQueueConfig,\
com.zlf.starter.RabbitAutoConfiguration
3.2 EnableZlfRabbitMq configuration
package com.zlf.starter;

import org.springframework.context.annotation.Import;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * To use the starter, add the @EnableZlfRabbitMq annotation to the application class
 * and exclude the default RabbitAutoConfiguration on the application class.
 *
 * @author zlf
 * Add the following configuration to the application class:
 * @SpringBootApplication(exclude = {
 * RabbitAutoConfiguration.class})
 * @Import(value = {RabbitService.class, ZlfMqSpringUtils.class})
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(ZlfRabbitMqRegistrar.class)
public @interface EnableZlfRabbitMq {
}
3.3 RabbitAutoConfiguration configuration
package com.zlf.starter;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
public class RabbitAutoConfiguration {
}
3.4 ZlfRabbitMqRegistrar configuration
package com.zlf.starter;

import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.zlf.config.ExchangeQueueConfig;
import com.zlf.config.ExchangeQueueProperties;
import com.zlf.config.RabbitConfig;
import com.zlf.config.RabbitProperties;
import com.zlf.config.RabbitProperties.AmqpContainer;
import com.zlf.config.RabbitProperties.Cache;
import com.zlf.config.RabbitProperties.Cache.Connection;
import com.zlf.config.RabbitProperties.ContainerType;
import com.zlf.config.RabbitProperties.DirectContainer;
import com.zlf.config.RabbitProperties.ListenerRetry;
import com.zlf.config.RabbitProperties.Retry;
import com.zlf.config.RabbitProperties.SimpleContainer;
import com.zlf.config.RabbitProperties.Template;
import com.zlf.constants.ErrorExchangeQueueInfo;
import com.zlf.dto.ExchangeQueueDto;
import com.zlf.enums.DelayTypeEnum;
import com.zlf.enums.ExchangeTypeEnum;
import com.zlf.enums.FunctionTypeEnum;
import com.zlf.service.RabbitService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.AbstractExchange;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.amqp.rabbit.support.ValueExpression;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.beans.factory.config.ConstructorArgumentValues;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.CollectionUtils;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * spring:
 *   rabbitmq:
 *     listener:
 *       simple:
 *         acknowledge-mode: auto # Spring checks whether the listener code throws; no exception returns ack, an exception returns nack
 * manual: manual ack; the business code must call the API to send the ack once it has finished.
 * auto: automatic ack; Spring checks whether the listener code throws; no exception returns ack, an exception returns nack
 * none: ack disabled; MQ assumes the consumer handles every message successfully, so a message is deleted as soon as it is delivered (delivery is unreliable and messages may be lost)
 * <p>
 * Original link: https://blog.csdn.net/m0_53142956/article/details/127792054
 *
 * @author zlf
 */
@Slf4j
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({RabbitTemplate.class, Channel.class})
@EnableConfigurationProperties(RabbitConfig.class)
public class ZlfRabbitMqRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware, BeanFactoryAware {

    private BeanFactory beanFactory;
    private RabbitConfig rabbitConfig;
    private ExchangeQueueConfig exchangeQueueConfig;

    @SneakyThrows
    @Override
    public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {
        List<RabbitProperties> rps = rabbitConfig.getRps();
        if (CollectionUtils.isEmpty(rps)) {
            throw new RuntimeException("The rabbitMq rps configuration must not be empty; please check the configuration!");
        }
        log.info("zlf.registerBeanDefinitions:rps.size:{},rps:{}", rps.size(), JSON.toJSONString(rps));
        List<ExchangeQueueProperties> eqps = exchangeQueueConfig.getEqps();
        if (CollectionUtils.isEmpty(eqps)) {
            throw new RuntimeException("The rabbitMq eqps configuration must not be empty; please check the configuration!");
        }
        log.info("zlf.registerBeanDefinitions:eqps.size:{},eqps:{}", eqps.size(), JSON.toJSONString(eqps));
        for (int i = 0; i < rps.size(); i++) {
            this.checkRabbitProperties(rps.get(i));
            CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(getRabbitConnectionFactoryBean(rps.get(i)).getObject());
            cachingConnectionFactory.setAddresses(rps.get(i).determineAddresses());
            cachingConnectionFactory.setPublisherReturns(rps.get(i).getPublisherReturns());
            cachingConnectionFactory.setPublisherConfirmType(rps.get(i).getPublisherConfirmType());
            Cache.Channel channel = rps.get(i).getCache().getChannel();
            if (Objects.nonNull(channel.getSize())) {
                cachingConnectionFactory.setChannelCacheSize(channel.getSize());
            }
            if (Objects.nonNull(channel.getCheckoutTimeout())) {
                Duration checkoutTimeout = channel.getCheckoutTimeout();
                cachingConnectionFactory.setChannelCheckoutTimeout(checkoutTimeout.toMillis());
            }
            Connection connection = rps.get(i).getCache().getConnection();
            if (Objects.nonNull(connection.getMode())) {
                cachingConnectionFactory.setCacheMode(connection.getMode());
            }
            if (Objects.nonNull(connection.getSize())) {
                cachingConnectionFactory.setConnectionCacheSize(connection.getSize());
            }
            // Register the CachingConnectionFactory bean
            ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(CachingConnectionFactory.class.getName() + i, cachingConnectionFactory);
            log.info("zlf.ConfigurableBeanFactory registered, beanName:{}", CachingConnectionFactory.class.getName() + i);
            // Register the RabbitAdmin bean
            RabbitAdmin rabbitAdmin = new RabbitAdmin(cachingConnectionFactory);
            ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(RabbitAdmin.class.getName() + i, rabbitAdmin);
            log.info("zlf.RabbitAdmin registered, beanName:{}", RabbitAdmin.class.getName() + i);
            // Build the RabbitTemplate used for sending and bind it to the connection factory
            Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
            RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
            rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
            Template template = rps.get(i).getTemplate();
            ConfirmType publisherConfirmType = rps.get(i).getPublisherConfirmType();
            log.info("publisherConfirmType of configuration {}: {}", i, JSON.toJSONString(publisherConfirmType));
            // Publisher confirms
            /**
             * publish-confirm-type: enables publisher confirms; two types are supported here:
             * simple: [synchronous] wait for the confirm result until timeout (may block the code)
             * correlated: [asynchronous] callback; define a ConfirmCallback, which MQ invokes when it returns the result
             * publish-returns: enables publisher returns, also based on a callback, but using a ReturnCallback
             * template.mandatory:
             * defines the policy when routing a message from the exchange to a queue fails.
             * [true: invoke the ReturnCallback; false: discard the message]
             */
            if (ConfirmType.CORRELATED.equals(publisherConfirmType)) {
                rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                    if (Objects.nonNull(correlationData)) {
                        if (Objects.nonNull(ack) && ack) {
                            log.info("Message sent successfully->correlationData:{}", JSON.toJSONString(correlationData));
                        } else if (StringUtils.isNotBlank(cause)) {
                            log.error("Message->correlationData:{}->send failure cause->{}", JSON.toJSONString(correlationData), cause);
                        }
                    }
                    if (Objects.nonNull(ack) && ack) {
                        log.info("Message sent successfully, ack:{}", ack);
                    }
                    if (StringUtils.isNotBlank(cause)) {
                        log.error("Message send failure cause->cause:{}", cause);
                    }
                    if (Objects.isNull(correlationData) && Objects.isNull(ack) && StringUtils.isEmpty(cause)) {
                        log.info("Message sent successfully; correlationData, ack, and cause are all null");
                    }
                });
            }
            Boolean publisherReturns = rps.get(i).getPublisherReturns();
            Boolean mandatory = template.getMandatory();
            log.info("publisherReturns of configuration {}: {}, mandatory:{}", i, publisherReturns, mandatory);
            // Return callback
            // Enable mandatory returns (null-safe check on the two Boolean settings)
            if (Boolean.TRUE.equals(mandatory) && Boolean.TRUE.equals(publisherReturns)) {
                rabbitTemplate.setMandatory(template.getMandatory());
                rabbitTemplate.setMandatoryExpression(new ValueExpression<>(template.getMandatory()));
                // Set the return callback
                rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                    log.error("Message->{} could not be routed", message);
                    // Resend the message here if needed
                });
            }
            Retry retry = rps.get(i).getTemplate().getRetry();
            if (retry.isEnabled()) {
                RetryTemplate retryTemplate = new RetryTemplate();
                SimpleRetryPolicy policy = new SimpleRetryPolicy();
                retryTemplate.setRetryPolicy(policy);
                policy.setMaxAttempts(retry.getMaxAttempts());
                ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
                backOffPolicy.setMultiplier(retry.getMultiplier());
                if (Objects.nonNull(retry.getMaxInterval())) {
                    backOffPolicy.setMaxInterval(retry.getMaxInterval().toMillis());
                }
                // Attach the back-off policy so the multiplier and max interval take effect
                retryTemplate.setBackOffPolicy(backOffPolicy);
                rabbitTemplate.setRetryTemplate(retryTemplate);
            }
            Duration receiveTimeout = template.getReceiveTimeout();
            if (Objects.nonNull(receiveTimeout)) {
                rabbitTemplate.setReceiveTimeout(receiveTimeout.toMillis());
            }
            Duration replyTimeout = template.getReplyTimeout();
            if (Objects.nonNull(replyTimeout)) {
                rabbitTemplate.setReplyTimeout(replyTimeout.toMillis());
            }
            String exchange = template.getExchange();
            if (StringUtils.isNotBlank(exchange)) {
                rabbitTemplate.setExchange(exchange);
            }
            String routingKey = template.getRoutingKey();
            if (StringUtils.isNotBlank(routingKey)) {
                rabbitTemplate.setRoutingKey(routingKey);
            }
            String defaultReceiveQueue = template.getDefaultReceiveQueue();
            if (StringUtils.isNotBlank(defaultReceiveQueue)) {
                rabbitTemplate.setDefaultReceiveQueue(defaultReceiveQueue);
            }
            ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(RabbitTemplate.class.getName() + i, rabbitTemplate);
            log.info("zlf.RabbitTemplate registered, beanName:{}", RabbitTemplate.class.getName() + i);
            // RabbitService is not registered as a bean
            RabbitService rabbitService = new RabbitService();
            // Build the listener container factory and register it
            ContainerType type = rps.get(i).getListener().getType();
            if (ContainerType.SIMPLE.equals(type)) {
                Map<String, String> errorExchangeQueueRelationship = this.createErrorExchangeQueueRelationship(String.valueOf(i), rabbitService, rabbitAdmin);
                SimpleContainer simple = rps.get(i).getListener().getSimple();
                ConstructorArgumentValues cas3 = new ConstructorArgumentValues();
                MutablePropertyValues values3 = new MutablePropertyValues();
                this.getAmqpContainer(simple, values3, cachingConnectionFactory, jackson2JsonMessageConverter, rabbitTemplate, errorExchangeQueueRelationship);
                if (Objects.nonNull(simple.getConcurrency())) {
                    values3.add("concurrentConsumers", simple.getConcurrency());
                }
                if (Objects.nonNull(simple.getMaxConcurrency())) {
                    values3.add("maxConcurrentConsumers", simple.getMaxConcurrency());
                }
                if (Objects.nonNull(simple.getBatchSize())) {
                    values3.add("batchSize", simple.getBatchSize());
                }
                RootBeanDefinition rootBeanDefinition3 = new RootBeanDefinition(SimpleRabbitListenerContainerFactory.class, cas3, values3);
                beanDefinitionRegistry.registerBeanDefinition(SimpleRabbitListenerContainerFactory.class.getName() + i, rootBeanDefinition3);
                log.info("zlf.SimpleRabbitListenerContainerFactory registered, beanName:{}", SimpleRabbitListenerContainerFactory.class.getName() + i);
            } else if (ContainerType.DIRECT.equals(type)) {
                Map<String, String> errorExchangeQueueRelationship = this.createErrorExchangeQueueRelationship(String.valueOf(i), rabbitService, rabbitAdmin);
                DirectContainer direct = rps.get(i).getListener().getDirect();
                ConstructorArgumentValues cas4 = new ConstructorArgumentValues();
                MutablePropertyValues values4 = new MutablePropertyValues();
                this.getAmqpContainer(direct, values4, cachingConnectionFactory, jackson2JsonMessageConverter, rabbitTemplate, errorExchangeQueueRelationship);
                if (Objects.nonNull(direct.getConsumersPerQueue())) {
                    values4.add("consumersPerQueue", direct.getConsumersPerQueue());
                }
                RootBeanDefinition rootBeanDefinition4 = new RootBeanDefinition(DirectRabbitListenerContainerFactory.class, cas4, values4);
                beanDefinitionRegistry.registerBeanDefinition(DirectRabbitListenerContainerFactory.class.getName() + i, rootBeanDefinition4);
                log.info("zlf.DirectRabbitListenerContainerFactory registered, beanName:{}", DirectRabbitListenerContainerFactory.class.getName() + i);
            }
            // Parse and register exchanges, queues, and bindings
            ExchangeQueueProperties exchangeQueueProperties = eqps.get(i);
            log.info("zlf.exchangeQueueProperties:{}", JSON.toJSONString(exchangeQueueProperties));
            Integer index = exchangeQueueProperties.getIndex();
            log.info("zlf.exchangeQueueProperties.index:{}", index);
            if (Objects.isNull(index)) {
                throw new RuntimeException("exchangeQueueProperties.index must not be empty");
            }
            if (Objects.nonNull(exchangeQueueProperties)) {
                log.info("zlf.exchangeQueueProperties:{}", JSON.toJSONString(exchangeQueueProperties));
                List<ExchangeQueueDto> eqs = exchangeQueueProperties.getEqs();
                if (CollectionUtil.isNotEmpty(eqs)) {
                    for (int k = 0; k < eqs.size(); k++) {
                        String bindingIndex = index.toString() + k;
                        log.info("zlf.bindingIndex:{}", bindingIndex);
                        ExchangeQueueDto exchangeQueueDto = eqs.get(k);
                        log.info("zlf.exchangeQueueDto:{}", JSON.toJSONString(exchangeQueueDto));
                        String functionType = exchangeQueueDto.getFunctionType();
                        log.info("zlf.functionType:{}", functionType);
                        if (FunctionTypeEnum.NORMAL.getFunctionType().equals(functionType)) {
                            this.createRelationship(FunctionTypeEnum.NORMAL, exchangeQueueDto, rabbitService, rabbitAdmin, bindingIndex, false);
                        } else if (FunctionTypeEnum.DELAY.getFunctionType().equals(functionType)) {
                            Integer delayType = exchangeQueueDto.getDelayType();
                            log.info("zlf.delayType:{}", delayType);
                            if (DelayTypeEnum.ONE.getDelayType().equals(delayType)) {
                                // Delay queue implemented with the delayed message plugin
                                String exchangeType = exchangeQueueDto.getExchangeType();
                                if (!ExchangeTypeEnum.CUSTOM.getExchangeType().equals(exchangeType)) {
                                    throw new RuntimeException("For the delayed message plugin implementation, exchangeType must be defined as: custom");
                                }
                                this.createRelationship(FunctionTypeEnum.DELAY, exchangeQueueDto, rabbitService, rabbitAdmin, bindingIndex, false);
                            } else if (DelayTypeEnum.TWO.getDelayType().equals(delayType)) {
                                // Delay queue implemented with TTL + dead-letter queue
                                this.createRelationship(FunctionTypeEnum.DELAY, exchangeQueueDto, rabbitService, rabbitAdmin, bindingIndex, false);
                            } else if (DelayTypeEnum.THREE.getDelayType().equals(delayType)) {
                                // Delayed exchange + setHeader("x-delay", xxx) on the message
                                this.createRelationship(FunctionTypeEnum.DELAY, exchangeQueueDto, rabbitService, rabbitAdmin, bindingIndex, true);
                            }
                        }
                    }
                }
            }
        }
    }

    /**
     * Check the main parameters of the rabbitProperties configuration
     *
     * @param rabbitProperties
     */
    private void checkRabbitProperties(RabbitProperties rabbitProperties) {
        String virtualHost = rabbitProperties.getVirtualHost();
        if (StringUtils.isEmpty(virtualHost)) {
            throw new RuntimeException("RabbitProperties.virtualHost must not be empty");
        }
        String addresses = rabbitProperties.getAddresses();
        if (StringUtils.isEmpty(addresses)) {
            throw new RuntimeException("RabbitProperties.addresses must not be empty");
        }
        Integer port = rabbitProperties.getPort();
        if (Objects.isNull(port)) {
            throw new RuntimeException("RabbitProperties.port must not be empty");
        }
        String username = rabbitProperties.getUsername();
        if (StringUtils.isEmpty(username)) {
            throw new RuntimeException("RabbitProperties.username must not be empty");
        }
        String password = rabbitProperties.getPassword();
        if (StringUtils.isEmpty(password)) {
            throw new RuntimeException("RabbitProperties.password must not be empty");
        }
    }

    /**
     * Create the exchange, queue, and binding relationship
     *
     * @param functionTypeEnum
     * @param exchangeQueueDto
     * @param rabbitService
     * @param rabbitAdmin
     * @param bindingIndex
     */
    private void createRelationship(FunctionTypeEnum functionTypeEnum, ExchangeQueueDto exchangeQueueDto, RabbitService rabbitService, RabbitAdmin rabbitAdmin, String bindingIndex, Boolean isDelayed) {
        String exchangeName = exchangeQueueDto.getExchangeName();
        String exchangeType = exchangeQueueDto.getExchangeType();
        HashMap<String, Object> exchangeArgs = exchangeQueueDto.getExchangeArgs();
        log.info("zlf." + functionTypeEnum.getFunctionType() + ".exchangeName:{},exchangeType:{},exchangeArgs:{}", exchangeName, exchangeType, JSON.toJSONString(exchangeArgs));
        AbstractExchange exchange1 = rabbitService.createExchange(rabbitAdmin, exchangeName, exchangeType, exchangeArgs, isDelayed);
        exchangeName = exchangeName + bindingIndex;
        ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(exchangeName, exchange1);
        log.info("zlf." + functionTypeEnum.getFunctionType() + ".Exchange registered, beanName:{}", exchangeName);
        String queueName = exchangeQueueDto.getQueueName();
        HashMap<String, Object> queueArgs = exchangeQueueDto.getQueueArgs();
        String routingKey1 = exchangeQueueDto.getRoutingKey();
        log.info("zlf." + functionTypeEnum.getFunctionType() + ".queueName:{},queueArgs:{},routingKey1:{}", queueName, JSON.toJSONString(queueArgs), routingKey1);
        Queue queue = rabbitService.createQueue(rabbitAdmin, queueName, queueArgs);
        queueName = queueName + bindingIndex;
        ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(queueName, queue);
        log.info("zlf." + functionTypeEnum.getFunctionType() + ".Queue registered, beanName:{}", queueName);
        Binding binding = rabbitService.binding(rabbitAdmin, exchange1, queue, routingKey1);
        ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(Binding.class.getName() + bindingIndex, binding);
        log.info("zlf." + functionTypeEnum.getFunctionType() + ".Binding registered, bindingIndex:{},beanName:{}", bindingIndex, Binding.class.getName() + bindingIndex);
        Integer delayType = exchangeQueueDto.getDelayType();
        if (DelayTypeEnum.TWO.getDelayType().equals(delayType)) {
            String dlxExchangeName = exchangeQueueDto.getDlxExchangeName();
            if (StringUtils.isEmpty(dlxExchangeName)) {
                throw new RuntimeException("For the TTL + dead-letter queue delay implementation, dlxExchangeName must not be empty!");
            }
            String dlxExchangeType = exchangeQueueDto.getDlxExchangeType();
            if (StringUtils.isEmpty(dlxExchangeType)) {
                throw new RuntimeException("For the TTL + dead-letter queue delay implementation, dlxExchangeType must not be empty!");
            }
            AbstractExchange exchange2 = rabbitService.createExchange(rabbitAdmin, dlxExchangeName, dlxExchangeType, exchangeArgs, false);
            dlxExchangeName = dlxExchangeName + bindingIndex;
            ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(dlxExchangeName, exchange2);
            log.info("zlf.TTL + dead-letter queue delay: dead-letter exchange registered, beanName:{}", dlxExchangeName);
            String dlxQueueName = exchangeQueueDto.getDlxQueueName();
            Queue queue2 = rabbitService.createQueue(rabbitAdmin, dlxQueueName, null);
            dlxQueueName = dlxQueueName + bindingIndex;
            ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(dlxQueueName, queue2);
            log.info("zlf.TTL + dead-letter queue delay: dead-letter queue registered, beanName:{}", dlxQueueName);
            String dlxKey = exchangeQueueDto.getDlxKey();
            Binding binding2 = rabbitService.binding(rabbitAdmin, exchange2, queue2, dlxKey);
            String dlxBeanName = "dlx" + Binding.class.getName() + bindingIndex + 1;
            ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(dlxBeanName, binding2);
            log.info("zlf.TTL + dead-letter queue delay: binding between dead-letter exchange and queue registered, beanName:{}", dlxBeanName);
        }
    }

    private Map<String, String> createErrorExchangeQueueRelationship(String index, RabbitService rabbitService, RabbitAdmin rabbitAdmin) {
        Map<String, String> resultMap = new HashMap<>();
        String exchangeName = ErrorExchangeQueueInfo.ERROR_EXCHANGE_PREFIX + index;
        AbstractExchange exchange = rabbitService.createExchange(rabbitAdmin, exchangeName, ErrorExchangeQueueInfo.ERROR_EXCHANGE_TYPE, null, false);
        ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(exchangeName, exchange);
        log.info("zlf.ErrorExchange registered, beanName:{}", exchangeName);
        String queueName = ErrorExchangeQueueInfo.ERROR_QUEUE_PREFIX + index;
        Queue queue = rabbitService.createQueue(rabbitAdmin, queueName, null);
        ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(queueName, queue);
        log.info("zlf.ErrorQueue registered, beanName:{}", queueName);
        String errorRoutingKey = ErrorExchangeQueueInfo.ERROR_KEY_PREFIX + index;
        log.info("zlf.errorRoutingKey:{}", errorRoutingKey);
        Binding errorBinding = rabbitService.binding(rabbitAdmin, exchange, queue, errorRoutingKey);
        String errorBingBeanName = ErrorExchangeQueueInfo.ERROR_BINDING_BANE_NAME_PREFIX + Binding.class.getSimpleName() + index;
        ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(errorBingBeanName, errorBinding);
        log.info("zlf.ErrorBing registered, beanName:{}", errorBingBeanName);
        resultMap.put("errorExchange", exchangeName);
        resultMap.put("errorRoutingKey", errorRoutingKey);
        return resultMap;
    }

    private void getAmqpContainer(AmqpContainer amqpContainer, MutablePropertyValues values, CachingConnectionFactory cachingConnectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter, RabbitTemplate rabbitTemplate, Map<String, String> errorExchangeQueueRelationship) {
        values.add("connectionFactory", cachingConnectionFactory);
        values.add("autoStartup", amqpContainer.isAutoStartup());
        values.add("messageConverter", jackson2JsonMessageConverter);
        if (Objects.nonNull(amqpContainer.getAcknowledgeMode())) {
            values.add("acknowledgeMode", amqpContainer.getAcknowledgeMode());
        }
        if (Objects.nonNull(amqpContainer.getPrefetch())) {
            values.add("prefetch", amqpContainer.getPrefetch());
        }
        if (Objects.nonNull(amqpContainer.getDefaultRequeueRejected())) {
            values.add("defaultRequeueRejected", amqpContainer.getDefaultRequeueRejected());
        }
        if (Objects.nonNull(amqpContainer.getIdleEventInterval())) {
            values.add("idleEventInterval", amqpContainer.getIdleEventInterval());
        }
        values.add("missingQueuesFatal", amqpContainer.isMissingQueuesFatal());
        ListenerRetry retry2 = amqpContainer.getRetry();
        if (retry2.isEnabled()) {
            RetryInterceptorBuilder<?, ?> builder = (retry2.isStateless()) ? RetryInterceptorBuilder.stateless() : RetryInterceptorBuilder.stateful();
            RetryTemplate retryTemplate = new RetryTemplate();
            SimpleRetryPolicy policy = new SimpleRetryPolicy();
            retryTemplate.setRetryPolicy(policy);
            policy.setMaxAttempts(retry2.getMaxAttempts());
            ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
            backOffPolicy.setMultiplier(retry2.getMultiplier());
            if (Objects.nonNull(retry2.getMaxInterval())) {
                backOffPolicy.setMaxInterval(retry2.getMaxInterval().toMillis());
            }
            // Attach the back-off policy so the multiplier and max interval take effect
            retryTemplate.setBackOffPolicy(backOffPolicy);
            builder.retryOperations(retryTemplate);
            /**
             * With retry enabled, once the retries are exhausted and the message still fails, a MessageRecoverer
             * handles it. There are three implementations:
             *
             * RejectAndDontRequeueRecoverer: after retries are exhausted, reject directly and [discard the message]; this is the [default]
             * ImmediateRequeueMessageRecoverer: after retries are exhausted, return nack and requeue the message immediately (at a lower frequency than when no consumer retry mechanism is configured)
             * RepublishMessageRecoverer (recommended): after retries are exhausted, publish the failed message to a specified exchange
             */
            // Republish messages rejected by the consumer to the error queue
            String errorExchange = errorExchangeQueueRelationship.get("errorExchange");
            String errorRoutingKey = errorExchangeQueueRelationship.get("errorRoutingKey");
            MessageRecoverer recoverer = new RepublishMessageRecoverer(rabbitTemplate, errorExchange, errorRoutingKey);
            log.info("zlf.MessageRecoverer.errorExchange:{},errorRoutingKey:{}", errorExchange, errorRoutingKey);
            builder.recoverer(recoverer);
            values.add("adviceChain", builder.build());
        }
    }

    private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(RabbitProperties properties) throws Exception {
        PropertyMapper map = PropertyMapper.get();
        RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
        map.from(properties::determineHost).whenNonNull().to(factory::setHost);
        map.from(properties::determinePort).to(factory::setPort);
        map.from(properties::determineUsername).whenNonNull().to(factory::setUsername);
        map.from(properties::determinePassword).whenNonNull().to(factory::setPassword);
        map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost);
        map.from(properties::getRequestedHeartbeat).whenNonNull().asInt(Duration::getSeconds).to(factory::setRequestedHeartbeat);
        map.from(properties::getRequestedChannelMax).to(factory::setRequestedChannelMax);
        RabbitProperties.Ssl ssl = properties.getSsl();
        if (ssl.determineEnabled()) {
            factory.setUseSSL(true);
            map.from(ssl::getAlgorithm).whenNonNull().to(factory::setSslAlgorithm);
            map.from(ssl::getKeyStoreType).to(factory::setKeyStoreType);
            map.from(ssl::getKeyStore).to(factory::setKeyStore);
            map.from(ssl::getKeyStorePassword).to(factory::setKeyStorePassphrase);
            map.from(ssl::getTrustStoreType).to(factory::setTrustStoreType);
            map.from(ssl::getTrustStore).to(factory::setTrustStore);
            map.from(ssl::getTrustStorePassword).to(factory::setTrustStorePassphrase);
            map.from(ssl::isValidateServerCertificate).to((validate) -> factory.setSkipServerCertificateValidation(!validate));
            map.from(ssl::getVerifyHostname).to(factory::setEnableHostnameVerification);
        }
        map.from(properties::getConnectionTimeout).whenNonNull().asInt(Duration::toMillis).to(factory::setConnectionTimeout);
        factory.afterPropertiesSet();
        return factory;
    }

    @Override
    public void setEnvironment(Environment environment) {
        // Use Binder to convert the values in the environment into objects
        rabbitConfig = Binder.get(environment).bind(getPropertiesPrefix(RabbitConfig.class), RabbitConfig.class).get();
        exchangeQueueConfig = Binder.get(environment).bind(getPropertiesPrefix(ExchangeQueueConfig.class), ExchangeQueueConfig.class).get();
    }

    private String getPropertiesPrefix(Class<?> tClass) {
        return Objects.requireNonNull(AnnotationUtils.getAnnotation(tClass, ConfigurationProperties.class)).prefix();
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }
}
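The registrar names every bean by string concatenation: the class name plus the server index `i` for connection factories, admins, templates, and listener container factories, and the configured name plus `bindingIndex` for exchanges and queues. The sketch below reproduces only that naming logic in plain Java so the resulting names are easy to predict. The class `BeanNameSketch`, its method names, and the sample name `order.exchange` are hypothetical and used for illustration only.

```java
// Hypothetical helper that mirrors the bean-naming rules used by ZlfRabbitMqRegistrar.
final class BeanNameSketch {

    // Mirrors `SomeClass.class.getName() + i` in registerBeanDefinitions.
    static String indexedBeanName(String className, int serverIndex) {
        return className + serverIndex;
    }

    // Mirrors `index.toString() + k`: the configured index followed by the entry position.
    static String bindingIndex(int index, int k) {
        return Integer.toString(index) + k;
    }

    // Mirrors `exchangeName + bindingIndex` and `queueName + bindingIndex` in createRelationship.
    static String suffixedName(String configuredName, String bindingIndex) {
        return configuredName + bindingIndex;
    }

    public static void main(String[] args) {
        // Bean name of the RabbitTemplate created for the first configured server.
        System.out.println(indexedBeanName("org.springframework.amqp.rabbit.core.RabbitTemplate", 0));
        // Bean name of the listener container factory for the second configured server.
        System.out.println(indexedBeanName(
                "org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory", 1));
        // Bean name of an exchange configured as the first entry under index 0.
        System.out.println(suffixedName("order.exchange", bindingIndex(0, 0)));
        // Two different (index, k) pairs that concatenate to the same suffix.
        System.out.println(bindingIndex(1, 11).equals(bindingIndex(11, 1)));
    }
}
```

A listener would presumably select its server by passing one of these names to the `containerFactory` attribute of `@RabbitListener`. Because `bindingIndex` is a plain concatenation, distinct pairs such as (1, 11) and (11, 1) yield the same suffix, which is worth keeping in mind when a configuration has many entries.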
4. Summary
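That covers the whole approach to hand-writing a RabbitMQ starter. The approach matters more than the code. The code is only a reference, and the same approach can be used to build more wheels that are convenient, simple, fast, and efficient. Building wheels is also a way to improve and brings a sense of achievement: tiring but happy. Later I will open-source all the starters I have written and share the GitHub or Gitee addresses. Building wheels and opening them up for everyone to use is the spirit and the fun of open source. What the Java ecosystem lacks is not wheels but good wheels.
If you read or repost my article, please credit the original source and author and respect the copyright. Writing is not easy. Copying an article verbatim and passing it off as your own original work is unethical; if you see that happen, please report it or contact me.
To be honest, I spent a long time thinking this starter through and then 2-3 days of hard work building it, and my neck is sore, so I need to rest and exercise more. Writing it was tiring, but the sense of achievement is real, and it will make future integrations quick. Configuring multiple RabbitMQ servers has been tested and works. Configuring a single RabbitMQ server with multiple exchanges, queues, and bindings, together with the three delay queue implementations, has also been personally tested and works. I hope this article helps and inspires you. Please like, favorite, and share!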
Copyright belongs to the original author, 大飞哥~BigFei. In case of infringement, please contact us for removal.