0


RabbitMQ详解与Java实现

一、RabbitMQ介绍

1.1 现存问题
  • 服务调用:两个服务调用时,我们可以通过传统的HTTP方式,让服务A直接去调用服务B的接口,但是这种方式是同步的方式,虽然可以采用SpringBoot提供的@Async注解实现异步调用,但是这种方式无法确保请求一定回访问到服务B的接口。那如何保证服务A的请求信息一定能送达到服务B去完成一些业务操作呢?| 如何实现异步调用1642517531404.png
  • 海量请求:在我们在做一些秒杀业务时,可能会在某个时间点突然出现大量的并发请求,这可能已经远远超过服务器的并发瓶颈,这时我们需要做一些削峰的操作,也就是将大量的请求缓冲到一个队列中,然后慢慢的消费掉。如何提供一个可以存储千万级别请求的队列呢?1642517747632.png
  • 在微服务架构下,可能一个业务会出现同时调用多个其他服务的场景,而且这些服务之间一般会用到Feign的方式进行轻量级的通讯,如果存在一个业务,用户创建订单成功后,还需要去给用户添加积分、通知商家、通知物流系统、扣减商品库存,而在执行这个操作时,如果任意一个服务出现了问题,都会导致整体的下单业务失败,并且会导致给用户反馈的时间延长。这时就造成了服务之间存在一个较高的耦合性的问题。如何可以降低服务之间的耦合性呢?1642517948196.png
1.2 处理问题

RabbitMQ就可以解决上述的全部问题

  • 服务之间如何想实现可靠的异步调用,可以通过RabbitMQ的方式实现,服务A只需要保证可以把消息发送到RabbitMQ的队列中,服务B就一定会消费到队列中的消息只不过会存在一定的延时。| 异步访问1642518013295.png
  • 忽然的海量请求可以存储在RabbitMQ的队列中,然后由消费者慢慢消费掉,RabbitMQ的队列本身就可以存储上千万条消息 1642518109219.png
  • 在调用其他服务时,如果允许延迟效果的出现,可以将消息发送到RabbitMQ中,再由消费者慢慢消费| 服务解耦1642518233825.png
1.3 RabbitMQ介绍

百度百科:

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

首先RabbitMQ基于AMQP协议开发,所以很多基于AMQP协议的功能RabbitMQ都是支持的,比如SpringCloud中的消息总线bus

其次RabbitMQ是基于Erlang编写,这是也是RabbitMQ天生的优势,Erlang被称为面向并发编程的语言,并发能力极强,在众多的MQ中,RabbitMQ的延迟特别低,在微秒级别,所以一般的业务处理RabbitMQ比Kafka和RocketMQ更有优势。

最后RabbitMQ提供自带了图形化界面,操作方便,还自带了多种集群模式,可以保证RabbitMQ的高可用,并且SpringBoot默认就整合RabbitMQ,使用简单方便。

二、RabbitMQ安装


2.1 安装RabbitMQ

这里推荐搭建采用Docker的方式在Linux中安装RabbitMQ,如果对Docker不了解,推荐去学习一下Docker的应用,不然学习其他的知识时,安装的成本都特别高,这里我们就采用Docker的方式安装RabbitMQ。

直接使用docker-compose.yml文件即可安装RabbitMQ服务

version:'3.1'services:rabbitmq:restart: always
    image: daocloud.io/library/rabbitmq:3.8.8
    volumes:- ./data/:/var/lib/rabbitmq/
      - ./log/:/var/log/rabbitmq/log/
    ports:- 15672:15672- 5672:5672

执行

docker-compose up -d

运行

测试效果:

curl localhost:5672

查看效果image20220121005749217.png

2.2 开启图形化界面

默认情况下,当前镜像的图形化界面默认没有开启,需要进入到容器内部开启图形化管理界面
启动图形化界面插件image20220121005619975.pngimage20220121005624253.png
通过浏览器访问15672,查看图形化界面
查看登录页面image20220121005852818.png
默认用户和密码均为:guest,查看首页
查看首页image20220121005930123.png

三、RabbitMQ构架

RabbitMQ的架构可以查看官方地址:https://rabbitmq.com/tutorials/amqp-concepts.html
官方简单架构image20220121010054992.png
可以看出RabbitMQ中主要分为三个角色:

  • Publisher:消息的发布者,将消息发布到RabbitMQ中的Exchange
  • RabbitMQ服务:Exchange接收Publisher的消息,并且根据Routes策略将消息转发到Queue中
  • Consumer:消息的消费者,监听Queue中的消息并进行消费

官方提供的架构图相对简洁,我们可以自己画一份相对完整一些的架构图:
RabbitMQ架构图image20220121011000157.png
可以看出Publisher和Consumer都是单独和RabbitMQ服务中某一个Virtual Host建立Connection的客户端

后续通过Connection可以构建Channel通道,用来发布、接收消息

一个Virtual Host中可以有多个Exchange和Queue,Exchange可以同时绑定多个Queue

在基于架构图查看图形化界面,会更加清晰
图形化界面信息image20220121011418076.png

四、RabbitMQ通讯方式

RabbitMQ提供了很多中通讯方式,依然可以去官方查看:https://rabbitmq.com/getstarted.html
七种通讯方式image20220121011637076.png

4.1 RabbitMQ提供的通讯方式
  • Hello World!:为了入门操作!
  • Work queues:一个队列被多个消费者消费
  • Publish/Subscribe:手动创建Exchange(FANOUT)
  • Routing:手动创建Exchange(DIRECT)
  • Topics:手动创建Exchange(TOPIC)
  • RPC:RPC方式
  • Publisher Confirms:保证消息可靠性
4.2 构建Connection工具类
  • 导入依赖:amqp-client,junit<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency></dependencies>
  • 构建工具类:packagecom.mashibing.util;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/** * @author zjw * @description */publicclassRabbitMQConnectionUtil{publicstaticfinalStringRABBITMQ_HOST="192.168.11.32";publicstaticfinalintRABBITMQ_PORT=5672;publicstaticfinalStringRABBITMQ_USERNAME="guest";publicstaticfinalStringRABBITMQ_PASSWORD="guest";publicstaticfinalStringRABBITMQ_VIRTUAL_HOST="/";/** * 构建RabbitMQ的连接对象 * @return */publicstaticConnectiongetConnection()throwsException{//1. 创建Connection工厂ConnectionFactory factory =newConnectionFactory();//2. 设置RabbitMQ的连接信息 factory.setHost(RABBITMQ_HOST); factory.setPort(RABBITMQ_PORT); factory.setUsername(RABBITMQ_USERNAME); factory.setPassword(RABBITMQ_PASSWORD); factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);//3. 返回连接对象Connection connection = factory.newConnection();return connection;}}
4.3 Hello World

通讯方式image.png
生产者:

packagecom.mashibing.helloworld;importcom.mashibing.util.RabbitMQConnectionUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importorg.junit.Test;/**
 * @author zjw
 * @description
 * @date 2022/1/24 22:54
 */publicclassPublisher{publicstaticfinalStringQUEUE_NAME="hello";@Testpublicvoidpublish()throwsException{//1. 获取连接对象Connection connection =RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);//4. 发布消息String message ="Hello World!";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送成功!");}}

消费者:

packagecom.mashibing.helloworld;importcom.mashibing.util.RabbitMQConnectionUtil;importcom.rabbitmq.client.*;importorg.junit.Test;importjava.io.IOException;/**
 * @author zjw
 * @description
 * @date 2022/1/24 23:02
 */publicclassConsumer{@Testpublicvoidconsume()throwsException{//1. 获取连接对象Connection connection =RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列
        channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//4. 监听消息DefaultConsumer callback =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者获取到消息:"+newString(body,"UTF-8"));}};
        channel.basicConsume(Publisher.QUEUE_NAME,true,callback);System.out.println("开始监听队列");System.in.read();}}
4.4 Work Queues

WorkQueues需要学习的内容image.png

  • 生产者:生产者和Hello World的形式是一样的,都是将消息推送到默认交换机。
  • 消费者:让消费者关闭自动ack,并且设置消息的流控,最终实现消费者可以尽可能去多消费消息packagecom.mashibing.workqueues;importcom.mashibing.util.RabbitMQConnectionUtil;importcom.rabbitmq.client.*;importorg.junit.Test;importjava.io.IOException;/** * @author zjw * @description * @date 2022/1/25 19:52 */publicclassConsumer{@Testpublicvoidconsume1()throwsException{//1. 获取连接对象Connection connection =RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列 channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//3.5 设置消息的流控 channel.basicQos(3);//4. 监听消息DefaultConsumer callback =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{try{Thread.sleep(100);}catch(InterruptedException e){ e.printStackTrace();}System.out.println("消费者1号-获取到消息:"+newString(body,"UTF-8")); channel.basicAck(envelope.getDeliveryTag(),false);}}; channel.basicConsume(Publisher.QUEUE_NAME,false,callback);System.out.println("开始监听队列");System.in.read();}@Testpublicvoidconsume2()throwsException{//1. 获取连接对象Connection connection =RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列 channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null); channel.basicQos(3);//4. 监听消息DefaultConsumer callback =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{try{Thread.sleep(1000);}catch(InterruptedException e){ e.printStackTrace();}System.out.println("消费者2号-获取到消息:"+newString(body,"UTF-8")); channel.basicAck(envelope.getDeliveryTag(),false);}}; channel.basicConsume(Publisher.QUEUE_NAME,false,callback);System.out.println("开始监听队列");System.in.read();}}
4.5 Publish/Subscribe

自定义一个交换机image.png
生产者:自行构建Exchange并绑定指定队列(FANOUT类型)

packagecom.mashibing.pubsub;importcom.mashibing.util.RabbitMQConnectionUtil;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importorg.junit.Test;/**
 * @author zjw
 * @description
 * @date 2022/1/25 20:08
 */publicclassPublisher{publicstaticfinalStringEXCHANGE_NAME="pubsub";publicstaticfinalStringQUEUE_NAME1="pubsub-one";publicstaticfinalStringQUEUE_NAME2="pubsub-two";@Testpublicvoidpublish()throwsException{//1. 获取连接对象Connection connection =RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);//4. 构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);//5. 绑定交换机和队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");//6. 发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"45jk6h645jk",null,"publish/subscribe!".getBytes());System.out.println("消息成功发送!");}}
4.6 Routing

DIRECT类型Exchangeimage.png
生产者:在绑定Exchange和Queue时,需要指定好routingKey,同时在发送消息时,也指定routingKey,只有routingKey一致时,才会把指定的消息路由到指定的Queue

packagecom.mashibing.routing;importcom.mashibing.util.RabbitMQConnectionUtil;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importorg.junit.Test;/**
 * @author zjw
 * @description
 * @date 2022/1/25 20:20
 */publicclassPublisher{publicstaticfinalStringEXCHANGE_NAME="routing";publicstaticfinalStringQUEUE_NAME1="routing-one";publicstaticfinalStringQUEUE_NAME2="routing-two";@Testpublicvoidpublish()throwsException{//1. 获取连接对象Connection connection =RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);//4. 构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);//5. 绑定交换机和队列
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"ORANGE");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"BLACK");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"GREEN");//6. 发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"ORANGE",null,"大橙子!".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"BLACK",null,"黑布林大狸子".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"WHITE",null,"小白兔!".getBytes());System.out.println("消息成功发送!");}}
4.7 Topic

Topic模式image.png
生产者:TOPIC类型可以编写带有特殊意义的routingKey的绑定方式

packagecom.mashibing.topics;importcom.mashibing.util.RabbitMQConnectionUtil;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importorg.junit.Test;/**
 * @author zjw
 * @description
 * @date 2022/1/25 20:28
 */publicclassPublisher{publicstaticfinalStringEXCHANGE_NAME="topic";publicstaticfinalStringQUEUE_NAME1="topic-one";publicstaticfinalStringQUEUE_NAME2="topic-two";@Testpublicvoidpublish()throwsException{//1. 获取连接对象Connection connection =RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);//4. 构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);//5. 绑定交换机和队列,// TOPIC类型的交换机在和队列绑定时,需要以aaa.bbb.ccc..方式编写routingkey// 其中有两个特殊字符:*(相当于占位符),#(相当通配符)
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"lazy.#");//6. 发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"big.orange.rabbit",null,"大橙兔子!".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"small.white.rabbit",null,"小白兔".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"lazy.dog.dog.dog.dog.dog.dog",null,"懒狗狗狗狗狗狗".getBytes());System.out.println("消息成功发送!");}}
4.8 RPC(了解)

因为两个服务在交互时,可以尽量做到Client和Server的解耦,通过RabbitMQ进行解耦操作

需要让Client发送消息时,携带两个属性:

  • replyTo告知Server将相应信息放到哪个队列
  • correlationId告知Server发送相应消息时,需要携带位置标示来告知Client响应的信息

RPC方式image.png
客户端:

packagecom.mashibing.rpc;importcom.mashibing.util.RabbitMQConnectionUtil;importcom.rabbitmq.client.*;importorg.junit.Test;importjava.io.IOException;importjava.util.UUID;/**
 * @author zjw
 * @description
 * @date 2022/2/8 20:03
 */publicclassPublisher{publicstaticfinalStringQUEUE_PUBLISHER="rpc_publisher";publicstaticfinalStringQUEUE_CONSUMER="rpc_consumer";@Testpublicvoidpublish()throwsException{//1. 获取连接对象Connection connection =RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列
        channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
        channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);//4. 发布消息String message ="Hello RPC!";String uuid =UUID.randomUUID().toString();AMQP.BasicProperties props =newAMQP.BasicProperties().builder().replyTo(QUEUE_CONSUMER).correlationId(uuid).build();
        channel.basicPublish("",QUEUE_PUBLISHER,props,message.getBytes());

        channel.basicConsume(QUEUE_CONSUMER,false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String id = properties.getCorrelationId();if(id !=null&& id.equalsIgnoreCase(uuid)){System.out.println("接收到服务端的响应:"+newString(body,"UTF-8"));}
                channel.basicAck(envelope.getDeliveryTag(),false);}});System.out.println("消息发送成功!");System.in.read();}}

服务端:

packagecom.mashibing.rpc;importcom.mashibing.helloworld.Publisher;importcom.mashibing.util.RabbitMQConnectionUtil;importcom.rabbitmq.client.*;importorg.junit.Test;importjava.io.IOException;/**
 * @author zjw
 * @description
 * @date 2022/1/24 23:02
 */publicclassConsumer{publicstaticfinalStringQUEUE_PUBLISHER="rpc_publisher";publicstaticfinalStringQUEUE_CONSUMER="rpc_consumer";@Testpublicvoidconsume()throwsException{//1. 获取连接对象Connection connection =RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列
        channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
        channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);//4. 监听消息DefaultConsumer callback =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者获取到消息:"+newString(body,"UTF-8"));String resp ="获取到了client发出的请求,这里是响应的信息";String respQueueName = properties.getReplyTo();String uuid = properties.getCorrelationId();AMQP.BasicProperties props =newAMQP.BasicProperties().builder().correlationId(uuid).build();
                channel.basicPublish("",respQueueName,props,resp.getBytes());
                channel.basicAck(envelope.getDeliveryTag(),false);}};
        channel.basicConsume(QUEUE_PUBLISHER,false,callback);System.out.println("开始监听队列");System.in.read();}}

五、SpringBoot操作RabbitMQ

5.1 SpringBoot声明信息
  • 创建项目
  • 导入依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  • 配置RabbitMQ信息spring:rabbitmq:host: 192.168.11.32 port:5672username: guest password: guest virtual-host: /
  • 声明交换机&队列packagecom.mashibing.rabbitmqboot.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/** * @author zjw * @description * @date 2022/2/8 20:25 */@ConfigurationpublicclassRabbitMQConfig{publicstaticfinalStringEXCHANGE="boot-exchange";publicstaticfinalStringQUEUE="boot-queue";publicstaticfinalStringROUTING_KEY="*.black.*";@BeanpublicExchangebootExchange(){// channel.DeclareExchangereturnExchangeBuilder.topicExchange(EXCHANGE).build();}@BeanpublicQueuebootQueue(){returnQueueBuilder.durable(QUEUE).build();}@BeanpublicBindingbootBinding(Exchange bootExchange,Queue bootQueue){returnBindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();}}
5.2 生产者操作
packagecom.mashibing.rabbitmqboot;importcom.mashibing.rabbitmqboot.config.RabbitMQConfig;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.AmqpException;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;/**
 * @author zjw
 * @description
 * @date 2022/2/8 21:05
 */@SpringBootTestpublicclassPublisherTest{@AutowiredpublicRabbitTemplate rabbitTemplate;@Testpublicvoidpublish(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");}@TestpublicvoidpublishWithProps(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","messageWithProps",newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{
                message.getMessageProperties().setCorrelationId("123");return message;}});System.out.println("消息发送成功");}}
 

#### 5.3 消费者操作

```java
packagecom.mashibing.rabbitmqboot;importcom.mashibing.rabbitmqboot.config.RabbitMQConfig;importcom.rabbitmq.client.Channel;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.context.annotation.Configuration;importorg.springframework.stereotype.Component;importjava.io.IOException;/**
 * @author zjw
 * @description
 * @date 2022/2/8 21:11
 */@ComponentpublicclassConsumeListener{@RabbitListener(queues =RabbitMQConfig.QUEUE)publicvoidconsume(String msg,Channel channel,Message message)throwsIOException{System.out.println("队列的消息为:"+ msg);String correlationId = message.getMessageProperties().getCorrelationId();System.out.println("唯一标识为:"+ correlationId);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}

六、RabbitMQ保证消息可靠性

6.1 保证消息一定送达到Exchange

Confirm机制

可以通过Confirm效果保证消息一定送达到Exchange,官方提供了三种方式,选择了对于效率影响最低的异步回调的效果

//4. 开启confirms
channel.confirmSelect();//5. 设置confirms的异步回调
channel.addConfirmListener(newConfirmListener(){@OverridepublicvoidhandleAck(long deliveryTag,boolean multiple)throwsIOException{System.out.println("消息成功的发送到Exchange!");}@OverridepublicvoidhandleNack(long deliveryTag,boolean multiple)throwsIOException{System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作!");}});
6.2 保证消息可以路由到Queue

Return机制

为了保证Exchange上的消息一定可以送达到Queue

//6. 设置Return回调,确认消息是否路由到了Queue
channel.addReturnListener(newReturnListener(){@OverridepublicvoidhandleReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消息没有路由到指定队列,做其他的补偿措施!!");}});//7. 在发送消息时,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调
6.3 保证Queue可以持久化消息

DeliveryMode设置消息持久化

DeliveryMode设置为2代表持久化,如果设置为1,就代表不会持久化。

//7. 设置消息持久化AMQP.BasicProperties props =newAMQP.BasicProperties().builder().deliveryMode(2).build();//7. 发布消息
channel.basicPublish("","confirms",true,props,message.getBytes());
6.4 保证消费者可以正常消费消息
详情看WorkQueue模式
6.5 SpringBoot实现上述操作
6.5.1 Confirm
  • 编写配置文件开启Confirm机制spring:rabbitmq:publisher-confirm-type: correlated # 新版本publisher-confirms:true# 老版本
  • 在发送消息时,配置RabbitTemplate@TestpublicvoidpublishWithConfirms()throwsIOException{ rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){if(ack){System.out.println("消息已经送达到交换机!!");}else{System.out.println("消息没有送达到Exchange,需要做一些补偿操作!!retry!!!");}}}); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");System.in.read();}
6.5.2 Return
  • 编写配置文件开启Return机制spring:rabbitmq:publisher-returns:true# 开启Return机制
  • 在发送消息时,配置RabbitTemplate@TestpublicvoidpublishWithReturn()throwsIOException{// 新版本用 setReturnsCallback ,老版本用setReturnCallback rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returned){String msg =newString(returned.getMessage().getBody());System.out.println("消息:"+ msg +"路由队列失败!!做补救操作!!");}}); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");System.in.read();}
6.5.3 消息持久化
@TestpublicvoidpublishWithBasicProperties()throwsIOException{
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message",newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{// 设置消息的持久化!
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}});System.out.println("消息发送成功");}

七、RabbitMQ死信队列&延迟交换机

7.1 什么是死信

死信&死信队列1644476424544.png
死信队列的应用:

  • 基于死信队列在队列消息已满的情况下,消息也不会丢失
  • 实现延迟消费的效果。比如:下订单时,有15分钟的付款时间
7.2 实现死信队列
7.2.1 准备Exchange&Queue
packagecom.mashibing.rabbitmqboot.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
 * @author zjw
 * @description
 * @date 2022/2/10 15:04
 */@ConfigurationpublicclassDeadLetterConfig{publicstaticfinalStringNORMAL_EXCHANGE="normal-exchange";publicstaticfinalStringNORMAL_QUEUE="normal-queue";publicstaticfinalStringNORMAL_ROUTING_KEY="normal.#";publicstaticfinalStringDEAD_EXCHANGE="dead-exchange";publicstaticfinalStringDEAD_QUEUE="dead-queue";publicstaticfinalStringDEAD_ROUTING_KEY="dead.#";@BeanpublicExchangenormalExchange(){returnExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();}@BeanpublicQueuenormalQueue(){returnQueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").build();}@BeanpublicBindingnormalBinding(Queue normalQueue,Exchange normalExchange){returnBindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();}@BeanpublicExchangedeadExchange(){returnExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();}@BeanpublicQueuedeadQueue(){returnQueueBuilder.durable(DEAD_QUEUE).build();}@BeanpublicBindingdeadBinding(Queue deadQueue,Exchange deadExchange){returnBindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();}}
7.2.2 实现效果
  • 基于消费者进行reject或者nack实现死信效果packagecom.mashibing.rabbitmqboot;importcom.mashibing.rabbitmqboot.config.DeadLetterConfig;importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.io.IOException;/** * @author zjw * @description * @date 2022/2/10 15:17 */@ComponentpublicclassDeadListener{@RabbitListener(queues =DeadLetterConfig.NORMAL_QUEUE)publicvoidconsume(String msg,Channel channel,Message message)throwsIOException{System.out.println("接收到normal队列的消息:"+ msg); channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}}
  • 消息的生存时间- 给消息设置生存时间@TestpublicvoidpublishExpire(){String msg ="dead letter expire"; rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE,"normal.abc", msg,newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{ message.getMessageProperties().setExpiration("5000");return message;}});}- 给队列设置消息的生存时间@BeanpublicQueuenormalQueue(){returnQueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").ttl(10000).build();}
  • 设置Queue中的消息最大长度@BeanpublicQueuenormalQueue(){returnQueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").maxLength(1).build();}只要Queue中已经有一个消息,如果再次发送一个消息,这个消息会变为死信!
7.3 延迟交换机

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9

死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。

  • 构建延迟交换机packagecom.mashibing.rabbitmqboot.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;/** * @author zjw * @description */@ConfigurationpublicclassDelayedConfig{publicstaticfinalStringDELAYED_EXCHANGE="delayed-exchange";publicstaticfinalStringDELAYED_QUEUE="delayed-queue";publicstaticfinalStringDELAYED_ROUTING_KEY="delayed.#";@BeanpublicExchangedelayedExchange(){Map<String,Object> arguments =newHashMap<>(); arguments.put("x-delayed-type","topic");Exchange exchange =newCustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);return exchange;}@BeanpublicQueuedelayedQueue(){returnQueueBuilder.durable(DELAYED_QUEUE).build();}@BeanpublicBindingdelayedBinding(Queue delayedQueue,Exchange delayedExchange){returnBindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}}
  • 发送消息packagecom.mashibing.rabbitmqboot;importcom.mashibing.rabbitmqboot.config.DelayedConfig;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.AmqpException;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;/** * @author zjw * @description */@SpringBootTestpublicclassDelayedPublisherTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@Testpublicvoidpublish(){ rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE,"delayed.abc","xxxx",newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{ message.getMessageProperties().setDelay(30000);return message;}});}}

八、RabbitMQ的集群

RabbitMQ的镜像模式
RabbitMQ的集群1644926959251.png
高可用

提升RabbitMQ的效率

搭建RabbitMQ集群

  • 准备两台虚拟机(克隆)
  • 准备RabbitMQ的yml文件rabbitmq1:version:'3.1'services:rabbitmq1:image: rabbitmq:3.8.5-management-alpine container_name: rabbitmq1 hostname: rabbitmq1 extra_hosts:-"rabbitmq1:192.168.11.32"-"rabbitmq2:192.168.11.33"environment:- RABBITMQ_ERLANG_COOKIE=SDJHFGDFFS ports:- 5672:5672- 15672:15672- 4369:4369- 25672:25672rabbitmq2:version:'3.1'services:rabbitmq2:image: rabbitmq:3.8.5-management-alpine container_name: rabbitmq2 hostname: rabbitmq2 extra_hosts:-"rabbitmq1:192.168.11.32"-"rabbitmq2:192.168.11.33"environment:- RABBITMQ_ERLANG_COOKIE=SDJHFGDFFS ports:- 5672:5672- 15672:15672- 4369:4369- 25672:25672准备完毕之后,启动两台RabbitMQ启动效果1644924815935.png
  • 让RabbitMQ服务实现join操作需要四个命令完成join操作让rabbitmq2 join rabbitmq1,需要进入到rabbitmq2的容器内部,去执行下述命令rabbitmqctl stop_apprabbitmqctl reset rabbitmqctl join_cluster rabbit@rabbitmq1rabbitmqctl start_app执行成功后:执行成功后1644925359203.png
  • 设置镜像模式在指定的RabbitMQ服务中设置好镜像策略即可镜像模式1644925812667.png

九、RabbitMQ其他内容

9.1 Headers类型Exchange

headers就是一个基于key-value的方式,让Exchange和Queue绑定的到一起的一种规则

相比Topic形式,可以采用的类型更丰富。
headers绑定方式1645705080465.png
具体实现方式

packagecom.mashibing.headers;importcom.mashibing.util.RabbitMQConnectionUtil;importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importorg.junit.Test;importjava.util.HashMap;importjava.util.Map;/**
 * @author zjw
 * @description
 */publicclassPublisher{publicstaticfinalStringHEADER_EXCHANGE="header_exchange";publicstaticfinalStringHEADER_QUEUE="header_queue";@Testpublicvoidpublish()throwsException{//1. 获取连接对象Connection connection =RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建交换机和队列并基于header的方式绑定
        channel.exchangeDeclare(HEADER_EXCHANGE,BuiltinExchangeType.HEADERS);
        channel.queueDeclare(HEADER_QUEUE,true,false,false,null);Map<String,Object> args =newHashMap<>();// 多个header的key-value只要可以匹配上一个就可以// args.put("x-match","any");// 多个header的key-value要求全部匹配上!
        args.put("x-match","all");
        args.put("name","jack");
        args.put("age","23");
        channel.queueBind(HEADER_QUEUE,HEADER_EXCHANGE,"",args);//4. 发送消息String msg ="header测试消息!";Map<String,Object> headers =newHashMap<>();
        headers.put("name","jac");
        headers.put("age","2");AMQP.BasicProperties props =newAMQP.BasicProperties().builder().headers(headers).build();

        channel.basicPublish(HEADER_EXCHANGE,"",props,msg.getBytes());System.out.println("发送消息成功,header = "+ headers);}}

本文转载自: https://blog.csdn.net/qq_45110186/article/details/135659285
版权归原作者 IT 刘工 所有, 如有侵权,请联系我们删除。

“RabbitMQ详解与Java实现”的评论:

还没有评论