0


日志=》kafka》ELK

kELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana;
Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能;它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。
**Kibana **也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志

logstach(日志收集)->Elasticsearch(日志存储和搜索)->Kibana(查看日志,可视化)

为什么要使用elk?
ELK 组件在海量日志系统的运维中,可用于解决以下主要问题:- 分布式日志数据统一收集,实现集中式查询和管理
故障排查
安全信息和事件管理
报表功能

我们为什么用kafka,一定要通过kafka吗
不是,可以直接logback到ELK的,但是为什么使用kafka接收日志呢,是为了减少logstash对于日志进入时的压力。kafka的特性使用过的人应该都清楚,拥有这10W级别每秒的单机吞吐量,所以很适合作为数据来源缓冲区。

logback.xml

 <!-- kafkaAppender 输出日志到kafka -->
    <appender name="kafkaAppender" 
         class="com.td.ai.frame.uni.platform.oaudit.unify.config.KafkaAppender">
        <bootstrapServers>kafka-servers</bootstrapServers>
        <topic>kafka-topic</topic>
    </appender>

 <!-- 要输出日志的类 -->
    <logger name="logKafka" level="info">
        <appender-ref ref="kafkaAppender"/>
    </logger>
    <!-- 异步传递策略,建议选择异步,不然连接kafka失败,会阻挡服务启动 -->
    <appender name="Async" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="kafkaAppender"/>
    </appender>
public class KafkaAppender extends AppenderBase<ILoggingEvent> {

    private static Logger logger = LoggerFactory.getLogger(KafkaAppender.class);

    private String topic = "***";

  
    private Producer<String, String> producer;

    @Override
    public void start() {
        super.start();
        if (producer == null) {
            Properties props = new Properties();
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "SCRAM-SHA-512");
            props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule\n" +
                    "required username=\"***\"" +
                    "password=\"****\";");
            props.put("bootstrap.servers", topic);
            //判断是否成功,我们指定了“1”将会阻塞消息
            props.put("acks", "1");
            props.put("retries", 3);
            props.put("batch.size", 262144);
            //延迟10s,10s内数据会缓存进行发送\
            props.put("linger.ms", 10);
            props.put("buffer.memory", 67108864);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("metric.reporters", "com.ctg.kafka.clients.reporter.KafkaClientMetricsReporter");
            props.put("client.id", ""***);
            producer = new KafkaProducer<String, String>(props);

        }

    }

    @Override
    protected void append(ILoggingEvent iLoggingEvent) {
        String msg = iLoggingEvent.getFormattedMessage();
        String message = "";
        InetAddress localHost = null;
        try {
            localHost = Inet4Address.getLocalHost();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        String hostIp = localHost.getHostAddress();
        String hostName = localHost.getHostName();
        Date date = new Date();
        SimpleDateFormat sdformat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");//24小时制
        String datetime = sdformat.format(date);
        JSONObject json = new JSONObject();
       
        json.put("podIP", hostIp);
        json.put("podName", hostName);
        message = json.toString();
//        System.out.println("向kafka推送日志开始:" + message);
        //key为null  2.4之前为轮询策略
        // 如果key值为null,并且使用了默认的分区器,Kafka会根据轮询(Random Robin)策略将消息均匀地分布到各个分区上。
        // 之后为粘性策略
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                topic, null, message);
        //同步发动消息-改-异步发送消息
        try {
            Future<RecordMetadata> result = producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        // 执行错误逻辑处理//否则就是成功喽
                        exception.printStackTrace();
                    }
                }
            });
//            System.out.println("分区:" + result.get().partition() + ",offset: " + result.get().offset());
        } catch (Exception e) {
            e.printStackTrace();
        }

        producer.flush();

    }

}

**服务器安装logstash **

  1. 查看一下路径
    pwd
    应该显示/app/logstash或者/data/logstash

  2. 将tar包上传

  3. 执行以下命令
    tar -zxvf logstash-7.5.2.tar.gz(自己的版本号)

cd logstash-7.5.2

mkdir config/conf
mkdir config/certs
mkdir logs

cd config/conf
上传js-sysname.conf

input {
    kafka {
        topics_pattern  => "kafkatopic"
        consumer_threads => 4
        group_id => "***-consumer" # kafka 消费组
        type => "kafka"
        security_protocol => "SASL_PLAINTEXT"
        sasl_mechanism => "SCRAM-SHA-512"
        jaas_path => "/home/crmapp/logstash-7.5.2/config/certs/kafka_client_jaas.conf"
        bootstrap_servers => "*****"
        codec => "json"
    }
}
 
filter {
  ruby{
    code => "event.set('index_day',event.get('@timestamp').time.localtime('+08:00').strftime('%Y.%m.%d'))"
  }
 
}

output {
    
        elasticsearch {
       hosts => ["*****"]
       index => "***a-log-%{index_day}"
            user => "**"
            password => "这里写es的密码"
        }
    
}

cd ../certs
上传kafka_client_jaas.conf

KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="**"
password="****";
};

cd ..
vim pipelines.yml
将下面的加到"# Example of two pipelines:"这一行下面

  • pipeline.id:js-sysname
    pipeline.workers: 2
    path.config: "/app/logstash/logstash-7.5.2/config/conf/-js-sysname.conf"

cd /app/logstash/logstash-7.5.2/
nohup bin/logstash -r true --config.reload.automatic >> logs/logstash.log &

  1. 查看日志
    tail -100f logs/logstash.log
    启动需要时间,如果没有erorr日志,没有提示连不上kafka或者elasricsearch即可
标签: kafka elk elasticsearch

本文转载自: https://blog.csdn.net/qq_37411545/article/details/129871692
版权归原作者 antyyy123 所有, 如有侵权,请联系我们删除。

“日志=》kafka》ELK”的评论:

还没有评论