0


spring boot rabbitmq常用配置

直接上代码

packagecom.example.demo;importorg.aopalliance.aop.Advice;importorg.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;importorg.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitAdmin;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.messaging.converter.MappingJackson2MessageConverter;importorg.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;importorg.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;importorg.springframework.retry.interceptor.RetryInterceptorBuilder;importorg.springframework.retry.interceptor.RetryOperationsInterceptor;@ConfigurationpublicclassRabbitMqConfigimplementsRabbitListenerConfigurer{publicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory,BeforePublishPostProcessor messagePostProcessor){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());
        rabbitTemplate.setBeforePublishPostProcessors(messagePostProcessor);return rabbitTemplate;}publicJackson2JsonMessageConverterproducerJackson2MessageConverter(){returnnewJackson2JsonMessageConverter();}@OverridepublicvoidconfigureRabbitListeners(RabbitListenerEndpointRegistrar registrar){
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());}publicMessageHandlerMethodFactorymessageHandlerMethodFactory(){DefaultMessageHandlerMethodFactory messageHandlerMethodFactory =newDefaultMessageHandlerMethodFactory();
        messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());return messageHandlerMethodFactory;}publicMappingJackson2MessageConverterconsumerJackson2MessageConverter(){returnnewMappingJackson2MessageConverter();}/**
    * 使用的时候在
    * @RabbitListener(containerFactory="containerFactory")
    */@Bean("containerFactory")publicSimpleRabbitListenerContainerFactorycontainerFactory(ConnectionFactory connectionFactory,AfterReceivePostProcessor messagePostProcessor,RabbitErrorHandler rabbitErrorHandler){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(newJackson2JsonMessageConverter());
        factory.setAfterReceivePostProcessors(messagePostProcessor);
        factory.setErrorHandler(rabbitErrorHandler);
        factory.setDefaultRequeueRejected(false);
        factory.setAdviceChain(newAdvice[]{getRetryOperationsInterceptor()});return factory;}/**
    * 防止队列出错无限次重试
    */privateRetryOperationsInterceptorgetRetryOperationsInterceptor(){returnRetryInterceptorBuilder.stateless().maxAttempts(3).backOffOptions(10000,2,30000).build();}/**
    * 配置了交换机队列以及绑定的配置类上加@DependsOn("rabbitAdmin")
    * 可以防止交换机队列以及绑定无法创建的问题
    * rabbitAdmin创建一定要在前
    */publicRabbitAdminrabbitAdmin(ConnectionFactory connectionFactory){RabbitAdmin rabbitAdmin =newRabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}}

AfterReceivePostProcessor

package com.example.demo;

import java.nio.charset.StandardCharsets;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.stereotype.Component;

/**
 * Post-processor that logs the properties and body of a message and returns
 * the message unchanged.
 */
@Component
@Slf4j
public class AfterReceivePostProcessor implements MessagePostProcessor {

    /**
     * Logs the message and passes it through.
     *
     * @param message the message being processed
     * @return the same {@code message} instance
     * @throws AmqpException declared by the interface; not thrown by this implementation
     */
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        // Decode explicitly as UTF-8 so the logged text does not depend on the platform default charset.
        log.info("接收消息属性={},body={}",
                message.getMessageProperties(),
                new String(message.getBody(), StandardCharsets.UTF_8));
        return message;
    }
}

BeforePublishPostProcessor

package com.example.demo;

import java.nio.charset.StandardCharsets;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.stereotype.Component;

/**
 * Post-processor that logs a message together with its correlation data,
 * exchange and routing key.
 */
@Component
@Slf4j
public class BeforePublishPostProcessor implements MessagePostProcessor {

    /**
     * Pass-through variant: returns the message unchanged without logging.
     *
     * @param message the message being processed
     * @return the same {@code message} instance
     * @throws AmqpException declared by the interface; not thrown by this implementation
     */
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        return message;
    }

    /**
     * Logs the message and its routing details, then delegates to the
     * interface's default implementation.
     *
     * @param message     the message being processed
     * @param correlation correlation data associated with the message
     * @param exchange    exchange name passed by the caller
     * @param routingKey  routing key passed by the caller
     * @return the result of the default interface implementation
     */
    @Override
    public Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
        // Decode the body; passing the raw byte[] to the logger would print the array identity, not its content.
        log.info("发送的消息MessageProperties={},body={},correlation={},exchange={},routingKey={}",
                message.getMessageProperties(),
                new String(message.getBody(), StandardCharsets.UTF_8),
                correlation,
                exchange,
                routingKey);
        return MessagePostProcessor.super.postProcessMessage(message, correlation, exchange, routingKey);
    }
}

RabbitErrorHandler

packagecom.example.demo;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Component;importorg.springframework.util.ErrorHandler;@Slf4j@ComponentpublicclassRabbitErrorHandlerimplementsErrorHandler{@OverridepublicvoidhandleError(Throwable t){
        log.error(t.getMessage(),t);}}

本文转载自: https://blog.csdn.net/hjnjmjkj/article/details/136171182
版权归原作者 hjnjmjkj 所有, 如有侵权,请联系我们删除。

“spring boot rabbitmq常用配置”的评论:

还没有评论