0


RabbitMQ安装配置与简单使用

文章目录


前言

本文为学习RabbitMQ后的学习总结记录,大致包含包含以下部分:

  1. RabbitMQ的安装配置
  2. RabbitMQ中的消息模型
  3. RabbitMQ其他重要知识
  4. RabbitMQ的基本使用

一、RabbitMQ的安装配置

1. 下载镜像

这里选择在Centos7虚拟机中使用Docker来安装。

进行在线拉取:

docker pull rabbitmq:3-management

2. 单机部署MQ

为了防止自己以后看不懂,这里稍微解释一下参数:

  • [-e] : 指定环境变量。这里主要设置用户名和密码,用来登录RabbitMQ的浏览器页面
  • [–name] : 指定启动容器(MQ镜像运行的进程)的名称。
  • [–hostname] : 指定集群名称。(这里使用单机部署)
  • [-p] : 指定宿主机与docker端口暴露。(5672为RabbitMQ的通信端口,15672为RabbitMQ的浏览器控制台端口)
  • [-d] : 后台执行。
  • 最后为指定运行的镜像名称及其版本。
docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

3. 集群部署MQ

xxx(挖个坑,下次来做)


二、RabbitMQ中的消息模型

RabbitMQ共有七种消息模型,这里只介绍最常使用的五种消息模型。首先这五种消息模型可以大致分为使用交换机(Exchage)与未使用交换机。

1. 未使用交换机。 消息发布者直接把消息发送给消息队列(Queue),再由消息队列将消息分发给对应的消费者。

  • 基本消息队列(BasicQueue)在这里插入图片描述publisher 将消息发给指定的队列,consumer通过监听队列消息,来获取publisher发布的数据。一个队列只有一个消费者。
  • 工作消息队列(WorkQueue)在这里插入图片描述工作队列,在基本消息队列的基础上,增加消费者个数,提高消息的处理速度,避免消息队列中的消息堆积。一个消息只能交给某一个消费者进行处理。这里消费者消费消息默认采取预取措施,即轮流向队列中拿取数据进行消费,不考虑具体的消费速度。举个实际的例子,假如队列中一共有50条数据,由两个消费者进行消费。消费者1的消费速度较消费者2更快,但是最终的效果是两个消费者每个消费25条数据。这里可以设置预取消息的上限:spring.rabbitmq.listener.simple.prefetch= 1

2. 使用交换机。 这里称为发布订阅模式。该种方式能够将同一消息发送给多个消费者。实现方式是加入exchange(交换机)。

常见的exchange类型包括:广播(Fanout)路由(Direct)主题(Topic)

 **注意:** exchange只负责消息路由,不提供存储功能,路由失败则消息丢失。
  • 广播(Fanout Exchange)在这里插入图片描述Fanout Exchange 会将接收到的消息路由到每一个与其绑定的queue。
  • 路由(Direct Exchange)在这里插入图片描述Direct Exchange 会将接收到的消息根据规则路由到指定的Queue。这里简述此路由规则:Queue 与 Exchange 进行绑定时,会设置一个 bindingkey ,在publisher发布消息时,会为消息设置一个routeKey,Exchange将消息路由到bindingkeyrouteKey一致的队列。
  • 主题(Topic Exchange)TopicExchange 与 DirectExchange 类似,区别在于TopicExchange为多个单词的列表,用 . 进行分隔,DirectExchange 只能指定一个单词。同时Queue与Exchange 通过BindingKey绑定时,可以使用通配符。通配符有两种类型: a) #:代指0个或多个单词。如:China.# b) *:代指一个单词。如:※.news在这里插入图片描述

三、RabbitMQ其他重要知识

  1. AMQP:全称 Advanced Message Queue Protocol,是一个在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关。
  2. Spring AMQP:这是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

四、RabbitMQ的基本使用

  1. 未使用spring-amqp 实现发送数据与消费数据 这里仅使用接收简单队列模型原生的代码编写来引出spring-amqp,下面则全部以spring-amqp为基础,来实现消息的发送与消费。// 1.建立连接ConnectionFactory factory =newConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.220.137"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName ="simple.queue"; channel.queueDeclare(queueName,false,false,false,null);// 4.发送消息String message ="hello, rabbitmq!"; channel.basicPublish("", queueName,null, message.getBytes());System.out.println("发送消息成功:【"+ message +"】");// 5.关闭通道和连接 channel.close(); connection.close();
  2. 导入AMQP依赖。publiser 与 consumer 服务都需要amqp 依赖。<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  3. amqp 相关的环境配置在application.yml中进行相关的配置:``````spring: rabbitmq: host: 192.168.220.137 # rabbitmq的IP 地址 port: 5672 # 端口 username: itcast # 登录用户名 password: 123321 # 登录密码 virtual-host: / # 虚拟主机目录spring-amqp 封装RabbitTemplate 来实现数据的发送与消费。@AutowiredprivateRabbitTemplate rabbitTemplate;
  4. 简单队列模型发送数据与消费数据代码实现:发送数据:``````@TestpublicvoidtestSendMessage2SimpleQueue(){// 指定队列名称String queueName ="simple.queue";// 具体发送的消息String msg ="hello.SpringAmqp"; rabbitTemplate.convertAndSend(queueName,msg);}``````接收数据:``````@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueue(String msg){System.out.println("Consumer 1 consume get message from queue【"+ msg+"】");}
  5. 工作队列模型发送数据与消费数据代码实现:发送数据:``````@TestpublicvoidtestSendMessageWorkQueue()throwsInterruptedException{// 指定队列名称String queueName ="work.queue";String msg ="hello.SpringAmqp";// 在一秒钟发送50 条数据for(int i=1;i<=50;i++){ rabbitTemplate.convertAndSend(queueName,msg + i);Thread.sleep(20);}}``````接收数据:``````@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue1(String msg)throwsInterruptedException{System.out.println("Consumer 1 consume get message from work.queue:["+ msg+"]"+LocalDate.now());// 模拟不同消费者的不同消费速度Thread.sleep(20);}@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue2(String msg)throwsInterruptedException{System.err.println("Consumer 2 consume get message from work.queue:【"+ msg+"】"+LocalDate.now());// 模拟不同消费者的不同消费速度Thread.sleep(50);}
  6. Fanout Exchange 模型发送数据与消费数据代码实现:发送数据:``````@TestpublicvoidtestSendFanoutExchange()throwsInterruptedException{String exchangeName ="itcast.fanout";String msg ="hello,every one!";// 此处,设置routeKey 为空 rabbitTemplate.convertAndSend(exchangeName,"",msg);}``````接收数据:``````@RabbitListener(queues ="fanout.queue1")publicvoidlistenFanoutQueue1(String msg)throwsInterruptedException{System.err.println("Consumer consume get message from fanout.queue1: ["+ msg+"]");}@RabbitListener(queues ="fanout.queue2")publicvoidlistenFanoutQueue2(String msg)throwsInterruptedException{System.err.println("Consumer consume get message from fanout.queue2: ["+ msg+"]");}
  7. Direct Exchange 模型发送数据与消费数据代码实现:发送数据:``````@TestpublicvoidtestSendDirectExchange()throwsInterruptedException{String exchangeName ="itcast.direct";String msg ="hello,red!";// 设置routeKey 为red rabbitTemplate.convertAndSend(exchangeName,"red",msg);}``````接收数据:``````@RabbitListener( bindings =@QueueBinding(// 指定队列名称 value =@Queue(name ="direct.queue1"),// 指定交换机名称以及类型 exchange =@Exchange(value ="itcast.direct",type =ExchangeTypes.DIRECT),// 指定队列与交换机关联的 bindingKey key ={"red","blue"}))publicvoidlistenDirectQueue1(String msg){System.err.println("Consumer consume get message from direct.queue1: ["+ msg+"]");}@RabbitListener( bindings =@QueueBinding( value =@Queue("direct.queue2"), exchange =@Exchange(value ="itcast.direct",type =ExchangeTypes.DIRECT), key ={"red","yellow"}))publicvoidlistenDirectQueue2(String msg){System.err.println("Consumer consume get message from direct.queue2: ["+ msg+"]");}
  8. Topic Exchange 模型发送数据与消费数据代码实现:发送数据:``````@TestpublicvoidtestSendTopicExchange()throwsInterruptedException{String exchangeName ="itcast.topic";String msg ="hello";// 设置多个routeKey,以'.' 分隔String routeKey ="china.news.weather"; rabbitTemplate.convertAndSend(exchangeName,routeKey,msg);}``````接收数据:``````@RabbitListener(bindings =@QueueBinding(// 指定队列名称 value =@Queue("topic.queue1"),// 指定交换机名称以及类型 exchange =@Exchange(value ="itcast.topic",type =ExchangeTypes.TOPIC),// 指定队列与交换机关联的 bindingKey。可使用通配符 '#' 或者 '*' key ="china.#"))publicvoidlistenTopicQueue1(String msg){System.err.println("Consumer consume get message from topic.queue1: ["+ msg+"]");}@RabbitListener(bindings =@QueueBinding( value =@Queue("topic.queue2"), exchange =@Exchange(value ="itcast.topic",type =ExchangeTypes.TOPIC), key ="#.news"))publicvoidlistenTopicQueue2(String msg){System.err.println("Consumer consume get message from topic.queue2: ["+ msg+"]");}
  9. 除了使用注解来定义队列、交换机及其两种进行绑定的方式外,还可以自定义组件来实现该功能。@ConfigurationpublicclassFanoutConfig{@Bean// 声明FanoutExchange publicFanoutExchangefanoutExchange(){returnnewFanoutExchange("itcast.fanout");}@Bean// 声明队列publicQueuefanoutQueue1(){returnnewQueue("fanout.queue1");}@BeanpublicQueuefanoutQueue2(){returnnewQueue("fanout.queue2");}@Bean// 声明交换机与队列的绑定关系publicBindingfanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@BeanpublicBindingfanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}@BeanpublicQueueobjectQueue(){returnnewQueue("object.queue");}
  10. SpringAMQP-消息转换器 在SpringAMQP的发送数据的方法中,发送数据的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会在底层帮我们先序列化为字节后再发送。Sring中对消息对象是由org.springframework.amqp.suppoer.converter.MessageConverter来处理的。默认实现为SimpleMessageConverter,这是基于JDK的ObjectOutputStream 完成序列化的。该方法序列化后的字节可读性非常差。在这里插入图片描述如果要修改,只需定义一个MessageConverter类型的Bean即可。这里可以使用JSON方式的序列化。导入坐标``````<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>``````定义MessageConverter类型的Bean``````@BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}此时,可读性将大大提升:在这里插入图片描述

以上就为本篇文章的全部内容啦!

如果本篇内容对您有帮助的话,请多多点赞支持一下呗!

本文转载自: https://blog.csdn.net/qq_53574266/article/details/129537949
版权归原作者 putaojuzi 所有, 如有侵权,请联系我们删除。

“RabbitMQ安装配置与简单使用”的评论:

还没有评论