0


python峰谷值算法:findpeaks

安装依赖

  1. pip install findpeaks

使用

  1. import numpy as np
  2. from findpeaks import findpeaks
  3. if __name__ == '__main__':
  4. # 初始化
  5. fp = findpeaks(
  6. method='peakdetect', # 检测方式:一维数组【】二维数据【】
  7. whitelist=['valley'], # 检测目标【峰peak,谷valley,峰谷['peak','valley']】
  8. lookahead=1, # 前瞻性优化算法【数据量越少,此数字越小,比如50个数据,最好选择1或者2】
  9. interpolate=10, # 插值,放大横坐标【数字越高,作图的边缘越不锋利】
  10. )
  11. # 数值
  12. data = np.asarray([97, 11, 27, 69, 39, 52, 84, 81, 92, 84, 83, 95, 10, 87, 72, 84, 36, 15, 85, 68, 60, 59, 61, 3,
  13. 13, 12, 4, 80, 28, 53, 24, 32, 10, 2, 9, 57, 15, 66, 99, 26, 40, 63, 97, 22, 27, 98, 15, 84,
  14. 76, 34])
  15. results = fp.fit(
  16. X=data, # 数据
  17. x=None # 作图的x轴坐标(默认0,1,2,3...)
  18. )
  19. # 打印结果
  20. print(results)
  21. # 作图
  22. fp.plot()

使用案例:

  1. from datetime import datetime, timedelta
  2. from findpeaks import findpeaks
  3. from pandas import DataFrame
  4. from db.config import session
  5. from db.db_models import ReportCapacityEnergy
  6. from utils.influxdb_util import InfluxdbUtil, InfluxdbStorage
  7. class PeakValleyDto:
  8. def __init__(self, value, index, time, flag):
  9. """
  10. 峰谷值DTO
  11. :param value: 值
  12. :param index: 所在数组的索引
  13. :param time: 时间戳
  14. :param flag: 自定义标识
  15. """
  16. self.value = value
  17. self.index = index
  18. self.time = time
  19. self.flag = flag
  20. class CapacityEnergyService:
  21. """
  22. 产能与能耗
  23. """
  24. def __init__(self):
  25. """
  26. 初始化:
  27. """
  28. # influxdb工具类
  29. self.influxdb_util = InfluxdbUtil(url="http://xxxxxxxxx.com/",
  30. token="xxxxxxxx",
  31. org="xxxx")
  32. @staticmethod
  33. def check_out_peak_valley(capacity_data) -> list[PeakValleyDto]:
  34. """
  35. 检出的峰谷值
  36. :return:
  37. """
  38. result: list[PeakValleyDto] = []
  39. m = map(lambda x: x.value, capacity_data)
  40. fp = findpeaks(method='peakdetect', whitelist=['peak', 'valley'], lookahead=1, interpolate=1)
  41. # 获取峰谷值
  42. fit: dict = fp.fit(X=list(m))
  43. # 原始数据
  44. df_: DataFrame = fit['df']
  45. # 平滑数据
  46. df_interp_: DataFrame = fit['df_interp']
  47. # 波谷
  48. df__valley_true_: DataFrame = df_[df_['valley'] == True]
  49. for valley in df__valley_true_.iterrows():
  50. row = valley[1]
  51. value = row['y'] # 值
  52. index = row['x'] # 索引
  53. time = capacity_data[row['x']].time # 时间戳
  54. flag = 'valley' # 标记
  55. result.append(PeakValleyDto(value, index, time, flag))
  56. # 波峰
  57. df__peak_true_: DataFrame = df_[df_['peak'] == True]
  58. for peak in df__peak_true_.iterrows():
  59. row = peak[1]
  60. value = row['y'] # 值
  61. index = row['x'] # 索引
  62. time = capacity_data[row['x']].time # 时间戳
  63. flag = 'peak' # 标记
  64. result.append(PeakValleyDto(value, index, time, flag))
  65. # 作图
  66. fp.plot()
  67. if len(result) > 0:
  68. sor = sorted(result, key=lambda x: x.time)
  69. # 起始点和结束点不是峰谷
  70. # sor[0].flag = 'first'
  71. # sor[-1].flag = 'last'
  72. sor[0] = PeakValleyDto(capacity_data[0].value, 0, capacity_data[0].time, 'first')
  73. sor[-1] = PeakValleyDto(capacity_data[-1].value, len(capacity_data) - 1,
  74. capacity_data[-1].time, 'last')
  75. return sor
  76. else:
  77. if len(capacity_data) == 0:
  78. return [PeakValleyDto(0, 0, None, 'first'),
  79. PeakValleyDto(0, 0, None, 'last')]
  80. return [PeakValleyDto(capacity_data[0].value, 0, capacity_data[0].time, 'first'),
  81. PeakValleyDto(capacity_data[-1].value, len(capacity_data) - 1,
  82. capacity_data[-1].time, 'last')]
  83. def calculate_capacity(self, start: datetime, end: datetime):
  84. """
  85. 计算产能
  86. :return:
  87. """
  88. capacity = 0
  89. capacity_data: list[InfluxdbStorage] = self.influxdb_util.query(bucket="JLKZQ",
  90. measurement="N_PLTS",
  91. every="1m",
  92. start=start,
  93. stop=end,
  94. createEmpty=False)
  95. # 获取峰谷值
  96. peak_valleys = self.check_out_peak_valley(capacity_data)
  97. # 拆成两个一组
  98. group_every_two = [peak_valleys[i:i + 2] for i in range(0, len(peak_valleys), 2)]
  99. for group in group_every_two:
  100. print(float(group[1].value), "-", float(group[0].value), "=", float(group[1].value) - float(group[0].value))
  101. capacity += float(group[1].value) - float(group[0].value)
  102. print("---------------------------")
  103. return capacity
  104. def calculate_energy(self, start: datetime, end: datetime):
  105. """
  106. 计算能耗
  107. :return:
  108. """
  109. energy = 0
  110. energy_data: list[InfluxdbStorage] = self.influxdb_util.query(bucket="TRQ",
  111. measurement="HZ_TRQ1_BKJLLTJ_GET_M3",
  112. every="1m",
  113. start=start,
  114. stop=end,
  115. createEmpty=False)
  116. # # 获取峰谷值
  117. # peak_valleys = self.check_out_peak_valley(energy_data)
  118. # # 拆成两个一组
  119. # group_every_two = [peak_valleys[i:i + 2] for i in range(0, len(peak_valleys), 2)]
  120. # for group in group_every_two:
  121. # print(float(group[1].value), "-", float(group[0].value), "=", float(group[1].value) - float(group[0].value))
  122. # energy += float(group[1].value) - float(group[0].value)
  123. # print("---------------------------")
  124. # return energy
  125. # 不需要峰谷计算,直接尾头相减即可11656.67857142858
  126. return float(energy_data[-1].value) - float(energy_data[0].value)
  127. def start(self, start: datetime, end: datetime) -> tuple:
  128. """
  129. 实时查询产能和能耗
  130. :param start:
  131. :param end:
  132. :return:
  133. """
  134. capacity_num = self.calculate_capacity(start, end)
  135. energy_num = self.calculate_energy(start, end)
  136. print("计算范围:", start, " 到 ", end)
  137. print("产能:", capacity_num)
  138. print("能耗:", energy_num)
  139. return capacity_num, energy_num
  140. def capacity_energy_db(self, start_date: datetime, end_date: datetime):
  141. """
  142. 产能能耗-按天计算-数据入库
  143. :return:
  144. """
  145. # 这里应该是根据作业指导书来分别查询一天中不同的作业指导书时段,计算出各个桶数和相应的重量,他们之和作为产能 TODO
  146. start_date = start_date.replace(hour=0, minute=0, second=0)
  147. end_date = end_date.replace(hour=0, minute=0, second=0)
  148. for i in range(end_date.__sub__(start_date).days):
  149. start = start_date + timedelta(days=i)
  150. end = start_date + + timedelta(days=i + 1)
  151. capacity_energy_service = CapacityEnergyService()
  152. capacity_num, energy_num = capacity_energy_service.start(start=start, end=end)
  153. report_capacity_energy = ReportCapacityEnergy(
  154. capacity_bucket='JLKZQ',
  155. energy_bucket='TRQ',
  156. capacity_measurement='N_PLTS',
  157. energy_measurement='HZ_TRQ1_BKJLLTJ_GET_M3',
  158. barrel_num=capacity_num,
  159. gas_num=energy_num,
  160. start_time=start,
  161. end_time=end,
  162. week=start.isocalendar().week,
  163. instruction_ids="",
  164. type=1,
  165. remarks='',
  166. )
  167. count = session.query(ReportCapacityEnergy) \
  168. .filter(ReportCapacityEnergy.start_time == start) \
  169. .filter(ReportCapacityEnergy.end_time == end) \
  170. .count()
  171. if count == 1:
  172. session.query(ReportCapacityEnergy) \
  173. .filter(ReportCapacityEnergy.start_time == start) \
  174. .filter(ReportCapacityEnergy.end_time == end) \
  175. .update({ReportCapacityEnergy.barrel_num: capacity_num, ReportCapacityEnergy.gas_num: energy_num})
  176. else:
  177. session.add(report_capacity_energy)
  178. session.commit()

本文转载自: https://blog.csdn.net/wenxingchen/article/details/128865659
版权归原作者 苍穹之跃 所有, 如有侵权,请联系我们删除。

“python峰谷值算法:findpeaks”的评论:

还没有评论