拉取、入库的优化点:
- 请求接口:- 场景:原来一次请求一个月的数据,可能会面临接口卡顿的问题,而且数据量过大,有时候会溢出- 解决:可以考虑由一次请求 切分为 小而多次请求- 加个for循环,分为小批量去请求,避免接口数据量过大- 将多次请求采用多线程进行,避免单线程I/O阻塞( 可以异步操作,可以先发送请求而不等待结果返回)
with futures.ThreadPoolExecutor()as executor:# 可传入线程数# 传入执行的方法 和 参数列表,会等待所有线程执行完毕 query_info_list =list(executor.map(self.api.get_pipeline_info,query_list))
- 注意点:- 接口性能:需要接口方面也支持并发处理,不然你这边同时发送多个请求,接口方还是一个一个给你处理,时间上并无提升- 风险与收益并存:在这过程发生了错误,会比较难处理,不能第一时间捕捉,而且可能会一错毁所有- 限定场景:网上有传言python的多线程只用到了一个CPU核心,由于对python底层暂不了解,所以盲猜这里的多线程操作只适用于I/0密集的场景 - 数据入库:- 场景:入库时候可能会一次性插入上万条大数据,数据库和本地内存会承受较大压力- 解决:可以采用python yield关键字,把数据分片处理- 使用举例:
defchunk_request():for xxx in 一堆数据:yield xxx # 1. 先返回一个结果defchunk(iterator, bulk_size=50):# 2. 会等待,凑满50个yield的结果,然后统一返回""" 工具方法, 返回一批结果的迭代器 """for item in iterator:yield chain([item], islice(iterator, bulk_size -1))for items in chunk(self.chunk_request(start=start, end=end)):# 3. 收满50个yield为一组# 进行小批量入库操作 # obj.create(xxx)
- 对于yield自己的理解:有点像协程的概念,平时我们一个return,方法就直接结束,返回结果了,但是使用了yield就很好玩了,你可以先返回一个结果,然后再回到上次返回结果的地方,继续往下执行。所以利用这个特性,我们就可以将数据给分片处理了,先返回一小部分结果,然后入库,再返回执行,拿到下一批结果,再入库,而不是一次return一大堆数据。- 升级版本: 不仅做到控制数据小批量入库,还可以控制到源头数据小批量拉取(利用上边讲的特性,会返回方法里继续往下执行代码)-# 拉取:for 一次拉取 in 多次调用api拉取:yield 数据 # 这样可以做到先返回数据,再回来执行for循环 继续往下调用api拉取# 入库:for 数据 in chunk(一小批数据): 入库
标签:
python
本文转载自: https://blog.csdn.net/weixin_43960117/article/details/119044026
版权归原作者 lingling-fa 所有, 如有侵权,请联系我们删除。
版权归原作者 lingling-fa 所有, 如有侵权,请联系我们删除。