0


Springboot 集成kafka 以及连接 带有SASL/PLAIN 的kafka

前言

spring boot 集成kafka是比较简单的 直接引入spring-kafka的包 然后稍作配置即可

1. Spring Boot集成kafka

  • 添加 Kafka 依赖 在 pom.xml 文件中添加 Kafka 依赖:<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
  • 配置 Kafka 属性 在 application.properties 或 application.yml 文件中添加 Kafka 配置属性:# Kafka 服务器地址spring.kafka.bootstrap-servers=localhost:9092# 消费者配置spring.kafka.consumer.group-id=my-groupspring.kafka.consumer.auto-offset-reset=earliestspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer# 生产者配置spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  • 创建测试消费者importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;@ComponentpublicclassKafkaConsumer{@KafkaListener(topics ="my-topic", groupId ="my-group")publicvoidconsume(String message){System.out.println("Received message: "+ message);}}这里创建了一个 KafkaConsumer 类,使用 @KafkaListener 注解来监听名为 “my-topic” 的主题。当收到消息时,consume 方法会被调用,并打印出收到的消息。
  • 创建 Kafka 生产者importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.stereotype.Component;@ComponentpublicclassKafkaProducer{@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;publicvoidsend(String topic,String message){ kafkaTemplate.send(topic, message);}}这里创建了一个 KafkaProducer 类,通过注入 KafkaTemplate 来发送消息。send 方法接受主题和消息作为参数,并将消息发送到指定的主题。
  • 测试代码importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestpublicclassKafkaTests{@AutowiredprivateKafkaProducer producer;@TestpublicvoidtestProduceAndConsume()throwsInterruptedException{String topic ="my-topic";String message ="Hello, Kafka!"; producer.send(topic, message);// 等待消息被消费Thread.sleep(5000);}}

官方地址: https://docs.spring.io/spring-kafka/docs/2.8.11/reference/html/#getting-started

2. kafka的鉴权协议

  • PLAINTEXT这是 Kafka 默认的协议,不提供任何认证机制。所有客户端都可以不经认证就连接到 Kafka 集群。 在生产环境中,强烈不建议使用这种协议,因为它不提供任何安全保障。
  • SSLSSL 提供了一种加密通信的方式,可以保护客户端和服务器之间的数据传输。 SSL 需要在 Kafka 代理(Broker)和客户端之间建立信任关系,通常是通过 SSL 证书来实现的。 SSL 可以单独使用,也可以与其他认证协议(如 SASL)结合使用,提供更高级别的安全性。
  • SASL/PLAINSASL/PLAIN 是一种基于用户名和密码的简单认证机制。 客户端需要提供有效的用户名和密码才能连接到 Kafka 集群。 SASL/PLAIN 通常与 SSL 协议结合使用,以防止明文传输用户名和密码。
  • SASL/SCRAMSASL/SCRAM 是一种更安全的基于挑战-响应的认证机制。 SASL/SCRAM 支持两种变体:SCRAM-SHA-256 和 SCRAM-SHA-512。 SCRAM 使用 HMAC 和随机数来对密码进行哈希计算,而不会在网络上传输明文密码。 SCRAM 通常也与 SSL 结合使用,以提供更高的安全性。
  • SASL/GSSAPI (Kerberos) SASL/GSSAPI 是基于 Kerberos 的认证协议。 Kerberos 是一种广泛使用的网络认证协议,提供了集中式的认证和授权服务。 Kerberos 使用票据(Ticket)系统来验证客户端身份,而不是直接传输密码。 Kerberos 认证通常被认为是企业级环境中最安全的认证方式之一。
  • SASL/OAUTHBEARER SASL/OAUTHBEARER 是一种使用 OAuth 2.0 访问令牌(Access Token)进行认证的协议。 OAuth 2.0 是一种广泛使用的授权框架,常用于授权第三方应用访问用户数据。 SASL/OAUTHBEARER 需要客户端获取有效的 OAuth 2.0 访问令牌,并在连接 Kafka 时提供该令牌进行认证。

3. 连接 开放 SASL/PLAIN的kafka

kafka的server端开放了 SASL_PLAINTEXT 认证 同时使用了 SCRAM-SHA-256

yaml配置

spring:application:name: xxxx
  kafka:bootstrap-servers: 192.168.xxxx:9092,192.168.xxx:9092,192.168.xxx:9092security:#指定安全协议protocol: SASL_PLAINTEXT
    properties:# SCRAM 加密方法 256还是512sasl.mechanism: SCRAM-SHA-256# username  password 为在server端新建的用户 ## 注意 最后边有个;  sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx";

    producer:value-serializer: org.apache.kafka.common.serialization.StringSerializer

配置好后 在使用 测试类进行验证
会出现如下log 就是配置成功了

2024-03-2115:59:20.306INFO[recommend,,]84510---[           main]o.a.k.c.t.i.KafkaMetricsCollector: initializing Kafka metrics collector
2024-03-2115:59:20.338INFO[recommend,,]84510---[           main]o.a.k.clients.producer.KafkaProducer:[Producer clientId=producer-1]Instantiated an idempotent producer.2024-03-2115:59:20.372INFO[recommend,,]84510---[           main]o.a.k.c.s.authenticator.AbstractLogin:Successfully logged in.2024-03-2115:59:20.400INFO[recommend,,]84510---[           main]o.a.kafka.common.utils.AppInfoParser:Kafka version:3.7.02024-03-2115:59:20.400INFO[recommend,,]84510---[           main]o.a.kafka.common.utils.AppInfoParser:Kafka commitId: xxxxx524ed625438c5
2024-03-2115:59:20.401INFO[recommend,,]84510---[           main]o.a.kafka.common.utils.AppInfoParser:Kafka startTimeMs:1711007960400

the end !!!

good day !!!

标签: spring boot kafka sasl

本文转载自: https://blog.csdn.net/a15835774652/article/details/136912701
版权归原作者 寂夜了无痕 所有, 如有侵权,请联系我们删除。

“Springboot 集成kafka 以及连接 带有SASL/PLAIN 的kafka”的评论:

还没有评论