1. 添加依赖
在项目的
pom.xml
文件中添加 Spring Kafka 的依赖。
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><!-- 请使用与 Spring Boot 版本相匹配的版本号 --></dependency>
2. 配置 Kafka
在
xx.yml
或
xx.properties
中配置 Kafka 连接信息,对于多数据源,可以按需配置多个 Kafka 集群的连接信息。
单数据源:
# ████████ Kafka相关 ████████
spring.kafka.bootstrap-servers=localhost:9092
# 用于在客户端向服务器发送请求时传递给服务器的标识字符串,不设置时系统会自动生成。
#spring.kafka.producer.client-id=common-parent
# 控制生产者(Producer)发送消息时的确认(acknowledgement)级别(1:只要集群的首领节点(Leader)收到消息,生产者就会收到确认。)
spring.kafka.producer.acks=1
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.retries=10
# 指定元数据的最大生存时间
spring.kafka.producer.properties.metadata.max.age.ms=300000
# 可以发送的请求的最大字节数
spring.kafka.producer.properties.max.request.size=200000000
# 用于标识消费者所属的消费者组
spring.kafka.consumer.group-id=common-parent
# 用于设置当enable.auto.commit设置为true时,消费者自动向Kafka提交偏移量(offset)的频率,单位为毫秒。默认情况下,这个值是5000毫秒,即每5秒钟自动提交一次偏移量。
spring.kafka.consumer.enable-auto-commit=true
# 用于指定消费者在找不到已有偏移量(offset)时的行为(earliest:如果消费者组没有提交过偏移量,消费者将从主题中最早(即最早的偏移量)的消息开始消费)
spring.kafka.consumer.auto-offset-reset=earliest
# 注意:spring.kafka.consumer.group 并不是有效的 Spring Kafka 配置项,消费者组请使用上面的 group-id 配置。
#spring.kafka.consumer.group=default
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
多数据源:
说明:第一个数据源我配置的是:localhost:9092,第二个数据源我配置的是:127.0.0.1:9092,虽然本质上是同一个数据源,不过重点看最后的配置是否生效即可。
# ████████ Kafka相关 ████████
spring.kafka.multiple.primary=first
spring.kafka.multiple.datasource.first.bootstrap-servers=localhost:9092
# 用于在客户端向服务器发送请求时传递给服务器的标识字符串,不设置时系统会自动生成。
#spring.kafka.multiple.datasource.first.producer.client-id=common-parent
# 控制生产者(Producer)发送消息时的确认(acknowledgement)级别(1:只要集群的首领节点(Leader)收到消息,生产者就会收到确认。)
spring.kafka.multiple.datasource.first.producer.acks=1
spring.kafka.multiple.datasource.first.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.multiple.datasource.first.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.multiple.datasource.first.producer.retries=10
# 指定元数据的最大生存时间
spring.kafka.multiple.datasource.first.producer.properties.metadata.max.age.ms=300000
# 可以发送的请求的最大字节数
spring.kafka.multiple.datasource.first.producer.properties.max.request.size=200000000
# 用于标识消费者所属的消费者组
spring.kafka.multiple.datasource.first.consumer.group-id=common-parent
# 用于设置当enable.auto.commit设置为true时,消费者自动向Kafka提交偏移量(offset)的频率,单位为毫秒。默认情况下,这个值是5000毫秒,即每5秒钟自动提交一次偏移量。
spring.kafka.multiple.datasource.first.consumer.enable-auto-commit=true
# 用于指定消费者在找不到已有偏移量(offset)时的行为(earliest:如果消费者组没有提交过偏移量,消费者将从主题中最早(即最早的偏移量)的消息开始消费)
spring.kafka.multiple.datasource.first.consumer.auto-offset-reset=earliest
# 注意:consumer.group 并不是有效的 Spring Kafka 配置项,消费者组请使用上面的 group-id 配置。
#spring.kafka.multiple.datasource.first.consumer.group=default
spring.kafka.multiple.datasource.first.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.multiple.datasource.first.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.multiple.datasource.second.bootstrap-servers=127.0.0.1:9092
#spring.kafka.multiple.datasource.second.producer.client-id=common-parent
spring.kafka.multiple.datasource.second.producer.acks=1
spring.kafka.multiple.datasource.second.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.multiple.datasource.second.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.multiple.datasource.second.producer.retries=10
spring.kafka.multiple.datasource.second.producer.properties.metadata.max.age.ms=300000
spring.kafka.multiple.datasource.second.producer.properties.max.request.size=200000000
spring.kafka.multiple.datasource.second.consumer.group-id=default
spring.kafka.multiple.datasource.second.consumer.enable-auto-commit=true
spring.kafka.multiple.datasource.second.consumer.auto-offset-reset=earliest
spring.kafka.multiple.datasource.second.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.multiple.datasource.second.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
3. 创建Kafka配置类
KafkaConfig.java
package priv.fc.common_parent.configuration.config.mq;

import cn.hutool.core.util.ObjectUtil;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka multi-datasource configuration.
 *
 * <p>Binds one {@link KafkaProperties} instance per configured cluster
 * ({@code spring.kafka.multiple.datasource.first} / {@code ...second}) and exposes, for each
 * of them, a {@link KafkaTemplate} for producing and a listener container factory for consuming.
 *
 * @author 付聪
 * @time 2024-08-11 14:54:13
 */
@Configuration
public class KafkaConfig {

    // ———————————————— Configuration property beans (start) ————————————————

    /** Properties bound from the first (primary) data source prefix. */
    @Primary
    @ConfigurationProperties(prefix = "spring.kafka.multiple.datasource.first")
    @Bean(name = "firstDataSourceKafkaProperties")
    public KafkaProperties firstDataSourceKafkaProperties() {
        return new KafkaProperties();
    }

    /** Properties bound from the second data source prefix. */
    @ConfigurationProperties(prefix = "spring.kafka.multiple.datasource.second")
    @Bean(name = "secondDataSourceKafkaProperties")
    public KafkaProperties secondDataSourceKafkaProperties() {
        return new KafkaProperties();
    }

    // ———————————————— Configuration property beans (end) ————————————————

    // ———————————————— Producer side (start) ————————————————

    /** Default template; intentionally backed by the same properties as "firstKafkaTemplate". */
    @Primary
    @Bean(name = "kafkaTemplate", destroyMethod = "destroy")
    public KafkaTemplate<String, String> kafkaTemplate(
            @Autowired @Qualifier("firstDataSourceKafkaProperties") KafkaProperties kafkaProperties) {
        return new KafkaTemplate<>(this.getProducerFactory(kafkaProperties));
    }

    /** Template producing to the first data source. */
    @Bean(name = "firstKafkaTemplate", destroyMethod = "destroy")
    public KafkaTemplate<String, String> firstKafkaTemplate(
            @Autowired @Qualifier("firstDataSourceKafkaProperties") KafkaProperties kafkaProperties) {
        return new KafkaTemplate<>(this.getProducerFactory(kafkaProperties));
    }

    /** Template producing to the second data source. */
    @Bean(name = "secondKafkaTemplate", destroyMethod = "destroy")
    public KafkaTemplate<String, String> secondKafkaTemplate(
            @Autowired @Qualifier("secondDataSourceKafkaProperties") KafkaProperties kafkaProperties) {
        return new KafkaTemplate<>(this.getProducerFactory(kafkaProperties));
    }

    /** Builds a producer factory for the given per-datasource properties. */
    private ProducerFactory<String, String> getProducerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaProducerFactory<>(this.getProducerConfigs(kafkaProperties));
    }

    /**
     * Builds the full producer config map from the bound properties.
     *
     * <p>Fix: uses {@link KafkaProperties#buildProducerProperties()} so that every configured
     * producer option (acks, retries, extra {@code producer.properties.*} entries, ...) actually
     * takes effect. The previous hand-rolled map copied only bootstrap servers, retries and the
     * serializers, silently dropping the rest of the configuration.
     */
    private Map<String, Object> getProducerConfigs(KafkaProperties kafkaProperties) {
        Map<String, Object> props = new HashMap<>(kafkaProperties.buildProducerProperties());
        // Preserve the original default of String serializers when none are configured.
        props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    // ———————————————— Producer side (end) ————————————————

    // ———————————————— Consumer side (start) ————————————————

    /** Consumer factory for the first data source. */
    @Primary
    @Bean(name = "firstDataSourceConsumerFactory")
    public ConsumerFactory<Object, Object> firstDataSourceConsumerFactory(
            @Autowired @Qualifier("firstDataSourceKafkaProperties") KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    /** Listener container factory for the first data source; referenced by name in @KafkaListener. */
    @Bean(name = "firstDataSourceKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> firstDataSourceKafkaListenerContainerFactory(
            @Autowired @Qualifier("firstDataSourceKafkaProperties") KafkaProperties kafkaProperties,
            @Autowired @Qualifier("firstDataSourceConsumerFactory") ConsumerFactory<Object, Object> consumerFactory
    ) {
        return getKafkaListenerContainerFactory(kafkaProperties, consumerFactory);
    }

    /** Consumer factory for the second data source. */
    @Bean(name = "secondDataSourceConsumerFactory")
    public ConsumerFactory<Object, Object> secondDataSourceConsumerFactory(
            @Autowired @Qualifier("secondDataSourceKafkaProperties") KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    /** Listener container factory for the second data source; referenced by name in @KafkaListener. */
    @Bean(name = "secondDataSourceKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> secondDataSourceKafkaListenerContainerFactory(
            @Autowired @Qualifier("secondDataSourceKafkaProperties") KafkaProperties kafkaProperties,
            @Autowired @Qualifier("secondDataSourceConsumerFactory") ConsumerFactory<Object, Object> consumerFactory
    ) {
        return getKafkaListenerContainerFactory(kafkaProperties, consumerFactory);
    }

    /**
     * Builds a concurrent listener container factory, applying the optional
     * listener concurrency and ack-mode settings when present in the properties.
     */
    private KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> getKafkaListenerContainerFactory(
            KafkaProperties kafkaProperties, ConsumerFactory<Object, Object> consumerFactory) {
        // Container supporting concurrent consumption.
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        if (ObjectUtil.isNotEmpty(kafkaProperties.getListener().getConcurrency())) {
            // Concurrency level of the container (number of consumer threads).
            factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
        }
        if (ObjectUtil.isNotEmpty(kafkaProperties.getListener().getAckMode())) {
            // Acknowledgement mode of the listener container.
            factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode());
        }
        return factory;
    }

    // ———————————————— Consumer side (end) ————————————————
}
KafkaServiceConfig.java
packagepriv.fc.common_parent.common.config;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.context.annotation.Primary;importorg.springframework.kafka.core.KafkaTemplate;importpriv.fc.common_parent.common.service.KafkaService;importpriv.fc.common_parent.common.service.impl.KafkaServiceImpl;/**
* KafkaService配置类
*
* @author 付聪
* @time 2024-08-22 21:55:54
*/@ConfigurationpublicclassKafkaServiceConfig{@Primary@Bean(name ="kafkaService")publicKafkaServicekafkaService(@Autowired@Qualifier("kafkaTemplate")KafkaTemplate<String,String> kafkaTemplate){returnnewKafkaServiceImpl(kafkaTemplate);}@Bean(name ="firstKafkaService")publicKafkaServicefirstKafkaService(@Autowired@Qualifier("firstKafkaTemplate")KafkaTemplate<String,String> firstKafkaTemplate){returnnewKafkaServiceImpl(firstKafkaTemplate);}@Bean(name ="secondKafkaService")publicKafkaServicesecondKafkaService(@Autowired@Qualifier("secondKafkaTemplate")KafkaTemplate<String,String> secondKafkaTemplate){returnnewKafkaServiceImpl(secondKafkaTemplate);}}
4. 创建KafkaService接口
说明:这里仅仅封装了一个示例方法,自己需要什么方法可自行封装。
packagepriv.fc.common_parent.common.service;importorg.springframework.kafka.support.SendResult;importorg.springframework.util.concurrent.ListenableFuture;/**
* KafkaService接口
*
* @author 付聪
* @time 2024/4/12 11:37
*/publicinterfaceKafkaService{/**
* 将数据发送到提供的主题,不带键或分区。
*
* @param topic - 主题
* @param value - 数据
* @return ListenableFuture<SendResult<String, String>> - 发送后ListenableFuture的结果(具有接受完成回调的功能)
* @author 付聪
* @time 2024/8/11 下午2:59:00
*/ListenableFuture<SendResult<String,String>>send(String topic,String value);}
5. 创建KafkaService实现类
packagepriv.fc.common_parent.common.service.impl;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.support.SendResult;importorg.springframework.stereotype.Service;importorg.springframework.util.concurrent.ListenableFuture;importpriv.fc.common_parent.common.service.KafkaService;/**
* KafkaService实现类
*
* @throws
* @author 付聪
* @time 2024-08-11 15:03:51
*/@ServicepublicclassKafkaServiceImplimplementsKafkaService{privatefinalKafkaTemplate<String,String> kafkaTemplate;publicKafkaServiceImpl(KafkaTemplate<String,String> template){this.kafkaTemplate = template;}@OverridepublicListenableFuture<SendResult<String,String>>send(String topic,String value){return kafkaTemplate.send(topic, value);}}
6. 测试发送消息
KafkaLearnController.java
packagepriv.fc.common_parent.learn.kafka.controller;importio.swagger.annotations.Api;importio.swagger.annotations.ApiOperation;importio.swagger.annotations.ApiResponse;importio.swagger.annotations.ApiResponses;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RequestMethod;importorg.springframework.web.bind.annotation.RestController;importpriv.fc.common_parent.configuration.response.Result;importpriv.fc.common_parent.learn.kafka.service.KafkaLearnService;importjavax.annotation.Resource;/**
* Kafka学习管理
*
* @author 付聪
* @time 2024-08-10 23:30:03
*/@Api(tags ={"Kafka学习管理"}, hidden =false)@RestController@RequestMapping("/kafka_learn")publicclassKafkaLearnController{@ResourceprivateKafkaLearnService kafkaLearnService;@ApiOperation(value ="测试能否正常发送、消费消息", notes ="无特别说明!", hidden =false)@ApiResponses({@ApiResponse(code =200, message ="OK", response =Void.class)})@RequestMapping(value ="/test", method =RequestMethod.POST)Result<Void>test(){
kafkaLearnService.send();returnnewResult<>();}}
KafkaLearnService.java
package priv.fc.common_parent.learn.kafka.service;

/**
 * Kafka learning service interface.
 *
 * @author 付聪
 * @time 2024-08-10 23:30:03
 */
public interface KafkaLearnService {

    /**
     * Sends test data to the configured topic, with no key or partition.
     *
     * @author 付聪
     * @time 2024-08-10 23:31:08
     */
    void send();
}
KafkaLearnValueReqDto.java
package priv.fc.common_parent.learn.kafka.req_dto;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.springframework.format.annotation.DateTimeFormat;
import priv.fc.common_parent.common.req_dto.CommonReqDto;

import java.io.Serializable;
import java.util.Date;

/**
 * Payload DTO used for the Kafka learning examples.
 *
 * @author 付聪
 * @time 2024-08-11 09:37:25
 */
@ApiModel(value = "KafkaLearnValueReqDto", description = "Kafka学习数据请求对象")
@Data
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
// Note: extending CommonReqDto (a project-local common request DTO) is optional.
public class KafkaLearnValueReqDto extends CommonReqDto implements Serializable {

    private static final long serialVersionUID = 7674576755150005686L;

    @ApiModelProperty(value = "主键ID", example = "", hidden = false)
    private Long id;

    @ApiModelProperty(value = "备注", example = "", hidden = false)
    private String remark;

    @ApiModelProperty(value = "创建人ID", example = "", hidden = false)
    private Long createPersonId;

    @ApiModelProperty(value = "创建时间(默认以【yyyy-MM-dd HH:mm:ss】格式的字符串接收前端传参)", example = "2024-04-09 16:04:03", hidden = false)
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;

    @ApiModelProperty(value = "更新人ID", example = "", hidden = false)
    private Long updatePersonId;

    @ApiModelProperty(value = "更新时间(默认以【yyyy-MM-dd HH:mm:ss】格式的字符串接收前端传参)", example = "2024-04-09 16:04:03", hidden = false)
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date updateTime;

    @ApiModelProperty(value = "是否删除", example = "", hidden = false)
    private Integer delFlag;
}
KafkaLearnServiceImpl.java
packagepriv.fc.common_parent.learn.kafka.service.impl;importcn.hutool.json.JSONUtil;importorg.apache.commons.lang.math.NumberUtils;importorg.springframework.stereotype.Service;importpriv.fc.common_parent.common.service.KafkaService;importpriv.fc.common_parent.learn.kafka.req_dto.KafkaLearnValueReqDto;importpriv.fc.common_parent.learn.kafka.service.KafkaLearnService;importjavax.annotation.Resource;/**
* Kafka学习Service实现类
*
* @author 付聪
* @time 2024-08-10 23:30:03
*/@ServicepublicclassKafkaLearnServiceImplimplementsKafkaLearnService{@ResourceprivateKafkaService kafkaService;@ResourceprivateKafkaService firstKafkaService;@ResourceprivateKafkaService secondRKafkaService;@Overridepublicvoidsend(){KafkaLearnValueReqDto kafkaLearnValueReqDto =newKafkaLearnValueReqDto();
kafkaLearnValueReqDto.setRemark("【默认数据源】的消息:Hello Kafka!");
kafkaLearnValueReqDto.setDelFlag(NumberUtils.INTEGER_ZERO);// 说明:发送消息之前的Topic需要先创建,可以通过命令或者可视化工具创建。
kafkaService.send("test",JSONUtil.toJsonStr(kafkaLearnValueReqDto));KafkaLearnValueReqDto kafkaLearnValueReqDto1 =newKafkaLearnValueReqDto();
kafkaLearnValueReqDto1.setRemark("【第一个数据源】的消息:Hello Kafka!");
kafkaLearnValueReqDto1.setDelFlag(NumberUtils.INTEGER_ZERO);
firstKafkaService.send("test",JSONUtil.toJsonStr(kafkaLearnValueReqDto1));KafkaLearnValueReqDto kafkaLearnValueReqDto2 =newKafkaLearnValueReqDto();
kafkaLearnValueReqDto2.setRemark("【第二个数据源】的消息:Hello Kafka!");
kafkaLearnValueReqDto2.setDelFlag(NumberUtils.INTEGER_ZERO);
secondRKafkaService.send("test",JSONUtil.toJsonStr(kafkaLearnValueReqDto2));}}
7. 测试消费消息
KafkaLearnConsumer.java
packagepriv.fc.common_parent.learn.kafka.consumer;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;/**
* Kafka学习消费者
*
* @author 付聪
* @time 2024-08-11 09:37:25
*/@ComponentpublicclassKafkaLearnConsumer{privatestaticfinalLogger logger =LoggerFactory.getLogger(KafkaLearnConsumer.class);@KafkaListener(topics ="test")publicvoidconsume(ConsumerRecord<String,String> record){
logger.info("record是:{}", record);
logger.info("value是:{}", record.value());}@KafkaListener(topics ="test", containerFactory ="firstDataSourceKafkaListenerContainerFactory")publicvoidconsume1(ConsumerRecord<String,String> record){
logger.info("record是:{}", record);
logger.info("value是:{}", record.value());}@KafkaListener(topics ="test", containerFactory ="secondDataSourceKafkaListenerContainerFactory")publicvoidconsume2(ConsumerRecord<String,String> record){
logger.info("record是:{}", record);
logger.info("value是:{}", record.value());}}
8. 启动项目
2024-08-11 15:10:06,157 INFO [main] o.a.k.c.u.AppInfoParser.<init>(117): Kafka version: 2.5.1
2024-08-11 15:10:06,158 INFO [main] o.a.k.c.u.AppInfoParser.<init>(118): Kafka commitId: 0efa8fb0f4c73d92
2024-08-11 15:10:06,159 INFO [main] o.a.k.c.u.AppInfoParser.<init>(119): Kafka startTimeMs: 1723360206157
2024-08-11 15:10:06,159 INFO [main] o.a.k.c.c.KafkaConsumer.subscribe(974): [Consumer clientId=consumer-common-parent-3, groupId=common-parent] Subscribed to topic(s): test
2024-08-11 15:10:06,160 INFO [main] o.s.s.c.ThreadPoolTaskScheduler.initialize(181): Initializing ExecutorService
2024-08-11 15:10:06,162 INFO [main] o.a.c.h.Http11NioProtocol.log(173): Starting ProtocolHandler ["http-nio-8888"]
2024-08-11 15:10:06,227 INFO [main] o.s.b.w.e.t.TomcatWebServer.start(220): Tomcat started on port(s): 8888 (http) with context path ''
2024-08-11 15:10:07,380 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] o.a.k.c.Metadata.update(277): [Consumer clientId=consumer-common-parent-1, groupId=common-parent] Cluster ID: 5WIA7foUSN-3iLQFV8vA7Q
2024-08-11 15:10:07,380 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.a.k.c.Metadata.update(277): [Consumer clientId=consumer-common-parent-3, groupId=common-parent] Cluster ID: 5WIA7foUSN-3iLQFV8vA7Q
2024-08-11 15:10:07,380 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.Metadata.update(277): [Consumer clientId=consumer-common-parent-2, groupId=common-parent] Cluster ID: 5WIA7foUSN-3iLQFV8vA7Q
2024-08-11 15:10:07,386 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.a.k.c.c.i.AbstractCoordinator.onSuccess(797): [Consumer clientId=consumer-common-parent-3, groupId=common-parent] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-08-11 15:10:07,386 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator.onSuccess(797): [Consumer clientId=consumer-common-parent-2, groupId=common-parent] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-08-11 15:10:07,386 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] o.a.k.c.c.i.AbstractCoordinator.onSuccess(797): [Consumer clientId=consumer-common-parent-1, groupId=common-parent] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-08-11 15:10:07,398 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator.sendJoinGroupRequest(552): [Consumer clientId=consumer-common-parent-2, groupId=common-parent] (Re-)joining group
2024-08-11 15:10:07,398 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.a.k.c.c.i.AbstractCoordinator.sendJoinGroupRequest(552): [Consumer clientId=consumer-common-parent-3, groupId=common-parent] (Re-)joining group
2024-08-11 15:10:07,399 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] o.a.k.c.c.i.AbstractCoordinator.sendJoinGroupRequest(552): [Consumer clientId=consumer-common-parent-1, groupId=common-parent] (Re-)joining group
2024-08-11 15:10:07,440 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator.joinGroupIfNeeded(455): [Consumer clientId=consumer-common-parent-2, groupId=common-parent] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2024-08-11 15:10:07,440 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] o.a.k.c.c.i.AbstractCoordinator.joinGroupIfNeeded(455): [Consumer clientId=consumer-common-parent-1, groupId=common-parent] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2024-08-11 15:10:07,440 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.a.k.c.c.i.AbstractCoordinator.joinGroupIfNeeded(455): [Consumer clientId=consumer-common-parent-3, groupId=common-parent] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2024-08-11 15:10:07,441 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator.sendJoinGroupRequest(552): [Consumer clientId=consumer-common-parent-2, groupId=common-parent] (Re-)joining group
2024-08-11 15:10:07,441 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.a.k.c.c.i.AbstractCoordinator.sendJoinGroupRequest(552): [Consumer clientId=consumer-common-parent-3, groupId=common-parent] (Re-)joining group
2024-08-11 15:10:07,442 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] o.a.k.c.c.i.AbstractCoordinator.sendJoinGroupRequest(552): [Consumer clientId=consumer-common-parent-1, groupId=common-parent] (Re-)joining group
2024-08-11 15:10:07,458 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.a.k.c.c.i.ConsumerCoordinator.performAssignment(611): [Consumer clientId=consumer-common-parent-3, groupId=common-parent] Finished assignment for group at generation 64: {consumer-common-parent-2-4da3eb1e-e025-4ce1-8b0b-1efab52b6e68=Assignment(partitions=[test-2, test-3]), consumer-common-parent-1-65b1a27e-0288-4cc4-b633-ea5e6e7496d4=Assignment(partitions=[test-0, test-1]), consumer-common-parent-3-deffe8ba-dac0-4eda-97bc-bc0bf81c789d=Assignment(partitions=[test-4])}
2024-08-11 15:10:07,467 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.a.k.c.c.i.AbstractCoordinator.onSuccess(503): [Consumer clientId=consumer-common-parent-3, groupId=common-parent] Successfully joined group with generation 64
2024-08-11 15:10:07,467 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] o.a.k.c.c.i.AbstractCoordinator.onSuccess(503): [Consumer clientId=consumer-common-parent-1, groupId=common-parent] Successfully joined group with generation 64
2024-08-11 15:10:07,467 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator.onSuccess(503): [Consumer clientId=consumer-common-parent-2, groupId=common-parent] Successfully joined group with generation 64
2024-08-11 15:10:07,484 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator.invokePartitionsAssigned(273): [Consumer clientId=consumer-common-parent-2, groupId=common-parent] Adding newly assigned partitions: test-3, test-2
2024-08-11 15:10:07,484 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.a.k.c.c.i.ConsumerCoordinator.invokePartitionsAssigned(273): [Consumer clientId=consumer-common-parent-3, groupId=common-parent] Adding newly assigned partitions: test-4
2024-08-11 15:10:07,484 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] o.a.k.c.c.i.ConsumerCoordinator.invokePartitionsAssigned(273): [Consumer clientId=consumer-common-parent-1, groupId=common-parent] Adding newly assigned partitions: test-1, test-0
2024-08-11 15:10:07,518 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] o.a.k.c.c.i.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(799): [Consumer clientId=consumer-common-parent-1, groupId=common-parent] Setting offset for partition test-1 to the committed offset FetchPosition{offset=12, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-08-11 15:10:07,518 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.a.k.c.c.i.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(799): [Consumer clientId=consumer-common-parent-3, groupId=common-parent] Setting offset for partition test-4 to the committed offset FetchPosition{offset=11, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-08-11 15:10:07,518 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(799): [Consumer clientId=consumer-common-parent-2, groupId=common-parent] Setting offset for partition test-3 to the committed offset FetchPosition{offset=9, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-08-11 15:10:07,518 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] o.a.k.c.c.i.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(799): [Consumer clientId=consumer-common-parent-1, groupId=common-parent] Setting offset for partition test-0 to the committed offset FetchPosition{offset=10, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-08-11 15:10:07,519 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(799): [Consumer clientId=consumer-common-parent-2, groupId=common-parent] Setting offset for partition test-2 to the committed offset FetchPosition{offset=6, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-08-11 15:10:07,543 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] o.s.k.l.KafkaMessageListenerContainer.info(292): common-parent: partitions assigned: [test-1, test-0]
2024-08-11 15:10:07,543 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer.info(292): common-parent: partitions assigned: [test-4]
2024-08-11 15:10:07,543 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer.info(292): common-parent: partitions assigned: [test-3, test-2]
2024-08-11 15:10:07,544 INFO [main] p.f.c.m.MainApplication.logStarted(61): Started MainApplication in 36.901 seconds (JVM running for 42.065)
9. 验证是否成功
2024-08-11 15:14:21,758 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] p.f.c.l.k.c.Consumer.consume(16): record是:ConsumerRecord(topic = test, partition = 3, leaderEpoch = 0, offset = 9, CreateTime = 1723360461727, serialized key size = -1, serialized value size = 73, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"remark":"【第二个数据源】的消息:Hello Kafka!","delFlag":0})
2024-08-11 15:14:21,758 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] p.f.c.l.k.c.Consumer.consume2(28): record是:ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 12, CreateTime = 1723360461719, serialized key size = -1, serialized value size = 73, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"remark":"【第一个数据源】的消息:Hello Kafka!","delFlag":0})
2024-08-11 15:14:21,758 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] p.f.c.l.k.c.Consumer.consume(17): value是:{"remark":"【第二个数据源】的消息:Hello Kafka!","delFlag":0}
2024-08-11 15:14:21,758 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] p.f.c.l.k.c.Consumer.consume2(29): value是:{"remark":"【第一个数据源】的消息:Hello Kafka!","delFlag":0}
2024-08-11 15:14:21,760 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] p.f.c.l.k.c.Consumer.consume(16): record是:ConsumerRecord(topic = test, partition = 3, leaderEpoch = 0, offset = 10, CreateTime = 1723360461663, serialized key size = -1, serialized value size = 70, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"remark":"【默认数据源】的消息:Hello Kafka!","delFlag":0})
2024-08-11 15:14:21,760 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] p.f.c.l.k.c.Consumer.consume(17): value是:{"remark":"【默认数据源】的消息:Hello Kafka!","delFlag":0}
版权归原作者 付聪1210 所有, 如有侵权,请联系我们删除。