一 引入
在我们的日常开发中,消息中间件已经成为了java研发工程师的一项必备技能,本文主要是基于对springboot原生组件的扩展开发,基于模板设计模式和静态代理模式,简化了队列路由的绑定,交由公共模板进行统一的绑定,并在公用模板中保证了消息的幂等性和消息的可靠性投递,将这些类似的代码抽离出来,让开发者只专注于业务逻辑的开发.
整体实现思路:
- 开发者申明路由交换机等基础元数据后,交由元数据解析器完成交换机,路由的申明及相应关系的绑定
- 开发者申明消息监听器后,通过消息侦听容器完成队列与监听器的绑定
- 开发者通过消息发射器,发布消息后,系统通过不通模板完成消息的发送
二 逻辑实现
2.1 元数据解析器的构建
申明基础信息接口
/**
* 用于定义交换机队列等核心参数 便于队列交换机等初始化
* @author likun
* @date 2022/6/17 11:22
*/publicinterfaceMessageMetaData{/**
* 获取队列名称
* @return
*/StringgetQueue();/**
* 交换机类型
* @return
*/ExchangeTypeEnumgetExchangeType();/**
* 队列配置
* @return
*/defaultMap<String,Object>getQueueArgs(){returnnull;};/**
* 交换机配置
* @return
*/defaultMap<String,Object>getExchangeArgs(){returnnull;};/**
* 消息扩展属性
* @return
*/defaultMessagePropertiesgetMessageProperties(){returnnull;};defaultvoidsetMessageProperties(MessageProperties messageProperties){};}
定义不同的元数据模板
publicabstractclassFanoutMessageMetaDataimplementsMessageMetaData{privateMessageProperties messageProperties =null;@OverridepublicExchangeTypeEnumgetExchangeType(){returnExchangeTypeEnum.FANOUT;}/**
* 交换机名称
* @return
*/abstractpublicStringgetExchange();@OverridepublicMessagePropertiesgetMessageProperties(){returnthis.messageProperties;}@OverridepublicvoidsetMessageProperties(MessageProperties messageProperties){this.messageProperties=messageProperties;}}publicabstractclassDirectMessageMetadataimplementsMessageMetaData{privateMessageProperties messageProperties;@OverridepublicExchangeTypeEnumgetExchangeType(){returnExchangeTypeEnum.DIRECT;}/**
* 交换机名称
* @return
*/abstractpublicStringgetExchange();@OverridepublicMessagePropertiesgetMessageProperties(){returnthis.messageProperties;}@OverridepublicvoidsetMessageProperties(MessageProperties messageProperties){this.messageProperties=messageProperties;}}
申明元数据解析接口,定义解析规范
/**
* 判断是否支持当前交换机类型
* @author likun
* @date 2022/6/17 11:44
*/publicinterfaceSupport{Booleansupport(ExchangeTypeEnum exchangeTypeEnum);}/**
* 核心参数解析器 解析核心参数并完成队列交换机的绑定
* @author likun
* @date 2022/6/17 11:42
*/publicinterfaceMessageMetaDataResolverextendsSupport{/**
* 解析核心参数
* @param messageMetaData
*/voidresolve(MessageMetaData messageMetaData);}
定义不同模板的解析器
@RequiredArgsConstructor@Slf4jpublicabstractclassAbstractMessageMetadataResolverimplementsMessageMetaDataResolver{privatefinalRabbitAdmin rabbitAdmin;@Overridepublicvoidresolve(MessageMetaData messageMetaData){if(support(messageMetaData.getExchangeType())){doResolve(messageMetaData);}}/**
* 下游子类实现
* @param messageMetaData
*/abstractvoiddoResolve(MessageMetaData messageMetaData);/**
* 申明队列
*/publicvoiddeclareQueue(Queue queue){
rabbitAdmin.declareQueue(queue);
log.info("queue [{}] declared.",queue);}publicvoiddeclareExchange(Exchange exchange){
rabbitAdmin.declareExchange(exchange);
log.info("exchange [{}] declared.",exchange);}publicvoiddeclareBinding(Binding binding){
rabbitAdmin.declareBinding(binding);
log.info("binding [{}] declared.",binding);}}publicclassDirectMessageMetadataResolverextendsAbstractMessageMetadataResolver{publicDirectMessageMetadataResolver(RabbitAdmin rabbitAdmin){super(rabbitAdmin);}@OverridepublicBooleansupport(ExchangeTypeEnum exchangeTypeEnum){returnExchangeTypeEnum.DIRECT.equals(exchangeTypeEnum);}@OverridevoiddoResolve(MessageMetaData messageMetaData){DirectMessageMetadata directMessageMetadata =(DirectMessageMetadata) messageMetaData;// 申明队列Queue queue =newQueue(directMessageMetadata.getQueue(),true,false,false, directMessageMetadata.getQueueArgs());declareQueue(queue);// 申明交换机DirectExchange exchange =newDirectExchange(directMessageMetadata.getExchange(),true,false, directMessageMetadata.getExchangeArgs());declareExchange(exchange);// 交换机绑定队列Binding binding =BindingBuilder.bind(queue).to(exchange).withQueueName();declareBinding(binding);}}publicclassFanoutMessageMetaDateResolverextendsAbstractMessageMetadataResolver{publicFanoutMessageMetaDateResolver(RabbitAdmin rabbitAdmin){super(rabbitAdmin);}@OverridepublicBooleansupport(ExchangeTypeEnum exchangeTypeEnum){returnExchangeTypeEnum.FANOUT.equals(exchangeTypeEnum);}@OverridevoiddoResolve(MessageMetaData messageMetaData){FanoutMessageMetaDataFanoutMessageMetaData=(FanoutMessageMetaData) messageMetaData;// 申明队列Queue queue =newQueue(FanoutMessageMetaData.getQueue(),true,false,false,FanoutMessageMetaData.getQueueArgs());declareQueue(queue);// 申明交换机FanoutExchange exchange =newFanoutExchange(FanoutMessageMetaData.getExchange(),true,false,FanoutMessageMetaData.getExchangeArgs());declareExchange(exchange);// 队列绑定交换机Binding binding =BindingBuilder.bind(queue).to(exchange);declareBinding(binding);}}
申明静态代理器
/**
* 委派解析器
* @author likun
* @date 2022年06月17日 14:05
*/@Slf4jpublicclassDelegatingMessageMetaDataResolverimplementsMessageMetaDataResolver{privatefinalMap<ExchangeTypeEnum,MessageMetaDataResolver> messageMetaDataResolverMap =newConcurrentHashMap<>();publicDelegatingMessageMetaDataResolver(Map<ExchangeTypeEnum,MessageMetaDataResolver> messageMetaDataResolverMap){this.messageMetaDataResolverMap.putAll(messageMetaDataResolverMap);}@OverridepublicBooleansupport(ExchangeTypeEnum exchangeTypeEnum){returntrue;}@Overridepublicvoidresolve(MessageMetaData messageMetaData){if(messageMetaData==null){
log.error("metaDate resolve must have messageMetaData but is null");return;}MessageMetaDataResolver messageMetaDataResolver = messageMetaDataResolverMap.get(messageMetaData.getExchangeType());if(messageMetaDataResolver==null){
log.error("messageMetaDataResolver is null");return;}
messageMetaDataResolver.resolve(messageMetaData);}}
元数据初始化器
/**
* 元数据核心参数初始化器
* @author likun
* @date 2022年06月17日 15:43
*/@RequiredArgsConstructor@Slf4jpublicclassMessageMetaDataInitalizerimplementsInitializingBean,Ordered{privatefinalXlcpMqProperties xlcpMqProperties;privatefinalMessageMetaDataResolver messageMetaDataResolver;@OverridepublicvoidafterPropertiesSet()throwsException{
log.info("autoCreateMq configured with [{}]",xlcpMqProperties.getAutoCreateMq());if(xlcpMqProperties.getAutoCreateMq()){Map<String,MessageMetaData> messageMetaDataMap =SpringContextHolder.getBeansOfType(MessageMetaData.class);
messageMetaDataMap.forEach((key,messageMetaData)->{
log.info("Mq auto declared with metaDate [{}]",messageMetaData);
messageMetaDataResolver.resolve(messageMetaData);});}}@OverridepublicintgetOrder(){returnOrdered.LOWEST_PRECEDENCE;}}
将静态代理器和初始器交由spring容器管理
2.2 动态消息监听器构建
申明抽象消息监听器
@Slf4jpublicabstractclassDynamicMessageListener<T>extendsAbstractAdaptableMessageListener{privatefinalMessageMetaData messageMetaData;publicstaticfinalString MQ_MESSAGE_ID_PREFIX ="mq:message:id";publicDynamicMessageListener(MessageMetaData messageMetaData){this.messageMetaData=messageMetaData;}publicvoiddoExcute(Message message,Channel channel,Consumer<Object> consumer)throwsIOException{MessageProperties messageProperties=message.getMessageProperties();String messageId=messageProperties.getMessageId();// 校验当前消息是否被消费RedisTemplate redisTemplate=SpringContextHolder.getBean(RedisTemplate.class);
redisTemplate.setKeySerializer(newStringRedisSerializer());String mqMessageMqKey =String.format("%s%s%s",MQ_MESSAGE_ID_PREFIX,StrPool.COLON,messageId);if(redisTemplate.hasKey(mqMessageMqKey)){ackOk(message,channel);
log.info("当前消息已经被消费了,消息id为:{}",messageId);return;}MessageConverter messageConverter=getMessageConverter();Object convetMessag= messageConverter.fromMessage(message);Boolean ackFalg =Boolean.FALSE;try{
consumer.accept(convetMessag);// 存入redis中确保消息已经被消费了 保存30分钟
redisTemplate.opsForValue().set(mqMessageMqKey,"",30,TimeUnit.MINUTES);}catch(Exception exception){
exception.printStackTrace();// 签收失败将消息重新投递到队列中
ackFalg=onError(message,channel);}finally{if(!ackFalg){ackOk(message, channel);}}}protectedBooleanonError(Message message,Channel channel)throwsIOException{
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);returnBoolean.TRUE;}protectedvoidackOk(Message message,Channel channel)throwsIOException{
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}publicMessageMetaDatagetMessageMetaData(){return messageMetaData;}}
申明单消息监听器模板和批量消息监听模板
publicabstractclassSingleDynamicMessageListener<T>extendsDynamicMessageListener<T>{publicSingleDynamicMessageListener(MessageMetaData messageMetaData){super(messageMetaData);}publicabstractvoidonMessage(T t);@OverridepublicvoidonMessage(Message message,Channel channel)throwsException{doExcute(message,channel,object->{onMessage((T)object);});}}publicabstractclassBatchDynamicMessageListener<T>extendsDynamicMessageListener<T>{publicBatchDynamicMessageListener(MessageMetaData messageMetaData){super(messageMetaData);}@OverridepublicvoidonMessage(Message message,Channel channel)throwsException{MessageConverter messageConverter=getMessageConverter();doExcute(message,channel,(obj)->{if(message instanceofCollection){onMessageBatch((Collection<T>) obj);}else{ArrayList<T> list =newArrayList<>();
list.add((T)message);onMessageBatch(list);}});}/**
* 批量消息
* @param collection
*/abstractvoidonMessageBatch(Collection<T> collection);}
申明动态容器监听器
@RequiredArgsConstructorpublicclassDynamicMessageListenerContainerextendsSimpleRabbitListenerContainerFactory{privatefinalint DEFAULT_PREFETCH_COUNT =1;privatefinalConnectionFactory connectionFactory;/**
* 后缀
*/publicstaticfinalString LISTENER_CONTAINER_SUFFIX ="SimpleMessageListenerContainer";privateint prefetchCount = DEFAULT_PREFETCH_COUNT;publicSimpleMessageListenerContainercreateListenerContainer(DynamicMessageListener dynamicMessageListener,MessageMetaData messageMetaData){SimpleMessageListenerContainer simpleMessageListenerContainer=newSimpleMessageListenerContainer();
simpleMessageListenerContainer.setQueueNames(messageMetaData.getQueue());
simpleMessageListenerContainer.setMessageListener(dynamicMessageListener);
simpleMessageListenerContainer.setConnectionFactory(connectionFactory);
simpleMessageListenerContainer.setPrefetchCount(prefetchCount);//设置当前消息为手动签收
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);return simpleMessageListenerContainer;}publicvoidsetPrefetchCount(int prefetchCount){this.prefetchCount = prefetchCount;}}
申明动态容器监听器初始化容器
@Slf4jpublicclassDynamicMessageListenerInitializeimplementsInitializingBean,Ordered{@OverridepublicvoidafterPropertiesSet()throwsException{// 获得所有客户端配置的监听器Map<String,DynamicMessageListener> listenerMap =SpringContextHolder.getBeansOfType(DynamicMessageListener.class);// 获取bean工厂ConfigurableListableBeanFactory beanFactory =(ConfigurableListableBeanFactory)SpringContextHolder.getBeanFactory();// 获得监听器容器 同于配置配置监听器DynamicMessageListenerContainer dynamicMessageListenerContainer =SpringContextHolder.getBean(DynamicMessageListenerContainer.class);// 初始化监听器的配置
listenerMap.forEach((listenerName,listener)->{MessageMetaData messageMetaData = listener.getMessageMetaData();SimpleMessageListenerContainer listenerContainer = dynamicMessageListenerContainer.createListenerContainer(listener, messageMetaData);String beanName =String.format("%s%s",listenerName,DynamicMessageListenerContainer.LISTENER_CONTAINER_SUFFIX);
beanFactory.registerSingleton(beanName,listenerContainer);
log.info("create [{}] listener to [{}]",beanName,messageMetaData);});}@OverridepublicintgetOrder(){returnOrdered.LOWEST_PRECEDENCE;}}
将初始化容器交给spring容器管理
2.3 消息发射器构建
申明发射规范
/**
* 消息投递
* @author likun
* @date 2022年06月17日 14:19
*/publicinterfaceMessageDeliveryextendsSupport{/**
* 投递消息
* @param messageMetaData 元数据信息
* @param t 消息
* @param <T>
* @return
*/<T>Booleandeliver(MessageMetaData messageMetaData,T t);}
申明不同模板的发射器
publicabstractclassAbstractMessageDeliveryimplementsMessageDelivery{publicfinalRabbitTemplate rabbitTemplate;publicAbstractMessageDelivery(RabbitTemplate rabbitTemplate){this.rabbitTemplate=rabbitTemplate;}@Overridepublic<T>Booleandeliver(MessageMetaData messageMetaData,T t){if(support(messageMetaData.getExchangeType())){doDeliver(messageMetaData,t);returntrue;}returnfalse;}publicabstract<T>voiddoDeliver(MessageMetaData messageMetaData,T t);publicMessagecreateMessage(Object object,MessageMetaData messageMetaData){// 消息转换 并设置当前消息全局唯一id保证消息不被重复下消费SimpleMessageConverter converter =newSimpleMessageConverter();finalMessageProperties messageProperties;MessageProperties customMessageProperties = messageMetaData.getMessageProperties();if(customMessageProperties!=null){
messageProperties=customMessageProperties;}else{
messageProperties =MessagePropertiesBuilder.newInstance().build();}Message message = converter.toMessage(object,setSharedMessageProperties(messageProperties));return message;}/**
* 设置共有属性
*/privateMessagePropertiessetSharedMessageProperties(MessageProperties messageProperties){
messageProperties.setMessageId(Convert.toStr(UniqueIdUtil.genId()));return messageProperties;}}publicclassFanoutMessageMetaDateDeliverextendsAbstractMessageDelivery{publicFanoutMessageMetaDateDeliver(RabbitTemplate rabbitTemplate){super(rabbitTemplate);}@OverridepublicBooleansupport(ExchangeTypeEnum exchangeTypeEnum){returnExchangeTypeEnum.FANOUT.equals(exchangeTypeEnum);}@Overridepublic<T>voiddoDeliver(MessageMetaData messageMetaData,T t){FanoutMessageMetaDataFanoutMessageMetaData=(FanoutMessageMetaData) messageMetaData;Message message =createMessage(t, messageMetaData);
rabbitTemplate.convertAndSend(FanoutMessageMetaData.getExchange(),"",message);}}publicclassDirectMessageMetaDateDeliverextendsAbstractMessageDelivery{publicDirectMessageMetaDateDeliver(RabbitTemplate rabbitTemplate){super(rabbitTemplate);}@OverridepublicBooleansupport(ExchangeTypeEnum exchangeTypeEnum){returnExchangeTypeEnum.DIRECT.equals(exchangeTypeEnum);}@Overridepublic<T>voiddoDeliver(MessageMetaData messageMetaData,T t){DirectMessageMetadata directMessageMetadata =(DirectMessageMetadata) messageMetaData;Message message =createMessage(t,messageMetaData);
rabbitTemplate.convertAndSend(directMessageMetadata.getExchange(),directMessageMetadata.getQueue(),message);}}
申明静态代理器
/**
* 委派消息推送者
* @author likun
* @date 2022年06月17日 14:38
*/publicclassDelegatingMessageDeliveryimplementsMessageDelivery{privatefinalMap<ExchangeTypeEnum,MessageDelivery> messageDeliveryMap =newConcurrentHashMap<>();publicDelegatingMessageDelivery(Map<ExchangeTypeEnum,MessageDelivery> messageDeliveryMap){this.messageDeliveryMap.putAll(messageDeliveryMap);}@OverridepublicBooleansupport(ExchangeTypeEnum exchangeTypeEnum){returntrue;}@Overridepublic<T>Booleandeliver(MessageMetaData messageMetaData,T t){Assert.notNull(messageMetaData,"message deliver must have messageMetaData but is null");ExchangeTypeEnum exchangeType = messageMetaData.getExchangeType();MessageDelivery delivery = messageDeliveryMap.get(messageMetaData.getExchangeType());Assert.notNull(delivery,"message deliver not null");return delivery.deliver(messageMetaData,t);}}
2.4 申明公用基础属性
@ConfigurationProperties(prefix =XlcpMqProperties.PRE_FIX)@DatapublicclassXlcpMqProperties{publicfinalstaticString PRE_FIX ="config.mq";privateBoolean enabled =Boolean.TRUE;/**
* mq 核心参数自动化配置
*/privateBoolean autoCreateMq =Boolean.TRUE;}
2.5定义springboot基础规范
@Configuration@EnableConfigurationProperties(XlcpMqProperties.class)publicclassXlcpMqMessageAutoConfiguration{@BeanpublicRabbitAdminrabbitAdmin(ConnectionFactory connectionFactory){returnnewRabbitAdmin(connectionFactory);}/**
* 委派数据解析器
*/@Bean@ConditionalOnMissingBeanpublicMessageMetaDataResolvermessageMetaDataResolver(RabbitAdmin rabbitAdmin){HashMap<ExchangeTypeEnum,MessageMetaDataResolver> resolverHashMap =newHashMap<>();
resolverHashMap.put(ExchangeTypeEnum.FANOUT,newFanoutMessageMetaDateResolver(rabbitAdmin));
resolverHashMap.put(ExchangeTypeEnum.DIRECT,newDirectMessageMetadataResolver(rabbitAdmin));returnnewDelegatingMessageMetaDataResolver(resolverHashMap);}/**
* 元数据初始化解析器
*/@Bean@ConditionalOnMissingBeanpublicMessageMetaDataInitalizermessageMetaDataInitalizer(MessageMetaDataResolver messageMetaDataResolver,XlcpMqProperties xlcpMqProperties){returnnewMessageMetaDataInitalizer(xlcpMqProperties,messageMetaDataResolver);}/**
* 消息投递者
*/@Bean@ConditionalOnMissingBeanpublicMessageDeliverymessageDelivery(RabbitTemplate rabbitTemplate){HashMap<ExchangeTypeEnum,MessageDelivery> deliveryHashMap =newHashMap<>();
deliveryHashMap.put(ExchangeTypeEnum.FANOUT,newFanoutMessageMetaDateDeliver(rabbitTemplate));
deliveryHashMap.put(ExchangeTypeEnum.DIRECT,newDirectMessageMetaDateDeliver(rabbitTemplate));returnnewDelegatingMessageDelivery(deliveryHashMap);}@BeanpublicDynamicMessageListenerContainerdynamicMessageListenerContainer(ConnectionFactory connectionFactory){returnnewDynamicMessageListenerContainer(connectionFactory);}@BeanpublicDynamicMessageListenerInitializedynamicMessageListenerInitialize(){returnnewDynamicMessageListenerInitialize();}}
三 客户端使用
3.1 申明自定义元数据信息
@ComponentpublicclassDemoMessageMetaDataextendsFanoutMessageMetaData{@OverridepublicStringgetExchange(){return"demoExchange";}@OverridepublicStringgetQueue(){return"demoQueue";}}
3.2 申明消息监听器
@DatapublicclassDemoMessageimplementsSerializable{privatestaticfinallong serialVersionUID =-5307128314859934844L;privateString text;}@Component@Slf4jpublicclassDemoMessageListenerextendsSingleDynamicMessageListener<DemoMessage>{publicDemoMessageListener(){super(newDemoMessageMetaData());}@OverridepublicvoidonMessage(DemoMessage demoMessage){
log.info("监听到消息:{}",demoMessage);}}
3.3 业务代码中使用
@RestController@RequestMapping("/mq")@Slf4jpublicclassDemoMessageController{@AutowiredprivateDemoMessageMetaData demoMessageMetaData;@AutowiredprivateMessageDelivery messageDelivery;@GetMapping("/message")@Inner(value =false)publicvoidsendSingleMessage(){DemoMessage demoMessage =newDemoMessage();
demoMessage.setText("hello mq");MessageProperties messageProperties =MessagePropertiesBuilder.newInstance().setExpiration("1").setDeliveryTag(1234L).build();
demoMessageMetaData.setMessageProperties(messageProperties);
messageDelivery.deliver(demoMessageMetaData,demoMessage);}}
四 一些说明
4.1 如何保证消息的幂等性?
4.2 如何保证消息的可靠性投递
主要从三个方面入手:
开启数据的持久化,避免服务宕机后重启消息丢失
消息的投递:通过confirm回调确保消息投递成功
消费端主要是通过手动签收来确保消息被消费成功
4.3 如何基于mq实现消息的延迟消费
注意:这种方式当最前面的消息没过期时后面的消息过期了也不会投递到死信交换机中,这里容易导致消息的堆积,一般用于时长固定的定时任务,可以采用rocketMq进行可变的延时消息的投递.
版权归原作者 Instanceztt 所有, 如有侵权,请联系我们删除。