0


SpringBoot 集成 Kafka

SpringBoot 集成 Kafka

1 Docker 安装 Kafka

Docker 安装 Kafka

2 Kafka 创建 Topic

创建两个topic:topic1、topic2,其分区和副本数都设置为1 (可以在Java代码中创建)

PS C:\Users\Administrator>dockerexec-it kafka /bin/sh

$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1--partitions1--topic topic1
Created topic topic1.

$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1--partitions1--topic topic2
Created topic topic2.

3 Java 创建 Topic

packagecom.xu.mq.demo.test.service;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.apache.kafka.clients.admin.NewTopic;/**
 * kafka 初始化配置类 创建 Topic
 *
 * @author Administrator
 * @date 2023年2月17日11点30分
 */@ConfigurationpublicclassKafkaInitialConfig{publicstaticfinalStringAUDIO_UPLOAD_TOPIC="AudioUploadTopic";publicstaticfinalStringTEXT_UPLOAD_TOPIC="TextUploadTopic";@BeanpublicNewTopicaudioUploadTopic(){// 设置分区1,备份1 returnnewNewTopic(AUDIO_UPLOAD_TOPIC,1,(short)1);}@BeanpublicNewTopictextUploadTopic(){// 设置分区3,备份2 returnnewNewTopic(TEXT_UPLOAD_TOPIC,3,(short)2);}}

4 SpringBoot 集成 Kafka

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.8</version><relativePath/><!-- lookup parent from repository --></parent><groupId>com.xu</groupId><artifactId>kafka</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.12</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

4.2 application.yml

server:port:8001spring:application:name: hello-kafka
  kafka:# 以逗号分隔的地址列表,用于建立与Kafka集群的初始连接(kafka 默认的端口号为9092)bootstrap-servers: 192.168.1.92:9092producer:# 消息发送失败重试次数retries:3# 重试间隔retry-backoff-ms:500# 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none,lz4压缩比最高compression-type: none
      #发送缓冲区大小32Mbuffer-memory:33554432# 去缓冲区中一次拉16k的数据,发送到brokerbatch-size:16384# 每条消息大小限制 20Mmax-request-size:20971520# 设置发送延时时间,如果在设置的时间内依然没有达到batch-size,依然发出消息给kafka集群linger-ms:30# 失败重试时,保证消息顺序性,会降低吞吐量max-in-flight-requests-per-connection:1# 开启发送消息幂等性(单分区)enable-idempotence:true# 生产者空间不足时阻塞的时间,默认60smax-block-ms:6000# acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。# acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。# acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。acks:-1# key,value序列化器选择key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 开启事务,当开启时retries必须>0 acks必须为all 可以使用kafkaTemplate.executeInTransaction和@Transactional实现卡夫卡事物transaction-id-prefix: transaction
    consumer:group-id: KafkaGroup
      # 提交offset延时(接收到消息后多久提交offset)auto-commit-interval:1000# earliest:重置为分区中最小的offset;# latest:重置为分区中最新的offset(消费分区中新产生的数据);# none:只要有一个分区不存在已提交的offset,就抛出异常;auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit:false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 批量消费每次最多消费多少条消息max-poll-records:50# 根据实际场景可将max.poll.interval.ms值设置大一点,避免不必要的Rebalanceproperties:max:poll:interval:ms:600000listener:missing-topics-fatal:false# 线程数concurrency:4# RECORD 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交# BATCH 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交# TIME 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于TIME时提交# COUNT 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量⼤于等于COUNT时提交# COUNT_TIME TIME | COUNT 有⼀个条件满⾜时提交# MANUAL 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤Acknowledgment.acknowledge()后提交# MANUAL_IMMEDIATE ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种ack-mode: manual-immediate
      # 消费超时时间poll-timeout:3000

4.3 KafkaApplication.java

packagecom.xu.mq.demo;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;importorg.springframework.kafka.annotation.EnableKafka;/**
 * @author Administrator
 */@EnableKafka@SpringBootApplicationpublicclassKafkaApplication{publicstaticvoidmain(String[] args){SpringApplication.run(DemoApplication.class, args);}}

4.4 CustomizePartitioner.java

自定义消息推送的分区,更加具体的业务逻辑可以使用Java自定义设置推送分区也可以使用kafka的默认设置。

packagecom.xu.kafka.config;importjava.util.Map;importorg.apache.kafka.clients.producer.Partitioner;importorg.apache.kafka.common.Cluster;/**
 * @author Administrator
 */publicclassCustomizePartitionerimplementsPartitioner{/**
     * 自定义分区规则
     *
     * @param topic      The topic name
     * @param key        The key to partition on (or null if no key)
     * @param keyBytes   The serialized key to partition on( or null if no key)
     * @param value      The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster    The current cluster metadata
     * @return
     */@Overridepublicintpartition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster){return0;}@Overridepublicvoidclose(){}@Overridepublicvoidconfigure(Map<String,?> configs){}}

4.5 KafkaInitialConfig.java

packagecom.xu.kafka.config;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.apache.kafka.clients.admin.NewTopic;/**
 * kafka 初始化配置类 创建 Topic
 *
 * @author Administrator
 * @date 2023年2月17日11点30分
 */@ConfigurationpublicclassKafkaInitialConfig{publicstaticfinalStringAUDIO_UPLOAD_TOPIC="AudioUploadTopic";publicstaticfinalStringTEXT_UPLOAD_TOPIC="TextUploadTopic";@BeanpublicNewTopicaudioUploadTopic(){// 设置分区1,备份1 returnnewNewTopic(AUDIO_UPLOAD_TOPIC,1,(short)1);}@BeanpublicNewTopictextUploadTopic(){// 设置分区3,备份2 returnnewNewTopic(TEXT_UPLOAD_TOPIC,3,(short)2);}}

4.6 SendMessageController.java 生产者

packagecom.xu.kafka.message.controller;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.retry.support.RetryTemplate;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.PathVariable;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importcom.xu.kafka.config.KafkaInitialConfig;importlombok.extern.slf4j.Slf4j;/**
 * @author Administrator
 */@Slf4j@RequestMapping(value ="/kafka")@RestControllerpublicclassSendMessageController{@AutowiredprivateKafkaTemplate kafkaTemplate;@AutowiredprivateRetryTemplate retryTemplate;/**
     * KafkaTemplate 发送消息 同步
     *
     * @param msg
     */@GetMapping("/test1/{msg}")publicvoidtest1(@PathVariable("msg")String msg){String key =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;String topic =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
        kafkaTemplate.send(topic, key, msg);}/**
     * KafkaTemplate 发送消息 异步
     *
     * @param msg
     */@GetMapping("/test2/{msg}")publicvoidtest2(@PathVariable("msg")String msg)throwsException{String key =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;String topic =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
        kafkaTemplate.send(topic, key, msg).get();}/**
     * KafkaTemplate 发送消息 同步 有回调
     *
     * @param msg
     */@GetMapping("/test3/{msg}")publicvoidtest3(@PathVariable("msg")String msg){String key =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;String topic =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
        kafkaTemplate.send(topic, key, msg).addCallback(success ->{System.out.println("发送成功\t"+ success);}, fail ->{System.out.println("发送失败\t"+ fail);});}/**
     * RetryTemplate 发送消息 同步 有回调
     *
     * @param msg
     */@GetMapping("/test4/{msg}")publicvoidtest4(@PathVariable("msg")String msg){String key =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;String topic =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
        retryTemplate.execute(retryCallback ->{
            kafkaTemplate.send(topic, key, msg);
            log.info("Kafka 发送成功 Topic:{}, Key:{}, Count:{}", topic, key, retryCallback.getRetryCount());return"success "+ retryCallback.getRetryCount();}, recoveryCallback ->{// 重试后仍然失败后需要执行的代码
            log.info("Kafka 发送失败 Topic:{}, Key{}, Count:{}, Info:{}", topic, key, recoveryCallback.getRetryCount(),
                    recoveryCallback.getLastThrowable().getMessage());return"failure "+ recoveryCallback.getRetryCount();});}/**
     * KafkaTemplate 发送消息 事物
     *
     * @param msg
     */@GetMapping("/test5/{msg}")publicvoidtest5(@PathVariable("msg")String msg){String key =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;String topic =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
        kafkaTemplate.executeInTransaction(operations ->{
            operations.send(topic, key, msg);thrownewRuntimeException("fail");});}}

4.7 KafkaConsumer.java 消费者

packagecom.xu.kafka.message.controller;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.kafka.support.Acknowledgment;importorg.springframework.stereotype.Component;importorg.apache.kafka.clients.consumer.ConsumerRecord;importcom.xu.kafka.config.KafkaInitialConfig;importlombok.extern.slf4j.Slf4j;/**
 * @author Administrator
 */@Slf4j@ComponentpublicclassKafkaConsumer{/**
     * 指定一个消费者组,一个主题主题。
     *
     * @param record
     */@KafkaListener(topics =KafkaInitialConfig.AUDIO_UPLOAD_TOPIC)publicvoidconsumer(ConsumerRecord<String,String> record,Acknowledgment ack){System.out.printf("分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
                record.partition(),
                record.offset(),
                record.key(),
                record.value(),
                record.timestamp());
        ack.acknowledge();}}
分区 =2, 偏移量 =16, key =11111111, 内容 =1111111111111111111111111111,创建消息的时间戳 =1676983128560

在这里插入图片描述

5 启动服务

.   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
(()\___ |'_ | '_|| '_ \/ _` | \ \ \ \
 \\/  ___)||_)|||||||(_||))))
  '  |____|.__|_||_|_||_\__,|////=========|_|==============|___/=/_/_/_/::SpringBoot::(v2.7.8)2023-02-2210:20:51.142INFO38584---[  restartedMain]com.xu.kafka.KafkaApplication:StartingKafkaApplication using Java1.8.0_301 on HyacinthwithPID38584(E:\SourceCode\Idea\kafka\target\classes started by Administrator in E:\SourceCode\Idea\kafka)2023-02-2210:20:51.143INFO38584---[  restartedMain]com.xu.kafka.KafkaApplication:No active profile set, falling back to1default profile:"default"2023-02-2210:20:51.200INFO38584---[  restartedMain].e.DevToolsPropertyDefaultsPostProcessor :Devtools property defaults active!Set 'spring.devtools.add-properties' to 'false' todisable2023-02-2210:20:51.200INFO38584---[  restartedMain].e.DevToolsPropertyDefaultsPostProcessor :For additional web related logging consider setting the 'logging.level.web' property to'DEBUG'2023-02-2210:20:53.010INFO38584---[  restartedMain]o.s.b.w.embedded.tomcat.TomcatWebServer:Tomcat initialized withport(s):8001(http)2023-02-2210:20:53.021INFO38584---[  restartedMain]o.apache.catalina.core.StandardService:Starting service [Tomcat]2023-02-2210:20:53.021INFO38584---[  restartedMain]org.apache.catalina.core.StandardEngine:StartingServlet engine:[ApacheTomcat/9.0.71]2023-02-2210:20:53.140INFO38584---[  restartedMain]o.a.c.c.C.[Tomcat].[localhost].[/]:InitializingSpring embedded WebApplicationContext2023-02-2210:20:53.140INFO38584---[  restartedMain]w.s.c.ServletWebServerApplicationContext:RootWebApplicationContext: initialization completed in 1939 ms
2023-02-2210:20:53.683INFO38584---[  restartedMain]o.s.b.d.a.OptionalLiveReloadServer:LiveReload server is running on port 357292023-02-2210:20:53.755INFO38584---[  restartedMain]o.a.k.clients.admin.AdminClientConfig:AdminClientConfig values: 
    bootstrap.servers =[192.168.1.92:9092]
    client.dns.lookup = use_all_dns_ips
    client.id = 
    connections.max.idle.ms =300000default.api.timeout.ms =60000
    metadata.max.age.ms =300000
    metric.reporters =[]
    metrics.num.samples =2
    metrics.recording.level =INFO
    metrics.sample.window.ms =30000
    receive.buffer.bytes =65536
    reconnect.backoff.max.ms =1000
    reconnect.backoff.ms =50
    request.timeout.ms =30000
    retries =2147483647
    retry.backoff.ms =100
    sasl.client.callback.handler.class=null
    sasl.jaas.config =null
    sasl.kerberos.kinit.cmd =/usr/bin/kinit
    sasl.kerberos.min.time.before.relogin =60000
    sasl.kerberos.service.name =null
    sasl.kerberos.ticket.renew.jitter =0.05
    sasl.kerberos.ticket.renew.window.factor =0.8
    sasl.login.callback.handler.class=null
    sasl.login.class=null
    sasl.login.connect.timeout.ms =null
    sasl.login.read.timeout.ms =null
    sasl.login.refresh.buffer.seconds =300
    sasl.login.refresh.min.period.seconds =60
    sasl.login.refresh.window.factor =0.8
    sasl.login.refresh.window.jitter =0.05
    sasl.login.retry.backoff.max.ms =10000
    sasl.login.retry.backoff.ms =100
    sasl.mechanism =GSSAPI
    sasl.oauthbearer.clock.skew.seconds =30
    sasl.oauthbearer.expected.audience =null
    sasl.oauthbearer.expected.issuer =null
    sasl.oauthbearer.jwks.endpoint.refresh.ms =3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms =10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms =100
    sasl.oauthbearer.jwks.endpoint.url =null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url =null
    security.protocol =PLAINTEXT
    security.providers =null
    send.buffer.bytes =131072
    socket.connection.setup.timeout.max.ms =30000
    socket.connection.setup.timeout.ms =10000
    ssl.cipher.suites =null
    ssl.enabled.protocols =[TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class=null
    ssl.key.password =null
    ssl.keymanager.algorithm =SunX509
    ssl.keystore.certificate.chain =null
    ssl.keystore.key =null
    ssl.keystore.location =null
    ssl.keystore.password =null
    ssl.keystore.type =JKS
    ssl.protocol =TLSv1.2
    ssl.provider =null
    ssl.secure.random.implementation =null
    ssl.trustmanager.algorithm =PKIX
    ssl.truststore.certificates =null
    ssl.truststore.location =null
    ssl.truststore.password =null
    ssl.truststore.type =JKS2023-02-2210:20:53.847WARN38584---[  restartedMain]o.a.k.clients.admin.AdminClientConfig:The configuration 'max.poll.interval.ms' was supplied but isn't a known config.2023-02-2210:20:53.849INFO38584---[  restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka version:3.1.22023-02-2210:20:53.849INFO38584---[  restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka commitId: f8c67dc3ae0a3265
2023-02-2210:20:53.849INFO38584---[  restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka startTimeMs:16770324538482023-02-2210:20:54.234ERROR38584---[  restartedMain]o.springframework.kafka.core.KafkaAdmin:Failedtocreate topics

org.apache.kafka.common.errors.InvalidReplicationFactorException:Replication factor:2 larger than available brokers:1.2023-02-2210:20:54.236ERROR38584---[  restartedMain]o.springframework.kafka.core.KafkaAdmin:Could not configure topics

org.springframework.kafka.KafkaException:Failedtocreate topics; nested exception is org.apache.kafka.common.errors.InvalidReplicationFactorException:Replication factor:2 larger than available brokers:1.
    at org.springframework.kafka.core.KafkaAdmin.addTopics(KafkaAdmin.java:450)[spring-kafka-2.8.11.jar:2.8.11]
    at org.springframework.kafka.core.KafkaAdmin.addOrModifyTopicsIfNeeded(KafkaAdmin.java:300)[spring-kafka-2.8.11.jar:2.8.11]
    at org.springframework.kafka.core.KafkaAdmin.initialize(KafkaAdmin.java:201)[spring-kafka-2.8.11.jar:2.8.11]
    at org.springframework.kafka.core.KafkaAdmin.afterSingletonsInstantiated(KafkaAdmin.java:171)[spring-kafka-2.8.11.jar:2.8.11]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:974)[spring-beans-5.3.25.jar:5.3.25]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918)[spring-context-5.3.25.jar:5.3.25]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583)[spring-context-5.3.25.jar:5.3.25]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:147)[spring-boot-2.7.8.jar:2.7.8]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:731)[spring-boot-2.7.8.jar:2.7.8]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408)[spring-boot-2.7.8.jar:2.7.8]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:307)[spring-boot-2.7.8.jar:2.7.8]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1303)[spring-boot-2.7.8.jar:2.7.8]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1292)[spring-boot-2.7.8.jar:2.7.8]
    at com.xu.kafka.KafkaApplication.main(KafkaApplication.java:15)[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethod)~[na:1.8.0_301]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)~[na:1.8.0_301]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)~[na:1.8.0_301]
    at java.lang.reflect.Method.invoke(Method.java:498)~[na:1.8.0_301]
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)[spring-boot-devtools-2.7.8.jar:2.7.8]Caused by:org.apache.kafka.common.errors.InvalidReplicationFactorException:Replication factor:2 larger than available brokers:1.2023-02-2210:20:54.236INFO38584---[| adminclient-1]o.a.kafka.common.utils.AppInfoParser:App info kafka.admin.client for adminclient-1 unregistered
2023-02-2210:20:54.239INFO38584---[| adminclient-1]org.apache.kafka.common.metrics.Metrics:Metrics scheduler closed
2023-02-2210:20:54.240INFO38584---[| adminclient-1]org.apache.kafka.common.metrics.Metrics:Closing reporter org.apache.kafka.common.metrics.JmxReporter2023-02-2210:20:54.240INFO38584---[| adminclient-1]org.apache.kafka.common.metrics.Metrics:Metrics reporters closed
2023-02-2210:20:54.266INFO38584---[  restartedMain]o.a.k.clients.consumer.ConsumerConfig:ConsumerConfig values: 
    allow.auto.create.topics =true
    auto.commit.interval.ms =1000
    auto.offset.reset = earliest
    bootstrap.servers =[192.168.1.92:9092]
    check.crcs =true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-KafkaGroup-1
    client.rack = 
    connections.max.idle.ms =540000default.api.timeout.ms =60000
    enable.auto.commit =false
    exclude.internal.topics =true
    fetch.max.bytes =52428800
    fetch.max.wait.ms =500
    fetch.min.bytes =1
    group.id =KafkaGroup
    group.instance.id =null
    heartbeat.interval.ms =3000
    interceptor.classes =[]
    internal.leave.group.on.close =true
    internal.throw.on.fetch.stable.offset.unsupported =false
    isolation.level = read_uncommitted
    key.deserializer =classorg.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes =1048576
    max.poll.interval.ms =600000
    max.poll.records =50
    metadata.max.age.ms =300000
    metric.reporters =[]
    metrics.num.samples =2
    metrics.recording.level =INFO
    metrics.sample.window.ms =30000
    partition.assignment.strategy =[classorg.apache.kafka.clients.consumer.RangeAssignor,classorg.apache.kafka.clients.consumer.CooperativeStickyAssignor]
    receive.buffer.bytes =65536
    reconnect.backoff.max.ms =1000
    reconnect.backoff.ms =50
    request.timeout.ms =30000
    retry.backoff.ms =100
    sasl.client.callback.handler.class=null
    sasl.jaas.config =null
    sasl.kerberos.kinit.cmd =/usr/bin/kinit
    sasl.kerberos.min.time.before.relogin =60000
    sasl.kerberos.service.name =null
    sasl.kerberos.ticket.renew.jitter =0.05
    sasl.kerberos.ticket.renew.window.factor =0.8
    sasl.login.callback.handler.class=null
    sasl.login.class=null
    sasl.login.connect.timeout.ms =null
    sasl.login.read.timeout.ms =null
    sasl.login.refresh.buffer.seconds =300
    sasl.login.refresh.min.period.seconds =60
    sasl.login.refresh.window.factor =0.8
    sasl.login.refresh.window.jitter =0.05
    sasl.login.retry.backoff.max.ms =10000
    sasl.login.retry.backoff.ms =100
    sasl.mechanism =GSSAPI
    sasl.oauthbearer.clock.skew.seconds =30
    sasl.oauthbearer.expected.audience =null
    sasl.oauthbearer.expected.issuer =null
    sasl.oauthbearer.jwks.endpoint.refresh.ms =3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms =10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms =100
    sasl.oauthbearer.jwks.endpoint.url =null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url =null
    security.protocol =PLAINTEXT
    security.providers =null
    send.buffer.bytes =131072
    session.timeout.ms =45000
    socket.connection.setup.timeout.max.ms =30000
    socket.connection.setup.timeout.ms =10000
    ssl.cipher.suites =null
    ssl.enabled.protocols =[TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class=null
    ssl.key.password =null
    ssl.keymanager.algorithm =SunX509
    ssl.keystore.certificate.chain =null
    ssl.keystore.key =null
    ssl.keystore.location =null
    ssl.keystore.password =null
    ssl.keystore.type =JKS
    ssl.protocol =TLSv1.2
    ssl.provider =null
    ssl.secure.random.implementation =null
    ssl.trustmanager.algorithm =PKIX
    ssl.truststore.certificates =null
    ssl.truststore.location =null
    ssl.truststore.password =null
    ssl.truststore.type =JKS
    value.deserializer =classorg.apache.kafka.common.serialization.StringDeserializer2023-02-2210:20:54.300INFO38584---[  restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka version:3.1.22023-02-2210:20:54.300INFO38584---[  restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka commitId: f8c67dc3ae0a3265
2023-02-2210:20:54.300INFO38584---[  restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka startTimeMs:16770324542992023-02-2210:20:54.301INFO38584---[  restartedMain]o.a.k.clients.consumer.KafkaConsumer:[Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup]Subscribedtotopic(s):AudioUploadTopic2023-02-2210:20:54.309INFO38584---[  restartedMain]o.a.k.clients.consumer.ConsumerConfig:ConsumerConfig values: 
    allow.auto.create.topics =true
    auto.commit.interval.ms =1000
    auto.offset.reset = earliest
    bootstrap.servers =[192.168.1.92:9092]
    check.crcs =true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-KafkaGroup-2
    client.rack = 
    connections.max.idle.ms =540000default.api.timeout.ms =60000
    enable.auto.commit =false
    exclude.internal.topics =true
    fetch.max.bytes =52428800
    fetch.max.wait.ms =500
    fetch.min.bytes =1
    group.id =KafkaGroup
    group.instance.id =null
    heartbeat.interval.ms =3000
    interceptor.classes =[]
    internal.leave.group.on.close =true
    internal.throw.on.fetch.stable.offset.unsupported =false
    isolation.level = read_uncommitted
    key.deserializer =classorg.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes =1048576
    max.poll.interval.ms =600000
    max.poll.records =50
    metadata.max.age.ms =300000
    metric.reporters =[]
    metrics.num.samples =2
    metrics.recording.level =INFO
    metrics.sample.window.ms =30000
    partition.assignment.strategy =[classorg.apache.kafka.clients.consumer.RangeAssignor,classorg.apache.kafka.clients.consumer.CooperativeStickyAssignor]
    receive.buffer.bytes =65536
    reconnect.backoff.max.ms =1000
    reconnect.backoff.ms =50
    request.timeout.ms =30000
    retry.backoff.ms =100
    sasl.client.callback.handler.class=null
    sasl.jaas.config =null
    sasl.kerberos.kinit.cmd =/usr/bin/kinit
    sasl.kerberos.min.time.before.relogin =60000
    sasl.kerberos.service.name =null
    sasl.kerberos.ticket.renew.jitter =0.05
    sasl.kerberos.ticket.renew.window.factor =0.8
    sasl.login.callback.handler.class=null
    sasl.login.class=null
    sasl.login.connect.timeout.ms =null
    sasl.login.read.timeout.ms =null
    sasl.login.refresh.buffer.seconds =300
    sasl.login.refresh.min.period.seconds =60
    sasl.login.refresh.window.factor =0.8
    sasl.login.refresh.window.jitter =0.05
    sasl.login.retry.backoff.max.ms =10000
    sasl.login.retry.backoff.ms =100
    sasl.mechanism =GSSAPI
    sasl.oauthbearer.clock.skew.seconds =30
    sasl.oauthbearer.expected.audience =null
    sasl.oauthbearer.expected.issuer =null
    sasl.oauthbearer.jwks.endpoint.refresh.ms =3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms =10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms =100
    sasl.oauthbearer.jwks.endpoint.url =null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url =null
    security.protocol =PLAINTEXT
    security.providers =null
    send.buffer.bytes =131072
    session.timeout.ms =45000
    socket.connection.setup.timeout.max.ms =30000
    socket.connection.setup.timeout.ms =10000
    ssl.cipher.suites =null
    ssl.enabled.protocols =[TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class=null
    ssl.key.password =null
    ssl.keymanager.algorithm =SunX509
    ssl.keystore.certificate.chain =null
    ssl.keystore.key =null
    ssl.keystore.location =null
    ssl.keystore.password =null
    ssl.keystore.type =JKS
    ssl.protocol =TLSv1.2
    ssl.provider =null
    ssl.secure.random.implementation =null
    ssl.trustmanager.algorithm =PKIX
    ssl.truststore.certificates =null
    ssl.truststore.location =null
    ssl.truststore.password =null
    ssl.truststore.type =JKS
    value.deserializer =classorg.apache.kafka.common.serialization.StringDeserializer2023-02-2210:20:54.315INFO38584---[  restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka version:3.1.22023-02-2210:20:54.315INFO38584---[  restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka commitId: f8c67dc3ae0a3265
2023-02-2210:20:54.315INFO38584---[  restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka startTimeMs:16770324543152023-02-2210:20:54.315INFO38584---[  restartedMain]o.a.k.clients.consumer.KafkaConsumer:[Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup]Subscribedtotopic(s):AudioUploadTopic2023-02-2210:20:54.317INFO38584---[  restartedMain]o.a.k.clients.consumer.ConsumerConfig:ConsumerConfig values: 
    allow.auto.create.topics =true
    auto.commit.interval.ms =1000
    auto.offset.reset = earliest
    bootstrap.servers =[192.168.1.92:9092]
    check.crcs =true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-KafkaGroup-3
    client.rack = 
    connections.max.idle.ms =540000default.api.timeout.ms =60000
    enable.auto.commit =false
    exclude.internal.topics =true
    fetch.max.bytes =52428800
    fetch.max.wait.ms =500
    fetch.min.bytes =1
    group.id =KafkaGroup
    group.instance.id =null
    heartbeat.interval.ms =3000
    interceptor.classes =[]
    internal.leave.group.on.close =true
    internal.throw.on.fetch.stable.offset.unsupported =false
    isolation.level = read_uncommitted
    key.deserializer =classorg.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes =1048576
    max.poll.interval.ms =600000
    max.poll.records =50
    metadata.max.age.ms =300000
    metric.reporters =[]
    metrics.num.samples =2
    metrics.recording.level =INFO
    metrics.sample.window.ms =30000
    partition.assignment.strategy =[classorg.apache.kafka.clients.consumer.RangeAssignor,classorg.apache.kafka.clients.consumer.CooperativeStickyAssignor]
    receive.buffer.bytes =65536
    reconnect.backoff.max.ms =1000
    reconnect.backoff.ms =50
    request.timeout.ms =30000
    retry.backoff.ms =100
    sasl.client.callback.handler.class=null
    sasl.jaas.config =null
    sasl.kerberos.kinit.cmd =/usr/bin/kinit
    sasl.kerberos.min.time.before.relogin =60000
    sasl.kerberos.service.name =null
    sasl.kerberos.ticket.renew.jitter =0.05
    sasl.kerberos.ticket.renew.window.factor =0.8
    sasl.login.callback.handler.class=null
    sasl.login.class=null
    sasl.login.connect.timeout.ms =null
    sasl.login.read.timeout.ms =null
    sasl.login.refresh.buffer.seconds =300
    sasl.login.refresh.min.period.seconds =60
    sasl.login.refresh.window.factor =0.8
    sasl.login.refresh.window.jitter =0.05
    sasl.login.retry.backoff.max.ms =10000
    sasl.login.retry.backoff.ms =100
    sasl.mechanism =GSSAPI
    sasl.oauthbearer.clock.skew.seconds =30
    sasl.oauthbearer.expected.audience =null
    sasl.oauthbearer.expected.issuer =null
    sasl.oauthbearer.jwks.endpoint.refresh.ms =3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms =10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms =100
    sasl.oauthbearer.jwks.endpoint.url =null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url =null
    security.protocol =PLAINTEXT
    security.providers =null
    send.buffer.bytes =131072
    session.timeout.ms =45000
    socket.connection.setup.timeout.max.ms =30000
    socket.connection.setup.timeout.ms =10000
    ssl.cipher.suites =null
    ssl.enabled.protocols =[TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class=null
    ssl.key.password =null
    ssl.keymanager.algorithm =SunX509
    ssl.keystore.certificate.chain =null
    ssl.keystore.key =null
    ssl.keystore.location =null
    ssl.keystore.password =null
    ssl.keystore.type =JKS
    ssl.protocol =TLSv1.2
    ssl.provider =null
    ssl.secure.random.implementation =null
    ssl.trustmanager.algorithm =PKIX
    ssl.truststore.certificates =null
    ssl.truststore.location =null
    ssl.truststore.password =null
    ssl.truststore.type =JKS
    value.deserializer =classorg.apache.kafka.common.serialization.StringDeserializer2023-02-2210:20:54.325INFO38584---[  restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka version:3.1.22023-02-2210:20:54.325INFO38584---[  restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka commitId: f8c67dc3ae0a3265
2023-02-2210:20:54.325INFO38584---[  restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka startTimeMs:16770324543252023-02-2210:20:54.325INFO38584---[  restartedMain]o.a.k.clients.consumer.KafkaConsumer:[Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup]Subscribedtotopic(s):AudioUploadTopic2023-02-2210:20:54.326WARN38584---[ntainer#0-1-C-1]org.apache.kafka.clients.NetworkClient:[Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup]Errorwhile fetching metadata withcorrelation id 2:{AudioUploadTopic=UNKNOWN_TOPIC_OR_PARTITION}2023-02-2210:20:54.327INFO38584---[  restartedMain]o.a.k.clients.consumer.ConsumerConfig:ConsumerConfig values: 
    allow.auto.create.topics =true
    auto.commit.interval.ms =1000
    auto.offset.reset = earliest
    bootstrap.servers =[192.168.1.92:9092]
    check.crcs =true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-KafkaGroup-4
    client.rack = 
    connections.max.idle.ms =540000default.api.timeout.ms =60000
    enable.auto.commit =false
    exclude.internal.topics =true
    fetch.max.bytes =52428800
    fetch.max.wait.ms =500
    fetch.min.bytes =1
    group.id =KafkaGroup
    group.instance.id =null
    heartbeat.interval.ms =3000
    interceptor.classes =[]
    internal.leave.group.on.close =true
    internal.throw.on.fetch.stable.offset.unsupported =false
    isolation.level = read_uncommitted
    key.deserializer =classorg.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes =1048576
    max.poll.interval.ms =600000
    max.poll.records =50
    metadata.max.age.ms =300000
    metric.reporters =[]
    metrics.num.samples =2
    metrics.recording.level =INFO
    metrics.sample.window.ms =30000
    partition.assignment.strategy =[classorg.apache.kafka.clients.consumer.RangeAssignor,classorg.apache.kafka.clients.consumer.CooperativeStickyAssignor]
    receive.buffer.bytes =65536
    reconnect.backoff.max.ms =1000
    reconnect.backoff.ms =50
    request.timeout.ms =30000
    retry.backoff.ms =100
    sasl.client.callback.handler.class=null
    sasl.jaas.config =null
    sasl.kerberos.kinit.cmd =/usr/bin/kinit
    sasl.kerberos.min.time.before.relogin =60000
    sasl.kerberos.service.name =null
    sasl.kerberos.ticket.renew.jitter =0.05
    sasl.kerberos.ticket.renew.window.factor =0.8
    sasl.login.callback.handler.class=null
    sasl.login.class=null
    sasl.login.connect.timeout.ms =null
    sasl.login.read.timeout.ms =null
    sasl.login.refresh.buffer.seconds =300
    sasl.login.refresh.min.period.seconds =60
    sasl.login.refresh.window.factor =0.8
    sasl.login.refresh.window.jitter =0.05
    sasl.login.retry.backoff.max.ms =10000
    sasl.login.retry.backoff.ms =100
    sasl.mechanism =GSSAPI
    sasl.oauthbearer.clock.skew.seconds =30
    sasl.oauthbearer.expected.audience =null
    sasl.oauthbearer.expected.issuer =null
    sasl.oauthbearer.jwks.endpoint.refresh.ms =3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms =10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms =100
    sasl.oauthbearer.jwks.endpoint.url =null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url =null
    security.protocol =PLAINTEXT
    security.providers =null
    send.buffer.bytes =131072
    session.timeout.ms =45000
    socket.connection.setup.timeout.max.ms =30000
    socket.connection.setup.timeout.ms =10000
    ssl.cipher.suites =null
    ssl.enabled.protocols =[TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class=null
    ssl.key.password =null
    ssl.keymanager.algorithm =SunX509
    ssl.keystore.certificate.chain =null
    ssl.keystore.key =null
    ssl.keystore.location =null
    ssl.keystore.password =null
    ssl.keystore.type =JKS
    ssl.protocol =TLSv1.2
    ssl.provider =null
    ssl.secure.random.implementation =null
    ssl.trustmanager.algorithm =PKIX
    ssl.truststore.certificates =null
    ssl.truststore.location =null
    ssl.truststore.password =null
    ssl.truststore.type =JKS
    value.deserializer =classorg.apache.kafka.common.serialization.StringDeserializer2023-02-2210:20:54.328INFO38584---[ntainer#0-1-C-1]org.apache.kafka.clients.Metadata:[Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup]ClusterID:1-P2cI8YRgSflmbY8vMd1w2023-02-2210:20:54.332INFO38584---[ntainer#0-1-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup]Discovered group coordinator 192.168.1.92:9092(id:2147482646 rack:null)2023-02-2210:20:54.335INFO38584---[ntainer#0-1-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup](Re-)joining group
2023-02-2210:20:54.340INFO38584---[  restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka version:3.1.22023-02-2210:20:54.340INFO38584---[  restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka commitId: f8c67dc3ae0a3265
2023-02-2210:20:54.340WARN38584---[ntainer#0-2-C-1]org.apache.kafka.clients.NetworkClient:[Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup]Errorwhile fetching metadata withcorrelation id 2:{AudioUploadTopic=UNKNOWN_TOPIC_OR_PARTITION}2023-02-2210:20:54.340INFO38584---[  restartedMain]o.a.kafka.common.utils.AppInfoParser:Kafka startTimeMs:16770324543402023-02-2210:20:54.340INFO38584---[ntainer#0-2-C-1]org.apache.kafka.clients.Metadata:[Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup]ClusterID:1-P2cI8YRgSflmbY8vMd1w2023-02-2210:20:54.340INFO38584---[  restartedMain]o.a.k.clients.consumer.KafkaConsumer:[Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup]Subscribedtotopic(s):AudioUploadTopic2023-02-2210:20:54.340INFO38584---[ntainer#0-2-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup]Discovered group coordinator 192.168.1.92:9092(id:2147482646 rack:null)2023-02-2210:20:54.343INFO38584---[ntainer#0-2-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup](Re-)joining group
2023-02-2210:20:54.361WARN38584---[ntainer#0-3-C-1]org.apache.kafka.clients.NetworkClient:[Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup]Errorwhile fetching metadata withcorrelation id 2:{AudioUploadTopic=UNKNOWN_TOPIC_OR_PARTITION}2023-02-2210:20:54.361INFO38584---[ntainer#0-3-C-1]org.apache.kafka.clients.Metadata:[Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup]ClusterID:1-P2cI8YRgSflmbY8vMd1w2023-02-2210:20:54.363INFO38584---[ntainer#0-3-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup]Discovered group coordinator 192.168.1.92:9092(id:2147482646 rack:null)2023-02-2210:20:54.365INFO38584---[ntainer#0-3-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup](Re-)joining group
2023-02-2210:20:54.367INFO38584---[ntainer#0-2-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup]Request joining group due to: need tore-join withthe given member-id
2023-02-2210:20:54.367INFO38584---[ntainer#0-1-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup]Request joining group due to: need tore-join withthe given member-id
2023-02-2210:20:54.368INFO38584---[ntainer#0-2-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup](Re-)joining group
2023-02-2210:20:54.368INFO38584---[ntainer#0-1-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup](Re-)joining group
2023-02-2210:20:54.374INFO38584---[  restartedMain]o.s.b.w.embedded.tomcat.TomcatWebServer:Tomcat started on port(s):8001(http)withcontext path ''
2023-02-2210:20:54.375INFO38584---[ntainer#0-3-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup]Request joining group due to: need tore-join withthe given member-id
2023-02-2210:20:54.376INFO38584---[ntainer#0-3-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup](Re-)joining group
2023-02-2210:20:54.389WARN38584---[ntainer#0-0-C-1]org.apache.kafka.clients.NetworkClient:[Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup]Errorwhile fetching metadata withcorrelation id 2:{AudioUploadTopic=LEADER_NOT_AVAILABLE}2023-02-2210:20:54.390INFO38584---[ntainer#0-0-C-1]org.apache.kafka.clients.Metadata:[Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup]ClusterID:1-P2cI8YRgSflmbY8vMd1w2023-02-2210:20:54.390INFO38584---[ntainer#0-0-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup]Discovered group coordinator 192.168.1.92:9092(id:2147482646 rack:null)2023-02-2210:20:54.391INFO38584---[ntainer#0-0-C-1]o.a.k.c.c.internals.ConsumerCoordinator:[Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup](Re-)joining group
2023-02-2210:20:54.394INFO38584---[  restartedMain]com.xu.kafka.KafkaApplication:StartedKafkaApplication in 3.708 seconds (JVM running for4.278)

6 效果测试

在这里插入图片描述

发送成功    SendResult[producerRecord=ProducerRecord(topic=AudioUploadTopic, partition=null, headers=RecordHeaders(headers =[], isReadOnly =true), key=null, value=有回调的消息推送111, timestamp=null), recordMetadata=AudioUploadTopic-0@4]

在这里插入图片描述

标签: kafka spring boot java

本文转载自: https://blog.csdn.net/qq_34814092/article/details/129079743
版权归原作者 深色風信子 所有, 如有侵权,请联系我们删除。

“SpringBoot 集成 Kafka”的评论:

还没有评论