0


微服务: 04-springboot中rabbitmq的yml或properties配置,消息回收,序列化方式

1. 本文简介:

rabbitmq的连接配置, 以及回收机制配置, 修改默认java序列化方式

1.1 java序列化的缺点

---> 1.1.1 无法跨语言

serialVersionUID版本号必须相同,相同类名 版本号不一样 反序列化失败

--->1.1.2 易被攻击

Apache Commons Collections 允许链式的任意的类函数反射调用,攻击者通过“实现了 Java 序列化协议”的端口,把攻击代码上传到服务器上,再由 Apache Commons Collections 里的 TransformedMap 来执行。

---> 1.1.3 序列化后的流太大

Java 序列化实现的二进制编码完成的二进制数组大小,比 ByteBuffer 实现的二进制编码完成的二进制数组大小要大上几倍。

---> 1.1.4 序列化性能太差

ObjectOutputStream序列化效率很低

2. 配置总览

2.1 基础配置

#基础配置
spring.rabbitmq.host=*
spring.rabbitmq.port=5672
spring.rabbitmq.username=pzy
spring.rabbitmq.password=*
spring.rabbitmq.virtual-host=develop

2.2 连接重试配置

# 开启rabbit初始化重试机制
spring.rabbitmq.template.retry.enabled=true
## 最大重试间隔时间
spring.rabbitmq.template.retry.max-interval=1000ms
## 最大重试次数
spring.rabbitmq.template.retry.max-attempts=3
# 间隔乘数
spring.rabbitmq.template.retry.multiplier=1
# 初始化的时间间隔
spring.rabbitmq.template.retry.initial-interval=1000ms

2.3 异常重试机制

#异常重试机制设置[未加死信队列 五次异常后 直接抛弃了]
#设置是否重回队列 true即出现异常会将消息重新发送到队列中
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#设置是否启用消息重试机制,默认为false。
spring.rabbitmq.listener.simple.retry.enabled=true
#设置消息重试的最大次数,默认为3。
spring.rabbitmq.listener.simple.retry.max-attempts=5
#设置消息重试的初始间隔时间,默认为1000ms。
spring.rabbitmq.listener.simple.retry.initial-interval=2000ms
#设置消息重试的时间间隔倍数,默认为1(重试时间越来越长)
spring.rabbitmq.listener.simple.retry.multiplier=1.2
##设置消息重试的最大时间间隔,默认为10000ms。
spring.rabbitmq.listener.simple.retry.max-interval=3000ms

2.4 确认模式(本篇是自动)

#spring.rabbitmq.listener.simple.acknowledge-mode=none
spring.rabbitmq.listener.simple.acknowledge-mode=auto
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
#spring.rabbitmq.listener.direct.acknowledge-mode=manual

---> 2.4.1 如图所示

2.5 发送确认设置

#发布消息成功到交换器后会触发回调方法(默认禁用none)
spring.rabbitmq.publisher-confirm-type=correlated
#消息发布不可达目的地的时候 才进行回退
spring.rabbitmq.publisher-returns=true

---> 2.5.1 参数解释: (老版的功能 直接是一个布尔值 false不开启 true开启)

None 禁用发布确认模式,是默认值
CORRELATED 发布消息成功到交换机后会触发回调方法
SIMPLE 有两种效果:

其一: 效果和 CORRELATED 值一样会触发回调方法

其二: 在发布消息成功 使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,无法发送消息到 broker

---> 2.5.2 写一个callbackConfig, 重写方法

package com.aisce.axmall.order.config.rabbitmq;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;

/**
 * rabbitmq的成功与失败
 * 消息回调
 * @author pzy
 * @description: TODO
 * @version 1.0.1 beta版
 */
@Slf4j
@Configuration
public class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    /**
     * 消息正常发送 或者发送到broker后出现问题
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {

            log.error("confirm==>发送到broker失败\r\n" + "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                    correlationData, ack, cause);
        } else {

            log.info("confirm==>发送到broker成功\r\n" + "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                    correlationData, ack, cause);
        }
    }

    /**
     * 压根没到目的地 执行
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(@NonNull Message message, int replyCode,
                                @NonNull String replyText, @NonNull String exchange, @NonNull String routingKey) {

        log.error("error returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
                        "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
                message, replyCode, replyText, exchange, routingKey);
    }

    //可进行后续操作

}

2.6 重新配置序列化

有两种方式(原理差不多)

---> 方法一: 都以application/json传递接收

@Configuration
public class RabbitConfig {
@Bean
public MessageConverter messageConverter(){
    return new Jackson2JsonMessageConverter();
}

---> 方法二: 都以application/json传递接收

/**
 * rabbitmq配置类
 *
 * @author pzy
 * @version 0.1.0
 * @description: TODO
 */
@Configuration
public class RabbitConfig {

    @Autowired
    private SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory;

    @Autowired
    private RabbitCallbackConfig rabbitCallbackConfig;

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //消息到不到队列 自动重新返回到生产者
        rabbitTemplate.setMandatory(true);//其实前面配置加了
        rabbitTemplate.setConfirmCallback(rabbitCallbackConfig);
        rabbitTemplate.setReturnCallback(rabbitCallbackConfig);
        // 使用 JSON 序列化与反序列化
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());

        return rabbitTemplate;
    }
}

---> 展示效果:


3. 配置总结

3.1 完整的application.properties的配置

#beta版 rabbitmq V3.0.1 版本启动测试  pzy===============================================>
#基础配置
spring.rabbitmq.host=*
spring.rabbitmq.port=5672
spring.rabbitmq.username=pzy
spring.rabbitmq.password=*
spring.rabbitmq.virtual-host=develop
#发送确认机制设置
#发布消息成功到交换器后会触发回调方法(默认禁用none)
spring.rabbitmq.publisher-confirm-type=correlated
#消息发布不可达目的地的时候 才进行回退
spring.rabbitmq.publisher-returns=true

# 队列设置
#设置每次预抓取的数量是3,处理完之前不收下一条 默认250
spring.rabbitmq.listener.simple.prefetch=3
# 手动确认模式
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=auto
#spring.rabbitmq.listener.direct.acknowledge-mode=manual

# 开启rabbit初始化重试机制
spring.rabbitmq.template.retry.enabled=true
## 最大重试间隔时间
spring.rabbitmq.template.retry.max-interval=1000ms
## 最大重试次数
spring.rabbitmq.template.retry.max-attempts=3
# 间隔乘数
spring.rabbitmq.template.retry.multiplier=1
# 初始化的时间间隔
spring.rabbitmq.template.retry.initial-interval=1000ms

#异常重试机制设置[未加死信队列 五次异常后 直接抛弃了]
#设置是否重回队列 true即出现异常会将消息重新发送到队列中
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#设置是否启用消息重试机制,默认为false。
spring.rabbitmq.listener.simple.retry.enabled=true
#设置消息重试的最大次数,默认为3。
spring.rabbitmq.listener.simple.retry.max-attempts=5
#设置消息重试的初始间隔时间,默认为1000ms。
spring.rabbitmq.listener.simple.retry.initial-interval=2000ms
#设置消息重试的时间间隔倍数,默认为1(重试时间越来越长)
spring.rabbitmq.listener.simple.retry.multiplier=1.2
##设置消息重试的最大时间间隔,默认为10000ms。
spring.rabbitmq.listener.simple.retry.max-interval=3000ms

# beta版 ============================================================================>

3.2 完整的application.yml配置

properties 转成 yml 后 注释没了 具体看上面的

#只有服务的消费者 决定怎么消费确认 生产者决定不了
spring: 
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
        default-requeue-rejected: true
        retry:
          enabled: true
          initial-interval: 2000ms
          max-attempts: 5
          max-interval: 3000ms
          multiplier: 1.2
    host: *    
    port: 5672
    username: pzy
    password: *
    virtual-host: develop
    # 新版的就是publisher-confirm-type=correlated
    publisher-confirms: true
    publisher-returns: true
    template:
      retry:
        enabled: true
        initial-interval: 1000ms
        max-attempts: 3
        max-interval: 1000ms
        multiplier: 1
    

4. 文章传送门

微服务: 00-rabbitmq出现的异常以及解决方案

微服务: 01-rabbitmq的应用场景及安装(docker)

微服务 02-rabbitmq在springboot中如何使用(上篇)

微服务: 03-rabbitmq在springboot中如何使用(下篇)

5. 下文预告

微服务: 05-rabbitmq设置重试次数并设置死信队列


本文转载自: https://blog.csdn.net/pingzhuyan/article/details/131786701
版权归原作者 pingzhuyan 所有, 如有侵权,请联系我们删除。

“微服务: 04-springboot中rabbitmq的yml或properties配置,消息回收,序列化方式”的评论:

还没有评论