kELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana;
Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能;它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。
**Kibana **也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志
logstach(日志收集)->Elasticsearch(日志存储和搜索)->Kibana(查看日志,可视化)
为什么要使用elk?
ELK 组件在海量日志系统的运维中,可用于解决以下主要问题:- 分布式日志数据统一收集,实现集中式查询和管理
故障排查
安全信息和事件管理
报表功能
我们为什么用kafka,一定要通过kafka吗
不是,可以直接logback到ELK的,但是为什么使用kafka接收日志呢,是为了减少logstash对于日志进入时的压力。kafka的特性使用过的人应该都清楚,拥有这10W级别每秒的单机吞吐量,所以很适合作为数据来源缓冲区。
logback.xml
<!-- kafkaAppender 输出日志到kafka -->
<appender name="kafkaAppender"
class="com.td.ai.frame.uni.platform.oaudit.unify.config.KafkaAppender">
<bootstrapServers>kafka-servers</bootstrapServers>
<topic>kafka-topic</topic>
</appender>
<!-- 要输出日志的类 -->
<logger name="logKafka" level="info">
<appender-ref ref="kafkaAppender"/>
</logger>
<!-- 异步传递策略,建议选择异步,不然连接kafka失败,会阻挡服务启动 -->
<appender name="Async" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="kafkaAppender"/>
</appender>
public class KafkaAppender extends AppenderBase<ILoggingEvent> {
private static Logger logger = LoggerFactory.getLogger(KafkaAppender.class);
private String topic = "***";
private Producer<String, String> producer;
@Override
public void start() {
super.start();
if (producer == null) {
Properties props = new Properties();
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule\n" +
"required username=\"***\"" +
"password=\"****\";");
props.put("bootstrap.servers", topic);
//判断是否成功,我们指定了“1”将会阻塞消息
props.put("acks", "1");
props.put("retries", 3);
props.put("batch.size", 262144);
//延迟10s,10s内数据会缓存进行发送\
props.put("linger.ms", 10);
props.put("buffer.memory", 67108864);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("metric.reporters", "com.ctg.kafka.clients.reporter.KafkaClientMetricsReporter");
props.put("client.id", ""***);
producer = new KafkaProducer<String, String>(props);
}
}
@Override
protected void append(ILoggingEvent iLoggingEvent) {
String msg = iLoggingEvent.getFormattedMessage();
String message = "";
InetAddress localHost = null;
try {
localHost = Inet4Address.getLocalHost();
} catch (UnknownHostException e) {
e.printStackTrace();
}
String hostIp = localHost.getHostAddress();
String hostName = localHost.getHostName();
Date date = new Date();
SimpleDateFormat sdformat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");//24小时制
String datetime = sdformat.format(date);
JSONObject json = new JSONObject();
json.put("podIP", hostIp);
json.put("podName", hostName);
message = json.toString();
// System.out.println("向kafka推送日志开始:" + message);
//key为null 2.4之前为轮询策略
// 如果key值为null,并且使用了默认的分区器,Kafka会根据轮询(Random Robin)策略将消息均匀地分布到各个分区上。
// 之后为粘性策略
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
topic, null, message);
//同步发动消息-改-异步发送消息
try {
Future<RecordMetadata> result = producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// 执行错误逻辑处理//否则就是成功喽
exception.printStackTrace();
}
}
});
// System.out.println("分区:" + result.get().partition() + ",offset: " + result.get().offset());
} catch (Exception e) {
e.printStackTrace();
}
producer.flush();
}
}
**服务器安装logstash **
查看一下路径
pwd
应该显示/app/logstash或者/data/logstash将tar包上传
执行以下命令
tar -zxvf logstash-7.5.2.tar.gz(自己的版本号)
cd logstash-7.5.2
mkdir config/conf
mkdir config/certs
mkdir logs
cd config/conf
上传js-sysname.conf
input {
kafka {
topics_pattern => "kafkatopic"
consumer_threads => 4
group_id => "***-consumer" # kafka 消费组
type => "kafka"
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "SCRAM-SHA-512"
jaas_path => "/home/crmapp/logstash-7.5.2/config/certs/kafka_client_jaas.conf"
bootstrap_servers => "*****"
codec => "json"
}
}
filter {
ruby{
code => "event.set('index_day',event.get('@timestamp').time.localtime('+08:00').strftime('%Y.%m.%d'))"
}
}
output {
elasticsearch {
hosts => ["*****"]
index => "***a-log-%{index_day}"
user => "**"
password => "这里写es的密码"
}
}
cd ../certs
上传kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="**"
password="****";
};
cd ..
vim pipelines.yml
将下面的加到"# Example of two pipelines:"这一行下面
- pipeline.id:js-sysname
pipeline.workers: 2
path.config: "/app/logstash/logstash-7.5.2/config/conf/-js-sysname.conf"
cd /app/logstash/logstash-7.5.2/
nohup bin/logstash -r true --config.reload.automatic >> logs/logstash.log &
- 查看日志
tail -100f logs/logstash.log
启动需要时间,如果没有erorr日志,没有提示连不上kafka或者elasricsearch即可
版权归原作者 antyyy123 所有, 如有侵权,请联系我们删除。