// 文档简介
文档内容:
文档介绍了如何部署和配置一个ELK(Elasticsearch、Logstash、Kibana)日志收集系统的详细步骤。主要内容包括以下几个部分:
- Elasticsearch 集群部署:- 涵盖了Elasticsearch的安装和配置,文档详细描述了如何在多个节点上设置集群,配置数据目录、权限以及JVM参数等。每个节点的角色和配置方法也得到了详细解释。
- Logstash 配置:- 文档接着介绍了如何配置Logstash,用于收集、过滤和转发日志数据。包括安装、插件配置、输入和输出的定义,确保日志数据可以从不同来源收集并传输到Elasticsearch。
- Filebeat 和 Kafka:- 文档还涵盖了Filebeat和Kafka的部署和配置,Filebeat作为轻量级日志传输工具,用于将日志文件推送到Logstash或Kafka。Kafka作为消息队列,确保日志数据的可靠传输。
- Kibana 配置:- 最后,文档详细介绍了如何配置Kibana,作为Elasticsearch的前端UI,允许用户可视化、搜索和分析存储在Elasticsearch中的日志数据。
角色作用:
在一个完整的ELK(Elasticsearch, Logstash, Kibana)日志收集系统中,各个组件通过以下流程进行协作,实现日志数据的高效收集、传输、存储和可视化:
- Filebeat:
日志处理流程从Filebeat开始。Filebeat是一个轻量级的日志传输工具,它部署在需要监控日志的服务器上。Filebeat会实时读取指定的日志文件,并将日志数据发送到下游系统(此架构中为Kafka)。通过这种方式,Filebeat将分布在各个服务器上的日志数据集中收集到一处,确保数据的完整性和实时性。
- Kafka:
在此架构中,Filebeat会将日志数据发送到Kafka,Kafka是一个分布式消息队列系统。Kafka的引入增强了系统的可靠性和可扩展性,尤其是在处理高吞吐量的日志数据时。Kafka会临时存储日志数据,并保证数据在传输过程中不会丢失。多个消费者(Logstash)可以从Kafka中读取日志数据,进行进一步的处理。
- Logstash:
Logstash作为数据处理管道,此架构中从Kafka接收到日志数据后,对其进行过滤、解析和格式化处理。Logstash可以根据预定义的规则对数据进行清洗、提取有用信息并转换格式,以便更好地适应后续的存储和分析。处理后的日志数据会被Logstash发送到Elasticsearch中进行存储和索引。
- Elasticsearch:
Elasticsearch是整个系统的核心数据存储引擎。它接收来自Logstash的日志数据,并对其进行存储和索引。Elasticsearch具备强大的搜索和分析功能,能够在大量数据中快速查找匹配的记录。数据被存储在Elasticsearch的分布式集群中,确保了高可用性和可靠性。
- Kibana:
最终,Kibana作为Elasticsearch的前端UI,为用户提供了一个强大的可视化平台。用户可以通过Kibana对存储在Elasticsearch中的日志数据进行查询、分析和展示。Kibana允许创建各种图表、仪表板和报告,使用户能够实时监控系统状态、分析历史数据以及发现潜在问题。
- 流程总结:
在这个流程中,日志数据从分布在不同服务器上的日志文件开始,经过Filebeat收集后传输到Kafka进行临时存储。然后,Logstash从Kafka读取数据,进行清洗和处理,并将其发送到Elasticsearch进行存储和索引。最终,Kibana提供了一个用户友好的界面,帮助用户对这些日志数据进行深度分析和可视化展示。这一流程确保了日志数据的高效收集、可靠传输、强大存储和便捷分析,是现代运维和监控系统中的关键组成部分。
流程:
Filebeat:
作用:实时读取和收集日志文件。
操作:将日志数据发送到Kafka。
优点:轻量级,低资源消耗,适用于大量分布式服务器。
Kafka:
作用:分布式消息队列系统,用于日志数据的临时存储和可靠传输。
操作:接收来自Filebeat的数据,多个Logstash实例可以从Kafka读取数据。
优点:高吞吐量、可靠性和可扩展性。
Logstash:
作用:数据处理管道,负责日志数据的过滤、解析和格式化。
操作:从Kafka读取数据,处理后将数据发送到Elasticsearch。
优点:强大的数据处理能力,支持多种插件和数据转换。
Elasticsearch:
作用:分布式搜索和分析引擎,负责存储和索引日志数据。
操作:接收来自Logstash的数据,进行存储和索引,以支持高效搜索和分析。
优点:强大的搜索引擎能力,支持大规模数据的实时查询。
Kibana:
作用:数据可视化和分析平台,提供用户界面。
操作:从Elasticsearch中查询数据,生成图表、仪表板和报告。
优点:用户友好的界面,支持实时数据监控和深度分析。
一、ElasticSearch 集群部署
系统IP部署应用节点CentOS 750.50.50.100elasticsearch 7.10.0node1CentOS 750.50.50.101elasticsearch 7.10.0node2CentOS 750.50.50.102elasticsearch 7.10.0node3
1.1、部署 ES7.10.0
1.1.1 下载rpm包安装
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.10.0-x86_64.rpm
yum install-y elasticsearch-7.10.0-x86_64.rpm
1.1.2 创建数据目录赋予权限
mkdir-p /data/elasticsearch/{logs,data,apps}
chmod-R750 /data/elasticsearch/{data,logs}
1.2、配置ES
1.2.1 修改主配置
vim /data/elasticsearch/elasticsearch.yml
cluster.name: ES7-cluster
node.name: ES7-node1
network.host: 0.0.0.0
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
node.master: true
node.data: true
bootstrap.memory_lock: false
http.port: 9200
discovery.seed_hosts: ["50.50.50.100:9300","50.50.50.101:9300","50.50.50.102:9300"]
cluster.initial_master_nodes: ["ES7-node1","ES7-node2","ES7-node3"]
bootstrap.system_call_filter: false#xpack.security.enabled: true#xpack.security.transport.ssl.enabled: true#xpack.security.transport.ssl.verification_mode: certificate#xpack.security.transport.ssl.keystore.path: elastic-certificates.p12#xpack.security.transport.ssl.truststore.path: elastic-certificates.p12
http.cors.enabled: true
http.cors.allow-origin: "*"
1.2.2 配置 ES 运行时使用的 Java 虚拟机参数 。
内存分配:通过指定
-Xms和
-Xmx参数来设置初始堆大小和最大堆大小。
垃圾回收器:通过指定
-XX:+UseConcMarkSweepGC或者
-XX:+UseG1GC来选择垃圾回收器。
监控和调试:通过添加
-D参数来设置系统属性,例如设置远程调试端口。
GC 日志:通过指定
-Xloggc参数来启用垃圾回收日志,并指定日志输出的位置。
其他 JVM 参数:你还可以根据需要添加其他的 JVM 参数,例如设置线程栈大小、设置 IO 相关参数等。
vim /data/elasticsearch/jvm.options
## JVM configuration################################################################## IMPORTANT: JVM heap size#################################################################### You should always set the min and max JVM heap## size to the same value. For example, to set## the heap to 4 GB, set:#### -Xms4g## -Xmx4g#### See https://www.elastic.co/guide/en/elasticsearch/reference/current/heap-size.html## for more information################################################################### Xms represents the initial size of total heap space# Xmx represents the maximum size of total heap space-Xms1g-Xmx1g################################################################## Expert settings#################################################################### All settings below this section are considered## expert settings. Don't tamper with them unless## you understand what you are doing#################################################################### GC configuration8-13:-XX:+UseConcMarkSweepGC
8-13:-XX:CMSInitiatingOccupancyFraction=758-13:-XX:+UseCMSInitiatingOccupancyOnly
## G1GC Configuration# NOTE: G1 GC is only supported on JDK version 10 or later# to use G1GC, uncomment the next two lines and update the version on the# following three lines to your version of the JDK# 10-13:-XX:-UseConcMarkSweepGC# 10-13:-XX:-UseCMSInitiatingOccupancyOnly14-:-XX:+UseG1GC
14-:-XX:G1ReservePercent=2514-:-XX:InitiatingHeapOccupancyPercent=30## JVM temporary directory-Djava.io.tmpdir=\${ES_TMPDIR}## heap dumps# generate a heap dump when an allocation from the Java heap fails# heap dumps are created in the working directory of the JVM-XX:+HeapDumpOnOutOfMemoryError# specify an alternative path for heap dumps; ensure the directory exists and# has sufficient space-XX:HeapDumpPath=/var/lib/elasticsearch
# specify an alternative path for JVM fatal error logs-XX:ErrorFile=/var/log/elasticsearch/hs_err_pid%p.log
## JDK 8 GC logging8:-XX:+PrintGCDetails
8:-XX:+PrintGCDateStamps
8:-XX:+PrintTenuringDistribution
8:-XX:+PrintGCApplicationStoppedTime
8:-Xloggc:/var/log/elasticsearch/gc.log
8:-XX:+UseGCLogFileRotation
8:-XX:NumberOfGCLogFiles=328:-XX:GCLogFileSize=64m
# JDK 9+ GC logging9-:-Xlog:gc*,gc+age=trace,safepoint:file=/var/log/elasticsearch/gc.log:utctime,pid,tags:filecount=32,filesize=64m
1.3、系统优化
1.3.1 增加系统的最大打开文件数
修改 * soft nofile 65536 和 * hard nofile 65536 的目的是增加系统的最大打开文件数限制。
默认情况下,Linux系统对单个用户同时打开的文件数量有一定限制,这个限制可以通过修改配置文件 /etc/security/limits.conf 中的 soft nofile 和 hard nofile 参数来调整。
soft nofile 参数设置了用户能够打开的最大文件数限制,而 hard nofile 参数设置了系统整体能够支持的最大文件数限制。
通过将这两个参数设置为较大的值,可以增加系统所能处理的并行文件数量,从而提高系统的性能和扩展性。
这对于一些需要同时操作大量文件的应用程序或者服务器来说特别重要
vim /etc/security/limits.conf
* soft nofile 65536
* hard nofile 65536
1.3.2 增加系统中允许的最大进程数
修改
* soft nproc 4096和
root soft nproc unlimited这两个配置的目的是为了调整Linux系统的最大进程数量限制。
默认情况下,Linux系统对于每个用户和用户组都有最大进程数量的限制。
通过将
* soft nproc设置为较大的值(例如65535)和将
root soft nproc设置为
unlimited,可以增加系统中允许的最大进程数量,从而提高系统的并发处理能力。
这对于使用大量进程的应用程序或者需要处理大量并发连接的服务器非常有用。
vim /etc/security/limits.d/90-nproc.conf
# Default limit for number of user's processes to prevent# accidental fork bombs.# See rhbz #432903 for reasoning.
* soft nproc 4096
root soft nproc unlimited
1.3.3 修改系统的内核参数
修改/etc/sysctl.conf的目的是为了修改系统的内核参数配置。
通过修改这个文件,可以对系统的各种参数进行调整,以便优化系统的性能和稳定性。
例如,可以修改共享内存大小、消息队列个数以及其他与系统资源相关的参数。
这样可以根据实际需求来调整系统的配置,以获得更好的性能和稳定性。
vim /etc/sysctl.conf
# Kernel sysctl configuration file for Red Hat Linux## For binary values, 0 is disabled, 1 is enabled. See sysctl(8) and# sysctl.conf(5) for more details.# Controls IP packet forwarding
net.ipv4.ip_forward =0# Controls source route verification
net.ipv4.conf.default.rp_filter =1# Do not accept source routing
net.ipv4.conf.default.accept_source_route =0# Controls the System Request debugging functionality of the kernel
kernel.sysrq =0# Controls whether core dumps will append the PID to the core filename.# Useful for debugging multi-threaded applications.
kernel.core_uses_pid =1# Controls the use of TCP syncookies
net.ipv4.tcp_syncookies =1# Disable netfilter on bridges.
net.bridge.bridge-nf-call-ip6tables =0
net.bridge.bridge-nf-call-iptables =0
net.bridge.bridge-nf-call-arptables =0# Controls the default maxmimum size of a mesage queue
kernel.msgmnb =65536# Controls the maximum size of a message, in bytes
kernel.msgmax =65536# Controls the maximum shared segment size, in bytes
kernel.shmmax =68719476736# Controls the maximum number of shared memory segments, in pages
kernel.shmall =4294967296
vm.max_map_count =655360############ TCP Optimize ############
net.ipv4.tcp_fin_timeout =15
net.ipv4.tcp_keepalive_time =30
net.ipv4.tcp_tw_reuse =1
net.ipv4.tcp_tw_recycle =1
net.ipv4.ip_local_port_range =102465000
net.ipv4.tcp_timestamps =0
kernel.msgmni =128
net.ipv4.tcp_max_tw_buckets =10000
net.ipv4.tcp_sack =1
net.ipv4.tcp_window_scaling =1
net.ipv4.tcp_rmem =8192838860816777216
net.ipv4.tcp_wmem =8192838860816777216
net.core.wmem_default =8388608
net.core.rmem_default =8388608
net.core.rmem_max =16777216
net.core.netdev_max_backlog =262144
net.core.somaxconn =262144
net.ipv4.tcp_max_orphans =262144
net.ipv4.tcp_synack_retries =2
net.ipv4.tcp_syn_retries =2
net.ipv4.tcp_mem =94500000915000000927000000########################################
sysctl-p
1.4、启动 ES
1.4.1 添加启动环境
sed-i'10a JAVA_HOME=/usr/share/elasticsearch/jdk' /etc/sysconfig/elasticsearch
1.4.2 启动
1.4.2.1
CentOS7系统
启动命令
systemctl start elasticsearch
1.4.2.2
CentOS6系统
启动命令
/etc/init.d/elasticsearch start
1.5、查看集群节点
curl localhost:9200/_cat/nodes?pretty
1.6、集群添加用户安全认证
node1创建证书、秘钥,直接回车先不设置密码。
将证书、秘钥上传至node2、node3。
node1-3修改集群配置,添加认证配置并重启服务。
1.6.1 创建证书ca
/usr/share/elasticsearch/bin/elasticsearch-certutil ca
1.6.2 颁发证书
/usr/share/elasticsearch/bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12
1.6.3 保存证书密码
/usr/share/elasticsearch/bin/elasticsearch-keystore create
1.6.4 将证书放在安装目录下
cp /usr/share/elasticsearch/elastic-certificates.p12 /etc/elasticsearch/
1.6.5 证书密钥添加读写权限
chmod664 /etc/elasticsearch/elastic-certificates.p12
chmod664 /etc/elasticsearch/elasticsearch.keystore
1.6.6 停止ES7.10
systemctl stop elasticsearch
1.6.7 将证书密钥上传至其他节点
tar zcf K.tar.gz elastic-certificates.p12 elasticsearch.keystore
1.6.8 修改配置添加安全认证配置
vim /etc/elasticsearch/elasticsearch.yml
cluster.name: ES7-cluster
node.name: ES7-node1
network.host: 0.0.0.0
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
node.master: true
node.data: true
bootstrap.memory_lock: false
http.port: 9200
discovery.seed_hosts: ["50.50.50.100:9300","50.50.50.101:9300","50.50.50.102:9300"]
cluster.initial_master_nodes: ["ES7-node1","ES7-node2","ES7-node3"]
bootstrap.system_call_filter: false
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: elastic-certificates.p12
http.cors.enabled: true
http.cors.allow-origin: "*"
1.6.9 启动ES7.10
systemctl start elasticsearch
1.7、添加用户密码并验证
- 设置用户密码,最后查看集群节点验证
1.7.1 查看集群节点
curl localhost:9200/_cat/nodes?pretty
1.7.2 添加用户密码
# 建议密码都设置一样的,保存好密码。
/usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactive
1.7.3 查看集群节点
curl--user elastic:password localhost:9200/_cat/nodes?pretty
二、Kibana 监控部署
系统IP部署应用CentOS 750.50.50.100kibana 7.10.0
2.1、部署 Kibana7.10.0
2.1.1 下载二进制包安装
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.10.0-linux-x86_64.tar.gz
2.1.2 创建安装目录
mkdir /data/elasticsearch/apps
tar-zxf kibana-7.10.0-linux-x86_64.tar.gz
mv kibana-7.10.0-linux-x86_64 /data/elasticsearch/apps/kibana-7.10.0
2.2、配置 Kibana 连接 ES
vim /data/elasticsearch/apps/kibana-7.10.0/config/kibana.yml
server.port: 5601
server.host: "50.50.50.100"
elasticsearch.hosts: ["http://50.50.50.100:9200,http://50.50.50.101:9200,http://50.50.50.102:9200"]
elasticsearch.username: "elastic"
elasticsearch.password: "password"
2.3、编写启停脚本
2.3.1 编写启动脚本
vim /data/elasticsearch/apps/kibana-7.10.0/start.sh
#!/bin/bashpath=\`pwd\`ES_PID=\`ps aux |grepjava|grep-w elasticsearch |awk'{print \$2}'\`Kibana_PID=\`ps aux |grep"/node/bin/node"|grep kibana-7.10.0 |awk'{print \$2}'\`if[-n"\$ES_PID"];thenif[-z"\$Kibana_PID"];thennohup\$path/bin/kibana --allow-root > /dev/null 2>&1&echo'kibana started successfully ! [OK]'elseecho"kibana has been launched PID:\$Kibana_PID"fielseecho'ES is not started [failed]...'fi
chmod755 /data/elasticsearch/apps/kibana-7.10.0/start.sh
2.3.2 编写停止脚本
vim /data/elasticsearch/apps/kibana-7.10.0/stop.sh
#!/bin/bashKibana_PID=\`ps aux |grep"/node/bin/node"|grep kibana-7.10.0 |awk'{print \$2}'\`if[-n"\$Kibana_PID"];thenkill-9\$Kibana_PID&&echo"Kibana PID:\$Kibana_PID is killed ! [OK]"fi
chmod755 /data/elasticsearch/apps/kibana-7.10.0/start.sh
2.4、启动并访问
/data/elasticsearch/apps/kibana-7.10.0/start.sh
http://50.50.50.100:5601
三、Kafka 集群部署
系统IP部署应用my.idbroker.idCentOS 750.50.50.100kafka 2.13 2.8.210CentOS 750.50.50.101kafka 2.13 2.8.221CentOS 750.50.50.102kafka 2.13 2.8.232
准备以下压缩包
jdk-8u161-linux-x64.tar.gz
kafka_2.13-2.8.2.tgz
wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz
3.1、部署 Kafka2.8.2
3.1.1 安装 JDK
tar zxf jdk-8u161-linux-x64.tar.gz
mv jdk1.8.0_161 /usr/local
chown root.root -R /usr/local/jdk1.8.0_161
vim /etc/profile
exportPATH=/usr/local/jdk1.8.0_161/bin:$PATH
source /etc/profile
java-version
3.1.2 安装 Kafka
tar zxf kafka_2.13-2.8.2.tgz
mv kafka_2.13-2.8.2 /usr/local/
chown root.root -R /usr/local/kafka_2.13-2.8.2
3.1.3 创建 zookeeper、Kafka 数据目录
mkdir-p /usr/local/kafka_2.13-2.8.2/{kafka_data,zk_data}
3.2、配置 Zookeeper 、Kafka
3.2.1 配置 Zookeeper
cd /usr/local/kafka_2.13-2.8.2/config
mv zookeeper.properties zookeeper.properties.bak
vim zookeeper.properties
#### zk数据目录 ###
dataDir="/usr/local/kafka_2.13-2.8.2/zk_data"
clientPort=2161
maxClientCnxns=0
tickTime=2000
initLimit=5
syncLimit=2
### zkserver配置 ###
server.1=50.50.50.100:2266:3266
server.2=50.50.50.101:2266:3266
server.3=50.50.50.102:2266:3266
echo1> /usr/local/kafka_2.13-2.8.2/zk_data/myid # 50.50.50.100echo2> /usr/local/kafka_2.13-2.8.2/zk_data/myid # 50.50.50.101echo3> /usr/local/kafka_2.13-2.8.2/zk_data/myid # 50.50.50.102
3.2.2 配置 Kafka
cd /usr/local/kafka_2.13-2.8.2/config
mv server.properties server.properties.bak
# 50.50.50.100 broker.id=0
# 50.50.50.101 broker.id=1
# 50.50.50.102 broker.id=2
broker.id=0
# 本机 IP
listeners=PLAINTEXT://50.50.50.100:9002
### 副本数量 ###
default.replication.factor=2
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
### 数据存储目录 ####
log.dirs="/usr/local/kafka_2.13-2.8.2/kafka_data"
### 分区数 ###
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
### 连接zk ###
zookeeper.connect=50.50.50.100:2161,50.50.50.101:2161,50.50.50.102:2161
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
3.3、编写启动脚本
3.3.1 编写 Zookeeper 启动脚本
vim /usr/local/kafka_2.13-2.8.2/zk_start.sh
#!/bin/bashkafka_path="/usr/local/kafka_2.13-2.8.2"nohup$kafka_path/bin/zookeeper-server-start.sh $kafka_conf_path/config/zookeeper.properties >>$kafka_path/zk_nohup.out 2>&1&
chmod744 /usr/local/kafka_2.13-2.8.2/zk_start.sh
3.3.2 启动 zookeeper
sh /usr/local/kafka_2.13-2.8.2/zk_start.sh
3.3.3 编写 Kafka 启动脚本
vim /usr/local/kafka_2.13-2.8.2/kafka_start.sh
#!/bin/bashkafka_path="/usr/local/kafka_2.13-2.8.2"nohup$kafka_path/bin/kafka-server-start.sh $kafka_path/config/server.properties >>$kafka_path/kafka_nohup.out 2>&1&
chmod744 /usr/local/kafka_2.13-2.8.2/kafka_start.sh
3.3.4 启动 Kafka
sh /usr/local/kafka_2.13-2.8.2/kafka_start.sh
3.4、测试
3.4.1 创建 topic
cd /usr/local/kafka_2.13-2.8.2/bin
./kafka-topics.sh --create--zookeeper50.50.50.100:2161 --replication-factor 1--partitions1--topic topic-nginx
这条命令用于在Apache Kafka集群中创建一个新的主题。Apache Kafka是一个流处理平台,它允许发布和订阅记录流,类似于消息队列。在Kafka中,主题是发布和订阅的类别或者说是消息的种类。
这个命令的详细参数解释如下:
./kafka-topics.sh --create
:这个部分告诉kafka-topics.sh脚本创建一个新的主题。--zookeeper localhost:2161
:指定Zookeeper的地址。Zookeeper是Kafka的一个组件,用于管理集群的状态。--replication-factor 1
:这是主题的副本因子,表示主题的副本数。如果设置为1,那么每个分区只有一个副本。如果设置为2,那么每个分区会有两个副本,以此类推。副本的存在可以提高系统的可用性和持久性。--partitions 1
:指定主题的分区数。在Kafka中,数据是按分区进行存储的,每个分区是一个逻辑上的日志文件,可以理解为一组有序的消息。这个参数决定了主题有多少个这样的逻辑日志文件。--topic
:这是要创建的主题的名称。在命令中,你需要在此位置指定主题的名称。
所以,这条命令的作用是在指定的Zookeeper集群中创建一个具有一个分区、一个副本因子为1的新主题。
2.4.2 列出 topic
cd /usr/local/kafka_2.13-2.8.2/bin
./kafka-topics.sh --describe--zookeeper50.50.50.100:2161
./kafka-topics.sh --describe--zookeeper50.50.50.100:2161 --topic topic-nginx
3.4.3 日志信息
server.log:这是Kafka服务器的主要日志文件,包含有关服务器启动、关闭、配置等信息。它还记录了所有请求和操作的详细信息。这个日志对于调试Kafka服务器的行为和问题非常有用。
controller.log:这个日志文件由Kafka控制器使用,它负责在Kafka集群中管理和协调副本。这个日志包含有关控制器启动、关闭、重新均衡等操作的详细信息。
state-change.log:这个日志文件记录了Kafka集群中与副本状态改变有关的事件,例如领导者选举、副本创建和删除等。这个日志对于监控和调试分布式系统中的状态变化非常有用。
这些日志文件通常位于Kafka安装目录的logs文件夹下,但也可以通过修改Kafka配置文件来更改这个位置。
3.5、Kafka ACL认证
3.5.1 添加身份验证配置
vim /usr/local/kafka_2.13-2.8.2/config/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"password="password"# 必须有一个用户对应 username 账户 和 password 密码user_admin="password";};
chmod644 /usr/local/kafka_2.13-2.8.2/config/kafka_server_jaas.conf
3.5.2 更改server.properties配置
vim /usr/local/kafka_2.13-2.8.2/config/server.properties
broker.id=0
auto.create.topics.enable=false
delete.topic.enable=false
listeners=SASL_PLAINTEXT://50.50.50.100:9002
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 超级用户设置 对应必须设置的用户
super.users=User:admin
### 副本数量 ###
default.replication.factor=2
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
### 数据存储目录 ####
log.dirs=/usr/local/kafka_2.11-1.0.2/kafka_data
### 分区数 ###
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
### 连接zk ###
zookeeper.connect=50.50.50.100:2161,50.50.50.101:2161,50.50.50.102:2161
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
3.5.3 修改启动脚本
vim /usr/local/kafka_2.13-2.8.2/kafka_start.sh
#!/bin/bashexportKAFKA_OPTS=" -Djava.security.auth.login.config=/usr/local/kafka_2.13-2.8.2/config/kafka_server_jaas.conf "nohup /usr/local/kafka_2.13-2.8.2/bin/kafka-server-start.sh /usr/local/kafka_2.13-2.8.2/config/server.properties >> /usr/local/kafka_2.13-2.8.2/kafka_nohup.out 2>&1&
配置完毕后 kafka 就可以重新启动了。
启动后看到报错:
ERROR [ZooKeeperClient ACL authorizer] Auth failed. (kafka.zookeeper.ZooKeeperClient)
建议把 Zookeeper 的安全认证也加上,具体和 kafka 差不多,不加不会影响,就会输出个报错而已。
四、Logstash 部署
系统IP部署应用CentOS 750.50.50.100logstash 7.10.0
4.1、部署 Logstash7.10.0
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.0-linux-x86_64.tar.gz
tar-zxf logstash-7.10.0-linux-x86_64.tar.gz
mv logstash-7.10.0 /usr/local
mkdir /usr/local/logstash-7.10.0/logs
chown root.root -R /usr/local/logstash-7.10.0
4.2、配置 Logstash
vim /usr/local/logstash-7.10.0/config/logstash.yml
path.data: /usr/local/logstash-7.10.0/data
path.logs: /usr/local/logstash-7.10.0/logs
http.host:"0.0.0.0"
vim /usr/local/logstash-7.10.0/config/kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="password";
};
vim /usr/local/logstash-7.10.0/config/logstash.conf
input {
kafka {
bootstrap_servers => "50.50.50.100:9002,50.50.50.101:9002,50.50.50.102:9002"
topics =>["topic-nginx"]
sasl_mechanism => "PLAIN"
security_protocol => "SASL_PLAINTEXT"
jaas_path => "/usr/local/logstash-7.10.0/config/kafka_client_jaas.conf"
# codec => json
auto_offset_reset => "earliest"
}}
filter {
grok {
match =>{ "message" => "%{IPORHOST:client_ip}-- \[%{HTTPDATE:timestamp}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:bytes_sent} \"%{DATA:referrer}\" \"%{DATA:user_agent}\" \"%{DATA:x_forwarded_for}\" %{NUMBER:request_time}" }}}
output {
elasticsearch {
hosts =>["http://50.50.50.100:9200","http://50.50.50.101:9200","http://50.50.50.102:9200"]
index => "nginx-logs-%{+YYYY.MM.dd}"
user => "elastic"
password => "password"
}}
4.3、配置介绍
这个配置文件的目的是将从 Kafka 消费的 Nginx 日志数据处理后发送到 Elasticsearch 进行存储和索引,以便于后续的查询和分析。以下是配置的详细说明:
4.3.1 Input (输入)
input {
kafka {
bootstrap_servers => "50.50.50.100:9002,50.50.50.101:9002,50.50.50.102:9002"
topics =>["topic-nginx"]
sasl_mechanism => "PLAIN"
security_protocol => "SASL_PLAINTEXT"
jaas_path => "/usr/local/logstash-7.10.0/config/kafka_client_jaas.conf"
# codec => json
auto_offset_reset => "earliest"
}}
bootstrap_servers: 指定 Kafka 集群的 broker 地址。这里包含了三个 broker (50.50.50.100:9002, 50.50.50.101:9002, 50.50.50.102:9002),用于实现高可用性和负载均衡。
topics: 指定要消费的 Kafka 主题,这里是 "topic-nginx",即 Nginx 日志被发布到的主题。
sasl_mechanism 和 security_protocol: 这些配置指定了 Kafka 的安全协议,使用 SASL_PLAINTEXT 和 PLAIN 机制进行认证。
jaas_path: 提供了 Kafka 认证所需的 JAAS 配置文件路径。
codec => json: 由于 Nginx 日志可能是以 JSON 格式存储在 Kafka 中,因此这里指定 codec => json 以确保正确解析 Kafka 中的消息。
auto_offset_reset => "earliest": 指定从最早的消息开始消费。
4.3.2 Filter (过滤)
filter {
grok {
match =>{ "message" => "%{IPORHOST:client_ip}-- \[%{HTTPDATE:timestamp}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:bytes_sent} \"%{DATA:referrer}\" \"%{DATA:user_agent}\" \"%{DATA:x_forwarded_for}\" %{NUMBER:request_time}" }}}
grok 过滤器: 用于解析 Nginx 日志的结构,将日志中的各个部分提取为独立的字段。例如,提取客户端 IP、请求时间、HTTP 方法、URL、响应代码等信息。grok 使用了一种基于正则表达式的模式匹配来解析非结构化日志数据。
4.3.3 Output (输出)
output {
elasticsearch {
hosts =>["http://50.50.50.100:9200","http://50.50.50.101:9200","http://50.50.50.102:9200"]
index => "nginx-logs-%{+YYYY.MM.dd}"
user => "elastic"
password => "password"
}}
Elasticsearch hosts: 指定了多个 Elasticsearch 节点的地址 (50.50.50.100:9200, 50.50.50.101:9200, 50.50.50.102:9200) 以实现高可用性和负载均衡。
index: 指定日志数据在 Elasticsearch 中的索引名称格式,这里使用了 nginx-logs-%{+YYYY.MM.dd},表示每天一个新的索引。例如,2024 年 9 月 4 日的日志将被存储在 nginx-logs-2024.09.04 索引中。
user 和 password: Elasticsearch 的认证信息,用于保护集群。
这份配置文件的主要目的是将 Nginx 访问日志从 Kafka 中提取出来,解析并处理日志数据后,发送到 Elasticsearch 集群中进行存储和索引。通过这种方式,可以方便地对日志数据进行搜索、分析和可视化。
4.4、编写启停脚本
4.4.1 编写启动脚本
vim /usr/local/logstash-7.10.0/start.sh
#!/bin/bash
/usr/local/logstash-7.10.0/bin/logstash -f /usr/local/logstash-7.10.0/config/logstash.conf >> logstash_nohup.out 2>&1&
chmod +x /usr/local/logstash-7.10.0/start.sh
4.4.1 编写停止脚本
vim /usr/local/logstash-7.10.0/stop.sh
#!/bin/bashps aux |grep'[l]ogstash-7.10.0'|awk'{print $2}'|xargs-rkill-9
chmod +x /usr/local/logstash-7.10.0/stop.sh
4.5、启动
/usr/local/logstash-7.10.0/start.sh
五、Filebeat 部署
系统IP部署应用CentOS 710.10.10.100filebeat 5.1.1、nginx
5.1、部署 Filebeat5.1.1
5.1.1 下载rpm包部署
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-5.1.1-x86_64.rpm
yum -yinstall filebeat-5.1.1-x86_64.rpm
5.2、配置 Filebeat
vim /etc/filebeat/filebeat.yml
filebeat.prospectors:-input_type: log
paths:- /usr/local/lnmp/nginx-1.20.0/logs/access-log
tags: www.test.com-nginx
fields:type: WebNginxLog
host: 10.10.10.100
location: BeiJing
fields_under_root:trueoutput.kafka:hosts:["50.50.50.100:9002","50.50.50.101:9002","50.50.50.102:9002"]username:'admin'password:'password'topic:'topic-nginx'partition.round_robin:reachable_only:falserequired_acks:1compression: gzip
max_message_bytes:1000000
5.3、启动
systemctl start filebeat.service
六、测试
6.1、访问 Nginx 查看 Filebeat 日志
http://10.10.10.100
tail-f /var/log/filebeat/filebeat
2024-09-04T11:10:33+08:00 INFO Non-zero metrics in the last 30s: registrar.writes=1 publish.events=4 registrar.states.update=4 libbeat.kafka.call_count.PublishEvents=1 libbeat.kafka.published_and_acked_events=4 libbeat.publisher.published_events=4
这条 Filebeat 日志提供了一些关于 Filebeat 在过去 30 秒内执行的操作和事件的统计信息。让我们逐一分析这些指标:
- registrar.writes=1
这是指 Filebeat 将其内部状态(包括文件偏移量等信息)写入 registry 文件的次数。在这里,它表示 Filebeat 在过去的 30 秒内更新了其状态一次。``````registry 文件用于跟踪已处理日志文件的位置,防止在 Filebeat 重新启动或日志文件变动时重复读取相同的数据。
- publish.events=4
这意味着在过去 30 秒内,Filebeat 成功从日志文件中读取并准备发送的事件数量为 4。``````每个“事件”通常对应于日志文件中的一行日志。
- registrar.states.update=4
表示 Filebeat 更新了内部状态 4 次,与 publish.events 相对应。这通常意味着它处理了 4 个事件,并将这些事件对应的文件偏移量更新到 registry 中。
- libbeat.kafka.call_count.PublishEvents=1
这是指 Filebeat 调用了 Kafka 输出插件的 PublishEvents 函数 1 次。它表明 Filebeat 已尝试将读取的事件发送到 Kafka。
- libbeat.kafka.published_and_acked_events=4
表示 Filebeat 成功将 4 个事件发布到 Kafka,并且这些事件已被 Kafka 确认(acknowledged)。``````这个指标意味着 Kafka 接受并确认了这些事件,确保它们已成功写入 Kafka 主题中。
- libbeat.publisher.published_events=4
这是 Filebeat 通过其输出模块发布的事件总数,统计范围包括 Kafka 和其他可能的输出。这里的数量也是 4,表明所有被读取的事件都被发布了。
这条日志表明,在过去 30 秒内,Filebeat 成功从日志文件中读取了 4 条日志,并将它们发布到 Kafka 主题中。发布过程成功,并得到了 Kafka
的确认。同时,Filebeat 更新了它的状态,以便在下一次启动时知道哪些日志已经处理过。这表明 Filebeat 正在按预期工作,没有出现错误或数据
丢失。
6.2、Kibana 查看访问日志
6.2.1 访问url创建索引
堆栈管理 > 索引模式 > 创建索引模式
http://50.50.50.100:5601/app/management/kibana/indexPatterns/create
6.3、多次访问查看日志收集情况
- 浏览器刷新 6 次
http://10.10.10.100
- 查看 Filebeat 日志
2024-09-04T14:33:33+08:00 INFO Non-zero metrics in the last 30s: filebeat.harvester.running=1 libbeat.kafka.published_and_acked_events=6 registrar.states.update=6 registrar.writes=1 libbeat.publisher.published_events=6 filebeat.harvester.open_files=1 filebeat.harvester.started=1 libbeat.kafka.call_count.PublishEvents=1 publish.events=6
- 查看 Kibana
七、定义 Elasticsearch 索引模板
7.1、创建索引模板
http://50.50.50.100:5601/app/dev_tools#/console
PUT _template/nginx_logs_template
{"index_patterns":["nginx-logs-*"],"mappings":{"properties":{"client_ip":{"type":"ip"},"timestamp":{"type":"date","format":"dd/MMM/yyyy:HH:mm:ss Z"},"method":{"type":"keyword"},"url":{"type":"text"},"response_code":{"type":"integer"},"bytes_sent":{"type":"integer"},"user_agent":{"type":"text"},"request_time":{"type":"float"},"x_forwarded_for":{"type":"text"},"referrer":{"type":"text"},"http_version":{"type":"keyword"}}}}
7.2、刷新 Kibana 索引模式
http://50.50.50.100:5601/app/management/kibana/indexPatterns
7.3、查看字段是否已缓存映射
- 一切正常
八、配置仪表盘
版权归原作者 仙长道号-Linux真人 所有, 如有侵权,请联系我们删除。