0


Python读取大json文件性能优化及入库(100G+)

业务背景

本次需求为数据迁移,采用的方式是脚本迁移。即我拿到导出来的数据后跑脚本入库,但个人觉得直接用内置接口快照恢复会更简便(泪目)。

阅读前提

本次的Json文件是一行一条数据。

遇到的问题和解决方法

读取文件的方式-> with open 优于 open,这个应该不用再细说了吧。

读取json文件时解析速度过慢,耗时多 -> 逐行读取

但这个逐行读取是有讲究的,一开始采用网上这种常见的方法:

with open('file.txt', 'r') as f:
    lines = f.readlines()
    for line in lines:
        print(line)

这种方法的本质是先将整个文件读取到一个大列表中,再遍历。但问题是,这个大列表是占用内存的,因此是高CPU开销的,但服务器上只有32G内存,我需要性能更好的代码。后来发现:

with open('file.txt', 'r') as f:
    for line in f:
        print(line)

使用以上方式更节省内存,并且是最优的,比分块读取,如f.read(8K)、f.readlines(1000)更优。原因在于,f为文件对象,采用缓存I/O来对文件进行读写操作,本身就具有内存管理,可以不用担心内存管理的问题。

文件读取的性能优化后,接下来是入库性能优化。

以前服务器资源充足时,一直使用bulk批处理 + 手动数据分批,直到这次才了解到流式批量执行(stream_bulk)、并发批量执行(parallel_bulk)。

首先阅读使用手册。发现python中yield的用法,可以编写数据迭代生成器,适合于批处理。于是进一步改进代码,和流式批处理结合进行。

from elasticsearch.helpers import streaming_bulk
from elasticsearch.helpers import parallel_bulk
from elasticsearch.helpers import bulk, scan

# 数据迭代生成器 
def generate_actions():
    with open('file.json') as f:
        for line in f:
            data = json.loads(line)
            yield data

# 1. parallel_bulk(还可以用类似streaming_bulk的for循环)
deque(parallel_bulk(client=self.es, index=index, doc_type="doc", actions=generate_actions(), chunk_size=3000, thread_count=32), maxlen=0)

# 2. streaming_bulk
for ok, action in streaming_bulk(client=self.es, index=index, doc_type="doc", actions=generate_actions(), max_retries=5):
    pass

# 3. bulk
bulk(client=self.es, doc_type="doc", index=index, actions=generate_actions())

代码详见我的GitHub仓库。记录了我在读取大文件时代码更迭的版本。


本文转载自: https://blog.csdn.net/duduoott/article/details/135893144
版权归原作者 duduoott 所有, 如有侵权,请联系我们删除。

“Python读取大json文件性能优化及入库(100G+)”的评论:

还没有评论