0


ELK采集MySQL慢日志实现

文章目录

一、ELK采集MySQL慢日志架构

  • MySQL 服务器安装 Filebeat 作为 agent 收集 slowLog
  • Filebeat 读取 MySQL 慢日志文件做简单过滤传给 Kafka 集群
  • Logstash 读取 Kafka 集群数据并按字段拆分后转成 JSON 格式存入 ES 集群
  • Kibana 读取 ES 集群数据并展示到 Web 页面上

（此处原有图片）

二、filebeat

  使用filebeat实时采集mysql慢日志推送到kafka中

# Filebeat agent configuration: read the MySQL slow log and publish each
# entry to Kafka.
filebeat.inputs:
  # MySQL slow query log
  - type: log
    enabled: true
    paths:
      - /data/mysql_tmp/slow.log
    tail_files: true
    scan_frequency: 1m
    close_inactive: 5m
    # Every line that does NOT start with "# Time: " is appended to the
    # preceding line, so one slow-log entry becomes one event.
    multiline:
      pattern: '^# Time: '
      negate: true
      match: after
    # Custom fields, stored at the top level of the event.
    fields_under_root: true
    fields:
      clustername: test
      type: mysql_slowlog
      log_topic: mysql-slowlog

output.kafka:
  version: x.x.x.x  # placeholder in the article; replace with a real value
  hosts:
    - 'xxx:9092'
    - 'xxx:9092'
  # The topic name is taken from the log_topic field set on the input above.
  topic: '%{[log_topic]}'
  partition.round_robin:
    reachable_only: false
  client_id: filebeat
  worker: 1
  required_acks: 1
  keep_alive: 10s
  compression: gzip
  max_message_bytes: 10485760

max_procs: 1
  

三、logstash

  使用 Logstash 读取 Kafka 中的 message，按字段拆分后转成 JSON 格式存入 ES 集群

# Logstash pipeline: consume MySQL slow-log events from Kafka, split each
# entry into fields, and index the result into Elasticsearch.
input {
    kafka {
        bootstrap_servers => "xxx:9092"
        topics => ["mysql-slowlog"]
        # Filebeat publishes events to Kafka as JSON. Decode them so that
        # "message" holds the raw slow-log entry and shipper-added fields
        # such as "type" (used in the index name below) are available.
        codec => "json"
    }
}
filter {
    grok {
        # Patterns are tried in order until one matches:
        #   1. with Id,    with "use <db>;"
        #   2. with Id,    without "use"
        #   3. without Id, with "use <db>;"
        #   4. without Id, without "use"
        match => {
            "message" => [
                "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id}\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)",
                "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id}\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)",
                "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)",
                "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)"
            ]
        }
    }
    # Use the "SET timestamp=<epoch seconds>" value from the slow-log entry
    # as the event time.
    date {
        match => ["timestamp_mysql", "UNIX"]
        target => "@timestamp"
    }
    # Drop fields that are no longer needed once the entry has been parsed.
    mutate {
        remove_field => ["@version", "message", "timestamp_mysql"]
    }
}
output {
    elasticsearch {
        hosts => ["xxx.xxx.xxx.xxx:9200"]
        # One index per day, prefixed with the event's "type" field.
        index => "%{type}-%{+YYYY.MM.dd}"
    }
}

四、es+kibana

  Kibana 读取 ES 集群数据并展示到 Web 页面上
es中mysql慢日志索引模板

{
  "order": 0,
  "template": "mysql_slowlog-*",
  "settings": {
    "index": {
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "mysql_slowlog": {
      "numeric_detection": true,
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "strict_date_optional_time||epoch_millis"
        },
        "@version": {
          "type": "string"
        },
        "query_time": {
          "type": "double"
        },
        "rows_sent": {
          "type": "string"
        },
        "rows_examined": {
          "type": "string"
        },
        "clientip": {
          "type": "string"
        },
        "clienthost": {
          "type": "string"
        },
        "id": {
          "type": "integer"
        },
        "lock_time": {
          "type": "string"
        },
        "dbname": {
          "type": "keyword"
        },
        "user": {
          "type": "keyword"
        },
        "query": {
          "type": "string",
          "index": "not_analyzed"
        },
        "tags": {
          "type": "string"
        },
        "timestamp": {
          "type": "string"
        },
        "type": {
          "type": "string"
        }
      }
    }
  },
  "aliases": {}
}

在这里插入图片描述

标签: mysql elk kafka

本文转载自: https://blog.csdn.net/qq_42979842/article/details/129035050
版权归原作者 _雪辉_ 所有, 如有侵权,请联系我们删除。

“ELK采集MySQL慢日志实现”的评论:

还没有评论