JAVA连接Kafka
1、Maven依赖(注意版本一定要与自己的Kafka版本对应)
<!-- Keep all three versions in step with the Kafka version you are running. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.5.0</version>
</dependency>
2、生产者生产数据
2.1 普通方式创建Producer
package com.tyhh.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

/**
 * Minimal Kafka producer example that configures no security settings.
 *
 * <p>Sends ten keyed string messages to {@code my-topic}, waits for each send to
 * complete and prints the topic, partition and offset reported for it.
 *
 * @version v1.0
 */
public class KafkaProducerTest {

    /**
     * Builds the producer configuration, sends the sample messages and prints each result.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        String topic = "my-topic";

        // Broker addresses to connect to.
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        // A send only succeeds once all replicas have written the message.
        props.put("acks", "all");
        // 0 = never retry a failed send; use Integer.MAX_VALUE to keep retrying.
        props.put("retries", 0);
        // Batch size in bytes.
        props.put("batch.size", 16384);
        // Delay sending by 1 ms so records can be batched.
        props.put("linger.ms", 1);
        // Size of the producer's send buffer in bytes.
        props.put("buffer.memory", 33554432);
        // Keys and values are serialized as strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even when something below throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Block until this send has completed so its result can be printed.
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("发送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (Exception e) {
                    // Report the failure and carry on with the next message.
                    System.out.println("发送失败!");
                    e.printStackTrace();
                }
            }
            producer.flush();
        }
    }
}
2.2 SASL认证创建Producer(Plain,SASL_PLAINTEXT协议只做认证、不做SSL加密)
package com.tyhh.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

/**
 * Kafka producer example that authenticates with SASL/PLAIN, passing the credentials
 * inline through the {@code sasl.jaas.config} property.
 *
 * <p>Sends ten keyed string messages to {@code my-topic}, waits for each send to
 * complete and prints the topic, partition and offset reported for it.
 *
 * @version v1.0
 */
public class KafkaProducerTest {

    /**
     * Builds the SASL/PLAIN producer configuration, sends the sample messages and prints
     * each result.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Sample credentials; replace with real ones.
        String user = "admin";
        String password = "admin";
        String topic = "my-topic";

        // Broker addresses to connect to.
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        // A send only succeeds once all replicas have written the message.
        props.put("acks", "all");
        // 0 = never retry a failed send; use Integer.MAX_VALUE to keep retrying.
        props.put("retries", 0);
        // Batch size in bytes.
        props.put("batch.size", 16384);
        // Delay sending by 1 ms so records can be batched.
        props.put("linger.ms", 1);
        // Size of the producer's send buffer in bytes.
        props.put("buffer.memory", 33554432);
        // Keys and values are serialized as strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // SASL authentication settings. These must go into the same Properties object
        // that is handed to the producer.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule "
                        + "required username=\"" + user + "\" password=\"" + password + "\";");

        // try-with-resources closes the producer even when something below throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Block until this send has completed so its result can be printed.
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("发送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (Exception e) {
                    // Report the failure and carry on with the next message.
                    System.out.println("发送失败!");
                    e.printStackTrace();
                }
            }
            producer.flush();
        }
    }
}
2.3 SASL认证创建Producer(Plain使用配置文件方式)
kafka_client_jaas_plain配置文件信息:
/* Login context "KafkaClient": PLAIN login module with the client's username and password. */
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin";
};
具体代码实现:
package com.tyhh.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

/**
 * Kafka producer example that authenticates with SASL/PLAIN, reading the credentials
 * from an external JAAS configuration file.
 *
 * <p>Sends ten keyed string messages to {@code my-topic}, waits for each send to
 * complete and prints the topic, partition and offset reported for it.
 *
 * @version v1.0
 */
public class KafkaProducerTest {

    /**
     * Points the JVM at the JAAS file, builds the producer configuration, sends the
     * sample messages and prints each result.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Supply the credentials through a JAAS configuration file.
        System.setProperty("java.security.auth.login.config",
                "./kafka_ssl_conf/kafka_client_jaas_plain.conf");

        Properties props = new Properties();
        String topic = "my-topic";

        // Broker addresses to connect to.
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        // A send only succeeds once all replicas have written the message.
        props.put("acks", "all");
        // 0 = never retry a failed send; use Integer.MAX_VALUE to keep retrying.
        props.put("retries", 0);
        // Batch size in bytes.
        props.put("batch.size", 16384);
        // Delay sending by 1 ms so records can be batched.
        props.put("linger.ms", 1);
        // Size of the producer's send buffer in bytes.
        props.put("buffer.memory", 33554432);
        // Keys and values are serialized as strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The JAAS file only carries the credentials. The client must still be told to
        // use SASL and which mechanism, otherwise it connects without authenticating.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        // try-with-resources closes the producer even when something below throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Block until this send has completed so its result can be printed.
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("发送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (Exception e) {
                    // Report the failure and carry on with the next message.
                    System.out.println("发送失败!");
                    e.printStackTrace();
                }
            }
            producer.flush();
        }
    }
}
2.4 SASL认证创建Producer(Scram)
package com.tyhh.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

/**
 * Kafka producer example that authenticates with SASL/SCRAM-SHA-512, passing the
 * credentials inline through the {@code sasl.jaas.config} property.
 *
 * <p>Sends ten keyed string messages to {@code my-topic}, waits for each send to
 * complete and prints the topic, partition and offset reported for it.
 *
 * @version v1.0
 */
public class KafkaProducerTest {

    /**
     * Builds the SASL/SCRAM producer configuration, sends the sample messages and prints
     * each result.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Sample credentials; replace with real ones.
        String user = "admin";
        String password = "admin";
        String topic = "my-topic";

        // Broker addresses to connect to.
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        // A send only succeeds once all replicas have written the message.
        props.put("acks", "all");
        // 0 = never retry a failed send; use Integer.MAX_VALUE to keep retrying.
        props.put("retries", 0);
        // Batch size in bytes.
        props.put("batch.size", 16384);
        // Delay sending by 1 ms so records can be batched.
        props.put("linger.ms", 1);
        // Size of the producer's send buffer in bytes.
        props.put("buffer.memory", 33554432);
        // Keys and values are serialized as strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // SASL authentication settings.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule "
                        + "required username=\"" + user + "\" password=\"" + password + "\";");

        // try-with-resources closes the producer even when something below throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Block until this send has completed so its result can be printed.
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("发送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (Exception e) {
                    // Report the failure and carry on with the next message.
                    System.out.println("发送失败!");
                    e.printStackTrace();
                }
            }
            producer.flush();
        }
    }
}
2.5 SASL认证创建Producer(Scram使用配置文件方式)
kafka_client_jaas_scram配置文件信息:
/* Login context "KafkaClient": SCRAM login module with the client's username and password. */
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin";
};
具体代码实现:
package com.tyhh.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

/**
 * Kafka producer example that authenticates with SASL/SCRAM-SHA-512, reading the
 * credentials from an external JAAS configuration file.
 *
 * <p>Sends ten keyed string messages to {@code my-topic}, waits for each send to
 * complete and prints the topic, partition and offset reported for it.
 *
 * @version v1.0
 */
public class KafkaProducerTest {

    /**
     * Points the JVM at the JAAS file, builds the producer configuration, sends the
     * sample messages and prints each result.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Supply the credentials through a JAAS configuration file.
        System.setProperty("java.security.auth.login.config",
                "./kafka_ssl_conf/kafka_client_jaas_scram.conf");

        Properties props = new Properties();
        String topic = "my-topic";

        // Broker addresses to connect to.
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        // A send only succeeds once all replicas have written the message.
        props.put("acks", "all");
        // 0 = never retry a failed send; use Integer.MAX_VALUE to keep retrying.
        props.put("retries", 0);
        // Batch size in bytes.
        props.put("batch.size", 16384);
        // Delay sending by 1 ms so records can be batched.
        props.put("linger.ms", 1);
        // Size of the producer's send buffer in bytes.
        props.put("buffer.memory", 33554432);
        // Keys and values are serialized as strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The JAAS file only carries the credentials. The client must still be told to
        // use SASL and which mechanism, otherwise it connects without authenticating.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");

        // try-with-resources closes the producer even when something below throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Block until this send has completed so its result can be printed.
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("发送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (Exception e) {
                    // Report the failure and carry on with the next message.
                    System.out.println("发送失败!");
                    e.printStackTrace();
                }
            }
            producer.flush();
        }
    }
}
3、消费者消费数据
3.1 普通方式创建消费者
package com.tyhh.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Minimal Kafka consumer example that configures no security settings.
 *
 * <p>Subscribes to {@code my-topic} as group {@code my-group} and prints every record
 * it receives, looping forever.
 *
 * @version v1.0
 */
public class KafkaConsumerTest {

    /**
     * Builds the consumer configuration, subscribes to the topic and prints each record.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";

        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        // Whether offsets are committed automatically.
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.offset.reset", offsetReset);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Both deserializers are required; the consumer cannot be constructed without them.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer if the poll loop ends with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long).
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
3.2 SASL认证创建Consumer(Plain,SASL_PLAINTEXT协议只做认证、不做SSL加密)
package com.tyhh.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka consumer example that authenticates with SASL/PLAIN, passing the credentials
 * inline through the {@code sasl.jaas.config} property.
 *
 * <p>Subscribes to {@code my-topic} as group {@code my-group} and prints every record
 * it receives, looping forever.
 *
 * @version v1.0
 */
public class KafkaConsumerTest {

    /**
     * Builds the SASL/PLAIN consumer configuration, subscribes to the topic and prints
     * each record.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Sample credentials; replace with real ones.
        String user = "admin";
        String password = "admin";
        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";

        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        // Whether offsets are committed automatically.
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.offset.reset", offsetReset);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Both deserializers are required; the consumer cannot be constructed without them.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // SASL authentication settings. These must go into the same Properties object
        // that is handed to the consumer.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule "
                        + "required username=\"" + user + "\" password=\"" + password + "\";");

        // try-with-resources closes the consumer if the poll loop ends with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long).
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
3.3 SASL认证创建Consumer(Plain使用配置文件方式)
kafka_client_jaas_plain配置文件信息:
/* Login context "KafkaClient": PLAIN login module with the client's username and password. */
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin";
};
具体代码实现:
package com.tyhh.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka consumer example that authenticates with SASL/PLAIN, reading the credentials
 * from an external JAAS configuration file.
 *
 * <p>Subscribes to {@code my-topic} as group {@code my-group} and prints every record
 * it receives, looping forever.
 *
 * @version v1.0
 */
public class KafkaConsumerTest {

    /**
     * Points the JVM at the JAAS file, builds the consumer configuration, subscribes to
     * the topic and prints each record.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Supply the credentials through a JAAS configuration file.
        System.setProperty("java.security.auth.login.config",
                "./kafka_ssl_conf/kafka_client_jaas_plain.conf");

        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";

        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        // Whether offsets are committed automatically.
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.offset.reset", offsetReset);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Both deserializers are required; the consumer cannot be constructed without them.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // The JAAS file only carries the credentials. The client must still be told to
        // use SASL and which mechanism, otherwise it connects without authenticating.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        // try-with-resources closes the consumer if the poll loop ends with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long).
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
3.4 SASL认证创建Consumer(Scram)
package com.tyhh.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka consumer example that authenticates with SASL/SCRAM-SHA-512, passing the
 * credentials inline through the {@code sasl.jaas.config} property.
 *
 * <p>Subscribes to {@code my-topic} as group {@code my-group} and prints every record
 * it receives, looping forever.
 *
 * @version v1.0
 */
public class KafkaConsumerTest {

    /**
     * Builds the SASL/SCRAM consumer configuration, subscribes to the topic and prints
     * each record.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Sample credentials; replace with real ones.
        String user = "admin";
        String password = "admin";
        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";

        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        // Whether offsets are committed automatically.
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.offset.reset", offsetReset);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Keys and values are deserialized as strings.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // SASL authentication settings.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required username='"
                        + user + "' password='" + password + "';");

        // try-with-resources closes the consumer if the poll loop ends with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long).
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
3.5 SASL认证创建Consumer(Scram使用配置文件方式)
kafka_client_jaas_scram配置文件信息:
/* Login context "KafkaClient": SCRAM login module with the client's username and password. */
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin";
};
具体代码实现:
package com.tyhh.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka consumer example that authenticates with SASL/SCRAM-SHA-512, reading the
 * credentials from an external JAAS configuration file.
 *
 * <p>Subscribes to {@code my-topic} as group {@code my-group} and prints every record
 * it receives, looping forever.
 *
 * @version v1.0
 */
public class KafkaConsumerTest {

    /**
     * Points the JVM at the JAAS file, builds the consumer configuration, subscribes to
     * the topic and prints each record.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Supply the credentials through a JAAS configuration file.
        System.setProperty("java.security.auth.login.config",
                "./kafka_ssl_conf/kafka_client_jaas_scram.conf");

        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";

        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        // Whether offsets are committed automatically.
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.offset.reset", offsetReset);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Both deserializers are required; the consumer cannot be constructed without them.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // The JAAS file only carries the credentials. The client must still be told to
        // use SASL and which mechanism, otherwise it connects without authenticating.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");

        // try-with-resources closes the consumer if the poll loop ends with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long).
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
本文转载自: https://blog.csdn.net/zhi_yin/article/details/129547766
版权归原作者 小小白帝 所有, 如有侵权,请联系我们删除。
版权归原作者 小小白帝 所有, 如有侵权,请联系我们删除。