文章目录
1、RabbitMQ是什么
RabbitMQ是一个开源的
遵循AMQP协议
实现的基于Erlang语言编写,支持多种客户端(语言)。用于在分布式系统中
存储消息,转发消息
,具有
高可用
,
高可扩性
,
易用性
等特征。
1.1、RabbitMQ—使用场景
一般场景
像一般的下订单业务如下图:
将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端,
像这样耗时就很大 = 所有服务操作的耗时总和,而且若是这一整条执行链某个环节出了问题触发回滚,得不偿失
publicvoidmakeOrder(){// 1 :保存订单
orderService.saveOrder();// 2: 发送短信服务
messageService.sendSMS("order");//1-2 s// 3: 发送email服务
emailService.sendEmail("order");//1-2 s// 4: 发送APP服务
appService.sendApp("order");}
那么当我们开辟一个线程池去
异步处理
的话,也存在缺点:(最大的原因就是自己去实现起来,因素过多,实现复杂)
存在问题:
1:耦合度高
2:需要自己写线程池自己维护成本太高
3:出现了消息可能会丢失,需要你自己做消息补偿
4:如何保证消息的可靠性你自己写
5:如果服务器承载不了,你需要自己去写高可用
所以MQ就诞生了
只管下单,下单后直接就给用户提示下单成功,别的事交给mq去派发,让别的服务去mq拿消息处理
用户响应耗时 = 下单(主要)耗时(50ms) 别的(次要)处理服务全放到消息队列当中等待处理
publicvoidmakeOrder(){// 1 :保存订单
orderService.saveOrder();
rabbitTemplate.convertSend("ex","2","消息内容");}
解耦
发送方将消息发送到消息队列中,接收方从队列中获取消息进行处理。这种松耦合的通信模式可以提高系统的可扩展性和灵活性。
这样使得下单服务并不受,发短信、发邮件、等等服务的影响(前提是下单不依赖任何一个服务的返回值)
削峰
当系统面临突然的请求高峰时,消息队列可以起到缓冲的作用。请求先进入消息队列排队,然后逐个被处理,使得系统能够逐渐消化高峰期的请求压力,避免过载和故障。
也就是如果某个时刻有大量的请求,此时都会到mq里面去,而不会瞬间开启很多线程去异步执行,从而达到销峰的效果,使得即便大量的用户请求来了,那系统处理请求还是非常平滑的
异步
消息队列支持异步处理,即发送方发送消息后,并不需要等待接收方立即处理完成,而是继续执行其他任务。接收方在合适的时间从队列中获取消息进行处理。这种异步处理可以提高系统的性能和响应速度,尤其适用于处理耗时的操作。
好处
1:完全解耦,用MQ建立桥接
2:有独立的线程池和运行模型
3:出现了消息可能会丢失,MQ有持久化功能
4:如何保证消息的可靠性,死信队列和消息转移的等
5:如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用。
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍
2、Dokcer安装RabbitMQ
2.1安装Dokcer
- yum 包更新到最新
> yum update
- 安装软件包,yum-util提供yum-config-manager功能,另外两个是devicemapper驱动依赖的
> yum install-y yum-utils device-mapper-persistent-data lvm2
- 设置yum源为阿里云
> yum-config-manager --add-repo
> http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
- 安装docker
> yum install docker-ce-y
- 安装后查看docker版本
> docker-v
- 安装加速镜像
从阿里云获取镜像加速器:
https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors
sudomkdir-p /etc/docker
sudotee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["https://spukdfwp.mirror.aliyuncs.com"]
}
EOFsudo systemctl daemon-reload
sudo systemctl restart docke
2.2安装rabbitmq
- 路径:
https://www.rabbitmq.com/download.html
- 点击上图中标红线的 community Docker image,跳转到如下地址:
https://registry.hub.docker.com/_/rabbitmq/
当前可以看到安装镜像的时候可以设置用户名,密码,ip。就不用安装完进入容器内部设置
3. 官网给的安装案例
$ docker run -d--hostname my-rabbit --name some-rabbit -eRABBITMQ_DEFAULT_USER=user -eRABBITMQ_DEFAULT_PASS=password rabbitmq:3-management
4.命令讲解
docker run -id--hostname my-rabbit --name=myrabbit -p15672:15672 rabbitmq:3-management
--hostname:指定容器主机名称
--name:指定容器名称
-p:将mq端口号映射到本地
-e 设置
5.修改命令创建并安装
docker run -di--name myrabbit -eRABBITMQ_DEFAULT_USER=admin -eRABBITMQ_DEFAULT_PASS=admin -p15672:15672 -p5672:5672 -p25672:25672 -p61613:61613 -p1883:1883 rabbitmq:3-management
6.阿里云开放上方命令 设置的端口号
-p15672:15672 -p5672:5672 -p25672:25672 -p61613:61613 -p1883:1883
7.安装成功
[root@iZbp1av1izm1qqcdfa0nndZ ~]# docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
rabbitmq 3-management 6c3c2a225947 7 months ago 253MB
[root@iZbp1av1izm1qqcdfa0nndZ ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
1de1f1e10cb0 rabbitmq:3-management "docker-entrypoint.s…"6 minutes ago Up 6 minutes 4369/tcp, 0.0.0.0:1883->1883/tcp, :::1883->1883/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp, 0.0.0.0:25672->25672/tcp, :::25672->25672/tcp, 0.0.0.0:61613->61613/tcp, :::61613->61613/tcp, 15691-15692/tcp myrabbit
[root@iZbp1av1izm1qqcdfa0nndZ ~]#
8.停掉手动安装的rabbimq
systemctl stop rabbitmq-server
9.启动docker的rabbitmq容器
##查看容器[root@iZbp1av1izm1qqcdfa0nndZ ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
1de1f1e10cb0 rabbitmq:3-management "docker-entrypoint.s…"9 minutes ago Up 9 minutes 4369/tcp, 0.0.0.0:1883->1883/tcp, :::1883->1883/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp, 0.0.0.0:25672->25672/tcp, :::25672->25672/tcp, 0.0.0.0:61613->61613/tcp, :::61613->61613/tcp, 15691-15692/tcp myrabbit
##启动容器 docker start 容器id(CONTAINER ID)[root@iZbp1av1izm1qqcdfa0nndZ ~]# docker start 1de1f1e10cb0
1de1f1e10cb0
[root@iZbp1av1izm1qqcdfa0nndZ ~]#
10.通过服务器(
虚拟机ip+端口号(15672)
)访问RabbitMQ主页
http://192.168.157.128:15672
默认登录账号和密码都是admin
并且在admin账号下可以通过增加用户,给用户不同角色,也就对应不同的操作权限:
详情如下:
3、RabbitMQ入门案例 - Simple 简单模式
1.实现步骤:
1:jdk1.8
2:构建一个maven工程
3:导入rabbitmq的maven依赖
4:启动rabbitmq-server服务
5:定义生产者
6:定义消费者
7:观察消息的在rabbitmq-server服务中的过程
2.构建一个maven工程
3.导入rabbitmq的maven依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-amqp</artifactId><version>2.2.5.RELEASE</version></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.2.5.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
4.启动rabbitmq-server服务
systemctl start rabbitmq-server
或者
docker start myrabbit
5、定义生产者
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;publicclassProducer{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
connectionFactory.setHost("192.168.157.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channel
channel = connection.createChannel();// 5: 申明队列queue存储消息/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.queueDeclare("queue1",false,false,true,null);// 6: 准备发送消息的内容String message ="你好,学相伴!!!";// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routing// @params3: 属性配置// @params4: 发送消息的内容
channel.basicPublish("","queue1",null, message.getBytes());System.out.println("消息发送成功!");}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
channel.close();}catch(Exception ex){
ex.printStackTrace();}}// 8: 关闭连接if(connection !=null){try{
connection.close();}catch(Exception ex){
ex.printStackTrace();}}}}}
1:执行发送,这个时候可以在web控制台查看到这个队列queue的信息
2:我们可以进行对队列的消息进行预览和测试如下:
3:进行预览和获取消息进行测试
NACK 只是做消息预览,不会吧消息从队列移除
ACK相当于手动的把消息处理了,这个时候就会把消息从队列剔除,导致消息丢失
6、定义消费者
importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumer{publicstaticvoidmain(String[] args){// 所有的中间件技术都是基于tcp/ip协议基础上构建新型协议规范,只不过rabbitmq遵循的是amqp// ip port// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
connectionFactory.setHost("192.168.157.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者");// 4: 从连接中获取通道channel
channel = connection.createChannel();
channel.basicConsume("queue1",true,newDeliverCallback(){publicvoidhandle(String consumerTag,Delivery message)throwsIOException{System.out.println("收到的消息是:"+newString(message.getBody(),"UTF-8"));}},newCancelCallback(){publicvoidhandle(String s)throwsIOException{System.out.println("接收失败了。。。");}});System.out.println("开始接收消息");System.in.read();}catch(Exception e){
e.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
channel.close();}catch(Exception ex){
ex.printStackTrace();}}if(connection !=null){try{
connection.close();}catch(Exception ex){
ex.printStackTrace();}}}}}
消费者和生产者的区别在于,消费者是从mq中取消息,而生产者是从mq中存消息
4、RabbitMQ的核心组成部分
核心概念:
- Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
- Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
- Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
- Message:消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
- Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
- Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)
- Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
- Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
- Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。
4.1 RabbitMQ整体架构
4.2RabbitMQ的运行流程
所以发送消息的时候没有设置交换机,rabbitmq发送消息一定会有默认一个交换机,并且消息不是直接到队列当中的,而是由交换机根据路由键发送消息到绑定的队列
5、RabbitMQ的模式
5.1 发布订阅模式–fanout
特点:Fanout—发布与订阅模式,是一种
广播机制
,它是
没有路由key的模式。
也就是只要生产者发送一条消息经过交换机加入队列中,左右的消费者都能拿到消息
这里就直接用web界面演示
- 新建一个fanout模式的交换机(让交换机代替生产者去发消息)
- 创建3个消息队列q1、q2、q3
- 将队列绑定到交换机上
- 由交换机代替生产者发送消息
- 然后三个队列都会有一个交换机发来的消息
- q1队列消息正常被消费者拾取(其他队列一样)
- q1队列消息正常被消费者拾取之后,队列消息-1
ACK后 页面在自动会更新队列消息条目,默认5秒
5.2路由模式-direct模式
Direct模式是fanout模式上的一种叠加,增加了
路由RoutingKey的模式
。
*这样就可以给指定设置了路由key的队列发送消息,并且一个队列可以有多个路由key,当发送消息指定了路由key,则
只有设置了相对应的路由key的队列才能接收到消息
5.3路由模式-Topic模式
Topic模式是direct模式上的一种叠加,增加了
模糊路由RoutingKey的模式。
*
代表一级(必须有一级)
#
代表0级或者多级
注意
最好用代码的形式来进行绑定
*在实际开发中,我们既可以在
RabbitMq的web界面进行交换机的创建,队列的创建,绑定路由key等等操作。
还可以在
生产者代码里面通过channel.XXX的方式设置交换机,设置队列,设置路由key,等等,效果是一样的
例如下面代码:生产者(消费者也可以声明) 代码实现【交换机和队列】的声明和绑定
packagecom.xxx.rabbitmq.all;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
* RabbitMQ入门案例 - 完整的声明方式创建
* 代码实现创建交换机和队列,并绑定关系
* 生产者
*/publicclassProducer{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
connectionFactory.setHost("121.196.153.197");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channel
channel = connection.createChannel();// 5: 准备发送消息的内容String message ="你好,交换机";// 6:准备交换机。取名规范 : 类型_业务模块_交换机String exchangeName ="direct_order_exchange";// 7: 定义路由keyString routeKeyOrder ="order";String routeKeyCourse ="course";// 8: 指定交换机的类型String exchangeType ="direct";// 9: 声明交换机/注册交换机// @params1: 交换机名称// @params2: 交换机类型// @params3: 是否持久化 所谓的持久化就是值:交换机不会随着服务器的重启造成丢失,如果是true代表不丢失,false重启丢失
channel.exchangeDeclare(exchangeName, exchangeType,true);// 10: 声明队列/注册队列// @params1: 队列名称// @params2: 是否持久化// @params3: 是不是排他性,是否是独占独立// @params4: 是不是自动删除 随着最后一个消费者消息完毕消息以后是否把队列自动删除// @params5: 是不是有参数 参数携带可能会引发headers模式
channel.queueDeclare("queue5",true,false,false,null);
channel.queueDeclare("queue6",true,false,false,null);
channel.queueDeclare("queue7",true,false,false,null);// 11: 绑定队列// @params1: 队列名称// @params2: 交换机名称// @params3: routeKey
channel.queueBind("queue5", exchangeName, routeKeyOrder);
channel.queueBind("queue6", exchangeName, routeKeyOrder);
channel.queueBind("queue7", exchangeName, routeKeyCourse);// 12: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称// @params3: 属性配置// @params4: 发送消息的内容
channel.basicPublish(exchangeName, routeKeyOrder,null, message.getBytes());System.out.println("消息发送成功!");}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 13: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
channel.close();}catch(Exception ex){
ex.printStackTrace();}}if(connection !=null){try{
connection.close();}catch(Exception ex){
ex.printStackTrace();}}}}}
消费者去队列拿消息:
packagecom.xxx.rabbitmq.all;importcom.rabbitmq.client.*;importjava.io.IOException;/**
* RabbitMQ入门案例 - 完整的声明方式创建
* 消费者
*/publicclassConsumer{privatestaticRunnable runnable =newRunnable(){publicvoidrun(){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
connectionFactory.setHost("121.196.153.197");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");//获取队列的名称finalString queueName =Thread.currentThread().getName();Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection();// 4: 从连接中获取通道channel
channel = connection.createChannel();// 5: 申明队列queue存储消息/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, false, false, null);// 6: 定义接受消息的回调Channel finalChannel = channel;
finalChannel.basicConsume(queueName,true,newDeliverCallback(){@Overridepublicvoidhandle(String s,Delivery delivery)throwsIOException{System.out.println(queueName +":收到消息是:"+newString(delivery.getBody(),"UTF-8"));}},newCancelCallback(){@Overridepublicvoidhandle(String s)throwsIOException{}});System.out.println(queueName +":开始接受消息");System.in.read();}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
channel.close();}catch(Exception ex){
ex.printStackTrace();}}if(connection !=null&& connection.isOpen()){try{
connection.close();}catch(Exception ex){
ex.printStackTrace();}}}}};publicstaticvoidmain(String[] args){// 启动三个线程去执行newThread(runnable,"queue5").start();newThread(runnable,"queue6").start();newThread(runnable,"queue7").start();}}
5.4轮询模式 - Work模式
5.4.1Work模式 - 轮询模式(Round-Robin)
轮询模式的分发:
一个消费者一条,按均分配;
生产者:
packagecom.xuexiangban.rabbitmq.work.lunxun;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
* @author: 学相伴-飞哥
* @description: Producer 简单队列生产者
* @Date : 2021/3/2
*/publicclassProducer{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
connectionFactory.setHost("192.168.157.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channel
channel = connection.createChannel();// 6: 准备发送消息的内容//===============================end topic模式==================================for(int i =1; i <=20; i++){//消息的内容String msg ="学相伴:"+ i;// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容
channel.basicPublish("","queue1",null, msg.getBytes());}System.out.println("消息发送成功!");}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
channel.close();}catch(Exception ex){
ex.printStackTrace();}}if(connection !=null){try{
connection.close();}catch(Exception ex){
ex.printStackTrace();}}}}}
消费者work1:
packagecom.xuexiangban.rabbitmq.work.lunxun;importcom.rabbitmq.client.*;importjava.io.IOException;/**
* @author: 学相伴-飞哥
* @description: Consumer
* @Date : 2021/3/2
*/publicclassWork1{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
connectionFactory.setHost("192.168.157.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work1");// 4: 从连接中获取通道channel
channel = connection.createChannel();// 5: 申明队列queue存储消息/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */// 这里如果queue已经被创建过一次了,可以不需要定义// channel.queueDeclare("queue1", false, false, false, null);// 同一时刻,服务器只会推送一条消息给消费者// 6: 定义接受消息的回调Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1",true,newDeliverCallback(){@Overridepublicvoidhandle(String s,Delivery delivery)throwsIOException{try{System.out.println("Work1-收到消息是:"+newString(delivery.getBody(),"UTF-8"));Thread.sleep(2000);}catch(Exception ex){
ex.printStackTrace();}}},newCancelCallback(){@Overridepublicvoidhandle(String s)throwsIOException{}});System.out.println("Work1-开始接受消息");System.in.read();}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
channel.close();}catch(Exception ex){
ex.printStackTrace();}}if(connection !=null&& connection.isOpen()){try{
connection.close();}catch(Exception ex){
ex.printStackTrace();}}}}}
work2
packagecom.xuexiangban.rabbitmq.work.lunxun;importcom.rabbitmq.client.*;importjava.io.IOException;/**
* @author: 学相伴-飞哥
* @description: Consumer
* @Date : 2021/3/2
*/publicclassWork2{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
connectionFactory.setHost("192.168.157.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work2");// 4: 从连接中获取通道channel
channel = connection.createChannel();// 5: 申明队列queue存储消息/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, true, false, null);// 同一时刻,服务器只会推送一条消息给消费者//channel.basicQos(1);// 6: 定义接受消息的回调Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1",true,newDeliverCallback(){@Overridepublicvoidhandle(String s,Delivery delivery)throwsIOException{try{System.out.println("Work2-收到消息是:"+newString(delivery.getBody(),"UTF-8"));Thread.sleep(200);}catch(Exception ex){
ex.printStackTrace();}}},newCancelCallback(){@Overridepublicvoidhandle(String s)throwsIOException{}});System.out.println("Work2-开始接受消息");System.in.read();}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
channel.close();}catch(Exception ex){
ex.printStackTrace();}}if(connection !=null&& connection.isOpen()){try{
connection.close();}catch(Exception ex){
ex.printStackTrace();}}}}}
*work1和work2的消息处理能力不同,但是
最后处理的消息条数相同,是“按均分配
”。*
往队列发送20条消息,
结果就是轮询给消费者拿消息,即便有的消费者消费很快(也只能按照新顺序拿消息),也只能按照轮询一个一个拿,
也就是说,不会因为某个消费者所在的服务器满,而导致少消费,一定是公平消费
5.4.1Work模式 - 公平分发模式(Round-Robin)
根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;
按劳分配
;
相比较轮询模式,公平分发的不同在于:
修改应答方式为手动
qos设置为1,就代表消费者拿到cpu的执行权就每次从只拿走一条消息,一条一条的拿。若不设置,就是默认轮询拿一条
所以根据队列堆积的消息条数以及内存和磁盘空间来合理设置qos
这个时候,性能好的消费者就会消费得多,而性能差的消费者就消费得少,能者多劳
更新中------
参考来自:狂神
版权归原作者 今天你写代码了吗?? 所有, 如有侵权,请联系我们删除。