0


rabbitMQ~工作模式代码实现【基于python pika模块】

文章目录

1 简单模式

在这里插入图片描述

import pika

# 创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=xxx))

# 创建channel
channel = connection.channel()

# 指定队列,如果不存在队列则创建一个新队列
queue_key = 'test_queue'
channel.queue_declare(queue=queue_key)

# 发送消息,消息永远不可能直接发送到队列,一定会先经过交换机
channel.basic_publish(exchange='', routing_key=queue_key, body=b'test')        #简单模式下不是没有交换机,而是使用rabbitMQ默认的交换机

# 接受消息
# 1.回调函数
def callback(ch, method, properties, body):
    print(body)  # b'test'

# 2.消费者接受消息
channel.basic_consume(queue=queue_key,
                      auto_ack=True,
                      on_message_callback=callback)

# 3.消费
channel.start_consuming()
2 工作模式

在这里插入图片描述

生产者:
import sys
import pika

# 创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=xxx))

# 创建channel
channel = connection.channel()

# 指定队列,如果不存在队列则创建一个新队列
queue_key = 'test_queue'
channel.queue_declare(queue=queue_key)

# 发送消息
body = 'send {}'.format(sys.argv[1:])
channel.basic_publish(exchange='', routing_key=queue_key, body=body)

#工作模式下,默认情况下,RabbitMQ 会将每条消息发送给下一个消费者,通过平均分配的模式,每个消费者将获得相同数量的消息,这种分发消息的方式称为轮询;

消费者1:
# 1.回调函数
def callback(ch, method, properties, body):
    print(body)

# 2.消费者接受消息
channel.basic_consume(queue=queue_key,
                      auto_ack=True,
                      on_message_callback=callback)

# 3.消费
channel.start_consuming()

消费者2:
# 1.回调函数
def callback(ch, method, properties, body):
    print(body)

# 2.消费者接受消息
channel.basic_consume(queue=queue_key,
                      auto_ack=True,
                      on_message_callback=callback)

# 3.消费
channel.start_consuming()

print信息:
同时启动消费者1与消费者2,然后循环运行生产者发送消息:
>python tt1.py msg1
>python tt1.py msg2
>python tt1.py msg3
>python tt1.py msg4

消费者1:
b"send ['msg1']"
b"send ['msg3']"

消费者2:
b"send ['msg2']"
b"send ['msg4']"
3 发布订阅模式

在这里插入图片描述

生产者:
import pika
from pika.exchange_type import ExchangeType

# 创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=xxx))

# 创建channel
channel = connection.channel()

# 定义交换机
exchange_key = 'test_exchange'
channel.exchange_declare(exchange=exchange_key,
                         exchange_type=ExchangeType.fanout)

# 发送消息,对于扇形交换机 则routing_key可以指定为:''
channel.basic_publish(exchange=exchange_key, routing_key='', body=b'test_msg.')

消费者:
# 队列随机命名,exclusive=True 命名唯一性 一旦消费者链接关闭,队列删除
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 绑定指定的队列
channel.queue_bind(exchange=exchange_key, queue=queue_name)

# 1.回调函数
def callback(ch, method, properties, body):
    print(body)

# 2.消费者接受消息
channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

# 3.消费
channel.start_consuming()

启动两个消费者与生产者:
1.
amq.gen-yKd-NoMgCUHbWf8qjftSpQ
b'test_msg.'

2.
amq.gen-mPMNWmZL4w3BKogP2Uq4EQ
b'test_msg.'

发布订阅模式使用的是fanout交换机,通过交换机将消息发送给所有的消费者
4 路由模式

在这里插入图片描述

生产者:
import pika
from pika.exchange_type import ExchangeType

# 创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='7.220.225.46', port=5672))

# 创建channel
channel = connection.channel()

# 定义交换机(通过直接交换机)
exchange_key = 'test_exchange'
channel.exchange_declare(exchange=exchange_key,
                         exchange_type=ExchangeType.direct)

# 发送消息,通过直接交换机,绑定消息的路由键通过直接交换机发送到绑定相同路由键的队列
channel.basic_publish(exchange=exchange_key, routing_key='q1', body=b'q1: test_msg.')

在这里插入图片描述

5 主题模式

在这里插入图片描述

生产者:

import pika
from pika.exchange_type import ExchangeType

"""
topic模式routing_key必须是一个单词组合,以"."进行连接
① "*"号代表一个词;
② "#"号代表0个或多个单词;
"""

# 创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='xxx', port=xxx))

# 创建channel
channel = connection.channel()

# 定义交换机(topic交换机)
exchange_key = 'test_exchange'
channel.exchange_declare(exchange=exchange_key,
                         exchange_type=ExchangeType.topic)

# 发送消息,发送带有routing_key(topic规则的词组)消息
channel.basic_publish(exchange=exchange_key, routing_key='key.q1.value', body=b'q1:test_msg.')        #绑定routing_key:key.q1.value发送消息
channel.basic_publish(exchange=exchange_key, routing_key='key.q2.value', body=b'q2:test_msg.')        #绑定routing_key:key.q2.value发送消息
消费者1:
# 队列随机命名,exclusive=True 命名唯一性 一旦消费者链接关闭,队列删除
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 绑定指定的队列,指定绑定的交换机,指定绑定的routing_key 
channel.queue_bind(exchange=exchange_key, queue=queue_name, routing_key='*.q1.*')

交换机会按照队列的规则进行消息的转发,将匹配的消息入队 消费者进行消费

# 1.回调函数
def callback(ch, method, properties, body):
    print(body)

# 2.消费者接受消息
channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

# 3.消费
channel.start_consuming()
消费者2:
# 队列随机命名,exclusive=True 命名唯一性 一旦消费者链接关闭,队列删除
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 绑定指定的队列,指定绑定的交换机,指定绑定的routing_key
channel.queue_bind(exchange=exchange_key, queue=queue_name, routing_key='*.q2.*')

# 1.回调函数
def callback(ch, method, properties, body):
    print(body)

# 2.消费者接受消息
channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

# 3.消费
channel.start_consuming()
打印信息:
消费者1:routing_key='*.q1.*'
amq.gen-KbJ3qpqCJpNVW3ANtZXP4w
b'test_msg.'b'q1:test_msg.'b'q1:test_msg.'

消费者2:routing_key='*.q2.*'
amq.gen-3Os4GkVl_5mLd4fzc3dJ2g
b'test_msg.'b'q2:test_msg.'b'q2:test_msg.'b'q2:test_msg.'b'q2:test_msg.'

本文转载自: https://blog.csdn.net/Grit_my/article/details/131316529
版权归原作者 不知名美食探索家 所有, 如有侵权,请联系我们删除。

“rabbitMQ~工作模式代码实现【基于python pika模块】”的评论:

还没有评论