0


Kafka 实战 - Kafka生产者之消息发送流程及同步异步发送API

Apache Kafka 生产者(Producer)负责将消息发送到 Kafka 集群中的指定 Topic。消息发送流程涉及以下几个关键步骤,同时提供同步和异步两种发送 API 供开发者选择,以适应不同场景的需求。

消息发送流程

  1. 创建生产者实例: 使用 KafkaProducer 类的构造函数创建生产者实例,传入配置参数(如 Bootstrap Servers、Serializer、Acknowledgment 策略等)。Properties props =newProperties();props.put("bootstrap.servers","localhost:9092");props.put("key.serializer",StringSerializer.class);props.put("value.serializer",StringSerializer.class);KafkaProducer<String,String> producer =newKafkaProducer<>(props);
  2. 构建 ProducerRecord: 创建 ProducerRecord 对象,指定要发送的消息所属的 Topic、分区(可选)、键(可选)、值。ProducerRecord<String,String> record =newProducerRecord<>("my-topic","key","value");
  3. 发送消息: 调用 KafkaProducersend() 方法发送消息。此方法返回一个 Future<RecordMetadata>,表示异步发送操作的结果。Future<RecordMetadata> sendResult = producer.send(record);- 同步发送:若需等待消息发送完成并获取发送结果,可以调用 get() 方法阻塞等待。try{RecordMetadata metadata = sendResult.get();System.out.println("Message sent to partition "+ metadata.partition()+" with offset "+ metadata.offset());}catch(Exception e){// Handle exception}- 异步发送:若不需立即等待结果,可在后续逻辑中通过回调或轮询 Future 对象获取发送结果。sendResult.whenComplete((metadata, exception)->{if(exception !=null){// Handle exception}else{System.out.println("Message sent to partition "+ metadata.partition()+" with offset "+ metadata.offset());}});
  4. 关闭生产者: 当不再发送消息时,调用 close() 方法关闭生产者,释放资源。producer.close();

同步与异步发送的区别

  • 同步发送:- 阻塞:调用 send() 后立即调用 get(),主线程会阻塞直到消息发送完成或超时。- 强一致性:确保消息已发送到 Kafka 并得到确认,除非抛出异常。- 低吞吐量:由于每次发送都需要等待确认,不适合高吞吐量场景。
  • 异步发送:- 非阻塞:调用 send() 后立即返回 Future,主线程继续执行其他任务。- 弱一致性:消息发送结果通过回调或异步查询 Future 得到,可能存在消息未发送成功的情况。- 高吞吐量:无需等待单个消息的确认,可并发发送大量消息,提升整体发送速率。

最佳实践

  • 选择发送模式:- 对于需要严格保证消息发送成功的场景(如交易通知、重要日志记录),选择同步发送。- 对于容忍一定消息丢失且追求高吞吐量的场景(如实时分析、日志收集),选择异步发送。
  • 错误处理:- 异步发送时,通过 FuturewhenComplete()handle() 方法设置回调处理发送结果,对异常情况进行妥善处理。- 同步发送时,捕获 get() 方法抛出的异常,记录并采取适当措施(如重试、发送至死信队列等)。
  • 批量发送:- 利用 KafkaProducersend() 方法批量发送多个 ProducerRecord,提高网络利用率和整体性能。

通过理解 Kafka 生产者的消息发送流程以及同步与异步发送 API 的使用,开发者可以根据实际业务需求选择合适的发送模式,有效利用 Kafka 实现消息的高效、可靠传输。

标签: kafka 分布式

本文转载自: https://blog.csdn.net/qq_33240556/article/details/137629373
版权归原作者 用心去追梦 所有, 如有侵权,请联系我们删除。

“Kafka 实战 - Kafka生产者之消息发送流程及同步异步发送API”的评论:

还没有评论