logstash读取kafka的topics,根据内容提取指定字段然后自动创建es索引。
input {
kafka{
bootstrap_servers => "192.168.1.15:9092"
auto_offset_reset => "latest"
topics_pattern => "svc.*" #topics_pattern支持正则匹配,topics不支持
consumer_threads => 5
codec => "json"
}
}
filter {
mutate {
gsub => [
"fieldname", "#", "-"
] 用于替换指定字符
split => ["message","#"] #分割字符串获取服务名
add_field => { "service" => "%{[message][3]}" }
}
#下面移除不必要的字段
mutate {
remove_field => ["@version"]
remove_field => ["@timestamp"]
remove_field => ["tags"]
remove_field => ["_id"]
remove_field => ["_type"]
remove_field => ["_index"]
remove_field => ["_score"]
}
}
output {
elasticsearch{
hosts => "192.168.1.15:9200"
index => "log-%{service}"
}
stdout {
codec => rubydebug
}
}
版权归原作者 ice_bird 所有, 如有侵权,请联系我们删除。