0


RabbitMQ练习(Publish/Subscribe)

1、RabbitMQ教程

《RabbitMQ Tutorials》https://www.rabbitmq.com/tutorials

2、环境准备

参考:《RabbitMQ练习(Hello World)》和《RabbitMQ练习(Work Queues)》。

确保RabbitMQ、Sender、Receiver、Receiver2容器正常安装和启动:

root@k0test1:~# docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management

root@k0test1:~# docker ps -a
CONTAINER ID   IMAGE                      COMMAND                  CREATED              STATUS                     PORTS                                                                                                                                                 NAMES
c9883c363706   rabbitmq:3.13-management   "docker-entrypoint.s…"   About a minute ago   Up About a minute          4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq
695b9c05829f   ubtest:22.04               "/usr/sbin/sshd -D"      5 days ago           Exited (255) 9 hours ago                                                                                                                                                         receiver2
17d06c4aca4a   ubtest:22.04               "/usr/sbin/sshd -D"      7 days ago           Exited (255) 9 hours ago                                                                                                                                                         receiver
4a86598c2892   ubtest:22.04               "/usr/sbin/sshd -D"      7 days ago           Exited (255) 9 hours ago                                                                                                                                                         sender
6b2f31995df7   hello-world                "/hello"                 11 days ago          Exited (0) 11 days ago                                                                                                                                                           busy_nightingale
root@k0test1:~# docker start sender
sender
root@k0test1:~# docker start receiver
receiver
root@k0test1:~# docker start receiver2
receiver2
root@k0test1:~# 
root@k0test1:~# docker ps
CONTAINER ID   IMAGE                      COMMAND                  CREATED         STATUS          PORTS                                                                                                                                                 NAMES
c9883c363706   rabbitmq:3.13-management   "docker-entrypoint.s…"   2 minutes ago   Up 2 minutes    4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq
695b9c05829f   ubtest:22.04               "/usr/sbin/sshd -D"      5 days ago      Up 25 seconds                                                                                                                                                         receiver2
17d06c4aca4a   ubtest:22.04               "/usr/sbin/sshd -D"      7 days ago      Up 44 seconds                                                                                                                                                         receiver
4a86598c2892   ubtest:22.04               "/usr/sbin/sshd -D"      7 days ago      Up 50 seconds                                                                                                                                                         sender
root@k0test1:~# 

root@k0test1:~# docker network inspect bridge 查看各容器IP地址分配情况。

网络拓扑:

3、Publish/Subscribe练习

3.1 概述

将把同一个消息传递给多个消费者,这种模式被称为"发布/订阅"(publish/subscribe)模式。下面是对这个概念的解读:

  1. 发布/订阅模式:这是一种消息通信模式,其中消息的发布者(发布者)不会将消息直接发送给特定的接收者(订阅者)。相反,消息会被发布到一个主题(topic)或频道(channel),任何对该主题或频道感兴趣的接收者都可以接收到消息。
  2. 发布者(Publisher):发布者是发送消息的实体。它不关心谁将接收消息,只是将消息发布到一个公共的频道或主题。
  3. 订阅者(Subscriber):订阅者是接收消息的实体。订阅者对特定的主题或频道感兴趣,并订阅这些主题或频道以接收发布到这些主题的消息。
  4. 消息(Message):在发布/订阅模式中,消息是被传递的数据。它可以是任何形式的数据,比如文本、JSON对象、二进制数据等。
  5. 主题(Topic)/频道(Channel):这是消息的分类方式。发布者将消息发布到特定的主题或频道,而订阅者则订阅感兴趣的主题或频道。
  6. 解耦:发布/订阅模式的一个主要优点是它允许发布者和订阅者之间的解耦。发布者不需要知道谁是消息的接收者,同样,订阅者也不需要知道谁是消息的发送者。
  7. 可扩展性:由于消息可以被多个订阅者接收,这种模式非常适合构建可扩展的系统,其中消息的消费者数量可能会有很大的变化。
  8. 消息队列系统:在实际应用中,发布/订阅模式通常通过消息队列系统实现,如RabbitMQ、Kafka等,这些系统提供了消息的持久化、传递保证和负载均衡等功能。

通过使用发布/订阅模式,可以构建出灵活、可扩展且易于维护的分布式系统。

注:本次练习使用exchange的fanout机制实现Publisher/Subscriber,未使用主题或频道。

3.2 RabbitMQ的完整消息模型

RabbitMQ的完整消息模型中,包括生产者、交换机、队列和消费者四个基本的组件:

  1. 生产者(Producer):是发送消息的应用程序。生产者创建消息并将其发送到消息队列系统中,但它并不直接将消息发送到特定的队列。实际上,生产者通常甚至不知道消息是否会被传递到任何队列中(见下面的说明)。
  2. 交换机(Exchange):是RabbitMQ中的核心组件,它充当消息的中转站。生产者不直接将消息发送到队列,而是发送到交换机。交换机接收来自生产者的消息,并根据一定的规则(这些规则由交换机类型和绑定定义)将消息路由到一个或多个队列。
  3. 队列(Queue):充当缓冲区,用于存储消息直到它们被消费者处理。队列是消息的终点,消费者从这里接收消息。
  4. 消费者(Consumer):是接收消息的应用程序。消费者连接到队列,监听并处理队列中的消息。

下面是对“生产者通常甚至不知道消息是否会被传递到任何队列中”这句话的说明:

  • 生产者的角色:在RabbitMQ中,生产者负责生成消息并将其发送到消息队列系统中。生产者的主要任务是确保消息被发送出去,但它不负责消息的最终目的地。
  • 解耦合:生产者和队列之间的解耦合意味着生产者不需要关心消息是如何被处理的,也不需要知道消息是否被传递到特定的队列。这种设计允许生产者和消费者独立地工作,减少了它们之间的依赖。
  • 交换机的作用:生产者发送消息到交换机,而不是直接发送到队列。交换机根据配置的规则(如路由键、交换机类型等)来决定消息的路由。如果交换机没有找到合适的队列来接收消息,消息可能不会被传递。
  • 消息的不确定性:由于生产者不直接与队列交互,它无法确定消息是否最终被传递到队列。这种不确定性是RabbitMQ设计的一部分,它提供了灵活性和可扩展性,但也意味着生产者需要信任系统的其他部分来正确处理消息。
  • 系统的复杂性:这种设计增加了系统的复杂性,因为生产者需要依赖交换机和绑定的配置来确保消息被正确地传递。然而,这也提供了更大的灵活性,允许系统根据不同的需求进行调整。
  • 可靠性和容错性:尽管生产者不能保证消息一定会被传递到队列,但RabbitMQ提供了一些机制(如持久化消息、死信交换机等)来提高消息传递的可靠性和容错性。

总的来说,这句话强调了RabbitMQ中生产者和队列之间的解耦合关系,以及生产者在消息传递过程中的不确定性。这种设计提供了灵活性和可扩展性,但也需要生产者信任系统的其他部分来正确处理消息。

RabbitMQ的完整消息模型其他相关概念如下:

  1. 交换机类型:决定了交换机如何处理接收到的消息。RabbitMQ支持多种类型的交换机,包括:- direct:直接交换机根据路由键将消息路由到具有匹配路由键的队列。- fanout:扇出交换机将消息发送到所有绑定的队列,不使用路由键。- topic:主题交换机根据路由键模式匹配将消息路由到队列。- headers:头交换机根据消息的header属性进行路由,而不是路由键。
  2. 绑定(Binding):是交换机和队列之间的一种关系,它定义了消息如何从交换机路由到特定的队列。绑定可以有路由键,也可以没有。
  3. 路由键(Routing Key):是一个字符串,用于direct和topic类型的交换机中,它帮助决定消息应该路由到哪些队列。
  4. 消息的丢弃:如果交换机接收到一个消息,但没有找到合适的队列进行路由(即没有绑定匹配),那么这个消息可能会被丢弃,除非它被设置为持久化(persistent)并且队列也支持持久化。
  5. 消息的持久化:生产者可以标记消息为持久化,这意味着即使在系统崩溃的情况下,消息也不会丢失。同样,队列也可以被设置为持久化,以确保存储在其中的消息在系统重启后仍然存在。

通过这种方式,RabbitMQ提供了一个灵活且强大的消息传递系统,允许应用程序以解耦的方式进行通信,并且可以根据不同的业务需求选择不同的交换机类型和路由规则。

3.3 简单日志系统演示说明

本次练习将通过构建一个简单的日志系统来演示publish/subscribe模式。日志系统由两个程序组成:一个用于发出日志消息,另一个用于接收这些消息。以下是对这段描述的解读:

  1. 日志消息的发出:第一个程序负责生成日志消息。这些消息可能包含程序运行时的状态、错误信息或其他重要的事件。
  2. 日志消息的接收与打印:第二个程序负责接收这些日志消息,并可以选择将它们打印出来。这个程序可以同时运行多个副本(或者在多个服务器上运行),每个副本/服务器都会接收到所有发出的日志消息。
  3. 多接收者模式:日志系统的设计允许多个接收者同时接收日志消息。这意味着,可以一个接收者程序将日志输出到文件,另一个接收者程序将日志显示在屏幕上。
  4. 广播机制:发出的日志消息将被广播给所有运行中的接收者。这种设计确保了所有接收者都能实时接收到日志消息,无论它们是用于输出到文件还是显示在屏幕上。
  5. 应用场景:这种日志系统可以用于多种场景,比如开发过程中的错误跟踪、系统运行时的状态监控等。通过同时运行多个接收者,可以灵活地选择日志的展示方式和存储位置。

简而言之,这个日志系统通过广播机制实现了日志消息的多接收者分发,使得日志既可以被记录到文件,也可以实时显示在屏幕上,增加了日志处理的灵活性和实用性。

3.4 fanout交换机和临时队列

这里介绍RabbitMQ

fanout

交换机,以及如何使用默认交换机、临时队列和绑定的概念。以下是对这些概念的概括说明:

  1. 交换机类型:RabbitMQ 支持多种交换机类型,包括 directtopicheadersfanoutfanout 交换机会将接收到的所有消息广播给所有绑定到该交换机的队列。
  2. 创建 fanout 交换机Channel.exchange_declare(exchange='logs', exchange_type='fanout')这行代码创建了一个名为 logsfanout 类型交换机,用于广播消息。
  3. 列出交换机:使用rabbitmqctl list_exchanges命令可以列出服务器上的所有交换机,包括默认的 amq.* 交换机和未命名的默认交换机。root@6648904dff64:/# rabbitmqctl list_exchanges Listing exchanges for vhost / ...name typeamq.topic topicamq.fanout fanoutamq.direct directamq.headers headersamq.match headersamq.rabbitmq.trace topic direct
  4. 默认交换机:在之前的练习中,即使没有明确指定交换机,也能够向队列发送消息,这是因为使用了默认交换机(用空字符串 "" 表示)。消息会根据 routing_key 路由到具有相同名称的队列。
  5. 发布消息到命名交换机channel.basic_publish(exchange='logs', routing_key='', body=message)使用命名交换机发布消息,routing_key 为空,因为 fanout 交换机不使用路由键。
  6. 临时队列:与之前使用具有特定名称的队列不同,对于日志记录,我们希望接收所有流动的消息,而不是其中的一部分,并且只关心当前的消息,不关心旧消息。为此,我们使用临时队列:result = channel.queue_declare(queue='', exclusive=True)这会创建一个由服务器随机命名的队列,并在连接关闭时自动删除。
  7. 绑定:绑定是交换机和队列之间的关系,它定义了消息如何从交换机路由到队列:channel.queue_bind(exchange='logs', queue=result.method.queue)这行代码将 logs 交换机绑定到我们创建的队列,使得交换机将消息追加到该队列。
  8. 列出绑定:使用 rabbitmqctl list_bindings 命令可以列出现有的绑定关系。

总的来说,本练习中RabbitMQ 中使用

fanout

交换机进行消息广播,创建临时队列并将其绑定到交换机,以便接收所有相关消息。

3.5 Sending

关于生产者(消息发布者)程序,有以下几点说明:

  1. 生产者程序变化:生产者程序在发送日志消息时,与之前的练习相比,主要的不同在于它现在要将消息发布到一个名为 logs 的特定交换机,而不是默认的无名交换机。
  2. 发布到 logs 交换机:在发送消息时,需要指定交换机的名称,这里是 logs
  3. 路由键的使用:尽管在发送消息时需要提供一个 routing_key,但对于 fanout 类型的交换机来说,这个键的值是被忽略的。fanout 交换机会将所有接收到的消息广播给所有绑定到它的队列,不论路由键的值是什么。

进入sender容器,vi编写emit_log.py:

root@sender:/# vi emit_log.py
root@sender:/# cat emit_log.py 
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='172.17.0.2'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()
root@sender:/# 

这段代码是一个 Python 脚本,用于将消息发布到 RabbitMQ 服务器的

logs

交换机。以下是详细说明:

  1. 导入所需模块:- import pika:导入 pika,这是 RabbitMQ 的 Python 客户端库。- import sys:导入 sys 模块,用于访问命令行参数。
  2. 建立连接connection = pika.BlockingConnection( pika.ConnectionParameters(host='172.17.0.2'))- 创建一个到 RabbitMQ 服务器的连接。- host='172.17.0.2' 指定了 RabbitMQ 服务器的 IP 地址。这里假设 RabbitMQ 服务运行在 IP 为 172.17.0.2 的机器上。
  3. 创建通道Channel = connection.channel()- 创建一个新的通道(channel),用于在生产者和 RabbitMQ 之间发送和接收消息。
  4. 声明交换机Channel.exchange_declare(exchange='logs', exchange_type='fanout')- 声明一个名为 logsfanout 类型的交换机。- fanout 交换机会将所有接收到的消息广播给所有绑定到该交换机的队列。
  5. 构造消息内容message = ' '.join(sys.argv[1:]) or "info: Hello World!"- 从命令行参数构造消息内容。如果没有提供参数,则使用默认消息 "info: Hello World!"
  6. 发布消息channel.basic_publish(exchange='logs', routing_key='', body=message)- 使用 basic_publish 方法将消息发布到 logs 交换机。- routing_key 参数为空字符串,对于 fanout 交换机,这个值会被忽略。
  7. 打印消息发送确认print(f" [x] Sent {message}")- 打印一条消息,确认消息已经发送。
  8. 关闭连接connection.close()- 关闭与 RabbitMQ 服务器的连接。

这个脚本是一个生产者程序,用于向 RabbitMQ 发送消息。它使用了

pika

库来与 RabbitMQ 进行交互,并通过命令行参数来确定发送的消息内容。如果没有提供命令行参数,它将发送一个默认的消息。这个消息会被发送到一个

fanout

类型的交换机,然后由该交换机广播给所有绑定到它的队列。这对于日志记录等场景非常有用,因为它允许多个消费者监听相同的消息流。

在这里,有以下几点需要考虑:

  1. 连接后声明交换机:在建立到 RabbitMQ 服务器的连接之后,需要先声明交换机。这是必要的步骤,因为向一个不存在的交换机发布消息是不允许的。
  2. 交换机不存在时消息丢失:如果尝试向尚未存在的交换机发布消息,消息将会丢失。但在某些情况下,比如日志记录,如果没有队列绑定到交换机,或者没有消费者正在监听,我们可以安全地放弃这些消息。
  3. 发布消息的逻辑:在发布消息之前,确保交换机已经存在是很重要的。这可以通过在代码中显式声明交换机来实现,如前述 Python 脚本中的 channel.exchange_declare() 调用。
  4. 消息丢失的容忍性:在某些应用场景中,比如日志信息的发布,如果当前没有消费者(如日志监控系统)正在监听消息,那么丢失这些消息是可以接受的。这种设计允许系统在没有消费者的情况下继续运行,而不会导致错误或异常。
  5. 交换机的作用:交换机在 RabbitMQ 中充当消息路由的角色,它根据类型和绑定的队列来决定消息的分发。fanout 类型的交换机会将消息发送给所有绑定的队列,而不考虑路由键。

总之,在使用 RabbitMQ 发布消息时,需要先确保交换机存在,以避免消息丢失。同时,在某些情况下,如日志记录,消息的丢失是可以接受的,特别是当没有消费者监听时。

3.6 Receiving

进入receiver/receiver2容器,vi编写receive_logs.py:

root@receiver:/# vi receive_logs.py
root@receiver:/# cat receive_logs.py
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='172.17.0.2'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(f" [x] {body}")

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
root@receiver:/# 

这段代码是使用 Python 的

pika

库来实现与 RabbitMQ 消息代理的交互。RabbitMQ 是一个开源的消息代理软件,它使用高级消息队列协议(AMQP)来提供可靠的消息队列服务。下面是代码的逐行解释:

  1. import pika: 导入 pika 模块,这是 Python 用于与 RabbitMQ 交互的库。
  2. 创建一个到 RabbitMQ 服务器的连接:connection = pika.BlockingConnection( pika.ConnectionParameters(host='172.17.0.2'))这里使用 pika.BlockingConnection 创建了一个连接。pika.ConnectionParameters 用于定义连接参数,这里指定了 RabbitMQ 服务器的主机地址。
  3. 创建一个通道(channel):channel = connection.channel()通道是 AMQP 协议中的一个概念,提供了消息传递的逻辑通道。
  4. 声明一个交换机(exchange):channel.exchange_declare(exchange='logs', exchange_type='fanout')这里声明了一个名为 logs 的交换机,类型为 fanoutfanout 类型的交换机会将消息广播到所有绑定到它的队列。
  5. 声明一个队列并获取队列名称:result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue使用 queue_declare 方法声明一个队列,queue 参数为空字符串表示让 RabbitMQ 自动生成一个唯一的队列名称。exclusive=True 参数表示这个队列是独占的,仅对声明它的连接可见。
  6. 将队列绑定到交换机:channel.queue_bind(exchange='logs', queue=queue_name)这将之前声明的队列绑定到 logs 交换机,这样交换机就可以将消息路由到这个队列。
  7. 打印提示信息,表示程序正在等待接收日志消息:print(' [*] Waiting for logs. To exit press CTRL+C')
  8. 定义回调函数 callback,用于处理接收到的消息:def callback(ch, method, properties, body): print(f" [x] {body}")当消息到达队列时,这个函数会被调用。参数 ch 是通道对象,method 是接收消息的方法,properties 是消息的属性,body 是消息体。
  9. 使用 basic_consume 方法设置消费者:channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True)这里指定了队列名称、消息到达时调用的回调函数,以及 auto_ack=True 表示自动确认消息。
  10. 启动消息消费:channel.start_consuming()调用 start_consuming 开始接收消息,直到程序被手动中断(例如通过按 CTRL+C)。

这段代码的目的是创建一个消费者,它连接到 RabbitMQ 服务器,声明一个交换机和队列,并将队列绑定到交换机上。然后,它等待接收通过交换机发送的消息,并在接收到消息时打印出来。

3.7 开始测试

1、保存日志到文件:

  • 进入receiver容器。
  • 输入命令 python3 receive_logs.py > logs_from_rabbit.log 来运行 receive_logs.py 脚本。
  • 这会将脚本的输出重定向到 logs_from_rabbit.log 文件中,从而保存日志。
root@receiver:/# python3 receive_logs.py > logs_from_rabbit.log

2、在屏幕上查看日志:

  • 进入receiver2容器。
  • 运行 python3 receive_logs.py 命令。
  • 这将启动 receive_logs.py 脚本,并且日志消息将直接显示在终端屏幕上。
root@receiver2:/# python3 receive_logs.py
 [*] Waiting for logs. To exit press CTRL+C

3、验证绑定和队列:

  • 使用 rabbitmqctl list_bindings 命令来检查 RabbitMQ 的绑定情况。
  • 这个命令会列出所有的绑定关系,包括交换机、队列和它们之间的绑定。
root@05a4c44b32bb:/# rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-KSY2L6SebZr96Vi1DoOYkQ  queue   amq.gen-KSY2L6SebZr96Vi1DoOYkQ  []
        exchange        amq.gen-anB8NnMUFbC1lsN-cZE4mQ  queue   amq.gen-anB8NnMUFbC1lsN-cZE4mQ  []
logs    exchange        amq.gen-KSY2L6SebZr96Vi1DoOYkQ  queue   amq.gen-KSY2L6SebZr96Vi1DoOYkQ  []
logs    exchange        amq.gen-anB8NnMUFbC1lsN-cZE4mQ  queue   amq.gen-anB8NnMUFbC1lsN-cZE4mQ  []
root@05a4c44b32bb:/# 
logs

交换机已经绑定了两个队列,队列名称是由 RabbitMQ 服务器自动生成的,如下图所示:

4、发送日志:

  • 进入sender容器。
  • 要发送日志到 RabbitMQ,需要运行 python3 emit_log.py 命令。
  • 这将执行 emit_log.py 脚本,它负责将日志消息发送到 RabbitMQ。
root@sender:/# python3 emit_log.py 
 [x] Sent info: Hello World!
root@sender:/# python3 emit_log.py Second message!
 [x] Sent Second message!
root@sender:/# 

5、输出结果:

  • receiver容器:
root@receiver:/# cat logs_from_rabbit.log 
 [*] Waiting for logs. To exit press CTRL+C
 [x] b'info: Hello World!'
 [x] b'Second message!'
root@receiver:/# 
  • receiver2容器:
root@receiver2:/# python3 receive_logs.py
 [*] Waiting for logs. To exit press CTRL+C
 [x] b'info: Hello World!'
 [x] b'Second message!'

输出结果说明从

logs

交换机发出的数据被正确地发送到了两个队列,这些队列的名称是服务器分配的。这符合我们的预期,即交换机应该将消息广播到所有绑定的队列。

4、Wireshark抓包

4.1 抓包方式

参考:《RabbitMQ练习(Hello World)》

4.2 抓包信息和典型数据包

4.2.1 消费者等待消息

receiver容器创建到 RabbitMQ 服务器的连接信息:

典型数据包:

1、声明(创建)一个交换机(exchange)——Frame 18

Frame 18: 97 bytes on wire (776 bits), 97 bytes captured (776 bits) on interface docker0, id 0
Ethernet II, Src: 02:42:ac:11:00:04 (02:42:ac:11:00:04), Dst: 02:42:ac:11:00:02 (02:42:ac:11:00:02)
Internet Protocol Version 4, Src: 172.17.0.4 (172.17.0.4), Dst: 172.17.0.2 (172.17.0.2)
Transmission Control Protocol, Src Port: 40738, Dst Port: 5672, Seq: 357, Ack: 571, Len: 31
Advanced Message Queuing Protocol
    Type: Method (1)
    Channel: 1
    Length: 23
    Class: Exchange (40)
    Method: Declare (10)
    Arguments
        Ticket: 0
        Exchange: logs
        Type: fanout
        .... ...0 = Passive: False
        .... ..0. = Durable: False
        .... .0.. = Auto-Delete: False
        .... 0... = Internal: False
        ...0 .... = Nowait: False
        Arguments

描述的是使用高级消息队列协议(AMQP)中的一个方法调用,具体是

Exchange.Declare

方法。这个方法用于声明(创建)一个新的交换器。下面是对这段文本中各个字段的详细解释:

  1. Type: 表示这是一个方法(Method)类型的消息。在AMQP协议中,消息类型包括方法(Method)、属性(Header)、体(Body)等。
  2. Channel: 指定了这个命令是针对哪个信道(Channel)的。在这个例子中,信道编号是 1。
  3. Length: 表示整个消息的长度,单位是字节。这里的长度是 23 字节。
  4. Class: 表示这个方法属于哪个类。在这个例子中,类是 Exchange,AMQP 中的类包括 QueueExchangeBasic 等。
  5. Method: 表示调用的方法名称。这里是 Declare,用于声明一个新的交换器。
  6. Arguments: 包含方法调用时需要的参数。- Ticket: 一个内部使用的标识符,用于在客户端和服务器之间传递请求。值为 0 通常表示这是客户端生成的第一个请求。- Exchange: 交换器的名称,这里是 logs。- Type: 交换器的类型,这里是 fanout,表示这是一个扇出交换器,它会将消息发送到所有绑定的队列。- Passive: 如果设置为 True,则表示检查是否存在交换器,如果不存在则抛出异常。这里设置为 False,表示不检查,而是创建交换器。- Durable: 如果设置为 True,则表示交换器是持久化的,即使在服务器重启后也会存在。这里设置为 False,表示交换器是非持久化的。- Auto-Delete: 如果设置为 True,则表示当没有队列绑定到交换器时,交换器将自动删除。这里设置为 False。- Internal: 如果设置为 True,则表示交换器仅用于内部目的,不会被客户端使用。这里设置为 False,表示交换器可以被客户端使用。- Nowait: 如果设置为 True,则表示方法调用不需要服务器响应。这里设置为 False,表示需要服务器响应。
  7. Arguments: 参数列表的结束标识。

这个

Exchange.Declare

方法调用的目的是创建一个新的名为

logs

的扇出交换器,这个交换器不是持久化的,也不是自动删除的,也不是内部使用的,并且需要服务器的响应。

2、声明(创建)一个队列——Frame 20

Frame 20: 86 bytes on wire (688 bits), 86 bytes captured (688 bits) on interface docker0, id 0
Ethernet II, Src: 02:42:ac:11:00:04 (02:42:ac:11:00:04), Dst: 02:42:ac:11:00:02 (02:42:ac:11:00:02)
Internet Protocol Version 4, Src: 172.17.0.4 (172.17.0.4), Dst: 172.17.0.2 (172.17.0.2)
Transmission Control Protocol, Src Port: 40738, Dst Port: 5672, Seq: 388, Ack: 583, Len: 20
Advanced Message Queuing Protocol
    Type: Method (1)
    Channel: 1
    Length: 12
    Class: Queue (50)
    Method: Declare (10)
    Arguments
        Ticket: 0
        Queue: 
        .... ...0 = Passive: False
        .... ..0. = Durable: False
        .... .1.. = Exclusive: True
        .... 0... = Auto-Delete: False
        ...0 .... = Nowait: False
        Arguments

描述的是使用高级消息队列协议(AMQP)中的一个方法调用,具体是

Queue.Declare

方法。AMQP 是一个提供高度可靠的异步消息传递的协议,广泛应用于分布式系统和微服务架构中。下面是对这段文本中各个字段的解释:

  1. Type: 表示这是一个方法(Method)类型的消息,AMQP 消息类型包括方法(Method)、属性(Header)、体(Body)等。
  2. Channel: 指定了这个命令是针对哪个信道(Channel)的。在这个例子中,信道编号是 1。
  3. Length: 表示整个消息的长度,单位是字节。这里的长度是 12 字节。
  4. Class: 表示这个方法属于哪个类。在这个例子中,类是 Queue,AMQP 中的类包括 QueueExchangeBasic 等。
  5. Method: 表示调用的方法名称。这里是 Declare,用于声明一个新的队列。
  6. Arguments: 包含方法调用时需要的参数。- Ticket: 一个内部使用的标识符,用于在客户端和服务器之间传递请求。- Queue: 队列的名称,这里没有提供,可能表示客户端需要提供队列名称。- Passive: 如果设置为 True,则表示检查是否存在队列,如果不存在则抛出异常。这里设置为 False,表示不检查,而是创建队列。- Durable: 如果设置为 True,则表示队列是持久化的,即使在服务器重启后也会存在。这里设置为 False,表示队列是非持久化的。- Exclusive: 如果设置为 True,则表示队列是独占的,只能由连接到它的客户端使用。这里设置为 True。- Auto-Delete: 如果设置为 True,则表示当没有消费者连接到队列时,队列将自动删除。这里设置为 False。- Nowait: 如果设置为 True,则表示方法调用不需要服务器响应。这里设置为 False,表示需要服务器响应。
  7. Arguments: 表示参数的结束。

这个

Queue.Declare

方法调用的目的是创建一个新的队列,但队列名称没有在文本中给出。根据参数设置,这个队列是独占的,不是持久化的,也不是自动删除的,并且需要服务器的响应。

3、声明(创建)队列的响应——Frame 21

Frame 21: 117 bytes on wire (936 bits), 117 bytes captured (936 bits) on interface docker0, id 0
Ethernet II, Src: 02:42:ac:11:00:02 (02:42:ac:11:00:02), Dst: 02:42:ac:11:00:04 (02:42:ac:11:00:04)
Internet Protocol Version 4, Src: 172.17.0.2 (172.17.0.2), Dst: 172.17.0.4 (172.17.0.4)
Transmission Control Protocol, Src Port: 5672, Dst Port: 40738, Seq: 583, Ack: 408, Len: 51
Advanced Message Queuing Protocol
    Type: Method (1)
    Channel: 1
    Length: 43
    Class: Queue (50)
    Method: Declare-Ok (11)
    Arguments
        Queue: amq.gen-n1_VgX3pWoYEt6CV9STq8A
        Message-Count: 0
        Consumer-Count: 0

描述的是使用高级消息队列协议(AMQP)中的一个方法响应,具体是

Queue.Declare-Ok

方法。这是一个响应消息,表明之前发送的

Queue.Declare

方法调用已经成功执行。下面是对这段文本中各个字段的解释:

  1. Type: 表示这是一个方法响应(Method)类型的消息。在AMQP协议中,消息类型包括方法(Method)、属性(Header)、体(Body)等。
  2. Channel: 指定了这个响应是针对哪个信道(Channel)的。在这个例子中,信道编号是 1。
  3. Length: 表示整个消息的长度,单位是字节。这里的长度是 43 字节。
  4. Class: 表示这个方法响应属于哪个类。在这个例子中,类是 Queue,表示与队列相关。
  5. Method: 表示调用的方法名称。这里是 Declare-Ok,表示 Queue.Declare 方法调用成功。
  6. Arguments: 包含方法响应时提供的参数。- Queue: 声明的队列名称。在这个例子中,队列名称是 amq.gen-n1_VgX3pWoYEt6CV9STq8A。这个名称通常是自动生成的,特别是当 Queue.Declare 方法调用中没有指定队列名称时。- Message-Count: 队列中的消息数量。这里是 0,表示队列中没有消息。- Consumer-Count: 连接到队列的消费者数量。这里也是 0,表示当前没有消费者连接到这个队列。
Queue.Declare-Ok

方法响应表明客户端成功地声明了一个队列,并且队列是空的,没有消息,也没有消费者。这是客户端和服务器之间交互的一部分,确保队列已经按照客户端的要求正确设置。

4、 队列绑定 —— Frame 22

Frame 22: 152 bytes on wire (1216 bits), 152 bytes captured (1216 bits) on interface docker0, id 0
Ethernet II, Src: 02:42:ac:11:00:04 (02:42:ac:11:00:04), Dst: 02:42:ac:11:00:02 (02:42:ac:11:00:02)
Internet Protocol Version 4, Src: 172.17.0.4 (172.17.0.4), Dst: 172.17.0.2 (172.17.0.2)
Transmission Control Protocol, Src Port: 40738, Dst Port: 5672, Seq: 408, Ack: 634, Len: 86
Advanced Message Queuing Protocol
    Type: Method (1)
    Channel: 1
    Length: 78
    Class: Queue (50)
    Method: Bind (20)
    Arguments
        Ticket: 0
        Queue: amq.gen-n1_VgX3pWoYEt6CV9STq8A
        Exchange: logs
        Routing-Key: amq.gen-n1_VgX3pWoYEt6CV9STq8A
        .... ...0 = Nowait: False
        Arguments

描述的是使用高级消息队列协议(AMQP)中的一个方法调用,具体是

Queue.Bind

方法。这个方法用于将队列绑定到一个交换器上,这样队列就可以接收通过交换器路由的消息。下面是对这段文本中各个字段的解释:

  1. Type: 表示这是一个方法(Method)类型的消息。在AMQP协议中,消息类型包括方法(Method)、属性(Header)、体(Body)等。
  2. Channel: 指定了这个命令是针对哪个信道(Channel)的。在这个例子中,信道编号是 1。
  3. Length: 表示整个消息的长度,单位是字节。这里的长度是 78 字节。
  4. Class: 表示这个方法属于哪个类。在这个例子中,类是 Queue,表示与队列相关。
  5. Method: 表示调用的方法名称。这里是 Bind,用于将队列绑定到交换器。
  6. Arguments: 包含方法调用时需要的参数。- Ticket: 一个内部使用的标识符,用于在客户端和服务器之间传递请求。值为 0 通常表示这是客户端生成的第一个请求。- Queue: 要绑定的队列名称。在这个例子中,队列名称是 amq.gen-n1_VgX3pWoYEt6CV9STq8A。- Exchange: 队列要绑定到的交换器名称。这里是 logs。- Routing-Key: 路由键,用于确定消息如何从交换器路由到队列。在这个例子中,路由键与队列名称相同,这可能是一个巧合,也可能是有意为之,以确保所有发送到 logs 交换器的消息都使用相同的路由键。- Nowait: 如果设置为 True,则表示方法调用不需要服务器响应。这里设置为 False,表示需要服务器响应。
Queue.Bind

方法调用的目的是将指定的队列

amq.gen-n1_VgX3pWoYEt6CV9STq8A

绑定到名为

logs

的交换器上,使用相同的路由键。这通常意味着所有发送到

logs

交换器的消息,如果指定了与队列名称相同的路由键,都将被路由到这个队列。如果需要服务器响应,客户端将等待服务器发送

Queue.Bind-Ok

方法,以确认绑定操作成功。

4.2.2 生产者发送消息

sender容器(生产者)创建到 RabbitMQ 服务器的连接,发送消息:

典型数据包:

1、发布(发送)消息到交换机——Frame 96

Frame 98: 87 bytes on wire (696 bits), 87 bytes captured (696 bits) on interface docker0, id 0
Ethernet II, Src: 02:42:ac:11:00:03 (02:42:ac:11:00:03), Dst: 02:42:ac:11:00:02 (02:42:ac:11:00:02)
Internet Protocol Version 4, Src: 172.17.0.3 (172.17.0.3), Dst: 172.17.0.2 (172.17.0.2)
Transmission Control Protocol, Src Port: 37252, Dst Port: 5672, Seq: 388, Ack: 583, Len: 21
Advanced Message Queuing Protocol
    Type: Method (1)
    Channel: 1
    Length: 13
    Class: Basic (60)
    Method: Publish (40)
    Arguments
        Ticket: 0
        Exchange: logs
        Routing-Key: 
        .... ...0 = Mandatory: False
        .... ..0. = Immediate: False

描述的是使用高级消息队列协议(AMQP)中的一个方法调用,具体是

Basic.Publish

方法。这个方法用于发布(发送)消息到交换器。下面是对这段文本中各个字段的详细解释:

  1. Type: 表示这是一个方法(Method)类型的消息。在AMQP协议中,消息类型包括方法(Method)、属性(Header)、体(Body)等。
  2. Channel: 指定了这个命令是针对哪个信道(Channel)的。在这个例子中,信道编号是 1。
  3. Length: 表示整个消息的长度,单位是字节。这里的长度是 13 字节。
  4. Class: 表示这个方法属于哪个类。在这个例子中,类是 Basic,这是AMQP协议中用于基本消息操作的类。
  5. Method: 表示调用的方法名称。这里是 Publish,用于发布消息。
  6. Arguments: 包含方法调用时需要的参数。- Ticket: 一个内部使用的标识符,通常用于客户端和服务器之间的请求和响应。值为 0 通常表示这是客户端生成的第一个请求。- Exchange: 消息要发送到的交换器名称。在这个例子中,交换器名称是 logs。- Routing-Key: 路由键,用于确定消息如何从交换器路由到队列。在这个例子中,路由键是空的,这可能意味着使用默认路由键或者交换器类型(如 fanout)不需要路由键。- Mandatory: 如果设置为 True,则表示如果消息不能被路由到任何队列,则服务器将返回一个错误。这里设置为 False,表示即使消息不能被路由,也不会有错误返回。- Immediate: 如果设置为 True,则表示消息应该被立即处理,如果队列上没有消费者,则服务器将返回一个错误。这里设置为 False,表示即使队列上没有消费者,消息也会被发送到队列。
Basic.Publish

方法调用的目的是将消息发送到名为

logs

的交换器。由于路由键为空,这取决于交换器的类型和配置,消息可能会被发送到所有绑定的队列,或者根据交换器的类型(如

fanout

)被广播到所有队列。

Mandatory

Immediate

参数的设置表明,即使消息不能被路由或队列上没有消费者,也不会有错误返回。

2、消息体 —— Frame 100

Frame 100: 92 bytes on wire (736 bits), 92 bytes captured (736 bits) on interface docker0, id 0
Ethernet II, Src: 02:42:ac:11:00:03 (02:42:ac:11:00:03), Dst: 02:42:ac:11:00:02 (02:42:ac:11:00:02)
Internet Protocol Version 4, Src: 172.17.0.3 (172.17.0.3), Dst: 172.17.0.2 (172.17.0.2)
Transmission Control Protocol, Src Port: 37252, Dst Port: 5672, Seq: 431, Ack: 583, Len: 26
Advanced Message Queuing Protocol
    Type: Content body (3)
    Channel: 1
    Length: 18
    Payload: 696e666f3a2048656c6c6f20576f726c6421

描述的是使用高级消息队列协议(AMQP)中的一个消息体(Content body)部分。在AMQP协议中,消息体通常包含实际的消息数据,而消息头(Header)则包含有关消息的元数据。下面是对这段文本中各个字段的解释:

  1. Type: 表示这是一个内容体(Content body)类型的消息。在AMQP协议中,消息体是消息数据的实际部分。
  2. Channel: 指定了这个内容体是针对哪个信道(Channel)的。在这个例子中,信道编号是 1。
  3. Length: 表示整个消息体的长度,单位是字节。这里的长度是 18 字节。
  4. Payload: 这是实际的消息数据。在这个例子中,数据是 696e666f3a2048656c6c6f20576f726c6421,这是一串十六进制编码的字符。将其转换为可读的文本格式,我们得到 "info: Hello World!"

消息体的

Length

字段告诉我们消息体的大小,而

Payload

字段包含了消息的实际内容。在这个例子中,消息体包含了一个简单的字符串 "info: Hello World!",这可能是一个日志消息或者测试消息,用于演示消息队列的基本功能。

4.2.3 RabbitMQ 向消费者发送消息

典型数据包:

1、 消息传递给消费者

Frame 102: 180 bytes on wire (1440 bits), 180 bytes captured (1440 bits) on interface docker0, id 0
Ethernet II, Src: 02:42:ac:11:00:02 (02:42:ac:11:00:02), Dst: 02:42:ac:11:00:04 (02:42:ac:11:00:04)
Internet Protocol Version 4, Src: 172.17.0.2 (172.17.0.2), Dst: 172.17.0.4 (172.17.0.4)
Transmission Control Protocol, Src Port: 5672, Dst Port: 40738, Seq: 721, Ack: 607, Len: 114
Advanced Message Queuing Protocol
    Type: Method (1)
    Channel: 1
    Length: 58
    Class: Basic (60)
    Method: Deliver (60)
    Arguments
        Consumer-Tag: ctag1.0bcc83bc3a97497595947e4916f4663f
        Delivery-Tag: 1
        .... ...0 = Redelivered: False
        Exchange: logs
        Routing-Key: 
Advanced Message Queuing Protocol
    Type: Content header (2)
    Channel: 1
    Length: 14
    Class ID: Basic (60)
    Weight: 0
    Body size: 18
    Property flags: 0x0000
    Properties
Advanced Message Queuing Protocol
    Type: Content body (3)
    Channel: 1
    Length: 18
    Payload: 696e666f3a2048656c6c6f20576f726c6421

在AMQP部分,我们可以看到三个不同类型的帧:

  • Type: Method (1): 这是一个方法帧,用于执行AMQP方法调用。这里的方法帧包含了Basic.Deliver方法,它用于将消息传递给消费者。- Channel: 1,指定了AMQP通道。- Length: 58,指定了帧的字节长度。- Class: Basic (60),指定了AMQP类。- Method: Deliver (60),指定了方法名称。- Arguments: - Consumer-Tag: 用于标识消费者的标签。- Delivery-Tag: 消息的唯一标识符。- Redelivered: 表示消息是否是重新投递的。- Exchange: 消息最初发送到的交换器。- Routing-Key: 消息的路由键。
  • Type: Content header (2): 这是一个内容头帧,它包含了消息体的元数据。- Channel: 1,指定了AMQP通道。- Length: 14,指定了帧的字节长度。- Class ID: Basic (60),指定了AMQP类。- Weight: 0,消息的优先级。- Body size: 18,消息体的大小。- Property flags: 0x0000,消息属性的标志。
  • Type: Content body (3): 这是一个内容体帧,它包含了消息的实际数据。- Channel: 1,指定了AMQP通道。- Length: 18,指定了帧的字节长度。- Payload: 696e666f3a2048656c6c6f20576f726c6421,这是十六进制编码的字符串,解码后为 "info: Hello World!"。

这个数据包表示一个消息("info: Hello World!")通过AMQP协议传递给一个消费者(使用消费者标签 ctag1.0bcc83bc3a97497595947e4916f4663f)。消息最初发送到名为

logs

的交换器,没有特定的路由键,并且没有被重新投递。消息体的大小是18字节。

4.3 只显示消息发送过程的抓包信息

4.4 只显示消息发送过程的抓包流量图


本文转载自: https://blog.csdn.net/zkyqss/article/details/141403497
版权归原作者 zkyqss 所有, 如有侵权,请联系我们删除。

“RabbitMQ练习(Publish/Subscribe)”的评论:

还没有评论