springboot pom依赖导入
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.4.0.RELEASE</version></dependency><!--logstash 整合logback--><dependency><groupId>net.logstash.logback</groupId><artifactId>logstash-logback-encoder</artifactId><version>4.11</version><exclusions><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId></exclusion></exclusions></dependency><!--logback 整合 kafka--><dependency><groupId>com.github.danielwegener</groupId><artifactId>logback-kafka-appender</artifactId><version>0.1.0</version><scope>runtime</scope></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!-- test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
kafka配置
spring:kafka:listener:
#设置是否批量消费,默认 single(单条),batch(批量)
type: single
# 集群地址
bootstrap-servers: localhost:9092
# 生产者配置
producer:
# 重试次数
retries:3
# 应答级别
# acks=0 把消息发送到kafka就认为发送成功
# acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
# acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
acks: all
# 批量处理的最大大小 单位 byte
batch-size:4096
# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
buffer-memory:33554432
# 客户端ID
client-id: hello-kafka
# Key 序列化类
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# Value 序列化类
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消息压缩:none、lz4、gzip、snappy,默认为 none。
compression-type: gzip
properties:partitioner:
#指定自定义分区器
class: com.example.demo.config.MyPartitioner
linger:
# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
ms:1000max:block:
# KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms
ms:6000
# 消费者配置
consumer:
# 默认消费者组
group-id: testGroup
# 自动提交 offset 默认 true
enable-auto-commit:false
# 自动提交的频率 单位 ms
auto-commit-interval:1000
# 批量消费最大数量
max-poll-records:100
# Key 反序列化类
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Value 反序列化类
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset
# latest:重置为分区中最新的offset(消费分区中新产生的数据)
# none:只要有一个分区不存在已提交的offset,就抛出异常
auto-offset-reset: latest
properties:interceptor:classes: com.example.demo.service.MyConsumerInterceptor
session:timeout:
# session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作
ms:120000request:timeout:
# 请求超时
ms:120000
# 指定logback配置文件,因为查找优先级问题,最好手动配置上,避免其他依赖导致未使用到自定义的logback文件
logging:config: classpath:config/logback-spring.xml
logback-spring.xml配置
<?xml version="1.0" encoding="UTF-8"?><configuration><!-- 日志文件路径 --><property name="logPath" value="C://Users//wangw//Desktop//aliyun-tts//"/><!-- 日志文件名称 --><property name="logName" value="sp-ipage-test"/><logger name="org.springframework.web" level="INFO"/><logger name="org.springboot.sample" level="TRACE"/><appender name="CONSOLE"class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}%-5level ---[%thread]%logger Line:%-3L -%msg%n</pattern><charset>UTF-8</charset></encoder></appender><!-- debug 日志文件 --><appender name="FILE"class="ch.qos.logback.core.rolling.RollingFileAppender"><!-- 正在记录的日志文档的路径及文档名 --><file>${logPath}${logName}.log</file><!--日志文档输出格式--><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}%-5level ---[%thread]%logger Line:%-3L -%msg%n</pattern><charset>UTF-8</charset></encoder><!-- 日志记录器的滚动策略,按日期,按大小记录 --><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><!-- 日志归档 --><fileNamePattern>${logPath}${logName}-%d{yyyy-MM-dd}.%i.log</fileNamePattern><timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"><maxFileSize>100MB</maxFileSize></timeBasedFileNamingAndTriggeringPolicy><!--日志文档保留天数--><maxHistory>10</maxHistory></rollingPolicy></appender><appender name="KAFKA_APPENDER"class="com.github.danielwegener.logback.kafka.KafkaAppender"><encoder class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder"><layout class="net.logstash.logback.layout.LogstashLayout"><!--开启的话会包含hostname等logback的context信息--><includeContext>true</includeContext><!--是否包含日志来源--><includeCallerData>true</includeCallerData><fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/></layout><charset>UTF-8</charset></encoder><!--kafka topic 需要与配置文件里面的topic一致 否则kafka不认识--><topic>test</topic><!--主键分区策略--><keyingStrategy class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy"/><!--kafka消息提交策略,logback-kafka-appender为我们提供了两种策略,
异步提交策略(AsynchronousDeliveryStrategy)
阻塞提交策略(BlockingDeliveryStrategy)
--><deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/><!--bootstrap.servers 为kafka 部署地址,服务端需要使用对应的IP地址,不能使用localhost --><producerConfig>bootstrap.servers=127.0.0.1:9092</producerConfig></appender><appender name="kafkaAppenderAsync"class="ch.qos.logback.classic.AsyncAppender"><appender-ref ref="KAFKA_APPENDER"/></appender><!--记录行为日志到 kafka--><logger name="KafkaPipeline" level="INFO"><appender-ref ref="kafkaAppenderAsync"/></logger><!-- 开发、测试环境,额外指定不同包下不同的日志等级 --><springProfile name="dev,test"><logger name="org.springframework.web" level="ERROR"></logger><logger name="org.springboot.sample" level="ERROR"></logger><logger name="com.ipage.work" level="INFO"></logger></springProfile><!-- 生产环境 --><springProfile name="prod"><logger name="org.springframework.web" level="ERROR"></logger><logger name="org.springboot.sample" level="ERROR"></logger><logger name="com.ipage.work" level="INFO"></logger></springProfile><!-- 基础日志等级 --><root level="INFO"><appender-ref ref="FILE"/><appender-ref ref="CONSOLE"/><appender-ref ref="kafkaAppenderAsync"/></root></configuration>
自定义分区器
package com.example.demo.config;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;/**
* 自定义分区器
*
* @author zzkk
* @create 2021/5/26 13:40
**/publicclassMyPartitionerimplementsPartitioner{/**
* 分区策略核心方法
* @param topic
* @param key
* @param keyBytes
* @param value
* @param valueBytes
* @param cluster
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){//具体分区逻辑,这里全部发送到0号分区return0;}
@Override
publicvoidclose(){}
@Override
publicvoidconfigure(Map<String,?> configs){}}
PS:
1、kafka服务端配置的advertised.listeners格式:localhost跨主机会连接失败
advertised.listeners=PLAINTEXT://127.0.0.1:9092
测试
@RunWith(SpringRunner.class)
@SpringBootTest
//引入日志模块
@Slf4j
classDemoApplicationTests{
@Test
publicvoidtest(){//支持的日志级别,可以在logback中配置
log.info("kafka日志测试");}
结果
打开客户端监听对应topic,打印了如下日志
{“@timestamp”:“2023-04-04T17:04:16.477+08:00”,“@version”:1,“message”:“kafka\u65E5\u5FD7\u6D4B\u8BD5INFO”,“logger”:“com.example.demo.DemoApplicationTests”,“thread”:“main”,“level”:“INFO”,“levelVal”:20000,“caller”:{“class”:“com.example.demo.DemoApplicationTests”,“method”:“testSendMessageAsync”,“file”:“DemoApplicationTests.java”,“line”:40}}
版权归原作者 zzkks 所有, 如有侵权,请联系我们删除。