

Collecting system logs with Filebeat into Kafka + ELK, with ElastAlert DingTalk alerts

Environment: 7 CentOS 7 machines; ELK 7.17.1, Kafka 3.5.1, Filebeat 7.4.2
IP layout:
Kafka cluster: 192.168.50.154-156
ES cluster: 192.168.70.61-63
Kibana: 192.168.70.62, Logstash: 192.168.70.64
Data flow diagram:
[image]

1. Install Filebeat

Install it on all machines with Ansible.

# Install ansible
yum install epel-release -y
yum install ansible -y
mkdir /etc/ansible/play-book
[root@rabbit1-61 app]# cat /etc/ansible/play-book/filebt.yml 
---
- name: Deploy Filebeat
  hosts: install
  become: true  # Run tasks as sudo
  tasks:
    - name: Create destination directory on remote host
      ansible.builtin.file:
        path: "/app/"
        state: directory

    - name: Extract filebeat archive
      unarchive: src=/app/filebeat-7.4.2-linux-x86_64.tar.gz dest=/app/
                
    - name: Rename extracted directory to "filebeat"
      ansible.builtin.command:
        cmd: mv "/app/filebeat-7.4.2-linux-x86_64" "/app/filebeat"

    - name: Copy filebeat.service to /etc/systemd/system/
      copy: src=/app/filebeat.service dest=/etc/systemd/system/filebeat.service

    - name: Copy filebeat.yml to /app/filebeat/
      copy: src=/app/filebeat.yml dest=/app/filebeat/
    - name: Start Filebeat service
      ansible.builtin.service:
        name: filebeat
        state: started
# Append the machines that need Filebeat to the end of /etc/ansible/hosts
[install]
192.168.70.62
192.168.70.61
192.168.70.63
192.168.70.64
192.168.50.154
192.168.50.155
192.168.50.156
# Set up passwordless SSH. The machine running ansible is 192.168.70.61.
# Use ssh-copy-id to push the public key to each of the IPs above.
# Filebeat configuration file
[root@rabbit1-61 ansible]# cat /app/filebeat/filebeat.yml 
filebeat.inputs:
- input_type: log
  paths:
    - /var/log/messages
  encoding: utf-8
  document_type: messages
  fields_under_root: true
  fields:
    log_type: messages
- input_type: log
  paths:
    - /var/log/secure
  encoding: utf-8
  document_type: secure
  fields_under_root: true
  fields:
    log_type: secure
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
output.kafka:
  hosts: ["192.168.50.154:9092", "192.168.50.155:9092", "192.168.50.156:9092"]
  topic: 'topic-syslogs'  # topic name
  partition.round_robin:
    reachable_only: false
  required_acks: 0
  compression: gzip
  max_message_bytes: 1000000
# Run Filebeat as a service
[root@rabbit1-61 ansible]# cat /etc/systemd/system/filebeat.service 
[Unit]
Description=Filebeat
Documentation=https://www.elastic.co/guide
After=network.target

[Service]
Type=simple
Restart=always
WorkingDirectory=/app/filebeat
ExecStart=/app/filebeat/filebeat -c /app/filebeat/filebeat.yml -e
User=root

[Install]
WantedBy=multi-user.target

# Once everything is ready, run the filebt.yml playbook
ansible-playbook filebt.yml
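
Both inputs in filebeat.yml set fields_under_root: true together with a custom log_type field. As a rough illustration of what that switch changes, here is a minimal Python sketch that follows the behaviour described in the Filebeat documentation (custom fields at the top level when the option is true, grouped under a "fields" key otherwise). The function name and the sample event are made up for the example, and real Filebeat events carry more metadata.

```python
def apply_custom_fields(event, fields, fields_under_root):
    """Return a copy of `event` with the custom fields attached."""
    result = dict(event)
    if fields_under_root:
        # Custom fields become top-level keys and overwrite name clashes.
        result.update(fields)
    else:
        # Custom fields are grouped under a "fields" sub-dictionary.
        result["fields"] = dict(fields)
    return result


# Hypothetical event, for illustration only.
event = {"message": "sshd[1234]: Accepted password for root"}
at_root = apply_custom_fields(event, {"log_type": "secure"}, fields_under_root=True)
nested = apply_custom_fields(event, {"log_type": "secure"}, fields_under_root=False)
print(at_root["log_type"])
print(nested["fields"]["log_type"])
```

With the option enabled, downstream consumers can filter on log_type directly instead of fields.log_type.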

2. Install Logstash

# Install from the rpm package again
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.17.1-x86_64.rpm
yum install java -y
rpm -ivh logstash-7.17.1-x86_64.rpm
# Create a new file under /etc/logstash/conf.d
[root@localhost conf.d]# cat topic_log.conf 
input {
  kafka {
    group_id => "topic-log"
    topics_pattern => "topic-.*"
    decorate_events => true
    bootstrap_servers => "192.168.50.154:9092,192.168.50.155:9092,192.168.50.156:9092"  # Kafka cluster IPs
    consumer_threads => 10
    codec => "json"
  }
}

filter {
  mutate {
    split => { "[@metadata][kafka][topic]" => "-" }
    # The index seen in Kibana then becomes syslogs instead of topic-syslogs
    add_field => { "topic" => "%{[@metadata][kafka][topic][1]}" }
  }
}

output {
  elasticsearch {
    hosts => ["192.168.70.61:9200", "192.168.70.62:9200", "192.168.70.63:9200"]  # ES cluster IPs
    user => "elastic"
    password => "123456789"
    index => "%{topic}-%{+YYYY.MM.dd}"
  }
}
# pipelines.yml also needs to be configured
[root@localhost logstash]# pwd
/etc/logstash
[root@localhost logstash]# cat /etc/logstash/pipelines.yml 
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
#- pipeline.id: main
#  path.config: "/etc/logstash/conf.d/*.conf"
- pipeline.id: topic_log
  path.config: "/etc/logstash/conf.d/topic_log.conf"
# Once everything is ready, start logstash with systemctl
systemctl start logstash
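
To see how a Kafka topic name turns into an Elasticsearch index name in this pipeline, the filter and output steps can be imitated in a few lines of Python. This is only a sketch of the logic in topic_log.conf, not Logstash itself: index_for is a made-up helper, and it assumes the date part is taken from the event timestamp in UTC, which is how Logstash formats %{+YYYY.MM.dd}.

```python
import re
from datetime import datetime, timezone

TOPICS_PATTERN = re.compile(r"topic-.*")  # same regex as topics_pattern


def index_for(topic, timestamp):
    """Split the topic on "-", keep element [1], append the UTC date."""
    if not TOPICS_PATTERN.fullmatch(topic):
        raise ValueError(f"{topic!r} is not consumed by this pipeline")
    short_name = topic.split("-")[1]
    day = timestamp.astimezone(timezone.utc).strftime("%Y.%m.%d")
    return f"{short_name}-{day}"


ts = datetime(2024, 5, 22, 6, 26, tzinfo=timezone.utc)
print(index_for("topic-syslogs", ts))  # syslogs-2024.05.22
```

Because only element [1] is kept, a topic with a second hyphen, such as a hypothetical topic-nginx-access, would land in an index starting with nginx-, so it is simplest to keep topic names to a single hyphen.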

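3. Access KnowStreaming (create the topic if it does not exist)

First check 192.168.70.61:9200/_cat/indices on ES to see whether the expected indices are there
[image]
Open KnowStreaming and create the topic
[image]

4. Access Kibana and import the topic as an index pattern

Open the index pattern page from the location shown in the screenshot below
[image]
Add the index pattern
[image]
[image]
View the index
[image]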

5. Nginx configuration (for reference)

Nginx works much the same way: point Filebeat at the files where nginx writes its logs, for example:

filebeat.inputs:
- input_type: log
  paths:
    - /var/log/openresty/access.log
  encoding: utf-8
  document_type: access
  tail_files: true
  json.keys_under_root: true
  fields_under_root: true
  fields:
    log_type: access
    host_ip: 192.168.60.x
- input_type: log
  paths:
    - /var/log/openresty/error.log
  encoding: utf-8
  document_type: error

  tail_files: true
  fields_under_root: true
  fields:
    log_type: error
    host_ip: 192.168.60.x

- input_type: log
  paths:
    - /var/log/nginx/default.log
  encoding: utf-8
  document_type: default

  json.keys_under_root: true
  fields_under_root: true
  fields:
    log_type: default
    host_ip: 192.168.60.x

output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["192.168.60.x:9092", "192.168.60.x:9092", "192.168.60.x:9092"]
  # message topic selection + partitioning
  topic: 'nginx'
  partition.round_robin:
    reachable_only: false
  required_acks: 0
  compression: gzip
  max_message_bytes: 1000000
# The matching Logstash configuration:
[root@logstash-60-6 conf.d]# cat nginx.conf 
input {
  kafka {
    group_id => "nginx"
    topics_pattern => "nginx"
    decorate_events => true
    bootstrap_servers => "192.168.60.x:9092,192.168.60.x:9092,192.168.60.x:9092"
    consumer_threads => 10
    codec => "json"
  }
}

filter {
  mutate {
    convert => { "upstream_time" => "float" }
  }
}

output {
  elasticsearch {
    hosts => ["192.168.60.x:9200", "192.168.60.x:9200", "192.168.60.x:9200"]
    user => "elastic"
    password => "123456789"
    index => "nginx-%{+YYYY.MM.dd}_new"
  }
  stdout {
    codec => rubydebug {
      metadata => true
    }
  }
}
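
The nginx pipeline relies on the access log being written as JSON and on upstream_time being turned into a number so that it can be aggregated later. A small Python sketch of that idea follows. The helper name and the sample line are invented for the example, the real field names depend on the nginx log_format in use, and leaving non-numeric values untouched is a choice made for this sketch, not a statement about how Logstash's convert behaves.

```python
import json


def decode_access_line(line):
    """Decode one JSON access-log line and coerce upstream_time to float."""
    event = json.loads(line)
    try:
        event["upstream_time"] = float(event.get("upstream_time"))
    except (TypeError, ValueError):
        # Sketch-only choice: leave missing or non-numeric values as they are.
        pass
    return event


# Hypothetical log line, for illustration only.
line = '{"status": 404, "http_host": "example.com", "upstream_time": "0.125"}'
event = decode_access_line(line)
print(type(event["upstream_time"]).__name__, event["upstream_time"])
```

If upstream_time stayed a string, Elasticsearch would typically map it as text, and averages or percentiles on it would not be available in Kibana.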

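ElastAlert installation

Python 3.8.0 is recommended. With 3.6.8 the install complains that the OpenSSL version is unsuitable, and with 3.9 it complains that the Python version is too new.
Reference: https://mp.weixin.qq.com/s/8W4NTwb5HKRBxdFUq6QplA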

# Install Python
yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make libffi-devel
cd /app
wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz
tar -xvf Python-3.8.0.tgz
cd Python-3.8.0
./configure --prefix=/usr/local/python3
make && make install
yum install gcc libffi-devel openssl-devel -y
ln -s /usr/local/python3/bin/python3.8 /usr/bin/python3
ln -s /usr/local/python3/bin/pip3.8 /usr/bin/pip3
pip3 install -U pip
pip3 install "setuptools>=11.3"
pip3 install elastalert
ln -s /usr/local/python3/bin/elastalert-create-index /usr/bin/elastalert-create-index
ln -s /usr/local/python3/bin/elastalert-rule-from-kibana /usr/bin/elastalert-rule-from-kibana
ln -s /usr/local/python3/bin/elastalert-test-rule /usr/bin/elastalert-test-rule
ln -s /usr/local/python3/bin/elastalert /usr/bin/elastalert
# Clone into /app so that the paths match /app/elastalert used below
cd /app
git clone https://github.com/Yelp/elastalert.git
cd elastalert
cp config.yaml.example config.yaml
# Empty the copied file, then fill it with the content shown below
echo > config.yaml

[root@rabbitredis-3 elastalert]# cat config.yaml
rules_folder: /app/elastalert/rules
run_every:
  minutes: 1
buffer_time:
  minutes: 15
es_host: 192.168.70.63
es_port: 9200
es_username: "elastic"
es_password: "123456"
writeback_index: elastalert_status
max_scrolling_count: 1
alert_time_limit:
  days: 2
# It can be run as a service
[root@rabbitredis-3 elastalert]# cat /etc/systemd/system/elastalert.service 
[Unit]
Description=elastalert
After=multi-user.target

[Service]
Type=simple
WorkingDirectory=/app/elastalert
ExecStart=/usr/bin/python3 -m elastalert.elastalert --verbose --config ./config.yaml

[Install]
WantedBy=multi-user.target

# Check that it is installed and running
[root@rabbitredis-3 elastalert]# systemctl status elastalert
● elastalert.service - elastalert
   Loaded: loaded (/etc/systemd/system/elastalert.service; disabled; vendor preset: disabled)
   Active: active (running) since Wed 2024-05-22 14:26:27 CST; 43min ago
 Main PID: 20216 (python3)
    Tasks: 12
   Memory: 40.2M
   CGroup: /system.slice/elastalert.service
           └─20216 /usr/bin/python3 -m elastalert.elastalert --verbose --config ./config.yaml
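
The two timing settings in config.yaml are easy to mix up: run_every controls how often ElastAlert queries Elasticsearch, while buffer_time controls how far back each query looks. The following Python sketch prints the query windows for the values used above (1 minute and 15 minutes). It is a simplified model written for this article, with a made-up helper name; the real scheduler also remembers where the previous run ended.

```python
from datetime import datetime, timedelta, timezone

RUN_EVERY = timedelta(minutes=1)     # run_every in config.yaml
BUFFER_TIME = timedelta(minutes=15)  # buffer_time in config.yaml


def query_windows(first_run, runs):
    """Yield (start, end) per run: one run every RUN_EVERY, each looking
    back BUFFER_TIME from the moment it starts."""
    for i in range(runs):
        end = first_run + i * RUN_EVERY
        yield end - BUFFER_TIME, end


first = datetime(2024, 5, 22, 6, 26, tzinfo=timezone.utc)
for start, end in query_windows(first, 3):
    print(start.strftime("%H:%M"), "->", end.strftime("%H:%M"))
```

Consecutive windows overlap heavily, which is intended: a log line that reaches Elasticsearch a few minutes late is still picked up by a later run.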

DingTalk alert plugin installation

wget https://github.com/xuyaoqiang/elastalert-dingtalk-plugin/archive/master.zip
unzip master.zip
# No need to install the contents of requirements.txt: once pip3 has been upgraded and elastalert installed, the required packages are already in place
cd elastalert-dingtalk-plugin-master
cp -r elastalert_modules /app/elastalert/

Testing ElastAlert

[root@localhost rules]# pwd
/app/elastalert/rules
[root@localhost rules]# cat hostname.yaml 
name: Agent Hostname Alert
type: frequency
index: syslogs-*
use_strftime_index: false
num_events: 1
timeframe:
  minutes: 2
filter:
- term:
    agent.hostname.keyword: "first-1"
alert:
- "elastalert_modules.dingtalk_alert.DingTalkAlerter"
dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=2xxxxxxxxxxxxxxxxxxxxxxxxxxxe"
dingtalk_msgtype: text
alert_text: |
  Agent hostname "first-1" has logged at least 1 event in the last 2 minutes.
  Timestamp: {0}
  Message: {1}
alert_text_args:
  - "@timestamp"
  - message
alert_text_type: alert_text_only

# Test run. Even if a DingTalk alert is configured, the test does not send anything to the DingTalk robot
[root@localhost rules]# elastalert-test-rule hostname.yaml
# This run does deliver the alert
[root@localhost elastalert]# /usr/bin/python3 -m elastalert.elastalert --verbose --config ./config.yaml --rule hostname.yaml
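
The rule above uses type: frequency, which matches when at least num_events events fall inside timeframe. A minimal Python sketch of that counting logic is shown below, assuming a simple sliding window that starts over after each match. It is written for illustration only, the function name is made up, and throttling options such as realert are not modelled.

```python
from datetime import datetime, timedelta


def frequency_matches(timestamps, num_events, timeframe):
    """Return the timestamps at which at least `num_events` events lie
    inside a sliding window of length `timeframe`."""
    window = []
    matches = []
    for ts in sorted(timestamps):
        window.append(ts)
        # Drop events that are older than the timeframe.
        while ts - window[0] > timeframe:
            window.pop(0)
        if len(window) >= num_events:
            matches.append(ts)
            window.clear()  # start counting again after a match
    return matches


base = datetime(2024, 5, 22, 14, 0)
events = [base, base + timedelta(seconds=30), base + timedelta(minutes=5)]
two_minutes = timedelta(minutes=2)
print(len(frequency_matches(events, num_events=1, timeframe=two_minutes)))  # 3
print(len(frequency_matches(events, num_events=2, timeframe=two_minutes)))  # 1
```

With num_events: 1, as in hostname.yaml, every matching log line counts as a match on its own, which is convenient for a first test but noisy in production.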

Result:
[image]
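
For readers wondering how the text in the DingTalk message comes together, here is a rough Python sketch of the two steps involved: filling the {0} and {1} placeholders of alert_text from the fields named in alert_text_args, and wrapping the result in the JSON body that DingTalk's custom robot webhook accepts for text messages. The plugin's own code is not shown in this article, so both helpers and the sample document are assumptions made for illustration, and nested field lookups are not handled.

```python
import json


def render_alert_text(template, args, match):
    """Fill positional placeholders with the fields named in `args`."""
    values = [match.get(name, "<MISSING VALUE>") for name in args]
    return template.format(*values)


def dingtalk_text_payload(content):
    """Build the JSON body of a DingTalk custom-robot text message."""
    body = {"msgtype": "text", "text": {"content": content}}
    return json.dumps(body, ensure_ascii=False)


template = "Timestamp: {0}\nMessage: {1}"
# Hypothetical matched document, for illustration only.
match = {"@timestamp": "2024-05-22T06:26:27Z", "message": "session opened for user root"}
text = render_alert_text(template, ["@timestamp", "message"], match)
print(dingtalk_text_payload(text))
```

Nothing is sent here; posting the payload to the webhook URL from the rule file is left out on purpose.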

Nginx status code alerts

Create a new file time_enhancement.py in the elastalert_modules directory

from elastalert.util import pretty_ts
from elastalert.enhancements import BaseEnhancement


class TimeEnhancement(BaseEnhancement):
    def process(self, match):
        # Replace the raw timestamp with a human-readable one before alerting
        if '@timestamp' in match:
            pretty_time = pretty_ts(match['@timestamp'])
            match['@timestamp'] = pretty_time
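
To get a feel for what this enhancement does to a match without installing ElastAlert, the sketch below rewrites the timestamp using only the standard library. It is a stand-in, not the library's pretty_ts: the output format and the fixed UTC+8 timezone labelled "CST" are assumptions made for this example, and the real helper may differ in detail.

```python
from datetime import datetime, timedelta, timezone


def prettify_timestamp(match, tz):
    """Rewrite match['@timestamp'] in place as 'YYYY-MM-DD HH:MM TZ'."""
    if "@timestamp" in match:
        raw = match["@timestamp"].replace("Z", "+00:00")
        moment = datetime.fromisoformat(raw).astimezone(tz)
        match["@timestamp"] = moment.strftime("%Y-%m-%d %H:%M %Z")


cst = timezone(timedelta(hours=8), "CST")  # assumed display timezone
# Hypothetical matched document, for illustration only.
match = {"@timestamp": "2024-05-22T06:26:27Z", "http_host": "example.com"}
prettify_timestamp(match, cst)
print(match["@timestamp"])  # 2024-05-22 14:26 CST
```

The match dictionary is modified in place, which is why the alert text sees the prettified value afterwards.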
# Alert on 404 status codes
[root@localhost elastalert]# cat http.yaml 
es_host: 192.168.60.2
es_port: 9200
es_username: "elastic"
es_password: "dsfsadfasfe"
name: DingDingNginxHttpCode404
type: spike
index: nginx-*
description: "Number of client HTTP requests within the query window"
timeframe:
  #minutes: 1
  seconds: 30
# The count in the previous (reference) window must be at least this value
threshold_ref: 2
spike_height: 1
spike_type: up
filter:
- query:
    query_string:
      query: "status: 404"
query_key: http_host
max_query_size: 10000
doc_type: access

realert:
  minutes: 1
alert_text: "
  now_time:{}\n
  message: in the last 30s, {} returned more than 1 status-404 response, count: {}. Please check the application status!"
alert_text_type: alert_text_only
alert_text_args:
- "@timestamp"
- http_host
- spike_count
match_enhancements:
- "elastalert_modules.time_enhancement.TimeEnhancement"
include: ["instance_name", "app_name", "level", "logger_name", "message", "stack_trace", "client_ip"]
alert:
- "elastalert_modules.dingtalk_alert.DingTalkAlerter"
dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=2xxxxxxxxxxxxxxxxxe"
dingtalk_msgtype: "text"

[image]
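
The interplay of threshold_ref and spike_height in the 404 rule is easier to follow with numbers. The Python sketch below compares the event count of the previous 30-second window (the reference) with the current one, following the way the ElastAlert documentation describes a spike of type up. The function is a simplified illustration with a made-up name, not the library's implementation.

```python
def is_spike_up(ref_count, cur_count, spike_height, threshold_ref=0):
    """Spike-up check between two adjacent windows of equal length."""
    if ref_count < threshold_ref:
        # Too few events in the reference window to compare against.
        return False
    return cur_count >= ref_count * spike_height


# Values from the rule: threshold_ref: 2, spike_height: 1
print(is_spike_up(ref_count=2, cur_count=3, spike_height=1, threshold_ref=2))  # True
print(is_spike_up(ref_count=1, cur_count=9, spike_height=1, threshold_ref=2))  # False
print(is_spike_up(ref_count=5, cur_count=4, spike_height=1, threshold_ref=2))  # False
```

With spike_height: 1 the rule is very sensitive: an unchanged count between the two windows already qualifies, so a larger value is worth considering once the alert has been verified.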

Tags: kafka elk

Reposted from: https://blog.csdn.net/qq_48736646/article/details/135941022
Copyright belongs to the original author, 追风筝的小青年. In case of infringement, please contact us for removal.
