0


go kafka 配置SASL认证及实现SASL PLAIN认证功能

用户认证功能,是一个成熟组件不可或缺的功能。在0.9版本以前kafka是没有用户认证模块的(或者说只有SSL),好在kafka0.9版本以后逐渐发布了多种用户认证功能,弥补了这一缺陷(这里仅介绍SASL)。

本篇会介绍部署SASL/PLAIN认证功能的流程。最后再介绍对SASL/PLAIN功能进行二次开发。

kafka 2.x用户认证方式小结

需要先明确的一点是,用户认证和权限控制是两码事。用户认证是确认这个用户能否访问当前的系统,而权限控制是控制用户对当前系统中各种资源的访问权限。用户认证就是今天要讲的内容,而kafka的权限控制,则是对应

  1. bin/kafka-acls.sh

工具所提供的一系列功能,这里不详细展开。

标题特地说明kafka2.x是因为kafka2.0的时候推出一种新的用户认证方式,SASL/OAUTHBEARER,在此前的版本是不存在这个东西的。那么加上这个之后,kafka目前共有4种常见的认证方式。

  • SASL/GSSAPI(kerberos):kafka0.9版本推出,即借助kerberos实现用户认证,如果公司恰好有kerberos环境,那么用这个是比较合适的。
  • SASL/PLAIN:kafka0.10推出,非常简单,简单得有些鸡肋,不建议生产环境使用,除非对这个功能二次开发,这也是我后面要讲的。
  • SASL/SCRAM:kafka0.10推出,全名Salted Challenge Response Authentication Mechanism,为解决SASL/PLAIN的不足而生,缺点可能是某些客户端并不支持这种方式认证登陆(使用比较复杂)。
  • SASL/OAUTHBEARER:kafka2.0推出,实现较为复杂,目前业内应该较少实践。

其实除了上述四种用户认证功能之外,还有一个叫Delegation Token的东西。这个东西说一个轻量级的工具,是对现有SASL的一个补充,能够提高用户认证的性能(主要针对Kerberos的认证方式)。算是比较高级的用法,一般也用不到,所以也不会多介绍,有兴趣可以看这里Authentication using Delegation Tokens。

SASL/GSSAPI

如果已经有kerberos的环境,那么会比较适合使用这种方式,只需要让管理员分配好principal和对应的keytab,然后在配置中添加对应的选项就可以了。需要注意的是,一般采用这种方案的话,zookeeper也需要配置kerberos认证。

SASL/PLAIN

这种方式其实就是一个用户名/密码的认证方式,不过它有很多缺陷,比如用户名密码是存储在文件中,不能动态添加,明文等等!这些特性决定了它比较鸡肋,但好处是足够简单,这使得我们可以方便地对它进行二次开发。本篇文章后续会介绍SASL/PLAIN的部署方式和二次开发的例子(基于kafka2.x)。

SASL/SCRAM

针对PLAIN方式的不足而提供的另一种认证方式。这种方式的用户名/密码是存储中zookeeper的,因此能够支持动态添加用户。该种认证方式还会使用sha256或sha512对密码加密,安全性相对会高一些。

而且配置起来和SASL/PLAIN差不多同样简单,添加用户/密码的命令官网也有提供,个人比较推荐使用这种方式。不过有些客户端是不支持这个方式认证登陆的,比如python的kafka客户端,这点需要提前调研好。

具体的部署方法官网或网上有很多,这里不多介绍,贴下官网的Authentication using SASL/SCRAM。

SASL/OAUTHBEARER

SASL/OAUTHBEARER是基于OAUTH2.0的一个新的认证框架,这里先说下什么是OAUTH吧,引用维基百科。

OAuth是一个开放标准,允许用户让第三方应用访问该用户在某一网站上存储的私密的资源(如照片,视频,联系人列表),而无需将用户名和密码提供给第三方应用。而 OAUTH2.0算是OAUTH的一个加强版。

说白了,SASL/OAUTHBEARER就是一套让用户使用第三方认证工具认证的标准,通常是需要自己实现一些token认证和创建的接口,所以会比较繁琐。

详情可以通过这个kip了解KIP-255

说了这么多,接下来就说实战了,先介绍下如何配置SASL/PLAIN。

SASL/PLAIN实例(配置及客户端)

Kafka添加SASL_PLAIN安全认证

1,配置修改

2,添加kafka_server_jaas.conf文件 server端的认证文件,放置在/mnt/hdb/ops-ng/kafka/config/中。

KafkaServer {

  1. org.apache.kafka.common.security.plain.PlainLoginModule required
  2. username="admin"
  3. password="admin-secret"
  4. user_admin="admin-secret"
  5. user_alice="alice-secret";
  6. };

内容解释:配置文件命名为:kafka_server_jaas.conf,放置在/mnt/hdb/ops-ng/kafka/config/。

使用user_来定义多个用户,供客户端程序(生产者、消费者程序)认证使用,可以定义多个。

上例我定义了两个用户,一个是admin,一个是alice,等号后面是对应用户的密码(如user_admin定义了用户名为admin,密码为admin-secret的用户)。

官方说明:

大概意思是:username="admin"和password="admin-secret"是代理之间使用的用户名和密码,即多个kafka集群使用的用户名和密码,而user_userName则是连接端使用的用户名密码。

3,创建client认证文件kafka_client_jaas.conf,此文件是后面console的生产者和消费者使用,放置在/mnt/hdb/ops-ng/kafka/config/中。(可选,如果是程序是生产者或者消费者,可以不用配置)

KafkaClient {

org.apache.kafka.common.security.plain.PlainLoginModule required

  1. username="alice"
  2. password="alice-secret";

};

4,修改启动脚本

vi bin/kafka-server-start.sh

添加一行:export KAFKA_OPTS="-Djava.security.auth.login.config=/mnt/hdb/ops-ng/kafka/config/kafka_server_jaas.conf"

5,console 控制台生产、消费相关配置 (可选)

  • 修改/mnt/hdb/ops-ng/kafka/config/producer.properties,在配置最后加入以下两行内容:

security.protocol=SASL_PLAINTEXT

sasl.mechanism=PLAIN

  • 修改/mnt/hdb/ops-ng/kafka/config/consumer.properties,要添加的内容和producer的内容一样:

security.protocol=SASL_PLAINTEXT

sasl.mechanism=PLAIN

  • 添加kafka-console-producer.sh认证文件路径,后面启动生产者测试时使用:

[root@kafka1 ~]# cat /mnt/hdb/ops-ng/kafka/bin/kafka-console-producer.sh

export KAFKA_OPTS="-Djava.security.auth.login.config=/mnt/hdb/ops-ng/kafka/config/kafka_client_jaas.conf"

控制台生产者命令:

bin/kafka-console-producer.sh --broker-list 192.168.1.20:9092 --topic read

--producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN

  • 添加kafka-console-consumer.sh认证文件路径,后面启动消费者测试时使用:

[root@kafka1 ~]# cat /mnt/hdb/ops-ng/kafka/bin/kafka-console-consumer.sh

export KAFKA_OPTS="-Djava.security.auth.login.config=/mnt/hdb/ops-ng/kafka/config/kafka_client_jaas.conf"

控制台消费者命令:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.203:9092 --topic test-topicaaa --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN

一般能够发送数据就说明部署完成了~

代码实现SASL/PLAIN认证

  1. github.com/segmentio/kafka-go 消费数据 未指定组
  1. package kafka
  2. import (
  3. "context"
  4. "github.com/segmentio/kafka-go"
  5. "github.com/segmentio/kafka-go/sasl"
  6. "github.com/segmentio/kafka-go/sasl/plain"
  7. "github.com/segmentio/kafka-go/sasl/scram"
  8. ktesting "github.com/segmentio/kafka-go/testing"
  9. "github.com/wonderivan/logger"
  10. "testing"
  11. "time"
  12. )
  13. const (
  14. saslTestConnect = "172.19.1.103:9092" // connect to sasl listener
  15. saslTestTopic = "test-topic" // this topic is guaranteed to exist.
  16. username = "alice"
  17. password = "alice-secret"
  18. version = "0.10.2.0"
  19. )
  20. func Test_SASL_Read(t *testing.T) {
  21. //建立连接
  22. d := &kafka.Dialer{
  23. SASLMechanism: plain.Mechanism{
  24. Username: username,
  25. Password: password,
  26. },
  27. }
  28. //消费数据
  29. Readers := make([]*kafka.Reader, 0)
  30. for _, topic := range []string{saslTestTopic} {
  31. rc := kafka.ReaderConfig{
  32. Brokers: []string{saslTestConnect},
  33. Topic: topic,
  34. Partition: 0,
  35. MinBytes: 10e3, // 10KB
  36. MaxBytes: 10e6, // 10MB
  37. }
  38. rc.Dialer = d
  39. reader := kafka.NewReader(rc)
  40. // beginning设置 通过获取当前log值指定消费位置
  41. //if !consumer.beginning {
  42. // lag, _ := reader.ReadLag(context.Background())
  43. // reader.SetOffset(lag) //从当前部开始读取
  44. //}
  45. Readers = append(Readers, reader)
  46. }
  47. for {
  48. for _, reader := range Readers {
  49. m, err := reader.ReadMessage(context.Background())
  50. if err == nil {
  51. logger.Info(m.Value)
  52. } else {
  53. logger.Error("read kafka error:%v", err)
  54. //lag, _ := reader.ReadLag(context.Background())
  55. //logger.Error("read %d close:lag:%d", idx, lag)
  56. reader.Close()
  57. }
  58. }
  59. }
  60. }
  1. github.com/Shopify/sarama 实现组消费
  1. package kafka
  2. import (
  3. "fmt"
  4. "github.com/Shopify/sarama"
  5. cluster "github.com/bsm/sarama-cluster"
  6. "github.com/wonderivan/logger"
  7. "testing"
  8. "time"
  9. )
  10. func Test_SASL_Group(t *testing.T) {
  11. config := cluster.NewConfig()
  12. config.Group.Return.Notifications = true
  13. config.Net.ReadTimeout = 10 * time.Second //time.Millisecond
  14. config.Net.SASL.Enable = true
  15. config.Net.SASL.User = username
  16. config.Net.SASL.Password = password
  17. config.Net.SASL.Version = sarama.SASLHandshakeV1 //version//SASLHandshakeV1
  18. config.Consumer.Offsets.CommitInterval = 1 * time.Second
  19. config.Consumer.Offsets.Initial = sarama.OffsetNewest //初始从最新的offset开始
  20. /*// beginning设置 通过获取当前log值指定消费位置
  21. if consumer.beginning {
  22. config.Consumer.Offsets.Initial = sarama.OffsetOldest
  23. }*/
  24. c, err := cluster.NewConsumer([]string{saslTestConnect}, "filebeat01", []string{"testtopic"}, config)
  25. if err != nil {
  26. logger.Info("连接失败:\n %v", err)
  27. return
  28. } else {
  29. logger.Info("连接成功")
  30. }
  31. defer c.Close()
  32. //接受错误消息
  33. go func(c *cluster.Consumer) {
  34. errors := c.Errors()
  35. noti := c.Notifications()
  36. for {
  37. select {
  38. case <-errors:
  39. case <-noti:
  40. }
  41. }
  42. }(c)
  43. for m := range c.Messages() {
  44. fmt.Println("消费:", m.Value)
  45. c.MarkOffset(m, "") //MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset
  46. }
  47. }
  1. github.com/segmentio/kafka-go 生产数据
  1. package channel
  2. import (
  3. "context"
  4. "fs.com/ezlogic/utils"
  5. "github.com/segmentio/kafka-go/sasl/plain"
  6. "sync"
  7. "time"
  8. "github.com/segmentio/kafka-go"
  9. "github.com/wonderivan/logger"
  10. )
  11. type KafkaProducer struct {
  12. writer *kafka.Writer
  13. }
  14. type Producers struct {
  15. Write sync.Map //make(map[string]*channel.KafkaProducer)
  16. }
  17. func NewProducer(topic, username, password string, brokers []string) *KafkaProducer {
  18. defer utils.DeferFunc("NewProducer", nil)
  19. var (
  20. conn *kafka.Conn
  21. dialer *kafka.Dialer
  22. err error
  23. )
  24. if username == "" {
  25. conn, err = kafka.DialLeader(context.Background(), "tcp", brokers[0], topic, 0)
  26. } else {
  27. //sasl
  28. dialer = &kafka.Dialer{
  29. SASLMechanism: plain.Mechanism{
  30. Username: username,
  31. Password: password},
  32. }
  33. // 读topic,如果topic不存在,则创建topic,因此后续可以正常写
  34. conn, err = dialer.DialLeader(context.Background(), "tcp", brokers[0], topic, 0)
  35. }
  36. if err != nil {
  37. logger.Painc("NewProducer err:", topic, brokers, "====", err)
  38. }
  39. conn.ReadPartitions(topic)
  40. conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
  41. conn.Close()
  42. kp := &KafkaProducer{
  43. writer: kafka.NewWriter(kafka.WriterConfig{
  44. Brokers: brokers,
  45. Dialer: dialer,
  46. Topic: topic,
  47. Balancer: &kafka.LeastBytes{},
  48. Async: true,
  49. //0613 提高写性能
  50. BatchTimeout: 100 * time.Millisecond,
  51. BatchSize: 10000,
  52. }),
  53. }
  54. return kp
  55. }
  56. func (producer *KafkaProducer) Write(msg []byte) {
  57. if err := producer.writer.WriteMessages(context.Background(), kafka.Message{
  58. Value: msg,
  59. }); err != nil {
  60. logger.Error("producer write error:%v", err)
  61. }
  62. }
标签: kafka golang linux

本文转载自: https://blog.csdn.net/weixin_45222119/article/details/126506244
版权归原作者 weixin_45222119 所有, 如有侵权,请联系我们删除。

“go kafka 配置SASL认证及实现SASL PLAIN认证功能”的评论:

还没有评论