如果不是有要求或者kafka生产者没有消费者群组,就不要用assign方式订阅,还是用subscribe订阅主题,我是被生产者坑了,开始给我说没有消费者群组,所有我只能用assign订阅指定分区,后来才给我说有消费者群组。
import com.alibaba.fastjson2.JSON;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.*;
@Component
public class KafkaConsumerAssign implements
CommandLineRunner {
@Value("${ss.pubTopic}")
private String pubTopic = "topic";
@Value("${ss.kafkaAddress}")
private String kafkaAddress = "xx.xx.xxx.xx:8093,xx.xxx.xxx.xx:8093,xx.xxx.xxx.xx:8093";
public void autoCommit() {
ConsumerDict consumerDict = new ConsumerDict();
Properties properties = new Properties();
// 指定key与value的反序列化器
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("enable.auto.commit", false);//手动提交提交
properties.put("bootstrap.servers", kafkaAddress);//kafka连接地址
//消费者群组,如果没有群组的话可以写通,若果有消费者组不写会,后面提交偏移量的时候会报错
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");//消费者组
// properties.put("max.poll.records",50);//单次最大记录数
// properties.put("session.timeout.ms","50000");//消费者连接的超时时间
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='用户名' password='密码';");
properties.put("security.protocol", "SASL_SSL");//安全协议
properties.put("sasl.mechanism", "SCRAM-SHA-256");//加密方式
//指定truststore文件
properties.put("ssl.truststore.location", "D:/xxx/xx/xxx/xxxxxx.jks");
//truststore文件密码
properties.put("ssl.truststore.password", "aaaaaa");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
//使用partitionsFor获取该topic下所有的分区
List<PartitionInfo> partitionInfos = consumer.partitionsFor(pubTopic);
for (PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
//使用assign方式订阅kafka
consumer.assign(topicPartitions);
operationKafkaMessage(consumer);
}
//启动程序后自动启动此kafka客户端
@Override
public void run(String... args) {
new KafkaConsumerAssign().autoCommit();
}
private void operationKafkaMessage(KafkaConsumer<String, String> consumer) {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);//100ms 自动获取一次数据,消费者主动发起请求
//循环所有的分区
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
//获取每个分区中的所有数据
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
//当前的消费到的位置
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
//处理完每个分区中的消息后,提交偏移量。
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
}
public static void main(String[] args) {
//new KafkaConsumerAssign().autoCommit();
}
}
本文转载自: https://blog.csdn.net/malz_zh/article/details/131100844
版权归原作者 1988我想和这个世界谈谈 所有, 如有侵权,请联系我们删除。
版权归原作者 1988我想和这个世界谈谈 所有, 如有侵权,请联系我们删除。