0


springboot logback整合kafka实现日志写入

springboot pom依赖导入

  1. <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.4.0.RELEASE</version></dependency><!--logstash 整合logback--><dependency><groupId>net.logstash.logback</groupId><artifactId>logstash-logback-encoder</artifactId><version>4.11</version><exclusions><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId></exclusion></exclusions></dependency><!--logback 整合 kafka--><dependency><groupId>com.github.danielwegener</groupId><artifactId>logback-kafka-appender</artifactId><version>0.1.0</version><scope>runtime</scope></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!-- test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>

kafka配置

  1. spring:kafka:listener:
  2. #设置是否批量消费,默认 single(单条),batch(批量)
  3. type: single
  4. # 集群地址
  5. bootstrap-servers: localhost:9092
  6. # 生产者配置
  7. producer:
  8. # 重试次数
  9. retries:3
  10. # 应答级别
  11. # acks=0 把消息发送到kafka就认为发送成功
  12. # acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
  13. # acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
  14. acks: all
  15. # 批量处理的最大大小 单位 byte
  16. batch-size:4096
  17. # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
  18. buffer-memory:33554432
  19. # 客户端ID
  20. client-id: hello-kafka
  21. # Key 序列化类
  22. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  23. # Value 序列化类
  24. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  25. # 消息压缩:none、lz4、gzip、snappy,默认为 none。
  26. compression-type: gzip
  27. properties:partitioner:
  28. #指定自定义分区器
  29. class: com.example.demo.config.MyPartitioner
  30. linger:
  31. # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
  32. ms:1000max:block:
  33. # KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms
  34. ms:6000
  35. # 消费者配置
  36. consumer:
  37. # 默认消费者组
  38. group-id: testGroup
  39. # 自动提交 offset 默认 true
  40. enable-auto-commit:false
  41. # 自动提交的频率 单位 ms
  42. auto-commit-interval:1000
  43. # 批量消费最大数量
  44. max-poll-records:100
  45. # Key 反序列化类
  46. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  47. # Value 反序列化类
  48. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  49. # 当kafka中没有初始offset或offset超出范围时将自动重置offset
  50. # earliest:重置为分区中最小的offset
  51. # latest:重置为分区中最新的offset(消费分区中新产生的数据)
  52. # none:只要有一个分区不存在已提交的offset,就抛出异常
  53. auto-offset-reset: latest
  54. properties:interceptor:classes: com.example.demo.service.MyConsumerInterceptor
  55. session:timeout:
  56. # session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作
  57. ms:120000request:timeout:
  58. # 请求超时
  59. ms:120000
  60. # 指定logback配置文件,因为查找优先级问题,最好手动配置上,避免其他依赖导致未使用到自定义的logback文件
  61. logging:config: classpath:config/logback-spring.xml

logback-spring.xml配置

  1. <?xml version="1.0" encoding="UTF-8"?><configuration><!-- 日志文件路径 --><property name="logPath" value="C://Users//wangw//Desktop//aliyun-tts//"/><!-- 日志文件名称 --><property name="logName" value="sp-ipage-test"/><logger name="org.springframework.web" level="INFO"/><logger name="org.springboot.sample" level="TRACE"/><appender name="CONSOLE"class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}%-5level -&#45;&#45;[%thread]%logger Line:%-3L -%msg%n</pattern><charset>UTF-8</charset></encoder></appender><!-- debug 日志文件 --><appender name="FILE"class="ch.qos.logback.core.rolling.RollingFileAppender"><!-- 正在记录的日志文档的路径及文档名 --><file>${logPath}${logName}.log</file><!--日志文档输出格式--><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}%-5level -&#45;&#45;[%thread]%logger Line:%-3L -%msg%n</pattern><charset>UTF-8</charset></encoder><!-- 日志记录器的滚动策略,按日期,按大小记录 --><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><!-- 日志归档 --><fileNamePattern>${logPath}${logName}-%d{yyyy-MM-dd}.%i.log</fileNamePattern><timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"><maxFileSize>100MB</maxFileSize></timeBasedFileNamingAndTriggeringPolicy><!--日志文档保留天数--><maxHistory>10</maxHistory></rollingPolicy></appender><appender name="KAFKA_APPENDER"class="com.github.danielwegener.logback.kafka.KafkaAppender"><encoder class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder"><layout class="net.logstash.logback.layout.LogstashLayout"><!--开启的话会包含hostname等logback的context信息--><includeContext>true</includeContext><!--是否包含日志来源--><includeCallerData>true</includeCallerData><fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/></layout><charset>UTF-8</charset></encoder><!--kafka topic 需要与配置文件里面的topic一致 否则kafka不认识--><topic>test</topic><!--主键分区策略--><keyingStrategy class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy"/><!--kafka消息提交策略,logback-kafka-appender为我们提供了两种策略,
  2. 异步提交策略(AsynchronousDeliveryStrategy)
  3. 阻塞提交策略(BlockingDeliveryStrategy)
  4. --><deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/><!--bootstrap.servers 为kafka 部署地址,服务端需要使用对应的IP地址,不能使用localhost --><producerConfig>bootstrap.servers=127.0.0.1:9092</producerConfig></appender><appender name="kafkaAppenderAsync"class="ch.qos.logback.classic.AsyncAppender"><appender-ref ref="KAFKA_APPENDER"/></appender><!--记录行为日志到 kafka--><logger name="KafkaPipeline" level="INFO"><appender-ref ref="kafkaAppenderAsync"/></logger><!-- 开发、测试环境,额外指定不同包下不同的日志等级 --><springProfile name="dev,test"><logger name="org.springframework.web" level="ERROR"></logger><logger name="org.springboot.sample" level="ERROR"></logger><logger name="com.ipage.work" level="INFO"></logger></springProfile><!-- 生产环境 --><springProfile name="prod"><logger name="org.springframework.web" level="ERROR"></logger><logger name="org.springboot.sample" level="ERROR"></logger><logger name="com.ipage.work" level="INFO"></logger></springProfile><!-- 基础日志等级 --><root level="INFO"><appender-ref ref="FILE"/><appender-ref ref="CONSOLE"/><appender-ref ref="kafkaAppenderAsync"/></root></configuration>

自定义分区器

  1. package com.example.demo.config;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;/**
  2. * 自定义分区器
  3. *
  4. * @author zzkk
  5. * @create 2021/5/26 13:40
  6. **/publicclassMyPartitionerimplementsPartitioner{/**
  7. * 分区策略核心方法
  8. * @param topic
  9. * @param key
  10. * @param keyBytes
  11. * @param value
  12. * @param valueBytes
  13. * @param cluster
  14. * @return
  15. */
  16. @Override
  17. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){//具体分区逻辑,这里全部发送到0号分区return0;}
  18. @Override
  19. publicvoidclose(){}
  20. @Override
  21. publicvoidconfigure(Map<String,?> configs){}}
PS:
  1. 1kafka服务端配置的advertised.listeners格式:localhost跨主机会连接失败
  2. advertised.listeners=PLAINTEXT://127.0.0.1:9092

测试

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. //引入日志模块
  4. @Slf4j
  5. classDemoApplicationTests{
  6. @Test
  7. publicvoidtest(){//支持的日志级别,可以在logback中配置
  8. log.info("kafka日志测试");}

结果

打开客户端监听对应topic,打印了如下日志

在这里插入图片描述
{“@timestamp”:“2023-04-04T17:04:16.477+08:00”,“@version”:1,“message”:“kafka\u65E5\u5FD7\u6D4B\u8BD5INFO”,“logger”:“com.example.demo.DemoApplicationTests”,“thread”:“main”,“level”:“INFO”,“levelVal”:20000,“caller”:{“class”:“com.example.demo.DemoApplicationTests”,“method”:“testSendMessageAsync”,“file”:“DemoApplicationTests.java”,“line”:40}}


本文转载自: https://blog.csdn.net/qq_39091806/article/details/129956913
版权归原作者 zzkks 所有, 如有侵权,请联系我们删除。

“springboot logback整合kafka实现日志写入”的评论:

还没有评论