0


spring boot RabbitMq基础教程

RabbitMq

由于

RabbitMQ

采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与

RabbitMQ

交互。并且

RabbitMQ

官方也提供了各种不同语言的客户端。
但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

概念

请添加图片描述

  • **publisher**:生产者,也就是发送消息的一方
  • **consumer**:消费者,也就是消费消息的一方
  • **queue**:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • **exchange**:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • **virtual host**:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

交换机

我们打开Exchanges选项卡,可以看到已经存在很多交换机:
请添加图片描述

我们点击任意交换机,即可进入交换机详情页面。仍然会利用控制台中的publish message 发送一条消息:
请添加图片描述

请添加图片描述

这里是由控制台模拟了生产者发送的消息。由于没有消费者存在,最终消息丢失了,这样说明交换机没有存储消息的能力。

队列

我们打开

Queues

选项卡,新建一个队列:

在这里插入图片描述

命名为

hello.queue1


在这里插入图片描述

再以相同的方式,创建一个队列,密码为

hello.queue2

,最终队列列表如下:
在这里插入图片描述

此时,我们再次向

amq.fanout

交换机发送一条消息。会发现消息依然没有到达队列!!
发送到交换机的消息,只会路由到与其绑定的队列,因此仅仅创建队列是不够的,我们还需要将其与交换机绑定。

绑定关系

点击

Exchanges

选项卡,点击

amq.fanout

交换机,进入交换机详情页,然后点击

Bindings

菜单,在表单中填写要绑定的队列名称:
请添加图片描述

相同的方式,将hello.queue2也绑定到改交换机。
最终,绑定结果如下:
请添加图片描述

发送消息

再次回到exchange页面,找到刚刚绑定的

amq.fanout

,点击进入详情页,再次发送一条消息:
请添加图片描述

回到

Queues

页面,可以发现

hello.queue

中已经有一条消息了:
请添加图片描述

点击队列名称,进入详情页,查看队列详情,这次我们点击get message:
请添加图片描述

可以看到消息到达队列了:
请添加图片描述

这个时候如果有消费者监听了MQ的

hello.queue1

hello.queue2

队列,自然就能接收到消息了。

用户管理

点击

Admin

选项卡,首先会看到RabbitMQ控制台的用户管理界面:
请添加图片描述

这里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的

itheima

这个用户。仔细观察用户表格中的字段,如下:

  • Nameadmin,也就是用户名
  • Tagsadministrator,说明itheima用户是超级管理员,拥有所有权限
  • Can access virtual host/,可以访问的virtual host,这里的/是默认的virtual host

对于小型企业而言,出于成本考虑,我们通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用

virtual host

的隔离特性,将不同项目隔离。一般会做两件事情:

  • 给每个项目创建独立的运维账号,将管理权限分离。
  • 给每个项目创建不同的virtual host,将每个项目的数据隔离。

交换机

。而一旦引入交换机,消息发送的模式会有很大变化:

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
请添加图片描述

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
  • Consumer:消费者,与以前一样,订阅队列,没有变化
  • Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

Fanout交换机

Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。
在广播模式下,消息发送流程是这样的:
请添加图片描述

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者都能拿到消息

我们的计划是这样的:

  • 创建一个名为 hmall.fanout的交换机,类型是Fanout
  • 创建两个队列fanout.queue1fanout.queue2,绑定到交换机hmall.fanout
消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

@TestpublicvoidtestFanoutExchange(){// 交换机名称String exchangeName ="hmall.fanout";// 消息String message ="hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName,"", message);}
消息接收

在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:

@RabbitListener(queues ="fanout.queue1")publicvoidlistenFanoutQueue1(String msg){System.out.println("消费者1接收到Fanout消息:【"+ msg +"】");}@RabbitListener(queues ="fanout.queue2")publicvoidlistenFanoutQueue2(String msg){System.out.println("消费者2接收到Fanout消息:【"+ msg +"】");}

Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
请添加图片描述

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

案例需求如图
请添加图片描述

  1. 声明一个名为hmall.direct的交换机
  2. 声明队列direct.queue1,绑定hmall.directbindingKeybludred
  3. 声明队列direct.queue2,绑定hmall.directbindingKeyyellowred
  4. consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
  5. 在publisher中编写测试方法,向hmall.direct发送消息
声明队列和交换机

首先在控制台声明两个队列

direct.queue1

direct.queue2

,然后声明一个direct类型的交换机,命名为

hmall.direct

:
在这里插入图片描述

然后使用

red

blue

作为key,绑定

direct.queue1

hmall.direct

同理,使用

red

yellow

作为key,绑定

direct.queue2

hmall.direct

,步骤略,最终结果:

消息接收

在consumer服务的SpringRabbitListener中添加方法:

@RabbitListener(queues ="direct.queue1")publicvoidlistenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【"+ msg +"】");}@RabbitListener(queues ="direct.queue2")publicvoidlistenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【"+ msg +"】");}
消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

@TestpublicvoidtestSendDirectExchange(){// 交换机名称String exchangeName ="hmall.direct";// 消息String message ="红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息
    rabbitTemplate.convertAndSend(exchangeName,"red", message);}

由于使用的red这个key,所以两个消费者都收到了消息:

我们再切换为blue这个key:

@TestpublicvoidtestSendDirectExchange(){// 交换机名称String exchangeName ="hmall.direct";// 消息String message ="最新报道,哥斯拉是居民自治巨型气球,虚惊一场!";// 发送消息
    rabbitTemplate.convertAndSend(exchangeName,"blue", message);}

你会发现,只有消费者1收到了消息:

总结

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

交换机

说明
Topic

类型的

Exchange

Direct

相比,都是可以根据

RoutingKey

把消息路由到不同的队列。
只不过

Topic

类型

Exchange

可以让队列在绑定

BindingKey

的时候使用通配符!

BindingKey

一般都是有一个或多个单词组成,多个单词之间以

.

分割,例如:

item.insert

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

图示:
请添加图片描述

假如此时publisher发送的消息使用的

RoutingKey

共有四种:

  • china.news 代表有中国的新闻消息;
  • china.weather 代表中国的天气消息;
  • japan.news 则代表日本新闻
  • japan.weather 代表日本的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括: - china.news- china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括: - china.news- japan.news

接下来,我们就按照上图所示,来演示一下Topic交换机的用法。
首先,在控制台按照图示例子创建队列、交换机,并利用通配符绑定队列和交换机。此处步骤略。最终结果如下:
请添加图片描述

消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

/**
 * topicExchange
 */@TestpublicvoidtestSendTopicExchange(){// 交换机名称String exchangeName ="hmall.topic";// 消息String message ="喜报!孙悟空大战哥斯拉,胜!";// 发送消息
    rabbitTemplate.convertAndSend(exchangeName,"china.news", message);}
消息接收

在consumer服务的SpringRabbitListener中添加方法:

@RabbitListener(queues ="topic.queue1")publicvoidlistenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【"+ msg +"】");}@RabbitListener(queues ="topic.queue2")publicvoidlistenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【"+ msg +"】");}

声明队列和交换机

在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。
因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。

fanout示例

在consumer中创建一个类,声明队列和交换机:

packagecom.itheima.consumer.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassFanoutConfig{/**
     * 声明交换机
     * @return Fanout类型交换机
     */@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("hmall.fanout");}/**
     * 第1个队列
     */@BeanpublicQueuefanoutQueue1(){returnnewQueue("fanout.queue1");}/**
     * 绑定队列和交换机
     */@BeanpublicBindingbindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/**
     * 第2个队列
     */@BeanpublicQueuefanoutQueue2(){returnnewQueue("fanout.queue2");}/**
     * 绑定队列和交换机
     */@BeanpublicBindingbindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
direct示例

direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding:

packagecom.itheima.consumer.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassDirectConfig{/**
     * 声明交换机
     * @return Direct类型交换机
     */@BeanpublicDirectExchangedirectExchange(){returnExchangeBuilder.directExchange("hmall.direct").build();}/**
     * 第1个队列
     */@BeanpublicQueuedirectQueue1(){returnnewQueue("direct.queue1");}/**
     * 绑定队列和交换机
     */@BeanpublicBindingbindingQueue1WithRed(Queue directQueue1,DirectExchange directExchange){returnBindingBuilder.bind(directQueue1).to(directExchange).with("red");}/**
     * 绑定队列和交换机
     */@BeanpublicBindingbindingQueue1WithBlue(Queue directQueue1,DirectExchange directExchange){returnBindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/**
     * 第2个队列
     */@BeanpublicQueuedirectQueue2(){returnnewQueue("direct.queue2");}/**
     * 绑定队列和交换机
     */@BeanpublicBindingbindingQueue2WithRed(Queue directQueue2,DirectExchange directExchange){returnBindingBuilder.bind(directQueue2).to(directExchange).with("red");}/**
     * 绑定队列和交换机
     */@BeanpublicBindingbindingQueue2WithYellow(Queue directQueue2,DirectExchange directExchange){returnBindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}}
基于注解声明

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

例如,我们同样声明Direct模式的交换机和队列:

@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="direct.queue1"),
    exchange =@Exchange(name ="hmall.direct", type =ExchangeTypes.DIRECT),
    key ={"red","blue"}))publicvoidlistenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="direct.queue2"),
    exchange =@Exchange(name ="hmall.direct", type =ExchangeTypes.DIRECT),
    key ={"red","yellow"}))publicvoidlistenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【"+ msg +"】");}

是不是简单多了。
再试试Topic模式:

@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="topic.queue1"),
    exchange =@Exchange(name ="hmall.topic", type =ExchangeTypes.TOPIC),
    key ="china.#"))publicvoidlistenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="topic.queue2"),
    exchange =@Exchange(name ="hmall.topic", type =ExchangeTypes.TOPIC),
    key ="#.news"))publicvoidlistenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【"+ msg +"】");}

消息转换器

Spring的消息发送代码接收的消息体是一个Object:
请添加图片描述

而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差
配置JSON转换器

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

publisher

consumer

两个服务中都引入依赖:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version></dependency>

注意,如果项目中引入了

spring-boot-starter-web

依赖,则无需再次引入

Jackson

依赖。

配置消息转换器,在

publisher

consumer

两个服务的启动类中添加一个Bean即可:

@BeanpublicMessageConvertermessageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter =newJackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}

消息转换器中添加的messageId可以便于我们将来做幂等性判断。

总结

以上的代码已上传到Github

https://github.com/onenewcode/mq-demo

本文转载自: https://blog.csdn.net/studycodeday/article/details/133790057
版权归原作者 过去日记 所有, 如有侵权,请联系我们删除。

“spring boot RabbitMq基础教程”的评论:

还没有评论