pyflink time-series anomaly detection: PEWMA


Difference between PEWMA and EWMA

EWMA:

$$\mu_t = \alpha \mu_{t-1} + (1 - \alpha) X_t$$
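Unrolling the EWMA recursion makes the influence of the starting value explicit. This is the standard expansion of the recursion rather than something taken from the original post; $\mu_0$ denotes the initial estimate.

$$\mu_t = \alpha^t \mu_0 + (1 - \alpha) \sum_{k=0}^{t-1} \alpha^k X_{t-k}$$

With $\alpha = 1 - 1/30$, the value used in the code in this post, the initial estimate still carries a weight of $\alpha^{30} \approx 0.36$ after 30 samples.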

PEWMA:

$$\mu_t = \alpha (1 - \beta P_t) \mu_{t-1} + (1 - \alpha (1 - \beta P_t)) X_t$$

The core idea:

We choose to adapt weights α by 1 − βPt such that samples that are less likely to have been observed offer little influence to the updated estimate.
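
As a minimal sketch of that weighting (not the author's code), the snippet below computes the adapted weight α(1 − βP_t) for a single sample. It assumes P_t is the standard normal density evaluated at the sample's z-score, which is how the PyFlink implementation in this post computes it; the function name `adapted_alpha` and the sample values are hypothetical.

```python
import math


def adapted_alpha(x, mean, sigma, alpha=1 - 1 / 30, beta=0.5):
    """Return alpha * (1 - beta * P_t) for one observation x."""
    z = (x - mean) / sigma
    # P_t: standard normal density evaluated at the z-score
    p = math.exp(-0.5 * z * z) / math.sqrt(2 * math.pi)
    return alpha * (1 - beta * p)


typical = adapted_alpha(x=100.0, mean=100.0, sigma=3.0)   # z = 0
unlikely = adapted_alpha(x=150.0, mean=100.0, sigma=3.0)  # z is about 16.7

# 1 - alpha_t is the weight given to the new sample X_t
print(1 - typical, 1 - unlikely)
```

The typical sample receives the larger weight 1 − α_t, while the weight of the unlikely sample stays close to the plain EWMA value 1 − α.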

pyflink

Data construction

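    import matplotlib.pyplot as plt
    import numpy as np
    # Jupyter magic; remove this line when running outside a notebook
    %matplotlib inline

    # uniform noise in [0, 10) plus an occasional (about 1%) spike of +50 or -50
    y = np.array([
        np.random.random() * 10
        + 50 * int(np.random.random() > 0.99) * np.sign(np.random.random() - 0.5)
        for _ in range(1000)])
    y[:len(y) // 2] += 200  # step change: the first half sits 200 higher
    y += 100
    plt.figure(figsize=(20, 5))
    plt.plot(y)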


pyflink

    import numpy as np
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    # from pyflink.table import (DataTypes, TableDescriptor, Schema, StreamTableEnvironment)
    from pyflink.datastream.functions import RuntimeContext, MapFunction
    from pyflink.datastream.state import ValueStateDescriptor


    class PEWMA(MapFunction):

        def __init__(self):
            self.INIT_NUM = 30
            self.alpha = 1 - 1 / self.INIT_NUM
            self.exp_x = None
            self.sigma_square = None
            self.init_count = None
            self.beta = 0.5

        def open(self, runtime_context: RuntimeContext):
            # estimate of the signal mean
            self.exp_x = runtime_context.get_state(
                ValueStateDescriptor("exp_x", Types.FLOAT()))
            # estimate of the variance of the deviation
            self.sigma_square = runtime_context.get_state(
                ValueStateDescriptor("sigma_square", Types.FLOAT()))
            # warm-up sample counter
            self.init_count = runtime_context.get_state(
                ValueStateDescriptor("init_count", Types.INT()))

        def map(self, value):
            x = value[1]
            # retrieve the current state
            exp_x = self.exp_x.value() if self.exp_x.value() is not None else x
            sigma_square = self.sigma_square.value() if self.sigma_square.value() is not None else 0.
            init_count = self.init_count.value() if self.init_count.value() is not None else 0
            alpha = self.alpha
            diff = abs(x - exp_x)
            # update the state
            if init_count < self.INIT_NUM:
                init_count += 1
                # keep the early mean estimate accurate: EWMA is strongly
                # affected by its initial value during the first samples
                alpha = 1 - 1 / init_count
            else:
                P = 0.39894228 * np.exp(-0.5 * diff * diff / sigma_square)
                # adapt weights α by 1 − βP such that samples that are less likely to
                # have been observed offer little influence to the updated estimate.
                # If the current observation is very unlikely, use it as little as
                # possible when updating the mean and variance estimates.
                alpha *= 1 - self.beta * P
            # update estimate with adjusted alpha
            exp_x = alpha * exp_x + (1 - alpha) * x
            sigma_square = alpha * sigma_square + (1 - alpha) * diff * diff
            self.exp_x.update(exp_x)
            self.sigma_square.update(sigma_square)
            self.init_count.update(init_count)
            sigma = np.sqrt(sigma_square)
            # returns (key_by field, raw signal, expected signal, actual deviation,
            #          standard deviation of the deviation, is anomalous)
            return value[0], x, exp_x, diff, sigma, diff > 3 * sigma


    env = StreamExecutionEnvironment.get_execution_environment()
    # add a grouping field to verify the per-key behaviour
    ds = env.from_collection(
        collection=[('alice', float(i)) for i in y] + [('bob', float(i)) for i in y],
        type_info=Types.TUPLE([Types.STRING(), Types.FLOAT()]))
    # apply the map function onto a keyed stream
    ds = (
        ds.key_by(lambda value: value[0])
          .map(PEWMA(), output_type=Types.TUPLE([Types.STRING(), Types.FLOAT(), Types.FLOAT(),
                                                 Types.FLOAT(), Types.FLOAT(), Types.BOOLEAN()]))
    )
    ds.print()
    # submit for execution
    env.execute()

Output:

    16> (alice,300.4562,300.4562,0.0,0.0,false)
    16> (alice,304.18646,302.32135,3.7302551,2.6376886,false)
    16> (alice,353.12448,319.2557,50.803146,29.410172,false)
    16> (alice,306.1917,315.98972,13.064006,26.294214,false)
    16> (alice,307.6791,314.3276,8.310608,23.81012,false)
    16> (alice,301.60532,312.2072,12.722278,22.347504,false)
    16> (alice,307.79276,311.57657,4.414459,20.756935,false)
    16> (alice,307.29886,311.04187,4.2777185,19.47515,false)
    16> (alice,300.1238,309.82874,10.918053,18.718546,false)
    16> (alice,302.92087,309.13797,6.9078774,17.891825,false)
    ......

Converting to the Table API

    from pyflink.table import DataTypes, Schema, StreamTableEnvironment

    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    table = t_env.from_data_stream(
        ds,
        Schema.new_builder()
              .column("f0", DataTypes.STRING())
              .column("f1", DataTypes.FLOAT())
              .column("f2", DataTypes.FLOAT())
              .column("f3", DataTypes.FLOAT())
              .column("f4", DataTypes.FLOAT())
              .column("f5", DataTypes.BOOLEAN())
              .build()
    ).alias("user", "raw", "expected", "diff", "sigma", "isAbnomal")
    df = table.to_pandas()
    df = df[df["user"] == 'alice'].reset_index()
    df


Plotting with matplotlib

    _, ax = plt.subplots(1, 1, figsize=(20, 5))
    df[["raw", "expected", "diff", "sigma", "isAbnomal"]].plot(ax=ax)
    # mark the samples flagged as anomalous with red dots
    locs = list(df[df['isAbnomal']].index)
    plt.plot(locs, y[locs], 'ro')

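Analysis of the results:

  • In the initial phase, one spike anomaly is missed.
  • After the step change in the signal, two spike anomalies are missed.
  • In the steady state there are two false alarms, which is to be expected with a 3-sigma threshold.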

Comparison with EWMA

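With EWMA the variance estimate converges more slowly, which makes missed detections more likely, so the improvement proposed in the paper does have an effect.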
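
The slower variance recovery can be reproduced without Flink. The following is a plain-Python sketch that mirrors the update rule of the `PEWMA` class above, with `beta=0` standing in for EWMA. The function name `detect` and the synthetic series are hypothetical, and it is an assumption that the EWMA run behind this comparison used the same warm-up and 3-sigma threshold.

```python
import math


def detect(xs, beta, init_num=30):
    """Run the post's update rule in plain Python; beta=0 reduces it to EWMA."""
    base_alpha = 1 - 1 / init_num
    mean, var, count = None, 0.0, 0
    rows = []
    for x in xs:
        if mean is None:
            mean = x  # the first sample initialises the mean
        diff = abs(x - mean)
        alpha = base_alpha
        if count < init_num:
            count += 1
            alpha = 1 - 1 / count  # warm-up: plain running average
        elif var > 0:
            # standard normal density of the z-score, as in the PEWMA class
            p = 0.39894228 * math.exp(-0.5 * diff * diff / var)
            alpha *= 1 - beta * p
        mean = alpha * mean + (1 - alpha) * x
        var = alpha * var + (1 - alpha) * diff * diff
        sigma = math.sqrt(var)
        rows.append((mean, sigma, diff > 3 * sigma))
    return rows


# Hypothetical series: 0/1 noise, a spike of 50, 20 quiet samples, a spike of 15
xs = ([float(i % 2) for i in range(40)] + [50.0]
      + [float(i % 2) for i in range(20)] + [15.0])

ewma = detect(xs, beta=0.0)
pewma = detect(xs, beta=0.5)
print("sigma before 2nd spike:", ewma[60][1], pewma[60][1])
print("2nd spike flagged:", ewma[61][2], pewma[61][2])
```

On this synthetic series both variants flag the first spike. When the second spike arrives, the `beta=0` run still carries an inflated sigma from the first one and misses it, while the `beta=0.5` run has recovered and flags it.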


Reposted from: https://blog.csdn.net/itnerd/article/details/143104774
Copyright belongs to the original author, 颹蕭蕭. In case of infringement, please contact us for removal.
