赛题
比赛链接:第三届阿里云磐久智维算法大赛-天池大赛-阿里云天池 (aliyun.com)
大赛概况
庸医只知头痛医头脚痛医脚,凡良医者,必会抽丝剥茧,察其根本,方得药到病除。第一届和第二届磐久智维算法大赛,我们针对异常预测开展了积极的探索和卓有成效的实践。本届大赛我们延续对异常/故障这一领域的深入挖掘,以根因诊断为赛题,和各界同仁一起探讨根因诊断的新思路,共同追逐这一人工智能应用的明珠。
在大规模IT设备、应用运维过程中,故障无可避免,而关键日志则是技术人员排查根因,对症下药的重要依据。近年来围绕日志分析,涌现出了众多先进技术,在不断突破技术瓶颈的同时,也为解决工业难题提供了越来越先进的武器装备。本次大赛开放大量不同类别的服务器运行日志,这些日志反应了服务器运行过程中各类部件的状态,能够在出现服务器故障时用来快速的定位出故障所在,这对于高效的修复故障、避免维修时间的浪费、降低服务器换件成本、提升系统可用性甚至减少客户投诉都至关重要。但准确的识别出故障根因并非易事,需要创新的思维,细致的分析。祝愿各界同仁享受比赛,开拓创新,取得好成绩。
问题描述
给定一段时间的系统日志数据,参赛者应提出自己的解决方案,以诊断服务器发生了哪种故障。具体来说,参赛者需要从组委会提供的数据中挖掘出和各类故障相关的特征,并采用合适的机器学习算法予以训练,最终得到可以区分故障类型的最优模型。数据处理方法和算法不限,但选手应该综合考虑算法的效果和复杂度,以构建相对高效的解决方案。
初赛会提供训练数据集,供参赛选手训练模型并验证模型效果使用。同时,也将提供测试集,选手需要对测试集中的数据诊断识别出故障类型,并将模型判断出的结果上传至竞赛平台,平台会根据提交的诊断结果,来评估模型的效果。
在复赛中,我们会进一步增加数据,并提供额外的其他种类的数据。面对进一步的问题和任务,选手需要提交一个docker镜像,镜像中需要包含用来进行故障诊断所需的所有内容,也即完整解决方案代码。其中,镜像中的代码需要能够根据输入的测试集文件(文件夹)位置,来对测试集中的故障数据进行诊断,并把诊断结果以指定的CSV文件格式输出到指定位置。
数据描述
1.初赛数据
1.1 SEL日志数据:
Table 1: SEL日志数据, 数据文件名: preliminary_sel_log_dataset.csv
FieldTypeDescriptionsnstringserver serial numbertimestringlog reported timemsgstringlogserver_modelstringserver model
1.2 训练标签数据:
Table 2: 训练标签数据, 数据文件名: preliminary_train_label_dataset.csv, preliminary_train_label_dataset_s.csv
FieldTypeDescriptionsnstringserver serial numberfault_timestringfault time of serverlabelintfailure label, 0,1,2,3 四类故障
其中0类和1类表示CPU相关故障,2类表示内存相关故障,3类表示其他类型故障
注: 上述两个文件的总label数据对应”preliminary_sel_log_dataset.csv“中所有的日志。在比赛之初,组委会曾在”preliminary_sel_log_dataset.csv“中开放过一份不带label的log数据交由选手提交答案测评,现该部分log数据不变,并将其对应的的label(preliminary_train_label_dataset_s.csv)一并开放,选手有了更多的数据用于训练。
1.3 选手提交数据:
Table 3: 选手提交数据, 数据文件名: preliminary_submit_dataset_a.csv, 对应的log文件名:preliminary_sel_log_dataset_a.csv
选手需要使用preliminary_sel_log_dataset_a.csv中的日志内容,评测出对应的诊断结果,并填充到preliminary_submit_dataset_a.csv中,preliminary_submit_dataset_a.csv是选手需要提交到系统的最终结果文件。
FieldTypeDescriptionsnstringserver serial numberfault_timestringfault time of server
注: 给定sn、fault_time两个字段信息,选手需要根据SEL日志信息给出最终的label。提交文件需要包含例子中给定的header(sn,fault_time,label),提交格式例子:
sn,fault_time,label
server_123,2019-08-16 02:12:00,0
注:选手提交文件请参见preliminary_submit_dataset_a.csv, 不是preliminary_submit_dataset.csv;选手提交文件请参见preliminary_submit_dataset_a.csv, 不是preliminary_submit_dataset.csv;选手提交文件请参见preliminary_submit_dataset_a.csv, 不是preliminary_submit_dataset.csv
1.4 SEL日志语料数据:
Table 4: SEL日志语料数据, 数据文件名: additional_sel_log_dataset.csv
FieldTypeDescriptionsnstringserver serial numbertimestringlog reported timemsgstringlog
注: 主要是给选手进行预训练用的数据,该数据集没有对应的label标签,也没有server_model字段,选手可以酌情使用
2.复赛数据(暂定)
2.1 SEL日志数据:
Table 5: SEL日志数据, 数据文件名: final_sel_log_dataset_*.csv
FieldTypeDescriptionsnstringserver serial numbertimestringlog reported timemsgstringlogserver_modelstringserver model
2.2 训练标签数据:
Table 6: 训练标签数据, 数据文件名: final_train_label_dataset_*.csv
FieldTypeDescriptionsnstringserver serial numberfault_timestringfault time of serverlabelintfailure label, 0,1,2,3四类故障
2.3 选手提交数据:
Table 7: 选手提交数据, 数据文件名: final_submit_dataset_*.csv
FieldTypeDescriptionsnstringserver serial numberfault_timestringfault time of server
注: 给定sn、fault_time两个字段信息,选手需要根据SEL日志信息给出最终的label。提交格式例子:
sn,fault_time,label
server_123,2019-08-16 02:12:00,0
2.4 补充日志数据1:
Table 8: 补充日志数据1, 数据文件名: final_venus_dataset_*.csv
FieldTypeDescriptionsnstringserver serial numberfault_timestringfault time of servermodule_causestringmodule causemodulestringmodule
2.5 补充日志数据2:
Table 9: 补充日志数据2, 数据文件名: final_crashdump_dataset_*.csv
FieldTypeDescriptionsnstringserver serial numberfault_timestringfault time of serverfault_codestringfault code
提交格式
初赛阶段,选手需要将模型在测试集上的诊断结果保存为csv格式,并打包成zip压缩文件进行提交。形式如下:
sn,fault_time,label
server_123,2019-08-16 02:12:00,0
复赛阶段,最后的输出形式如下:
sn,fault_time,label
server_123,2019-08-16 02:12:00,0
注:
1)选手提交数据文件中出现的机器均需要进行诊断,并将结果写到上传文件中,否则评分为0。
2)若选手上传的结果文件中,同一个sn出现多次诊断结果,则评测程序会选取第一条用于评分。
3)在复赛中,参赛选手需要提交docker镜像,具体的提交方式及规范请参见镜像提交说明。
4)诊断结果保存为csv文件格式,保存的csv文件有header,无index; 第一列为sn,第二列为故障时间,格式为YYYY-MM-dd HH:mm:ss,字符串类型,第三列为故障类型,格式为整数0-3
评价指标(初赛)
本次竞赛采用多分类加权Macro F1-score作为评价指标, 根据具体场景化的诊断内容,定义相关术语和详细指标如下:
- 对于第ii类,有- Precision- #TP: 真正例的数量- #FP: 假正例的数量 Precision = # T P # T P + # F P \text { Precision }=\frac{# \mathrm{TP}}{# \mathrm{TP}+# \mathrm{FP}} Precision =#TP+#FP#TP- Recall- #FN: 假负例的数量 Recall = # T P # T P + # F N \text { Recall }=\frac{# \mathrm{TP}}{# \mathrm{TP}+# \mathrm{FN}} Recall =#TP+#FN#TP- F1-score F1-score = 2 × Precision × Recall ( Precision + Recall ) \text { F1-score }=\frac{2 \times \text { Precision } \times \text { Recall }}{(\text { Precision }+\text { Recall })} F1-score =( Precision + Recall )2× Precision × Recall
- 综合四类,即「两种CPU故障」(0-1)、「内存故障」(2)和「其他故障」(3),有- 对应该四类的权重向量 w v = { 3 7 , 2 7 , 1 7 , 1 7 } w \boldsymbol{v}=\left{\frac{3}{7}, \frac{2}{7}, \frac{1}{7}, \frac{1}{7}\right} wv={73,72,71,71}- Macro F1-score Macro F1-score = ∑ i ∈ { 0 , 1 , 2 , 3 } w i × F1-score -s i \text { Macro F1-score }=\sum_{i \in{0,1,2,3}} w_{i} \times \text { F1-score } \text {-s }_{i} Macro F1-score =i∈{0,1,2,3}∑wi× F1-score -s i
Baseline
本文BaseLine的数据处理参照了:baseline_template-天池实验室-实时在线的数据分析协作工具,享受免费计算资源 (aliyun.com),模型采用pytorch版的GRU。
模型与DataSet
新建model.py文件,插入代码:
from torch import nn
from torch.utils.data import Dataset
classRNN(nn.Module):def__init__(self, input_size):super(RNN, self).__init__()
self.rnn = nn.GRU(
input_size=input_size,
hidden_size=128,
num_layers=1,
batch_first=True,)
self.out = nn.Sequential(
nn.Linear(128,4),)
self.hidden =Nonedefforward(self, x):
r_out, self.hidden = self.rnn(x)# None 表示 hidden state 会用全0的 state
out = self.out(r_out)return out
classTrainSet(Dataset):def__init__(self, data, lables):# 定义好 image 的路径
self.data, self.label = data.float(), lables.float()def__getitem__(self, index):return self.data[index], self.label[index]def__len__(self):returnlen(self.data)
调用nn.GRU创建模型,input_size是输入的维度,nn.Linear(64, 4),4代表输出的类别是4类。
TrainSet类定义加载数据的方式。
训练
定义完模型就开始这篇文章最重要的部分:训练!新建train.py,插入如下代码:
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import numpy as np
from torch import optim
from torch.utils.data import Dataset, DataLoader
import os
from model import RNN
DEVICE = torch.device('cuda'if torch.cuda.is_available()else'cpu')
导入需要的包。
判断是否存在cuda环境,如果存在DEVICE设置为cuda,如果没有则设置为cpu。
接下来是数据处理部分:
data_train = pd.read_csv('preliminary_sel_log_dataset.csv')
data_test = pd.read_csv('preliminary_sel_log_dataset_a.csv')
data = pd.concat([data_train, data_test])
from drain3 import TemplateMiner # 开源在线日志解析框架
from drain3.file_persistence import FilePersistence
from drain3.template_miner_config import TemplateMinerConfig
config = TemplateMinerConfig()
config.load('drain3.ini') ## 这个文件在drain3的github仓库里有
config.profiling_enabled = False
drain_file = 'comp_a_sellog'
persistence = FilePersistence(drain_file + '.bin')
template_miner = TemplateMiner(persistence, config=config)
##模板提取
for msg in data.msg.tolist():
template_miner.add_log_message(msg)
temp_count = len(template_miner.drain.clusters)
## 筛选模板
template_dic = {}
size_list = []
for cluster in template_miner.drain.clusters:
size_list.append(cluster.size)
size_list = sorted(size_list, reverse=True)[:200] ## 筛选模板集合大小前200条,这里的筛选只是举最简单的例子。
min_size = size_list[-1]
for cluster in template_miner.drain.clusters: ## 把符合要求的模板存下来
print(cluster.cluster_id)
if cluster.size >= min_size:
template_dic[cluster.cluster_id] = cluster.size
temp_count_f = len(template_dic)
def match_template(df, template_miner, template_dic):
msg = df.msg
cluster = template_miner.match(msg) # 匹配模板,由开源工具提供
if cluster and cluster.cluster_id in template_dic:
df['template_id'] = cluster.cluster_id # 模板id
df['template'] = cluster.get_template() # 具体模板
else:
df['template_id'] = 'None' # 没有匹配到模板的数据也会记录下来,之后也会用作一种特征。
df['template'] = 'None'
return df
data = data.apply(match_template, template_miner=template_miner, template_dic=template_dic, axis=1)
data.to_pickle(drain_file + '_result_match_data.pkl') # 将匹配好的数据存下来
df_data = pd.read_pickle(drain_file + '_result_match_data.pkl') # 读取匹配好模板的数据
df_data[df_data['template_id'] != 'None'].head()
def feature_generation(df_data, gap_list, model_name, log_source, win_list, func_list):
gap_list = gap_list.split(',')
dummy_list = set(df_data.template_id.unique())
dummy_col = ['template_id_' + str(x) for x in dummy_list]
for gap in gap_list:
df_data['collect_time_gap'] = pd.to_datetime(df_data.collect_time).dt.ceil(gap)
df_data = template_dummy(df_data)
df_data = df_data.reset_index(drop=True)
df_data = df_data.groupby(['sn', 'collect_time_gap']).agg(sum).reset_index()
df_data = feature_win_fun(df_data, dummy_col, win_list, func_list, gap)
df_data.to_pickle(
'cpu_diag_comp_sel_log_all_feature_' + gap + '_' + win_list + '_' + func_list + '.pkl') # 将构造好的特征数据存下来
return df_data
def template_dummy(df):
df_dummy = pd.get_dummies(df['template_id'], prefix='template_id')
df = pd.concat([df[['sn', 'collect_time_gap']], df_dummy], axis=1)
return df
def feature_win_fun(df, dummy_col, win_list, func_list, gap):
win_list = win_list.split(',')
func_list = func_list.split(',')
drop_col = ['sn']
merge_col = ['collect_time_gap']
df_out = df[drop_col + merge_col]
for win in win_list:
for func in func_list:
df_feature = df.groupby(drop_col).apply(rolling_funcs, win, func, dummy_col)
df_feature = df_feature.reset_index(drop=True).rename(columns=dict(zip(dummy_col, map(lambda x: x + '_' +
func + '_' + win,
dummy_col))))
df_out = pd.concat([df_out, df_feature], axis=1)
return df_out
def rolling_funcs(df, window, func, fea_col):
df = df.sort_values('collect_time_gap')
df = df.set_index('collect_time_gap')
df = df[fea_col]
df2 = df.rolling(str(window) + 'h')
if func in ['sum']:
df3 = df2.apply(sum_func)
else:
print('func not existed')
return df3
def sum_func(series):
return sum(series)
df_data.rename(columns={'time': 'collect_time'}, inplace=True)
feature_generation(df_data, '1h', '', '', '3', 'sum')
df_data = pd.read_pickle('cpu_diag_comp_sel_log_all_feature_1h_3_sum.pkl') # 读取之前构造好的特征数据
df_train_label = pd.read_csv('preliminary_train_label_dataset.csv')
df_train_label_s = pd.read_csv('preliminary_train_label_dataset_s.csv')
df_train_label = pd.concat([df_train_label, df_train_label_s])
df_train_label = df_train_label.drop_duplicates(['sn', 'fault_time', 'label'])
df_data_train = pd.merge(df_data[df_data.sn.isin(df_train_label.sn)], df_train_label, on='sn', how='left')
y = df_data_train['label']
x = df_data_train.drop(['sn', 'collect_time_gap', 'fault_time', 'label'], axis=1)
数据处理部分,我只修改了一些错误,详见:baseline_template-天池实验室-实时在线的数据分析协作工具,享受免费计算资源 (aliyun.com)
X_train, X_val, y_train, y_val = train_test_split(x, y, test_size=0.1, random_state=6)
X_train = np.array(X_train)
y_train = np.array(y_train)
df_tensor = torch.Tensor(X_train)
tensor_y = torch.Tensor(y_train)
n=X_train.shape[1]
print(n)
X_val = np.array(X_val)
y_val = np.array(y_val)
X_val = torch.Tensor(X_val)
y_val = torch.Tensor(y_val)
trainset = TrainSet(df_tensor, tensor_y)
valset = TrainSet(X_val, y_val)
trainloader = DataLoader(trainset, batch_size=64, shuffle=True)
valloader = DataLoader(valset, batch_size=64, shuffle=False)
按照7:3的比例切分训练集和验证集。
将训练集和验证集转成torch.Tensor类型。
分别给训练集和验证集创建DataLoader对象。
n代表数据的维度,在后面创建模型的时候要用到。
EPOCH=300
modellr=0.0001
ACC=0
rnn = RNN(n)
rnn.to(device=DEVICE)
optimizer = optim.Adam(rnn.parameters(), lr=modellr)
loss_func=nn.CrossEntropyLoss()
EPOCH:迭代次数,设置为300。
modellr:学习率,设置为0.0001
ACC:记录验证集的最高分数。保存模型的时候,按照最高分数保存的。
创建rnn模型。
优化器选用Adam。
loss函数选用交叉熵。
完成上面的代码,接下来就是训练和验证部分的代码:
for step inrange(EPOCH):
rnn.train()
loss_train=0for tx, ty in trainloader:
data, target = tx.to(DEVICE, non_blocking=True), ty.to(DEVICE, non_blocking=True)
output = rnn(torch.unsqueeze(data, dim=1))
loss = loss_func(torch.squeeze(output), target.long())
print_loss = loss.data.item()
loss_train+=print_loss
optimizer.zero_grad()# clear gradients for this training step
loss.backward()# back propagation, compute gradients
optimizer.step()print("epoch:",str(step),"loss:",str(loss_train/len(trainloader)))if step %2:
torch.save(rnn,'rnn.pth')
rnn.eval()
correct =0
total_num =len(valloader.dataset)
loss_val=0for vx, vy in valloader:
data, target = vx.to(DEVICE, non_blocking=True), vy.to(DEVICE, non_blocking=True)
output = rnn(torch.unsqueeze(data, dim=1))
loss = loss_func(torch.squeeze(output), target.long())
print_loss = loss.data.item()
loss_val=loss_val+print_loss
_, pred = torch.max(torch.squeeze(output),1)
correct += torch.sum(pred == target)
acc = correct / total_num
print("Val Loss {},ACC {}\n".format(loss_val/len(valloader),acc))if acc > ACC:
torch.save(rnn,'best.pth')
ACC = acc
代码中注意的地方,由于GRU要求输入是三维的,但是我们的数据是二维的所以使用torch.unsqueeze(data, dim=1)增加一维。
在计算loss的时候,output多了一维,所以使用torch.squeeze将这一维去除。如果不理解将其打印出来就可以看到了。
完成上面的代码就可以开始训练了,运行结果如下:
训练完成后,接下来开始测试部分的编写
测试
新建test.py,插入代码:
import pandas as pd
import torch
import numpy as np
from torch import nn
from model import RNN
DEVICE = torch.device('cuda'if torch.cuda.is_available()else'cpu')
drain_file ='comp_a_sellog'
df_data = pd.read_pickle('cpu_diag_comp_sel_log_all_feature_1h_3_sum.pkl')# 读取之前构造好的特征数据
df_test_df = pd.read_csv('preliminary_submit_dataset_a.csv', index_col=0).reset_index()
df_test = pd.merge(df_data[df_data.sn.isin(df_test_df.sn)], df_test_df, on='sn', how='left')
res = df_test[['sn','fault_time']]print(df_test)
x_test = df_test.drop(['sn','collect_time_gap','fault_time'], axis=1)
model=torch.load('best.pth')
model.cpu()
x_test = np.array(x_test)
df_tensor = torch.Tensor(x_test)
output= model(torch.unsqueeze(df_tensor, dim=1))
_, pred = torch.max(torch.squeeze(output),1)
res['label']=pred
res = res.sort_values(['sn','fault_time'])
res = res.drop_duplicates(['sn','fault_time'], keep='last')
res.to_csv('comp_a_result_1.csv', index=0)
测试集处理部分也是参考了天池上Baseline。
load模型,并设置其device为cpu。
将测试集转为torch.Tensor对象。
预测,并保存结果。
总结
本文实现了第三届阿里云磐久智维算法大赛的BaseLine,该BaseLine基于pytoch版的GRU实现。通过对该BaseLine,可以学习到如何使用GRU完成对csv数据训练和测试。
希望能给大家带来帮助,谢谢!
版权归原作者 AI浩 所有, 如有侵权,请联系我们删除。