一 单接口
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import json
import random
import datetime
import requests
import threading
import time
# Load-test settings. They are defined at module level (not only under the
# ``__main__`` guard) so the methods below can read them when the module is
# imported as well as when it is run as a script.
THREAD_NUM = 1  # total number of concurrent threads
ONE_WORKER_NUM = 5  # number of loop iterations per thread
LOOP_SLEEP = 0.1  # pause between two requests of one thread, in seconds
ERROR_NUM = 0  # number of failed requests


class Presstest(object):
    """Multi-threaded load tester for a single JSON POST endpoint.

    The instance logs in once, keeps the returned token in a shared
    ``requests`` session and then posts a fixed payload to ``press_url`` from
    ``THREAD_NUM`` threads, ``ONE_WORKER_NUM`` times each. Failures are
    accumulated in the module-level ``ERROR_NUM`` counter.
    """

    headers = {'Content-Type': 'application/json; charset=UTF-8'}
    # Serialises updates of the global ERROR_NUM counter: ``+=`` on a global
    # is a read-modify-write and is not atomic across threads.
    _error_lock = threading.Lock()

    def __init__(self, login_url, press_url, phone, password):
        """Store the target URLs and credentials and create the HTTP session."""
        self.login_url = login_url
        self.press_url = press_url
        self.phone = phone
        self.password = password
        self.session = requests.Session()
        # Merge into the session's own header mapping instead of replacing it
        # with the class-level dict; otherwise the token added by login()
        # would be written into ``Presstest.headers`` and shared by every
        # instance.
        self.session.headers.update(self.headers)

    def login(self):
        """Log in and store the returned token in the session headers.

        Raises:
            RuntimeError: if the login response contains no ``data.token``.
        """
        data = {"mobile": self.phone, "pwd": self.password, "type": "1"}
        res = self.session.post(self.login_url, data=json.dumps(data))
        body = res.json()
        token = (body.get('data') or {}).get('token')
        if not token:
            raise RuntimeError('login failed, no token in response: %r' % (body,))
        self.session.headers['token'] = token

    def _count_error(self):
        """Increment the global error counter under the class-wide lock."""
        global ERROR_NUM
        with self._error_lock:
            ERROR_NUM += 1

    def testinterface(self):
        """POST the fixed payload to ``press_url`` once and record failures.

        A call counts as failed when it raises or when the ``code`` field of
        the JSON reply is not 0.
        """
        # Extra headers (for example an X-UnionId) can be set on
        # ``self.session.headers`` here if the endpoint requires them.
        data = {"id": 418}
        try:
            print('开始调接口:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
            response = self.session.post(self.press_url, data=json.dumps(data))
            result = response.json()  # parse the body only once
            if result.get('code') != 0:
                print(result)
                self._count_error()
        except Exception as e:
            # Worker-thread boundary: report and count any failure so one bad
            # request does not abort the remaining iterations.
            print(e)
            self._count_error()

    def testonework(self):
        """Run one worker: ``ONE_WORKER_NUM`` requests, sleeping in between."""
        for _ in range(ONE_WORKER_NUM):
            self.testinterface()
            time.sleep(LOOP_SLEEP)

    @staticmethod
    def _summarize(total_requests, elapsed):
        """Return ``(seconds_per_request, requests_per_second)``.

        Both values are 0.0 when no request was made or no time elapsed, so
        the report never divides by zero.
        """
        if total_requests <= 0 or elapsed <= 0:
            return 0.0, 0.0
        per_request = elapsed / total_requests
        return per_request, 1 / per_request

    def run(self):
        """Start ``THREAD_NUM`` worker threads, wait for them and print a report."""
        started = time.time()
        workers = [
            threading.Thread(target=self.testonework, name="T" + str(i), daemon=True)
            for i in range(THREAD_NUM)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        elapsed = time.time() - started

        total = THREAD_NUM * ONE_WORKER_NUM
        per_request, per_second = self._summarize(total, elapsed)
        print("===============压测结果===================")
        print("URL:", self.press_url)
        print("任务数量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", total)
        print("总耗时(秒):", elapsed)
        print("每次请求耗时(秒):", per_request)
        print("每秒承载请求数:", per_second)
        print("错误数量:", ERROR_NUM)


if __name__ == '__main__':
    login_url = 'http://test/sale/login/login'
    press_url = 'http://test/sale/hsOrder/afterExamineAdopt'
    phone = "150000000"
    password = "123456"
    obj = Presstest(login_url=login_url, press_url=press_url, phone=phone, password=password)
    obj.login()
    obj.run()
二 多接口参数化(实现多接口参数化并发,data和url必须一一对应,且THREAD_NUM并发线程数不能大于url_list长度)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import json
import random
import datetime
import requests
import threading
import time
# Load-test settings. They are defined at module level (not only under the
# ``__main__`` guard) so the methods below can read them when the module is
# imported as well as when it is run as a script.
THREAD_NUM = 3  # total number of concurrent threads
ONE_WORKER_NUM = 1  # number of loop iterations per thread
LOOP_SLEEP = 0  # pause between two loop iterations of one thread, in seconds
ERROR_NUM = 0  # number of failed requests


class Presstest(object):
    """Multi-threaded load tester that exercises several endpoints at once.

    Every worker thread is given its own ``(url, data)`` pair. In each loop
    iteration it posts that pair and then posts the fixed payload to
    ``press_url``. Failures are accumulated in the module-level ``ERROR_NUM``
    counter.
    """

    headers = {'Content-Type': 'application/json; charset=UTF-8'}
    # Serialises updates of the global ERROR_NUM counter: ``+=`` on a global
    # is a read-modify-write and is not atomic across threads.
    _error_lock = threading.Lock()
    # testonework() sends two requests per loop iteration.
    _REQUESTS_PER_LOOP = 2

    def __init__(self, login_url, press_url, phone, password):
        """Store the target URLs and credentials and create the HTTP session."""
        self.login_url = login_url
        self.press_url = press_url
        self.phone = phone
        self.password = password
        self.session = requests.Session()
        # Merge into the session's own header mapping instead of replacing it
        # with the class-level dict; otherwise the token added by login()
        # would be written into ``Presstest.headers`` and shared by every
        # instance.
        self.session.headers.update(self.headers)

    def login(self):
        """Log in and store the returned token in the session headers.

        Raises:
            RuntimeError: if the login response contains no ``data.token``.
        """
        data = {"mobile": self.phone, "pwd": self.password, "type": "1"}
        res = self.session.post(self.login_url, data=json.dumps(data))
        body = res.json()
        token = (body.get('data') or {}).get('token')
        if not token:
            raise RuntimeError('login failed, no token in response: %r' % (body,))
        self.session.headers['token'] = token

    def _count_error(self):
        """Increment the global error counter under the class-wide lock."""
        global ERROR_NUM
        with self._error_lock:
            ERROR_NUM += 1

    def _post_and_check(self, banner, url, data, echo_request=False):
        """POST ``data`` as JSON to ``url`` and record a failure if needed.

        A call counts as failed when it raises or when the ``code`` field of
        the JSON reply is not 0. ``banner`` is printed with a timestamp before
        the request; ``echo_request`` additionally prints the URL and payload.
        """
        try:
            print(banner, datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
            if echo_request:
                print("接口请求入参url,data==", url, data)
            response = self.session.post(url, data=json.dumps(data))
            result = response.json()  # parse the body only once
            if result.get('code') != 0:
                print(result)
                self._count_error()
        except Exception as e:
            # Worker-thread boundary: report and count any failure so one bad
            # request does not abort the remaining iterations.
            print(e)
            self._count_error()

    def testinterface(self):
        """POST the fixed payload to ``press_url`` once and record failures."""
        self._post_and_check('开始调接口111111:', self.press_url, {"id": 418})

    def testinterface2(self, url, data):
        """POST ``data`` to ``url`` once and record failures."""
        self._post_and_check('开始调接口2222222:', url, data, echo_request=True)

    def testonework(self, url, data):
        """Run one worker: ``ONE_WORKER_NUM`` iterations of both requests."""
        for _ in range(ONE_WORKER_NUM):
            self.testinterface2(url, data)
            self.testinterface()
            time.sleep(LOOP_SLEEP)

    @staticmethod
    def _pick_targets(thread_num, url_list, data_list):
        """Choose ``thread_num`` distinct ``(url, data)`` pairs at random.

        ``url_list`` and ``data_list`` are matched by position.

        Raises:
            ValueError: if the two lists differ in length, or if more threads
                are requested than there are pairs to hand out.
        """
        if len(url_list) != len(data_list):
            raise ValueError('url_list and data_list must have the same length')
        count = max(thread_num, 0)
        if count > len(url_list):
            raise ValueError(
                'THREAD_NUM (%d) must not exceed the number of URLs (%d)'
                % (thread_num, len(url_list))
            )
        chosen = random.sample(range(len(url_list)), count)
        return [(url_list[i], data_list[i]) for i in chosen]

    @staticmethod
    def _summarize(total_requests, elapsed):
        """Return ``(seconds_per_request, requests_per_second)``.

        Both values are 0.0 when no request was made or no time elapsed, so
        the report never divides by zero.
        """
        if total_requests <= 0 or elapsed <= 0:
            return 0.0, 0.0
        per_request = elapsed / total_requests
        return per_request, 1 / per_request

    def run(self):
        """Start one worker thread per chosen endpoint and print a report.

        Raises:
            ValueError: if ``THREAD_NUM`` exceeds the number of configured
                endpoints.
        """
        started = time.time()
        # Parameterised endpoints: data_list[i] is the payload for url_list[i],
        # and THREAD_NUM must not exceed the number of entries.
        data_list = [
            {"orderId": 1194, "cause": "让人"},
            {"orderId": 1193, "cause": "让人"},
            {"orderId": 1192, "cause": "让人"},
        ]
        url_list = ["http://test/api1", "http://test/api2", "http://test/api3"]
        print("index========", list(range(0, len(data_list))))

        targets = self._pick_targets(THREAD_NUM, url_list, data_list)
        # Pass the callable and its arguments separately. Calling
        # ``self.testonework(...)`` here would run the work in this thread
        # and hand ``None`` to Thread as its target.
        workers = [
            threading.Thread(
                target=self.testonework,
                args=(url, data),
                name="T" + str(i),
                daemon=True,
            )
            for i, (url, data) in enumerate(targets)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        elapsed = time.time() - started

        tasks = THREAD_NUM * ONE_WORKER_NUM
        per_request, per_second = self._summarize(tasks * self._REQUESTS_PER_LOOP, elapsed)
        print("===============压测结果===================")
        print("URL:", self.press_url)
        print("任务数量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", tasks)
        print("总耗时(秒):", elapsed)
        print("每次请求耗时(秒):", per_request)
        print("每秒承载请求数:", per_second)
        print("错误数量:", ERROR_NUM)


if __name__ == '__main__':
    login_url = 'http://test/login'
    press_url = 'http://test/afterExamineAdopt'
    phone = "1500000000"
    password = "123456"
    obj = Presstest(login_url=login_url, press_url=press_url, phone=phone, password=password)
    obj.login()
    obj.run()
三 多接口并发调用方法二
import datetime
import json
import requests
import threading
import time
def post_request(url, data, timeout=None):
    """POST ``data`` as JSON to ``url`` and print the reply and its duration.

    Args:
        url: Endpoint to call.
        data: JSON-serialisable payload.
        timeout: Optional timeout in seconds, passed to ``requests.post``.
            ``None`` (the default) waits indefinitely.

    Any exception raised by ``requests.post`` propagates to the caller.
    """
    headers = {"Content-Type": "application/json;charset=UTF-8"}
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments.
    started = time.perf_counter()
    print('开始调接口111111:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
    response = requests.post(url, data=json.dumps(data), headers=headers, timeout=timeout)
    duration = time.perf_counter() - started
    print('调用结束:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
    print("Response from", url, ":", data, response.text)
    print("Request duration:", duration * 1000, "ms")
# Script entry point: call every URL with its matching payload, one thread per
# request. Guarded so that importing this module does not start any network
# traffic.
if __name__ == '__main__':
    data = [
        {"orderId": 1194, "cause": "让人"},
        {"orderId": 1193, "cause": "让人"},
        {"orderId": 1192, "cause": "让人"},
    ]
    urls = ["http://test/api1", "http://test/api2", "http://test/api3"]
    # data[i] is the payload for urls[i]; fail early instead of part-way
    # through thread creation when a payload is missing.
    if len(data) < len(urls):
        raise ValueError('every URL needs a matching payload in data')

    threads = [
        threading.Thread(target=post_request, args=(url, payload))
        for url, payload in zip(urls, data)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
四 多接口同时并发(相当于集合点)(异步实现集合点)
import asyncio
import datetime
import aiohttp
async def make_request(session, url, data):
    """POST ``data`` as JSON to ``url`` and return the decoded JSON reply.

    Args:
        session: Object whose ``post(url, json=...)`` returns an async context
            manager yielding the response (an ``aiohttp.ClientSession`` in
            this script).
        url: Endpoint to call.
        data: JSON-serialisable payload.
    """
    print('开始调接口:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
    async with session.post(url, json=data) as response:
        # content_type=None turns off aiohttp's Content-Type check. The body
        # is then decoded whether the server labels it text/html (the only
        # value accepted before) or application/json.
        return await response.json(content_type=None, encoding='utf-8')
async def run_concurrent_requests(urls, datas, max_concurrent_requests,
                                  token="BEE010F2A6D0696DD90A82FF28B21AF2"):
    """POST every ``datas[i]`` to ``urls[i]`` concurrently and gather replies.

    Args:
        urls: Endpoints, matched to ``datas`` by position.
        datas: JSON-serialisable payloads; one request is sent per entry.
        max_concurrent_requests: Upper bound on requests in flight at once.
        token: Value of the ``token`` request header.
            NOTE(review): the default is a credential hard-coded in the
            source; pass a real token explicitly.

    Returns:
        The decoded replies, in the order of ``datas``.

    Raises:
        ValueError: if ``max_concurrent_requests`` is below 1 (a zero-sized
            semaphore would block forever) or ``urls`` is shorter than
            ``datas``.
    """
    if max_concurrent_requests < 1:
        raise ValueError('max_concurrent_requests must be at least 1')
    if len(urls) < len(datas):
        raise ValueError('every entry of datas needs a matching url')

    headers = {
        "Content-Type": "application/json;charset=UTF-8",
        "token": token,
    }
    limiter = asyncio.Semaphore(max_concurrent_requests)

    async def bounded_request(session, url, data):
        # Hold a semaphore slot for the whole lifetime of one request.
        async with limiter:
            return await make_request(session, url, data)

    # Default headers must be given to the constructor; assigning
    # ``session.headers`` afterwards does not configure the session.
    async with aiohttp.ClientSession(headers=headers) as session:
        pending = [
            bounded_request(session, url, data)
            for url, data in zip(urls, datas)
        ]
        return await asyncio.gather(*pending)
# Script entry point: fire all requests at once (a rendezvous point) and print
# the collected replies.
if __name__ == '__main__':
    urls = ['http://test/SubmitSettlement'] * 5
    datas = [
        {"idArr": [2086]},
        {"idArr": [2090]},
        {"idArr": [2089]},
        {"idArr": [2086]},
        {"idArr": [2090]},
    ]
    max_concurrent_requests = 5
    # asyncio.run creates a fresh event loop, runs the coroutine to completion
    # and closes the loop; get_event_loop() without a running loop is
    # deprecated.
    responses = asyncio.run(run_concurrent_requests(urls, datas, max_concurrent_requests))
    print(responses)
本文转载自: https://blog.csdn.net/weixin_43006743/article/details/130892686
版权归原作者 @半良人 所有, 如有侵权,请联系我们删除。
版权归原作者 @半良人 所有, 如有侵权,请联系我们删除。