SpringBoot 集成 Kafka
1 Docker 安装 Kafka
Docker 安装 Kafka
2 Kafka 创建 Topic
创建两个topic:topic1、topic2,其分区和副本数都设置为1 (可以在Java代码中创建)
PS C:\Users\Administrator>dockerexec-it kafka /bin/sh
$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1--partitions1--topic topic1
Created topic topic1.
$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1--partitions1--topic topic2
Created topic topic2.
3 Java 创建 Topic
packagecom.xu.mq.demo.test.service;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.apache.kafka.clients.admin.NewTopic;/**
* kafka 初始化配置类 创建 Topic
*
* @author Administrator
* @date 2023年2月17日11点30分
*/@ConfigurationpublicclassKafkaInitialConfig{publicstaticfinalStringAUDIO_UPLOAD_TOPIC="AudioUploadTopic";publicstaticfinalStringTEXT_UPLOAD_TOPIC="TextUploadTopic";@BeanpublicNewTopicaudioUploadTopic(){// 设置分区1,备份1 returnnewNewTopic(AUDIO_UPLOAD_TOPIC,1,(short)1);}@BeanpublicNewTopictextUploadTopic(){// 设置分区3,备份2 returnnewNewTopic(TEXT_UPLOAD_TOPIC,3,(short)2);}}
4 SpringBoot 集成 Kafka
4.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.8</version><relativePath/><!-- lookup parent from repository --></parent><groupId>com.xu</groupId><artifactId>kafka</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.12</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>
4.2 application.yml
server:port:8001spring:application:name: hello-kafka
kafka:# 以逗号分隔的地址列表,用于建立与Kafka集群的初始连接(kafka 默认的端口号为9092)bootstrap-servers: 192.168.1.92:9092producer:# 消息发送失败重试次数retries:3# 重试间隔retry-backoff-ms:500# 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none,lz4压缩比最高compression-type: none
#发送缓冲区大小32Mbuffer-memory:33554432# 去缓冲区中一次拉16k的数据,发送到brokerbatch-size:16384# 每条消息大小限制 20Mmax-request-size:20971520# 设置发送延时时间,如果在设置的时间内依然没有达到batch-size,依然发出消息给kafka集群linger-ms:30# 失败重试时,保证消息顺序性,会降低吞吐量max-in-flight-requests-per-connection:1# 开启发送消息幂等性(单分区)enable-idempotence:true# 生产者空间不足时阻塞的时间,默认60smax-block-ms:6000# acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。# acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。# acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。acks:-1# key,value序列化器选择key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 开启事务,当开启时retries必须>0 acks必须为all 可以使用kafkaTemplate.executeInTransaction和@Transactional实现卡夫卡事物transaction-id-prefix: transaction
consumer:group-id: KafkaGroup
# 提交offset延时(接收到消息后多久提交offset)auto-commit-interval:1000# earliest:重置为分区中最小的offset;# latest:重置为分区中最新的offset(消费分区中新产生的数据);# none:只要有一个分区不存在已提交的offset,就抛出异常;auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit:false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 批量消费每次最多消费多少条消息max-poll-records:50# 根据实际场景可将max.poll.interval.ms值设置大一点,避免不必要的Rebalanceproperties:max:poll:interval:ms:600000listener:missing-topics-fatal:false# 线程数concurrency:4# RECORD 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交# BATCH 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交# TIME 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于TIME时提交# COUNT 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量⼤于等于COUNT时提交# COUNT_TIME TIME | COUNT 有⼀个条件满⾜时提交# MANUAL 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤Acknowledgment.acknowledge()后提交# MANUAL_IMMEDIATE ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种ack-mode: manual-immediate
# 消费超时时间poll-timeout:3000
4.3 KafkaApplication.java
packagecom.xu.mq.demo;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;importorg.springframework.kafka.annotation.EnableKafka;/**
* @author Administrator
*/@EnableKafka@SpringBootApplicationpublicclassKafkaApplication{publicstaticvoidmain(String[] args){SpringApplication.run(DemoApplication.class, args);}}
4.4 CustomizePartitioner.java
自定义消息推送的分区,更加具体的业务逻辑可以使用Java自定义设置推送分区也可以使用kafka的默认设置。
packagecom.xu.kafka.config;importjava.util.Map;importorg.apache.kafka.clients.producer.Partitioner;importorg.apache.kafka.common.Cluster;/**
* @author Administrator
*/publicclassCustomizePartitionerimplementsPartitioner{/**
* 自定义分区规则
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes The serialized key to partition on( or null if no key)
* @param value The value to partition on or null
* @param valueBytes The serialized value to partition on or null
* @param cluster The current cluster metadata
* @return
*/@Overridepublicintpartition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster){return0;}@Overridepublicvoidclose(){}@Overridepublicvoidconfigure(Map<String,?> configs){}}
4.5 KafkaInitialConfig.java
packagecom.xu.kafka.config;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.apache.kafka.clients.admin.NewTopic;/**
* kafka 初始化配置类 创建 Topic
*
* @author Administrator
* @date 2023年2月17日11点30分
*/@ConfigurationpublicclassKafkaInitialConfig{publicstaticfinalStringAUDIO_UPLOAD_TOPIC="AudioUploadTopic";publicstaticfinalStringTEXT_UPLOAD_TOPIC="TextUploadTopic";@BeanpublicNewTopicaudioUploadTopic(){// 设置分区1,备份1 returnnewNewTopic(AUDIO_UPLOAD_TOPIC,1,(short)1);}@BeanpublicNewTopictextUploadTopic(){// 设置分区3,备份2 returnnewNewTopic(TEXT_UPLOAD_TOPIC,3,(short)2);}}
4.6 SendMessageController.java 生产者
packagecom.xu.kafka.message.controller;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.retry.support.RetryTemplate;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.PathVariable;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importcom.xu.kafka.config.KafkaInitialConfig;importlombok.extern.slf4j.Slf4j;/**
* @author Administrator
*/@Slf4j@RequestMapping(value ="/kafka")@RestControllerpublicclassSendMessageController{@AutowiredprivateKafkaTemplate kafkaTemplate;@AutowiredprivateRetryTemplate retryTemplate;/**
* KafkaTemplate 发送消息 同步
*
* @param msg
*/@GetMapping("/test1/{msg}")publicvoidtest1(@PathVariable("msg")String msg){String key =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;String topic =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
kafkaTemplate.send(topic, key, msg);}/**
* KafkaTemplate 发送消息 异步
*
* @param msg
*/@GetMapping("/test2/{msg}")publicvoidtest2(@PathVariable("msg")String msg)throwsException{String key =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;String topic =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
kafkaTemplate.send(topic, key, msg).get();}/**
* KafkaTemplate 发送消息 同步 有回调
*
* @param msg
*/@GetMapping("/test3/{msg}")publicvoidtest3(@PathVariable("msg")String msg){String key =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;String topic =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
kafkaTemplate.send(topic, key, msg).addCallback(success ->{System.out.println("发送成功\t"+ success);}, fail ->{System.out.println("发送失败\t"+ fail);});}/**
* RetryTemplate 发送消息 同步 有回调
*
* @param msg
*/@GetMapping("/test4/{msg}")publicvoidtest4(@PathVariable("msg")String msg){String key =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;String topic =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
retryTemplate.execute(retryCallback ->{
kafkaTemplate.send(topic, key, msg);
log.info("Kafka 发送成功 Topic:{}, Key:{}, Count:{}", topic, key, retryCallback.getRetryCount());return"success "+ retryCallback.getRetryCount();}, recoveryCallback ->{// 重试后仍然失败后需要执行的代码
log.info("Kafka 发送失败 Topic:{}, Key{}, Count:{}, Info:{}", topic, key, recoveryCallback.getRetryCount(),
recoveryCallback.getLastThrowable().getMessage());return"failure "+ recoveryCallback.getRetryCount();});}/**
* KafkaTemplate 发送消息 事物
*
* @param msg
*/@GetMapping("/test5/{msg}")publicvoidtest5(@PathVariable("msg")String msg){String key =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;String topic =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
kafkaTemplate.executeInTransaction(operations ->{
operations.send(topic, key, msg);thrownewRuntimeException("fail");});}}
4.7 KafkaConsumer.java 消费者
packagecom.xu.kafka.message.controller;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.kafka.support.Acknowledgment;importorg.springframework.stereotype.Component;importorg.apache.kafka.clients.consumer.ConsumerRecord;importcom.xu.kafka.config.KafkaInitialConfig;importlombok.extern.slf4j.Slf4j;/**
* @author Administrator
*/@Slf4j@ComponentpublicclassKafkaConsumer{/**
* 指定一个消费者组,一个主题主题。
*
* @param record
*/@KafkaListener(topics =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC)publicvoidconsumer(ConsumerRecord<String,String> record,Acknowledgment ack){System.out.printf("分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
record.partition(),
record.offset(),
record.key(),
record.value(),
record.timestamp());
ack.acknowledge();}}
分区 =2, 偏移量 =16, key =11111111, 内容 =1111111111111111111111111111,创建消息的时间戳 =1676983128560
5 启动服务
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
(()\___ |'_ | '_|| '_ \/ _` | \ \ \ \
\\/ ___)||_)|||||||(_||))))
' |____|.__|_||_|_||_\__,|////=========|_|==============|___/=/_/_/_/::SpringBoot::(v2.7.8)2023-02-2210:20:51.142INFO38584---[ restartedMain]com.xu.kafka.KafkaApplication:StartingKafkaApplication using Java1.8.0_301 on HyacinthwithPID38584(E:\SourceCode\Idea\kafka\target\classes started by Administrator in E:\SourceCode\Idea\kafka)2023-02-2210:20:51.143INFO38584---[ restartedMain]com.xu.kafka.KafkaApplication:No active profile set, falling back to1default profile:"default"2023-02-2210:20:51.200INFO38584---[ restartedMain].e.DevToolsPropertyDefaultsPostProcessor :Devtools property defaults active!Set 'spring.devtools.add-properties' to 'false' todisable2023-02-2210:20:51.200INFO38584---[ restartedMain].e.DevToolsPropertyDefaultsPostProcessor :For additional web related logging consider setting the 'logging.level.web' property to'DEBUG'2023-02-2210:20:53.010INFO38584---[ restartedMain]o.s.b.w.embedded.tomcat.TomcatWebServer:Tomcat initialized withport(s):8001(http)2023-02-2210:20:53.021INFO38584---[ restartedMain]o.apache.catalina.core.StandardService:Starting service [Tomcat]2023-02-2210:20:53.021INFO38584---[ restartedMain]org.apache.catalina.core.StandardEngine:StartingServlet engine:[ApacheTomcat/9.0.71]2023-02-2210:20:53.140INFO38584---[ restartedMain]o.a.c.c.C.[Tomcat].[localhost].[/]:InitializingSpring embedded WebApplicationContext2023-02-2210:20:53.140INFO38584---[ restartedMain]w.s.c.ServletWebServerApplicationContext:RootWebApplicationContext: initialization completed in 1939 ms
2023-02-2210:20:53.683INFO38584---[ restartedMain]o.s.b.d.a.OptionalLiveReloadServer:LiveReload server is running on port 357292023-02-2210:20:53.755INFO38584---[ restartedMain]o.a.k.clients.admin.AdminClientConfig:AdminClientConfig values:
bootstrap.servers =[192.168.1.92:9092]
client.dns.lookup = use_all_dns_ips
client.id =
connections.max.idle.ms =300000default.api.timeout.ms =60000
metadata.max.age.ms =300000
metric.reporters =[]
metrics.num.samples =2
metrics.recording.level =INFO
metrics.sample.window.ms =30000
receive.buffer.bytes =65536
reconnect.backoff.max.ms =1000
reconnect.backoff.ms =50
request.timeout.ms =30000
retries =2147483647
retry.backoff.ms =100
sasl.client.callback.handler.class=null
sasl.jaas.config =null
sasl.kerberos.kinit.cmd =/usr/bin/kinit
sasl.kerberos.min.time.before.relogin =60000
sasl.kerberos.service.name =null
sasl.kerberos.ticket.renew.jitter =0.05
sasl.kerberos.ticket.renew.window.factor =0.8
sasl.login.callback.handler.class=null
sasl.login.class=null
sasl.login.connect.timeout.ms =null
sasl.login.read.timeout.ms =null
sasl.login.refresh.buffer.seconds =300
sasl.login.refresh.min.period.seconds =60
sasl.login.refresh.window.factor =0.8
sasl.login.refresh.window.jitter =0.05
sasl.login.retry.backoff.max.ms =10000
sasl.login.retry.backoff.ms =100
sasl.mechanism =GSSAPI
sasl.oauthbearer.clock.skew.seconds =30
sasl.oauthbearer.expected.audience =null
sasl.oauthbearer.expected.issuer =null
sasl.oauthbearer.jwks.endpoint.refresh.ms =3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms =10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms =100
sasl.oauthbearer.jwks.endpoint.url =null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url =null
security.protocol =PLAINTEXT
security.providers =null
send.buffer.bytes =131072
socket.connection.setup.timeout.max.ms =30000
socket.connection.setup.timeout.ms =10000
ssl.cipher.suites =null
ssl.enabled.protocols =[TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class=null
ssl.key.password =null
ssl.keymanager.algorithm =SunX509
ssl.keystore.certificate.chain =null
ssl.keystore.key =null
ssl.keystore.location =null
ssl.keystore.password =null
ssl.keystore.type =JKS
ssl.protocol =TLSv1.2
ssl.provider =null
ssl.secure.random.implementation =null
ssl.trustmanager.algorithm =PKIX
ssl.truststore.certificates =null
ssl.truststore.location =null
ssl.truststore.password =null
ssl.truststore.type =JKS2023-02-2210:20:53.847WARN38584---[ restartedMain]o.a.k.clients.admin.AdminClientConfig:The configuration 'max.poll.interval.ms' was supplied but isn't a known config.2023-02-2210:20:53.849INFO38584---[ restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka version:3.1.22023-02-2210:20:53.849INFO38584---[ restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka commitId: f8c67dc3ae0a3265
2023-02-2210:20:53.849INFO38584---[ restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka startTimeMs:16770324538482023-02-2210:20:54.234ERROR38584---[ restartedMain]o.springframework.kafka.core.KafkaAdmin:Failedtocreate topics
org.apache.kafka.common.errors.InvalidReplicationFactorException:Replication factor:2 larger than available brokers:1.2023-02-2210:20:54.236ERROR38584---[ restartedMain]o.springframework.kafka.core.KafkaAdmin:Could not configure topics
org.springframework.kafka.KafkaException:Failedtocreate topics; nested exception is org.apache.kafka.common.errors.InvalidReplicationFactorException:Replication factor:2 larger than available brokers:1.
at org.springframework.kafka.core.KafkaAdmin.addTopics(KafkaAdmin.java:450)[spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.core.KafkaAdmin.addOrModifyTopicsIfNeeded(KafkaAdmin.java:300)[spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.core.KafkaAdmin.initialize(KafkaAdmin.java:201)[spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.core.KafkaAdmin.afterSingletonsInstantiated(KafkaAdmin.java:171)[spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:974)[spring-beans-5.3.25.jar:5.3.25]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918)[spring-context-5.3.25.jar:5.3.25]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583)[spring-context-5.3.25.jar:5.3.25]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:147)[spring-boot-2.7.8.jar:2.7.8]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:731)[spring-boot-2.7.8.jar:2.7.8]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408)[spring-boot-2.7.8.jar:2.7.8]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:307)[spring-boot-2.7.8.jar:2.7.8]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1303)[spring-boot-2.7.8.jar:2.7.8]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1292)[spring-boot-2.7.8.jar:2.7.8]
at com.xu.kafka.KafkaApplication.main(KafkaApplication.java:15)[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethod)~[na:1.8.0_301]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)~[na:1.8.0_301]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)~[na:1.8.0_301]
at java.lang.reflect.Method.invoke(Method.java:498)~[na:1.8.0_301]
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)[spring-boot-devtools-2.7.8.jar:2.7.8]Caused by:org.apache.kafka.common.errors.InvalidReplicationFactorException:Replication factor:2 larger than available brokers:1.2023-02-2210:20:54.236INFO38584---[| adminclient-1]o.a.kafka.common.utils.AppInfoParser:App info kafka.admin.client for adminclient-1 unregistered
2023-02-2210:20:54.239INFO38584---[| adminclient-1]org.apache.kafka.common.metrics.Metrics:Metrics scheduler closed
2023-02-2210:20:54.240INFO38584---[| adminclient-1]org.apache.kafka.common.metrics.Metrics:Closing reporter org.apache.kafka.common.metrics.JmxReporter2023-02-2210:20:54.240INFO38584---[| adminclient-1]org.apache.kafka.common.metrics.Metrics:Metrics reporters closed
2023-02-2210:20:54.266INFO38584---[ restartedMain]o.a.k.clients.consumer.ConsumerConfig:ConsumerConfig values:
allow.auto.create.topics =true
auto.commit.interval.ms =1000
auto.offset.reset = earliest
bootstrap.servers =[192.168.1.92:9092]
check.crcs =true
client.dns.lookup = use_all_dns_ips
client.id = consumer-KafkaGroup-1
client.rack =
connections.max.idle.ms =540000default.api.timeout.ms =60000
enable.auto.commit =false
exclude.internal.topics =true
fetch.max.bytes =52428800
fetch.max.wait.ms =500
fetch.min.bytes =1
group.id =KafkaGroup
group.instance.id =null
heartbeat.interval.ms =3000
interceptor.classes =[]
internal.leave.group.on.close =true
internal.throw.on.fetch.stable.offset.unsupported =false
isolation.level = read_uncommitted
key.deserializer =classorg.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes =1048576
max.poll.interval.ms =600000
max.poll.records =50
metadata.max.age.ms =300000
metric.reporters =[]
metrics.num.samples =2
metrics.recording.level =INFO
metrics.sample.window.ms =30000
partition.assignment.strategy =[classorg.apache.kafka.clients.consumer.RangeAssignor,classorg.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes =65536
reconnect.backoff.max.ms =1000
reconnect.backoff.ms =50
request.timeout.ms =30000
retry.backoff.ms =100
sasl.client.callback.handler.class=null
sasl.jaas.config =null
sasl.kerberos.kinit.cmd =/usr/bin/kinit
sasl.kerberos.min.time.before.relogin =60000
sasl.kerberos.service.name =null
sasl.kerberos.ticket.renew.jitter =0.05
sasl.kerberos.ticket.renew.window.factor =0.8
sasl.login.callback.handler.class=null
sasl.login.class=null
sasl.login.connect.timeout.ms =null
sasl.login.read.timeout.ms =null
sasl.login.refresh.buffer.seconds =300
sasl.login.refresh.min.period.seconds =60
sasl.login.refresh.window.factor =0.8
sasl.login.refresh.window.jitter =0.05
sasl.login.retry.backoff.max.ms =10000
sasl.login.retry.backoff.ms =100
sasl.mechanism =GSSAPI
sasl.oauthbearer.clock.skew.seconds =30
sasl.oauthbearer.expected.audience =null
sasl.oauthbearer.expected.issuer =null
sasl.oauthbearer.jwks.endpoint.refresh.ms =3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms =10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms =100
sasl.oauthbearer.jwks.endpoint.url =null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url =null
security.protocol =PLAINTEXT
security.providers =null
send.buffer.bytes =131072
session.timeout.ms =45000
socket.connection.setup.timeout.max.ms =30000
socket.connection.setup.timeout.ms =10000
ssl.cipher.suites =null
ssl.enabled.protocols =[TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class=null
ssl.key.password =null
ssl.keymanager.algorithm =SunX509
ssl.keystore.certificate.chain =null
ssl.keystore.key =null
ssl.keystore.location =null
ssl.keystore.password =null
ssl.keystore.type =JKS
ssl.protocol =TLSv1.2
ssl.provider =null
ssl.secure.random.implementation =null
ssl.trustmanager.algorithm =PKIX
ssl.truststore.certificates =null
ssl.truststore.location =null
ssl.truststore.password =null
ssl.truststore.type =JKS
value.deserializer =classorg.apache.kafka.common.serialization.StringDeserializer2023-02-2210:20:54.300INFO38584---[ restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka version:3.1.22023-02-2210:20:54.300INFO38584---[ restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka commitId: f8c67dc3ae0a3265
2023-02-2210:20:54.300INFO38584---[ restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka startTimeMs:16770324542992023-02-2210:20:54.301INFO38584---[ restartedMain]o.a.k.clients.consumer.KafkaConsumer:[Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup]Subscribedtotopic(s):AudioUploadTopic2023-02-2210:20:54.309INFO38584---[ restartedMain]o.a.k.clients.consumer.ConsumerConfig:ConsumerConfig values:
allow.auto.create.topics =true
auto.commit.interval.ms =1000
auto.offset.reset = earliest
bootstrap.servers =[192.168.1.92:9092]
check.crcs =true
client.dns.lookup = use_all_dns_ips
client.id = consumer-KafkaGroup-2
client.rack =
connections.max.idle.ms =540000default.api.timeout.ms =60000
enable.auto.commit =false
exclude.internal.topics =true
fetch.max.bytes =52428800
fetch.max.wait.ms =500
fetch.min.bytes =1
group.id =KafkaGroup
group.instance.id =null
heartbeat.interval.ms =3000
interceptor.classes =[]
internal.leave.group.on.close =true
internal.throw.on.fetch.stable.offset.unsupported =false
isolation.level = read_uncommitted
key.deserializer =classorg.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes =1048576
max.poll.interval.ms =600000
max.poll.records =50
metadata.max.age.ms =300000
metric.reporters =[]
metrics.num.samples =2
metrics.recording.level =INFO
metrics.sample.window.ms =30000
partition.assignment.strategy =[classorg.apache.kafka.clients.consumer.RangeAssignor,classorg.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes =65536
reconnect.backoff.max.ms =1000
reconnect.backoff.ms =50
request.timeout.ms =30000
retry.backoff.ms =100
sasl.client.callback.handler.class=null
sasl.jaas.config =null
sasl.kerberos.kinit.cmd =/usr/bin/kinit
sasl.kerberos.min.time.before.relogin =60000
sasl.kerberos.service.name =null
sasl.kerberos.ticket.renew.jitter =0.05
sasl.kerberos.ticket.renew.window.factor =0.8
sasl.login.callback.handler.class=null
sasl.login.class=null
sasl.login.connect.timeout.ms =null
sasl.login.read.timeout.ms =null
sasl.login.refresh.buffer.seconds =300
sasl.login.refresh.min.period.seconds =60
sasl.login.refresh.window.factor =0.8
sasl.login.refresh.window.jitter =0.05
sasl.login.retry.backoff.max.ms =10000
sasl.login.retry.backoff.ms =100
sasl.mechanism =GSSAPI
sasl.oauthbearer.clock.skew.seconds =30
sasl.oauthbearer.expected.audience =null
sasl.oauthbearer.expected.issuer =null
sasl.oauthbearer.jwks.endpoint.refresh.ms =3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms =10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms =100
sasl.oauthbearer.jwks.endpoint.url =null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url =null
security.protocol =PLAINTEXT
security.providers =null
send.buffer.bytes =131072
session.timeout.ms =45000
socket.connection.setup.timeout.max.ms =30000
socket.connection.setup.timeout.ms =10000
ssl.cipher.suites =null
ssl.enabled.protocols =[TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class=null
ssl.key.password =null
ssl.keymanager.algorithm =SunX509
ssl.keystore.certificate.chain =null
ssl.keystore.key =null
ssl.keystore.location =null
ssl.keystore.password =null
ssl.keystore.type =JKS
ssl.protocol =TLSv1.2
ssl.provider =null
ssl.secure.random.implementation =null
ssl.trustmanager.algorithm =PKIX
ssl.truststore.certificates =null
ssl.truststore.location =null
ssl.truststore.password =null
ssl.truststore.type =JKS
value.deserializer =classorg.apache.kafka.common.serialization.StringDeserializer2023-02-2210:20:54.315INFO38584---[ restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka version:3.1.22023-02-2210:20:54.315INFO38584---[ restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka commitId: f8c67dc3ae0a3265
2023-02-2210:20:54.315INFO38584---[ restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka startTimeMs:16770324543152023-02-2210:20:54.315INFO38584---[ restartedMain]o.a.k.clients.consumer.KafkaConsumer:[Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup]Subscribedtotopic(s):AudioUploadTopic2023-02-2210:20:54.317INFO38584---[ restartedMain]o.a.k.clients.consumer.ConsumerConfig:ConsumerConfig values:
allow.auto.create.topics =true
auto.commit.interval.ms =1000
auto.offset.reset = earliest
bootstrap.servers =[192.168.1.92:9092]
check.crcs =true
client.dns.lookup = use_all_dns_ips
client.id = consumer-KafkaGroup-3
client.rack =
connections.max.idle.ms =540000default.api.timeout.ms =60000
enable.auto.commit =false
exclude.internal.topics =true
fetch.max.bytes =52428800
fetch.max.wait.ms =500
fetch.min.bytes =1
group.id =KafkaGroup
group.instance.id =null
heartbeat.interval.ms =3000
interceptor.classes =[]
internal.leave.group.on.close =true
internal.throw.on.fetch.stable.offset.unsupported =false
isolation.level = read_uncommitted
key.deserializer =classorg.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes =1048576
max.poll.interval.ms =600000
max.poll.records =50
metadata.max.age.ms =300000
metric.reporters =[]
metrics.num.samples =2
metrics.recording.level =INFO
metrics.sample.window.ms =30000
partition.assignment.strategy =[classorg.apache.kafka.clients.consumer.RangeAssignor,classorg.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes =65536
reconnect.backoff.max.ms =1000
reconnect.backoff.ms =50
request.timeout.ms =30000
retry.backoff.ms =100
sasl.client.callback.handler.class=null
sasl.jaas.config =null
sasl.kerberos.kinit.cmd =/usr/bin/kinit
sasl.kerberos.min.time.before.relogin =60000
sasl.kerberos.service.name =null
sasl.kerberos.ticket.renew.jitter =0.05
sasl.kerberos.ticket.renew.window.factor =0.8
sasl.login.callback.handler.class=null
sasl.login.class=null
sasl.login.connect.timeout.ms =null
sasl.login.read.timeout.ms =null
sasl.login.refresh.buffer.seconds =300
sasl.login.refresh.min.period.seconds =60
sasl.login.refresh.window.factor =0.8
sasl.login.refresh.window.jitter =0.05
sasl.login.retry.backoff.max.ms =10000
sasl.login.retry.backoff.ms =100
sasl.mechanism =GSSAPI
sasl.oauthbearer.clock.skew.seconds =30
sasl.oauthbearer.expected.audience =null
sasl.oauthbearer.expected.issuer =null
sasl.oauthbearer.jwks.endpoint.refresh.ms =3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms =10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms =100
sasl.oauthbearer.jwks.endpoint.url =null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url =null
security.protocol =PLAINTEXT
security.providers =null
send.buffer.bytes =131072
session.timeout.ms =45000
socket.connection.setup.timeout.max.ms =30000
socket.connection.setup.timeout.ms =10000
ssl.cipher.suites =null
ssl.enabled.protocols =[TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class=null
ssl.key.password =null
ssl.keymanager.algorithm =SunX509
ssl.keystore.certificate.chain =null
ssl.keystore.key =null
ssl.keystore.location =null
ssl.keystore.password =null
ssl.keystore.type =JKS
ssl.protocol =TLSv1.2
ssl.provider =null
ssl.secure.random.implementation =null
ssl.trustmanager.algorithm =PKIX
ssl.truststore.certificates =null
ssl.truststore.location =null
ssl.truststore.password =null
ssl.truststore.type =JKS
value.deserializer =classorg.apache.kafka.common.serialization.StringDeserializer2023-02-2210:20:54.325INFO38584---[ restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka version:3.1.22023-02-2210:20:54.325INFO38584---[ restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka commitId: f8c67dc3ae0a3265
2023-02-2210:20:54.325INFO38584---[ restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka startTimeMs:16770324543252023-02-2210:20:54.325INFO38584---[ restartedMain]o.a.k.clients.consumer.KafkaConsumer:[Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup]Subscribedtotopic(s):AudioUploadTopic2023-02-2210:20:54.326WARN38584---[ntainer#0-1-C-1]org.apache.kafka.clients.NetworkClient:[Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup]Errorwhile fetching metadata withcorrelation id 2:{AudioUploadTopic=UNKNOWN_TOPIC_OR_PARTITION}2023-02-2210:20:54.327INFO38584---[ restartedMain]o.a.k.clients.consumer.ConsumerConfig:ConsumerConfig values:
allow.auto.create.topics =true
auto.commit.interval.ms =1000
auto.offset.reset = earliest
bootstrap.servers =[192.168.1.92:9092]
check.crcs =true
client.dns.lookup = use_all_dns_ips
client.id = consumer-KafkaGroup-4
client.rack =
connections.max.idle.ms =540000default.api.timeout.ms =60000
enable.auto.commit =false
exclude.internal.topics =true
fetch.max.bytes =52428800
fetch.max.wait.ms =500
fetch.min.bytes =1
group.id =KafkaGroup
group.instance.id =null
heartbeat.interval.ms =3000
interceptor.classes =[]
internal.leave.group.on.close =true
internal.throw.on.fetch.stable.offset.unsupported =false
isolation.level = read_uncommitted
key.deserializer =classorg.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes =1048576
max.poll.interval.ms =600000
max.poll.records =50
metadata.max.age.ms =300000
metric.reporters =[]
metrics.num.samples =2
metrics.recording.level =INFO
metrics.sample.window.ms =30000
partition.assignment.strategy =[classorg.apache.kafka.clients.consumer.RangeAssignor,classorg.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes =65536
reconnect.backoff.max.ms =1000
reconnect.backoff.ms =50
request.timeout.ms =30000
retry.backoff.ms =100
sasl.client.callback.handler.class=null
sasl.jaas.config =null
sasl.kerberos.kinit.cmd =/usr/bin/kinit
sasl.kerberos.min.time.before.relogin =60000
sasl.kerberos.service.name =null
sasl.kerberos.ticket.renew.jitter =0.05
sasl.kerberos.ticket.renew.window.factor =0.8
sasl.login.callback.handler.class=null
sasl.login.class=null
sasl.login.connect.timeout.ms =null
sasl.login.read.timeout.ms =null
sasl.login.refresh.buffer.seconds =300
sasl.login.refresh.min.period.seconds =60
sasl.login.refresh.window.factor =0.8
sasl.login.refresh.window.jitter =0.05
sasl.login.retry.backoff.max.ms =10000
sasl.login.retry.backoff.ms =100
sasl.mechanism =GSSAPI
sasl.oauthbearer.clock.skew.seconds =30
sasl.oauthbearer.expected.audience =null
sasl.oauthbearer.expected.issuer =null
sasl.oauthbearer.jwks.endpoint.refresh.ms =3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms =10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms =100
sasl.oauthbearer.jwks.endpoint.url =null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url =null
security.protocol =PLAINTEXT
security.providers =null
send.buffer.bytes =131072
session.timeout.ms =45000
socket.connection.setup.timeout.max.ms =30000
socket.connection.setup.timeout.ms =10000
ssl.cipher.suites =null
ssl.enabled.protocols =[TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class=null
ssl.key.password =null
ssl.keymanager.algorithm =SunX509
ssl.keystore.certificate.chain =null
ssl.keystore.key =null
ssl.keystore.location =null
ssl.keystore.password =null
ssl.keystore.type =JKS
ssl.protocol =TLSv1.2
ssl.provider =null
ssl.secure.random.implementation =null
ssl.trustmanager.algorithm =PKIX
ssl.truststore.certificates =null
ssl.truststore.location =null
ssl.truststore.password =null
ssl.truststore.type =JKS
value.deserializer =classorg.apache.kafka.common.serialization.StringDeserializer2023-02-2210:20:54.328INFO38584---[ntainer#0-1-C-1]org.apache.kafka.clients.Metadata:[Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup]ClusterID:1-P2cI8YRgSflmbY8vMd1w2023-02-2210:20:54.332INFO38584---[ntainer#0-1-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup]Discovered group coordinator 192.168.1.92:9092(id:2147482646 rack:null)2023-02-2210:20:54.335INFO38584---[ntainer#0-1-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup](Re-)joining group
2023-02-2210:20:54.340INFO38584---[ restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka version:3.1.22023-02-2210:20:54.340INFO38584---[ restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka commitId: f8c67dc3ae0a3265
2023-02-2210:20:54.340WARN38584---[ntainer#0-2-C-1]org.apache.kafka.clients.NetworkClient:[Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup]Errorwhile fetching metadata withcorrelation id 2:{AudioUploadTopic=UNKNOWN_TOPIC_OR_PARTITION}2023-02-2210:20:54.340INFO38584---[ restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka startTimeMs:16770324543402023-02-2210:20:54.340INFO38584---[ntainer#0-2-C-1]org.apache.kafka.clients.Metadata:[Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup]ClusterID:1-P2cI8YRgSflmbY8vMd1w2023-02-2210:20:54.340INFO38584---[ restartedMain]o.a.k.clients.consumer.KafkaConsumer:[Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup]Subscribedtotopic(s):AudioUploadTopic2023-02-2210:20:54.340INFO38584---[ntainer#0-2-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup]Discovered group coordinator 192.168.1.92:9092(id:2147482646 rack:null)2023-02-2210:20:54.343INFO38584---[ntainer#0-2-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup](Re-)joining group
2023-02-2210:20:54.361WARN38584---[ntainer#0-3-C-1]org.apache.kafka.clients.NetworkClient:[Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup]Errorwhile fetching metadata withcorrelation id 2:{AudioUploadTopic=UNKNOWN_TOPIC_OR_PARTITION}2023-02-2210:20:54.361INFO38584---[ntainer#0-3-C-1]org.apache.kafka.clients.Metadata:[Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup]ClusterID:1-P2cI8YRgSflmbY8vMd1w2023-02-2210:20:54.363INFO38584---[ntainer#0-3-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup]Discovered group coordinator 192.168.1.92:9092(id:2147482646 rack:null)2023-02-2210:20:54.365INFO38584---[ntainer#0-3-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup](Re-)joining group
2023-02-2210:20:54.367INFO38584---[ntainer#0-2-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup]Request joining group due to: need tore-join withthe given member-id
2023-02-2210:20:54.367INFO38584---[ntainer#0-1-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup]Request joining group due to: need tore-join withthe given member-id
2023-02-2210:20:54.368INFO38584---[ntainer#0-2-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup](Re-)joining group
2023-02-2210:20:54.368INFO38584---[ntainer#0-1-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup](Re-)joining group
2023-02-2210:20:54.374INFO38584---[ restartedMain]o.s.b.w.embedded.tomcat.TomcatWebServer:Tomcat started on port(s):8001(http)withcontext path ''
2023-02-2210:20:54.375INFO38584---[ntainer#0-3-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup]Request joining group due to: need tore-join withthe given member-id
2023-02-2210:20:54.376INFO38584---[ntainer#0-3-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup](Re-)joining group
2023-02-2210:20:54.389WARN38584---[ntainer#0-0-C-1]org.apache.kafka.clients.NetworkClient:[Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup]Errorwhile fetching metadata withcorrelation id 2:{AudioUploadTopic=LEADER_NOT_AVAILABLE}2023-02-2210:20:54.390INFO38584---[ntainer#0-0-C-1]org.apache.kafka.clients.Metadata:[Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup]ClusterID:1-P2cI8YRgSflmbY8vMd1w2023-02-2210:20:54.390INFO38584---[ntainer#0-0-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup]Discovered group coordinator 192.168.1.92:9092(id:2147482646 rack:null)2023-02-2210:20:54.391INFO38584---[ntainer#0-0-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup](Re-)joining group
2023-02-2210:20:54.394INFO38584---[ restartedMain]com.xu.kafka.KafkaApplication:StartedKafkaApplication in 3.708 seconds (JVM running for4.278)
6 效果测试
发送成功 SendResult[producerRecord=ProducerRecord(topic=AudioUploadTopic, partition=null, headers=RecordHeaders(headers =[], isReadOnly =true), key=null, value=有回调的消息推送111, timestamp=null), recordMetadata=AudioUploadTopic-0@4]
本文转载自: https://blog.csdn.net/qq_34814092/article/details/129079743
版权归原作者 深色風信子 所有, 如有侵权,请联系我们删除。
版权归原作者 深色風信子 所有, 如有侵权,请联系我们删除。