业务背景
本次需求为数据迁移,采用的方式是脚本迁移。即我拿到导出来的数据后跑脚本入库,但个人觉得直接用内置接口快照恢复会更简便(泪目)。
阅读前提
本次的Json文件是一行一条数据。
遇到的问题和解决方法
读取文件的方式-> with open 优于 open,这个应该不用再细说了吧。
读取json文件时解析速度过慢,耗时多 -> 逐行读取
但这个逐行读取是有讲究的,一开始采用网上这种常见的方法:
with open('file.txt', 'r') as f:
lines = f.readlines()
for line in lines:
print(line)
这种方法的本质是先将整个文件读取到一个大列表中,再遍历。但问题是,这个大列表是占用内存的,因此是高CPU开销的,但服务器上只有32G内存,我需要性能更好的代码。后来发现:
with open('file.txt', 'r') as f:
for line in f:
print(line)
使用以上方式更节省内存,并且是最优的,比分块读取,如f.read(8K)、f.readlines(1000)更优。原因在于,f为文件对象,采用缓存I/O来对文件进行读写操作,本身就具有内存管理,可以不用担心内存管理的问题。
文件读取的性能优化后,接下来是入库性能优化。
以前服务器资源充足时,一直使用bulk批处理 + 手动数据分批,直到这次才了解到流式批量执行(stream_bulk)、并发批量执行(parallel_bulk)。
首先阅读使用手册。发现python中yield的用法,可以编写数据迭代生成器,适合于批处理。于是进一步改进代码,和流式批处理结合进行。
from elasticsearch.helpers import streaming_bulk
from elasticsearch.helpers import parallel_bulk
from elasticsearch.helpers import bulk, scan
# 数据迭代生成器
def generate_actions():
with open('file.json') as f:
for line in f:
data = json.loads(line)
yield data
# 1. parallel_bulk(还可以用类似streaming_bulk的for循环)
deque(parallel_bulk(client=self.es, index=index, doc_type="doc", actions=generate_actions(), chunk_size=3000, thread_count=32), maxlen=0)
# 2. streaming_bulk
for ok, action in streaming_bulk(client=self.es, index=index, doc_type="doc", actions=generate_actions(), max_retries=5):
pass
# 3. bulk
bulk(client=self.es, doc_type="doc", index=index, actions=generate_actions())
代码详见我的GitHub仓库。记录了我在读取大文件时代码更迭的版本。
版权归原作者 duduoott 所有, 如有侵权,请联系我们删除。