0


Kafka配置用户名密码访问

1 软件版本

kafka_2.12-2.4.0.tgz(带zookeeper)

2 kafka服务端部署

2.1 将安装包上传到服务器,并解压

tar zxvf kafka_2.12-2.4.0.tgz -C /data

mv kafka_2.12-2.4.0 kafka

2.2 修改kafka配置文件 server.properties

vim /data/kafka/config/server.properties:

#############################ServerBasics##############################Theidofthebroker.Thismustbesettoauniqueintegerforeachbroker.broker.id=0host.name=10.7.2.201listeners=SASL_PLAINTEXT://10.7.2.201:9092advertised.listeners=SASL_PLAINTEXT://10.7.2.201:9092security.inter.broker.protocol=SASL_PLAINTEXTsasl.enabled.mechanisms=PLAINsasl.mechanism.inter.broker.protocol=PLAINauthorizer.class.name=kafka.security.auth.SimpleAclAuthorizerallow.everyone.if.no.acl.found=true

2.3 添加SASL配置文件

在kafka安装目录下创建一个 kafka_server_jaas.conf 文件
vim /data/kafka/kafka_server_jaas.conf:

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="adminpasswd"
    user_admin="adminpasswd"
    user_producer="producerpwd"
    user_consumer="consumerpwd";};

说明:该配置通过org.apache.org.apache.kafka.common.security.plain.PlainLoginModule由指定采用PLAIN机制,定义了用户。

  • usemame和password指定该代理与集群其他代理初始化连接的用户名和密码
  • "user_"为前缀后接用户名方式创建连接代理的用户名和密码,例如,user_producer=“producerpwd” 是指用户名为producer,密码为producerpwd
  • username为admin的用户,和user为admin的用户,密码要保持一致,否则会认证失败
  • 上述配置中,创建了三个用户,分别为admin、producer和consumer(创建多少个用户,可根据业务需要配置,用户名和密码可自定义设置)

2.4 修改启动脚本

vim /data/kafka/bin/kafka-server-start.sh ,找到 export KAFKA_HEAP_OPTS , 添加jvm 参数为kafka_server_jaas.conf文件:

-Djava.security.auth.login.config=/data/kafka/kafka_server_jaas.conf

如下图:
在这里插入图片描述

2.5 启动kafka服务

# 先启动zookeeper/data/kafka/bin/zookeeper-server-start.sh -daemon /data/kafka/config/zookeeper.properties  

# 启动kafka/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties 

3 kafka客户端配置登录认证

3.1 安装kafka客户端

在/root 路径下新拷贝一个kafka安装包,解压后重命名,作为kafka客户端,客户端路径 /root/kafka

3.2 创建客户端认证配置文件

在 /root/kafka 路径下创建一个 kafka_client_jaas.conf 文件:
vim /root/kafka/kafka_client_jaas.conf:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="producer"
    password="producerpwd";};

说明: 这里配置用户名和密码需要和服务端配置的账号密码保持一致,这里配置了producer这个用户

3.3 添加kafka-console-producer.sh认证文件路径,后面启动生产者测试时使用

vim /root/kafka/bin/kafka-console-producer.sh ,找到 “x$KAFKA_HEAP_OPTS”,添加以下参数:

-Djava.security.auth.login.config=/root/kafka/kafka_client_jaas.conf

如下图:
在这里插入图片描述

3.4 添加kafka-console-consumer.sh认证文件路径,后面启动消费者测试时使用

vim /root/kafka/bin/kafka-console-consumer.sh ,找到 “x$KAFKA_HEAP_OPTS”,添加以下参数:

-Djava.security.auth.login.config=/root/kafka/kafka_client_jaas.conf

如下图:
在这里插入图片描述

4 用客户端连接

#开启生产者:/root/kafka/bin/kafka-console-producer.sh--broker-list10.7.2.201:9092--topictest--producer-propertysecurity.protocol=SASL_PLAINTEXT--producer-propertysasl.mechanism=PLAIN#开启消费者:/root/kafka/bin/kafka-console-consumer.sh--bootstrap-server10.7.2.201:9092--topictest--from-beginning--consumer-propertysecurity.protocol=SASL_PLAINTEXT--consumer-propertysasl.mechanism=PLAIN

在这里插入图片描述
生产者可正常生产数据,消费者能消费到数据

5 python连接kafka(python2环境)

5.1 安装kafka-python模块包

下载模块包:kafka-python-2.0.2.tar.gz,使用手册:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
解压后,执行以下命令进行安装:

pythonsetup.pyinstall

5.2 向kafka中写入数据

# -*- coding: utf-8 -*-from __future__ import absolute_import
from __future__ import print_function
from kafka import KafkaProducer
import json
import sys

reload(sys)
sys.setdefaultencoding('utf8')defproduceLog(topic_name, log):
    kafka_producer= KafkaProducer(
                                  sasl_mechanism="PLAIN",
                                  security_protocol='SASL_PLAINTEXT',
                                  sasl_plain_username="producer",
                                  sasl_plain_password="producerpwd",
                                  bootstrap_servers='10.7.2.201:9092')                                            
    kafka_producer.send(topic_name, log, partition=0)
    kafka_producer.flush()
    kafka_producer.close()for i inrange(10):
    sendlog_dict ={"name":"测试"}
    sendlog_dict['ID']= i
    sendlog_srt = json.dumps(sendlog_dict, ensure_ascii=False)
    produceLog("pythontest", sendlog_srt)

执行后,在kafka中查看 pythontest 主题中的数据,如下图:
在这里插入图片描述
数据写入成功

5.3 消费数据

# -*- coding: utf-8 -*-from __future__ import absolute_import
from __future__ import print_function
from kafka import KafkaConsumer
from kafka import TopicPartition
import sys

reload(sys)
sys.setdefaultencoding('utf8')

consumer= KafkaConsumer('pythontest',# 消息主题
                        group_id="test",
                        client_id="python",
                        sasl_mechanism="PLAIN",
                        security_protocol='SASL_PLAINTEXT',
                        sasl_plain_username="producer",
                        sasl_plain_password="producerpwd",
                        bootstrap_servers='10.7.2.201:9092',
                        auto_offset_reset='earliest'# 从头开始消费)print("主题的分区信息:{}".format(consumer.partitions_for_topic('pythontest')))print("主题列表:{}".format(consumer.topics()))print("当前消费者订阅的主题:{}".format(consumer.subscription()))print("当前消费者topic分区信息:{}".format(consumer.assignment()))print("当前消费者可消费的偏移量:{}".format(consumer.beginning_offsets(consumer.assignment())))for msg in consumer:print(msg.value)

运行结果:
在这里插入图片描述

标签: kafka

本文转载自: https://blog.csdn.net/d1240673769/article/details/124042854
版权归原作者 Jepson2017 所有, 如有侵权,请联系我们删除。

“Kafka配置用户名密码访问”的评论:

还没有评论