0


RabbitMQ详细介绍

RabbitMQ

    基于AMQP-Advanced Message Queuing Protocol(高级消息队列协议)规范实现的开源项目

其中核心概念

1、Exchange

    交换机,接受生产者发送的消息并将信息路由给Binding的队列;包含4中路由规则:direct(一对一)、fanout(广播)、topic(#、*规则匹配)、和headers

2、Queue

    消息队列,一个Exchange包含多个Queue,需要Binding,是存储数据的容器

3、Bingding

    绑定Exchang和Queue之间的关联,绑定时需要指定4种路由规则,Exchange和Queue之间为多对多的关系

4、Message

    顾名思义-消息,由消息体和消息头组成,消息头包含routing-key路由键,会匹配Exchange中的Queue路由规则。可全局设置消息的优先权,deliver-mode是否持久性存储。

5、Channel

    信道,AMQP命令都是已通过信道发送的,多路复用的双向数据流通道,包括发布、订阅、接收消息。

6、Consumer

    消息的消费者,通过channel连接Queue进行消费消息。

7、Virtual Host

    虚拟主机,可以表示多个Exchange相互独立,默认是的Vhost是/

8、Broker

    多个Exchange的集合

docker安装rabbitMq

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

4369, 25672 (Erlang发现&集群端口)
5672, 5671 (AMQP端口)
15672 (web管理后台端口)
61613, 61614 (STOMP协议端口)
1883, 8883 (MQTT协议端口)

四种路由规则

direct(一对一)

    消息中的路由键(routing key)如果和Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发routingkey标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式

fanout(广播)

    每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

topic(规则匹配)

    交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配一个单词。

header-较少使用

    exchange主要通过发送的request message中的header进行匹配,其中匹配规则(x-match)又分为all和any,all代表必须所有的键值对匹配,any代表只要有一个键值对匹配即可。headers exchange的默认匹配规则(x-match)是any。

可视化

创建Exchange

创建Queue

Exchange绑定Queue

发送消息

获取消息

代码详情

创建Exchange

Exchange directExchange = new DirectExchange("hello-java-exchange",true,false);
        amqpAdmin.declareExchange(directExchange);
        log.info("Exchange[{}]创建成功:","hello-java-exchange");

创建Queue

Queue queue = new Queue("hello-java-queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        log.info("Queue[{}]创建成功:","hello-java-queue");

绑定队列

Binding binding = new Binding("hello-java-queue",
                Binding.DestinationType.QUEUE,
                "hello-java-exchange",
                "hello.java",
                null);
        amqpAdmin.declareBinding(binding);
        log.info("Binding[{}]创建成功:","hello-java-binding");

spring-boot整合

引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

启动配置

@EnableRabbit

RabbitAutoConfiguration就会自动生效

配置文件

# RabbitMQ配置
spring.rabbitmq.host=rabbitIP
spring.rabbitmq.port=5672
# 虚拟主机配置
spring.rabbitmq.virtual-host=/
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

发送信息

@Autowired
private RabbitTemplate rabbitTemplate;
rabbitTemplate.convertAndSend("exchange","路由键",消息体);

相关注解使用

监听队列中的消息

@RabbitListener(queues={"queue-name"})

处理队列中不同类型的消息体

@RabbitHandler

手动ACK

    默认情况下RabbitMQ在消息发出后就立即将这条消息删除即为自动ack,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ自己又没有这条消息了。所以在实际项目中会使用手动Ack。
     注:如果手动ack时有个requeue参数设置为true,那么就存在该消息又会重复消费,如果一直不消费就会造成无限消费。
提供方法

Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。
Channel.basicNack (用于否定确认)
Channel.basicReject (用于否定确认):与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。

@RabbitListener
public void listener(Message messag,Channel channel){
    long deliveryTag = messag.getMessageProperties().getDeliveryTag();
    try {
        channel.basicAck(deliveryTag,false);
    } catch (IOException e) {
        e.printStackTrace();
        try {
            channel.basicReject(deliveryTag,false);
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }
}

死信队列

    说简单点就是备份队列,而死信的来源有以下几种:

消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
消息在队列的存活时间超过设置的TTL时间。
消息队列的消息数量已经超过最大队列长度。

死信队列消息如果进入

    “死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃
配置
/**************** 死信配置 *****************/
/**
 * 死信交换机
 */
@Bean
DirectExchange deadExchange() {
    return new DirectExchange("deadExchange", true, false);
}

/**
 * 死信队列
 */
@Bean
public Queue deadQueue() {
    return new Queue("deadQueue", true, false, false);
}

@Bean
Binding deadRouteBinding() {
    return BindingBuilder.bind(deadQueue())
            .to(deadExchange())
            .with("deadRouting");
}

/**
 * 转发到 死信队列,配置参数
 */
private Map<String, Object> deadQueueArgs() {
    Map<String, Object> map = new HashMap<>();
    // 绑定该队列到死信交换机
    map.put("x-dead-letter-exchange", "deadExchange");
    map.put("x-dead-letter-routing-key", "deadRouting");
    return map;
}
标签: rabbitmq 分布式 java

本文转载自: https://blog.csdn.net/weixin_43188446/article/details/138450093
版权归原作者 华贵季忆 所有, 如有侵权,请联系我们删除。

“RabbitMQ详细介绍”的评论:

还没有评论