

Connecting to Kafka from Java, with SSL/SASL Authentication

Connecting to Kafka from Java

1. Maven dependencies (be sure the versions match your Kafka version)

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.5.0</version>
</dependency>
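Of these three artifacts, only kafka-clients is strictly required for the producer and consumer examples below; kafka_2.12 (the Scala server module) and kafka-streams are needed only if you use those APIs directly.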

2. Producing data

2.1 Creating a Producer the ordinary way (no authentication)

package com.tyhh.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        String topic = "my-topic";
        // broker addresses
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all");             // the write succeeds only once all in-sync replicas have it
        props.put("retries", 0);              // no retries; a very large value retries (effectively) indefinitely
        props.put("batch.size", 16384);       // default batch size in bytes
        props.put("linger.ms", 1);            // wait up to 1 ms to fill a batch before sending
        props.put("buffer.memory", 33554432); // total memory for buffering unsent records
        // serializers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
            Future<RecordMetadata> metadataFuture = producer.send(record);
            try {
                RecordMetadata recordMetadata = metadataFuture.get();
                System.out.println("Send succeeded!");
                System.out.println("topic: " + recordMetadata.topic());
                System.out.println("partition: " + recordMetadata.partition());
                System.out.println("offset: " + recordMetadata.offset());
            } catch (Exception e) {
                System.out.println("Send failed!");
                e.printStackTrace();
            }
        }
        producer.flush();
        producer.close();
    }
}
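Waiting on metadataFuture.get() blocks the loop on every record. When throughput matters more than printing each acknowledgment in order, the send can be made fully asynchronous with a callback. A minimal sketch, reusing the record and producer from the block above:

// Asynchronous send: the callback runs once the broker acknowledges (or rejects) the record
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        System.out.println("Send failed!");
        exception.printStackTrace();
    } else {
        System.out.println("acked: partition=" + metadata.partition()
                + ", offset=" + metadata.offset());
    }
});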

2.2 Creating a Producer with SASL authentication (PLAIN)

package com.tyhh.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        String user = "admin";
        String password = "admin";
        String topic = "my-topic";
        // broker addresses
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all");             // the write succeeds only once all in-sync replicas have it
        props.put("retries", 0);              // no retries; a very large value retries (effectively) indefinitely
        props.put("batch.size", 16384);       // default batch size in bytes
        props.put("linger.ms", 1);            // wait up to 1 ms to fill a batch before sending
        props.put("buffer.memory", 33554432); // total memory for buffering unsent records
        // serializers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // SASL authentication (PLAIN mechanism)
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + user + "\" password=\"" + password + "\";");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
            Future<RecordMetadata> metadataFuture = producer.send(record);
            try {
                RecordMetadata recordMetadata = metadataFuture.get();
                System.out.println("Send succeeded!");
                System.out.println("topic: " + recordMetadata.topic());
                System.out.println("partition: " + recordMetadata.partition());
                System.out.println("offset: " + recordMetadata.offset());
            } catch (Exception e) {
                System.out.println("Send failed!");
                e.printStackTrace();
            }
        }
        producer.flush();
        producer.close();
    }
}
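Note that SASL_PLAINTEXT authenticates the client but does not encrypt the traffic. If the brokers also expose a TLS listener, the same producer can use it by switching the protocol and trusting the broker certificate. A hedged sketch, where the truststore path and password are placeholders for your own values:

// TLS-encrypted SASL (assumes the brokers have a SASL_SSL listener configured)
props.put("security.protocol", "SASL_SSL");
props.put("ssl.truststore.location", "/path/to/client.truststore.jks"); // placeholder
props.put("ssl.truststore.password", "truststore-password");            // placeholder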

2.3 Creating a Producer with SASL authentication (PLAIN, configuration-file approach)

The kafka_client_jaas_plain configuration file:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin";
};

The code:

package com.tyhh.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaProducerTest {
    public static void main(String[] args) {
        // SASL authentication via an external JAAS configuration file
        System.setProperty("java.security.auth.login.config",
                "./kafka_ssl_conf/kafka_client_jaas_plain.conf");
        Properties props = new Properties();
        String topic = "my-topic";
        // broker addresses
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all");             // the write succeeds only once all in-sync replicas have it
        props.put("retries", 0);              // no retries; a very large value retries (effectively) indefinitely
        props.put("batch.size", 16384);       // default batch size in bytes
        props.put("linger.ms", 1);            // wait up to 1 ms to fill a batch before sending
        props.put("buffer.memory", 33554432); // total memory for buffering unsent records
        // serializers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // the JAAS file supplies only the credentials; protocol and mechanism are still set here
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
            Future<RecordMetadata> metadataFuture = producer.send(record);
            try {
                RecordMetadata recordMetadata = metadataFuture.get();
                System.out.println("Send succeeded!");
                System.out.println("topic: " + recordMetadata.topic());
                System.out.println("partition: " + recordMetadata.partition());
                System.out.println("offset: " + recordMetadata.offset());
            } catch (Exception e) {
                System.out.println("Send failed!");
                e.printStackTrace();
            }
        }
        producer.flush();
        producer.close();
    }
}

2.4 Creating a Producer with SASL authentication (SCRAM)

package com.tyhh.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        String user = "admin";
        String password = "admin";
        String topic = "my-topic";
        // broker addresses
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all");             // the write succeeds only once all in-sync replicas have it
        props.put("retries", 0);              // no retries; a very large value retries (effectively) indefinitely
        props.put("batch.size", 16384);       // default batch size in bytes
        props.put("linger.ms", 1);            // wait up to 1 ms to fill a batch before sending
        props.put("buffer.memory", 33554432); // total memory for buffering unsent records
        // serializers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // SASL authentication (SCRAM-SHA-512 mechanism)
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"" + user + "\" password=\"" + password + "\";");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
            Future<RecordMetadata> metadataFuture = producer.send(record);
            try {
                RecordMetadata recordMetadata = metadataFuture.get();
                System.out.println("Send succeeded!");
                System.out.println("topic: " + recordMetadata.topic());
                System.out.println("partition: " + recordMetadata.partition());
                System.out.println("offset: " + recordMetadata.offset());
            } catch (Exception e) {
                System.out.println("Send failed!");
                e.printStackTrace();
            }
        }
        producer.flush();
        producer.close();
    }
}
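Unlike PLAIN, SCRAM credentials are stored (salted and hashed) on the broker side, so the user must exist there before this producer can authenticate. They are typically created with the kafka-configs.sh tool, along the lines of --alter --add-config 'SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin.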

2.5 Creating a Producer with SASL authentication (SCRAM, configuration-file approach)

The kafka_client_jaas_scram configuration file:

KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin";
};

The code:

package com.tyhh.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaProducerTest {
    public static void main(String[] args) {
        // SASL authentication via an external JAAS configuration file
        System.setProperty("java.security.auth.login.config",
                "./kafka_ssl_conf/kafka_client_jaas_scram.conf");
        Properties props = new Properties();
        String topic = "my-topic";
        // broker addresses
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all");             // the write succeeds only once all in-sync replicas have it
        props.put("retries", 0);              // no retries; a very large value retries (effectively) indefinitely
        props.put("batch.size", 16384);       // default batch size in bytes
        props.put("linger.ms", 1);            // wait up to 1 ms to fill a batch before sending
        props.put("buffer.memory", 33554432); // total memory for buffering unsent records
        // serializers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // the JAAS file supplies only the credentials; protocol and mechanism are still set here
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
            Future<RecordMetadata> metadataFuture = producer.send(record);
            try {
                RecordMetadata recordMetadata = metadataFuture.get();
                System.out.println("Send succeeded!");
                System.out.println("topic: " + recordMetadata.topic());
                System.out.println("partition: " + recordMetadata.partition());
                System.out.println("offset: " + recordMetadata.offset());
            } catch (Exception e) {
                System.out.println("Send failed!");
                e.printStackTrace();
            }
        }
        producer.flush();
        producer.close();
    }
}

3. Consuming data

3.1 Creating a Consumer the ordinary way (no authentication)

package com.tyhh.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", autoCommit);  // commit offsets automatically
        props.put("auto.offset.reset", offsetReset);  // where to start when no committed offset exists
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}
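With enable.auto.commit set to "true", offsets are committed on a timer regardless of whether the records have actually been processed, which can drop or re-deliver messages around a crash. A minimal sketch of committing manually instead, assuming enable.auto.commit is set to "false" above (process() stands in for a hypothetical application-level handler):

// Commit offsets manually, only after the whole batch has been processed
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical application-level handler
    }
    consumer.commitSync(); // blocks until the broker has stored the offsets
}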

3.2 Creating a Consumer with SASL authentication (PLAIN)

package com.tyhh.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        String user = "admin";
        String password = "admin";
        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", autoCommit);  // commit offsets automatically
        props.put("auto.offset.reset", offsetReset);  // where to start when no committed offset exists
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // SASL authentication (PLAIN mechanism)
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + user + "\" password=\"" + password + "\";");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}
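Authenticating may not be enough on its own: if the brokers run with an authorizer enabled, the admin principal also needs Read permission on both the topic and the consumer group (my-group here), which is normally granted with the kafka-acls.sh tool.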

3.3 Creating a Consumer with SASL authentication (PLAIN, configuration-file approach)

The kafka_client_jaas_plain configuration file:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin";
};

The code:

package com.tyhh.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        // SASL authentication via an external JAAS configuration file
        System.setProperty("java.security.auth.login.config",
                "./kafka_ssl_conf/kafka_client_jaas_plain.conf");
        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", autoCommit);  // commit offsets automatically
        props.put("auto.offset.reset", offsetReset);  // where to start when no committed offset exists
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // the JAAS file supplies only the credentials; protocol and mechanism are still set here
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}

3.4 Creating a Consumer with SASL authentication (SCRAM)

package com.tyhh.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        String user = "admin";
        String password = "admin";
        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", autoCommit);  // commit offsets automatically
        props.put("auto.offset.reset", offsetReset);  // where to start when no committed offset exists
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // SASL authentication (SCRAM-SHA-512 mechanism)
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"" + user + "\" password=\"" + password + "\";");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}

3.5 Creating a Consumer with SASL authentication (SCRAM, configuration-file approach)

The kafka_client_jaas_scram configuration file:

KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin";
};

The code:

package com.tyhh.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        // SASL authentication via an external JAAS configuration file
        System.setProperty("java.security.auth.login.config",
                "./kafka_ssl_conf/kafka_client_jaas_scram.conf");
        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", autoCommit);  // commit offsets automatically
        props.put("auto.offset.reset", offsetReset);  // where to start when no committed offset exists
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // the JAAS file supplies only the credentials; protocol and mechanism are still set here
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}

Reprinted from: https://blog.csdn.net/zhi_yin/article/details/129547766
Copyright belongs to the original author, 小小白帝. If there is any infringement, please contact us for removal.
