0


RabbitMQ的四种交换器以及死信队列介绍

前言

RabbitMQ作为一款用途非常广泛的消息队列,可以做到解耦,异步调用,以及流量削峰等非常强大的功能(上一篇博客有详细介绍
四种MQ的介绍与区别
)。接下来详细介绍RabbitMQ的具体代码实现~

一,RabbitMQ概述

RabbitMQ是一个使用Erlang语言开发的(即安装RabbitMQ之前,必须先安装Erlang,一键式傻瓜安装),实现AMQP (高级消息队列协议)的开源消息中间件。它可以实现异步消息处理,是一款消息代理,它接受和转发消息。

RabbitMQ有以下特点:

  • 可靠性。支持持久化,传输确认,发布确认等保证了MQ的可靠性。
  • 灵活的分发消息策略。在消息进入MQ前由Exchange (交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
  • 支持集群。多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 多种协议。RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
  • 支持多种语言客户端。RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。

二,基础代码实现

2.1 添加依赖以及配置相关信息

首先添加依赖,如下

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

这里使用了自动配置版本,若指定version,需要和springboot版本保持一致。
然后在yaml文件配置RabbitMQ的相关ip以及端口

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

这里使用了自动配置版本,若指定version,需要和springboot版本保持一致

2.2 创建队列相关配置

@ConfigurationpublicclassRabbitMQConfig{/**
     * 创建队列*/@BeanpublicQueuecreateQ1(){returnnewQueue("queue01");}}

2.3 创建监听器相关配置

@Component@RabbitListener(queues ="queue01")//queues指定监听的队列名称,即要消费哪一个队列的消息publicclassMqListener{@RabbitHandlerpublicvoidhandler(String msg){//处理消息System.err.println("监听器消费消息:"+msg+"--->"+LocalTime.now());}}

2.4 发送消息

这里我们创建一个接口,来向队列发送消息,看看能不能被监听器消费掉

@RestControllerpublicclassController{@ResourceprivateRabbitTemplate template;@GetMapping("/api/mq/test1")publicStringsendMsg(String msg){System.out.println("发送消息:"+msg+"--->"+LocalTime.now());
        template.convertAndSend("","queue01",msg);return"OK";}}

在这里插入图片描述
上述便是最基础的消息队列(点对点),一个队列,一个生产者,一个消费者。

三,Work消息(一个队列,多个消费者)

如果同时有多个监听器监听同一个队列呢?下面再添加一个监听器

@Component@RabbitListener(queues ="queue01")publicclassMqListener01{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener01)监听器消费消息:"+msg+"--->"+LocalTime.now());}}

接下来我们多次向队列发送消息
修改一下发送消息接口:

@RestControllerpublicclassController{@ResourceprivateRabbitTemplate template;@GetMapping("/api/mq/test1")publicStringsendMsg(String msg){for(int i=1; i<11; i++){System.out.println("第"+i+"次发送消息:"+msg+"--->"+LocalTime.now());
            template.convertAndSend("","queue01","第"+i+"个消息"+msg);}return"OK";}}

控制台输出如下:
在这里插入图片描述因此,我们可以总结以下:
当有多个监听器监听同一个消息队列,监听器会相互竞争处理消息。

四,Exchange消息(核心)

RabbitMQ基于Exchange实现消息的一对多,一个消息可以被消费多次

Exchange有4种类型:

1.fanout 直接转发,不对消息做过滤

2.direct 路由关键字过滤,只支持精确值,可以对消息进行过滤匹配

3.topic 路由关键字过滤,支持模糊值, *:1个单词 #:0或多个单词

4.header 消息头过滤,对消息的请求消息头进行匹配过滤,any(类似 or):任意1个 all:全部(类似 and)

4.1 fanout 交换器

首先,创建两个消息队列,以及一个交换器,两个消息队列分别与交换器绑定

publicclassRabbitMQConfig01{/**
     * 创建交换器*/@BeanpublicFanoutExchangecreateFe(){returnnewFanoutExchange("fanout");}/**
     * 创建队列*/@BeanpublicQueuecreateQ2(){returnnewQueue("queue02");}@BeanpublicQueuecreateQ3(){returnnewQueue("queue03");}/**
     * 创建绑定关系*/@BeanpublicBindingcreateBq2(FanoutExchange fe){returnBindingBuilder.bind(createQ2()).to(fe);}@BeanpublicBindingcreateBq3(FanoutExchange fe){returnBindingBuilder.bind(createQ3()).to(fe);}}

接下来我们创建两个监听器,分别监听queue02和queue03这两个消息队列

@Component@RabbitListener(queues ="queue02")publicclassMqListener02{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener02)监听器消费消息:"+msg+"--->"+LocalTime.now());}@Component@RabbitListener(queues ="queue03")classMqListener03{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener03)监听器消费消息:"+ msg +"--->"+LocalTime.now());}}}

现在,创建一个接口,来向交换器发送信息

@GetMapping("/api/mq/test2")publicStringsendMsg02(String msg){System.out.println("向交换器发送消息");
    template.convertAndSend("fanout","", msg);return"OK";}

结果如下
在这里插入图片描述可以得到以下结论:
向交换器发送消息,该交换器绑定的所有队列都会收到消息。即一条消息被多个消费者多次消费

4.2 direct 交换器

direct 交换器可以对指定的固定字符进行过滤
首先创建direct交换器,并且绑定队列
@ConfigurationpublicclassRabbitMQConfig02{/**
     * 创建交换器*/@BeanpublicDirectExchangecreateDe(){returnnewDirectExchange("direct");}/**
     * 创建队列*/@BeanpublicQueuecreateQ4(){returnnewQueue("queue04");}@BeanpublicQueuecreateQ5(){returnnewQueue("queue05");}/**
     * 创建绑定关系*/@BeanpublicBindingcreateBq4(DirectExchange de){returnBindingBuilder.bind(createQ4()).to(de).with("info");//创建绑定关系,并且声明固定字符}@BeanpublicBindingcreateBq5(DirectExchange de){returnBindingBuilder.bind(createQ5()).to(de).with("error");}}
创建监听器
@Component@RabbitListener(queues ="queue04")publicclassMqListener04{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener04_info)监听器消费消息:"+msg+"--->"+LocalTime.now());}@Component@RabbitListener(queues ="queue05")classMqListener05{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener05_error)监听器消费消息:"+ msg +"--->"+LocalTime.now());}}}

创建接口,向交换器发送消息

@GetMapping("/api/mq/test3")publicStringsendMsg03(String msg,String key){System.out.println("向交换器发送消息");
        template.convertAndSend("direct", key, msg);//且指定key,对应交换器的过滤字符return"OK";}

结果如下
在这里插入图片描述可以看到,我们向direct交换器发送消息时,指定了key,即该消息只会被转发到指定了相同key的消息队列上

4.3 topic 交换器

首先,创建topic交换器,且绑定队列

@ConfigurationpublicclassRabbitMQConfig03{/**
     * 创建交换器*/@BeanpublicTopicExchangecreateTe(){returnnewTopicExchange("topic");}/**
     * 创建队列*/@BeanpublicQueuecreateQ6(){returnnewQueue("queue06");}@BeanpublicQueuecreateQ7(){returnnewQueue("queue07");}/**
     * 创建绑定关系*/@BeanpublicBindingcreateBq6(TopicExchange te){returnBindingBuilder.bind(createQ6()).to(te).with("*");//*表示一个单词}@BeanpublicBindingcreateBq7(TopicExchange te){returnBindingBuilder.bind(createQ7()).to(te).with("#");// #表示任意个单词}}
创建监听器监听队列
@Component@RabbitListener(queues ="queue06")publicclassMqListener06{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener06_*)监听器消费消息:"+msg+"--->"+LocalTime.now());}@Component@RabbitListener(queues ="queue07")classMqListener07{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener07_#)监听器消费消息:"+ msg +"--->"+LocalTime.now());}}}

创建接口,向交换器发送消息

@GetMapping("/api/mq/test4")publicStringsendMsg04(String msg,String key){System.out.println("向交换器发送消息");
        template.convertAndSend("topic", key, msg);return"OK";}

结果如下
在这里插入图片描述
可以看到,我们指定发送的key为“skr”,两个队列的绑定关系都成立
接下来,我们换一下,key為“skr.skr”
在这里插入图片描述
可以看到成功发送消息到过滤字符为‘#’的队列,应为“skr.skr”相当于两个单词(.为分隔符)

4.4 header 交换器

创建header交换器

@ConfigurationpublicclassRabbitMQConfig04{/**
     * 创建交换器*/@BeanpublicHeadersExchangecreateHe(){returnnewHeadersExchange("header");}/**
     * 创建队列*/@BeanpublicQueuecreateQ8(){returnnewQueue("queue08");}@BeanpublicQueuecreateQ9(){returnnewQueue("queue09");}/**
     * 创建绑定关系*/@BeanpublicBindingcreateBq8(HeadersExchange he){returnBindingBuilder.bind(createQ8()).to(he).whereAny("id","author").exist();//any 任意1个即可}@BeanpublicBindingcreateBq9(HeadersExchange he){returnBindingBuilder.bind(createQ9()).to(he).whereAll("id","author").exist();//all 同时满足}}

创建监听器

@Component@RabbitListener(queues ="queue08")publicclassMqListener08{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener08_any)监听器消费消息:"+msg+"--->"+LocalTime.now());}@Component@RabbitListener(queues ="queue09")classMqListener09{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener09_all)监听器消费消息:"+ msg +"--->"+LocalTime.now());}}}

创建接口发送消息

@GetMapping("/api/mq/test5")publicStringsendMsg(String msg){MessagePostProcessor postProcessor=newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{
            message.getMessageProperties().setHeader("id","121");//指定消息头return message;}};
    template.convertAndSend("header","",msg,postProcessor);return"OK";}

结果如下
在这里插入图片描述即绑定any的队列收到了消息,因为我们只指定了id这一个消息头

五,死信队列

当一个队列过期,长度超过指定值就会成为死信队列。随之这个消息会自动绑定一个交换器。
5.1 创建死信队列,正常队列和交换器绑定

@ConfigurationpublicclassRabbitMQConfigdead{//1.创建 死信交换器  专门转发死信消息@Bean("dead")publicDirectExchangecreateDLX(){returnnewDirectExchange("deadex");}//2.创建队列//队列 生成死信 1.没有消费者 2.有效期 3.设置死信和死信匹配关键字@BeanpublicQueuecreateQ10(){Map<String,Object> param =newHashMap<>();//设置死信交换器
        param.put("x-dead-letter-exchange","deadex");//设置死信交换器 匹配的路由
        param.put("x-dead-letter-routing-key","test1");//设置队列中消息的有效期 8秒
        param.put("x-message-ttl",8000);returnQueueBuilder.durable("queue10").withArguments(param).build();}@BeanpublicQueuecreateQ11(){returnnewQueue("queue11");}//3.绑定@BeanpublicBindingcreateBd8(@Qualifier("dead")DirectExchange dead){returnBindingBuilder.bind(createQ11()).to(dead).with("test1");}}

这里queue10消息队列,在8s后会过期,成为死信队列

然后创建监听器,监听queue11这个正常队列的消息

@Component@RabbitListener(queues ="queue11")publicclassMqListenerDead{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListenerDead)监听器消费消息:"+msg+"--->"+LocalTime.now());}}

然后创建接口,向queue这个死信队列发送消息

@GetMapping("/api/mq/test6")publicStringsendMsg06(String msg){System.out.println("向8s后会成为死信队列的队列发送消息--->"+LocalTime.now());
        template.convertAndSend("","queue10", msg);return"OK";}

这里启动可能会报冲突,应为前面有一个注册bean同样注册了DirectExchange类型,我们加个注解,使之通过名称指定(正常启动请忽略),
修改后的direct交换器配置类

@ConfigurationpublicclassRabbitMQConfig02{/**
     * 创建交换器*/@Bean("direct")publicDirectExchangecreateDe(){returnnewDirectExchange("direct");}/**
     * 创建队列*/@BeanpublicQueuecreateQ4(){returnnewQueue("queue04");}@BeanpublicQueuecreateQ5(){returnnewQueue("queue05");}/**
     * 创建绑定关系*/@BeanpublicBindingcreateBq4(@Qualifier("direct")DirectExchange de){returnBindingBuilder.bind(createQ4()).to(de).with("info");}@BeanpublicBindingcreateBq5(@Qualifier("direct")DirectExchange de){returnBindingBuilder.bind(createQ5()).to(de).with("error");}}

在这里插入图片描述可以看到,监听queue11的监听器收到了消息,也就是说我们发送到queue10的消息被交换器转发了。

当然,如果有监听器监听queue10,是之在过期之前被消费掉,他也就不会成为死信队列被转发到queue11.

标签: rabbitmq 分布式 java

本文转载自: https://blog.csdn.net/m0_73402102/article/details/140991010
版权归原作者 CodingHao 所有, 如有侵权,请联系我们删除。

“RabbitMQ的四种交换器以及死信队列介绍”的评论:

还没有评论