1.application.yml
spring:rabbitmq:host: 127.0.0.1 #ipport:5672#端口username: guest #账号password: guest #密码virtualHost:#链接的虚拟主机addresses: 127.0.0.1:5672#多个以逗号分隔,与host功能一样。requestedHeartbeat:60#指定心跳超时,单位秒,0为不指定;默认60spublisherConfirms:true#发布确认机制是否启用#确认消息已发送到交换机(Exchange)#publisher-confirm-type参数有三个可选值:#SIMPLE:会触发回调方法,相当于单个确认(发一条确认一条)。#CORRELATED:消息从生产者发送到交换机后触发回调方法。#NONE(默认):关闭发布确认模式。#publisher-confirm-type: correlated #发布确认机制是否启用 高版本Springboot使用替换掉publisher-confirms:truepublisherReturns:true#发布返回是否启用connectionTimeout:#链接超时。单位ms。0表示无穷大不超时### ssl相关ssl:enabled:#是否支持sslkeyStore:#指定持有SSL certificate的key store的路径keyStoreType:#key store类型 默认PKCS12keyStorePassword:#指定访问key store的密码trustStore:#指定持有SSL certificates的Trust storetrustStoreType:#默认JKStrustStorePassword:#访问密码algorithm:#ssl使用的算法,例如,TLSv1.1verifyHostname:#是否开启hostname验证### cache相关cache:channel:size:#缓存中保持的channel数量checkoutTimeout:#当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channelconnection:mode:#连接工厂缓存模式:CHANNEL 和 CONNECTIONsize:#缓存的连接数,只有是CONNECTION模式时生效### listenerlistener:type:#两种类型,SIMPLE,DIRECT## simple类型simple:concurrency:#最小消费者数量maxConcurrency:#最大的消费者数量transactionSize:#指定一个事务处理的消息数量,最好是小于等于prefetch的数量missingQueuesFatal:#是否停止容器当容器中的队列不可用## 与direct相同配置部分autoStartup:#是否自动启动容器acknowledgeMode:#表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认autoprefetch:#指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量defaultRequeueRejected:#决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)idleEventInterval:#container events发布频率,单位ms##重试机制retry:stateless:#有无状态enabled:#是否开启maxAttempts:#最大重试次数,默认3initialInterval:#重试间隔multiplier:#对于上一次重试的乘数maxInterval:#最大重试时间间隔direct:consumersPerQueue:#每个队列消费者数量missingQueuesFatal:#...其余配置看上方公共配置## template相关template:mandatory:#是否启用强制信息;默认falsereceiveTimeout:#`receive()`接收方法超时时间replyTimeout:#`sendAndReceive()`超时时间exchange:#默认的交换机routingKey:#默认的路由defaultReceiveQueue:#默认的接收队列## retry重试相关retry:enabled:#是否开启maxAttempts:#最大重试次数initialInterval:#重试间隔multiplier:#失败间隔乘数maxInterval:#最大间隔
2.application.properties
############################ base ############################
spring.rabbitmq.host: 服务Host
spring.rabbitmq.port: 服务端口
spring.rabbitmq.username: 登陆用户名
spring.rabbitmq.password: 登陆密码
spring.rabbitmq.virtual-host: 连接到rabbitMQ的vhost
spring.rabbitmq.addresses: 指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
spring.rabbitmq.requested-heartbeat: 指定心跳超时,单位秒,0为不指定;默认60s
spring.rabbitmq.publisher-confirms: 是否启用【发布确认】
#确认消息已发送到交换机(Exchange)#publisher-confirm-type参数有三个可选值:#SIMPLE:会触发回调方法,相当于单个确认(发一条确认一条)。#CORRELATED:消息从生产者发送到交换机后触发回调方法。#NONE(默认):关闭发布确认模式。#spring.rabbitmq.publisher-confirms-type: correlated #发布确认机制是否启用 高版本Springboot使用替换掉publisher-confirms:true
spring.rabbitmq.publisher-returns: 是否启用【发布返回】
spring.rabbitmq.connection-timeout: 连接超时,单位毫秒,0表示无穷大,不超时
############################ ssl ############################
spring.rabbitmq.ssl.enabled: 是否支持ssl
spring.rabbitmq.ssl.key-store: 指定持有SSL certificate的key store的路径
spring.rabbitmq.ssl.key-store-password: 指定访问key store的密码
spring.rabbitmq.ssl.trust-store: 指定持有SSL certificates的Trust store
spring.rabbitmq.ssl.trust-store-password: 指定访问trust store的密码
spring.rabbitmq.ssl.trust-store-type: JKS:Trust store 类型
spring.rabbitmq.ssl.algorithm: ssl使用的算法,默认由rabiitClient配置,例如,TLSv1.1
spring.rabbitmq.ssl.validate-server-certificate=true:是否启用服务端证书验证
spring.rabbitmq.ssl.verify-hostname=true 是否启用主机验证
############################ cache ############################
spring.rabbitmq.cache.channel.size: 缓存中保持的channel数量
spring.rabbitmq.cache.channel.checkout-timeout: 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
spring.rabbitmq.cache.connection.size: 缓存的连接数,只有是CONNECTION模式时生效
spring.rabbitmq.cache.connection.mode: 连接工厂缓存模式:CHANNEL 和 CONNECTION
############################ listener ############################spring.rabbitmq.listener.type=simple: 容器类型.simple或direct
spring.rabbitmq.listener.simple.auto-startup: 是否启动时自动启动容器
spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
spring.rabbitmq.listener.simple.concurrency: 最小的消费者数量
spring.rabbitmq.listener.simple.max-concurrency: 最大的消费者数量
spring.rabbitmq.listener.simple.prefetch: 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
spring.rabbitmq.listener.simple.transaction-size: 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
spring.rabbitmq.listener.simple.default-requeue-rejected: 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
spring.rabbitmq.listener.simple.missing-queues-fatal=true 若容器声明的队列在代理上不可用,是否失败; 或者运行时一个多多个队列被删除,是否停止容器
spring.rabbitmq.listener.simple.idle-event-interval: 多少长时间发布空闲容器时间,单位毫秒
spring.rabbitmq.listener.simple.retry.enabled: 监听重试是否可用
spring.rabbitmq.listener.simple.retry.max-attempts: 最大重试次数
spring.rabbitmq.listener.simple.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔
spring.rabbitmq.listener.simple.retry.multiplier: 应用于上一重试间隔的乘数
spring.rabbitmq.listener.simple.retry.max-interval: 最大重试时间间隔
spring.rabbitmq.listener.simple.retry.stateless: 重试是有状态or无状态
spring.rabbitmq.listener.direct.acknowledge-mode= ack模式
spring.rabbitmq.listener.direct.auto-startup=true 是否在启动时自动启动容器
spring.rabbitmq.listener.direct.consumers-per-queue= 每个队列消费者数量.
spring.rabbitmq.listener.direct.default-requeue-rejected= 默认是否将拒绝传送的消息重新入队.
spring.rabbitmq.listener.direct.idle-event-interval= 空闲容器事件发布时间间隔.
spring.rabbitmq.listener.direct.missing-queues-fatal=false若容器声明的队列在代理上不可用,是否失败.
spring.rabbitmq.listener.direct.prefetch= 每个消费者可最大处理的nack消息数量.
spring.rabbitmq.listener.direct.retry.enabled=false 是否启用发布重试机制.
spring.rabbitmq.listener.direct.retry.initial-interval=1000ms # Duration between the first and second attempt to deliver a message.
spring.rabbitmq.listener.direct.retry.max-attempts=3# Maximum number of attempts to deliver a message.
spring.rabbitmq.listener.direct.retry.max-interval=10000ms # Maximum duration between attempts.spring.rabbitmq.listener.direct.retry.multiplier=1# Multiplier to apply to the previous retry interval.spring.rabbitmq.listener.direct.retry.stateless=true # Whether retries are stateless or stateful.############################ template ############################
spring.rabbitmq.template.mandatory: 启用强制信息;默认false
spring.rabbitmq.template.receive-timeout: receive() 操作的超时时间
spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超时时间
spring.rabbitmq.template.retry.enabled: 发送重试是否可用
spring.rabbitmq.template.retry.max-attempts: 最大重试次数
spring.rabbitmq.template.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔
spring.rabbitmq.template.retry.multiplier: 应用于上一重试间隔的乘数
spring.rabbitmq.template.retry.max-interval: 最大重试时间间隔
3.RabbitConfig.java (自定义Rabbitmq配置类)
//常用的三个配置如下//1---设置手动应答(acknowledge-mode: manual)// 2---设置生产者消息发送的确认回调机制 ( #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调// publisher-confirm-type: correlated// #保证交换机能把消息推送到队列中// publisher-returns: true// template:// #以下是rabbitmqTemplate配置// mandatory: true)// 3---设置重试@ConfigurationpublicclassRabbitConfig{@AutowiredprivateConnectionFactory rabbitConnectionFactory;//@Bean 缓存连接池//public CachingConnectionFactory rabbitConnectionFactory@AutowiredprivateRabbitProperties properties;//这里因为使用自动配置的connectionFactory,所以把自定义的connectionFactory注解掉// 存在此名字的bean 自带的连接工厂会不加载(也就是说yml中rabbitmq下一级不生效),如果想自定义来区分开 需要改变bean 的名称// @Bean// public ConnectionFactory connectionFactory() throws Exception {// //创建工厂类// CachingConnectionFactory cachingConnectionFactory=new CachingConnectionFactory();// //用户名// cachingConnectionFactory.setUsername("gust");// //密码// cachingConnectionFactory.setPassword("gust");// //rabbitMQ地址// cachingConnectionFactory.setHost("127.0.0.1");// //rabbitMQ端口// cachingConnectionFactory.setPort(Integer.parseInt("5672"));//// //设置发布消息后回调// cachingConnectionFactory.setPublisherReturns(true);// //设置发布后确认类型,此处确认类型为交互// cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);//// cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);// return cachingConnectionFactory;// }// 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的listener下的simple配置),如果想自定义来区分开 需要改变bean 的名称@BeanpublicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(){SimpleRabbitListenerContainerFactory containerFactory =newSimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(rabbitConnectionFactory);// 并发消费者数量
containerFactory.setConcurrentConsumers(1);
containerFactory.setMaxConcurrentConsumers(20);// 预加载消息数量 -- QOS
containerFactory.setPrefetchCount(1);// 应答模式(此处设置为手动)
containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//消息序列化方式
containerFactory.setMessageConverter(newJackson2JsonMessageConverter());// 设置通知调用链 (这里设置的是重试机制的调用链)
containerFactory.setAdviceChain(RetryInterceptorBuilder.stateless().recoverer(newRejectAndDontRequeueRecoverer()).retryOperations(rabbitRetryTemplate()).build());return containerFactory;}// 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),如果想自定义来区分开 需要改变bean 的名称@BeanpublicRabbitTemplaterabbitTemplate(){RabbitTemplate rabbitTemplate=newRabbitTemplate(rabbitConnectionFactory);//默认是用jdk序列化//数据转换为json存入消息队列,方便可视化界面查看消息数据
rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);//此处设置重试template后,会再生产者发送消息的时候,调用该template中的调用链
rabbitTemplate.setRetryTemplate(rabbitRetryTemplate());//CorrelationData correlationData, boolean b, String s
rabbitTemplate.setConfirmCallback((correlationData, b, s)->{System.out.println("ConfirmCallback "+"相关数据:"+ correlationData);System.out.println("ConfirmCallback "+"确认情况:"+b);System.out.println("ConfirmCallback "+"原因:"+s);});//Message message, int i, String s, String s1, String s2
rabbitTemplate.setReturnCallback((message, i, s, s1, s2)->{System.out.println("ReturnCallback: "+"消息:"+message);System.out.println("ReturnCallback: "+"回应码:"+i);System.out.println("ReturnCallback: "+"回应消息:"+s);System.out.println("ReturnCallback: "+"交换机:"+s1);System.out.println("ReturnCallback: "+"路由键:"+s2);});return rabbitTemplate;}//重试的Template@BeanpublicRetryTemplaterabbitRetryTemplate(){RetryTemplate retryTemplate =newRetryTemplate();// 设置监听 调用重试处理过程
retryTemplate.registerListener(newRetryListener(){@Overridepublic<T,EextendsThrowable>booleanopen(RetryContext retryContext,RetryCallback<T,E> retryCallback){// 执行之前调用 (返回false时会终止执行)returntrue;}@Overridepublic<T,EextendsThrowable>voidclose(RetryContext retryContext,RetryCallback<T,E> retryCallback,Throwable throwable){// 重试结束的时候调用 (最后一次重试 )System.out.println("---------------最后一次调用");return;}@Overridepublic<T,EextendsThrowable>voidonError(RetryContext retryContext,RetryCallback<T,E> retryCallback,Throwable throwable){// 异常 都会调用System.err.println("-----第{}次调用"+retryContext.getRetryCount());}});
retryTemplate.setBackOffPolicy(backOffPolicyByProperties());
retryTemplate.setRetryPolicy(retryPolicyByProperties());return retryTemplate;}@BeanpublicExponentialBackOffPolicybackOffPolicyByProperties(){ExponentialBackOffPolicy backOffPolicy =newExponentialBackOffPolicy();long maxInterval = properties.getListener().getSimple().getRetry().getMaxInterval().getSeconds();long initialInterval = properties.getListener().getSimple().getRetry().getInitialInterval().getSeconds();double multiplier = properties.getListener().getSimple().getRetry().getMultiplier();// 重试间隔
backOffPolicy.setInitialInterval(initialInterval *1000);// 重试最大间隔
backOffPolicy.setMaxInterval(maxInterval *1000);// 重试间隔乘法策略
backOffPolicy.setMultiplier(multiplier);return backOffPolicy;}@BeanpublicSimpleRetryPolicyretryPolicyByProperties(){SimpleRetryPolicy retryPolicy =newSimpleRetryPolicy();int maxAttempts = properties.getListener().getSimple().getRetry().getMaxAttempts();
retryPolicy.setMaxAttempts(maxAttempts);return retryPolicy;}}
本文转载自: https://blog.csdn.net/Ying_ph/article/details/133071100
版权归原作者 一枚小蜗牛H 所有, 如有侵权,请联系我们删除。
版权归原作者 一枚小蜗牛H 所有, 如有侵权,请联系我们删除。