1.为什么使用MQ(消息队列)、RabbitMQ特点
MQ(消息队列),典型的生产者消费者模式,生产者不断向消息队列发送消息,消费者不断从消息队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松实现了系统之间的解耦合。通过高效的可靠的消息传递机制进行系统之间的通信来实现分布式系统。
RabbitMQ官网:https://www.rabbitmq.com/
RabbitMQ官方文档:http://www.weicot.com/dev/guides/v2.0/install-gde/prereq/install-rabbitmq.html
RabbitMQ的Java客户端文档:https://www.rabbitmq.com/api-guide.html
RabbitMQ是实现了高级消息队列(AMQP)的开源消息代理软件(消息中间件).RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
优点:
- 异步处理:消息队列的异步处理机制,不需要立即处理共享数据带来的业务,可以通过消息队列来控制业务的执行。
- 解耦:传统的开发模式中,各个模块之间相互调用,数据共享,每个模块都需要关注其他模块是否挂掉。使用消息队列,可以将共享数据放在消息队列中,新增业务模块,业务模块可以订阅该消息,对原有的系统业务没有任何影响,降低了各个系统之间的耦合度,提高系统的可扩展性。
- 流量削峰:对服务器的请求通过消息队列再次拦截处理。
- 日志处理:通过对消息队列发送消息,来处理日志。
- 消息通讯:
缺点:
- 系统可用性降低
- 系统的复杂度提高
- 消息一致性问题
- 消息顺序消费问题
- 消息重复问题
Kafka、RocketMQ、RabbitMQ、ActiveMQ之间对比
KafkaRocketMQRabbitMQActiveMQ单机吞吐量10万级10万级万级万级开发语言ScalaJavaErlangJava高可用分布式架构分布式架构主从架构主从架构性能msmsusms功能简单的MQ功能分布式扩展性好,解决了顺序消息并发高,性能高,延时低功能完备
2.RabbitMQ的基本概念
- Broker:消息队列服务器实体。
- Exchange:消息交换机,消息接收规则,消息发送到哪个队列。
- Queue:消息的载体,存放消息,队列。
- Binding:绑定Exchange和Queue路由绑定
- Routing key:路由关键字,Exchange根据这个投递消息到哪个队列
- Channel:消息通道,客户端每个连接可以建立通道,一个通道代表一次会话任务。
- Vhost:一个Broker可以有多个Vhost,Vhost可以有不同的Exchange、Queue,不同模块对应不同的Vhost。
- Producer:消息生产者。
- Consumer:消息消费者。
3.Linux系统安装RabbitMQ
Erlang版本和RabbitMQ版本对应关系:https://www.rabbitmq.com/which-erlang.html
Erlang下载官网:https://www.erlang.org/downloads
3.1Erlang环境安装
rpm -qa | grep erlang | xargs rpm -e --nodeps #卸载之前安装的erlang
find [path] -name [filename] 查找文件
netstat -anop | grep 80 #查看80相关的端口是否被占用
yum install -y gcc gcc-c++ unixODBC-devel openssl-devel ncurses-devel #安装编译器wget https://github.com/erlang/otp/releases/download/OTP-23.3.4.2/otp_src_23.3.4.2.tar.gz
tar -zxvf otp_src_23.3.4.2.tar.gz #解压文件mv otp_src_23.3.4.2 /usr/local/ #移动解压文件cd /usr/local/otp_src_23.3.4.2/ #进入解压的目录mkdir../erlang #新建安装目录
./configure --prefix=/usr/local/erlang #配置安装路径makeinstall#安装echo'export PATH=/usr/local/erlang/bin:$PATH'>> /etc/profile # 配置erlang环境变量source /etc/profile #更新配置文件
erl -v # 测试是否安装成功
3.2RabbiMQ安装
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.6/rabbitmq-server-3.10.6-1.el8.noarch.rpm #下载安装包rpm -ivh rabbitmq-server-3.10.6-1.el8.noarch.rpm #如果提示安装的erlang环境低,但是自己安装erlang环境是正确的是,添加参数--nodepsrpm -ivh --nodeps rabbitmq-server-3.10.6-1.el8.noarch.rpm
systemctl start rabbitmq-server
systemctl status rabbitmq-server.service #查看rabbitmq启动状态
journalctl -xe 查询启动日志,发现没有erl环境
vim /usr/lib/rabbitmq/bin/rabbitmq-server #添加erl环境至文件中exportPATH=$PATH:/usr/local/erlang/bin
systemctl start rabbitmq-server # 启动rabbitmq
systemctl status rabbitmq-server #查看rabbitmq状态
注意:在没有安装可视化插件时,访问端口号没有页面
3.3启动rabbitmq并且配置可视化web插件
systemctl start rabbitmq-server
rabbitmq-plugins enable rabbitmq_management #执行插件安装命令
systemctl restart rabbitmq-server #重启服务#默认本地登陆账户guest 密码guest,远程无法访问
以系统命令方式启动,自动为节点命名为rabbit@hostname,会自动在/var/lib/rabbitmq/mnesia目录下生成对应的文件。使用rabbitmqctl stop关闭服务,会自动删除对应的[email protected]文件,里面对应的每次会不一样。使用systemctl stop rabbitmq-server也会删除pid文件。这里的pid文件和redis启动生成的pid文件应该类似。
3.4账户管理
rabbitmqctl add_user <username><password>#添加账号rabbitmqctl add_user admin <Aliyunoycm1234>
rabbitmqctl set_user_tags <username><administrator>#设置账号adminstrator权限,只有4种权限
rabbitmqctl set_permissions -p / <username>".*"".*"".*"#授予文件管理权限
rabbitmqctl change_password <username><password>#修改密码
rabbitmqctl delete_user <username>#删除用户
rabbitmqctl list_users #查看所用用户
rabbitmqctl add_user admin 1234#添加一个用户
rabbitmqctl set_user_tags admin administrator #设置权限
设置权限对应失败,会导致无法登陆,提示Not management user(非管理用户)
用户权限
- administrator
- monitoring
- policymaker
- managment
- none
3.5卸载RabbitMQ
yum命令安装的软件,yum remove 软件名字
rpm命令安装的软件,rpm -e --nodeps 软件名 --nodeps表示忽略依赖关系卸载
tar包安装,使用make uninstall 软件名
yum list |grep rabbitmq #查出yum安装的软件
yum list |grep rabbitmq |xargs yum remove
3.6 docker安装RabbitMQ
docker pull rabbitmq:management #拉取镜像docker run -di --name my-rabbitmq -e RABBITMQ_DEFUALT_USER=oycm -e RABBITMQ_DEFAULT_PASS=admin1234 -p 15672:15672 -p 5672:5672 -p 25672:25672 rabbitmq:management #-e配置,并不能直接访问可视化界面# 15672是可视化web端口# 5672是RabbitMQ消息接收中心端口,消息获取和发送的端口
3.7 RabbitMQ-server命令
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged #查看所有队列中的消息确认情况
rabbitmqctl list_exchanges #查看server的交换机name,type
rabbitmqctl list_bindings #查看绑定关系
4.RabbitMQ的6种消息发布模式
4.1、HelloWorld(Simple模式)
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client 客户端依赖 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.15.0</version></dependency>
连接工具类
publicclassFactoryUtil{publicstaticConnectionFactorynewFactory(){ConnectionFactory factory =newConnectionFactory();
factory.setHost("43.143.86.174");
factory.setUsername("admin");
factory.setPassword("oycm1234");
factory.setVirtualHost("/");
factory.setPort(5672);return factory;}}Connection conn = factory.newConnection(addrArr);//可以指定连接时使用的端点列表。将使用第一个可到达的端点。在连接失败的情况下,使用端点列表使应用程序可以在原始节点关闭时连接到不同的节点。//Uri连接
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
factory.newConnection();
amqp_URI ="amqp://"+ amqp_authority +["/" vhost ]["?" query ];
amqp_authority =[amqp_userinfo"@"]+ host +[":" port];
amqp_userinfo = username [":" password];//eg:amqp://admin:1234@hostName(ip):5672/%2f; %2f代表/
消息生产者
publicclassHelloWorld{privatefinalstaticString QUEUE_NAME ="hello_world";publicstaticvoidmain(String[] args){ConnectionFactory factory =FactoryUtil.newFactory();try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){
channel.queueDeclare(QUEUE_NAME,true,false,false,null);Scanner in =newScanner(System.in);while(in.hasNext()){String message = in.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println(message +",线程===>"+Thread.currentThread().getName());}}catch(IOException e){thrownewRuntimeException(e);}catch(TimeoutException e){thrownewRuntimeException(e);}}}//通过输入流不断开连接,可以一直往RabbitMQ-server发送消息
消息消费者
publicclassHelloWorldReceive{privatefinalstaticString QUEUE_NAME ="hello_world";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{ConnectionFactory factory =FactoryUtil.newFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);System.out.println("等待接收消息");DeliverCallback deliverCallback =(String consumerTag,Delivery message)->{System.out.println("接收的消息是:======> "+newString(message.getBody()));System.out.println("comsumerTag ===> "+ consumerTag);System.out.println("线程 ==>"+Thread.currentThread().getName());};
channel.basicConsume(QUEUE_NAME,deliverCallback,(consumerTag)->{System.out.println("取消回调 ===> "+ consumerTag);System.out.println("线程 ===> "+Thread.currentThread().getName());});System.out.println("main 方法结束");}}/*通过接收消息,对线程名字的输入,channel.basicConsume会异步开启一个线程池,去监听RabbitMQ-server指定的队列,根据
发生的事件,指定调用哪个回调方法。
而且在这个生产者和消费者之间,消息被消费者接收之后,消息现在的状态是Unacked(没有被确定消费了的),这个时候关掉
consumer,再次启动就会发现还会继续消费一次。这与调用basicConsume方法传递的参数有关,也就是方法重载,不同方法不同效
果。
*/
4.2、Work queues(工作模式)
Work queues(工作模式)是,多个worker处理一个对应队列的消息,可以通过设置prefetch设置每个worker处理的消息数量来控制。默认情况下是轮询调度,不管消费者的处理能力,平均分配消息,如果有个worker处理能力快,消息处理完了,也不会处理多的消息(因为消息事先已经分配好了)。
设置的prefetch对应消费者的消费能力,加入总共3条消息,a消费者设置3,b消费者设置1,这是a消费者虽然处理消息能力慢,但是还是会获取2条消息。
- 消息持久化(队列的持久化,消息持久化)
- 消息持久化的可靠性(publisher confirm),确定消息发布成功
- 使用消息确认和预取计数,您可以设置工作队列,来确保能顺序消费。
生产者生产消息
publicclassTask{privatefinalstaticString QUEUE_NAME ="worker";publicstaticvoidmain(String[] args){ConnectionFactory factory =FactoryUtil.newFactory();try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){
channel.queueDeclare(QUEUE_NAME,true,false,false,null);Scanner in =newScanner(System.in);while(in.hasNext()){System.out.println(in.next());for(int i =1; i <11; i++){
channel.basicPublish("",QUEUE_NAME,null,Integer.toString(i).getBytes());}}}catch(IOException e){thrownewRuntimeException(e);}catch(TimeoutException e){thrownewRuntimeException(e);}}}
worker1消费者
publicclassWorker1{privatefinalstaticString QUEUE_NAME ="worker";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{ConnectionFactory factory =FactoryUtil.newFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();
channel.basicQos(1);
channel.queueDeclare(QUEUE_NAME,true,false,false,null);DeliverCallback deliverCallback =(consumerTag,message)->{String m =newString(message.getBody());try{Thread.sleep(Integer.parseInt(m)*20000);System.out.println("消息:"+ m +",线程===> "+Thread.currentThread().getName());//手动确认消息使用的通道必须和监听队列的通道是一致的,不然会报错并且关闭通道
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}catch(InterruptedException e){thrownewRuntimeException(e);}};
channel.basicConsume(QUEUE_NAME,false,deliverCallback,(consumerTag)->{System.out.println("取消回调");});}}
worker2消费者
publicclassWorker2{privatefinalstaticString QUEUE_NAME ="worker";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{ConnectionFactory factory =FactoryUtil.newFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.basicQos(1);DeliverCallback deliverCallback =(consumerTag,message)->{String m =newString(message.getBody());try{Thread.sleep(Integer.parseInt(m)*10000);System.out.println("消息:"+ m +",线程===> "+Thread.currentThread().getName());
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}catch(InterruptedException e){thrownewRuntimeException(e);}};
channel.basicConsume(QUEUE_NAME,false,deliverCallback,(consumerTag)->{System.out.println("取消回调");});}}
4.3、Publish/Subscribe(发布订阅模式)
客户端不直接将消息发送至队列,而是发送到交换机,交换机通过和队列的绑定关系,再将消息发送到队列。发布订阅模式要声明的交换机是fanout模式,在将创建的队列和交换机绑定关系到对应的路由key(
空串就可以了
),发送消息至交换机绑定的路由key,就可以实现,一次发送,多次接收。
排他队列不能在连接中删除
Publish生产者
publicclassPublisher{privatestaticfinalString EXCHANGE_NAME ="fanout_";publicstaticvoidmain(String[] args){ConnectionFactory factory =FactoryUtil.newFactory();try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");Scanner in =newScanner(System.in);while(in.hasNext()){String str = in.nextLine();
channel.basicPublish(EXCHANGE_NAME,"",null,str.getBytes());//这里不声明队列关系,通过消费者声明队列并绑定关系}}catch(IOException e){thrownewRuntimeException(e);}catch(TimeoutException e){thrownewRuntimeException(e);}}}
Subscribe订阅者
publicclassSubscribe1{privatestaticfinalString EXCHANGE_NAME ="fanout_";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{ConnectionFactory factory =FactoryUtil.newFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");String queue = channel.queueDeclare().getQueue();//声明临时队列,不持久,排他,自动删除
channel.queueBind(queue,EXCHANGE_NAME,"");DeliverCallback deliverCallback =((consumerTag, message)->{System.out.println(newString(message.getBody())+", 线程====>"+Thread.currentThread().getName());
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);});
channel.basicConsume(queue,false,deliverCallback,(consumerTag)->{System.out.println("回调");});}}publicclassSubscribe2{privatestaticfinalString EXCHANGE_NAME ="fanout_";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{ConnectionFactory factory =FactoryUtil.newFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");String queue = channel.queueDeclare().getQueue();//声明临时队列,不持久,排他,自动删除
channel.queueBind(queue,EXCHANGE_NAME,"");DeliverCallback deliverCallback =((consumerTag, message)->{System.out.println(newString(message.getBody())+", 线程====>"+Thread.currentThread().getName());});
channel.basicConsume(queue,false,deliverCallback,(consumerTag)->{System.out.println("回调");});}}
4.4、Routing(路由模式)
路由key模式是对发布订阅模式的补充,如果队列绑定的key对应的都相同,那往这个交换机绑定的队列发送消息就和发布订阅模式没有什么不同,direct模式可以声明多种路由key和队列之间的关系。
生产者
publicclassDirect{privatefinalstaticString EXCHANGE_NAME ="direct_";publicstaticvoidmain(String[] args){ConnectionFactory factory =FactoryUtil.newFactory();try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){
channel.exchangeDeclare(EXCHANGE_NAME,"direct");Scanner in =newScanner(System.in);while(in.hasNext()){String str = in.nextLine();if(str.contains("error")){
channel.basicPublish(EXCHANGE_NAME,"error",null,str.getBytes());//控制台输入包含error,发送到error路由键}else{
channel.basicPublish(EXCHANGE_NAME,"log",null,str.getBytes());}}}catch(IOException e){thrownewRuntimeException(e);}catch(TimeoutException e){thrownewRuntimeException(e);}}}
消费者
publicclassDirectReceive{privatefinalstaticString EXCHANGE_NAME ="direct_";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{ConnectionFactory factory =FactoryUtil.newFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();
channel.queueDeclare("log",true,false,false,null);
channel.queueDeclare("error",true,false,false,null);
channel.queueBind("log",EXCHANGE_NAME,"log");
channel.queueBind("error",EXCHANGE_NAME,"error");DeliverCallback deliverCallback =(consumerTag,delivery)->{String str =newString(delivery.getBody());if(str.contains("error")){System.out.println(str);System.exit(0);//error队列接收到消息退出}else{System.out.println(str +" ===== "+Thread.currentThread().getName());}};newThread(()->{try{Connection connection1 = factory.newConnection();Channel channel1 = connection1.createChannel();
channel1.basicConsume("log",true,deliverCallback,(c)->{});}catch(IOException e){thrownewRuntimeException(e);}catch(TimeoutException e){thrownewRuntimeException(e);}}).start();newThread(()->{try{Connection connection2 = factory.newConnection();Channel channel2 = connection2.createChannel();
channel2.basicConsume("error",true,deliverCallback,(c)->{});}catch(IOException e){thrownewRuntimeException(e);}catch(TimeoutException e){thrownewRuntimeException(e);}}).start();}}
4.5、Topics(Topic主题模式)
虽然路由键kern.critical和topic_4和交换机的绑定都对应上了,但是这个队列只能接收1条消息。
生产者
publicclassTopic{privatestaticfinalString EXCHANGE_NAME ="topic_";publicstaticvoidmain(String[] args){ConnectionFactory factory =FactoryUtil.newFactory();try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
channel.queueDeclare("topic_1",true,false,false,null);
channel.queueDeclare("topic_2",true,false,false,null);
channel.queueDeclare("topic_3",true,false,false,null);
channel.queueDeclare("topic_4",true,false,false,null);
channel.queueBind("topic_1",EXCHANGE_NAME,"#");
channel.queueBind("topic_2",EXCHANGE_NAME,"kern.*");
channel.queueBind("topic_3",EXCHANGE_NAME,"*.critical");
channel.queueBind("topic_4",EXCHANGE_NAME,"kern.*");
channel.queueBind("topic_4",EXCHANGE_NAME,"*.critical");Scanner in =newScanner(System.in);while(in.hasNext()){String str = in.nextLine();
channel.basicPublish(EXCHANGE_NAME,"kern.critical",null,str.getBytes());}}catch(IOException e){thrownewRuntimeException(e);}catch(TimeoutException e){thrownewRuntimeException(e);}}}
消费者
publicclassTopicReceive{privatestaticfinalString EXCHANGE_NAME ="topic_";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{ConnectionFactory factory =FactoryUtil.newFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
channel.queueDeclare("topic_1",true,false,false,null);
channel.queueDeclare("topic_2",true,false,false,null);
channel.queueDeclare("topic_3",true,false,false,null);
channel.queueDeclare("topic_4",true,false,false,null);DeliverCallback deliverCallback =((consumerTag, message)->{String msg =newString(message.getBody());System.out.println("消息:"+msg +" . Envelope:"+ message.getEnvelope());System.out.println(Thread.currentThread().getName());});
channel.basicConsume("topic_1",true,deliverCallback,(consumerTag)->{});
channel.basicConsume("topic_2",true,deliverCallback,(consumerTag)->{});
channel.basicConsume("topic_3",true,deliverCallback,(consumerTag)->{});
channel.basicConsume("topic_4",true,deliverCallback,(consumerTag)->{});}}
4.6、Publish Confirms(发布确认模式)
发布者确认模式是RabbitMQ为了实现可靠发布的扩展,在通道上启用发布者确认时,客户端发布的消息将由代理异步确认。
单条消息确认
publicclassPublishConfirmSync{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =FactoryUtil.newFactory();try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){
channel.confirmSelect();
channel.queueDeclare("queue",true,false,false,null);
channel.queuePurge("queue");String queue ="queue";System.out.println(LocalDateTime.now());for(int i =0; i <200; i++){
channel.basicPublish("",queue,null,(""+i).getBytes());
channel.waitForConfirmsOrDie();}System.out.println(LocalDateTime.now());}}}//发送消息非常慢,200条消息都需要15s
批量消息确认
publicclassPublishConfirmSync{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =FactoryUtil.newFactory();try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){
channel.confirmSelect();
channel.queueDeclare("queue",true,false,false,null);
channel.queuePurge("queue");int batchSize =100;int count =0;System.out.println(LocalDateTime.now());for(int i =0; i <500; i++){
channel.basicPublish("","queue",null,(""+i).getBytes());
count++;if(count == batchSize){
channel.waitForConfirmsOrDie(5000);//批量确认
count =0;}}if(count >0){
channel.waitForConfirmsOrDie(5000);}System.out.println(LocalDateTime.now());}}}//无法确定哪次发送消息出现问题
异步确定消息
publicclassPublishConfirmASync{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =FactoryUtil.newFactory();try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){String queue = UUID.randomUUID().toString();
channel.queueDeclare(queue,false,false,true,null);
channel.confirmSelect();//开启发布确认模式ConcurrentNavigableMap<Long,String> outstandingConfirms =newConcurrentSkipListMap<>();//处理单次确认或者多次确认,类似basicAck手动确认,false确认一条消息,true确认这个之前的所有消息ConfirmCallback cleanOutstandingConfirms =(sequenceNumber, multiple)->{System.out.println(sequenceNumber +"======="+ multiple);//为什么成功会有false,false只表示确认一条消息,true表示确认这个序列之前的所有消息if(multiple){//headMap(key,boolean),key表示一个标志位,true表示是否包含这个标志位ConcurrentNavigableMap<Long,String> confirmed = outstandingConfirms.headMap(sequenceNumber,true);
confirmed.clear();}else{
outstandingConfirms.remove(sequenceNumber);}};
channel.addConfirmListener(cleanOutstandingConfirms,(sequenceNumber, multiple)->{System.out.println(sequenceNumber +"======="+ multiple);String body = outstandingConfirms.get(sequenceNumber);System.err.format("Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
);
cleanOutstandingConfirms.handle(sequenceNumber, multiple);});long start =System.nanoTime();for(int i =0; i <50000; i++){String body =String.valueOf(i);
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);//getNextPublishSeqNo()发布确认模式下的下一条消息的序列号,其他模式下一直为0
channel.basicPublish("","worker",null, body.getBytes());}if(!waitUntil(Duration.ofSeconds(60),()-> outstandingConfirms.isEmpty())){thrownewIllegalStateException("All messages could not be confirmed in 60 seconds");}long end =System.nanoTime();System.out.println(Duration.ofNanos(end - start).toMillis());}}staticbooleanwaitUntil(Duration timeout,BooleanSupplier condition)throwsInterruptedException{int waited =0;while(!condition.getAsBoolean()&& waited < timeout.toMillis()){Thread.sleep(100L);
waited =+100;}return condition.getAsBoolean();}}
5.实现RPC(Remote Procedure Call)调用
客户端
publicclassRpcClient{publicstaticvoidmain(String[] args){ConnectionFactory factory =FactoryUtil.newFactory();try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){
channel.queueDeclare("rpc_queue",true,false,false,null);Scanner in =newScanner(System.in);while(in.hasNext()){System.out.println("远程调用结果:"+call(channel, in.next())+"============"+Thread.currentThread().getName());}}catch(IOException e){thrownewRuntimeException(e);}catch(TimeoutException e){thrownewRuntimeException(e);}catch(ExecutionException e){thrownewRuntimeException(e);}catch(InterruptedException e){thrownewRuntimeException(e);}}publicstaticStringcall(Channel channel,String message)throwsIOException,ExecutionException,InterruptedException{String callbackQueueName = channel.queueDeclare().getQueue();AMQP.BasicProperties props =newAMQP.BasicProperties().builder().replyTo(callbackQueueName).correlationId(UUID.randomUUID().toString()).build();
channel.basicPublish("","rpc_queue",props,message.getBytes());finalCompletableFuture<String> response =newCompletableFuture<>();String ctag = channel.basicConsume(callbackQueueName,true,(consumerTag, delivery)->{if(delivery.getProperties().getCorrelationId().equals(props.getCorrelationId())){
response.complete(newString(delivery.getBody(),"UTF-8"));}},consumerTag ->{});String result = response.get();System.out.println("ctag=========>"+ctag);
channel.basicCancel(ctag);return result;}}
远程调用端
publicclassRpcServer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{ConnectionFactory factory =FactoryUtil.newFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();
channel.queueDeclare("rpc_queue",true,false,false,null);
channel.queuePurge("rpc_queue");//清除队列内容
channel.basicQos(1);System.out.println("等待RPC远程调用");DeliverCallback deliverCallback =(consumerTag, delivery)->{AMQP.BasicProperties props =newAMQP.BasicProperties().builder().correlationId(delivery.getProperties().getCorrelationId()).build();String response ="";try{String message =newString(delivery.getBody(),"UTF-8");System.out.println(message);int i =Integer.parseInt(message);
response +=fib(i);System.out.println(response);}catch(RuntimeException e){System.out.println(e);}finally{
channel.basicPublish("",delivery.getProperties().getReplyTo(),props,response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}};
channel.basicConsume("rpc_queue",false,deliverCallback,consumerTag ->{});}privatestaticintfib(int n){if(n ==0)return0;if(n ==1)return1;returnfib(n -1)+fib(n -2);}}
6.RabbitMQ手动确认消息的三种方式
6.1、basicAck
voidbasicAck(long deliveryTag,boolean multiple)throwsIOException;//deliverTag表示消息的标志,监听获取的消息里面会有整个属性,如果multiple=false确认消息,只会确认一条消息//multiple是true表示批量确认消息,可以自己传递一个固定的值测
情况1:channel.basicQos(10),channel.basciAck(10,true)
队列中有足够的消息被消费,消费者能读取到10条(有时候会是20条),确认10条消息,但是消费之后就会造成阻塞(channel被关闭)。
队列中没有10条消息,消费者能读到所有消息,确认0条消息,也是阻塞,稍后队列中有足够的消息也不能被消费。
队列中0条消息,消费者等待接收消息,如果能接收到消息大于10,则读取10条确认10条。没有10就不会确认。回到类似情况2。
通过可视化界面发现,channel是关闭的,connection没有关闭。
情况2:channel.basicQos(9),channel.basicAck(10,true)
队列中有足够的数据,消费者能读取9条数据,确认0条消息。阻塞断开channel连接。
队列中没有9条数据,消费者能读到所有信息,确认0条消息。
队列中0条数据,消费者开始没有数据,只能接收一批消息,后面断开连接。
情况3:channel.basicQos(11),channel.basic(10,true),
队列中有足够的数据,消费者能读取至少11条数据,确认10条消息。阻塞断开channel连接。(队列中消息低于deliver的2倍,能读取19条)
队列中没有11条数据,消费者能读到所有消息,确认0条消息。
队列中0条数据,一批消息数量大,则读取11,确认10,小于qos的2倍()19,能读取全部数据
true的使用类似生产者publish confrim模式下的异步确认,为true表示批量确认消息发布成功。总之在使用固定值或者消息中的属性值来手动确定消息,如果deliveryTag不符合,则断开channel连接。
6.2 basicNack
voidbasicNack(long deliveryTag,boolean multiple,boolean requeue)//deliveryTag通道获取消息的标记//multiple true表示拒绝所有消息 false仅代表拒绝当前消息//requeue true表示重新排队 false表示发送到死信队列或者丢弃(被监听的队列需要绑定死信交换机)Map<String,Object> args =newHashMap<String,Object>();
args.put("x-dead-letter-exchange","some.exchange.name");//死信交换机
args.put("x-dead-letter-routing-key","some-routing-key");//死信交换机路由的key
channel.queueDeclare("myqueue",false,false,false, args);
使用multiple为true的情况类似ack。
6.3basicReject
voidbasicReject(long deliveryTag,boolean requeue)throwsIOException;//deliveryTag通道获取消息的标记//requeue true表示重新排队 false表示发送到死信队列或者丢弃(被监听的队列需要绑定死信交换机)
使用情况和上面requeue一致。
版权归原作者 weixin_47748878 所有, 如有侵权,请联系我们删除。