PEWMA 和 EWMA 区别
EWMA:
μ
t
=
α
μ
t
−
1
+
(
1
−
α
)
X
t
\mu_t = \alpha \mu_{t-1} + (1 - \alpha ) X_t
μt=αμt−1+(1−α)Xt
PEWMA:
μ
t
=
α
(
1
−
β
P
t
)
μ
t
−
1
+
(
1
−
α
(
1
−
β
P
t
)
)
X
t
\mu_t = \alpha (1 - \beta P_t) \mu_{t-1} + (1 - \alpha (1 - \beta P_t)) X_t
μt=α(1−βPt)μt−1+(1−α(1−βPt))Xt
其核心思想:
We choose to adapt weights α by 1 − βPt such that samples that are less likely to have been observed offer little influence to the updated estimate.
pyflink
数据构造
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline
y = np.array([ np.random.random()*10+50*int(np.random.random()>0.99)*np.sign(np.random.random()-0.5)for _ inrange(1000)])
y[:len(y)//2]+=200
y +=100
plt.figure(figsize=(20,5))
plt.plot(y)
pyflink
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
# from pyflink.table import (DataTypes, TableDescriptor, Schema, StreamTableEnvironment)from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor
classPEWMA(MapFunction):def__init__(self):
self.INIT_NUM =30
self.alpha =1-1/self.INIT_NUM
self.exp_x =None
self.sigma_square =None
self.init_count =None
self.beta =0.5defopen(self, runtime_context: RuntimeContext):
self.exp_x = runtime_context.get_state(
ValueStateDescriptor("exp_x", Types.FLOAT())# 信号均值估计)
self.sigma_square = runtime_context.get_state(
ValueStateDescriptor("sigma_square", Types.FLOAT())# 偏离量的方差估计)
self.init_count = runtime_context.get_state(
ValueStateDescriptor("init_count", Types.INT())# 前期计数)defmap(self, value):
x = value[1]# retrieve the current state
exp_x = self.exp_x.value()if self.exp_x.value()isnotNoneelse x
sigma_square = self.sigma_square.value()if self.sigma_square.value()isnotNoneelse0.
init_count = self.init_count.value()if self.init_count.value()isnotNoneelse0
alpha = self.alpha
diff =abs(x-exp_x)# update the stateif init_count < self.INIT_NUM:
init_count +=1
alpha =1-1/init_count # 保证前期的均值估计是准确的,因为EWMA在前期收初值影响大else:
P =0.39894228* np.exp(-0.5*diff*diff/sigma_square)# adapt weights α by 1 − βP such that samples that are less likely to have been observed offer little influence to the updated estimate.# 如果当前观测值出现的概率很小,就尽量不要用它来更新均值方差估计
alpha *=1- self.beta * P
# update estimate with adjusted alpha
exp_x = alpha * exp_x +(1- alpha)* x
sigma_square = alpha * sigma_square +(1- alpha)* diff * diff
self.exp_x.update(exp_x)
self.sigma_square.update(sigma_square)
self.init_count.update(init_count)
sigma = np.sqrt(sigma_square)return value[0], x, exp_x, diff, sigma, diff >3*sigma # 返回 (key_by字段,原始信号,期望信号,实际偏移量,偏移量方差,是否异常)
env = StreamExecutionEnvironment.get_execution_environment()# 为了验证分组特性, 添加一个分组字段
ds = env.from_collection(
collection=[('alice',float(i))for i in y
]+[('bob',float(i))for i in y
],
type_info=Types.TUPLE([Types.STRING(), Types.FLOAT()]))# apply the process function onto a keyed stream
ds =(
ds.key_by(lambda value: value[0]).map(PEWMA(), output_type=Types.TUPLE([Types.STRING(), Types.FLOAT(), Types.FLOAT(), Types.FLOAT(),Types.FLOAT(), Types.BOOLEAN()])))
ds.print()# submit for execution
env.execute()
16> (alice,300.4562,300.4562,0.0,0.0,false)
16> (alice,304.18646,302.32135,3.7302551,2.6376886,false)
16> (alice,353.12448,319.2557,50.803146,29.410172,false)
16> (alice,306.1917,315.98972,13.064006,26.294214,false)
16> (alice,307.6791,314.3276,8.310608,23.81012,false)
16> (alice,301.60532,312.2072,12.722278,22.347504,false)
16> (alice,307.79276,311.57657,4.414459,20.756935,false)
16> (alice,307.29886,311.04187,4.2777185,19.47515,false)
16> (alice,300.1238,309.82874,10.918053,18.718546,false)
16> (alice,302.92087,309.13797,6.9078774,17.891825,false)
......
转 table api
from pyflink.table import(DataTypes, Schema, StreamTableEnvironment)
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
table = t_env.from_data_stream(
ds,
Schema
.new_builder().column("f0", DataTypes.STRING()).column("f1", DataTypes.FLOAT()).column("f2", DataTypes.FLOAT()).column("f3", DataTypes.FLOAT()).column("f4", DataTypes.FLOAT()).column("f5", DataTypes.BOOLEAN()).build()).alias("user","raw","expected","diff","sigma","isAbnomal")
df = table.to_pandas()
df = df[df["user"]=='alice'].reset_index()
df
matplotlib 画图
_, ax = plt.subplots(1,1,figsize=(20,5))
df[["raw","expected","diff","sigma","isAbnomal"]].plot(ax=ax)
locs =list(df[df['isAbnomal']].index)
plt.plot(locs, y[locs],'ro')
结果分析:
- 初始阶段,漏报一次脉冲异常
- 信号阶跃后,漏报两个脉冲异常
- 平稳状态下,误报两次,毕竟3sigma
和 EWMA 对比
EWMA 的方差收敛更慢,更容易产生漏报,所以该论文的改进效果是有的
版权归原作者 颹蕭蕭 所有, 如有侵权,请联系我们删除。