前言
RabbitMQ作为一款用途非常广泛的消息队列,可以做到解耦,异步调用,以及流量削峰等非常强大的功能(上一篇博客有详细介绍
四种MQ的介绍与区别
)。接下来详细介绍RabbitMQ的具体代码实现~
一,RabbitMQ概述
RabbitMQ是一个使用Erlang语言开发的(即安装RabbitMQ之前,必须先安装Erlang,一键式傻瓜安装),实现AMQP (高级消息队列协议)的开源消息中间件。它可以实现异步消息处理,是一款消息代理,它接受和转发消息。
RabbitMQ有以下特点:
- 可靠性。支持持久化,传输确认,发布确认等保证了MQ的可靠性。
- 灵活的分发消息策略。在消息进入MQ前由Exchange (交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
- 支持集群。多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
- 多种协议。RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
- 支持多种语言客户端。RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
二,基础代码实现
2.1 添加依赖以及配置相关信息
首先添加依赖,如下
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
这里使用了自动配置版本,若指定version,需要和springboot版本保持一致。
然后在yaml文件配置RabbitMQ的相关ip以及端口
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
这里使用了自动配置版本,若指定version,需要和springboot版本保持一致
2.2 创建队列相关配置
@ConfigurationpublicclassRabbitMQConfig{/**
* 创建队列*/@BeanpublicQueuecreateQ1(){returnnewQueue("queue01");}}
2.3 创建监听器相关配置
@Component@RabbitListener(queues ="queue01")//queues指定监听的队列名称,即要消费哪一个队列的消息publicclassMqListener{@RabbitHandlerpublicvoidhandler(String msg){//处理消息System.err.println("监听器消费消息:"+msg+"--->"+LocalTime.now());}}
2.4 发送消息
这里我们创建一个接口,来向队列发送消息,看看能不能被监听器消费掉
@RestControllerpublicclassController{@ResourceprivateRabbitTemplate template;@GetMapping("/api/mq/test1")publicStringsendMsg(String msg){System.out.println("发送消息:"+msg+"--->"+LocalTime.now());
template.convertAndSend("","queue01",msg);return"OK";}}
上述便是最基础的消息队列(点对点),一个队列,一个生产者,一个消费者。
总
三,Work消息(一个队列,多个消费者)
如果同时有多个监听器监听同一个队列呢?下面再添加一个监听器
@Component@RabbitListener(queues ="queue01")publicclassMqListener01{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener01)监听器消费消息:"+msg+"--->"+LocalTime.now());}}
接下来我们多次向队列发送消息
修改一下发送消息接口:
@RestControllerpublicclassController{@ResourceprivateRabbitTemplate template;@GetMapping("/api/mq/test1")publicStringsendMsg(String msg){for(int i=1; i<11; i++){System.out.println("第"+i+"次发送消息:"+msg+"--->"+LocalTime.now());
template.convertAndSend("","queue01","第"+i+"个消息"+msg);}return"OK";}}
控制台输出如下:
因此,我们可以总结以下:
当有多个监听器监听同一个消息队列,监听器会相互竞争处理消息。
四,Exchange消息(核心)
RabbitMQ基于Exchange实现消息的一对多,一个消息可以被消费多次
Exchange有4种类型:
1.fanout 直接转发,不对消息做过滤
2.direct 路由关键字过滤,只支持精确值,可以对消息进行过滤匹配
3.topic 路由关键字过滤,支持模糊值, *:1个单词 #:0或多个单词
4.header 消息头过滤,对消息的请求消息头进行匹配过滤,any(类似 or):任意1个 all:全部(类似 and)
4.1 fanout 交换器
首先,创建两个消息队列,以及一个交换器,两个消息队列分别与交换器绑定
publicclassRabbitMQConfig01{/**
* 创建交换器*/@BeanpublicFanoutExchangecreateFe(){returnnewFanoutExchange("fanout");}/**
* 创建队列*/@BeanpublicQueuecreateQ2(){returnnewQueue("queue02");}@BeanpublicQueuecreateQ3(){returnnewQueue("queue03");}/**
* 创建绑定关系*/@BeanpublicBindingcreateBq2(FanoutExchange fe){returnBindingBuilder.bind(createQ2()).to(fe);}@BeanpublicBindingcreateBq3(FanoutExchange fe){returnBindingBuilder.bind(createQ3()).to(fe);}}
接下来我们创建两个监听器,分别监听queue02和queue03这两个消息队列
@Component@RabbitListener(queues ="queue02")publicclassMqListener02{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener02)监听器消费消息:"+msg+"--->"+LocalTime.now());}@Component@RabbitListener(queues ="queue03")classMqListener03{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener03)监听器消费消息:"+ msg +"--->"+LocalTime.now());}}}
现在,创建一个接口,来向交换器发送信息
@GetMapping("/api/mq/test2")publicStringsendMsg02(String msg){System.out.println("向交换器发送消息");
template.convertAndSend("fanout","", msg);return"OK";}
结果如下
可以得到以下结论:
向交换器发送消息,该交换器绑定的所有队列都会收到消息。即一条消息被多个消费者多次消费
4.2 direct 交换器
direct 交换器可以对指定的固定字符进行过滤
首先创建direct交换器,并且绑定队列
@ConfigurationpublicclassRabbitMQConfig02{/**
* 创建交换器*/@BeanpublicDirectExchangecreateDe(){returnnewDirectExchange("direct");}/**
* 创建队列*/@BeanpublicQueuecreateQ4(){returnnewQueue("queue04");}@BeanpublicQueuecreateQ5(){returnnewQueue("queue05");}/**
* 创建绑定关系*/@BeanpublicBindingcreateBq4(DirectExchange de){returnBindingBuilder.bind(createQ4()).to(de).with("info");//创建绑定关系,并且声明固定字符}@BeanpublicBindingcreateBq5(DirectExchange de){returnBindingBuilder.bind(createQ5()).to(de).with("error");}}
创建监听器
@Component@RabbitListener(queues ="queue04")publicclassMqListener04{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener04_info)监听器消费消息:"+msg+"--->"+LocalTime.now());}@Component@RabbitListener(queues ="queue05")classMqListener05{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener05_error)监听器消费消息:"+ msg +"--->"+LocalTime.now());}}}
创建接口,向交换器发送消息
@GetMapping("/api/mq/test3")publicStringsendMsg03(String msg,String key){System.out.println("向交换器发送消息");
template.convertAndSend("direct", key, msg);//且指定key,对应交换器的过滤字符return"OK";}
结果如下
可以看到,我们向direct交换器发送消息时,指定了key,即该消息只会被转发到指定了相同key的消息队列上
4.3 topic 交换器
首先,创建topic交换器,且绑定队列
@ConfigurationpublicclassRabbitMQConfig03{/**
* 创建交换器*/@BeanpublicTopicExchangecreateTe(){returnnewTopicExchange("topic");}/**
* 创建队列*/@BeanpublicQueuecreateQ6(){returnnewQueue("queue06");}@BeanpublicQueuecreateQ7(){returnnewQueue("queue07");}/**
* 创建绑定关系*/@BeanpublicBindingcreateBq6(TopicExchange te){returnBindingBuilder.bind(createQ6()).to(te).with("*");//*表示一个单词}@BeanpublicBindingcreateBq7(TopicExchange te){returnBindingBuilder.bind(createQ7()).to(te).with("#");// #表示任意个单词}}
创建监听器监听队列
@Component@RabbitListener(queues ="queue06")publicclassMqListener06{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener06_*)监听器消费消息:"+msg+"--->"+LocalTime.now());}@Component@RabbitListener(queues ="queue07")classMqListener07{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener07_#)监听器消费消息:"+ msg +"--->"+LocalTime.now());}}}
创建接口,向交换器发送消息
@GetMapping("/api/mq/test4")publicStringsendMsg04(String msg,String key){System.out.println("向交换器发送消息");
template.convertAndSend("topic", key, msg);return"OK";}
结果如下
可以看到,我们指定发送的key为“skr”,两个队列的绑定关系都成立
接下来,我们换一下,key為“skr.skr”
可以看到成功发送消息到过滤字符为‘#’的队列,应为“skr.skr”相当于两个单词(.为分隔符)
4.4 header 交换器
创建header交换器
@ConfigurationpublicclassRabbitMQConfig04{/**
* 创建交换器*/@BeanpublicHeadersExchangecreateHe(){returnnewHeadersExchange("header");}/**
* 创建队列*/@BeanpublicQueuecreateQ8(){returnnewQueue("queue08");}@BeanpublicQueuecreateQ9(){returnnewQueue("queue09");}/**
* 创建绑定关系*/@BeanpublicBindingcreateBq8(HeadersExchange he){returnBindingBuilder.bind(createQ8()).to(he).whereAny("id","author").exist();//any 任意1个即可}@BeanpublicBindingcreateBq9(HeadersExchange he){returnBindingBuilder.bind(createQ9()).to(he).whereAll("id","author").exist();//all 同时满足}}
创建监听器
@Component@RabbitListener(queues ="queue08")publicclassMqListener08{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener08_any)监听器消费消息:"+msg+"--->"+LocalTime.now());}@Component@RabbitListener(queues ="queue09")classMqListener09{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListener09_all)监听器消费消息:"+ msg +"--->"+LocalTime.now());}}}
创建接口发送消息
@GetMapping("/api/mq/test5")publicStringsendMsg(String msg){MessagePostProcessor postProcessor=newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{
message.getMessageProperties().setHeader("id","121");//指定消息头return message;}};
template.convertAndSend("header","",msg,postProcessor);return"OK";}
结果如下
即绑定any的队列收到了消息,因为我们只指定了id这一个消息头
五,死信队列
当一个队列过期,长度超过指定值就会成为死信队列。随之这个消息会自动绑定一个交换器。
5.1 创建死信队列,正常队列和交换器绑定
@ConfigurationpublicclassRabbitMQConfigdead{//1.创建 死信交换器 专门转发死信消息@Bean("dead")publicDirectExchangecreateDLX(){returnnewDirectExchange("deadex");}//2.创建队列//队列 生成死信 1.没有消费者 2.有效期 3.设置死信和死信匹配关键字@BeanpublicQueuecreateQ10(){Map<String,Object> param =newHashMap<>();//设置死信交换器
param.put("x-dead-letter-exchange","deadex");//设置死信交换器 匹配的路由
param.put("x-dead-letter-routing-key","test1");//设置队列中消息的有效期 8秒
param.put("x-message-ttl",8000);returnQueueBuilder.durable("queue10").withArguments(param).build();}@BeanpublicQueuecreateQ11(){returnnewQueue("queue11");}//3.绑定@BeanpublicBindingcreateBd8(@Qualifier("dead")DirectExchange dead){returnBindingBuilder.bind(createQ11()).to(dead).with("test1");}}
这里queue10消息队列,在8s后会过期,成为死信队列
然后创建监听器,监听queue11这个正常队列的消息
@Component@RabbitListener(queues ="queue11")publicclassMqListenerDead{@RabbitHandlerpublicvoidhandler(String msg){System.err.println("(MqListenerDead)监听器消费消息:"+msg+"--->"+LocalTime.now());}}
然后创建接口,向queue这个死信队列发送消息
@GetMapping("/api/mq/test6")publicStringsendMsg06(String msg){System.out.println("向8s后会成为死信队列的队列发送消息--->"+LocalTime.now());
template.convertAndSend("","queue10", msg);return"OK";}
这里启动可能会报冲突,应为前面有一个注册bean同样注册了DirectExchange类型,我们加个注解,使之通过名称指定(正常启动请忽略),
修改后的direct交换器配置类
@ConfigurationpublicclassRabbitMQConfig02{/**
* 创建交换器*/@Bean("direct")publicDirectExchangecreateDe(){returnnewDirectExchange("direct");}/**
* 创建队列*/@BeanpublicQueuecreateQ4(){returnnewQueue("queue04");}@BeanpublicQueuecreateQ5(){returnnewQueue("queue05");}/**
* 创建绑定关系*/@BeanpublicBindingcreateBq4(@Qualifier("direct")DirectExchange de){returnBindingBuilder.bind(createQ4()).to(de).with("info");}@BeanpublicBindingcreateBq5(@Qualifier("direct")DirectExchange de){returnBindingBuilder.bind(createQ5()).to(de).with("error");}}
可以看到,监听queue11的监听器收到了消息,也就是说我们发送到queue10的消息被交换器转发了。
当然,如果有监听器监听queue10,是之在过期之前被消费掉,他也就不会成为死信队列被转发到queue11.
版权归原作者 CodingHao 所有, 如有侵权,请联系我们删除。