并发接口测试
1 | import http.client |
上面通过创建多线程的方式来并发,线程多可能会遇上电脑性能瓶颈,下面采用第三方包异步并发的方式适用于大量的情况请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 import aiohttp
import asyncio
import time
# 设置要请求的接口信息
REQUEST_URL = 'http://xxx.xx.xx.xxx:9011/api/v1/schedules/queryPage?pageNum=1&pageSize=10&sortName=createdTime&sortType=desc' # 请替换为你的接口地址
TOKEN = 'eyJUeXBlIjoiSnd0IiwidHlwIjoiSldUIiwiYWxnIjoiSFMyNTYifQ' # 请替换为你的token
# 设置并发请求的次数
concurrency = 100 # 可以根据需要调整并发次数
# 记录响应时间
response_times = []
async def fetch(session):
start_time = time.time() # 记录开始时间
headers = {
'Content-Type': 'application/json;charset=UTF-8',
'token': TOKEN
}
async with session.get(REQUEST_URL, headers=headers) as response:
end_time = time.time() # 记录结束时间
response_times.append(end_time - start_time) # 添加响应时间
if response.status != 200:
content = await response.text()
print(f'请求失败: 状态码 {response.status}, 响应内容: {content}') # 打印失败日志
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch(session) for _ in range(concurrency)]
await asyncio.gather(*tasks)
# 运行主函数并记录响应时间
if __name__ == '__main__':
start = time.time()
asyncio.run(main())
end = time.time()
# 计算最大、最小和平均响应时间
if response_times:
max_time = max(response_times)
min_time = min(response_times)
avg_time = sum(response_times) / len(response_times)
print(f'总请求数: {concurrency}')
print(f'最大响应时间: {max_time:.4f} 秒')
print(f'最小响应时间: {min_time:.4f} 秒')
print(f'平均响应时间: {avg_time:.4f} 秒')
print(f'总耗时: {end - start:.4f} 秒')
else:
print('没有成功的响应时间记录.')
1 pip install aiohttp -i https://pypi.tuna.tsinghua.edu.cn/simple
本文作者:
ionluo
本文链接: http://www.ionluo.cn/blog/posts/1d5fcbfb.html
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!
本文链接: http://www.ionluo.cn/blog/posts/1d5fcbfb.html
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!
