0


springboot logback整合kafka实现日志写入

springboot pom依赖导入

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.4.0.RELEASE</version></dependency><!--logstash 整合logback--><dependency><groupId>net.logstash.logback</groupId><artifactId>logstash-logback-encoder</artifactId><version>4.11</version><exclusions><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId></exclusion></exclusions></dependency><!--logback 整合 kafka--><dependency><groupId>com.github.danielwegener</groupId><artifactId>logback-kafka-appender</artifactId><version>0.1.0</version><scope>runtime</scope></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!-- test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>

kafka配置

spring:kafka:listener:
      #设置是否批量消费,默认 single(单条),batch(批量)
      type: single
    # 集群地址
    bootstrap-servers: localhost:9092
    # 生产者配置
    producer:
      # 重试次数
      retries:3
      # 应答级别
      # acks=0 把消息发送到kafka就认为发送成功
      # acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
      # acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
      acks: all
      # 批量处理的最大大小 单位 byte
      batch-size:4096
      # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
      buffer-memory:33554432
      # 客户端ID
      client-id: hello-kafka
      # Key 序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value 序列化类
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 消息压缩:none、lz4、gzip、snappy,默认为 none。
      compression-type: gzip
      properties:partitioner:
          #指定自定义分区器
          class: com.example.demo.config.MyPartitioner
        linger:
          # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
          ms:1000max:block:
            # KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms
            ms:6000
    # 消费者配置
    consumer:
      # 默认消费者组
      group-id: testGroup
      # 自动提交 offset 默认 true
      enable-auto-commit:false
      # 自动提交的频率 单位 ms
      auto-commit-interval:1000
      # 批量消费最大数量
      max-poll-records:100
      # Key 反序列化类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value 反序列化类
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 当kafka中没有初始offset或offset超出范围时将自动重置offset
      # earliest:重置为分区中最小的offset
      # latest:重置为分区中最新的offset(消费分区中新产生的数据)
      # none:只要有一个分区不存在已提交的offset,就抛出异常
      auto-offset-reset: latest
      properties:interceptor:classes: com.example.demo.service.MyConsumerInterceptor
        session:timeout:
            # session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作
            ms:120000request:timeout:
            # 请求超时
            ms:120000
# 指定logback配置文件,因为查找优先级问题,最好手动配置上,避免其他依赖导致未使用到自定义的logback文件
logging:config: classpath:config/logback-spring.xml

logback-spring.xml配置

<?xml version="1.0" encoding="UTF-8"?><configuration><!-- 日志文件路径 --><property name="logPath" value="C://Users//wangw//Desktop//aliyun-tts//"/><!-- 日志文件名称 --><property name="logName" value="sp-ipage-test"/><logger name="org.springframework.web" level="INFO"/><logger name="org.springboot.sample" level="TRACE"/><appender name="CONSOLE"class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}%-5level -&#45;&#45;[%thread]%logger Line:%-3L -%msg%n</pattern><charset>UTF-8</charset></encoder></appender><!-- debug 日志文件 --><appender name="FILE"class="ch.qos.logback.core.rolling.RollingFileAppender"><!-- 正在记录的日志文档的路径及文档名 --><file>${logPath}${logName}.log</file><!--日志文档输出格式--><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}%-5level -&#45;&#45;[%thread]%logger Line:%-3L -%msg%n</pattern><charset>UTF-8</charset></encoder><!-- 日志记录器的滚动策略,按日期,按大小记录 --><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><!-- 日志归档 --><fileNamePattern>${logPath}${logName}-%d{yyyy-MM-dd}.%i.log</fileNamePattern><timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"><maxFileSize>100MB</maxFileSize></timeBasedFileNamingAndTriggeringPolicy><!--日志文档保留天数--><maxHistory>10</maxHistory></rollingPolicy></appender><appender name="KAFKA_APPENDER"class="com.github.danielwegener.logback.kafka.KafkaAppender"><encoder class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder"><layout class="net.logstash.logback.layout.LogstashLayout"><!--开启的话会包含hostname等logback的context信息--><includeContext>true</includeContext><!--是否包含日志来源--><includeCallerData>true</includeCallerData><fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/></layout><charset>UTF-8</charset></encoder><!--kafka topic 需要与配置文件里面的topic一致 否则kafka不认识--><topic>test</topic><!--主键分区策略--><keyingStrategy class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy"/><!--kafka消息提交策略,logback-kafka-appender为我们提供了两种策略,
            异步提交策略(AsynchronousDeliveryStrategy)
            阻塞提交策略(BlockingDeliveryStrategy)
        --><deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/><!--bootstrap.servers 为kafka 部署地址,服务端需要使用对应的IP地址,不能使用localhost --><producerConfig>bootstrap.servers=127.0.0.1:9092</producerConfig></appender><appender name="kafkaAppenderAsync"class="ch.qos.logback.classic.AsyncAppender"><appender-ref ref="KAFKA_APPENDER"/></appender><!--记录行为日志到 kafka--><logger name="KafkaPipeline" level="INFO"><appender-ref ref="kafkaAppenderAsync"/></logger><!-- 开发、测试环境,额外指定不同包下不同的日志等级 --><springProfile name="dev,test"><logger name="org.springframework.web" level="ERROR"></logger><logger name="org.springboot.sample" level="ERROR"></logger><logger name="com.ipage.work" level="INFO"></logger></springProfile><!-- 生产环境 --><springProfile name="prod"><logger name="org.springframework.web" level="ERROR"></logger><logger name="org.springboot.sample" level="ERROR"></logger><logger name="com.ipage.work" level="INFO"></logger></springProfile><!-- 基础日志等级 --><root level="INFO"><appender-ref ref="FILE"/><appender-ref ref="CONSOLE"/><appender-ref ref="kafkaAppenderAsync"/></root></configuration>

自定义分区器

package com.example.demo.config;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;/**
 * 自定义分区器
 *
 * @author zzkk
 * @create 2021/5/26 13:40
 **/publicclassMyPartitionerimplementsPartitioner{/**
     * 分区策略核心方法
     * @param topic
     * @param key
     * @param keyBytes
     * @param value
     * @param valueBytes
     * @param cluster
     * @return
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){//具体分区逻辑,这里全部发送到0号分区return0;}

    @Override
    publicvoidclose(){}

    @Override
    publicvoidconfigure(Map<String,?> configs){}}
PS:
 1、kafka服务端配置的advertised.listeners格式:localhost跨主机会连接失败
 
    advertised.listeners=PLAINTEXT://127.0.0.1:9092

测试

@RunWith(SpringRunner.class)
@SpringBootTest
//引入日志模块
@Slf4j
classDemoApplicationTests{
    
    @Test
    publicvoidtest(){//支持的日志级别,可以在logback中配置
      log.info("kafka日志测试");}

结果

打开客户端监听对应topic,打印了如下日志

在这里插入图片描述
{“@timestamp”:“2023-04-04T17:04:16.477+08:00”,“@version”:1,“message”:“kafka\u65E5\u5FD7\u6D4B\u8BD5INFO”,“logger”:“com.example.demo.DemoApplicationTests”,“thread”:“main”,“level”:“INFO”,“levelVal”:20000,“caller”:{“class”:“com.example.demo.DemoApplicationTests”,“method”:“testSendMessageAsync”,“file”:“DemoApplicationTests.java”,“line”:40}}


本文转载自: https://blog.csdn.net/qq_39091806/article/details/129956913
版权归原作者 zzkks 所有, 如有侵权,请联系我们删除。

“springboot logback整合kafka实现日志写入”的评论:

还没有评论