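I. Overview
1. Kafka security categories
1) Authentication: verifies the identity of each party on a connection. This covers connections to brokers from clients (producers and consumers), from other brokers, and from tools, as well as connections between brokers and ZooKeeper.
2) Authorization: controls which operations an authenticated client may perform, that is, read/write permissions on resources such as topics and consumer groups (produce, consume, group).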
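To make the split between the two categories concrete, here is a toy sketch of an authorization decision made after authentication has already established the principal. It is not Kafka's implementation: the `Acl` class and `is_authorized` function are hypothetical names, and the model ignores details such as wildcard resources and implied operations. It only mirrors three rules of ACL-based authorizers: super users bypass checks, Deny overrides Allow, and no matching rule means denied.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Acl:
    principal: str      # e.g. "User:alice"
    operation: str      # e.g. "Read" or "Write"
    resource: str       # e.g. "Topic:first.kafka.test"
    allow: bool = True  # False models a Deny rule


def is_authorized(principal, operation, resource, acls, super_users=()):
    """Toy decision: super users pass, Deny beats Allow, no match means denied."""
    if principal in super_users:
        return True
    matching = [
        acl for acl in acls
        if acl.principal == principal
        and acl.operation == operation
        and acl.resource == resource
    ]
    if any(not acl.allow for acl in matching):
        return False
    return any(acl.allow for acl in matching)


# Hypothetical rules: alice may write to one topic and read with one group.
acls = [
    Acl("User:alice", "Write", "Topic:first.kafka.test"),
    Acl("User:alice", "Read", "Group:demo-group"),
]
```

With these rules, alice can produce to the topic but cannot consume from it, while a principal listed in `super_users` is allowed everything.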
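2. Implementation
Starting with version 0.9.0.0, the Kafka community added many features to improve cluster security. Kafka offers two security mechanisms, SSL and SASL. SSL relies mainly on certificates issued by a CA; this article focuses on SASL.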
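1) SASL authentication:
| Mechanism | Available since Kafka | Characteristics |
|---|---|---|
| SASL/PLAIN | 0.10.0.0 | Users cannot be added dynamically |
| SASL/SCRAM | 0.10.2.0 | Users can be added dynamically |
| SASL/GSSAPI (Kerberos) | 0.9.0.0 | Requires a separately deployed authentication service |
| SASL/OAUTHBEARER | 2.0.0 | You must implement the token creation and validation interfaces yourself; requires an additional OAuth service |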
2) SSL encryption: SSL encrypts the data transferred between brokers and clients, between brokers, or between brokers and tools.
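On the client side, the choice of mechanism and of SSL encryption mostly shows up as a handful of connection settings. The sketch below builds keyword arguments for kafka-python's `KafkaProducer` or `KafkaConsumer`. The helper name `sasl_client_kwargs` is hypothetical; the keyword names are kafka-python's as I understand them, and SCRAM is assumed to need a reasonably recent kafka-python release.

```python
def sasl_client_kwargs(mechanism, username=None, password=None, use_tls=False):
    """Build kafka-python connection kwargs for a given SASL mechanism."""
    # SASL_SSL adds transport encryption; SASL_PLAINTEXT sends data unencrypted.
    kwargs = {
        "security_protocol": "SASL_SSL" if use_tls else "SASL_PLAINTEXT",
        "sasl_mechanism": mechanism,
    }
    if mechanism in ("PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"):
        if username is None or password is None:
            raise ValueError("username and password are required for " + mechanism)
        # kafka-python reuses the sasl_plain_* names for SCRAM credentials.
        kwargs["sasl_plain_username"] = username
        kwargs["sasl_plain_password"] = password
    elif mechanism == "GSSAPI":
        # Kerberos credentials come from the environment (ticket cache or keytab).
        kwargs["sasl_kerberos_service_name"] = "kafka"
    elif mechanism == "OAUTHBEARER":
        # A token provider object must be supplied; it is not sketched here.
        raise NotImplementedError("provide sasl_oauth_token_provider yourself")
    else:
        raise ValueError("unknown SASL mechanism: " + mechanism)
    return kwargs
```

A call such as `KafkaProducer(bootstrap_servers=..., **sasl_client_kwargs("PLAIN", "xx", "xxx"))` would then correspond to the SASL/PLAIN row of the table.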
II. Creating a SASL-authenticated Kafka with Docker
1. Prepare the file kafka_server_jaas.conf
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="xx"
    password="xxx"
    user_admin="xx"
    user_alice="xxx";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="xx"
    password="xxx";
};
2. Prepare the file zk_server_jaas.conf
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="xx"
    password="xxx"
    user_admin="xxx";
};
3. Put both files into the secrets folder
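The two JAAS files can also be generated rather than typed by hand, which helps keep the `user_<name>` entries and the client credentials in sync. This is a minimal sketch under stated assumptions: the function names are hypothetical, a single admin credential is reused for the broker login, the ZooKeeper client login, and the ZooKeeper server entry, and passwords are assumed not to contain double quotes.

```python
from pathlib import Path

PLAIN_MODULE = "org.apache.kafka.common.security.plain.PlainLoginModule"


def jaas_section(name, module, options):
    """Render one JAAS section; the final entry is terminated with ';'."""
    body = [f"{module} required"] + [f'{key}="{value}"' for key, value in options.items()]
    body[-1] += ";"
    lines = [f"{name} {{"] + [f"    {entry}" for entry in body] + ["};"]
    return "\n".join(lines) + "\n"


def render_jaas_files(admin_user, admin_password, users):
    """Return {filename: content} for the two files used in this setup."""
    login = {"username": admin_user, "password": admin_password}
    # Each user_<name>="<password>" entry defines one account the broker accepts.
    user_entries = {f"user_{name}": pw for name, pw in users.items()}
    kafka_conf = (
        jaas_section("KafkaServer", PLAIN_MODULE, {**login, **user_entries})
        + jaas_section("Client", PLAIN_MODULE, login)
    )
    zk_conf = jaas_section(
        "Server", PLAIN_MODULE, {**login, f"user_{admin_user}": admin_password}
    )
    return {"kafka_server_jaas.conf": kafka_conf, "zk_server_jaas.conf": zk_conf}


def write_jaas_files(secrets_dir, files):
    """Write the rendered files into the secrets folder, creating it if needed."""
    target = Path(secrets_dir)
    target.mkdir(parents=True, exist_ok=True)
    for filename, content in files.items():
        (target / filename).write_text(content, encoding="utf-8")
```

For example, `write_jaas_files("./secrets", render_jaas_files("admin", "...", {"admin": "...", "alice": "..."}))` would produce files with the same layout as the ones in steps 1 and 2.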
4. Prepare the file docker-compose.yml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    hostname: zookeeper
    container_name: zookeeper
    restart: always
    ports:
      - 2182:2182
    environment:
      ZOOKEEPER_CLIENT_PORT: 2182
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_MAXCLIENTCNXNS: 0
      ZOOKEEPER_AUTHPROVIDER.1: org.apache.zookeeper.server.auth.SASLAuthenticationProvider
      ZOOKEEPER_REQUIRECLIENTAUTHSCHEME: sasl
      ZOOKEEPER_JAASLOGINRENEW: 3600000
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/zk_server_jaas.conf
    volumes:
      - ./secrets:/etc/kafka/secrets
  kafka:
    image: confluentinc/cp-kafka:5.1.2
    hostname: broker
    container_name: kafka
    restart: always
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2182/kafka'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://xxx.xxx.xxx.xxx:9092
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf
      KAFKA_SUPER_USERS: User:admin
    volumes:
      - ./secrets:/etc/kafka/secrets
Replace xxx.xxx.xxx.xxx above with your own IP address.
5. Run docker-compose -f docker-compose.yml up -d
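Before moving on to client code, it can help to confirm that the broker's listener is actually accepting TCP connections, since the containers take a while to start. The following is a small sketch using only the standard library; `wait_for_port` is a hypothetical helper name. An open port only shows that the listener is up, not that SASL authentication succeeds.

```python
import socket
import time


def wait_for_port(host, port, timeout=60.0, interval=1.0):
    """Return True once a TCP connection to host:port succeeds, False after timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError if the port is closed or unreachable.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

Calling `wait_for_port("127.0.0.1", 9092)` after the containers have started should return True within the timeout if the port mapping from docker-compose.yml is in effect.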
6. Write Python code to verify
import time
import json
from datetime import datetime
from kafka import KafkaProducer


def producer_event(server_info):
    # The username/password must match a user_<name>="<password>" entry
    # in the KafkaServer section of kafka_server_jaas.conf.
    producer = KafkaProducer(bootstrap_servers=server_info,
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='PLAIN',
                             sasl_plain_username='xx',
                             sasl_plain_password='xxx')
    topic = "first.kafka.test"
    print("Connected to Kafka")
    for i in range(7200):
        data = {
            "name": "hello world"
        }
        data_json = json.dumps(data)
        # Block until the broker acknowledges the record; raises on failure.
        producer.send(topic, data_json.encode()).get(timeout=30)
        print("Message sent, current time: {}, data: {}".format(datetime.now(), data_json))
        time.sleep(1)
    producer.close()


server = "127.0.0.1:9092"
producer_event(server)
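The producer above only proves that writes are accepted. A matching consumer can confirm the read path with the same SASL/PLAIN settings. This is a sketch, not part of the original setup: the group id `sasl-demo-group` and the helper names are hypothetical, and with the ACL authorizer enabled a non-super user would presumably also need Read permission on the topic and the group.

```python
import json


def consumer_config(server_info, username, password, group_id):
    """kafka-python settings mirroring the producer's SASL/PLAIN configuration."""
    return {
        "bootstrap_servers": server_info,
        "security_protocol": "SASL_PLAINTEXT",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": username,
        "sasl_plain_password": password,
        "group_id": group_id,
        "auto_offset_reset": "earliest",   # start from the oldest record for a new group
        "enable_auto_commit": False,       # offsets are committed manually below
        "consumer_timeout_ms": 10000,      # stop iterating after 10 s without messages
    }


def decode_event(raw_value):
    """Reverse of json.dumps(data).encode() used by the producer."""
    return json.loads(raw_value.decode())


def consume_events(server_info, username, password,
                   topic="first.kafka.test", group_id="sasl-demo-group"):
    # Imported here so the helpers above work without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic, **consumer_config(server_info, username, password, group_id)
    )
    try:
        for message in consumer:
            print("Received at offset {}: {}".format(message.offset, decode_event(message.value)))
            consumer.commit()  # synchronous commit after each processed record
    finally:
        consumer.close()
```

Running `consume_events("127.0.0.1:9092", "xx", "xxx")` while the producer is sending should print the same JSON payloads.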
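7. Write Spring Boot Kafka initialization code to verify
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    // Assumption: these two fields are not shown in the original; adjust them to your setup.
    private static final String KAFKA_BROKERS = "127.0.0.1:9092";
    private String protocol; // any value other than "xx" enables SASL below

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Maximum number of records returned by a single poll() call; the default is 500
        //props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        if (!"xx".equals(protocol)) {
            props.put("sasl.mechanism", "PLAIN");
            props.put("security.protocol", "SASL_PLAINTEXT");
            // Inner quotes must be escaped inside the Java string literal
            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xx\" password=\"xxx\";");
        }
        return props;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Deliver records to the listener in batches and acknowledge them manually
        factory.setBatchListener(Boolean.TRUE);
        factory.setConcurrency(1);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(150000);
        return factory;
    }
}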
Copyright belongs to the original author, hay_lee. In case of infringement, please contact us for removal.