0


实际业务读取Hive数据库(2023年2月)

背景:

在这篇文章之前,我读取数据库的数据没有形成规范,并且代码扩展性不好,使用率不高,而且比较混乱。数据库信息的替换也比较混乱。坏习惯包括:连接数据库之后就开始读数,读完就结束,数据的存放也没有规范,而且容易重复读取。

现在将代码分为几层,一层是底层,就是单独连接数据库,在这基础上封装第二个类别,加上了线程锁和时间表,用于确保读数的稳定和超时错误提醒。第三层才是真正的业务,第三层的类里面封装了很多读取不同数据表的方法,每一个方法就是读一个表,然后将数据缓存起来,并且设置好更新数据缓存的时间(例如24小时),和维护多线程读数。

第四层也就是简单的调用第三层即可,然后所有的数据都可以读取然后缓存到我们在配置项中指定的文件夹目录了

文章目录

实际业务读取hive数据库的代码

  1. import logging
  2. import pandas as pd
  3. from impala.dbapi import connect
  4. import sqlalchemy
  5. from sqlalchemy.orm import sessionmaker
  6. import os
  7. import time
  8. import os
  9. import datetime
  10. from dateutil.relativedelta import relativedelta
  11. from typing import Dict, List
  12. import logging
  13. import threading
  14. import pandas as pd
  15. import pickle
  16. classHiveHelper(object):def__init__(
  17. self,
  18. host='10.2.32.22',
  19. port=21051,
  20. database='ur_ai_dw',
  21. auth_mechanism='LDAP',
  22. user='urbi',
  23. password='Ur#730xd',
  24. logger:logging.Logger=None):
  25. self.host = host
  26. self.port = port
  27. self.database = database
  28. self.auth_mechanism = auth_mechanism
  29. self.user = user
  30. self.password = password
  31. self.logger = logger
  32. self.impala_conn =None
  33. self.conn =None
  34. self.cursor =None
  35. self.engine =None
  36. self.session =Nonedefcreate_table_code(self, file_name):'''创建表类代码'''
  37. os.system(f'sqlacodegen {self.connection_str} > {file_name}')return self.conn
  38. defget_conn(self):'''创建连接或获取连接'''if self.conn isNone:
  39. engine = self.get_engine()
  40. self.conn = engine.connect()return self.conn
  41. defget_impala_conn(self):'''创建连接或获取连接'''if self.impala_conn isNone:
  42. self.impala_conn = connect(
  43. host=self.host,
  44. port=self.port,
  45. database=self.database,
  46. auth_mechanism=self.auth_mechanism,
  47. user=self.user,
  48. password=self.password
  49. )return self.impala_conn
  50. defget_engine(self):'''创建连接或获取连接'''if self.engine isNone:
  51. self.engine = sqlalchemy.create_engine('impala://', creator=self.get_impala_conn)return self.engine
  52. defget_cursor(self):'''创建连接或获取连接'''if self.cursor isNone:
  53. self.cursor = self.conn.cursor()return self.cursor
  54. defget_session(self)-> sessionmaker:'''创建连接或获取连接'''if self.session isNone:
  55. engine = self.get_engine()
  56. Session = sessionmaker(bind=engine)
  57. self.session = Session()return self.session
  58. defclose_conn(self):'''关闭连接'''if self.conn isnotNone:
  59. self.conn.close()
  60. self.conn =None
  61. self.dispose_engine()
  62. self.close_impala_conn()defclose_impala_conn(self):'''关闭impala连接'''if self.impala_conn isnotNone:
  63. self.impala_conn.close()
  64. self.impala_conn =Nonedefclose_session(self):'''关闭连接'''if self.session isnotNone:
  65. self.session.close()
  66. self.session =None
  67. self.dispose_engine()defdispose_engine(self):'''释放engine'''if self.engine isnotNone:# self.engine.dispose(close=False)
  68. self.engine.dispose()
  69. self.engine =Nonedefclose_cursor(self):'''关闭cursor'''if self.cursor isnotNone:
  70. self.cursor.close()
  71. self.cursor =Nonedefget_data(self, sql, auto_close=True)-> pd.DataFrame:'''查询数据'''
  72. conn = self.get_conn()
  73. data =Nonetry:# 异常重试3for i inrange(3):try:
  74. data = pd.read_sql(sql, conn)breakexcept Exception as ex:if i ==2:raise ex # 往外抛出异常
  75. time.sleep(60)# 一分钟后重试except Exception as ex:
  76. self.logger.exception(ex)raise ex # 往外抛出异常finally:if auto_close:
  77. self.close_conn()return data
  78. passclassVarsHelper():def__init__(self, save_dir, auto_save=True):
  79. self.save_dir = save_dir
  80. self.auto_save = auto_save
  81. self.values ={}ifnot os.path.exists(os.path.dirname(self.save_dir)):
  82. os.makedirs(os.path.dirname(self.save_dir))if os.path.exists(self.save_dir):withopen(self.save_dir,'rb')as f:
  83. self.values = pickle.load(f)
  84. f.close()defset_value(self, key, value):
  85. self.values[key]= value
  86. if self.auto_save:
  87. self.save_file()defget_value(self, key):return self.values[key]defhas_key(self, key):return key in self.values.keys()defsave_file(self):withopen(self.save_dir,'wb')as f:
  88. pickle.dump(self.values, f)
  89. f.close()passclassGlobalShareArgs():
  90. args ={"debug":False}defget_args():return GlobalShareArgs.args
  91. defset_args(args):
  92. GlobalShareArgs.args = args
  93. defset_args_value(key, value):
  94. GlobalShareArgs.args[key]= value
  95. defget_args_value(key, default_value=None):return GlobalShareArgs.args.get(key, default_value)defcontain_key(key):return key in GlobalShareArgs.args.keys()defupdate(args):
  96. GlobalShareArgs.args.update(args)passclassShareArgs():
  97. args ={"labels_dir":"./hjx/shop_group/month_w_amt/data/labels",# 标签目录"labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output",# 聚类导出标签目录"common_datas_dir":"./hjx/data",# 共用数据目录。ur_bi_dw的公共"only_predict":False,# 只识别,不训练"delete_model":True,# 先删除模型,仅在训练时使用"export_excel":False,# 导出excel"classes":12,# 聚类数"batch_size":16,"hidden_size":32,"max_nrof_epochs":100,"learning_rate":0.0005,"loss_type":"categorical_crossentropy","avg_model_num":10,"steps_per_epoch":4.0,# 4.0"lr_callback_patience":4,"lr_callback_cooldown":1,"early_stopping_callback_patience":6,"get_data":True,}defget_args():return ShareArgs.args
  98. defset_args(args):
  99. ShareArgs.args = args
  100. defset_args_value(key, value):
  101. ShareArgs.args[key]= value
  102. defget_args_value(key, default_value=None):return ShareArgs.args.get(key, default_value)defcontain_key(key):return key in ShareArgs.args.keys()defupdate(args):
  103. ShareArgs.args.update(args)passclassUrBiGetDatasBase():# 线程锁列表,同保存路径共用锁
  104. lock_dict:Dict[str, threading.Lock]={}# 时间列表,用于判断是否超时
  105. time_dict:Dict[str, datetime.datetime]={}# 用于记录是否需要更新超时时间
  106. get_data_timeout_dict:Dict[str,bool]={}def__init__(
  107. self,
  108. host='10.2.32.22',
  109. port=21051,
  110. database='ur_ai_dw',
  111. auth_mechanism='LDAP',
  112. user='urbi',
  113. password='Ur#730xd',
  114. save_dir=None,
  115. logger:logging.Logger=None,):
  116. self.save_dir = save_dir
  117. self.logger = logger
  118. self.db_helper = HiveHelper(
  119. host=host,
  120. port=port,
  121. database=database,
  122. auth_mechanism=auth_mechanism,
  123. user=user,
  124. password=password,
  125. logger=logger
  126. )# 创建子目录if self.save_dir isnotNoneandnot os.path.exists(self.save_dir):
  127. os.makedirs(self.save_dir)
  128. self.vars_helper =Noneif GlobalShareArgs.get_args_value('debug'):
  129. self.vars_helper = VarsHelper('./hjx/data/vars/UrBiGetDatas')defclose(self):'''关闭连接'''
  130. self.db_helper.close_conn()defget_last_time(self, key_name)->bool:'''获取是否超时'''# 转静态路径,确保唯一性
  131. key_name = os.path.abspath(key_name)if self.vars_helper isnotNoneand self.vars_helper.has_key('UrBiGetDatasBase.time_list'):
  132. UrBiGetDatasBase.time_dict = self.vars_helper.get_value('UrBiGetDatasBase.time_list')
  133. timeout =12# 12小时if GlobalShareArgs.get_args_value('debug'):
  134. timeout =24# 24小时
  135. get_data_timeout =Falseif key_name notin UrBiGetDatasBase.time_dict.keys()or(datetime.datetime.today()- UrBiGetDatasBase.time_dict[key_name]).total_seconds()>(timeout*60*60):
  136. self.logger.info('超时%d小时,重新查数据:%s', timeout, key_name)# UrBiGetDatasBase.time_list[key_name] = datetime.datetime.today()
  137. get_data_timeout =Trueelse:
  138. self.logger.info('未超时%d小时,跳过查数据:%s', timeout, key_name)# if self.vars_helper is not None :# self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_list)
  139. UrBiGetDatasBase.get_data_timeout_dict[key_name]= get_data_timeout
  140. return get_data_timeout
  141. defsave_last_time(self, key_name):'''更新状态超时'''# 转静态路径,确保唯一性
  142. key_name = os.path.abspath(key_name)if UrBiGetDatasBase.get_data_timeout_dict[key_name]:
  143. UrBiGetDatasBase.time_dict[key_name]= datetime.datetime.today()if self.vars_helper isnotNone:
  144. UrBiGetDatasBase.time_dict[key_name]= datetime.datetime.today()
  145. self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_dict)defget_lock(self, key_name)-> threading.Lock:'''获取锁'''# 转静态路径,确保唯一性
  146. key_name = os.path.abspath(key_name)if key_name notin UrBiGetDatasBase.lock_dict.keys():
  147. UrBiGetDatasBase.lock_dict[key_name]= threading.Lock()return UrBiGetDatasBase.lock_dict[key_name]defget_data_of_date(
  148. self,
  149. save_dir,
  150. sql,
  151. sort_columns:List[str],
  152. del_index_list=[-1],# 删除最后下标
  153. start_date = datetime.datetime(2017,1,1),# 开始时间
  154. offset = relativedelta(months=3),# 时间间隔
  155. date_format_fun =lambda d:'%04d%02d01'%(d.year, d.month),# 查询语句中替代时间参数的格式化
  156. filename_format_fun =lambda d:'%04d%02d.csv'%(d.year, d.month),# 查询语句中替代时间参数的格式化
  157. stop_date ='20700101',# 超过时间则停止
  158. data_format_fun =None,# 格式化数据):'''分时间增量读取数据'''# 创建文件夹ifnot os.path.exists(save_dir):
  159. os.makedirs(save_dir)else:#删除最后一个文件
  160. file_list = os.listdir(save_dir)iflen(file_list)>0:
  161. file_list.sort()for del_index in del_index_list:
  162. os.remove(os.path.join(save_dir,file_list[del_index]))print('删除最后一个文件:', file_list[del_index])
  163. select_index =-1# start_date = datetime.datetime(2017, 1, 1)whileTrue:
  164. end_date = start_date + offset
  165. start_date_str = date_format_fun(start_date)
  166. end_date_str = date_format_fun(end_date)
  167. self.logger.info('date: %s-%s', start_date_str, end_date_str)
  168. file_path = os.path.join(save_dir, filename_format_fun(start_date))# self.logger.info('file_path: %s', file_path)ifnot os.path.exists(file_path):
  169. data:pd.DataFrame = self.db_helper.get_data(sql %(start_date_str, end_date_str))if data isNone:break
  170. self.logger.info('data: %d',len(data))# self.logger.info('data: %d', data.columns)iflen(data)>0:
  171. select_index+=1if data_format_fun isnotNone:
  172. data = data_format_fun(data)# 排序
  173. data = data.sort_values(sort_columns)
  174. data.to_csv(file_path)elif select_index!=-1:breakelif stop_date < start_date_str:raise Exception("读取数据异常,时间超出最大值!")
  175. start_date = end_date
  176. passclassUrBiGetDatas(UrBiGetDatasBase):def__init__(
  177. self,
  178. host='10.2.32.22',
  179. port=21051,
  180. database='ur_ai_dw',
  181. auth_mechanism='LDAP',
  182. user='urbi',
  183. password='Ur#730xd',
  184. save_dir='./hjx/data/ur_bi_dw_data',
  185. logger:logging.Logger=None):
  186. self.save_dir = save_dir
  187. self.logger = logger
  188. super().__init__(
  189. host=host,
  190. port=port,
  191. database=database,
  192. auth_mechanism=auth_mechanism,
  193. user=user,
  194. password=password,
  195. save_dir=save_dir,
  196. logger=logger
  197. )defget_dim_date(self):'''日期数据'''
  198. file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_date.csv')
  199. now_lock = self.get_lock(file_path)
  200. now_lock.acquire()# 加锁try:# 设置超时4小时才重新查数据ifnot self.get_last_time(file_path):return
  201. sql ='SELECT * FROM ur_bi_dw.dim_date'
  202. data:pd.DataFrame = self.db_helper.get_data(sql)
  203. columns =list(data.columns)
  204. columns ={c:'dim_date.'+c for c in columns}
  205. data = data.rename(columns=columns)
  206. data = data.sort_values(['dim_date.date_key'])
  207. data.to_csv(file_path)# 更新超时时间
  208. self.save_last_time(file_path)except Exception as ex:
  209. self.logger.exception(ex)raise ex # 往外抛出异常finally:
  210. now_lock.release()# 释放锁defget_dim_shop(self):'''店铺数据'''
  211. file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_shop.csv')
  212. now_lock = self.get_lock(file_path)
  213. now_lock.acquire()# 加锁try:# 设置超时4小时才重新查数据ifnot self.get_last_time(file_path):return
  214. sql ='SELECT * FROM ur_bi_dw.dim_shop'
  215. data:pd.DataFrame = self.db_helper.get_data(sql)
  216. columns =list(data.columns)
  217. columns ={c:'dim_shop.'+c for c in columns}
  218. data = data.rename(columns=columns)
  219. data = data.sort_values(['dim_shop.shop_no'])
  220. data.to_csv(file_path)# 更新超时时间
  221. self.save_last_time(file_path)except Exception as ex:
  222. self.logger.exception(ex)raise ex # 往外抛出异常finally:
  223. now_lock.release()# 释放锁defget_dim_vip(self):'''会员数据'''
  224. sub_dir = os.path.join(self.save_dir,'vip_no')
  225. now_lock = self.get_lock(sub_dir)
  226. now_lock.acquire()# 加锁try:# 设置超时4小时才重新查数据ifnot self.get_last_time(sub_dir):return
  227. sql ='''SELECT dv.*, dd.date_key, dd.date_name2
  228. FROM ur_bi_dw.dim_vip as dv
  229. INNER JOIN ur_bi_dw.dim_date as dd
  230. ON dv.card_create_date=dd.date_name2
  231. where dd.date_key >= %s
  232. and dd.date_key < %s'''# data:pd.DataFrame = self.db_helper.get_data(sql)
  233. sort_columns =['dv.vip_no']# TODO:
  234. self.get_data_of_date(
  235. save_dir=sub_dir,
  236. sql=sql,
  237. sort_columns=sort_columns,
  238. start_date=datetime.datetime(2017,1,1),# 开始时间
  239. offset=relativedelta(years=1))# 更新超时时间
  240. self.save_last_time(sub_dir)except Exception as ex:
  241. self.logger.exception(ex)raise ex # 往外抛出异常finally:
  242. now_lock.release()# 释放锁defget_weather(self):'''天气数据'''
  243. sub_dir = os.path.join(self.save_dir,'weather')
  244. now_lock = self.get_lock(sub_dir)
  245. now_lock.acquire()# 加锁try:# 设置超时4小时才重新查数据ifnot self.get_last_time(sub_dir):return
  246. sql ="""
  247. select weather.* from ur_bi_ods.ods_base_weather_data_1200 as weather
  248. where weather.date_key>=%s and weather.date_key<%s
  249. """
  250. sort_columns =['weather.date_key','weather.areaid']defdata_format_fun(data):
  251. columns =list(data.columns)
  252. columns ={c:'weather.'+c for c in columns}
  253. data = data.rename(columns=columns)return data
  254. self.get_data_of_date(
  255. save_dir=sub_dir,
  256. sql=sql,
  257. sort_columns=sort_columns,
  258. del_index_list=[-2,-1],# 删除最后下标
  259. data_format_fun=data_format_fun,)# 更新超时时间
  260. self.save_last_time(sub_dir)except Exception as ex:
  261. self.logger.exception(ex)raise ex # 往外抛出异常finally:
  262. now_lock.release()# 释放锁defget_weather_city(self):'''天气城市数据'''
  263. file_path = os.path.join(self.save_dir,'ur_bi_dw.weather_city.csv')
  264. now_lock = self.get_lock(file_path)
  265. now_lock.acquire()# 加锁try:# 设置超时4小时才重新查数据ifnot self.get_last_time(file_path):return
  266. sql ='SELECT * FROM ur_bi_dw.dim_weather_city as weather_city'
  267. data:pd.DataFrame = self.db_helper.get_data(sql)
  268. columns =list(data.columns)
  269. columns ={c:'weather_city.'+c for c in columns}
  270. data = data.rename(columns=columns)
  271. data.to_csv(file_path)# 更新超时时间
  272. self.save_last_time(file_path)except Exception as ex:
  273. self.logger.exception(ex)raise ex # 往外抛出异常finally:
  274. now_lock.release()# 释放锁defget_dim_goods(self):'''货品数据'''
  275. file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods.csv')
  276. now_lock = self.get_lock(file_path)
  277. now_lock.acquire()# 加锁try:# 设置超时4小时才重新查数据ifnot self.get_last_time(file_path):return
  278. sql ='SELECT * FROM ur_bi_dw.dim_goods'
  279. data:pd.DataFrame = self.db_helper.get_data(sql)
  280. columns =list(data.columns)
  281. columns ={c:'dim_goods.'+c for c in columns}
  282. data = data.rename(columns=columns)
  283. data.to_csv(file_path)# 更新超时时间
  284. self.save_last_time(file_path)except Exception as ex:
  285. self.logger.exception(ex)raise ex # 往外抛出异常finally:
  286. now_lock.release()# 释放锁defget_dim_goods_market_shop_date(self):'''店铺商品生命周期数据'''
  287. file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods_market_shop_date.csv')
  288. now_lock = self.get_lock(file_path)
  289. now_lock.acquire()# 加锁try:# 设置超时4小时才重新查数据ifnot self.get_last_time(file_path):return# sql = 'SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date'
  290. sql ='''
  291. select shop_no, sku_no, shop_market_date, lifecycle_end_date, lifecycle_days
  292. FROM ur_bi_dw.dim_goods_market_shop_date
  293. where lifecycle_end_date is not null
  294. '''
  295. data:pd.DataFrame = self.db_helper.get_data(sql)
  296. columns =list(data.columns)
  297. columns ={c:c.replace('lifecycle_end_date.','')for c in columns}
  298. data = data.rename(columns=columns)
  299. data = data.sort_values(['shop_market_date'])
  300. data.to_csv(file_path, index=False)# 更新超时时间
  301. self.save_last_time(file_path)except Exception as ex:
  302. self.logger.exception(ex)raise ex # 往外抛出异常finally:
  303. now_lock.release()# 释放锁defget_dim_goods_market_date(self):'''全国商品生命周期数据'''
  304. file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods_market_date.csv')
  305. now_lock = self.get_lock(file_path)
  306. now_lock.acquire()# 加锁try:# 设置超时4小时才重新查数据ifnot self.get_last_time(file_path):return
  307. sql ='''
  308. select * FROM ur_bi_dw.dim_goods_market_date
  309. '''
  310. data:pd.DataFrame = self.db_helper.get_data(sql)
  311. columns =list(data.columns)
  312. columns ={c:'dim_goods_market_date.'+c for c in columns}
  313. data = data.rename(columns=columns)
  314. data = data.sort_values(['dim_goods_market_date.sku_no'])
  315. data.to_csv(file_path, index=False)# 更新超时时间
  316. self.save_last_time(file_path)except Exception as ex:
  317. self.logger.exception(ex)raise ex # 往外抛出异常finally:
  318. now_lock.release()# 释放锁defget_dim_goods_color_dev_sizes(self):'''商品开发码数数据'''
  319. file_path = os.path.join(self.save_dir,'dim_goods_color_dev_sizes.csv')
  320. now_lock = self.get_lock(file_path)
  321. now_lock.acquire()# 加锁try:# 设置超时4小时才重新查数据ifnot self.get_last_time(file_path):return# sql = 'SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date'
  322. sql ='SELECT * FROM ur_bi_dm.dim_goods_color_dev_sizes'
  323. data:pd.DataFrame = self.db_helper.get_data(sql)
  324. columns =list(data.columns)
  325. columns ={c:c.replace('dim_goods_color_dev_sizes.','')for c in columns}
  326. data = data.rename(columns=columns)
  327. data.to_csv(file_path, index=False)# 更新超时时间
  328. self.save_last_time(file_path)except Exception as ex:
  329. self.logger.exception(ex)raise ex # 往外抛出异常finally:
  330. now_lock.release()# 释放锁defget_dwd_daily_sales_size(self):'''实际销售金额'''
  331. sub_dir = os.path.join(self.save_dir,'dwd_daily_sales_size_all')
  332. now_lock = self.get_lock(sub_dir)
  333. now_lock.acquire()# 加锁try:# 设置超时4小时才重新查数据ifnot self.get_last_time(sub_dir):return
  334. sql ="""
  335. select shop_no,sku_no,date_key,`size`,
  336. sum(tag_price) as `tag_price`,
  337. sum(sales_qty) as `sales_qty`,
  338. sum(sales_tag_amt) as `sales_tag_amt`,
  339. sum(sales_amt) as `sales_amt`,
  340. count(0) as `sales_count`
  341. from ur_bi_dw.dwd_daily_sales_size as sales
  342. where sales.date_key>=%s and sales.date_key<%s
  343. and sales.currency_code='CNY'
  344. group by shop_no,sku_no,date_key,`size`
  345. """
  346. sort_columns =['date_key','shop_no','sku_no']
  347. self.get_data_of_date(
  348. save_dir=sub_dir,
  349. sql=sql,
  350. sort_columns=sort_columns,
  351. start_date=datetime.datetime(2017,1,1),# 开始时间)# 更新超时时间
  352. self.save_last_time(sub_dir)except Exception as ex:
  353. self.logger.exception(ex)raise ex # 往外抛出异常finally:
  354. now_lock.release()# 释放锁defget_dwd_daily_delivery_size(self):'''实际配货金额'''
  355. sub_dir = os.path.join(self.save_dir,'dwd_daily_delivery_size_all')
  356. now_lock = self.get_lock(sub_dir)
  357. now_lock.acquire()# 加锁try:# 设置超时4小时才重新查数据ifnot self.get_last_time(sub_dir):return
  358. sql ="""
  359. select shop_no,sku_no,date_key,`size`,
  360. sum(delivery.shop_distr_received_qty) as `shop_distr_received_qty`,
  361. sum(delivery.shop_distr_received_amt) as `shop_distr_received_amt`,
  362. sum(delivery.online_distr_received_qty) as `online_distr_received_qty`,
  363. sum(delivery.online_distr_received_amt) as `online_distr_received_amt`,
  364. sum(delivery.pr_received_qty) as `pr_received_qty`,
  365. count(0) as `delivery_count`
  366. from ur_bi_dw.dwd_daily_delivery_size as delivery
  367. where delivery.date_key>=%s and delivery.date_key<%s
  368. and delivery.currency_code='CNY'
  369. group by shop_no,sku_no,date_key,`size`
  370. """
  371. sort_columns =['date_key','shop_no','sku_no']
  372. self.get_data_of_date(
  373. save_dir=sub_dir,
  374. sql=sql,
  375. sort_columns=sort_columns,
  376. start_date=datetime.datetime(2017,1,1),# 开始时间)# 更新超时时间
  377. self.save_last_time(sub_dir)except Exception as ex:
  378. self.logger.exception(ex)raise ex # 往外抛出异常finally:
  379. now_lock.release()# 释放锁defget_v_last_nation_sales_status(self):'''商品畅滞销数据'''
  380. file_path = os.path.join(self.save_dir,'v_last_nation_sales_status.csv')
  381. now_lock = self.get_lock(file_path)
  382. now_lock.acquire()# 加锁try:# 设置超时4小时才重新查数据ifnot self.get_last_time(file_path):return
  383. sql ='SELECT * FROM ur_bi_dw.v_last_nation_sales_status'
  384. data:pd.DataFrame = self.db_helper.get_data(sql)
  385. columns =list(data.columns)
  386. columns ={c:c.replace('v_last_nation_sales_status.','')for c in columns}
  387. data = data.rename(columns=columns)
  388. data.to_csv(file_path, index=False)# 更新超时时间
  389. self.save_last_time(file_path)except Exception as ex:
  390. self.logger.exception(ex)raise ex # 往外抛出异常finally:
  391. now_lock.release()# 释放锁defget_dwd_daily_finacial_goods(self):'''商品成本价数据'''
  392. file_path = os.path.join(self.save_dir,'dwd_daily_finacial_goods.csv')
  393. now_lock = self.get_lock(file_path)
  394. now_lock.acquire()# 加锁try:# 设置超时4小时才重新查数据ifnot self.get_last_time(file_path):return
  395. sql ="""
  396. select t1.sku_no,t1.`size`,t1.cost_tax_incl from ur_bi_dw.dwd_daily_finacial_goods as t1
  397. inner join (
  398. select sku_no,`size`,max(date_key) as date_key
  399. from ur_bi_dw.dwd_daily_finacial_goods
  400. where currency_code='CNY' and country_code='CN'
  401. group by sku_no,`size`
  402. ) as t2
  403. on t2.sku_no=t1.sku_no
  404. and t2.`size`=t1.`size`
  405. and t2.date_key=t1.date_key
  406. where t1.currency_code='CNY' and t1.country_code='CN'
  407. """
  408. data:pd.DataFrame = self.db_helper.get_data(sql)
  409. columns =list(data.columns)
  410. columns ={c:c.replace('t1.','')for c in columns}
  411. data = data.rename(columns=columns)
  412. data.to_csv(file_path, index=False)# 更新超时时间
  413. self.save_last_time(file_path)except Exception as ex:
  414. self.logger.exception(ex)raise ex # 往外抛出异常finally:
  415. now_lock.release()# 释放锁defget_dim_size_group(self):'''尺码映射数据'''
  416. file_path = os.path.join(self.save_dir,'dim_size_group.csv')
  417. now_lock = self.get_lock(file_path)
  418. now_lock.acquire()# 加锁try:# 设置超时4小时才重新查数据ifnot self.get_last_time(file_path):return
  419. sql ="""select * from ur_bi_dw.dim_size_group"""
  420. data:pd.DataFrame = self.db_helper.get_data(sql)
  421. columns =list(data.columns)
  422. columns ={c:c.replace('dim_size_group.','')for c in columns}
  423. data = data.rename(columns=columns)
  424. data.to_csv(file_path, index=False)# 更新超时时间
  425. self.save_last_time(file_path)except Exception as ex:
  426. self.logger.exception(ex)raise ex # 往外抛出异常finally:
  427. now_lock.release()# 释放锁passdefget_common_datas(
  428. host='10.2.32.22',
  429. port=21051,
  430. database='ur_ai_dw',
  431. auth_mechanism='LDAP',
  432. user='urbi',
  433. password='Ur#730xd',
  434. logger:logging.Logger=None):# 共用文件
  435. common_datas_dir = ShareArgs.get_args_value('common_datas_dir')
  436. common_ur_bi_dir = os.path.join(common_datas_dir,'ur_bi_data')
  437. ur_bi_get_datas = UrBiGetDatas(
  438. host=host,
  439. port=port,
  440. database=database,
  441. auth_mechanism=auth_mechanism,
  442. user=user,
  443. password=password,
  444. save_dir=common_ur_bi_dir,
  445. logger=logger
  446. )try:
  447. logger.info('正在查询日期数据...')
  448. ur_bi_get_datas.get_dim_date()
  449. logger.info('查询日期数据完成!')
  450. logger.info('正在查询店铺数据...')
  451. ur_bi_get_datas.get_dim_shop()
  452. logger.info('查询店铺数据完成!')
  453. logger.info('正在查询天气数据...')
  454. ur_bi_get_datas.get_weather()
  455. logger.info('查询天气数据完成!')
  456. logger.info('正在查询天气城市数据...')
  457. ur_bi_get_datas.get_weather_city()
  458. logger.info('查询天气城市数据完成!')
  459. logger.info('正在查询货品数据...')
  460. ur_bi_get_datas.get_dim_goods()
  461. logger.info('查询货品数据完成!')
  462. logger.info('正在查询实际销量数据...')
  463. ur_bi_get_datas.get_dwd_daily_sales_size()
  464. logger.info('查询实际销量数据完成!')except Exception as ex:
  465. logger.exception(ex)raise ex # 往外抛出异常finally:
  466. ur_bi_get_datas.close()passclassCustomUrBiGetDatas(UrBiGetDatasBase):def__init__(
  467. self,
  468. host='10.2.32.22',
  469. port=21051,
  470. database='ur_ai_dw',
  471. auth_mechanism='LDAP',
  472. user='urbi',
  473. password='Ur#730xd',
  474. save_dir='./hjx/data/ur_bi_data',
  475. logger:logging.Logger=None):
  476. self.save_dir = save_dir
  477. self.logger = logger
  478. super().__init__(
  479. host=host,
  480. port=port,
  481. database=database,
  482. auth_mechanism=auth_mechanism,
  483. user=user,
  484. password=password,
  485. save_dir=save_dir,
  486. logger=logger
  487. )defget_sales_goal_amt(self):'''销售目标金额'''
  488. file_path = os.path.join(self.save_dir,'month_of_year_sales_goal_amt.csv')
  489. now_lock = self.get_lock(file_path)
  490. now_lock.acquire()# 加锁try:# 设置超时4小时才重新查数据ifnot self.get_last_time(file_path):return
  491. sql ='''
  492. select sales_goal.shop_no,
  493. if(sales_goal.serial='Y','W',sales_goal.serial) as `sales_goal.serial`,
  494. dates.month_of_year,
  495. sum(sales_goal.sales_goal_amt) as sales_goal_amt
  496. from ur_bi_dw.dwd_sales_goal_west as sales_goal
  497. inner join ur_bi_dw.dim_date as dates
  498. on sales_goal.date_key = dates.date_key
  499. group by sales_goal.shop_no,
  500. if(sales_goal.serial='Y','W',sales_goal.serial),
  501. dates.month_of_year
  502. '''
  503. data:pd.DataFrame = self.db_helper.get_data(sql)
  504. data = data.rename(columns={'shop_no':'sales_goal.shop_no','serial':'sales_goal.serial','month_of_year':'dates.month_of_year',})# 排序
  505. data = data.sort_values(['sales_goal.shop_no','sales_goal.serial','dates.month_of_year'])
  506. data.to_csv(file_path)# 更新超时时间
  507. self.save_last_time(file_path)except Exception as ex:
  508. self.logger.exception(ex)raise ex # 往外抛出异常finally:
  509. now_lock.release()# 释放锁defget_shop_serial_area(self):'''店-系列面积'''
  510. file_path = os.path.join(self.save_dir,'shop_serial_area.csv')
  511. now_lock = self.get_lock(file_path)
  512. now_lock.acquire()# 加锁try:# 设置超时4小时才重新查数据ifnot self.get_last_time(file_path):return
  513. sql ='''
  514. select shop_serial_area.shop_no,
  515. if(shop_serial_area.serial='Y','W',shop_serial_area.serial) as `shop_serial_area.serial`,
  516. shop_serial_area.month_of_year,
  517. sum(shop_serial_area.area) as `shop_serial_area.area`
  518. from ur_bi_dw.dwd_shop_serial_area as shop_serial_area
  519. where shop_serial_area.area is not null
  520. group by shop_serial_area.shop_no,if(shop_serial_area.serial='Y','W',shop_serial_area.serial),shop_serial_area.month_of_year
  521. '''
  522. data:pd.DataFrame = self.db_helper.get_data(sql)
  523. data = data.rename(columns={'shop_no':'shop_serial_area.shop_no','serial':'shop_serial_area.serial','month_of_year':'shop_serial_area.month_of_year','area':'shop_serial_area.area',})# 排序
  524. data = data.sort_values(['shop_serial_area.shop_no','shop_serial_area.serial','shop_serial_area.month_of_year'])
  525. data.to_csv(file_path)# 更新超时时间
  526. self.save_last_time(file_path)except Exception as ex:
  527. self.logger.exception(ex)raise ex # 往外抛出异常finally:
  528. now_lock.release()# 释放锁passdefget_datas(
  529. host='10.2.32.22',
  530. port=21051,
  531. database='ur_ai_dw',
  532. auth_mechanism='LDAP',
  533. user='urbi',
  534. password='Ur#730xd',
  535. save_dir='./data/sales_forecast/ur_bi_dw_data',
  536. logger:logging.Logger=None):
  537. ur_bi_get_datas = CustomUrBiGetDatas(
  538. host=host,
  539. port=port,
  540. database=database,
  541. auth_mechanism=auth_mechanism,
  542. user=user,
  543. password=password,
  544. save_dir=save_dir,
  545. logger=logger
  546. )try:# 店,系列,品类,年月,销售目标金额
  547. logger.info('正在查询年月销售目标金额数据...')
  548. ur_bi_get_datas.get_sales_goal_amt()
  549. logger.info('查询年月销售目标金额数据完成!')except Exception as ex:
  550. logger.exception(ex)raise ex # 往外抛出异常finally:
  551. ur_bi_get_datas.close()passdefgetdata_ur_bi_dw(
  552. host='10.2.32.22',
  553. port=21051,
  554. database='ur_ai_dw',
  555. auth_mechanism='LDAP',
  556. user='urbi',
  557. password='Ur#730xd',
  558. save_dir='./data/sales_forecast/ur_bi_dw_data',
  559. logger=None):
  560. get_common_datas(
  561. host=host,
  562. port=port,
  563. database=database,
  564. auth_mechanism=auth_mechanism,
  565. user=user,
  566. password=password,
  567. logger=logger
  568. )
  569. get_datas(
  570. host=host,
  571. port=port,
  572. database=database,
  573. auth_mechanism=auth_mechanism,
  574. user=user,
  575. password=password,
  576. save_dir=save_dir,
  577. logger=logger
  578. )pass# 代码入口# getdata_ur_bi_dw(# host=ur_bi_dw_host,# port=ur_bi_dw_port,# database=ur_bi_dw_database,# auth_mechanism=ur_bi_dw_auth_mechanism,# user=ur_bi_dw_user,# password=ur_bi_dw_password,# save_dir=ur_bi_dw_save_dir,# logger=logger# )

代码说明和领悟

每个类的具体作用说明,代码需要根据下面的文字说明进行“食用”:

(第一层)HiveHelper完成了连接数据库、关闭数据库连接、生成事务、执行、引擎、连接等功能

VarsHelper提供了一个简单的持久化功能,可以将对象以文件的形式存放在磁盘上。并提供设置值、获取值、判断值是否存在的方法

GlobalShareArgs提供了一个字典,并且提供了获取字典、设置字典、设置字典键值对、设置字典键的值、判断键是否在字典中、更新字典等方法

ShareArgs跟GlobalShareArgs类似,只是一开始字典的初始化的键值对比较多

(第二层)UrBiGetDatasBase类,提供了线程锁字典、时间字典、超时判断字典,都是类变量;使用了HiveHelper类,但注意,不是继承。在具体的sql读数时,提供了线程加锁和超时时间判断

(第三层)UrBiGetDatas类,获取hive数据库那边的日期数据、店铺数据、会员数据、天气数据、天气城市数据、商品数据、店铺生命周期数据、全国商品生命周期数据、商品开发码数数据、实际销售金额、实际配货金额、商品畅滞销数据、商品成本价数据、尺码映射数据等。

(第四层)get_common_datas函数,使用UrBiGetDatas类读取日期、店铺、天气、天气城市、货品、实际销量数据,并缓存到文件夹./hjx/data/ur_bi_data下面

CustomUrBiGetDatas类,继承了UrBiGetDatasBase类,读取销售目标金额、店-系列面积数据。

(这个也是第四层)get_datas函数,通过CustomUrBiGetDatas类,读取年月销售目标金额。

总的函数:(这个是总的调用入口函数)getdata_ur_bi_dw函数,调用了get_common_datas和get_datas函数进行读取数据,然后将数据保存到某个文件夹目录下面。

举一反三,如果你不是hive数据库,你可以将第一层这个底层更换成mysql。主页有解释如何进行更换。第二层不需要改变,第三层就是你想要进行读取的数据表,不同的数据库你想要读取的数据表也不同,所以sql需要你在这里写,套用里面的方法即可,基本上就是修改sql就好了。

这种方法的好处在于,数据不会重复读取,并且读取的数据都可以得到高效的使用。

后续附上修改成mysql的一个例子代码

  1. import logging
  2. import pandas as pd
  3. from impala.dbapi import connect
  4. import sqlalchemy
  5. from sqlalchemy.orm import sessionmaker
  6. import os
  7. import time
  8. import os
  9. import datetime
  10. from dateutil.relativedelta import relativedelta
  11. from typing import Dict, List
  12. import logging
  13. import threading
  14. import pandas as pd
  15. import pickle
  16. class MySqlHelper(object):
  17. def __init__(
  18. self,
  19. host='192.168.15.144',
  20. port=3306,
  21. database='test_ims',
  22. user='spkjz_writer',
  23. password='7cmoP3QDtueVJQj2q4Az',
  24. logger:logging.Logger=None
  25. ):
  26. self.host = host
  27. self.port = port
  28. self.database = database
  29. self.user = user
  30. self.password = password
  31. self.logger = logger
  32. self.connection_str = 'mysql+pymysql://%s:%s@%s:%d/%s' %(
  33. self.user, self.password, self.host, self.port, self.database
  34. )
  35. self.conn = None
  36. self.cursor = None
  37. self.engine = None
  38. self.session = None
  39. def create_table_code(self, file_name):
  40. '''创建表类代码'''
  41. os.system(f'sqlacodegen {self.connection_str} > {file_name}')
  42. return self.conn
  43. def get_conn(self):
  44. '''创建连接或获取连接'''
  45. if self.conn is None:
  46. engine = self.get_engine()
  47. self.conn = engine.connect()
  48. return self.conn
  49. def get_engine(self):
  50. '''创建连接或获取连接'''
  51. if self.engine is None:
  52. self.engine = sqlalchemy.create_engine(self.connection_str)
  53. return self.engine
  54. def get_cursor(self):
  55. '''创建连接或获取连接'''
  56. if self.cursor is None:
  57. self.cursor = self.conn.cursor()
  58. return self.cursor
  59. def get_session(self) -> sessionmaker:
  60. '''创建连接或获取连接'''
  61. if self.session is None:
  62. engine = self.get_engine()
  63. Session = sessionmaker(bind=engine)
  64. self.session = Session()
  65. return self.session
  66. def close_conn(self):
  67. '''关闭连接'''
  68. if self.conn is not None:
  69. self.conn.close()
  70. self.conn = None
  71. self.dispose_engine()
  72. def close_session(self):
  73. '''关闭连接'''
  74. if self.session is not None:
  75. self.session.close()
  76. self.session = None
  77. self.dispose_engine()
  78. def dispose_engine(self):
  79. '''释放engine'''
  80. if self.engine is not None:
  81. # self.engine.dispose(close=False)
  82. self.engine.dispose()
  83. self.engine = None
  84. def close_cursor(self):
  85. '''关闭cursor'''
  86. if self.cursor is not None:
  87. self.cursor.close()
  88. self.cursor = None
  89. def get_data(self, sql, auto_close=True) -> pd.DataFrame:
  90. '''查询数据'''
  91. conn = self.get_conn()
  92. data = None
  93. try:
  94. # 异常重试3次
  95. for i in range(3):
  96. try:
  97. data = pd.read_sql(sql, conn)
  98. break
  99. except Exception as ex:
  100. if i == 2:
  101. raise ex # 往外抛出异常
  102. time.sleep(60) # 一分钟后重试
  103. except Exception as ex:
  104. self.logger.exception(ex)
  105. raise ex # 往外抛出异常
  106. finally:
  107. if auto_close:
  108. self.close_conn()
  109. return data
  110. pass
  111. class VarsHelper():
  112. def __init__(self, save_dir, auto_save=True):
  113. self.save_dir = save_dir
  114. self.auto_save = auto_save
  115. self.values = {}
  116. if not os.path.exists(os.path.dirname(self.save_dir)):
  117. os.makedirs(os.path.dirname(self.save_dir))
  118. if os.path.exists(self.save_dir):
  119. with open(self.save_dir, 'rb') as f:
  120. self.values = pickle.load(f)
  121. f.close()
  122. def set_value(self, key, value):
  123. self.values[key] = value
  124. if self.auto_save:
  125. self.save_file()
  126. def get_value(self, key):
  127. return self.values[key]
  128. def has_key(self, key):
  129. return key in self.values.keys()
  130. def save_file(self):
  131. with open(self.save_dir, 'wb') as f:
  132. pickle.dump(self.values, f)
  133. f.close()
  134. pass
  135. class GlobalShareArgs():
  136. args = {
  137. "debug": False
  138. }
  139. def get_args():
  140. return GlobalShareArgs.args
  141. def set_args(args):
  142. GlobalShareArgs.args = args
  143. def set_args_value(key, value):
  144. GlobalShareArgs.args[key] = value
  145. def get_args_value(key, default_value=None):
  146. return GlobalShareArgs.args.get(key, default_value)
  147. def contain_key(key):
  148. return key in GlobalShareArgs.args.keys()
  149. def update(args):
  150. GlobalShareArgs.args.update(args)
  151. pass
  152. class ShareArgs():
  153. args = {
  154. "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # 标签目录
  155. "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # 聚类导出标签目录
  156. "common_datas_dir":"./hjx/data", # 共用数据目录。ur_bi_dw的公共
  157. "only_predict": False, # 只识别,不训练
  158. "delete_model": True, # 先删除模型,仅在训练时使用
  159. "export_excel": False, # 导出excel
  160. "classes": 12, # 聚类数
  161. "batch_size": 16,
  162. "hidden_size": 32,
  163. "max_nrof_epochs": 100,
  164. "learning_rate": 0.0005,
  165. "loss_type": "categorical_crossentropy",
  166. "avg_model_num": 10,
  167. "steps_per_epoch": 4.0, # 4.0
  168. "lr_callback_patience": 4,
  169. "lr_callback_cooldown": 1,
  170. "early_stopping_callback_patience": 6,
  171. "get_data": True,
  172. }
  173. def get_args():
  174. return ShareArgs.args
  175. def set_args(args):
  176. ShareArgs.args = args
  177. def set_args_value(key, value):
  178. ShareArgs.args[key] = value
  179. def get_args_value(key, default_value=None):
  180. return ShareArgs.args.get(key, default_value)
  181. def contain_key(key):
  182. return key in ShareArgs.args.keys()
  183. def update(args):
  184. ShareArgs.args.update(args)
  185. pass
  186. class IMSGetDatasBase():
  187. # 线程锁列表,同保存路径共用锁
  188. lock_dict:Dict[str, threading.Lock] = {}
  189. # 时间列表,用于判断是否超时
  190. time_dict:Dict[str, datetime.datetime] = {}
  191. # 用于记录是否需要更新超时时间
  192. get_data_timeout_dict:Dict[str, bool] = {}
  193. def __init__(
  194. self,
  195. host='192.168.15.144',
  196. port=3306,
  197. database='test_ims',
  198. user='spkjz_writer',
  199. password='Ur#7cmoP3QDtueVJQj2q4Az',
  200. save_dir=None,
  201. logger:logging.Logger=None,
  202. ):
  203. self.save_dir = save_dir
  204. self.logger = logger
  205. self.db_helper = MySqlHelper(
  206. host=host,
  207. port=port,
  208. database=database,
  209. user=user,
  210. password=password,
  211. logger=logger
  212. )
  213. # 创建子目录
  214. if self.save_dir is not None and not os.path.exists(self.save_dir):
  215. os.makedirs(self.save_dir)
  216. self.vars_helper = None
  217. if GlobalShareArgs.get_args_value('debug'):
  218. self.vars_helper = VarsHelper('./hjx/data/vars/IMSGetDatas') # 把超时时间保存到文件,注释该行即可停掉,只用于调试
  219. def close(self):
  220. '''关闭连接'''
  221. self.db_helper.close_conn()
  222. def get_last_time(self, key_name) -> bool:
  223. '''获取是否超时'''
  224. # 转静态路径,确保唯一性
  225. key_name = os.path.abspath(key_name)
  226. if self.vars_helper is not None and self.vars_helper.has_key('IMSGetDatasBase.time_list'):
  227. IMSGetDatasBase.time_dict = self.vars_helper.get_value('IMSGetDatasBase.time_list')
  228. timeout = 12 # 12小时
  229. if GlobalShareArgs.get_args_value('debug'):
  230. timeout = 24 # 24小时
  231. get_data_timeout = False
  232. if key_name not in IMSGetDatasBase.time_dict.keys() or (datetime.datetime.today() - IMSGetDatasBase.time_dict[key_name]).total_seconds()>(4*60*60):
  233. self.logger.info('超时%d小时,重新查数据:%s', timeout, key_name)
  234. # IMSGetDatasBase.time_list[key_name] = datetime.datetime.today()
  235. get_data_timeout = True
  236. else:
  237. self.logger.info('未超时%d小时,跳过查数据:%s', timeout, key_name)
  238. # if self.vars_helper is not None :
  239. # self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_list)
  240. IMSGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
  241. return get_data_timeout
  242. def save_last_time(self, key_name):
  243. '''更新状态超时'''
  244. # 转静态路径,确保唯一性
  245. key_name = os.path.abspath(key_name)
  246. if IMSGetDatasBase.get_data_timeout_dict[key_name]:
  247. IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()
  248. if self.vars_helper is not None :
  249. IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()
  250. self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_dict)
  251. def get_lock(self, key_name) -> threading.Lock:
  252. '''获取锁'''
  253. # 转静态路径,确保唯一性
  254. key_name = os.path.abspath(key_name)
  255. if key_name not in IMSGetDatasBase.lock_dict.keys():
  256. IMSGetDatasBase.lock_dict[key_name] = threading.Lock()
  257. return IMSGetDatasBase.lock_dict[key_name]
  258. def get_data_of_date(
  259. self,
  260. save_dir,
  261. sql,
  262. sort_columns:List[str],
  263. del_index_list=[-1], # 删除最后下标
  264. start_date = datetime.datetime(2017, 1, 1), # 开始时间
  265. offset = relativedelta(months=3), # 时间间隔
  266. date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # 查询语句中替代时间参数的格式化
  267. filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # 查询语句中替代时间参数的格式化
  268. stop_date = '20700101', # 超过时间则停止
  269. ):
  270. '''分时间增量读取数据'''
  271. # 创建文件夹
  272. if not os.path.exists(save_dir):
  273. os.makedirs(save_dir)
  274. else:
  275. #删除最后一个文件
  276. file_list = os.listdir(save_dir)
  277. if len(file_list)>0:
  278. file_list.sort()
  279. for del_index in del_index_list:
  280. os.remove(os.path.join(save_dir,file_list[del_index]))
  281. print('删除最后一个文件:', file_list[del_index])
  282. select_index = -1
  283. # start_date = datetime.datetime(2017, 1, 1)
  284. while True:
  285. end_date = start_date + offset
  286. start_date_str = date_format_fun(start_date)
  287. end_date_str = date_format_fun(end_date)
  288. self.logger.info('date: %s-%s', start_date_str, end_date_str)
  289. file_path = os.path.join(save_dir, filename_format_fun(start_date))
  290. # self.logger.info('file_path: %s', file_path)
  291. if not os.path.exists(file_path):
  292. data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
  293. if data is None:
  294. break
  295. self.logger.info('data: %d', len(data))
  296. # self.logger.info('data: %d', data.columns)
  297. if len(data)>0:
  298. select_index+=1
  299. # 排序
  300. data = data.sort_values(sort_columns)
  301. data.to_csv(file_path)
  302. elif select_index!=-1:
  303. break
  304. elif stop_date < start_date_str:
  305. raise Exception("读取数据异常,时间超出最大值!")
  306. start_date = end_date
  307. pass
  308. class CustomIMSGetDatas(IMSGetDatasBase):
  309. def __init__(
  310. self,
  311. host='192.168.13.134',
  312. port=4000,
  313. database='test_ims',
  314. user='root',
  315. password='rootimmsadmin',
  316. save_dir='./hjx/data/export_ims_data',
  317. logger:logging.Logger=None
  318. ):
  319. self.save_dir = save_dir
  320. self.logger = logger
  321. super().__init__(
  322. host=host,
  323. port=port,
  324. database=database,
  325. user=user,
  326. password=password,
  327. save_dir=save_dir,
  328. logger=logger
  329. )
  330. def get_ims_w_amt_pro(self):
  331. '''年月系列占比数据'''
  332. file_path = os.path.join(self.save_dir,'ims_w_amt_pro.csv')
  333. now_lock = self.get_lock(file_path)
  334. now_lock.acquire() # 加锁
  335. try:
  336. # 设置超时4小时才重新查数据
  337. # if not self.get_last_time(file_path):
  338. # return
  339. sql = 'SELECT * FROM ims_w_amt_pro'
  340. data:pd.DataFrame = self.db_helper.get_data(sql)
  341. data = data.rename(columns={
  342. 'serial_forecast_proportion': 'forecast_proportion',
  343. })
  344. data.to_csv(file_path)
  345. # # 更新超时时间
  346. # self.save_last_time(file_path)
  347. except Exception as ex:
  348. self.logger.exception(ex)
  349. raise ex # 往外抛出异常
  350. finally:
  351. now_lock.release() # 释放锁
  352. pass
  353. def get_datas(
  354. host='192.168.13.134',
  355. port=4000,
  356. database='test_ims',
  357. user='root',
  358. password='rootimmsadmin',
  359. save_dir='./hjx/data/export_ims_data',
  360. logger:logging.Logger=None
  361. ):
  362. ur_bi_get_datas = CustomIMSGetDatas(
  363. host=host,
  364. port=port,
  365. database=database,
  366. user=user,
  367. password=password,
  368. save_dir=save_dir,
  369. logger=logger
  370. )
  371. try:
  372. # 年月系列占比数据
  373. logger.info('正在查询年月系列占比数据...')
  374. ur_bi_get_datas.get_ims_w_amt_pro()
  375. logger.info('查询年月系列占比数据完成!')
  376. except Exception as ex:
  377. logger.exception(ex)
  378. raise ex # 往外抛出异常
  379. finally:
  380. ur_bi_get_datas.close()
  381. pass
  382. def getdata_export_ims(
  383. host='192.168.13.134',
  384. port=4000,
  385. database='test_ims',
  386. user='root',
  387. password='rootimmsadmin',
  388. save_dir='./hjx/data/export_ims_data',
  389. logger:logging.Logger=None
  390. ):
  391. get_datas(
  392. host=host,
  393. port=port,
  394. database=database,
  395. user=user,
  396. password=password,
  397. save_dir=save_dir,
  398. logger=logger
  399. )
  400. pass

本文转载自: https://blog.csdn.net/my_name_is_learn/article/details/129243879
版权归原作者 是猪哥不是诸葛 所有, 如有侵权,请联系我们删除。

“实际业务读取Hive数据库(2023年2月)”的评论:

还没有评论