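Common Kafka commands
(The kafka-topics.sh commands below use --zookeeper. Kafka 2.2 and later also accept --bootstrap-server 10.1.10.163:9092 in its place, and Kafka 3.0 removed --zookeeper from kafka-topics.sh.)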
List all topics
./kafka-topics.sh --zookeeper 10.1.10.163:2181 --list
Describe a specific topic
./kafka-topics.sh --zookeeper 10.1.10.163:2181 --topic ai_jl_analytic --describe
List consumer groups
./kafka-consumer-groups.sh --bootstrap-server 10.1.10.163:9092 --list
Create a topic
./kafka-topics.sh --create --zookeeper 10.1.10.163:2181 --replication-factor 1 --partitions 1 --topic mytest_topic
Describe a specific consumer group (per-partition current offset, log-end offset and lag)
./kafka-consumer-groups.sh --bootstrap-server 10.1.10.163:9092 --group ai-trace --describe
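Produce test messages from the console (each line you type is sent as one message; newer Kafka versions use --bootstrap-server in place of the deprecated --broker-list):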
./kafka-console-producer.sh --broker-list 10.1.10.163:9092 --topic mytest_topic
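Consume messages from the console (--from-beginning starts from the earliest available offset instead of only new messages):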
./kafka-console-consumer.sh --bootstrap-server 10.1.10.163:9092 --from-beginning --topic mytest_topic
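The consumer-group --describe output reports three values per partition: CURRENT-OFFSET (the group's committed offset), LOG-END-OFFSET, and LAG, where LAG = LOG-END-OFFSET - CURRENT-OFFSET. As a rough sketch, the Python below recomputes that lag from the text output. The SAMPLE table is made up. The parser assumes whitespace-separated columns under a header line that starts with GROUP, and lag_by_partition is a hypothetical helper name.

```python
from __future__ import annotations

# Made-up sample of `kafka-consumer-groups.sh --describe` output, for illustration only.
SAMPLE = """
Consumer group 'ai-trace' has no active members.

GROUP     TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
ai-trace  mytest_topic  0          120             150             30   -            -     -
ai-trace  mytest_topic  1          200             200             0    -            -     -
ai-trace  mytest_topic  2          -               75              -    -            -     -
"""


def lag_by_partition(describe_output: str) -> dict[tuple[str, int], int]:
    """Return {(topic, partition): LOG-END-OFFSET - CURRENT-OFFSET}."""
    header = None
    lags = {}
    for line in describe_output.splitlines():
        tokens = line.split()
        if tokens[:1] == ["GROUP"]:
            header = tokens  # column names (printed once per group)
            continue
        if header is None or len(tokens) != len(header):
            continue  # blank lines, status messages, etc.
        row = dict(zip(header, tokens))
        if row["CURRENT-OFFSET"] == "-":
            continue  # nothing committed yet, so lag is undefined
        key = (row["TOPIC"], int(row["PARTITION"]))
        lags[key] = int(row["LOG-END-OFFSET"]) - int(row["CURRENT-OFFSET"])
    return lags


if __name__ == "__main__":
    lags = lag_by_partition(SAMPLE)
    print(lags)
    print("total lag:", sum(lags.values()))
```

Summing the per-partition values gives the group's total backlog on the topic. A steadily growing total usually means the consumers cannot keep up.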
Start Logstash
// Start Logstash; --path.data sets the data directory (each Logstash instance running on the same host needs its own)
./logstash -f /data/logstash/config/kafka_applog_to_es.conf --path.data=/data/logstash/data1/
Example 1: Logstash config file, Kafka input and Elasticsearch output
input {
  kafka {
    bootstrap_servers => "xx.x.xx.xxx:9092,xx.x.xx.xxx:9092,xx.x.xx.xxx:9092"
    topics_pattern => "tag-nginx-log"
    consumer_threads => 5
    decorate_events => true
    auto_offset_reset => "latest"
    client_id => "logstash-bpm-nginx-access-log1"
    group_id => "logstashbpmnginxaccesslog1"
  }
}
filter {
  # extract the nginx request time into the field "timestamp"
  grok {
    match => [
      "message", "%{HTTPDATE:timestamp}"
    ]
  }
  # parse "timestamp" and store it in @timestamp (the date filter's default target)
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
  }
}
output {
  elasticsearch {
    hosts => ["xx.x.xx.xxx:9200"]
    index => "tag-nginx-log-%{+YYYY.MM.dd}"
  }
}
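client_id and group_id only need to be unique. Consumers that share a group_id divide the topic's partitions among themselves, so give each independent pipeline its own group_id.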
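To show what the filter block in example 1 does, here is a rough Python equivalent. It assumes nginx's default $time_local format. The regex is a simplified stand-in for grok's %{HTTPDATE}, and "%d/%b/%Y:%H:%M:%S %z" is the strptime counterpart of the Joda pattern dd/MMM/yyyy:HH:mm:ss Z. The sketch also mirrors the index name, because %{+YYYY.MM.dd} formats @timestamp in UTC. The function names are illustrative only.

```python
from __future__ import annotations

import re
from datetime import datetime, timezone

# Simplified stand-in for grok's %{HTTPDATE} (MONTHDAY/MONTH/YEAR:TIME INT).
HTTPDATE = re.compile(r"\d{1,2}/[A-Za-z]{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4}")

# strptime counterpart of the Joda pattern "dd/MMM/yyyy:HH:mm:ss Z".
DATE_FORMAT = "%d/%b/%Y:%H:%M:%S %z"


def event_timestamp(message: str) -> datetime | None:
    """grok + date: find the nginx time and return it as a UTC datetime (@timestamp)."""
    m = HTTPDATE.search(message)
    if m is None:
        # Logstash would tag _grokparsefailure and keep the ingestion-time @timestamp
        return None
    return datetime.strptime(m.group(0), DATE_FORMAT).astimezone(timezone.utc)


def index_name(ts: datetime) -> str:
    """Mirror index => "tag-nginx-log-%{+YYYY.MM.dd}", which formats @timestamp in UTC."""
    return "tag-nginx-log-" + ts.astimezone(timezone.utc).strftime("%Y.%m.%d")


if __name__ == "__main__":
    line = '10.0.0.1 - - [10/Oct/2023:07:30:00 +0800] "GET / HTTP/1.1" 200 612'
    ts = event_timestamp(line)
    print(ts.isoformat(), index_name(ts))
```

For example, a request logged at 07:30 on 10 Oct in UTC+8 is 23:30 UTC on 9 Oct, so it lands in tag-nginx-log-2023.10.09. Example 2 below works around this by shifting the date by 8 hours.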
Example 2: Logstash config file, Kafka input and Elasticsearch output
input {
  kafka {
    bootstrap_servers => "XX.X.XX.XXX:9092,XX.X.XX.XXX:9092,XX.X.XX.XXX:9092"
    # topics_pattern is a Java regex that must match the whole topic name;
    # "tag.*" subscribes to every topic whose name starts with "tag"
    topics_pattern => "tag.*"
    consumer_threads => 5
    decorate_events => true
    auto_offset_reset => "latest"
    client_id => "logstash1-1"
    group_id => "logstash1"
    # lines that do not start with a yyyy-MM-dd date (e.g. stack-trace lines) are appended to the previous event
    codec => multiline {
      pattern => "^\d{4}-\d{2}-\d{2}"
      negate => true
      what => "previous"
      charset => "UTF-8"
    }
  }
}
filter {
  grok {
    match => {
      "message" => "(?<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\s\[(?<appName>.*)\]\s\[(?<profile>.*)\]\s\[((?<thread>.*))\]\s\[(?<traceId>.*)\]\s\[(?<class>[A-Za-z0-9/./?]{1,50})\]\s(?<level>[A-Z]{4,5}):(?<msg>.*)"
    }
    remove_field => "message"
  }
  # index_date = @timestamp (the time Logstash received the event) plus 8 hours, i.e. the UTC+8 date
  ruby {
    code => "event.set('index_date', event.timestamp.time.localtime + 8*60*60)"
  }
  # turn e.g. "2023-10-11T00:00:00.000Z" into "2023.10.11"; mutate applies convert before gsub
  mutate {
    convert => ["index_date", "string"]
    gsub => [
      "index_date", "T([\S\s]*?)Z", "",
      "index_date", "-", "."
    ]
  }
}
output {
  elasticsearch {
    hosts => ["XX.X.XX.XXX:9200"]
    index => "%{profile}-%{appName}-%{index_date}"
  }
}
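The Python sketch below walks one made-up log event through example 2's pipeline. It models three steps. First, the multiline codec: with negate => true and what => "previous", every line that does not start with a date is joined onto the previous event. Second, the grok pattern, rewritten with Python's (?P<name>...) syntax. Third, the ruby and mutate steps that build index_date. This pipeline has no date filter, so @timestamp is the time Logstash received the event; here that time is passed in as ingest_utc. Function names and sample data are hypothetical.

```python
from __future__ import annotations

import re
from datetime import datetime, timedelta, timezone

# codec => multiline { pattern => "^\d{4}-\d{2}-\d{2}" negate => true what => "previous" }
START = re.compile(r"^\d{4}-\d{2}-\d{2}")

# The grok pattern from example 2, rewritten with Python's (?P<name>...) syntax.
GROK = re.compile(
    r"(?P<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\s"
    r"\[(?P<appName>.*)\]\s\[(?P<profile>.*)\]\s\[((?P<thread>.*))\]\s"
    r"\[(?P<traceId>.*)\]\s\[(?P<class>[A-Za-z0-9/./?]{1,50})\]\s"
    r"(?P<level>[A-Z]{4,5}):(?P<msg>.*)"
)


def merge_multiline(lines: list[str]) -> list[str]:
    """negate + previous: a line that does NOT start with a date joins the previous event."""
    events = []
    for line in lines:
        if START.match(line) or not events:
            events.append(line)
        else:
            events[-1] += "\n" + line
    return events


def index_date(ingest_utc: datetime) -> str:
    """ruby adds 8h to @timestamp; the two gsubs turn the ISO string into YYYY.MM.dd."""
    shifted = ingest_utc.astimezone(timezone.utc) + timedelta(hours=8)
    iso = shifted.strftime("%Y-%m-%dT%H:%M:%S.") + f"{shifted.microsecond // 1000:03d}Z"
    return re.sub(r"T([\S\s]*?)Z", "", iso).replace("-", ".")


def process(lines: list[str], ingest_utc: datetime) -> list[dict[str, str]]:
    """Multiline codec -> grok (the original message is removed) -> index name."""
    docs = []
    for event in merge_multiline(lines):
        m = GROK.search(event)
        if m is None:
            continue  # Logstash would tag _grokparsefailure; skipped here for brevity
        doc = dict(m.groupdict())
        doc["_index"] = f"{doc['profile']}-{doc['appName']}-{index_date(ingest_utc)}"
        docs.append(doc)
    return docs


RAW = [
    "2023-10-10 23:55:36.123 [order-service] [prod] [main] [abc123] [c.e.OrderService] ERROR:order failed",
    "java.lang.IllegalStateException: order failed",
    "\tat c.e.OrderService.run(OrderService.java:42)",
]

if __name__ == "__main__":
    for doc in process(RAW, datetime(2023, 10, 10, 16, 0, tzinfo=timezone.utc)):
        print(doc)
```

The ingest time of 16:00 UTC is already 00:00 the next day in UTC+8, so the event goes to prod-order-service-2023.10.11. Note that msg holds only "order failed". In grok's regex, as in Python without re.DOTALL, "." does not match a newline, so the stack-trace lines joined by the codec are not captured, and remove_field then discards them. Prefixing the grok pattern with (?m) is a common way to let "." span lines.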
Start Filebeat
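// Redirect both stdout and stderr to /dev/null (the null device, which discards everything) so nohup does not keep writing to nohup.out and fill the disk. Because -e makes Filebeat log to stderr, its own log output is discarded as well.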
nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 & disown
Basic Filebeat config file (filebeat.yml) with Kafka output
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /logs/nginx/access.log
  fields:
    log_topic: tag-nginx-access
output.kafka:
  enabled: true
  hosts: ["xx.x.xx.xxx:9092","xx.x.xx.xxx:9092","xx.x.xx.xxx:9092"]
  # every event is sent to this fixed topic
  topic: tag-nginx-log
  version: "0.10"
  partition.round_robin:
    # publish only to available partitions
    reachable_only: true
  worker: 2
  # wait for the partition leader's acknowledgement only
  required_acks: 1
  compression: gzip
  # JSON-encoded events larger than this many bytes are dropped
  max_message_bytes: 10240
processors:
  - drop_fields:
      fields: ["host","input","agent","ecs","@version","flags","log","fields"]
      ignore_missing: false
logging.level: info
name: "xx.x.x.xx"
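In this config, drop_fields removes the listed top-level fields from every event before it is sent, and Filebeat drops any JSON-encoded event larger than max_message_bytes (10240 bytes here). The sketch below approximates both steps on a made-up, heavily trimmed event. A real event carries more metadata, so treat the size check as a rough estimate. Note that dropping "fields" also removes fields.log_topic, so that value never reaches Kafka (the topic is the fixed tag-nginx-log anyway). The helper names are hypothetical.

```python
import json

# Values copied from the filebeat.yml above.
DROP_FIELDS = ["host", "input", "agent", "ecs", "@version", "flags", "log", "fields"]
MAX_MESSAGE_BYTES = 10240
# Filebeat never drops these, even if they are listed in drop_fields.
UNDROPPABLE = {"@timestamp", "type"}

# Made-up, heavily trimmed event for illustration.
SAMPLE_EVENT = {
    "@timestamp": "2023-10-10T05:55:36.000Z",
    "message": '10.0.0.1 - - [10/Oct/2023:13:55:36 +0800] "GET / HTTP/1.1" 200 612',
    "host": {"name": "web-1"},
    "log": {"file": {"path": "/logs/nginx/access.log"}},
    "fields": {"log_topic": "tag-nginx-access"},
}


def apply_drop_fields(event: dict) -> dict:
    """Return a copy of the event without the top-level fields in DROP_FIELDS."""
    return {k: v for k, v in event.items() if k in UNDROPPABLE or k not in DROP_FIELDS}


def fits_max_message_bytes(event: dict, limit: int = MAX_MESSAGE_BYTES) -> bool:
    """Approximate check: the compact JSON encoding must not exceed the limit."""
    return len(json.dumps(event, separators=(",", ":")).encode("utf-8")) <= limit


if __name__ == "__main__":
    slim = apply_drop_fields(SAMPLE_EVENT)
    print(sorted(slim))
    print(fits_max_message_bytes(slim))
```

A long multi-line message (for example a large stack trace) can exceed 10240 bytes on its own. Raising max_message_bytes is the knob to check if such events go missing.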
Copyright belongs to the original author, 不求甚解误入此道. In case of infringement, please contact us and it will be removed.