0


kafka入门,生产者异步发送、回调函数,同步发送(四)

一、创建Maven项目

引入依赖

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency></dependencies>

二、异步发送

publicstaticvoidmain(String[] args)throwsInterruptedException,ExecutionException{//1、创建kafka生产者的配置对象Properties properties=newProperties();//2、给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");//key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//3、创建kafka生产者对象KafkaProducer<String,String> kafkaProducer=newKafkaProducer<String,String>(properties);//4.调用send发送消息for(int i =0; i <100; i++){//异步发送 不带回调函数
            kafkaProducer.send(newProducerRecord<>("first","kafka "+ i));}//关闭资源
        kafkaProducer.close();}

三、回调函数

回调函数会在producer收到ack时调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null,说明信息发送失败

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
publicstaticvoidmain(String[] args)throwsInterruptedException,ExecutionException{//1、创建kafka生产者的配置对象Properties properties=newProperties();//2、给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");//key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//3、创建kafka生产者对象KafkaProducer<String,String> kafkaProducer=newKafkaProducer<String,String>(properties);//4.调用send发送消息for(int i =0; i <100; i++){//异步发送带回调函数
            kafkaProducer.send(newProducerRecord<>("first","kafka "+ i),newCallback(){@OverridepublicvoidonCompletion(RecordMetadata metadata,Exception e){if(e==null){//没用一次输出,输出信息到控制台System.out.println(String.format("主题:%s,分区:%s",metadata.topic(),metadata.partition()));}else{
                        e.printStackTrace();}}});//延迟一会,数据会发送到不同分区,发送太快则可能会到同个分区 16K 0msThread.sleep(2);}//关闭资源
        kafkaProducer.close();}}

四、同步发送

只需在异步发送的基础上,再调用一下 get()方法即可

publicstaticvoidmain(String[] args)throwsInterruptedException,ExecutionException{//1、创建kafka生产者的配置对象Properties properties=newProperties();//2、给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");//key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//3、创建kafka生产者对象KafkaProducer<String,String> kafkaProducer=newKafkaProducer<String,String>(properties);//4.调用send发送消息for(int i =0; i <100; i++){//同步发送
            kafkaProducer.send(newProducerRecord<>("first","kafka "+ i)).get();}//关闭资源
        kafkaProducer.close();}
标签: kafka linq 分布式

本文转载自: https://blog.csdn.net/weixin_43205308/article/details/131417292
版权归原作者 Long long ago. 所有, 如有侵权,请联系我们删除。

“kafka入门,生产者异步发送、回调函数,同步发送(四)”的评论:

还没有评论