0


RabbitMQ基本使用及企业开发中注意事项

一、基本使用

部署完RabbitMQ有两种使用方式:

  • 网页客户端
  • Java代码

MQ组成部分:

  • 虚拟主机 -> 进行数据隔离的,好比mysql中的不同数据库
  • 交换机 -> 进行路由转发消息 - fanout:最普通 相当于广播- topic:可依key绑定不同队列,可使用.分割key,并支持模糊匹配- direct:可依key绑定不同队列,不支持模糊匹配
  • 队列 -> 接受交换机转发过来的消息

Java中使用(注解开发):

1.引依赖和配置属性 - 版本选个差不多的 我是继承的springboot的

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
spring:
  rabbitmq:
    host: 192.168.189.189
    port: 5672
    virtual-host: hmallVir
    username: hmall
    password: 123321

2.使用RabbitTemplate发消息

@SpringBootTest
public class TestSendMs {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void testSend1(){
        String queueName = "hmallQueue1";
        String ms = "hello";
        rabbitTemplate.convertAndSend(queueName, ms);
    }

}

3.使用RabbitListerner接受消息

@Component
public class MyListeners {

    @RabbitListener(queues = "hmallQueue1")
    public void listenerMs1(String ms) throws InterruptedException {
        System.out.println("消费者1接收到消息  ->  " + ms);
        Thread.sleep(20);
    }

}

4.声明队列和交换机的方式二(简单常用)

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "hmallQueue1"),
            exchange = @Exchange(name = "hmall", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者1接收到消息 -> " + msg);
    }

轮询接受消息问题:

队列读取消息时使用轮询机制,每个队列都读取相同的消息数量,这样不好,我们要针对队列处理消息的能力,需要在配置文件设置属性fetch

spring:
  rabbitmq:
    host: 192.168.189.189
    port: 5672
    virtual-host: hmallVir
    username: hmall
    password: 123321
    listener:
      simple:
        prefetch: 1 # 每次处理完消息再获取新的消息 性能越好处理消息越多

扩展消息转换器:

默认的消息转换器是直接将对象序列化为Byte[],即读不懂又不安全还占内存

        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
        </dependency>
    @Bean
    public MessageConverter jacksonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

二、使用注意事项

1. 生产者重连机制 - 保证mq服务是通的

连接重试,注意这里是阻塞式的,意味着连接失败会一直重试其他业务不会执行,所以建议禁用此模式。

spring:
  rabbitmq:
    host: 192.168.189.189
    port: 5672
    virtual-host: hmallVir
    username: hmall
    password: 123321
    connection-timeout: 1s # 连接失败重试
    template:
      retry:
        enabled: true # 开启重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待市场倍数
        max-attempts: 3 # 最大重试次数
2. 生产者确认机制 - 回调机制

SpringAMQP生产者确认机制的返回值情况?

  1. ack 消息成功投递到mq 1. 交换机收到了 没有路由转发到队列 ack + return路由异常2. 交换机收到并路由成功 只ack
  2. nack 消息丢失 投递失败
3. MQ的可靠性

服务一旦挂了消息就都没有了,还有就是内存如果满了,会触发阻塞式的强制持久化操作,这会导致这段时间处理消息的能力为0

  1. 设置消息的持久化属性:需要将delivery_mode属性设置为2。这个设置会将消息标记为持久化,确保消息被写入磁盘而不是仅保存在内存中。
  2. 声明持久化的队列:在RabbitMQ中声明队列时,需要将durable属性设置为true。这样,队列的元数据以及队列中的消息都会被持久化到磁盘上。需要注意的是,即使队列被设置为持久化,也不能保证内部所存储的消息不会丢失,因为RabbitMQ默认在消息被消费后立即删除它。要确保消息不被删除,需要在消费时进行相应的设置。
  3. 声明持久化的交换机:与队列类似,交换机也可以被设置为持久化。在声明交换机时,将durable属性设置为true,就可以将交换机标记为持久化。如果交换机不设置持久化,那么在RabbitMQ服务重启之后,相关的交换机元数据会丢失,但已发送的消息不会丢失,只是不能再将新的消息发送到这个交换机中。
4. Lazy Queue模式
  1. 降低内存压力:在高负载情况下,大量消息堆积可能导致内存压力剧增。使用Lazy Queue模式,消息直接写入磁盘,可以有效降低内存使用,避免内存溢出等问题。
  2. 支持大量消息存储:由于消息存储在磁盘上,Lazy Queue模式可以支持数百万条消息的存储,满足大规模消息处理的需求。
  3. 提高系统稳定性:通过将消息持久化到磁盘,Lazy Queue模式增强了消息的可靠性,即使RabbitMQ服务器发生故障或重启,消息也不会丢失,保证了系统的稳定性和数据的完整性。


5. 消费者确认机制

这里就是模拟事务嘛,利用AOP思想


本文转载自: https://blog.csdn.net/Panci_/article/details/137512444
版权归原作者 不会就选C. 所有, 如有侵权,请联系我们删除。

“RabbitMQ基本使用及企业开发中注意事项”的评论:

还没有评论