前言:
前面我们对 Kafka 有了一个基本的认识,完成了 Spring Boot 项目集成 Kafka,并实现了消息发送和消费,本篇来分享一下 Kafka 的同步、异步消息发送。
Kafka 系列文章传送门
Kafka 简介及核心概念讲解
Spring Boot 整合 Kafka 详解
Kafka @KafkaListener 注解的详解及使用
Kafka 客户端工具使用分享【offsetexplorer】
Kafka 发送消息的三种方式
- 发送即忘:我们前面 Spring Boot 集成 Kafka 的案例中就是使用的这种模式,Kafka 消息生产者只管发送消息,不会等待任何确认,发送完成后就马上发送下一条消息,这种发送方式的特点就是效率高,因为它不需要等待服务器的响应,但也有缺点就是消息可靠性得不到保证,消息发送失败生产者也不会得到通知,无法进行重试或者容错处理。
- 同步发送:在同步发送模式下,Kafka 生产者发送完消息后会阻塞等待 Kafka 服务器的响应,生产者收到 Kafka 服务器的响应后才会进行下一步操作,同步发送的方式大大提高了消息的可靠性,但是也会因此损失一些性能。
- 异步发送:异步发送结合了以上两种发送模式的优点,在保证一定的消息可靠性的基础上也兼顾了性能,异步发送方式,生产者在完成消息发送后不会等待 Kafka 服务器的响应,而是继续去发送下一条消息,但是消息生产者会注册一个回调函数,在消息发送成功或者失败的时候,回调函数都会被调用,这样就可以针对不同的结果做出不同的业务处理,保障了业务的完整性。
Kafka 之消息同步/异步发送
Producer 代码演示
下面我们演示一下调用 API 实现 Kafka 消息的同步异步发送。
packagecom.order.service.kafka.producer;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.support.SendResult;importorg.springframework.stereotype.Component;importorg.springframework.util.concurrent.ListenableFuture;importorg.springframework.util.concurrent.ListenableFutureCallback;importjava.text.SimpleDateFormat;importjava.util.Date;/**
* @ClassName: SyncKafkaProducer
* @Author: Author
* @Date: 2024/10/22 19:22
* @Description: 同步发送消息
*/@Slf4j@ComponentpublicclassSyncKafkaProducer{@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;//同步发送消息publicvoidsendSyncMessage(String message){SimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(newDate());//同步发送消息try{
kafkaTemplate.send("my-topic", message).get();}catch(Exception e){
e.printStackTrace();}
log.info("同步完成消息发送,当前时间:{}", dateStr);}//异步发送消息publicvoidsendAsyncMessage(String message){try{//同步发送消息ListenableFuture<SendResult<String,String>> listenableFuture = kafkaTemplate.send("my-topic", message);
listenableFuture.addCallback(newListenableFutureCallback<SendResult<String,String>>(){@OverridepublicvoidonFailure(Throwable throwable){
log.error("异步消费发送失败");}@OverridepublicvoidonSuccess(SendResult<String,String> stringStringSendResult){
log.info("异步消息发送成功");}});
log.info("异步消息发送完毕");}catch(Exception e){
e.printStackTrace();}}}
根据上面生产者的代码我们可以看到同步发生的逻辑很简单,就是在发送之后调用了 get 方法完成了消息的同步发送,这是因为 send 方法的返回值是 ListenableFuture,该类继承了 Future,根据 Futrue 的特点,我们只需在调用 Future 对象的get方发即可完成阻塞,也就完成了同步发送消息的效果,send 方法源码如下:
//org.springframework.kafka.core.KafkaTemplate#send(java.lang.String, V)publicListenableFuture<SendResult<K,V>>send(String topic,@NullableV data){ProducerRecord<K,V> producerRecord =newProducerRecord(topic, data);returnthis.doSend(producerRecord);}
异步发送的代码要稍微复杂一点,我们在完成消息发送之后调用了一个回调函数 ListenableFutureCallback,该回调函数会在 Producer 收到 ACK 时候调用,根据发送的结果来调用不通的方法,
Consumer 代码演示
Consumer 的代码没有什么特殊之处,这里我们使用最简单的消费监听方式,代码如下:
packagecom.order.service.kafka.consumer;importlombok.extern.slf4j.Slf4j;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importjava.text.SimpleDateFormat;importjava.util.Date;/**
* @ClassName: MyKafkaConsumer
* @Author: zhangyong
* @Date: 2024/10/22 19:22
* @Description:
*/@Slf4j@ComponentpublicclassMyKafkaConsumer{@KafkaListener(id ="my-kafka-consumer",
groupId ="my-kafka-consumer-groupId",
topics ="my-topic",
containerFactory ="myContainerFactory")publicvoidlisten(String message){SimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(newDate());
log.info("消息消费成功,当前时间:{},消息内容:{}", dateStr, message);}}
Kafka 之消息同步/异步发送结果验证
同步发送结果验证:
2024-10-28 19:43:08.967 INFO 13360 ---[-consumer-0-C-1] c.o.s.kafka.consumer.MyKafkaConsumer : 消息消费成功,当前时间:2024-10-28 19:43:08,消息内容:我是一条同步消息
2024-10-28 19:43:08.968 INFO 13360 ---[nio-8086-exec-2] c.o.s.kafka.producer.SyncKafkaProducer : 同步完成消息发送,当前时间:2024-10-28 19:43:08
可以看到消息被正确消费发送消费掉,结果符合预期。
异步发送结果验证:
2024-10-28 19:48:54.416 INFO 23216 ---[io-8086-exec-10] c.o.s.kafka.producer.SyncKafkaProducer : 异步消息发送完毕
2024-10-28 19:48:54.519 INFO 23216 ---[ad | producer-1] c.o.s.kafka.producer.SyncKafkaProducer : 异步消息发送成功
2024-10-28 19:48:54.519 INFO 23216 ---[-consumer-0-C-1] c.o.s.kafka.consumer.MyKafkaConsumer : 消息消费成功,当前时间:2024-10-28 19:48:54,消息内容:我是一条异步消息
我们注意看控制台打印的日志,日志先打印了消息发送完毕,后打印的消息发送成功,而我们代码中消息发送完毕的行数在消息发生成功后面,因此可以证明异步确实生效了,结果符合预期。
总结:本篇我们分享了 Kafka 的同步、异步消息的用法,在真实业务场景中我们需要结合业务场景来选择使用何种消息发送方式,在选择发送方式时,需要根据业务需求平衡性能和可靠性,如果消息的可靠性至关重要,可以选择同步发送或异步发送并妥善处理异常,如果性能是主要考虑因素,且可以容忍一定程度的消息丢失,可以选择发后即忘的方式,在使用异步发送的时候需要注意,需要确保回调函数的迅速执行,不要写太多的无关业务逻辑在回调函数中,避免阻塞 Kafka Producer 的主线程。
如有不正确的地方欢迎各位指出纠正。
版权归原作者 码农爱java 所有, 如有侵权,请联系我们删除。