0


初步了解 RabbitMQ

一、MQ 概述

1、MQ 的简介

MQ,Message Queue,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程API的软件系统。消息即数据。一般消息的体量不会很大。

2、MQ 的用途

MQ 的用途有很多,但总结起来其实就是以下三点:

(1)限流削峰

MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

比如:用户请求 5000/s,但是只能处理 2000/s 的数据,那么这多出来的 3000 我们就称之为 “超量”

这多出来的超量,可能会导致系统直接被压垮,为了防止系统垮掉,一般会直接将多余的超量丢弃

为了防止这种情况的发生,可以引入 MQ

系统可以直接从 MQ 中读取 2000,剩下的多余的 3000,存储在 MQ 内部,只要 MQ 足够大,那么这些数据就是不会丢失的,系统会慢慢对这些数据进行处理


(2)异步解耦

上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高。
而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。


(3)数据收集

分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。

针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。


二、RabbitMQ 概述

1、RabbitMQ 简介

RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。

RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据

2、四大核心概念

生产者:

产生数据发送消息的程序是生产者

交换机:

交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息
推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定

队列:

队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存
储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

消费者:

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费
者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

3、RabbitMQ 的核心部分

4、名词解释:

Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发
消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout
(multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据


三、Hello World

1、依赖

  1. <!--指定 jdk 编译版本-->
  2. <build>
  3. <plugins>
  4. <plugin>
  5. <groupId>org.apache.maven.plugins</groupId>
  6. <artifactId>maven-compiler-plugin</artifactId>
  7. <configuration>
  8. <source>8</source>
  9. <target>8</target>
  10. </configuration>
  11. </plugin>
  12. </plugins>
  13. </build>
  14. <dependencies>
  15. <!--rabbitmq 依赖客户端-->
  16. <dependency>
  17. <groupId>com.rabbitmq</groupId>
  18. <artifactId>amqp-client</artifactId>
  19. <version>5.8.0</version>
  20. </dependency>
  21. <!--操作文件流的一个依赖-->
  22. <dependency>
  23. <groupId>commons-io</groupId>
  24. <artifactId>commons-io</artifactId>
  25. <version>2.6</version>
  26. </dependency>
  27. </dependencies>

2、消息生产者

  1. /**
  2. * 生产者 : 发消息
  3. */
  4. public class Producer {
  5. // 队列名称
  6. public static final String QUEUE_NAME = "hello";
  7. // 发消息
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. // 创建一个连接工厂
  10. ConnectionFactory factory = new ConnectionFactory();
  11. // 工厂 IP 连接 rabbitmq 队列
  12. factory.setHost("111.229.153.16");
  13. // 用户名
  14. factory.setUsername("admin");
  15. // 密码
  16. factory.setPassword("123");
  17. // 创建连接
  18. Connection connection = factory.newConnection();
  19. // 获取信道
  20. Channel channel = connection.createChannel();
  21. /*
  22. 生成一个队列
  23. 1、队列名称
  24. 2、队列里的消息是否持久化(磁盘)(默认存储在内存中)
  25. 3、是否进行消费的共享
  26. 4、是否自动删除 (最后一个消费者断开连接之后,是否自动删除)
  27. 5、其它参数
  28. */
  29. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  30. // 发消息
  31. String message = "hello world";
  32. /*
  33. 1、发送到哪个交换机
  34. 2、路由的 key 值,本次是队列名称
  35. 3、其它参数信息
  36. 4、发送消息的消息体
  37. */
  38. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
  39. System.out.println("消息发送完毕");
  40. }
  41. }

** 3、消息消费者**

  1. /**
  2. * 消费者,接受消息
  3. */
  4. public class Consumer {
  5. // 队列的名词
  6. public static final String QUEUE_NAME = "hello";
  7. // 接收信息
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. // 创建连接工厂
  10. ConnectionFactory factory = new ConnectionFactory();
  11. factory.setHost("111.229.153.16");
  12. factory.setUsername("admin");
  13. factory.setPassword("123");
  14. // 创建连接
  15. Connection connection = factory.newConnection();
  16. // 信道
  17. Channel channel = connection.createChannel();
  18. // 声明 接收消息
  19. DeliverCallback deliverCallback = (consumerTag,message) ->{
  20. System.out.println(new String(message.getBody()) );
  21. };
  22. // 取消消息的回调
  23. CancelCallback cancelCallback = consumerTag ->{
  24. System.out.println("消息消费被中断");
  25. };
  26. /*
  27. 消费者接收消息
  28. 1、队列名
  29. 2、消费成功之后,是否要自动答应
  30. 3、消费者成功消费的回调
  31. 4、消费者取消消费的回调
  32. */
  33. channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
  34. }
  35. }

本文转载自: https://blog.csdn.net/weixin_73616913/article/details/134126175
版权归原作者 馒头警告 所有, 如有侵权,请联系我们删除。

“初步了解 RabbitMQ”的评论:

还没有评论