0


大数据Hadoop之——Kafka安全机制(Kafka SSL认证实现)

文章目录

一、概述

Kafka0.9.0开始引入丰富的安全认证机制,实现基础安全用户认证,将kafka上云或进行多租户管理的必要步骤安全,目前kafka支持

SASL

SSL

Delegation Token

这三种认证机制。而

SASL

认证又分为了以下几种方式:

1)SASL认证概述

  • SASL/GSSAPI:kerberos认证方式,一般使用随机密码的keytab认证方式,密码是加密的,也是企业里使用最多的认证方式,在0.9版本引入;
  • SASL/PLAIN:这种方式其实就是一个账号/密码的认证方式,不过它有很多缺陷,比如用户名密码是存储在文件中,不能动态添加,密码明文等等!这些特性决定了它比较鸡肋,但好处是足够简单,这使得我们可以方便地对它进行二次开发,在0.10版本引入;
  • SASL/SCRAM针对SASL/PLAIN方式的不足而提供的另一种认证方式。这种方式的用户名/密码是存储中zookeeper的,因此能够支持动态添加用户。该种认证方式还会使用sha256或sha512对密码加密,安全性相对会高一些,在0.10.2版本引入;
  • SASL/OAUTHBEARER:是基于OAuth 2.0的认证框架,实现较为复杂,目前业内应该较少使用,在2.0版本引入。

kafka和zookeeper的

SASL/GSSAPI

SASL/PLAIN

认证实现,可以参考我以下几篇文章:
Kerberos认证原理与环境部署
大数据Hadoop之——Kafka鉴权认证(Kafka kerberos认证+kafka账号密码认证+CDH Kerberos认证)
大数据Hadoop之——Zookeeper鉴权认证(Kerberos认证+账号密码认证)

2)Delegation Token认证概述

Delegation Token

:基于Delegation Token的认证是一种轻量级的认证机制,使用

Delegation Token

Broker和客户端在作认证的时候,能够直接使用这个token,不是每次都去KDC获取对应的ticket或传输Keystore文件,是对SASL认证机制的补充, 1.1.0引入。企业里主要用的多的还是

SASL/GSSAPI

SASL/PLAIN

这两种,所以这里不讲

Delegation Token

认证怎么配置,感兴趣的小伙伴可以去试试,对应管理脚本

kafka-delegation-tokens.sh

3)SSL认证概述(本章实现)

0.9版本以前kafka是没有用户认证模块的,从0.9开始引入安全认证模块,基于SSL的认证主要是指Broker和客户端的双路认证,Broker和客户端互相认对方的证书,其实就是基于证书认证的。

二、各种安全认证机制对比和使用场景

认证机制引入版本使用场景SSL0.9适用于一般测试场景SASL/GSSAPI0.9适用于已经实现Kerberos认证的场景,【常用】SASL/PLAIN0.10.2使用于中小型公司的kafka集群SASL/SCRAM0.10.2使用于中小型公司的kafka集群,支持用户的动态增减SASL/OAUTHBEARER2.0适用于支持OAuth 2.0认证框架场景Delegation Token1.1适用于Kerberos认证中出现TGT分发性能瓶颈的场景

三、Kafka SSL认证实现

Kafka允许客户端使用SSL来连接,默认情况下,SSL是禁止的,但是可以通过手动开启。安装Kafka SSL的流程如下所示:
在这里插入图片描述

1)创建ssl证书

1、创建存储目录

$ mkdir$KAFKA_HOME/config/certificates

2、创建KeyStore密钥库

【温馨提示】Keytool是一个Java数据证书的管理工具 ,Keytool将密钥(key)和证书(certificates)存在一个称为keystore的文件中,Keystore可以简单理解为一个存放应用签名的文件。我们可以称它为秘钥库。在keystore里,包含两种数据:

  • **密钥实体(Key entity)**——密钥(secret key)又或者是私钥和配对公钥(采用非对称加密)
  • **可信任的证书实体(trusted certificate entries)**——只包含公钥

keytool 工具参数详解:

-genkey 在用户主目录中创建一个默认文件”.keystore”,还会产生一个mykey的别名,mykey中包含用户的公钥、私钥和证书(在没有指定生成位置的情况下,keystore会存在用户系统默认目录)
-alias 产生别名 每个keystore都关联这一个独一无二的alias,这个alias通常不区分大小写
-keystore 指定密钥库的名称(产生的各类信息将不在.keystore文件中)
-keyalg 指定密钥的算法 (如 RSA DSA,默认值为:DSA)
-validity 指定创建的证书有效期多少天(默认 90)
-keysize 指定密钥长度 (默认 1024)
-storepass 指定密钥库的密码(获取keystore信息所需的密码)
-keypass 指定别名条目的密码(私钥的密码)
-dname 指定证书发行者信息 其中: “CN=名字与姓氏,OU=组织单位名称,O=组织名称,L=城市或区域名 称,ST=州或省份名称,C=单位的两字母国家代码”
-list 显示密钥库中的证书信息 keytool -list -v -keystore 指定keystore -storepass 密码
-v 显示密钥库中的证书详细信息
-export 将别名指定的证书导出到文件 keytool -export -alias 需要导出的别名 -keystore 指定keystore -file 指定导出的证书位置及证书名称 -storepass 密码
-file 参数指定导出到文件的文件名
-delete 删除密钥库中某条目 keytool -delete -alias 指定需删除的别 -keystore 指定keystore – storepass 密码
-printcert 查看导出的证书信息 keytool -printcert -file g:ssomichael.crt
-keypasswd 修改密钥库中指定条目口令 keytool -keypasswd -alias 需修改的别名 -keypass 旧密码 -new 新密码 -storepass keystore密码 -keystore sage
-storepasswd 修改keystore口令 keytool -storepasswd -keystore g:ssomichael.keystore(需修改口令的keystore) -storepass pwdold(原始密码) -new pwdnew(新密码)
-import 将已签名数字证书导入密钥库 keytool -import -alias 指定导入条目的别名 -keystore 指定keystore -file 需导入的证书

dname选项详解:

[CN]
What is your first and last name? 这里输入域名 如 www.ssltest.com   或 子域名 mail.ssltest.com  或通配符域名 *.ssltest.com

[OU]
What is the name of your organizational unit?  – 购买或管理证书的部门,可以直接回车跳过 或输入 NA

[O]
What is the name of your organization?  – 输入公司名称, 如果没有公司名称,可以输入 NA

[L]
What is the name of your City or Locality?  输入所在的城市

[ST]
What is the name of your State or Province?  输入所在的省市,或自治区

[C]
What is the two-letter country code for this unit? (Country) – 输入国家代码,中国填写CN

配置/etc/hosts

192.168.0.113 hadoop-node1.ssltest.com
192.168.0.114 hadoop-node2.ssltest.com
192.168.0.115 hadoop-node3.ssltest.com

开始创建

$ keytool -keystore $KAFKA_HOME/config/certificates/kafka.keystore -alias kafka -validity 365 -genkey -keyalg RSA -storepass 123456 -keypass 123456 -dname "CN=*.ssltest.com, OU=kafka, O=kafka, L=shenzhen, ST=guangdong, C=CN"# 查看KeyStore内容
$  keytool -list -v -keystore $KAFKA_HOME/config/certificates/kafka.keystore -storepass 123456

3、创建CA(Certificate Authority:认证机构)

CA是负责签发证书、认证证书、管理已颁发证书的机构,为了保证整个证书的安全性,所以需要使用CA进行证书的签名保证。

$ openssl req -new -x509 -keyout $KAFKA_HOME/config/certificates/ca-key -out $KAFKA_HOME/config/certificates/ca-cert -days 365 -passin pass:123456 -passout pass:123456 -subj "/C=CN/ST=guangdong/L=shenzhen/O=kafka/CN=*.ssltest.com"

4、将CA导入到TrustStore中

keystore 与 truststore 区别

  • keystore是存储密钥(公钥、私钥)的容器。
  • keystoretruststore其本质都是keystore。只不过二者存放的密钥所有者不同而已。本质都是相同的文件,只不过约定通过文件名称区分类型以及用途
  • 对于keystore一般存储自己的私钥和公钥,而truststore则用来存储自己信任的对象的公钥。
$ keytool -keystore "$KAFKA_HOME/config/certificates/kafka.truststore" -alias CARoot -import -file "$KAFKA_HOME/config/certificates/ca-cert" -storepass "123456" -keypass "123456" -noprompt

# 不加-noprompt,会提示确认。

5、导出证书

$ keytool -keystore $KAFKA_HOME/config/certificates/kafka.keystore -alias kafka -certreq -file $KAFKA_HOME/config/certificates/kafka-cert -storepass "123456" -keypass "123456"

6、给证书签名

$ openssl x509 -req -CA $KAFKA_HOME/config/certificates/ca-cert -CAkey $KAFKA_HOME/config/certificates/ca-key -in $KAFKA_HOME/config/certificates/kafka-cert -out $KAFKA_HOME/config/certificates/kafka-cert-signed -days 365 -CAcreateserial -passin pass:"123456"

7、导入CA到KeyStore

$ keytool -keystore $KAFKA_HOME/config/certificates/kafka.keystore -alias CARoot -import -file $KAFKA_HOME/config/certificates/ca-cert -storepass "123456" -keypass "12345" -noprompt

8、导入证书到KeyStore

$ keytool -keystore $KAFKA_HOME/config/certificates/kafka.keystore -alias kafka -import -file $KAFKA_HOME/config/certificates/kafka-cert-signed -storepass "123456" -keypass "123456"

最后给一个完整版脚本生成所有的证书文件

$ vi$KAFKA_HOME/bin/create-ssl-certificates.sh
#! /bin/bashKAFKA_HOME=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1
CERT_OUTPUT_PATH="$KAFKA_HOME/config/certificates"PASSWORD=123456KEY_STORE="$CERT_OUTPUT_PATH/kafka.keystore"TRUST_STORE="$CERT_OUTPUT_PATH/kafka.truststore"KEY_PASSWORD=$PASSWORDSTORE_PASSWORD=$PASSWORDTRUST_STORE_PASSWORD=$PASSWORDTRUST_KEY_PASS=$PASSWORDCLUSTER_NAME=kafka
CERT_AUTH_FILE="$CERT_OUTPUT_PATH/ca-cert"CLUSTER_CERT_FILE="$CERT_OUTPUT_PATH/${CLUSTER_NAME}-cert"DAYS_VALID=365D_NAME="CN=*.ssltest.com, OU=kafka, O=kafka, L=shenzhen, ST=guangdong, C=CN"# 【第一步】创建存储目录mkdir -p $CERT_OUTPUT_PATH# 【第二步】创建KeyStore密钥库
keytool -keystore $KEY_STORE -alias $CLUSTER_NAME -validity $DAYS_VALID -genkey -keyalg RSA -storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -dname "$D_NAME"# 【第三步】创建CA(Certificate Authority:认证机构)
openssl req -new -x509 -keyout $CERT_OUTPUT_PATH/ca-key -out $CERT_AUTH_FILE -days $DAYS_VALID -passin pass:123456 -passout pass:123456 -subj "/C=CN/ST=guangdong/L=shenzhen/O=kafka/CN=*.ssltest.com"# 【第四步】将CA导入到TrustStore中
keytool -keystore "$TRUST_STORE" -alias CARoot -import -file "$CERT_AUTH_FILE" -storepass "$TRUST_STORE_PASSWORD" -keypass "$TRUST_KEY_PASS" -noprompt

#【第五步】导出证书
keytool -keystore $CERT_OUTPUT_PATH/kafka.keystore -alias $CLUSTER_NAME -certreq -file $CERT_OUTPUT_PATH/kafka-cert -storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD"#【第六步】给证书签名
openssl x509 -req -CA $CERT_AUTH_FILE -CAkey $CERT_OUTPUT_PATH/ca-key -in $CLUSTER_CERT_FILE -out ${CLUSTER_CERT_FILE}-signed -days $DAYS_VALID -CAcreateserial -passin pass:"$PASSWORD"#【第七步】导入CA到KeyStore
keytool -keystore $KEY_STORE -alias CARoot -import -file $CERT_AUTH_FILE -storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt

#【第八步】导入证书到KeyStore
keytool -keystore $KEY_STORE -alias kafka -import -file "${CLUSTER_CERT_FILE}-signed" -storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD"

2)配置kafka

1、配置server.properties

# copy一份配置文件
$ cp$KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-ssl.properties

配置如下:

listeners=SSL://0.0.0.0:19092
advertised.listeners=SSL://haoop-node1:19092
ssl.keystore.location=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/config/certificates/kafka.keystore
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/config/certificates/kafka.truststore
ssl.truststore.password=123456# ssl.client.auth = none (“required”=>客户端身份验证是必需的,“requested”=>客户端身份验证请求,客户端没有证书仍然可以连接。使用“requested”是纸老虎,因为它提供了一种虚假的安全感,错误的配置客户端仍将连接成功。
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
# 这里关闭主机名的https验证,如果是域名且支持https可以开启# ssl.endpoint.identification.algorithm=HTTPS# 默认是PLAINTEXT
security.inter.broker.protocol=SSL

2、配置jaas

这里zookeeper选择Kerberos认证,如果不选择安全认证,可以忽略。

$ cat>$KAFKA_HOME/config/certificates/zkcli-jaas.conf<<EOF
// Zookeeper client authentication
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/config/zk-kerberos/zkcli.keytab"
    principal="[email protected]";
};
EOF

3、配置启动脚本kafka-server-start.sh

$ cd$KAFKA_HOME
$ cp bin/kafka-server-start.sh bin/kafka-server-start-zkcli-kerberos-ssl.sh

$ vi bin/kafka-server-start-zkcli-kerberos-ssl.sh
# 添加如下配置:exportKAFKA_OPTS="-Dzookeeper.sasl.client=true -Dzookeeper.sasl.client.username=zookeeper -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/config/certificates/zkcli-jaas.conf"

4、将配置和证书copy到其它节点

$ scp$KAFKA_HOME/config/server-ssl.properties hadoop-node2:$KAFKA_HOME/config/
$ scp$KAFKA_HOME/config/server-ssl.properties hadoop-node3:$KAFKA_HOME/config/

$ scp -r $KAFKA_HOME/config/certificates hadoop-node2:$KAFKA_HOME/config/
$ scp -r $KAFKA_HOME/config/certificates hadoop-node3:$KAFKA_HOME/config/

$ scp -r $KAFKA_HOME/bin/kafka-server-start-zkcli-kerberos-ssl.sh hadoop-node2:$KAFKA_HOME/bin/
$ scp -r $KAFKA_HOME/bin/kafka-server-start-zkcli-kerberos-ssl.sh hadoop-node3:$KAFKA_HOME/bin/

修改broker.id和advertised.listeners

$ vi$KAFKA_HOME/config/server-ssl.properties

5、启动服务

# 先启动zookeeper服务,这里选择Kerberos认证
$ cd$KAFKA_HOME
$ ./bin/zookeeper-server-start-kerberos.sh -daemon ./config/zookeeper-kerberos.properties
$ ./bin/zookeeper-shell-kerberos.sh hadoop-node1:12181

# 启动kafka
$ ./bin/kafka-server-start-zkcli-kerberos-ssl.sh -daemon ./config/server-ssl.properties

6、启动kafka客户端测试验证

配置客户端

$ cd$KAFKA_HOME# 这里启用了客户端验证,所以选择以下这段配置
$ cat> config/client-ssl.properties<<EOF
security.protocol=SSL
ssl.truststore.location=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/config/certificates/kafka.truststore
ssl.truststore.password=123456
ssl.keystore.password=123456
ssl.keystore.location=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/config/certificates/kafka.keystore
EOF

开始测试

$ cd$KAFKA_HOME# 【温馨提示】证书是跟域名绑定的,所以这里的主机或域名一定要跟上面绑定的域名一致,一般多台机器用通配符配置,上面有解释# 查看topic 列表
$ ./bin/kafka-topics.sh --list --bootstrap-server hadoop-node1.ssltest.com:19092 --command-config ./config/client-ssl.properties

在这里插入图片描述

  • 创建topic
$ cd$KAFKA_HOME
$ ./bin/kafka-topics.sh --create --topic test-ssl --bootstrap-server hadoop-node1.ssltest.com:19092  --partitions 3 --replication-factor 3 --config retention.ms=259200000 --command-config ./config/client-ssl.properties
  • 生产消费
$ cd$KAFKA_HOME# 生产
$ ./bin/kafka-console-producer.sh --bootstrap-server hadoop-node1.ssltest.com:19092 --topic test-ssl --producer.config ./config/client-ssl.properties

{"id":"1","name":"n1","age":"20"}{"id":"2","name":"n2","age":"21"}{"id":"3","name":"n3","age":"22"}# 消费
$ ./bin/kafka-console-consumer.sh --bootstrap-server hadoop-node1.ssltest.com:19092 --topic test-ssl --from-beginning --consumer.config ./config/client-ssl.properties

#消费组
$ ./bin/kafka-console-consumer.sh --bootstrap-server hadoop-node1.ssltest.com:19092 --topic test-ssl --from-beginning --group test-ssl --consumer.config ./config/client-ssl.properties

# 查看消费组积压
$ ./bin/kafka-consumer-groups.sh --bootstrap-server hadoop-node1.ssltest.com:19092 --describe --group test-ssl --command-config ./config/client-ssl.properties

在这里插入图片描述
Kafka安全机制介绍和Kafka SSL认证实现就到这里了,有疑问的小伙伴欢迎给我留言哦,后续会持续更新更优质的大数据相关的文章,请小伙伴耐心等待~

标签: kafka ssl zookeeper

本文转载自: https://blog.csdn.net/qq_35745940/article/details/125232809
版权归原作者 大数据老司机 所有, 如有侵权,请联系我们删除。

“大数据Hadoop之——Kafka安全机制(Kafka SSL认证实现)”的评论:

还没有评论