💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。
- 推荐:「stormsha的主页」👈,「stormsha的知识库」👈持续学习,不断总结,共同进步,为了踏实,做好当下事儿~
- 专栏导航- Python系列: Python面试题合集,剑指大厂- Git系列: Git操作技巧- GO系列: 记录博主学习GO语言的笔记,该笔记专栏尽量写的试用所有入门GO语言的初学者- 数据库系列: 详细总结了常用数据库 mysql 技术点,以及工作中遇到的 mysql 问题等- 运维系列: 总结好用的命令,高效开发- 算法与数据结构系列: 总结数据结构和算法,不同类型针对性训练,提升编程思维非常期待和您一起在这个小小的网络世界里共同探索、学习和成长。💝💝💝 ✨✨ 欢迎订阅本专栏 ✨✨💖The Start💖点点关注,收藏不迷路💖#### 📒文章目录
下面是一个可以连接多个节点的Kafka生产者类,并且在其它文件中调用生产者发送消息的示例代码。代码包含了Kafka连接失败和发送消息失败的异常处理。
首先,确保你已经导入了Kafka的依赖。如果你使用的是Maven,可以在
pom.xml
文件中添加以下依赖:
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
接下来是Kafka生产者类的实现:
importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.clients.producer.RecordMetadata;importorg.apache.kafka.clients.producer.Callback;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.common.serialization.StringSerializer;importjava.util.Properties;publicclassKafkaProducerExample{privateKafkaProducer<String,String> producer;publicKafkaProducerExample(String bootstrapServers){Properties props =newProperties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());try{
producer =newKafkaProducer<>(props);}catch(Exception e){System.err.println("Failed to create Kafka producer: "+ e.getMessage());thrownewRuntimeException(e);}}publicvoidsendMessage(String topic,String key,String value){ProducerRecord<String,String> record =newProducerRecord<>(topic, key, value);try{
producer.send(record,newCallback(){@OverridepublicvoidonCompletion(RecordMetadata metadata,Exception exception){if(exception !=null){System.err.println("Failed to send message: "+ exception.getMessage());}else{System.out.println("Message sent successfully to topic "+ metadata.topic()+" partition "+ metadata.partition()+" with offset "+ metadata.offset());}}});}catch(Exception e){System.err.println("Failed to send message: "+ e.getMessage());}}publicvoidclose(){
producer.close();}}
然后是在其它文件中调用生产者发送消息的示例代码:
publicclassKafkaProducerDemo{publicstaticvoidmain(String[] args){String bootstrapServers ="localhost:9092,localhost:9093,localhost:9094";KafkaProducerExample producerExample =newKafkaProducerExample(bootstrapServers);try{
producerExample.sendMessage("test-topic","key1","value1");
producerExample.sendMessage("test-topic","key2","value2");}catch(Exception e){System.err.println("Exception occurred while sending messages: "+ e.getMessage());}finally{
producerExample.close();}}}
在上面的代码中,我们创建了一个
KafkaProducerExample
类,该类的构造函数接受一个包含多个节点的Kafka集群地址字符串。
sendMessage
方法用于发送消息,并处理可能的异常。如果Kafka连接失败,或者消息发送失败,都会打印错误信息。
在
KafkaProducerDemo
类中,我们实例化了
KafkaProducerExample
,并调用了
sendMessage
方法发送消息,最后关闭了生产者实例。这样可以确保资源被正确释放。
你可以根据需要修改主题名、消息内容以及Kafka集群的地址。希望这些代码能帮助你实现功能。
🔥🔥🔥道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙
💖The End💖点点关注,收藏不迷路💖
版权归原作者 stormsha 所有, 如有侵权,请联系我们删除。