一、创建Maven项目
引入依赖
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency></dependencies>
二、异步发送
publicstaticvoidmain(String[] args)throwsInterruptedException,ExecutionException{//1、创建kafka生产者的配置对象Properties properties=newProperties();//2、给kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");//key,value序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//3、创建kafka生产者对象KafkaProducer<String,String> kafkaProducer=newKafkaProducer<String,String>(properties);//4.调用send发送消息for(int i =0; i <100; i++){//异步发送 不带回调函数
kafkaProducer.send(newProducerRecord<>("first","kafka "+ i));}//关闭资源
kafkaProducer.close();}
三、回调函数
回调函数会在producer收到ack时调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null,说明信息发送失败
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
publicstaticvoidmain(String[] args)throwsInterruptedException,ExecutionException{//1、创建kafka生产者的配置对象Properties properties=newProperties();//2、给kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");//key,value序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//3、创建kafka生产者对象KafkaProducer<String,String> kafkaProducer=newKafkaProducer<String,String>(properties);//4.调用send发送消息for(int i =0; i <100; i++){//异步发送带回调函数
kafkaProducer.send(newProducerRecord<>("first","kafka "+ i),newCallback(){@OverridepublicvoidonCompletion(RecordMetadata metadata,Exception e){if(e==null){//没用一次输出,输出信息到控制台System.out.println(String.format("主题:%s,分区:%s",metadata.topic(),metadata.partition()));}else{
e.printStackTrace();}}});//延迟一会,数据会发送到不同分区,发送太快则可能会到同个分区 16K 0msThread.sleep(2);}//关闭资源
kafkaProducer.close();}}
四、同步发送
只需在异步发送的基础上,再调用一下 get()方法即可
publicstaticvoidmain(String[] args)throwsInterruptedException,ExecutionException{//1、创建kafka生产者的配置对象Properties properties=newProperties();//2、给kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");//key,value序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//3、创建kafka生产者对象KafkaProducer<String,String> kafkaProducer=newKafkaProducer<String,String>(properties);//4.调用send发送消息for(int i =0; i <100; i++){//同步发送
kafkaProducer.send(newProducerRecord<>("first","kafka "+ i)).get();}//关闭资源
kafkaProducer.close();}
本文转载自: https://blog.csdn.net/weixin_43205308/article/details/131417292
版权归原作者 Long long ago. 所有, 如有侵权,请联系我们删除。
版权归原作者 Long long ago. 所有, 如有侵权,请联系我们删除。