分析结论
多进程可以实现逐行遍历同一个文件(可以保证每一个进程遍历的行都是完整且互不重复的),且可以提高遍历性能。
多进程 / 多线程遍历文件速度
- 单进程、多线程读取同一个文件时,每个线程的运行时间并不能随线程数的增加的降低;
- 多进程读取同一个文件时,每个进程的运行时间随线程数的增加而降低。
进一步优化方法
通过统计读取到的字符串长度,计算当前文件指针位置,从而避免在每次遍历中均需使用
file.tell()
获取当前文件指针位置。
分析过程
构造 11202 MB、691130 行的测试数据。具体测试数据特征如下:
- 文件大小:11746098941 Bytes(11202 MB)
- 行数:691130
import time
1. 单进程、单线程遍历文件
t1 = time.time()withopen(path,"r", encoding="UTF-8")asfile:for _ infile:pass
t2 = time.time()print(t2 - t1)
运行时间:21.79 秒(三次测试 23.55、20.84、21.00 取平均值)
2. 多线程遍历文件
import os
from theading import Thread
定义每个线程 / 进程的遍历函数如下:通过捕获
UnicodeDecodeError
异常,避免出现刚好切分到半个字的情况;通过在遍历前先
file.readline()
,使每一个切分点中尚未结束的行一定归属上一进程 / 线程而不是下一进程 / 线程,从而保证每一个进程 / 线程遍历的每一行都是完整且互不重复的。
defread(path, start, end):"""每个进程 / 线程的遍历函数
Parameters
----------
path : str
文件路径
start : int
本分块的开始位置
end : int
本分块的结束位置
"""
cnt =0withopen(path,"r", encoding="UTF-8")asfile:file.seek(start)# 移动到目标位置if start !=0:whileTrue:try:file.readline()# 跳过当前行(所有未遍历完的行属于上一段)breakexcept UnicodeDecodeError:# 刚好切分到半个字,向后移动一个字符file.seek(start +1)whilefile.tell()<= end:file.readline()
cnt +=1
定义多线程遍历函数:
defmulti_thread_load_csv(path:str, n_thread:int):"""多线程遍历函数"""
size = os.path.getsize(path)# 获取文件大小用于分块
thread_lst =[]for i inrange(n_thread):
s = size // n_thread * i # 计算当前分块开始位置
e = size // n_thread *(i +1)# 计算当前分块结束位置
thread = Thread(target=read, args=(s, e))
thread.start()
thread_lst.append(thread)for thread in thread_lst:
thread.join()
测试线程数为 1 - 10 之间的读取时间,每种线程数测试 10 次,测试代码及结果如下:
import numpy as np
for k inrange(1,11):
use_time =[]for _ inrange(10):
t1 = time.time()
multi_thread_load_csv("/home/txjiang/archive_analysis/gather_detail.txt", k)
t2 = time.time()
use_time.append(t2 - t1)print(f"线程={k} 平均时间={np.average(use_time)} 标准差={np.std(use_time)}")
线程数用时的平均值用时的标准差149.08410.6299253.21891.4267353.52901.3273456.49231.4843556.66793.2745656.51641.7789758.23521.1148858.23530.6817960.98961.33651064.20632.3251
因为在每一行的遍历中需要增加一次
file.tell()
,所以单线程时的速度相较于直接读取会更慢。
3. 多进程遍历文件
from multiprocessing import Process
定义多进程遍历函数:
defmulti_process_load_csv(path:str, n_process:int):"""多进程遍历函数"""
size = os.path.getsize(path)# 获取文件大小用于分块
process_lst =[]for i inrange(n_process):
s = size // n_process * i # 计算当前分块开始位置
e = size // n_process *(i +1)# 计算当前分块结束位置
process = Process(target=read, args=(s, e))
process.start()
process_lst.append(process)for process in process_lst:
process.join()
测试线程数为 1 - 10 之间的读取时间,每种线程数测试 10 次,测试代码及结果如下:
import numpy as np
for k inrange(1,11):
use_time =[]for _ inrange(10):
t1 = time.time()
multi_process_load_csv("/home/txjiang/archive_analysis/gather_detail.txt", k)
t2 = time.time()
use_time.append(t2 - t1)print(f"线程={k} 平均时间={np.average(use_time)} 标准差={np.std(use_time)}")
进程数用时的平均值用时的标准差150.15610.8431226.50890.5581317.76630.2771413.43380.3024510.76540.295069.19500.347177.71600.176487.03210.193896.34840.2150105.63540.1271115.12830.2361124.78410.0512134.51490.2186144.15250.0533153.95540.1442163.84810.1167173.64550.0763183.40300.0255193.37320.2159203.19330.0674213.00910.0845222.92350.0646232.94740.2234242.75000.0382252.65920.0340262.56870.0333272.62730.3457282.43430.0253292.36470.0223302.25720.0343
因为在每一行的遍历中需要增加一次
file.tell()
,所以单进程时的速度相较于直接读取会更慢。
版权归原作者 长行 所有, 如有侵权,请联系我们删除。