0


kafka-消费者-指定offset消费(SpringBoot整合Kafka)

文章目录

1、指定offset消费

1.1、创建消费者监听器‘

packagecom.atguigu.spring.kafka.consumer.listener;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.kafka.annotation.PartitionOffset;importorg.springframework.kafka.annotation.TopicPartition;importorg.springframework.stereotype.Component;@ComponentpublicclassMyKafkaPartitionListener{//初始化偏移量指定后,每次重启都会从该位置消费一轮,所以一般是调式解决问题时才使用@KafkaListener(
            topicPartitions ={@TopicPartition(topic ="my_topic1",partitionOffsets ={@PartitionOffset(partition ="0",initialOffset ="2")})}, groupId ="my_group1")publicvoidonMessage1(ConsumerRecord<String,String> record){System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}}

1.2、application.yml配置

server:port:8120# v1spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量enable-auto-commit:true# 消费者提交ack时多长时间批量提交一次auto-commit-interval:1000# 消费者第一次消费主题消息时从哪个位置开始auto-offset-reset: earliest  #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本

packagecom.atguigu.spring.kafka.consumer.config;importorg.apache.kafka.clients.admin.NewTopic;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.config.TopicBuilder;@ConfigurationpublicclassMyKafkaConfig{@BeanpublicNewTopicspringTestPartitionTopic(){returnTopicBuilder.name("my_topic1")//主题名称.partitions(3)//分区数量.replicas(3)//副本数量.build();}}

在这里插入图片描述
在这里插入图片描述

1.4、创建生产者发送消息

packagecom.atguigu.spring.kafka.consumer;importjakarta.annotation.Resource;importorg.junit.jupiter.api.Test;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.kafka.core.KafkaTemplate;@SpringBootTestclassSpringKafkaConsumerApplicationTests{@ResourceKafkaTemplate kafkaTemplate;@TestvoidcontextLoads(){for(int i =0; i <10; i++){
            kafkaTemplate.send("my_topic1",i%3,"","指定分区消费"+i);}}}

在这里插入图片描述
在这里插入图片描述

1.4.1、分区0中的数据

[[{"partition":0,"offset":0,"msg":"指定offset消费0","timespan":1717660785962,"date":"2024-06-06 07:59:45"},{"partition":0,"offset":1,"msg":"指定offset消费3","timespan":1717660785974,"date":"2024-06-06 07:59:45"},{"partition":0,"offset":2,"msg":"指定offset消费6","timespan":1717660785975,"date":"2024-06-06 07:59:45"},{"partition":0,"offset":3,"msg":"指定offset消费9","timespan":1717660785975,"date":"2024-06-06 07:59:45"}]]

1.5、创建SpringBoot启动类

packagecom.atguigu.spring.kafka.consumer;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn@SpringBootApplicationpublicclassSpringKafkaConsumerApplication{publicstaticvoidmain(String[] args){SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}

1.6、屏蔽 kafka debug 日志 logback.xml

<configuration><!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><loggername="org.apache.kafka.clients"level="debug"/></configuration>

1.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/><!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.8、消费者控制台:

.   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ |'_| | '_ \/ _` |\\\\\\/  ___)||_)|||||||(_||))))
  '  |____| .__|_||_|_||_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.0.5)

my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset =2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset =3,key = ,value = 指定offset消费9

**

此时如果重新启动 SpringKafkaConsumerApplication 消费者还是会消费数据,重复消费

**

.   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ |'_| | '_ \/ _` |\\\\\\/  ___)||_)|||||||(_||))))
  '  |____| .__|_||_|_||_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.0.5)

my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset =2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset =3,key = ,value = 指定offset消费9
标签: kafka spring boot linq

本文转载自: https://blog.csdn.net/m0_65152767/article/details/139502638
版权归原作者 小丁学Java 所有, 如有侵权,请联系我们删除。

“kafka-消费者-指定offset消费(SpringBoot整合Kafka)”的评论:

还没有评论