直接上代码
packagecom.example.demo;importorg.aopalliance.aop.Advice;importorg.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;importorg.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitAdmin;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.messaging.converter.MappingJackson2MessageConverter;importorg.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;importorg.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;importorg.springframework.retry.interceptor.RetryInterceptorBuilder;importorg.springframework.retry.interceptor.RetryOperationsInterceptor;@ConfigurationpublicclassRabbitMqConfigimplementsRabbitListenerConfigurer{publicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory,BeforePublishPostProcessor messagePostProcessor){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());
rabbitTemplate.setBeforePublishPostProcessors(messagePostProcessor);return rabbitTemplate;}publicJackson2JsonMessageConverterproducerJackson2MessageConverter(){returnnewJackson2JsonMessageConverter();}@OverridepublicvoidconfigureRabbitListeners(RabbitListenerEndpointRegistrar registrar){
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());}publicMessageHandlerMethodFactorymessageHandlerMethodFactory(){DefaultMessageHandlerMethodFactory messageHandlerMethodFactory =newDefaultMessageHandlerMethodFactory();
messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());return messageHandlerMethodFactory;}publicMappingJackson2MessageConverterconsumerJackson2MessageConverter(){returnnewMappingJackson2MessageConverter();}/**
* 使用的时候在
* @RabbitListener(containerFactory="containerFactory")
*/@Bean("containerFactory")publicSimpleRabbitListenerContainerFactorycontainerFactory(ConnectionFactory connectionFactory,AfterReceivePostProcessor messagePostProcessor,RabbitErrorHandler rabbitErrorHandler){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(newJackson2JsonMessageConverter());
factory.setAfterReceivePostProcessors(messagePostProcessor);
factory.setErrorHandler(rabbitErrorHandler);
factory.setDefaultRequeueRejected(false);
factory.setAdviceChain(newAdvice[]{getRetryOperationsInterceptor()});return factory;}/**
* 防止队列出错无限次重试
*/privateRetryOperationsInterceptorgetRetryOperationsInterceptor(){returnRetryInterceptorBuilder.stateless().maxAttempts(3).backOffOptions(10000,2,30000).build();}/**
* 配置了交换机队列以及绑定的配置类上加@DependsOn("rabbitAdmin")
* 可以防止交换机队列以及绑定无法创建的问题
* rabbitAdmin创建一定要在前
*/publicRabbitAdminrabbitAdmin(ConnectionFactory connectionFactory){RabbitAdmin rabbitAdmin =newRabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}}
AfterReceivePostProcessor
packagecom.example.demo;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.AmqpException;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.stereotype.Component;@Component@Slf4jpublicclassAfterReceivePostProcessorimplementsMessagePostProcessor{@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{
log.info("接收消息属性={},body={}",message.getMessageProperties(),newString(message.getBody()));return message;}}
BeforePublishPostProcessor
packagecom.example.demo;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.AmqpException;importorg.springframework.amqp.core.Correlation;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.stereotype.Component;@Component@Slf4jpublicclassBeforePublishPostProcessorimplementsMessagePostProcessor{@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{return message;}@OverridepublicMessagepostProcessMessage(Message message,Correlation correlation,String exchange,String routingKey){
log.info("发送的消息MessageProperties={},body={},correlation={},exchange={},routingKey={}",message.getMessageProperties(),message.getBody(),correlation,exchange,routingKey);returnMessagePostProcessor.super.postProcessMessage(message, correlation, exchange, routingKey);}}
RabbitErrorHandler
packagecom.example.demo;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Component;importorg.springframework.util.ErrorHandler;@Slf4j@ComponentpublicclassRabbitErrorHandlerimplementsErrorHandler{@OverridepublicvoidhandleError(Throwable t){
log.error(t.getMessage(),t);}}
本文转载自: https://blog.csdn.net/hjnjmjkj/article/details/136171182
版权归原作者 hjnjmjkj 所有, 如有侵权,请联系我们删除。
版权归原作者 hjnjmjkj 所有, 如有侵权,请联系我们删除。