在使用
kafkaTemplate.sendDefault(message).addCallback
时,你可以通过
addCallback
方法来处理发送消息后的成功和失败回调。
importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.support.Callback;importorg.springframework.kafka.support.SendResult;publicclassKafkaProducer{privatefinalKafkaTemplate<String,String> kafkaTemplate;publicKafkaProducer(KafkaTemplate<String,String> kafkaTemplate){this.kafkaTemplate = kafkaTemplate;}publicvoidsendMessage(String message){
kafkaTemplate.sendDefault(message).addCallback(newCallback(){@OverridepublicvoidonSuccess(SendResult<String,String> result){// **处理成功的逻辑**System.out.println("Message sent successfully: "+ result.getProducerRecord().value());}@OverridepublicvoidonFailure(org.apache.kafka.clients.producer.ProducerRecord<String,String> producerRecord,Exception ex){// **处理失败的逻辑**System.err.println("Message failed to send: "+ ex.getMessage());}});}}
关键点:
- 成功回调:在
onSuccess
方法中,你可以处理消息成功发送后的逻辑。 - 失败回调:在
onFailure
方法中,你可以处理消息发送失败的情况。
@KafkaListener
和
kafkaTemplate.sendDefault(message).addCallback
是 Kafka 中用于不同目的的两个概念,具体区别如下:
1. 功能目的
@KafkaListener
:- 用于消费消息。它是一个注解,用于标记一个方法,使其能够自动接收来自指定主题的消息。kafkaTemplate.sendDefault(message).addCallback
:- 用于发送消息。它是 KafkaTemplate 的一个方法,用于将消息发送到 Kafka 主题,并提供成功和失败的回调处理。
2. 使用场景
@KafkaListener
:- 当你需要处理来自 Kafka 主题的消息时,使用@KafkaListener
注解的方法会被自动调用。kafkaTemplate.sendDefault(message).addCallback
:- 当你需要将消息发送到 Kafka 主题时,使用kafkaTemplate
发送消息,并可以通过回调处理发送结果。
3. 示例代码
@KafkaListener
示例:importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Service;@ServicepublicclassKafkaConsumer{@KafkaListener(topics ="your_topic", groupId ="your_group_id")publicvoidlisten(String message){// **处理接收到的消息**System.out.println("Received message: "+ message);}}
kafkaTemplate.sendDefault
示例:kafkaTemplate.sendDefault("your_topic", message).addCallback(newCallback(){@OverridepublicvoidonSuccess(SendResult<String,String> result){// **处理成功的逻辑**}@OverridepublicvoidonFailure(ProducerRecord<String,String> producerRecord,Exception ex){// **处理失败的逻辑**}});
总结
@KafkaListener
是用于消费消息的,而kafkaTemplate.sendDefault
是用于发送消息的。
在
kafkaTemplate.sendDefault(message).addCallback
的成功回调中,包含的信息主要是
SendResult
对象。这个对象提供了关于发送消息的详细信息,包括:
- ProducerRecord:发送的消息记录。
- RecordMetadata:关于消息的元数据,例如主题、分区、偏移量等。
示例代码
以下是一个示例,展示了如何在成功回调中使用这些信息:
importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.support.Callback;importorg.springframework.kafka.support.SendResult;importorg.springframework.stereotype.Service;@ServicepublicclassKafkaProducer{privatefinalKafkaTemplate<String,String> kafkaTemplate;publicKafkaProducer(KafkaTemplate<String,String> kafkaTemplate){this.kafkaTemplate = kafkaTemplate;}publicvoidsendMessage(String message){
kafkaTemplate.sendDefault(message).addCallback(newCallback(){@OverridepublicvoidonSuccess(SendResult<String,String> result){// **获取发送的消息记录**String sentMessage = result.getProducerRecord().value();// **获取元数据**String topic = result.getRecordMetadata().topic();int partition = result.getRecordMetadata().partition();long offset = result.getRecordMetadata().offset();// **处理成功的逻辑**System.out.printf("Message sent successfully: %s, Topic: %s, Partition: %d, Offset: %d%n",
sentMessage, topic, partition, offset);}@OverridepublicvoidonFailure(org.apache.kafka.clients.producer.ProducerRecord<String,String> producerRecord,Exception ex){// **处理失败的逻辑**System.err.println("Message failed to send: "+ ex.getMessage());}});}}
关键点
- **
result.getProducerRecord().value()
**:获取发送的消息内容。 - **
result.getRecordMetadata().topic()
**:获取消息发送到的主题。 - **
result.getRecordMetadata().partition()
**:获取消息发送到的分区。 - **
result.getRecordMetadata().offset()
**:获取消息在分区中的偏移量。
总结
在成功回调中,你可以获取到关于发送消息的详细信息,这些信息对于后续的处理和日志记录非常有用。
版权归原作者 你这个代码我看不懂 所有, 如有侵权,请联系我们删除。