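1. Architecture for collecting MySQL slow logs with ELK
- Filebeat is installed on each MySQL server as an agent that collects the slow log
- Filebeat reads the MySQL slow log file, applies simple filtering, and sends the events to a Kafka cluster
- Logstash reads the data from the Kafka cluster, splits it into fields, converts it to JSON, and stores it in an ES cluster
- Kibana reads the data from the ES cluster and displays it on a web page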
2. Filebeat
Use Filebeat to collect the MySQL slow log in real time and push it to Kafka.
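filebeat.inputs:
  # slowlog
  - type: log
    enabled: true
    paths:
      - /data/mysql_tmp/slow.log
    # Put the custom fields at the top level of the event instead of under "fields"
    fields_under_root: true
    fields:
      clustername: test
      type: mysql_slowlog
      log_topic: mysql-slowlog
    close_inactive: 5m
    scan_frequency: 1m
    tail_files: true
    # Every line that does not start with "# Time: " is appended to the
    # preceding line that does, so one slow-log entry becomes one event
    multiline:
      pattern: "^# Time: "
      negate: true
      match: after

output.kafka:
  version: x.x.x.x
  hosts: ["xxx:9092","xxx:9092"]
  # The topic name is taken from the log_topic field set above
  topic: "%{[log_topic]}"
  partition.round_robin:
    reachable_only: false
  worker: 1
  required_acks: 1
  compression: gzip
  max_message_bytes: 10485760
  keep_alive: 10s
  client_id: "filebeat"

max_procs: 1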
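To make the effect of the multiline settings concrete, here is a minimal Python sketch of the grouping rule that `pattern: "^# Time: "`, `negate: true`, and `match: after` describe. It is a simplification, not Filebeat's implementation: it ignores Filebeat's timeouts and line limits, the function name `group_multiline` is hypothetical, and the slow-log lines in `SAMPLE` are made up for illustration.

```python
import re
from typing import Iterable, Iterator, List


def group_multiline(lines: Iterable[str], pattern: str = r"^# Time: ") -> Iterator[str]:
    """Group raw log lines into events.

    A line that matches the pattern starts a new event; every line that
    does not match is appended to the event before it.
    """
    start = re.compile(pattern)
    buffer: List[str] = []
    for line in lines:
        if start.search(line) and buffer:
            # A new entry begins, so flush the one collected so far.
            yield "\n".join(buffer)
            buffer = []
        buffer.append(line)
    if buffer:
        yield "\n".join(buffer)


# Illustrative slow-log lines (invented for this example).
SAMPLE = [
    "# Time: 2023-02-14T08:00:00.000000Z",
    "# User@Host: app[app] @  [10.0.0.1]  Id:    42",
    "# Query_time: 2.500000  Lock_time: 0.000100 Rows_sent: 1  Rows_examined: 100000",
    "SET timestamp=1676361600;",
    "SELECT * FROM orders WHERE status = 'open';",
    "# Time: 2023-02-14T08:00:05.000000Z",
    "# User@Host: app[app] @  [10.0.0.1]  Id:    43",
    "# Query_time: 3.100000  Lock_time: 0.000200 Rows_sent: 0  Rows_examined: 50000",
    "SET timestamp=1676361605;",
    "UPDATE orders SET status = 'closed' WHERE id = 7;",
]

events = list(group_multiline(SAMPLE))
print(len(events))  # 2
```

Each element of `events` is one complete slow-log entry, which is the unit that would be sent to Kafka as the `message` field.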
3. Logstash
Use Logstash to split the `message` field of each Kafka record into separate fields, convert the result to JSON, and store it in the ES cluster.
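input {
  kafka {
    bootstrap_servers => "xxx:9092"
    topics => ["mysql-slowlog"]
    # Filebeat's Kafka output publishes each event as JSON; decode it so that
    # "message" and "type" are available as event fields
    codec => "json"
  }
}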
filter {
  grok {
    # With Id, with use
    match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id}\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
    # With Id, without use
    match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id}\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
    # Without Id, with use
    match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
    # Without Id, without use
    match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
  }
  # Use the UNIX timestamp from the "SET timestamp=..." line as the event time
  date {
    match => ["timestamp_mysql","UNIX"]
    target => "@timestamp"
  }
  # Drop the raw message and helper fields once they have been parsed
  mutate {
    remove_field => ["@version","message","timestamp_mysql"]
  }
}
output {
  elasticsearch {
    hosts => ["xxx.xxx.xxx.xxx:9200"]
    index => "%{type}-%{+YYYY.MM.dd}"
  }
}
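The field extraction done by the grok, date, and mutate filters can be sketched in Python as follows. This is an approximation for illustration, not a port of grok: Python's `re` replaces the grok patterns (for example `[\w.\-]+` stands in for `%{USER}`), the four variants are folded into one expression with optional `Id:` and `use` groups, and both the function name `parse_slow_entry` and the sample entry are hypothetical.

```python
import re
from datetime import datetime, timezone
from typing import Dict, Optional

# One expression with optional groups covers the four cases
# (with/without "Id:", with/without "use <db>;").
SLOW_ENTRY = re.compile(
    r"^#\s+User@Host:\s+(?P<user>[\w.\-]+)\[[^\]]+\]\s+@\s+"
    r"(?:(?P<clienthost>\S*) )?\[(?P<clientip>[^\]]*)\]"
    r"(?:\s+Id:\s+(?P<id>\d+))?\n"
    r"# Query_time: (?P<query_time>[\d.]+)\s+Lock_time: (?P<lock_time>[\d.]+)\s+"
    r"Rows_sent: (?P<rows_sent>\d+)\s+Rows_examined: (?P<rows_examined>\d+)\n"
    r"(?:use\s(?P<dbname>\w+);\n)?"
    r"SET\s+timestamp=(?P<timestamp_mysql>\d+);\n"
    r"(?P<query>[\s\S]*)",
    re.MULTILINE,  # let ^ match after the leading "# Time:" line
)


def parse_slow_entry(message: str) -> Optional[Dict[str, str]]:
    """Split one multiline slow-log event into named fields."""
    match = SLOW_ENTRY.search(message)
    if match is None:
        return None
    # Keep only the groups that actually captured something.
    doc = {key: value for key, value in match.groupdict().items() if value}
    # Mirror the date filter: the UNIX timestamp becomes @timestamp (UTC),
    # and the helper field is dropped as in the mutate filter.
    seconds = int(doc.pop("timestamp_mysql"))
    doc["@timestamp"] = datetime.fromtimestamp(seconds, tz=timezone.utc).isoformat()
    return doc


# Illustrative event (invented for this example).
EVENT = "\n".join([
    "# Time: 2023-02-14T08:00:00.000000Z",
    "# User@Host: app[app] @  [10.0.0.1]  Id:    42",
    "# Query_time: 2.500000  Lock_time: 0.000100 Rows_sent: 1  Rows_examined: 100000",
    "SET timestamp=1676361600;",
    "SELECT * FROM orders WHERE status = 'open';",
])

doc = parse_slow_entry(EVENT)
print(doc["user"], doc["query_time"], doc["@timestamp"])
```

As in the grok configuration, the captured values stay strings; any numeric typing is left to the Elasticsearch mapping.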
4. ES + Kibana
Kibana reads the data from the ES cluster and displays it on a web page.
Index template for the MySQL slow log in ES:
{
  "order": 0,
  "template": "mysqld-slow-*",
  "settings": {
    "index": {
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "mysqld-slow": {
      "numeric_detection": true,
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "strict_date_optional_time||epoch_millis"
        },
        "@version": {
          "type": "string"
        },
        "query_time": {
          "type": "double"
        },
        "rows_sent": {
          "type": "string"
        },
        "rows_examined": {
          "type": "string"
        },
        "clientip": {
          "type": "string"
        },
        "clienthost": {
          "type": "string"
        },
        "id": {
          "type": "integer"
        },
        "lock_time": {
          "type": "string"
        },
        "dbname": {
          "type": "keyword"
        },
        "user": {
          "type": "keyword"
        },
        "query": {
          "type": "string",
          "index": "not_analyzed"
        },
        "tags": {
          "type": "string"
        },
        "timestamp": {
          "type": "string"
        },
        "type": {
          "type": "string"
        }
      }
    }
  },
  "aliases": {}
}
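A template only takes effect for indices whose names match its pattern, so it is worth checking the pattern against the name that the Logstash output expression `%{type}-%{+YYYY.MM.dd}` produces. The sketch below assumes the `type` value `mysql_slowlog` from the Filebeat `fields` section and approximates Elasticsearch's wildcard matching with Python's `fnmatchcase`; the helper names `index_name` and `template_applies` are hypothetical.

```python
from datetime import datetime, timezone
from fnmatch import fnmatchcase


def index_name(event_type: str, timestamp: datetime) -> str:
    """Expand "%{type}-%{+YYYY.MM.dd}" for one event, with the date in UTC."""
    return f"{event_type}-{timestamp.astimezone(timezone.utc):%Y.%m.%d}"


def template_applies(index: str, template_pattern: str) -> bool:
    """Approximate the wildcard match of an index name against a template pattern."""
    return fnmatchcase(index, template_pattern)


ts = datetime(2023, 2, 14, 8, 0, tzinfo=timezone.utc)
name = index_name("mysql_slowlog", ts)

print(name)                                       # mysql_slowlog-2023.02.14
print(template_applies(name, "mysqld-slow-*"))    # False
print(template_applies(name, "mysql_slowlog-*"))  # True
```

With the values shown in this article the pattern `mysqld-slow-*` does not match the generated index name, so either the `template` pattern or the `type` field would need to be adjusted for the mapping to apply.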
Reposted from: https://blog.csdn.net/qq_42979842/article/details/129035050
Copyright belongs to the original author, _雪辉_. In case of infringement, please contact us for removal.