

Java: consuming Kafka data with an assign subscription, using the SASL_SSL protocol with SCRAM-SHA-256 authentication

  1. Unless you have a specific requirement, or the Kafka producer side truly has no consumer group, don't subscribe with assign; subscribe to the topic with subscribe instead. I got burned by the producer team here: at first they told me there was no consumer group, so I had no choice but to use assign to subscribe to specific partitions, and only later did they tell me a consumer group did exist. (For contrast, see the subscribe sketch right below.)
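
For contrast, here is a minimal subscribe-based sketch. This is my illustration, not the original author's code: it assumes the same brokers, deserializers, group id, and SASL_SSL settings as the full assign listing below, and with subscribe the group coordinator assigns partitions and rebalances them automatically.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class KafkaConsumerSubscribe {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "xx.xx.xxx.xx:8093"); // same brokers as the listing below
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("group.id", "group_id"); // subscribe requires a group id
            // ...plus the same security.protocol / sasl.mechanism / sasl.jaas.config
            // / ssl.truststore.* entries shown in the full listing below
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            // Subscribe by topic name; partition assignment is handled by the group coordinator
            consumer.subscribe(Collections.singletonList("topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                }
            }
        }
    }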
The full assign-based listing:

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;

    import java.time.Duration;
    import java.util.*;

    @Component
    public class KafkaConsumerAssign implements CommandLineRunner {

        @Value("${ss.pubTopic}")
        private String pubTopic = "topic";

        @Value("${ss.kafkaAddress}")
        private String kafkaAddress = "xx.xx.xxx.xx:8093,xx.xxx.xxx.xx:8093,xx.xxx.xxx.xx:8093";

        public void autoCommit() {
            Properties properties = new Properties();
            // Key and value deserializers
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("enable.auto.commit", false); // commit offsets manually
            properties.put("bootstrap.servers", kafkaAddress); // Kafka broker addresses
            // Consumer group: if no group exists you can put any value here, but if a
            // group does exist and this is omitted, committing offsets later will fail
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
            // properties.put("max.poll.records", 50); // max records per poll
            // properties.put("session.timeout.ms", "50000"); // consumer session timeout
            properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='your-username' password='your-password';");
            properties.put("security.protocol", "SASL_SSL"); // security protocol
            properties.put("sasl.mechanism", "SCRAM-SHA-256"); // SASL mechanism
            // Truststore file
            properties.put("ssl.truststore.location", "D:/xxx/xx/xxx/xxxxxx.jks");
            // Truststore password
            properties.put("ssl.truststore.password", "aaaaaa");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
            // Use partitionsFor to list every partition of the topic
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(pubTopic);
            for (PartitionInfo partitionInfo : partitionInfos) {
                topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }
            // Subscribe via assign
            consumer.assign(topicPartitions);
            operationKafkaMessage(consumer);
        }

        // Start this Kafka client automatically when the application starts
        @Override
        public void run(String... args) {
            // Call the method on this Spring-managed bean; creating a new instance
            // with `new` would bypass the @Value injection above
            autoCommit();
        }

        private void operationKafkaMessage(KafkaConsumer<String, String> consumer) {
            while (true) {
                // Poll the broker, waiting up to 100 ms for data
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                // Iterate over the partitions that returned data
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    // Process every record in this partition
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println(record.offset() + ": " + record.value());
                    }
                    // Offset of the last record processed in this partition
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    // After processing a partition's records, commit its offset
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        }

        public static void main(String[] args) {
            // new KafkaConsumerAssign().autoCommit();
        }
    }
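
The two @Value placeholders above expect matching entries in the Spring Boot configuration. A minimal application.properties sketch, assuming a standard Spring Boot setup (the ss.* key names come from the listing; the values are placeholders):

    ss.pubTopic=topic
    ss.kafkaAddress=xx.xx.xxx.xx:8093,xx.xxx.xxx.xx:8093,xx.xxx.xxx.xx:8093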
Tags: java, kafka, programming language

This article is reposted from: https://blog.csdn.net/malz_zh/article/details/131100844
Copyright belongs to the original author, 1988我想和这个世界谈谈. If there is any infringement, please contact us for removal.
