0


Python接口并发压力测试(单接口,多接口参数化)+异步aiohttp

一 单接口

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Single-endpoint load test: log in once, then POST to one URL from several threads."""
import datetime
import json
import random
import threading
import time

import requests

# Load-test settings. They live at module level because the Presstest methods
# read them as globals.
THREAD_NUM = 1  # total number of concurrent threads
ONE_WORKER_NUM = 5  # number of requests each thread issues
LOOP_SLEEP = 0.1  # pause (seconds) a thread takes after finishing its loop
ERROR_NUM = 0  # failed requests, shared by all worker threads

# ``ERROR_NUM += 1`` is a read-modify-write, so concurrent workers take a lock.
_ERROR_LOCK = threading.Lock()


class Presstest(object):
    """Logs in once and replays a fixed POST request from several threads."""

    headers = {'Content-Type': 'application/json; charset=UTF-8'}

    def __init__(self, login_url, press_url, phone, password):
        """Store the target URLs and credentials and create the HTTP session.

        Args:
            login_url: URL of the login endpoint.
            press_url: URL of the endpoint under test.
            phone: account phone number sent as ``mobile``.
            password: account password sent as ``pwd``.
        """
        self.login_url = login_url
        self.press_url = press_url
        self.phone = phone
        self.password = password
        self.session = requests.Session()
        # Use a copy: assigning the class-level dict itself would make login()
        # write this instance's token into state shared by every instance.
        self.session.headers = dict(self.headers)

    @staticmethod
    def _record_error():
        """Increment the shared error counter under the lock."""
        global ERROR_NUM
        with _ERROR_LOCK:
            ERROR_NUM += 1

    def login(self):
        """Log in and put the returned token on the session headers.

        Raises:
            RuntimeError: if the login response contains no ``data.token``.
        """
        data = {"mobile": self.phone, "pwd": self.password, "type": "1"}
        res = self.session.post(self.login_url, data=json.dumps(data))
        token = (res.json().get('data') or {}).get('token')
        if not token:
            # Without a token every later request would run unauthenticated
            # and the measurements would be meaningless.
            raise RuntimeError("login failed, no token in response: %s" % res.text)
        self.session.headers['token'] = token

    def testinterface(self):
        """POST the fixed payload to ``press_url`` and record failures.

        A response whose ``code`` is not 0, or any exception raised while
        sending or decoding, counts as one error.
        """
        data = {"id": 418}
        try:
            print('开始调接口:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
            response = self.session.post(self.press_url, data=json.dumps(data))
            body = response.json()  # parse once, reuse for the check and the log
            if body.get('code') != 0:
                print(body)
                self._record_error()
        except Exception as e:
            # Deliberately broad: in a load test any failure is just an error sample.
            print(e)
            self._record_error()

    def testonework(self):
        """Worker body: issue ``ONE_WORKER_NUM`` requests, then sleep ``LOOP_SLEEP``."""
        for _ in range(ONE_WORKER_NUM):
            self.testinterface()
        time.sleep(LOOP_SLEEP)

    def run(self):
        """Run ``THREAD_NUM`` worker threads and print the timing summary."""
        started = time.time()
        workers = [
            threading.Thread(target=self.testonework, name="T" + str(i), daemon=True)
            for i in range(THREAD_NUM)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        elapsed = time.time() - started

        total = THREAD_NUM * ONE_WORKER_NUM
        print("===============压测结果===================")
        print("URL:", self.press_url)
        print("任务数量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", total)
        print("总耗时(秒):", elapsed)
        # Guard both ratios so an empty run (or a zero-length one) cannot divide by zero.
        if total:
            print("每次请求耗时(秒):", elapsed / total)
        if elapsed > 0:
            print("每秒承载请求数:", total / elapsed)
        print("错误数量:", ERROR_NUM)


if __name__ == '__main__':
    login_url = 'http://test/sale/login/login'
    press_url = 'http://test/sale/hsOrder/afterExamineAdopt'
    phone = "150000000"
    password = "123456"

    obj = Presstest(login_url=login_url, press_url=press_url, phone=phone, password=password)
    obj.login()
    obj.run()

二 多接口参数化(实现多接口参数化并发:data_list 与 url_list 必须按下标一一对应,且并发线程数 THREAD_NUM 不能大于 url_list 的长度)

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Multi-endpoint load test: each thread gets its own (url, data) pair."""
import datetime
import json
import random
import threading
import time

import requests

# Load-test settings. They live at module level because the Presstest methods
# read them as globals.
THREAD_NUM = 3  # total number of concurrent threads
ONE_WORKER_NUM = 1  # number of loop iterations each thread runs
LOOP_SLEEP = 0  # pause (seconds) a thread takes after finishing its loop
ERROR_NUM = 0  # failed requests, shared by all worker threads

# Every loop iteration in testonework() calls testinterface2() and testinterface().
REQUESTS_PER_LOOP = 2

# ``ERROR_NUM += 1`` is a read-modify-write, so concurrent workers take a lock.
_ERROR_LOCK = threading.Lock()


class Presstest(object):
    """Logs in once and replays POST requests against several endpoints in parallel."""

    headers = {'Content-Type': 'application/json; charset=UTF-8'}

    def __init__(self, login_url, press_url, phone, password):
        """Store the target URLs and credentials and create the HTTP session.

        Args:
            login_url: URL of the login endpoint.
            press_url: URL hit by ``testinterface``.
            phone: account phone number sent as ``mobile``.
            password: account password sent as ``pwd``.
        """
        self.login_url = login_url
        self.press_url = press_url
        self.phone = phone
        self.password = password
        self.session = requests.Session()
        # Use a copy: assigning the class-level dict itself would make login()
        # write this instance's token into state shared by every instance.
        self.session.headers = dict(self.headers)

    @staticmethod
    def _record_error():
        """Increment the shared error counter under the lock."""
        global ERROR_NUM
        with _ERROR_LOCK:
            ERROR_NUM += 1

    def _post_and_check(self, url, data):
        """POST ``data`` as JSON to ``url``; record an error unless ``code`` is 0."""
        response = self.session.post(url, data=json.dumps(data))
        body = response.json()  # parse once, reuse for the check and the log
        if body.get('code') != 0:
            print(body)
            self._record_error()

    def login(self):
        """Log in and put the returned token on the session headers.

        Raises:
            RuntimeError: if the login response contains no ``data.token``.
        """
        data = {"mobile": self.phone, "pwd": self.password, "type": "1"}
        res = self.session.post(self.login_url, data=json.dumps(data))
        token = (res.json().get('data') or {}).get('token')
        if not token:
            # Without a token every later request would run unauthenticated
            # and the measurements would be meaningless.
            raise RuntimeError("login failed, no token in response: %s" % res.text)
        self.session.headers['token'] = token

    def testinterface(self):
        """POST the fixed payload to ``press_url``; any failure counts as one error."""
        data = {"id": 418}
        try:
            print('开始调接口111111:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
            self._post_and_check(self.press_url, data)
        except Exception as e:
            # Deliberately broad: in a load test any failure is just an error sample.
            print(e)
            self._record_error()

    def testinterface2(self, url, data):
        """POST ``data`` to ``url``; any failure counts as one error.

        Args:
            url: endpoint to call.
            data: JSON-serialisable request payload.
        """
        try:
            print('开始调接口2222222:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
            print("接口请求入参url,data==", url, data)
            self._post_and_check(url, data)
        except Exception as e:
            print(e)
            self._record_error()

    def testonework(self, url, data):
        """Worker body: ``ONE_WORKER_NUM`` times call both interfaces, then sleep."""
        for _ in range(ONE_WORKER_NUM):
            self.testinterface2(url, data)
            self.testinterface()
        time.sleep(LOOP_SLEEP)

    def run(self):
        """Start one worker thread per randomly chosen (url, data) pair and print a summary.

        Raises:
            ValueError: if ``THREAD_NUM`` exceeds the number of (url, data) pairs.
        """
        started = time.time()
        # data_list[i] belongs to url_list[i]; every thread takes a distinct pair.
        data_list = [{"orderId": 1194, "cause": "让人"}, {"orderId": 1193, "cause": "让人"},
                     {"orderId": 1192, "cause": "让人"}]
        url_list = ["http://test/api1", "http://test/api2", "http://test/api3"]
        list_arr = list(range(0, len(data_list)))
        print("index========", list_arr)
        if THREAD_NUM > len(list_arr):
            raise ValueError(
                "THREAD_NUM (%d) must not exceed the number of url/data pairs (%d)"
                % (THREAD_NUM, len(list_arr))
            )

        workers = []
        # random.sample draws without replacement, so no pair is used twice.
        for i, index in enumerate(random.sample(list_arr, THREAD_NUM)):
            # Pass the callable plus args; calling it here would run the whole
            # worker on the main thread and hand Thread a target of None.
            workers.append(threading.Thread(
                target=self.testonework,
                args=(url_list[index], data_list[index]),
                name="T" + str(i),
                daemon=True,
            ))
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        elapsed = time.time() - started

        tasks = THREAD_NUM * ONE_WORKER_NUM
        total_requests = tasks * REQUESTS_PER_LOOP
        print("===============压测结果===================")
        print("URL:", self.press_url)
        print("任务数量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", tasks)
        print("总耗时(秒):", elapsed)
        # Per-request figures use the real request count and are guarded
        # against division by zero.
        if total_requests:
            print("每次请求耗时(秒):", elapsed / total_requests)
        if elapsed > 0:
            print("每秒承载请求数:", total_requests / elapsed)
        print("错误数量:", ERROR_NUM)


if __name__ == '__main__':
    login_url = 'http://test/login'
    press_url = 'http://test/afterExamineAdopt'
    phone = "1500000000"
    password = "123456"

    obj = Presstest(login_url=login_url, press_url=press_url, phone=phone, password=password)
    obj.login()
    obj.run()

三 多接口并发调用方法二

import datetime
import json
import threading
import time

import requests


def post_request(url, data):
    """POST ``data`` as JSON to ``url`` and print the response and elapsed time.

    Args:
        url: endpoint to call.
        data: JSON-serialisable request payload.
    """
    # perf_counter is monotonic, so the duration is immune to wall-clock adjustments.
    start_time = time.perf_counter()
    print('开始调接口111111:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
    response = requests.post(url, data=json.dumps(data),
                             headers={"Content-Type": "application/json;charset=UTF-8"})
    # Stop the clock before printing so console I/O is not counted as request time.
    duration = time.perf_counter() - start_time
    print('调用结束:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
    print("Response from", url, ":", data, response.text)
    print("Request duration:", duration * 1000, "ms")


# data[i] is the payload sent to urls[i].
data = [{"orderId": 1194, "cause": "让人"}, {"orderId": 1193, "cause": "让人"},
        {"orderId": 1192, "cause": "让人"}]
urls = ["http://test/api1", "http://test/api2", "http://test/api3"]

# One thread per (url, payload) pair; start them all, then wait for all of them.
threads = []
for url, payload in zip(urls, data):
    t = threading.Thread(target=post_request, args=(url, payload))
    threads.append(t)
    t.start()
for t in threads:
    t.join()

四 多接口同时并发(使用 asyncio + aiohttp 异步实现,效果相当于集合点)

import asyncio
import datetime

import aiohttp

# Default headers sent with every request. The token is the sample value from
# the original script; replace it with a valid one before running.
DEFAULT_HEADERS = {
    "Content-Type": "application/json;charset=UTF-8",
    "token": "BEE010F2A6D0696DD90A82FF28B21AF2",
}


async def make_request(session, url, data):
    """POST ``data`` as JSON to ``url`` and return the decoded JSON response.

    Args:
        session: an open ``aiohttp.ClientSession``.
        url: endpoint to call.
        data: JSON-serialisable request payload.
    """
    print('开始调接口:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
    async with session.post(url, json=data) as response:
        # content_type=None turns off aiohttp's Content-Type check, so the body
        # is decoded whether the server labels it text/html or application/json.
        return await response.json(content_type=None, encoding='utf-8')


async def run_concurrent_requests(urls, datas, max_concurrent_requests):
    """Send ``datas[i]`` to ``urls[i]`` concurrently and return all responses.

    Args:
        urls: endpoints to call; needs at least as many entries as ``datas``.
        datas: request payloads, matched to ``urls`` by position.
        max_concurrent_requests: upper bound on requests in flight at once.

    Returns:
        The decoded responses, in the same order as ``datas``.

    Raises:
        ValueError: if ``max_concurrent_requests`` is below 1 or ``urls`` is
            shorter than ``datas``.
    """
    if max_concurrent_requests < 1:
        # A semaphore of 0 would make every request wait forever.
        raise ValueError("max_concurrent_requests must be at least 1")
    if len(urls) < len(datas):
        raise ValueError("urls must contain at least as many entries as datas")

    sem = asyncio.Semaphore(max_concurrent_requests)

    async def limited_request(session, url, data):
        # ``async with`` releases the slot even when the request raises.
        async with sem:
            return await make_request(session, url, data)

    # Default headers must be given to the constructor; assigning to
    # ``session.headers`` afterwards does not configure the session.
    async with aiohttp.ClientSession(headers=DEFAULT_HEADERS) as session:
        pending = [limited_request(session, url, data) for url, data in zip(urls, datas)]
        return await asyncio.gather(*pending)


if __name__ == '__main__':
    urls = ['http://test/SubmitSettlement'] * 5
    datas = [{"idArr": [2086]}, {"idArr": [2090]}, {"idArr": [2089]}, {"idArr": [2086]}, {"idArr": [2090]}]
    max_concurrent_requests = 5
    # asyncio.run creates and closes the event loop, replacing the deprecated
    # get_event_loop()/run_until_complete pairing.
    responses = asyncio.run(run_concurrent_requests(urls, datas, max_concurrent_requests))
    print(responses)

在这里插入图片描述

标签: python 压力测试

本文转载自: https://blog.csdn.net/weixin_43006743/article/details/130892686
版权归原作者 @半良人 所有, 如有侵权,请联系我们删除。

“Python接口并发压力测试(单接口,多接口参数化)+异步aiohttp”的评论:

还没有评论