0


kafka用java收发消息

Kafka 是一个分布式流处理平台,它允许你发布和订阅记录流,类似于消息队列或企业消息系统。下面我将为你展示如何使用 Java 来发送(生产者)和接收(消费者)Kafka 中的消息。

1. 添加依赖

首先,你需要将 Kafka 客户端的依赖添加到你的 Java 项目中。如果你使用 Maven,可以添加以下依赖到你的

pom.xml

文件中:

<dependencies><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>你的Kafka客户端版本号</version></dependency><!-- 其他依赖... --></dependencies>

确保将

你的Kafka客户端版本号

替换为当前你需要的 Kafka 客户端版本。

2. 发送消息(生产者)

以下是一个简单的 Kafka 生产者示例,用于向 Kafka 主题发送消息:

importorg.apache.kafka.clients.producer.*;importjava.util.Properties;publicclassKafkaProducerExample{publicstaticvoidmain(String[] args){Properties props =newProperties();
        props.put("bootstrap.servers","localhost:9092");
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer =newKafkaProducer<>(props);for(int i =0; i <100; i++){ProducerRecord<String,String> record =newProducerRecord<>("my-topic",Integer.toString(i),"Hello Kafka "+ i);
            producer.send(record,newCallback(){@OverridepublicvoidonCompletion(RecordMetadata metadata,Exception exception){if(exception !=null){
                        exception.printStackTrace();}else{System.out.printf("Record sent to partition %d with offset %d%n", metadata.partition(), metadata.offset());}}});}

        producer.close();}}

3. 接收消息(消费者)

以下是一个简单的 Kafka 消费者示例,用于从 Kafka 主题接收消息:

importorg.apache.kafka.clients.consumer.*;importorg.apache.kafka.common.TopicPartition;importjava.time.Duration;importjava.util.Arrays;importjava.util.Collections;importjava.util.Properties;publicclassKafkaConsumerExample{publicstaticvoidmain(String[] args){Properties props =newProperties();
        props.put("bootstrap.servers","localhost:9092");
        props.put("group.id","test");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");Consumer<String,String> consumer =newKafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));for(ConsumerRecord<String,String> record : records){System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}}

注意:在实际应用中,你可能需要处理更多的异常和关闭资源,以及使用更复杂的配置和分区策略。上面的示例只是为了展示基本的使用方式。

标签: kafka java linq

本文转载自: https://blog.csdn.net/dulgao/article/details/139135252
版权归原作者 MarkHD 所有, 如有侵权,请联系我们删除。

“kafka用java收发消息”的评论:

还没有评论