0


SpringBoot项目整合Kafka+es+logstash+kibana日志收集

  1. 现在更多项目会把日志整理收集起来,方便客户或者开发查询日志。日志是项目中一个多而且杂的关键组织部分。
  2. 今天将演示的就是kafka+ELKelasticSearch+logstash+kibana】组成的日志分析系统。其中kafka起到了异步的作用,最小程度减轻了应用本身的资源压力。

环境准备:

  1. 以下所有组件都是以tar.gz包的形式安装、配置。没有用到docker等容器。

1、Kafka

  1. 这边是通过网上下载的kafka安装包,其中内置了zookeeper。![](https://img-blog.csdnimg.cn/direct/14dbf42a7a33465faeb5d10ae3e17ba1.png)解压命令

tar -zxvf kafka_2.12-3.70.tgz -C 指定目录

  1. 这边解压完,如果不需要额外修改端口的话可以直接使用。进入到kafka_1.12目录下

先启动zookeeper

./bin/zookeeper-server-start.sh config/zookeeper.properties

等启动好后,看到有create new log controller字样即可再去启动kafka

./bin/kafka-server-start.sh config/server.properties

这样kafka就启动好了,如果需要后台启动,自行下载 nohup,安装好后在命令前加上nohup,命令最后加上符号 & 即可,示例:

nohup ./bin/kafka-server-start.sh config/server.properties &

2、elasticSearch

我这边使用wget命令下载的,没有外网的可以去网上下载tar.gz传入Linux。

重点:这里ELK三件套版本问题,es、kibana最好一致,logstash不要相差太多。本人这里踩坑了。

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.13.2-linux-x86_64.tar.gz

解压es,因为es启动不能用root账号启动。建议将解压文件放到/usr/local下

tar -zxvf elasticsearch-7.13.2-linux-x86_64.tar.gz -C /usr/local

  1. **ES是这几个中间件中最难配的一个,需要修改JDK环境、JVM参数、线程数等**

jdk配置:

  1. 因为elasticsearchjdk是强依赖关系,esJava编写。所以下的安装包中也有一个JDK,而我们服务器可能已经安装过JDK就会出现冲突。这边需要修改esbin目录下elasticsearch-env文件

vim elasticsearch-env 按 i 编辑 esc后 输入 wq 回车保存

将使用的Java指定成我们es安装包中自带的。

JVM配置:

去修改es目录下 config目录下jvm.options 默认的jvm参数太高了,如果服务器性能好可以不做修改

vim jvm.options

YML配置:

  1. 配置host和端口,方便后期kibanalogstash访问。修改config目录下的elasticsearch.yml文件。

vim elasticsearch.yml

修改 network.host、http.port、cluster.name、node.name、cluster.initial_master_nodes参数,如图

启动:

  1. 因为es不能直接用root用户启动,需要新建用户,且用root账户给新建账户授权

useradd user-es
passwd 123
接着输入自己的密码即可。接着授权

chown -R user-es:123/usr/local/elasticsearch-7.13.2

然后切换到user-es账户下启动es

su user-es

#bin目录下

./elasticsearch

如果还未启动可能是线程数、堆大小问题。需要自行解决,这里就不过多赘述了。

3、kibana

我这儿使用的是wget指令下载的安装包,注意:需要和es版本一致

wget https://artifacts.elastic.co/downloads/kibana/kibana-7.13.2-linux-x86_64.tar.gz

解压至 /usr/local目录下

tar -zxvf kibana-7.13.2-linux-x86_64.tar.gz -C /usr/local

YML配置:

修改config目录下的kibana.yml 配置IP+端口,并指定elasticsearch地址

vim kibana.yml

其中的server.host之所以填0.0.0.0是因为elasticsear和kibana需要连接,使用localhost会连接失败,所以我这边直接使用0.0.0.0放开了所有连接。

启动:

进入到bin目录下,也需要切换到 uesr-es账户下,切换前记得授权

./kibana

4、lostash

使用wget命令下载logstash安装包

wget https://artifacts.elastic.co/downloads/loagstash/logstash-7.5.1-linux-x86_64.tar.gz

解压至/usr/local目录下

tar -zxvf logstash-7.5.1.tar.gz -C /usr/local

CONF配置

在logstash conf目录下新建一个 logstash.conf 文件

编辑文件,输入如下内容

  1. # 输入方式 通过监听kafka队列,读取数据
  2. input {
  3. #
  4. kafka {
  5. id => "my_plugin_id"
  6. bootstrap_servers => "localhost:9092" #kafka地址
  7. topics => ["zz-log-topic"] #队列名称,要于项目logback.xml文件中一致
  8. auto_offset_reset => "latest"
  9. }
  10. }
  11. # 输出 将监听的日志写入es中
  12. output {
  13. stdout { codec => rubydebug }
  14. elasticsearch {
  15. hosts =>["localhost:9200"] # es地址
  16. }
  17. }

保存,回到根目录下启动

./bin/logstash -f config/logstash.conf

spring项目配置

maven配置

添加maven依赖

  1. <dependency>
  2. <groupId>com.github.danielwegener</groupId>
  3. <artifactId>logback-kafka-appender</artifactId>
  4. <version>0.2.0-RC2</version>
  5. <scope>runtime</scope>
  6. </dependency>

xml配置

  1. 在项目模块resource目录下新建logback-spring.xml
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <!--
  3. scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true。
  4. scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒当scan为true时,此属性生效。默认的时间间隔为1分钟。
  5. debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。
  6. -->
  7. <configuration scan="false" scanPeriod="60 seconds" debug="false">
  8. <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
  9. <springProperty scope="context" name="logging.level" source="logging.level" defaultValue="info"/>
  10. <!-- 定义日志的根目录 -->
  11. <property name="LOG_HOME" value="./zzlogs/building" />
  12. <!-- 定义日志文件名称 -->
  13. <property name="LOG_NAME" value="building"></property>
  14. <!-- 下面注释中 %traceid 为SkyWalking 中的traceid -->
  15. <property name="LOG_PATTERN" value="[%-5level] [%d{yyyy-MM-dd HH:mm:ss}] [%thread] ==> [TxId: %X{PtxId},SpanId: %X{PspanId}]" />
  16. <property name="LOG_PATTERN_EX" value="[%-5level] [%d{yyyy-MM-dd HH:mm:ss}] T:[%X{traceId}] S:[%X{sessionId}] U:[%X{userId}] [%thread] ==> [%tid] %msg%n" />
  17. <appender name="kafka_log" class="com.github.danielwegener.logback.kafka.KafkaAppender">
  18. <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
  19. <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
  20. <pattern>${LOG_PATTERN_EX}</pattern>
  21. </layout>
  22. </encoder>
  23. <topic>zz-log-topic</topic>
  24. <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
  25. <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
  26. <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
  27. </appender>
  28. <!-- ch.qos.logback.core.ConsoleAppender 表示控制台输出 -->
  29. <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
  30. <!--
  31. 日志输出格式:
  32. %-5level:级别从左显示5个字符宽度
  33. %d表示日期时间,
  34. %thread表示线程名,
  35. %msg:日志消息,
  36. %n是换行符
  37. -->
  38. <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
  39. <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
  40. <pattern>${LOG_PATTERN}</pattern>
  41. </layout>
  42. </encoder>
  43. </appender>
  44. <!-- 滚动记录文件,先将日志记录到指定文件,当符合某个条件时,将日志记录到其他文件 -->
  45. <appender name="file_log" class="ch.qos.logback.core.rolling.RollingFileAppender">
  46. <!-- 这里为缺省log文件命名配置。注意,如果需要支持基于flume的日志搬运,为了防止文件名滚动过程中,
  47. 重复搬运数据,请将下面两行配置注释掉,从而保证每次生成的日志文件均包含日期信息且不会变化。-->
  48. <file>${LOG_HOME}/${LOG_NAME}.log</file>
  49. <append>true</append>
  50. <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
  51. <!--
  52. 滚动时产生的文件的存放位置及文件名称 %d{yyyy-MM-dd}:按天进行日志滚动
  53. %i:当文件大小超过maxFileSize时,按照i进行文件滚动
  54. -->
  55. <fileNamePattern>${LOG_HOME}/${LOG_NAME}-%d{yyyy-MM-dd}-%i.log</fileNamePattern>
  56. <!--
  57. 可选节点,控制保留的归档文件的最大数量,超出数量就删除旧文件。假设设置每天滚动,
  58. 且maxHistory是365,则只保存最近365天的文件,删除之前的旧文件。注意,删除旧文件是,
  59. 那些为了归档而创建的目录也会被删除。
  60. -->
  61. <!-- 保存31天数据 -->
  62. <MaxHistory>31</MaxHistory>
  63. <!--
  64. 当日志文件超过maxFileSize指定的大小是,根据上面提到的%i进行日志文件滚动 注意此处配置SizeBasedTriggeringPolicy是无法实现按文件大小进行滚动的,必须配置timeBasedFileNamingAndTriggeringPolicy
  65. -->
  66. <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
  67. <maxFileSize>20MB</maxFileSize>
  68. </timeBasedFileNamingAndTriggeringPolicy>
  69. </rollingPolicy>
  70. <!-- 日志输出格式: -->
  71. <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
  72. <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
  73. <pattern>${LOG_PATTERN_EX}</pattern>
  74. </layout>
  75. </encoder>
  76. </appender>
  77. <!-- 然后定义logger,只有定义了logger并引入的appender,appender才会生效 -->
  78. <!-- 这里我们把输出到控制台appender的日志级别设置为DEBUG,便于调试。但是输出文件我们缺省为INFO,两者均可随时修改。-->
  79. <root level="${logging.level}">
  80. <appender-ref ref="console" />
  81. </root>
  82. <logger name="springfox.documentation" additivity="false" level="error">
  83. <appender-ref ref="console" />
  84. </logger>
  85. <logger name="apelet" additivity="false" level="info">
  86. <appender-ref ref="console" />
  87. <appender-ref ref="kafka_log"/>
  88. <appender-ref ref="file_log" />
  89. </logger>
  90. <!-- 这里将dao的日志级别设置为DEBUG,是为了SQL语句的输出 -->
  91. <logger name="apelet.upmsservice.dao" additivity="false" level="debug">
  92. <appender-ref ref="console" />
  93. <appender-ref ref="kafka_log"/>
  94. <appender-ref ref="file_log" />
  95. </logger>
  96. <logger name="apelet.common.log.dao" additivity="false" level="debug">
  97. <appender-ref ref="console" />
  98. <appender-ref ref="kafka_log"/>
  99. <appender-ref ref="file_log" />
  100. </logger>
  101. <logger name="apelet.common.report.dao" additivity="false" level="debug">
  102. <appender-ref ref="console" />
  103. <appender-ref ref="kafka_log"/>
  104. <appender-ref ref="file_log" />
  105. </logger>
  106. <logger name="apelet.common.online.dao" additivity="false" level="debug">
  107. <appender-ref ref="console" />
  108. <appender-ref ref="kafka_log"/>
  109. <appender-ref ref="file_log" />
  110. </logger>
  111. <logger name="apelet.common.flow.dao" additivity="false" level="debug">
  112. <appender-ref ref="console" />
  113. <appender-ref ref="kafka_log"/>
  114. <appender-ref ref="file_log" />
  115. </logger>
  116. <logger name="org.flowable" additivity="false" level="info">
  117. <appender-ref ref="console" />
  118. <appender-ref ref="kafka_log"/>
  119. <appender-ref ref="file_log" />
  120. </logger>
  121. <statusListener class="ch.qos.logback.core.status.OnConsoleStatusListener"/>
  122. </configuration>

xml详解:

启动服务测试

访问kibana,看日志是否写入到了es


本文转载自: https://blog.csdn.net/m0_58907154/article/details/138912576
版权归原作者 鱼儿撒了欢的跳 所有, 如有侵权,请联系我们删除。

“SpringBoot项目整合Kafka+es+logstash+kibana日志收集”的评论:

还没有评论