0


基于RabbitMQ的RPC通信

基于RabbitMQ的RPC通信

当需要调用局域网中的服务时,可以用frp进行穿透,也可以在公网搭建RabbitMQ服务器做消息中转,本文演示了这个步骤。

版本信息

| 属性 | 值 |
| --- | --- |
| RabbitMQ | 3.13.0 |
| pika | 1.3.1 |
| python | 3.6.8 |
| CPU | 2颗 Intel® Xeon® Gold 6248R CPU @ 3.00GHz |

操作步骤

搭建RabbitMQ(默认用户名:guest 密码:guest )

# Start RabbitMQ (management image) detached, publishing the AMQP (5671/5672),
# epmd (4369), inter-node (25672) and management UI (15671/15672) ports.
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 \
    -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

服务端实现(srv.py)

# -*- coding:utf-8 -*-
import pika
import time
import sys
import os

def request_handler(message):
    """Build the reply for one RPC request.

    This demo server is a pure echo service: the request payload is returned
    unchanged. Decode ``message`` with ``message.decode('utf-8')`` here if the
    textual content needs to be inspected.

    Args:
        message: Request payload as received from the queue.

    Returns:
        The same object that was passed in.
    """
    return message
    
def on_request(ch, method, props, body):
    """Handle one RPC request and publish the reply to the caller's queue.

    Args:
        ch: Channel the request was delivered on; used to publish and ack.
        method: Delivery metadata; ``delivery_tag`` identifies the message
            to acknowledge.
        props: Request properties; ``reply_to`` names the caller's callback
            queue and ``correlation_id`` is copied onto the reply.
        body: Request payload.
    """
    response = request_handler(body)
    # str() on bytes produces the repr "b'...'" rather than the payload, so
    # binary replies are passed through and only other values are stringified.
    if isinstance(response, (bytes, bytearray)):
        payload = bytes(response)
    else:
        payload = str(response).encode("utf-8")
    ch.basic_publish(
        exchange="",
        routing_key=props.reply_to,
        properties=pika.BasicProperties(
            content_encoding="utf-8",
            correlation_id=props.correlation_id,
        ),
        body=payload,
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)


def serve_forever():
    """Consume ``rpc_queue`` forever, reconnecting one second after a failure.

    Ctrl-C stops the server; any other error is reported and followed by a
    fresh connection attempt.
    """
    try:
        while True:
            try:
                credentials = pika.PlainCredentials("guest", "guest")
                connection = pika.BlockingConnection(
                    pika.ConnectionParameters(
                        "127.0.0.1", port=5672, credentials=credentials
                    )
                )
                channel = connection.channel()
                channel.queue_declare(queue="rpc_queue")
                channel.basic_consume("rpc_queue", on_request)
                print(" waiting requests")
                channel.start_consuming()
            except Exception as exc:
                # Keep the retry behaviour, but do not hide the cause.
                print("error restart")
                print(repr(exc), file=sys.stderr)
            time.sleep(1)
    except KeyboardInterrupt:
        # Let the operator stop the server instead of restarting it.
        pass


if __name__ == "__main__":
    serve_forever()

客户端实现(client.py)

# -*- coding:utf-8 -*-
import os
import io
import time
import uuid
import sys
import requests
import warnings
import numpy as np
import pika
import time
import sys
import traceback
import json
import queue
import uuid
import time
import argparse

class MqRpcClient(object):
    """Blocking RPC client that sends requests to ``rpc_queue`` via RabbitMQ.

    Each instance opens its own connection and an exclusive, server-named
    callback queue on which replies are received. Replies are matched to the
    outstanding request by correlation id.
    """

    def __init__(self):
        """Connect to the broker and start consuming the callback queue."""
        # Initialised up front so on_response never touches missing attributes.
        self.response = None
        self.corr_id = None

        credentials = pika.PlainCredentials("guest", "guest")
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                "127.0.0.1", port=5672, credentials=credentials
            )
        )
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(queue="", exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(
            self.callback_queue, self.on_response, auto_ack=True
        )

    def on_response(self, ch, method, props, body):
        """Store ``body`` if it answers the request currently in flight.

        Replies whose correlation id does not match (for example late
        answers to a request that already timed out) are ignored.
        """
        if self.corr_id is not None and self.corr_id == props.correlation_id:
            self.response = body

    def send(self, message, timeout=5):
        """Publish ``message`` and wait for the matching reply.

        Args:
            message: Request payload; ``str`` is encoded as UTF-8, bytes are
                sent unchanged.
            timeout: Maximum time to wait for the reply, in seconds.

        Returns:
            The reply body, or ``None`` if no reply arrived within
            ``timeout``.
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())
        body = message.encode("utf-8") if isinstance(message, str) else message
        self.channel.basic_publish(
            exchange="",
            routing_key="rpc_queue",
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                content_encoding="utf-8",
                correlation_id=self.corr_id,
            ),
            body=body,
        )
        # Monotonic clock: the timeout must not be affected by wall-clock jumps.
        started = time.monotonic()
        while self.response is None:
            self.connection.process_data_events()
            # Only give up if this poll did not deliver the reply.
            if self.response is None and time.monotonic() - started > timeout:
                return None
        return self.response

def send_message(message, loop):
    """Send ``message`` ``loop`` times over one connection and print the qps.

    Each reply must arrive within one second and echo the UTF-8 encoded
    message byte-for-byte (the server is expected to return the payload
    unchanged); otherwise "Failed" is printed and the run stops early.

    Args:
        message: Text payload to send.
        loop: Number of sequential requests to issue.

    Returns:
        Always ``True``.
    """
    rpc = MqRpcClient()
    expected = message.encode("utf-8")  # replies arrive as bytes
    completed = 0
    try:
        t0 = time.time()
        for _ in range(loop):
            response = rpc.send(message, 1)
            if response is None or response != expected:
                print("Failed")
                break
            completed += 1
        elapsed = time.time() - t0
    finally:
        if rpc.connection.is_open:
            rpc.connection.close()
    # Count only the requests that really completed, and avoid dividing by a
    # zero interval (loop == 0 or a coarse clock).
    qps = completed / elapsed if elapsed > 0 else 0.0
    print("qps:{:.2f}".format(qps))
    return True


if __name__ == "__main__":
    parse = argparse.ArgumentParser()
    parse.add_argument('--msg', type=str, default='', help='')
    parse.add_argument('--loop', type=int, default=1, help='')
    args = parse.parse_args()
    send_message(args.msg, args.loop)

性能测试(4919 qps)

# Install the dependency
pip3 install pika==1.3.1

# Start the server (it replies to each request immediately)
python3 srv.py &

# Issue requests from several clients in parallel and measure total throughput.
# rpc.sh takes the number of clients as $1; `seq 1 N` starts exactly N of them.
cat <<EOF | tee rpc.sh
for i in \`seq 1 \$1\`
do
  python3 client.py  --msg HelloWorld --loop 1000 &
done
wait
EOF

# Each client prints "qps:<value>"; sum the values for the aggregate qps.
bash rpc.sh 16 | awk -F: '{SUM+=$2};END{print SUM}'
标签: rabbitmq rpc

本文转载自: https://blog.csdn.net/m0_61864577/article/details/136354804
版权归原作者 Hi20240217 所有, 如有侵权,请联系我们删除。

“基于RabbitMQ的RPC通信”的评论:

还没有评论