0


kafka安全机制(SASL_SCRAM)

在 Kafka 中,SASL 是一种重要的安全协议,用于提供基于身份验证的访问控制。Kafka 使用 SASL 来支持各种身份验证机制,如:

  • PLAIN(基于用户名和密码的认证)
  • GSSAPI(基于 Kerberos 的认证)
  • SCRAM(Salted Challenge Response Authentication Mechanism)
  • OAUTHBEARER

具体信息可以参考官网:kafka安全机制官网-2.7
这里采用SCRAM用于kafka安全机制的实现,而不是采用其他方式实现,主要有如下原因:

  1. SASL/GSSAPI (Kerberos) - starting at version 0.9.0.0。主要是为 Kerberos 使用,如果当前已有 Kerberos 认证,只需要为集群中每个 Broker 和访问用户申请 Principle ,然后在 Kafka 配置文件中开启 Kerberos 的支持即可,一般用于大型公司。
  2. SASL/PLAIN - starting at version 0.10.0.0。一个简单的用户名和密码身份认证机制,通常与 TLS/SSL 一起用于加密,以实现身份验证。是一种比较容易使用的方式,但是也有一个很明显的缺点,这种方式会把用户账户文件配置到静态文件中,每次想要添加新的账户都需要重启 Kafka 去加载静态文件,才能生效,十分不方便
  3. SASL/SCRAM-SHA-256 和 SASL/SCRAM-SHA-512 - starting at version 0.10.2.0。通过将认证信息保存在 ZooKeeper 里面,从而动态的获取用户信息,相当于把 ZK 用作一个认证中心使用。这种认证可以在使用过程中,使用 Kafka 提供的命令动态地创建和删除用户,无需重启整个集群,十分方便。
  4. SASL/OAUTHBEARER - starting at version 2.0。 Kafka 引入的新认证机制,主要是为了实现与 OAuth2 框架的集成,Kafka 不提倡单纯使用 OAUTHBEARER,因为它生成的不安全 Json Web Token,必须配以 SSL 加密才能在生产环境中使用。

一、环境搭建

1.1 环境准备

  • jdk1.8
  • apache-zookeeper-3.5.9-bin
  • kafka_2.12-2.7.1

安装顺序:jdk–>zookeeper–>kafka

小知识:kafka版本命名约定
kafka: 这部分指的是 Apache Kafka,一个开源的分布式事件流平台。Kafka 提供了一种可靠的、可扩展的发布-订阅消息系统,可以处理大规模的实时数据流。
2.12: 这表示 Scala 的版本。在 Kafka 的情况下,2.12意味着它是使用 Scala 2.12 编译的。Scala 是一种运行在 Java 虚拟机上的多范式编程语言,被用于 Kafka 的实现。
2.7.1: 这是 Kafka 的版本号。在这个例子中,版本号是 2.7.1。版本号通常表示软件的发布版本,新版本通常包含新功能、改进和修复之前版本的 bug。

  • 安装文件解压之后,需要将kafka的libs目录下的如下jar赋值到zookeeper的lib目录下:
cp ${KAFKA_HOME}/libs/kafka-clients-2.7.1.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/lz4-java-1.7.1.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/snappy-java-1.1.7.7.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/slf4j-api-1.7.30.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/slf4j-log4j12-1.7.30.jar ${ZOOKEEPER_HOME}/lib
  • 每个kafka对应的zookeeper版本不一样,建议先下载想要的kafka版本,解压之后,查看libs下依赖的zookeeper版本,然后去官网下载对应的版本进行安装!!!

1.2 JDK的安装与配置

Kafka从2.0.0版本开始就不再支持JDK7及以下版本

  1. 到官网下载jdk安装包,并上传至Linux的/opt目录下
  2. 解压安装包
  3. 配置jdk环境变量,修改/etc/profile文件并向其中添加如下配置
exportJAVA_HOME=/opt/jdk解压后的文件名
exportJRE_HOME=$JAVA_HOME/jre
exportPATH=$PATH:$JAVA_HOME/bin
exportCLASSPATH=./://$JAVA_HOME/lib:$JRE_HOME/lib
  1. 生效配置文件,source/etc/profile命令使配置生效
source /etc/profile
  1. 通过java-version命令验证JDK 是否已经安装配置成功
java-version

1.3 Zookeeper的安装与配置

ZooKeeper是安装Kafka集群的必要组件,Kafka通过ZooKeeper来实施对元数据信息的管理,包括集群、broker、主题、分区等内容。

  • ZooKeeper是一个开源的分布式协调服务,是Google Chubby的一个开源实现。
  • 分布式应用程序可以基于ZooKeeper实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master选举、配置维护等功能。
  • 在ZooKeeper中共有3个角色:leader、follower和observer,同一时刻 ZooKeeper集群中只会有一个leader,其他的都是follower和observer。
  • observer不参与投票,默认情况下 ZooKeeper 中只有 leader 和 follower 两个角色。更多相关知识可以查阅ZooKeeper官方网站来获得。

1.3.1 单机模式安装

  1. 到官网下载zookeeper安装包,并上传至Linux的/opt目录下
  2. 解压压缩包
  3. 添加配置,向/etc/profile配置文件中添加如下内容
exportZOOKEEPER_HOME=/opt/zookeeper解压后的文件名
exportPATH=$PATH:$ZOOKEEPER_HOME/bin
  1. 执行source/etc/profile命令使配置生效
source /etc/profile
  1. 修改 ZooKeeper 的配置文件。首先进入$ZOOKEEPER_HOME/conf 目录,并将zoo_sample.cfg文件修改为zoo.cfg
  2. 修改zoo.cfg配置文件,zoo.cfg文件的内容参考如下
# ZooKeeper服务器心跳时间,单位为mstickTime=2000# 投票选举新leader的初始化时间initLimit=10# leader与follower心跳检测最大客忍时间,响应超过syncLimit*tickTime,leader认为# fo11ower“死掉”,从服务器列表中别除fol1owersyncLimit=5# 数据目录dataDir=/tmp/zookeeper/data
# 日志目录dataLogDir=/tmp/zookeeper/log
# ZooKeeper对外服务端口clientPort=2181
  1. 默认情况下,Linux系统中没有/tmp/zookeeper/data和/tmp/zookeeper/log这两个目录,所以接下来还要创建这两个目录
mkdir-p /tmp/zookeeper/data
mkdir-p /tmp/zookeeper/log
  1. 在${dataDir}目录(也就是/tmp/zookeeper/data)下创建一个myid文件,并写入一个数值,比如0。myid文件里存放的是服务器的编号
  2. 通过zkServer.sh start启动Zookeeper服务
zkServer.sh start
  1. 通过zkServer.sh status查看启动状态
zkServer.sh status

1.3.2 集群模式安装

以上是关于ZooKeeper单机模式的安装与配置,一般在生产环境中使用的都是集群模式,集群模式的配置也比较简单,相比单机模式而言只需要修改一些配置即可。下面以3台机器为例来配置一个ZooKeeper集群。首先在这3台机器的**/etc/hosts文件中添加3台集群的IP地址与机器域名的映射,示例如下(3个IP地址分别对应3台机器)
image.png
然后在这3台机器的
zoo.cfg**文件中添加以下配置:
image.png

  • 为了便于讲解上面的配置,这里抽象出一个公式,即 server.A=B:C:D。其中: - A是一个数字,代表服务器的编号,就是前面所说的myid文件里面的值。集群中每台服务器的编号都必须唯一,所以要保证每台服务器中的myid文件中的值不同。- B代表服务器的IP地址。- C表示服务器与集群中的 leader 服务器交换信息的端口。- D表示选举时服务器相互通信的端口

1.3.3 Zookeeper安全认证配置

zookeeper和kafka在默认情况下,是没有开启安全认证的,那么任意客户端可以在不需要任何身份认证的情况下访问zookeeper和kafka下的各节点,甚至可以进行节点的增加,修改以及删除的动作。注意,前面的动作是基于客户端能访问服务端所在的网络,如果进行了物理隔绝或者做了防火墙限制,那前述内容就不一定成立。但是,在某些对安全加固要求比较严格的客户或者生产环境中,那就必须开启安全认证才行。除了最基本的身份认证以外,还有针对每个节点的权限访问,但本文不涉及该话题。
进入正题,先从zookeeper开始配置,zookeeper官网提供了认证配置的参考,点击下方官网地址,即可查看详情。配置分两种情况:

  1. 客户端和服务端的双向认证(3.4.0开始引入)
  2. 服务端与服务端的双向认证(2.4.10开始引入)

如果是非集群模式下,仅配置客户端和服务端的双向认证即可。集群模式下,则需要客户端和服务端的认证以及zookeeper服务器之间的双向认证。
Zookeeper 使用的是Java自带的认证和授权服务(简称:JAAS),详细内容请看官网,该链接是 Java 8 的 JAAS 的介绍。这里为zookeeper和kafka分别在对应配置文件下创建jass配置文件为(文件名可以随意):

  • zookeeper:${ZOOKEEPER_HOME}/conf/zoo_jaas.conf
  • kafka:${KAFKA_HOME}/config/kafka-server-jaas.conf

注意:本节中的客户端指的kafka,服务端指的是zookeeper

1.3.3.1 客户端和服务端的双向认证
  1. 配置zookeeper服务端
  • 在zoo_jaas.conf添加如下配置
Server {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  username="zookeeper"
  password="zookeepersecret”
  user_kafka="kafkasecret";};
  • 修改zoo.cfg配置
# 强制进行SASL认证
sessionRequireClientSASLAuth=true
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  • 增加jvm参数,在${ZOOKEEPER_HOME}/bin/zkEnv.sh脚本中增加:
exportSERVER_JVMFLAGS="-Djava.security.auth.login.config=${ZOOKEEPER_HOME}/conf/zoo_jaas.conf"
  1. 配置客户端
  • 在kafka-server-jaas.conf中添加如下配置:
Client{
  org.apache.zookeeper.server.auth.DigestLoginModule required
  username="kafka"
  password="kafkasecret";};
  • 修改客户端的启动脚本${KAFKA_HOME}/bin/kafka-server-start.sh,增加jvm参数:
exec$base_dir/kafka-run-class.sh $EXTRA_ARGS-Djava.security.auth.login.config=${KAFKA_HOME}/config/kafka-server-jaas.conf kafka.Kafka "$@"
1.3.3.2 服务端与服务端的双向认证
  1. 修改zoo.cfg,增加如下配置:
quorum.auth.enableSasl=true # 打开sasl开关, 默认是关的
quorum.auth.learnerRequireSasl=true # ZK做为leaner的时候, 会发送认证信息
quorum.auth.serverRequireSasl=true # 设置为true的时候,learner连接的时候需要发送认证信息,否则拒绝
quorum.auth.learner.loginContext=QuorumLearner # JAAS 配置里面的 Context 名字
quorum.auth.server.loginContext=QuorumServer # JAAS 配置里面的 Context 名字
quorum.cnxn.threads.size=20 # 建议设置成ZK节点的数量乘2
  1. 修改zoo_jaas.conf,增加如下配置:
QuorumServer {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  user_test="test";};

QuorumLearner {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  username="test"
  password="test";};

QuorumServer 和 QuorumLearner 都是配置的ZK节点之间的认证配置

1.4 Kafka的安装与配置

1.4.1 单机模式安装

  1. 到官网下载kafka安装包,并上传至Linux的/opt目录下
  2. 解压压缩包
  3. 修改broker的配置文件**$KAFKA_HOME/conf/server.properties**
# broker的编号,如果集群中有多个broker,则每个broker的编号需要设置的不同broker.id=0# broker对外提供的服务入口地址listeners=PLAINTEXT://localhost:9092
# 存放消息日志文件的地址log.dirs=/tmp/kafka-logs
# Kafka所需的ZooKeeper集群地址,为了方便演示,我们假设Kafka和ZooKeeper都安装在本机zookeeper.connect=localhost:2181/kafka

1.4.2 集群模式安装

如果是单机模式,那么修改完上述配置参数之后就可以启动服务。如果是集群模式,那么只需要对单机模式的配置文件做相应的修改即可:确保集群中每个broker的broker.id配置参数的值不一样,以及listeners配置参数也需要修改为与broker对应的IP地址或域名,之后就可以各自启动服务。注意,在启动 Kafka 服务之前同样需要确保 zookeeper.connect参数所配置的ZooKeeper服务已经正确启动。

1.4.3 Kafka安全认证配置

  1. 通过kafka-configs.sh脚本生成一个用户admin作为超级管理员
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --add-config 'SCRAM-SHA-256=[password=admin]' --entity-type users --entity-name admin

注意:在配置kafka的server.properties对于zookeeper的连接我们采用的是zookeeper的CHROOT,所以上述命令中也需要指定对应路径,不然启动的kafka时会获取不到生成的SCRAM认证信息!!!

小知识:
ZooKeeper 中的 CHROOT 是指将 ZooKeeper 的命名空间限定在一个特定的路径下。这就是说,ZooKeeper 的所有数据和操作都将在指定的路径下进行,而不是整个 ZooKeeper 服务器上。CHROOT 功能允许在一个 ZooKeeper 集群上运行多个独立的 ZooKeeper 实例,每个实例都有自己的命名空间。
在 ZooKeeper 的配置文件 zoo.cfg 中,CHROOT 通过配置项 chroot 来设置。例如:
chroot=/myapp
在这个例子中,ZooKeeper 就会将其根路径设置为 /myapp,而不是默认的根路径。这样,对于 ZooKeeper 中的所有路径,都将以 /myapp 为根进行解释。这就好比把 ZooKeeper 变成了一个容器,其内部的所有路径都相对于 /myapp 这个容器。
CHROOT 的使用场景包括:

  1. 隔离命名空间: 允许多个应用在同一个 ZooKeeper 集群上使用不同的命名空间,防止彼此之间的命名冲突。
  2. 模拟多个独立环境: 允许在同一个 ZooKeeper 集群上模拟多个独立的环境,每个环境有自己的数据和配置。

要注意的是,如果你在使用 CHROOT,ZooKeeper 客户端在连接到 ZooKeeper 服务器时,也需要指定相应的 CHROOT 路径。例如,如果 CHROOT 设置为 /myapp,那么客户端在连接时需要指定 “/myapp” 作为根路径。

总的来说,CHROOT 提供了一种简单而有效的方式,使得在同一个 ZooKeeper 集群上可以支持多个隔离的命名空间。

  • 生成之后可以通过如下命令进行查看:
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --describe --entity-type users --entity-name admin
  • 也可以添加其他SCRAM认证信息,例如SCRAM-SHA-512:
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --add-config 'SCRAM-SHA-512=[password=admin512]' --entity-type users --entity-name admin
  • 也可使用如下命令删除已经添加的认证信息:
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name admin
  1. 修改kafka-server-jaas.conf,增加kafka服务的SCRAM认证用户信息
KafkaServer {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin"
  password="admin"
  user_admin="admin";};
  1. 修改server.properties,新增如下配置:
# 启用ACL
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 设置本例中admin为超级用户;在Zookeeper的“/kafka/config/users”下存在用户
super.users=User:admin
# 同时启用SCRAM和PLAIN机制
sasl.enabled.mechanisms=SCRAM-SHA-256
# 为broker间通讯开启SCRAM机制,采用SCRAM-SHA-256算法
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# broker间通讯使用PLAINTEXT,本例中不演示SSL配置
security.inter.broker.protocol=SASL_PLAINTEXT
# 配置listeners使用SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://192.168.64.102:9092
# 配置advertised.listeners
advertised.listeners=SASL_PLAINTEXT://192.168.64.102:9092

如果是集群,上述配置每个节点都应该配置一份!!!

1.4.4 服务启动

  1. 启动Kafka服务,在$KAFKA_HOME目录下执行下面的命令即可
bin/kafka-server-start.sh config/server.properties

如果要在后台运行Kafka服务,那么可以在启动命令中加入-daemon参数或&字符,示例如下:

bin/kafka-server-start.sh -daemon config/server.properties

或者

bin/kafka-server-start.sh config/server.properties &
  1. 通过jps命令查看Kafka服务进程是否已经启动

二、Admin API使用

2.1 SCRAM用户操作

packagecom.kafka.adminclient;importorg.apache.kafka.clients.admin.*;importorg.apache.kafka.common.KafkaFuture;importjava.util.Collections;importjava.util.Map;/**
 * @Author: Jiangxx
 * @Date: 2023/11/24
 * @Description:
 */publicclassKafkaUserOperator{privatefinalAdminClient adminClient;publicKafkaUserOperator(AdminClient adminClient){this.adminClient = adminClient;}publicbooleancreateScramUser(String username,String password){boolean res =false;//指定一个协议ScramMechanism,迭代次数iterations还没搞清楚干嘛的,设置太小会报错ScramCredentialInfo scramCredentialInfo =newScramCredentialInfo(ScramMechanism.SCRAM_SHA_256,10000);//创建Scram用户凭证,用户不存在,会先创建用户UserScramCredentialAlteration userScramCredentialUpsertion =newUserScramCredentialUpsertion(username, scramCredentialInfo, password);AlterUserScramCredentialsResult alterUserScramCredentialsResult = adminClient.alterUserScramCredentials(Collections.singletonList(userScramCredentialUpsertion));for(Map.Entry<String,KafkaFuture<Void>> e : alterUserScramCredentialsResult.values().entrySet()){KafkaFuture<Void> future = e.getValue();try{
                future.get();}catch(Exception exc){System.err.println("返回信息:"+ exc.getMessage());}
            res =!future.isCompletedExceptionally();}return res;}publicbooleandeleteScramUser(String username){boolean res =false;//删除Scram用户凭证,删除后用户无权限操作kafka,zk中用户节点还会存在UserScramCredentialAlteration userScramCredentialDeletion =newUserScramCredentialDeletion(username,ScramMechanism.SCRAM_SHA_256);AlterUserScramCredentialsResult alterUserScramCredentialsResult = adminClient.alterUserScramCredentials(Collections.singletonList(userScramCredentialDeletion));for(Map.Entry<String,KafkaFuture<Void>> e : alterUserScramCredentialsResult.values().entrySet()){KafkaFuture<Void> future = e.getValue();try{
                future.get();}catch(Exception exc){System.err.println("返回信息:"+ exc.getMessage());}
            res =!future.isCompletedExceptionally();}return res;}}

2.2 主题操作

packagecom.kafka.adminclient;importorg.apache.kafka.clients.admin.*;importorg.apache.kafka.common.KafkaFuture;importorg.apache.kafka.common.config.ConfigResource;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.util.Collections;importjava.util.HashMap;importjava.util.Map;importjava.util.Set;/**
 * @Author: Jiangxx
 * @Date: 2023/11/24
 * @Description:
 */publicclassKafkaTopicOperator{privatefinalLogger logger =LoggerFactory.getLogger(KafkaTopicOperator.class);privatefinalAdminClient adminClient;publicKafkaTopicOperator(AdminClient adminClient){this.adminClient = adminClient;}/**
     * 创建系统对应的topic
     *
     * @param topicName         主题名称
     * @param partitions        分区
     * @param replicationFactor 副本
     * @param retention         数据有效期
     * @return boolean
     */publicbooleancreateTopic(String topicName,Integer partitions,Integer replicationFactor,Integer retention){boolean res =false;Set<String> topics =getTopicList();if(!topics.contains(topicName)){
            partitions = partitions ==null?1: partitions;
            replicationFactor = replicationFactor ==null?1: replicationFactor;NewTopic topic =newNewTopic(topicName, partitions, replicationFactor.shortValue());long param = retention *24*60*60*1000;Map<String,String> configs =newHashMap<>();
            configs.put("retention.ms",String.valueOf(param));
            topic.configs(configs);CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(topic));for(Map.Entry<String,KafkaFuture<Void>> e : createTopicsResult.values().entrySet()){KafkaFuture<Void> future = e.getValue();try{
                    future.get();}catch(Exception exc){
                    logger.warn("创建topic参数异常,返回信息:{}", exc.getMessage());}
                res =!future.isCompletedExceptionally();}}else{
            res =true;
            logger.warn("该主题已存在,主题名称:{}", topicName);}return res;}/**
     * 修改topic数据有效期
     *
     * @param topicName 主题名称
     * @param retention 天数
     * @return boolean
     */publicbooleanupdateTopic(String topicName,Integer retention){if(retention <0){returnfalse;}boolean res =false;Map<ConfigResource,Config> alertConfigs =newHashMap<>();ConfigResource configResource =newConfigResource(ConfigResource.Type.TOPIC, topicName);//转换为毫秒long param = retention *24*60*60*1000;ConfigEntry configEntry =newConfigEntry("retention.ms",String.valueOf(param));Config config =newConfig(Collections.singletonList(configEntry));
        alertConfigs.put(configResource, config);AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(alertConfigs);for(Map.Entry<ConfigResource,KafkaFuture<Void>> e : alterConfigsResult.values().entrySet()){KafkaFuture<Void> future = e.getValue();try{
                future.get();}catch(Exception exc){
                logger.warn("修改topic参数异常,返回信息:{}", exc.getMessage());}
            res =!future.isCompletedExceptionally();}return res;}/**
     * 删除topic
     *
     * @param topicName 主题
     * @return boolean
     */publicbooleandeleteTopic(String topicName){boolean res =false;Set<String> topics =getTopicList();if(topics.contains(topicName)){DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singletonList(topicName));for(Map.Entry<String,KafkaFuture<Void>> e : deleteTopicsResult.values().entrySet()){KafkaFuture<Void> future = e.getValue();try{
                    future.get();}catch(Exception exc){
                    logger.warn("删除topic参数异常,返回信息:{}", exc.getMessage());}
                res =!future.isCompletedExceptionally();}}else{
            logger.info("topic不存在,名称:{}", topicName);
            res =true;}return res;}/**
     * 获取主题列表
     *
     * @return Set
     */publicSet<String>getTopicList(){Set<String> result =null;ListTopicsResult listTopicsResult = adminClient.listTopics();try{
            result = listTopicsResult.names().get();}catch(Exception e){
            logger.warn("获取主题列表失败,失败原因:{}", e.getMessage());
            e.printStackTrace();}return result;}}

2.3 ACL操作

packagecom.kafka.adminclient;importorg.apache.kafka.clients.admin.AdminClient;importorg.apache.kafka.clients.admin.CreateAclsResult;importorg.apache.kafka.clients.admin.DeleteAclsResult;importorg.apache.kafka.common.KafkaFuture;importorg.apache.kafka.common.acl.*;importorg.apache.kafka.common.resource.PatternType;importorg.apache.kafka.common.resource.ResourcePattern;importorg.apache.kafka.common.resource.ResourceType;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.util.Collections;importjava.util.Map;/**
 * @Author: Jiangxx
 * @Date: 2023/11/24
 * @Description:
 */publicclassAclOperator{privatefinalLogger logger =LoggerFactory.getLogger(AclOperator.class);privatefinalAdminClient adminClient;publicAclOperator(AdminClient adminClient){this.adminClient = adminClient;}/**
     * 添加权限
     *
     * @param resourceType 资源类型
     * @param resourceName 资源名称
     * @param username     用户名
     * @param operation    权限名称
     */publicvoidaddAclAuth(String resourceType,String resourceName,String username,String operation){ResourcePattern resource =newResourcePattern(getResourceType(resourceType), resourceName,PatternType.LITERAL);AccessControlEntry accessControlEntry =newAccessControlEntry("User:"+ username,"*",getOperation(operation),AclPermissionType.ALLOW);AclBinding aclBinding =newAclBinding(resource, accessControlEntry);CreateAclsResult createAclsResult = adminClient.createAcls(Collections.singletonList(aclBinding));for(Map.Entry<AclBinding,KafkaFuture<Void>> e : createAclsResult.values().entrySet()){KafkaFuture<Void> future = e.getValue();try{
                future.get();boolean success =!future.isCompletedExceptionally();if(success){
                    logger.info("创建权限成功");}}catch(Exception exc){
                logger.warn("创建权限失败,错误信息:{}", exc.getMessage());
                exc.printStackTrace();}}}/**
     * 删除权限
     *
     * @param resourceType 资源类型
     * @param resourceName 资源名称
     * @param username     用户名
     * @param operation    权限名称
     */publicvoiddeleteACLAuth(String resourceType,String resourceName,String username,String operation){ResourcePattern resource =newResourcePattern(getResourceType(resourceType), resourceName,PatternType.LITERAL);AccessControlEntry accessControlEntry =newAccessControlEntry("User:"+ username,"*",getOperation(operation),AclPermissionType.ALLOW);AclBinding aclBinding =newAclBinding(resource, accessControlEntry);DeleteAclsResult deleteAclsResult = adminClient.deleteAcls(Collections.singletonList(aclBinding.toFilter()));for(Map.Entry<AclBindingFilter,KafkaFuture<DeleteAclsResult.FilterResults>> e : deleteAclsResult.values().entrySet()){KafkaFuture<DeleteAclsResult.FilterResults> future = e.getValue();try{
                future.get();boolean success =!future.isCompletedExceptionally();if(success){
                    logger.info("删除权限成功");}}catch(Exception exc){
                logger.warn("删除权限失败,错误信息:{}", exc.getMessage());
                exc.printStackTrace();}}}privateAclOperationgetOperation(String operation){AclOperation aclOperation =null;switch(operation){case"CREATE":
                aclOperation =AclOperation.CREATE;break;case"WRITE":
                aclOperation =AclOperation.WRITE;break;case"READ":
                aclOperation =AclOperation.READ;break;default:break;}return aclOperation;}privateResourceTypegetResourceType(String type){ResourceType resourceType =null;switch(type){case"Group":
                resourceType =ResourceType.GROUP;break;case"Topic":
                resourceType =ResourceType.TOPIC;break;default:break;}return resourceType;}}

三、参考链接

  • kafka、zookeeper配置sasl认证-CSDN博客
  • Zookeeper & Kafka 开启安全认证的配置_kafka认证配置_IT布道的博客-CSDN博客
  • Kafka安全(以SASL+ACL为例)_kafka 安全-CSDN博客
  • Kafka安全认证授权配置_kafka认证配置-CSDN博客
  • Java版 Kafka ACL使用实战_java kafka acl_芒果无忧的博客-CSDN博客
  • kafka官网
标签: kafka 安全

本文转载自: https://blog.csdn.net/qq_44062110/article/details/136527818
版权归原作者 学编程的荔枝壳 所有, 如有侵权,请联系我们删除。

“kafka安全机制(SASL_SCRAM)”的评论:

还没有评论