0


SpringBoot项目整合Kafka+es+logstash+kibana日志收集

    现在更多项目会把日志整理收集起来,方便客户或者开发查询日志。日志是项目中一个多而且杂的关键组织部分。

    今天将演示的就是kafka+ELK【elasticSearch+logstash+kibana】组成的日志分析系统。其中kafka起到了异步的作用,最小程度减轻了应用本身的资源压力。

环境准备:

    以下所有组件都是以tar.gz包的形式安装、配置。没有用到docker等容器。

1、Kafka

   这边是通过网上下载的kafka安装包,其中内置了zookeeper。![](https://img-blog.csdnimg.cn/direct/14dbf42a7a33465faeb5d10ae3e17ba1.png)解压命令

tar -zxvf kafka_2.12-3.70.tgz -C 指定目录

     这边解压完,如果不需要额外修改端口的话可以直接使用。进入到kafka_1.12目录下

先启动zookeeper

./bin/zookeeper-server-start.sh config/zookeeper.properties

等启动好后,看到有create new log controller字样即可再去启动kafka

./bin/kafka-server-start.sh config/server.properties

这样kafka就启动好了,如果需要后台启动,自行下载 nohup,安装好后在命令前加上nohup,命令最后加上符号 & 即可,示例:

nohup ./bin/kafka-server-start.sh config/server.properties &

2、elasticSearch

我这边使用wget命令下载的,没有外网的可以去网上下载tar.gz传入Linux。

重点:这里ELK三件套版本问题,es、kibana最好一致,logstash不要相差太多。本人这里踩坑了。

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.13.2-linux-x86_64.tar.gz

解压es,因为es启动不能用root账号启动。建议将解压文件放到/usr/local下

tar -zxvf elasticsearch-7.13.2-linux-x86_64.tar.gz -C /usr/local

     **ES是这几个中间件中最难配的一个,需要修改JDK环境、JVM参数、线程数等**

jdk配置:

    因为elasticsearch和jdk是强依赖关系,es由Java编写。所以下的安装包中也有一个JDK,而我们服务器可能已经安装过JDK就会出现冲突。这边需要修改es中bin目录下elasticsearch-env文件

vim elasticsearch-env 按 i 编辑 esc后 输入 wq 回车保存

将使用的Java指定成我们es安装包中自带的。

JVM配置:

去修改es目录下 config目录下jvm.options 默认的jvm参数太高了,如果服务器性能好可以不做修改

vim jvm.options

YML配置:

     配置host和端口,方便后期kibana、logstash访问。修改config目录下的elasticsearch.yml文件。

vim elasticsearch.yml

修改 network.host、http.port、cluster.name、node.name、cluster.initial_master_nodes参数,如图

启动:

    因为es不能直接用root用户启动,需要新建用户,且用root账户给新建账户授权

useradd user-es
passwd 123
接着输入自己的密码即可。接着授权

chown -R user-es:123/usr/local/elasticsearch-7.13.2

然后切换到user-es账户下启动es

su user-es

#bin目录下

./elasticsearch

如果还未启动可能是线程数、堆大小问题。需要自行解决,这里就不过多赘述了。

3、kibana

我这儿使用的是wget指令下载的安装包,注意:需要和es版本一致

wget https://artifacts.elastic.co/downloads/kibana/kibana-7.13.2-linux-x86_64.tar.gz

解压至 /usr/local目录下

tar -zxvf kibana-7.13.2-linux-x86_64.tar.gz -C /usr/local

YML配置:

修改config目录下的kibana.yml 配置IP+端口,并指定elasticsearch地址

vim kibana.yml

其中的server.host之所以填0.0.0.0是因为elasticsear和kibana需要连接,使用localhost会连接失败,所以我这边直接使用0.0.0.0放开了所有连接。

启动:

进入到bin目录下,也需要切换到 uesr-es账户下,切换前记得授权

./kibana

4、lostash

使用wget命令下载logstash安装包

wget https://artifacts.elastic.co/downloads/loagstash/logstash-7.5.1-linux-x86_64.tar.gz

解压至/usr/local目录下

tar -zxvf logstash-7.5.1.tar.gz -C /usr/local

CONF配置

在logstash conf目录下新建一个 logstash.conf 文件

编辑文件,输入如下内容

# 输入方式 通过监听kafka队列,读取数据
input {
       #
      kafka {
        id => "my_plugin_id"
        bootstrap_servers => "localhost:9092"  #kafka地址
        topics => ["zz-log-topic"]             #队列名称,要于项目logback.xml文件中一致
        auto_offset_reset => "latest"
      }
    }

    # 输出 将监听的日志写入es中
    output {
      stdout { codec => rubydebug }
      elasticsearch {
           hosts =>["localhost:9200"]  # es地址
      }
    }

保存,回到根目录下启动

./bin/logstash -f config/logstash.conf

spring项目配置

maven配置

添加maven依赖

<dependency>
            <groupId>com.github.danielwegener</groupId>
            <artifactId>logback-kafka-appender</artifactId>
            <version>0.2.0-RC2</version>
            <scope>runtime</scope>
        </dependency>

xml配置

     在项目模块resource目录下新建logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true。
scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒当scan为true时,此属性生效。默认的时间间隔为1分钟。
debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。
-->
<configuration scan="false" scanPeriod="60 seconds" debug="false">
    <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
    <springProperty scope="context" name="logging.level" source="logging.level" defaultValue="info"/>

    <!-- 定义日志的根目录 -->
    <property name="LOG_HOME" value="./zzlogs/building" />
    <!-- 定义日志文件名称 -->
    <property name="LOG_NAME" value="building"></property>
    <!-- 下面注释中 %traceid 为SkyWalking 中的traceid -->
    <property name="LOG_PATTERN" value="[%-5level] [%d{yyyy-MM-dd HH:mm:ss}] [%thread] ==> [TxId: %X{PtxId},SpanId: %X{PspanId}]" />
    <property name="LOG_PATTERN_EX" value="[%-5level] [%d{yyyy-MM-dd HH:mm:ss}] T:[%X{traceId}] S:[%X{sessionId}] U:[%X{userId}] [%thread] ==> [%tid] %msg%n" />

    <appender name="kafka_log" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
                <pattern>${LOG_PATTERN_EX}</pattern>
            </layout>
        </encoder>
        <topic>zz-log-topic</topic>
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
        <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
    </appender>

    <!-- ch.qos.logback.core.ConsoleAppender 表示控制台输出 -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <!--
        日志输出格式:
            %-5level:级别从左显示5个字符宽度
            %d表示日期时间,
            %thread表示线程名,
            %msg:日志消息,
            %n是换行符
        -->
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
                <pattern>${LOG_PATTERN}</pattern>
            </layout>
        </encoder>
    </appender>

    <!-- 滚动记录文件,先将日志记录到指定文件,当符合某个条件时,将日志记录到其他文件 -->
    <appender name="file_log" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- 这里为缺省log文件命名配置。注意,如果需要支持基于flume的日志搬运,为了防止文件名滚动过程中,
        重复搬运数据,请将下面两行配置注释掉,从而保证每次生成的日志文件均包含日期信息且不会变化。-->
        <file>${LOG_HOME}/${LOG_NAME}.log</file>
        <append>true</append>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!--
            滚动时产生的文件的存放位置及文件名称 %d{yyyy-MM-dd}:按天进行日志滚动
            %i:当文件大小超过maxFileSize时,按照i进行文件滚动
            -->
            <fileNamePattern>${LOG_HOME}/${LOG_NAME}-%d{yyyy-MM-dd}-%i.log</fileNamePattern>
            <!--
            可选节点,控制保留的归档文件的最大数量,超出数量就删除旧文件。假设设置每天滚动,
            且maxHistory是365,则只保存最近365天的文件,删除之前的旧文件。注意,删除旧文件是,
            那些为了归档而创建的目录也会被删除。
            -->
            <!-- 保存31天数据 -->
            <MaxHistory>31</MaxHistory>
            <!--
            当日志文件超过maxFileSize指定的大小是,根据上面提到的%i进行日志文件滚动 注意此处配置SizeBasedTriggeringPolicy是无法实现按文件大小进行滚动的,必须配置timeBasedFileNamingAndTriggeringPolicy
            -->
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>20MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
        </rollingPolicy>
        <!-- 日志输出格式: -->
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
                <pattern>${LOG_PATTERN_EX}</pattern>
            </layout>
        </encoder>
    </appender>

    <!-- 然后定义logger,只有定义了logger并引入的appender,appender才会生效 -->
    <!-- 这里我们把输出到控制台appender的日志级别设置为DEBUG,便于调试。但是输出文件我们缺省为INFO,两者均可随时修改。-->
    <root level="${logging.level}">
        <appender-ref ref="console" />
    </root>
    <logger name="springfox.documentation" additivity="false" level="error">
        <appender-ref ref="console" />
    </logger>
    <logger name="apelet" additivity="false" level="info">
        <appender-ref ref="console" />
        <appender-ref ref="kafka_log"/>
        <appender-ref ref="file_log" />
    </logger>
    <!-- 这里将dao的日志级别设置为DEBUG,是为了SQL语句的输出 -->
    <logger name="apelet.upmsservice.dao" additivity="false" level="debug">
        <appender-ref ref="console" />
        <appender-ref ref="kafka_log"/>
        <appender-ref ref="file_log" />
    </logger>
    <logger name="apelet.common.log.dao" additivity="false" level="debug">
        <appender-ref ref="console" />
        <appender-ref ref="kafka_log"/>
        <appender-ref ref="file_log" />
    </logger>
    <logger name="apelet.common.report.dao" additivity="false" level="debug">
        <appender-ref ref="console" />
        <appender-ref ref="kafka_log"/>
        <appender-ref ref="file_log" />
    </logger>
    <logger name="apelet.common.online.dao" additivity="false" level="debug">
        <appender-ref ref="console" />
        <appender-ref ref="kafka_log"/>
        <appender-ref ref="file_log" />
    </logger>
    <logger name="apelet.common.flow.dao" additivity="false" level="debug">
        <appender-ref ref="console" />
        <appender-ref ref="kafka_log"/>
        <appender-ref ref="file_log" />
    </logger>
    <logger name="org.flowable" additivity="false" level="info">
        <appender-ref ref="console" />
        <appender-ref ref="kafka_log"/>
        <appender-ref ref="file_log" />
    </logger>
    <statusListener class="ch.qos.logback.core.status.OnConsoleStatusListener"/>
</configuration>

xml详解:

启动服务测试

访问kibana,看日志是否写入到了es


本文转载自: https://blog.csdn.net/m0_58907154/article/details/138912576
版权归原作者 鱼儿撒了欢的跳 所有, 如有侵权,请联系我们删除。

“SpringBoot项目整合Kafka+es+logstash+kibana日志收集”的评论:

还没有评论