0


Java Kafka生产者实现


在这里插入图片描述
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。
在这里插入图片描述

  • 推荐:「stormsha的主页」👈,「stormsha的知识库」👈持续学习,不断总结,共同进步,为了踏实,做好当下事儿~
  • 专栏导航- Python系列: Python面试题合集,剑指大厂- Git系列: Git操作技巧- GO系列: 记录博主学习GO语言的笔记,该笔记专栏尽量写的试用所有入门GO语言的初学者- 数据库系列: 详细总结了常用数据库 mysql 技术点,以及工作中遇到的 mysql 问题等- 运维系列: 总结好用的命令,高效开发- 算法与数据结构系列: 总结数据结构和算法,不同类型针对性训练,提升编程思维非常期待和您一起在这个小小的网络世界里共同探索、学习和成长。💝💝💝 ✨✨ 欢迎订阅本专栏 ✨✨💖The Start💖点点关注,收藏不迷路💖#### 📒文章目录

下面是一个可以连接多个节点的Kafka生产者类,并且在其它文件中调用生产者发送消息的示例代码。代码包含了Kafka连接失败和发送消息失败的异常处理。

首先,确保你已经导入了Kafka的依赖。如果你使用的是Maven,可以在

pom.xml

文件中添加以下依赖:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>

接下来是Kafka生产者类的实现:

importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.clients.producer.RecordMetadata;importorg.apache.kafka.clients.producer.Callback;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.common.serialization.StringSerializer;importjava.util.Properties;publicclassKafkaProducerExample{privateKafkaProducer<String,String> producer;publicKafkaProducerExample(String bootstrapServers){Properties props =newProperties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());try{
            producer =newKafkaProducer<>(props);}catch(Exception e){System.err.println("Failed to create Kafka producer: "+ e.getMessage());thrownewRuntimeException(e);}}publicvoidsendMessage(String topic,String key,String value){ProducerRecord<String,String> record =newProducerRecord<>(topic, key, value);try{
            producer.send(record,newCallback(){@OverridepublicvoidonCompletion(RecordMetadata metadata,Exception exception){if(exception !=null){System.err.println("Failed to send message: "+ exception.getMessage());}else{System.out.println("Message sent successfully to topic "+ metadata.topic()+" partition "+ metadata.partition()+" with offset "+ metadata.offset());}}});}catch(Exception e){System.err.println("Failed to send message: "+ e.getMessage());}}publicvoidclose(){
        producer.close();}}

然后是在其它文件中调用生产者发送消息的示例代码:

publicclassKafkaProducerDemo{publicstaticvoidmain(String[] args){String bootstrapServers ="localhost:9092,localhost:9093,localhost:9094";KafkaProducerExample producerExample =newKafkaProducerExample(bootstrapServers);try{
            producerExample.sendMessage("test-topic","key1","value1");
            producerExample.sendMessage("test-topic","key2","value2");}catch(Exception e){System.err.println("Exception occurred while sending messages: "+ e.getMessage());}finally{
            producerExample.close();}}}

在上面的代码中,我们创建了一个

KafkaProducerExample

类,该类的构造函数接受一个包含多个节点的Kafka集群地址字符串。

sendMessage

方法用于发送消息,并处理可能的异常。如果Kafka连接失败,或者消息发送失败,都会打印错误信息。

KafkaProducerDemo

类中,我们实例化了

KafkaProducerExample

,并调用了

sendMessage

方法发送消息,最后关闭了生产者实例。这样可以确保资源被正确释放。

你可以根据需要修改主题名、消息内容以及Kafka集群的地址。希望这些代码能帮助你实现功能。


🔥🔥🔥道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙

💖The End💖点点关注,收藏不迷路💖

标签: java kafka linq

本文转载自: https://blog.csdn.net/sxc1414749109/article/details/141938290
版权归原作者 stormsha 所有, 如有侵权,请联系我们删除。

“Java Kafka生产者实现”的评论:

还没有评论