安装依赖
pip install findpeaks
使用
import numpy as np
from findpeaks import findpeaks
if __name__ == '__main__':
# 初始化
fp = findpeaks(
method='peakdetect', # 检测方式:一维数组【】二维数据【】
whitelist=['valley'], # 检测目标【峰peak,谷valley,峰谷['peak','valley']】
lookahead=1, # 前瞻性优化算法【数据量越少,此数字越小,比如50个数据,最好选择1或者2】
interpolate=10, # 插值,放大横坐标【数字越高,作图的边缘越不锋利】
)
# 数值
data = np.asarray([97, 11, 27, 69, 39, 52, 84, 81, 92, 84, 83, 95, 10, 87, 72, 84, 36, 15, 85, 68, 60, 59, 61, 3,
13, 12, 4, 80, 28, 53, 24, 32, 10, 2, 9, 57, 15, 66, 99, 26, 40, 63, 97, 22, 27, 98, 15, 84,
76, 34])
results = fp.fit(
X=data, # 数据
x=None # 作图的x轴坐标(默认0,1,2,3...)
)
# 打印结果
print(results)
# 作图
fp.plot()
使用案例:
from datetime import datetime, timedelta
from findpeaks import findpeaks
from pandas import DataFrame
from db.config import session
from db.db_models import ReportCapacityEnergy
from utils.influxdb_util import InfluxdbUtil, InfluxdbStorage
class PeakValleyDto:
def __init__(self, value, index, time, flag):
"""
峰谷值DTO
:param value: 值
:param index: 所在数组的索引
:param time: 时间戳
:param flag: 自定义标识
"""
self.value = value
self.index = index
self.time = time
self.flag = flag
class CapacityEnergyService:
"""
产能与能耗
"""
def __init__(self):
"""
初始化:
"""
# influxdb工具类
self.influxdb_util = InfluxdbUtil(url="http://xxxxxxxxx.com/",
token="xxxxxxxx",
org="xxxx")
@staticmethod
def check_out_peak_valley(capacity_data) -> list[PeakValleyDto]:
"""
检出的峰谷值
:return:
"""
result: list[PeakValleyDto] = []
m = map(lambda x: x.value, capacity_data)
fp = findpeaks(method='peakdetect', whitelist=['peak', 'valley'], lookahead=1, interpolate=1)
# 获取峰谷值
fit: dict = fp.fit(X=list(m))
# 原始数据
df_: DataFrame = fit['df']
# 平滑数据
df_interp_: DataFrame = fit['df_interp']
# 波谷
df__valley_true_: DataFrame = df_[df_['valley'] == True]
for valley in df__valley_true_.iterrows():
row = valley[1]
value = row['y'] # 值
index = row['x'] # 索引
time = capacity_data[row['x']].time # 时间戳
flag = 'valley' # 标记
result.append(PeakValleyDto(value, index, time, flag))
# 波峰
df__peak_true_: DataFrame = df_[df_['peak'] == True]
for peak in df__peak_true_.iterrows():
row = peak[1]
value = row['y'] # 值
index = row['x'] # 索引
time = capacity_data[row['x']].time # 时间戳
flag = 'peak' # 标记
result.append(PeakValleyDto(value, index, time, flag))
# 作图
fp.plot()
if len(result) > 0:
sor = sorted(result, key=lambda x: x.time)
# 起始点和结束点不是峰谷
# sor[0].flag = 'first'
# sor[-1].flag = 'last'
sor[0] = PeakValleyDto(capacity_data[0].value, 0, capacity_data[0].time, 'first')
sor[-1] = PeakValleyDto(capacity_data[-1].value, len(capacity_data) - 1,
capacity_data[-1].time, 'last')
return sor
else:
if len(capacity_data) == 0:
return [PeakValleyDto(0, 0, None, 'first'),
PeakValleyDto(0, 0, None, 'last')]
return [PeakValleyDto(capacity_data[0].value, 0, capacity_data[0].time, 'first'),
PeakValleyDto(capacity_data[-1].value, len(capacity_data) - 1,
capacity_data[-1].time, 'last')]
def calculate_capacity(self, start: datetime, end: datetime):
"""
计算产能
:return:
"""
capacity = 0
capacity_data: list[InfluxdbStorage] = self.influxdb_util.query(bucket="JLKZQ",
measurement="N_PLTS",
every="1m",
start=start,
stop=end,
createEmpty=False)
# 获取峰谷值
peak_valleys = self.check_out_peak_valley(capacity_data)
# 拆成两个一组
group_every_two = [peak_valleys[i:i + 2] for i in range(0, len(peak_valleys), 2)]
for group in group_every_two:
print(float(group[1].value), "-", float(group[0].value), "=", float(group[1].value) - float(group[0].value))
capacity += float(group[1].value) - float(group[0].value)
print("---------------------------")
return capacity
def calculate_energy(self, start: datetime, end: datetime):
"""
计算能耗
:return:
"""
energy = 0
energy_data: list[InfluxdbStorage] = self.influxdb_util.query(bucket="TRQ",
measurement="HZ_TRQ1_BKJLLTJ_GET_M3",
every="1m",
start=start,
stop=end,
createEmpty=False)
# # 获取峰谷值
# peak_valleys = self.check_out_peak_valley(energy_data)
# # 拆成两个一组
# group_every_two = [peak_valleys[i:i + 2] for i in range(0, len(peak_valleys), 2)]
# for group in group_every_two:
# print(float(group[1].value), "-", float(group[0].value), "=", float(group[1].value) - float(group[0].value))
# energy += float(group[1].value) - float(group[0].value)
# print("---------------------------")
# return energy
# 不需要峰谷计算,直接尾头相减即可11656.67857142858
return float(energy_data[-1].value) - float(energy_data[0].value)
def start(self, start: datetime, end: datetime) -> tuple:
"""
实时查询产能和能耗
:param start:
:param end:
:return:
"""
capacity_num = self.calculate_capacity(start, end)
energy_num = self.calculate_energy(start, end)
print("计算范围:", start, " 到 ", end)
print("产能:", capacity_num)
print("能耗:", energy_num)
return capacity_num, energy_num
def capacity_energy_db(self, start_date: datetime, end_date: datetime):
"""
产能能耗-按天计算-数据入库
:return:
"""
# 这里应该是根据作业指导书来分别查询一天中不同的作业指导书时段,计算出各个桶数和相应的重量,他们之和作为产能 TODO
start_date = start_date.replace(hour=0, minute=0, second=0)
end_date = end_date.replace(hour=0, minute=0, second=0)
for i in range(end_date.__sub__(start_date).days):
start = start_date + timedelta(days=i)
end = start_date + + timedelta(days=i + 1)
capacity_energy_service = CapacityEnergyService()
capacity_num, energy_num = capacity_energy_service.start(start=start, end=end)
report_capacity_energy = ReportCapacityEnergy(
capacity_bucket='JLKZQ',
energy_bucket='TRQ',
capacity_measurement='N_PLTS',
energy_measurement='HZ_TRQ1_BKJLLTJ_GET_M3',
barrel_num=capacity_num,
gas_num=energy_num,
start_time=start,
end_time=end,
week=start.isocalendar().week,
instruction_ids="",
type=1,
remarks='',
)
count = session.query(ReportCapacityEnergy) \
.filter(ReportCapacityEnergy.start_time == start) \
.filter(ReportCapacityEnergy.end_time == end) \
.count()
if count == 1:
session.query(ReportCapacityEnergy) \
.filter(ReportCapacityEnergy.start_time == start) \
.filter(ReportCapacityEnergy.end_time == end) \
.update({ReportCapacityEnergy.barrel_num: capacity_num, ReportCapacityEnergy.gas_num: energy_num})
else:
session.add(report_capacity_energy)
session.commit()
本文转载自: https://blog.csdn.net/wenxingchen/article/details/128865659
版权归原作者 苍穹之跃 所有, 如有侵权,请联系我们删除。
版权归原作者 苍穹之跃 所有, 如有侵权,请联系我们删除。