Kafka的Offset详解
1、生产者Offset
2、消费者Offset
2.1、消费者
packagecom.power.consumer;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;@ComponentpublicclassEventConsumer{/**
* topics 用于指定从哪个主题中消费消息
* concurrency 用于指定有多少个消费者
* @param record
*/@KafkaListener(topics ={"offSetTopic"}, groupId ="offSetGroup")publicvoidonEventA(ConsumerRecord<String,String> record){System.out.println(Thread.currentThread().getId()+"---> 消费消息 record = "+ record);}}
2.2、生产者
packagecom.power.producer;importcom.power.model.User;importcom.power.util.JSONUtils;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;importjava.util.Date;@ComponentpublicclassEventProducer{@ResourceprivateKafkaTemplate<String,Object> kafkaTemplate;publicvoidsendEvent(){for(int i =0; i <2; i++){User user =User.builder().id(i).phone("1567676767"+i).birthday(newDate()).build();String userJson =JSONUtils.toJSON(user);
kafkaTemplate.send("offSetTopic","k"+i, userJson);}}}
2.3、实体类对象
packagecom.power.model;importlombok.AllArgsConstructor;importlombok.Builder;importlombok.Data;importlombok.NoArgsConstructor;importjava.util.Date;@Builder@AllArgsConstructor@NoArgsConstructor@DatapublicclassUser{privateInteger id;privateString phone;privateDate birthday;}
2.4、JSON工具类
packagecom.power.util;importcom.fasterxml.jackson.core.JsonProcessingException;importcom.fasterxml.jackson.databind.ObjectMapper;publicclassJSONUtils{privatestaticfinalObjectMapperOBJECTMAPPER=newObjectMapper();publicstaticStringtoJSON(Object object){try{returnOBJECTMAPPER.writeValueAsString(object);}catch(JsonProcessingException e){thrownewRuntimeException(e);}}publicstatic<T>TtoBean(String json,Class<T> clazz){try{returnOBJECTMAPPER.readValue(json,clazz);}catch(JsonProcessingException e){thrownewRuntimeException(e);}}}
2.5、项目配置文件
spring:
application:
#应用名称
name: spring-boot-06-kafka-offset
#kafka连接地址(ip+port)
kafka:
bootstrap-servers:<你的kafka服务器IP>:9092
#配置消费者的反序列化
consumer:
key-deserializer:org.apache.kafka.common.serialization.StringDeserializer
value-deserializer:org.apache.kafka.common.serialization.StringDeserializer
2.6、测试类
packagecom.power;importcom.power.producer.EventProducer;importorg.junit.jupiter.api.Test;importorg.springframework.boot.test.context.SpringBootTest;importjavax.annotation.Resource;@SpringBootTestpublicclassSpringBoot07KafkaBaseApplication{@ResourceprivateEventProducer eventProducer;@TestvoidsendInterceptor(){
eventProducer.sendEvent();}}
2.7、测试
- 先启动生产者,会发送两条消息到kafka服务器
- 再启动消费者监听,此时我们发现,启动后的消费者并不会监听到生产者已发送的两条消息
- 在kafka安装目录的bin文件夹下执行命令:
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092--group offSetGroup --describe
- 根据命令结果:查看kafka消费者的偏移量offset,我们发现当前消费者偏移量CURRENT-OFFSET值为2 ,当前日志记录的生产者消息偏移量LOG-END-OFFSET值为2,消费者偏移量和日志记录的生产者消息偏移量差值LAG值为0 ,所以消费者查询不到生产者发送的消息。
- 关闭消费者,再次使用生产者发送消息,再次执行命令查看消费者偏移量
- 此时我们发现消费者偏移量为4,日志记录的偏移量为6,两者差值为2,此时启动消费者,读取到了差值为2的数据
2.8、总结
- 消费者从什么地方开始消费,就看消费者的offset是多少,消费者启动后他的offset是多少。
- 消费者offset是多少,可以通过命令查看
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092--group offSetGroup --describe
本文转载自: https://blog.csdn.net/qq_46112274/article/details/141529015
版权归原作者 小码哥呀 所有, 如有侵权,请联系我们删除。
版权归原作者 小码哥呀 所有, 如有侵权,请联系我们删除。