0


Kafka Producer发送消息流程之消息异步发送和同步发送

文章目录

在这里插入图片描述

1. 异步发送

Kafka默认就是异步发送,在Main线程中的多条消息,没有严格的先后顺序,Sender发送后就继续下一条,异步接受结果。

publicclassKafkaProducerCallbackTest{publicstaticvoidmain(String[] args)throwsInterruptedException{//创建producerHashMap<String,Object> config =newHashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:19092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducer<String,String> producer =newKafkaProducer<String,String>(config);for(int i =0; i <10; i++){//创建recordProducerRecord<String,String> record =newProducerRecord<String,String>("test2",""+i,"我是你爹"+i
            );//发送record
            producer.send(record,newCallback(){@OverridepublicvoidonCompletion(RecordMetadata recordMetadata,Exception e){System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");}//关闭producer
        producer.close();}}

Main线程中,对于多条数据,下一条消息的发送并不等待上一条消息的确认,而是继续发送。

2024-07-17 21:43:46.052 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BqIgDGtwTeeusL_ygHtn2w
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
2024-07-17 21:43:46.075 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:43:46.280 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 6000 with epoch 0
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
2024-07-17 21:43:46.569 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed

可以看到先是main线程循环发送完了多条数据,然后再异步收到通知。

2. 同步发送

消息有严格的先后顺序,下一条消息必须等到上一条消息的回调确认后,再发送,这是一个效率极低的过程。

按照流程图,上一条消息需要从生产者一直流转,多个步骤,到数据收集器,到Sender,最后还要等待回调确认,才可以开始下一条消息的流转。

publicclassKafkaProducerCallbackTest{publicstaticvoidmain(String[] args)throwsInterruptedException,ExecutionException{//创建producerHashMap<String,Object> config =newHashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:19092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducer<String,String> producer =newKafkaProducer<String,String>(config);for(int i =0; i <10; i++){//创建recordProducerRecord<String,String> record =newProducerRecord<String,String>("test2",""+i,"我是你爹"+i
            );//发送recordFuture<RecordMetadata> send = producer.send(record,newCallback(){@OverridepublicvoidonCompletion(RecordMetadata recordMetadata,Exception e){System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");
            send.get();}//关闭producer
        producer.close();}}
2024-07-17 21:49:19.586 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 5000 with epoch 0
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
2024-07-17 21:49:19.823 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:49:19.838 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
标签: kafka linq 分布式

本文转载自: https://blog.csdn.net/m0_51390969/article/details/140507023
版权归原作者 木子dn 所有, 如有侵权,请联系我们删除。

“Kafka Producer发送消息流程之消息异步发送和同步发送”的评论:

还没有评论