0


Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费

证书准备:springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换,因为kafka里面是jks 需要通过openssl进行转换。

证书处理:

  • KeyStore 用于存储客户端的证书和私钥,用于客户端身份验证。
  • TrustStore 用于存储受信任的根证书或证书链,用于验证服务器的身份。

合并一下证书:

cat your_cert.pem your_key.key > test.pem

  1. 合并证书和私钥为一个 PKCS12 文件:
  1. cat your_cert.pem your_key.key > combined.pem
  2. openssl pkcs12 -export -in combined.pem -out client.p12 -name your_alias

2,将 PKCS12 文件导入到 Java KeyStore 中:

  1. keytool -importkeystore -srckeystore client.p12 -srcstoretype PKCS12 -destkeystore client.jks -deststoretype JKS

要生成

  1. truststore.jks

文件,您需要导入服务器的根证书或者服务器的证书链。这样,您的客户端应用程序就可以验证与服务器建立的 SSL 连接。

下面是生成

  1. truststore.jks

的步骤:

  1. 获取服务器的根证书或证书链。您可以使用之前提到的 openssl s_client 命令来获取证书链。openssl s_client -connect 你的连接域名 -showcerts
  2. 将根证书或证书链保存为 .pem 文件。
  3. 使用 keytool 命令将根证书或证书链导入到 truststore.jks 文件中:keytool -importcert -file your_root_cert.pem -alias root_alias -keystore truststore.jks

项目集成:

maven集成:

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. <version>2.5.5.RELEASE</version>
  5. </dependency>

nacos配置:

  1. spring:
  2. kafka:
  3. bootstrap-servers: SSL://connectedca.com:443 ##换成你自己的连接
  4. ssl:
  5. protocol: TLS
  6. ###3这三个密码是你证书配置的时候设置的密码
  7. trust-store-password: a123456
  8. key-store-password: a123456
  9. key-password: a123456
  10. consumer:
  11. group-id:
  12. producer:
  13. topic: *.event ##换成你自己的topic

核心配置:

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.apache.kafka.clients.admin.AdminClientConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerConfig;
  4. import org.apache.kafka.common.serialization.StringDeserializer;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.core.io.Resource;
  9. import org.springframework.core.io.ResourceLoader;
  10. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  11. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  12. import org.springframework.kafka.core.KafkaAdmin;
  13. import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
  14. import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
  15. import org.springframework.kafka.support.serializer.JsonDeserializer;
  16. import java.util.HashMap;
  17. import java.util.Map;
  18. @Slf4j
  19. @Configuration
  20. public class KafkaConfiguration {
  21. @Autowired
  22. C3ConfigProperties c3ConfigProperties;
  23. @Autowired
  24. private KafkaConfig kafkaProperties;
  25. @Autowired
  26. private ResourceLoader resourceLoader;
  27. @Bean
  28. public KafkaAdmin kafkaAdmin() {
  29. Map <String, Object> configs = new HashMap <>();
  30. configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
  31. return new KafkaAdmin(configs);
  32. }
  33. @Bean
  34. public DefaultKafkaConsumerFactory <String, String> consumerFactory() {
  35. Map <String, Object> consumerConfig = new HashMap <>();
  36. consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
  37. consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "newbie-car-owner-data-sync");
  38. consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "newbie-car-owner-data-sync");
  39. consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  40. consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  41. // 设置值的反序列化器为 ErrorHandlingDeserializer2,并配置类型信息
  42. consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
  43. consumerConfig.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); // 启用类型信息头
  44. consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  45. consumerConfig.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "*.KafkaC3MsgListener"); // 设置默认类型信息
  46. consumerConfig.put(JsonDeserializer.TRUSTED_PACKAGES, "*.KafkaC3MsgListener"); // 替换为你的实际包名
  47. String pemUrl = "";
  48. String csrUrl = "";
  49. if (c3ConfigProperties.getEnvironment().equals("uat")) {
  50. pemUrl = "file/uat/kafka/client.jks";
  51. csrUrl = "file/uat/kafka/truststore.jks";
  52. } else if (c3ConfigProperties.getEnvironment().equals("pre")) {
  53. pemUrl = "file/pre/kafka/client.jks";
  54. csrUrl = "file/pre/kafka/truststore.jks";
  55. } else if (c3ConfigProperties.getEnvironment().equals("prod")) {
  56. pemUrl = "file/prod/kafka/client.jks";
  57. csrUrl = "file/prod/kafka/truststore.jks";
  58. }
  59. try {
  60. // 获取证书资源 容器部署一定要用这种方式读取文件,要不然会报错,或者使用挂载
  61. Resource pemResource = resourceLoader.getResource("classpath:"+pemUrl);
  62. Resource csrResource = resourceLoader.getResource("classpath:"+csrUrl);
  63. // 获取证书文件的路径
  64. String keyStorePath = pemResource.getFile().getAbsolutePath();
  65. String trustStorePath = csrResource.getFile().getAbsolutePath();
  66. consumerConfig.put("ssl.keystore.location", keyStorePath);
  67. consumerConfig.put("ssl.truststore.location", trustStorePath);
  68. }catch (Exception e){
  69. log.error("Resource file error:{}",e.getMessage());
  70. }
  71. consumerConfig.put("security.protocol", "SSL");
  72. consumerConfig.put("ssl.truststore.password", kafkaProperties.getTrustStorePassword());
  73. consumerConfig.put("ssl.keystore.password", kafkaProperties.getKeyStorePassword());
  74. consumerConfig.put("ssl.key.password", kafkaProperties.getKeyPassword());
  75. return new DefaultKafkaConsumerFactory <>(consumerConfig);
  76. }
  77. @Bean
  78. public ConcurrentKafkaListenerContainerFactory <String, String> kafkaListenerContainerFactory() {
  79. ConcurrentKafkaListenerContainerFactory <String, String> factory = new ConcurrentKafkaListenerContainerFactory <>();
  80. factory.setConsumerFactory(consumerFactory());
  81. factory.setConcurrency(3); // 设置并发消费者数量
  82. factory.setErrorHandler(new SeekToCurrentErrorHandler()); // 错误处理器
  83. return factory;
  84. }
  85. @Bean
  86. public KafkaC3MsgListener kafkaC3MsgListener() {
  87. return new KafkaC3MsgListener();
  88. }
  89. }

注入配置:

  1. import lombok.Data;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.context.annotation.Configuration;
  4. @Data
  5. @Configuration
  6. public class KafkaConfig {
  7. @Value("${spring.kafka.bootstrap-servers}")
  8. private String bootstrapServers;
  9. @Value("${spring.kafka.consumer.group-id}")
  10. private String groupId;
  11. @Value("${spring.kafka.producer.topic}")
  12. private String topic;
  13. @Value("${spring.kafka.ssl.trust-store-password}")
  14. private String trustStorePassword;
  15. @Value("${spring.kafka.ssl.key-store-password}")
  16. private String keyStorePassword;
  17. @Value("${spring.kafka.ssl.key-password}")
  18. private String keyPassword;
  19. }

能够看到这个配置就成功了表示:

然后在监听处理消息即可

————没有与生俱来的天赋,都是后天的努力拼搏(我是小杨,谢谢你的关注和支持)

标签: spring boot kafka ssl

本文转载自: https://blog.csdn.net/qq_39751120/article/details/136590616
版权归原作者 小杨互联网 所有, 如有侵权,请联系我们删除。

“Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费”的评论:

还没有评论