Creating a Kafka Broker with SASL Authentication in Docker

I. Overview

1. Categories of Kafka security

1) Authentication: verifying the identity of connections to the brokers — client connections (producers and consumers), connections between brokers and ZooKeeper, connections from other brokers, and connections from tools to brokers.

2) Authorization: message-level access control, authorizing clients' read and write operations on data (produce, consume, and consumer-group access).

2. Implementation options

Starting with version 0.9.0.0, the Kafka community has added many features to improve cluster security. Kafka offers two security mechanisms: SSL and SASL. SSL is implemented mainly through CA-signed certificates; this article focuses on the SASL approach.

1) SASL authentication. Kafka supports several SASL mechanisms:

| Mechanism | Introduced in Kafka version | Characteristics |
| --- | --- | --- |
| SASL/PLAIN | 0.10.0.0 | Users cannot be added dynamically |
| SASL/SCRAM | 0.10.2.0 | Users can be added dynamically |
| SASL/Kerberos | 0.9.0.0 | Requires a separately deployed authentication service |
| SASL/OAUTHBEARER | 2.0.0 | Requires implementing the token creation and validation interfaces yourself, plus a separate OAuth service |

2) SSL encryption: encrypts the data transmitted between brokers and clients, between brokers, or between brokers and tools.

II. Creating a Kafka broker with SASL authentication in Docker

1. Prepare kafka_server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="xx"
    password="xxx"
    user_admin="xx"
    user_alice="xxx";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="xx"
    password="xxx";
};
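In PlainLoginModule, username and password are the credentials the process itself logs in with (the KafkaServer section covers inter-broker connections; the Client section covers the broker's connection to ZooKeeper), while each user_<name>="<password>" entry defines an account that is allowed to log in. The broker's own username must therefore also appear as a user_ entry with the same password. Replace the xx/xxx placeholders with real user names and passwords; the super user configured later (admin) must be among them.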

2. Prepare zk_server_jaas.conf


Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="xx"
    password="xxx"
    user_admin="xxx";
};

3. Put both files into a secrets folder
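With the compose file below mounting ./secrets into both containers, the layout should be:

./docker-compose.yml
./secrets/kafka_server_jaas.conf
./secrets/zk_server_jaas.conf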

4. Prepare docker-compose.yml

version: '2'
services:
    zookeeper:
        image: confluentinc/cp-zookeeper:5.1.2
        hostname: zookeeper
        container_name: zookeeper
        restart: always
        ports:
            - 2182:2182
        environment:
            ZOOKEEPER_CLIENT_PORT: 2182
            ZOOKEEPER_TICK_TIME: 2000
            ZOOKEEPER_MAXCLIENTCNXNS: 0
            # Require SASL authentication from every ZooKeeper client
            ZOOKEEPER_AUTHPROVIDER.1: org.apache.zookeeper.server.auth.SASLAuthenticationProvider
            ZOOKEEPER_REQUIRECLIENTAUTHSCHEME: sasl
            ZOOKEEPER_JAASLOGINRENEW: 3600000
            # Point the JVM at the ZooKeeper JAAS file mounted below
            KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/zk_server_jaas.conf
        volumes:
            - ./secrets:/etc/kafka/secrets
    kafka:
        image: confluentinc/cp-kafka:5.1.2
        hostname: broker
        container_name: kafka
        restart: always
        depends_on:
            - zookeeper
        ports:
            - 9092:9092
        environment:
            KAFKA_BROKER_ID: 1
            # /kafka is a ZooKeeper chroot; broker metadata is stored under that path
            KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2182/kafka'
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
            KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
            KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
            # The address clients use to reach the broker
            KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://xxx.xxx.xxx.xxx:9092
            KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
            KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
            KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
            # Enable ACL-based authorization; the super user below bypasses ACL checks
            KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
            # Point the JVM at the broker JAAS file mounted below
            KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf
            KAFKA_SUPER_USERS: User:admin
        volumes:
            - ./secrets:/etc/kafka/secrets

Replace xxx.xxx.xxx.xxx above with your own host's IP address, i.e. the address clients will use to reach the broker (for a purely local test, 127.0.0.1 also works).

5. Run docker-compose -f docker-compose.yml up -d
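Once both containers are up, a quick way to confirm that the broker accepts SASL connections is to list topics with an authenticated admin client. A minimal sketch using kafka-python (the credentials are the placeholders from kafka_server_jaas.conf):

from kafka import KafkaAdminClient

# Connect with the same SASL settings the producer below will use
admin = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092",
                         security_protocol="SASL_PLAINTEXT",
                         sasl_mechanism="PLAIN",
                         sasl_plain_username="xx",
                         sasl_plain_password="xxx")
print(admin.list_topics())  # succeeds only if authentication worked
admin.close()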

6. Verify with a Python producer

import time
import json
from datetime import datetime
from kafka import KafkaProducer, KafkaConsumer
 
 
def producer_event(server_info):
    producer = KafkaProducer(bootstrap_servers=server_info,
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='PLAIN',
                             sasl_plain_username='xx',
                             sasl_plain_password='xxx')
    topic = "first.kafka.test"
    print("kafka连接成功")
    for i in range(7200):
        data = {
            "name":"hello world"
        }
        data_json = json.dumps(data)
        producer.send(topic, data_json.encode()).get(timeout=30)  # block until the send is acknowledged
        print("message sent at {}: {}".format(datetime.now(), data_json))
        time.sleep(1)
    producer.close()
 
 
server="127.0.0.1:9092"
producer_event(server)
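For the consumer side, a matching sketch (the group id is a hypothetical name; the credentials are the same placeholders):

from kafka import KafkaConsumer

consumer = KafkaConsumer("first.kafka.test",
                         bootstrap_servers="127.0.0.1:9092",
                         security_protocol='SASL_PLAINTEXT',
                         sasl_mechanism='PLAIN',
                         sasl_plain_username='xx',
                         sasl_plain_password='xxx',
                         auto_offset_reset='earliest',
                         group_id='first.kafka.test.group')  # hypothetical group id
for message in consumer:
    print("received: {}".format(message.value.decode()))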

7. Verify with Spring Boot Kafka initialization code

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    private Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();
        // KAFKA_BROKERS and protocol are configuration values supplied elsewhere,
        // e.g. injected from application properties
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Maximum number of records returned by a single poll(); default is 500
        //props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Only add the SASL settings when the configured protocol requires them
        if (!"xx".equals(protocol)) {
            props.put("sasl.mechanism", "PLAIN");
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xx\" password=\"xxx\";");
        }

        return props;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(Boolean.TRUE);
        factory.setConcurrency(1);
        // Offsets are committed manually by the listener via Acknowledgment.acknowledge()
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(150000);
        return factory;
    }

}
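To consume through this factory, a listener method would reference it explicitly, e.g. a method annotated with @KafkaListener(topics = "first.kafka.test", containerFactory = "batchFactory") that takes a List<ConsumerRecord<String, String>> plus an Acknowledgment argument and calls acknowledge() after processing, since the ack mode above is MANUAL_IMMEDIATE. (The topic name and method shape here are illustrative, not from the original post.)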
Tags: kafka docker java

Reposted from: https://blog.csdn.net/yztezhl/article/details/127627854
Copyright belongs to the original author, hay_lee; contact us for removal in case of infringement.
