0


大数据篇Kafka消息队列指定Topic打印Key、Value、Offset和Partition

1、概念简介

说到Apache Kafka消息传递系统时,以下是一些关键概念的解释:

Key(键):Kafka消息由Key和Value组成。Key是一个可选的字段,它通常用于消息的路由和分区策略。Key的目的是确保具有相同Key的消息被写入同一个分区。当消费者接收到消息时,可以使用Key来进行消息处理和路由操作。在某些情况下,Key还可以用于数据合并和聚合。

Value(值):Value是Kafka消息中包含的实际数据。它可以是任何形式的字节流,没有特定的格式要求。Value可以是文本、二进制数据、JSON、XML或任何其他格式的信息。消费者通常根据Value进行业务逻辑处理。

Offset(偏移量):Offset是一个用来唯一标识Kafka分区中每条消息的数字。每个分区都有自己的Offset序列,并且它们是连续递增的。Offset的作用是跟踪每个消费者在分区中的处理位置。当消费者读取消息时,它会保存最后处理的Offset,以便在下次读取消息时从正确的位置开始。

Partition(分区):Kafka将主题划分为多个分区,每个分区是一个有序的、持久化的日志文件。分区使得Kafka能够实现高吞吐量和水平扩展。在生产者写入消息时,Kafka会根据特定的分区策略将消息写入到合适的分区中。每个分区都有自己的一系列Offset,并且可以被独立地读取和复制。

总结起来,Kafka的消息由Key和Value组成,Key用于路由和分区策略,Value是实际的消息数据。每个消息都有一个唯一的Offset,用于跟踪消费者在分区中的处理位置。而分区则允许Kafka实现高吞吐量和扩展性。

2、代码实现

写一段代码打印一下当前Kafka队列中指定一个Topic,打印Key、Value、Offset和Partition

packagetest.scala;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.common.TopicPartition;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.util.Collections;importjava.util.Properties;

public class KafkaDebug {
    public static void main(String[] args){String bootstrapServers ="hadoop101:9092";String topic ="TOPIC_TEST_MESSAGE";// 设置消费者配置
        Properties props =new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id","msg_group");
        props.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset","earliest");// 创建消费者
        KafkaConsumer<String,String> consumer =new KafkaConsumer<>(props);// 订阅 Topic
        consumer.subscribe(Collections.singletonList(topic));// 从 Offset 0 开始消费
        consumer.poll(0);// 触发分区分配for(TopicPartition partition : consumer.assignment()){
            consumer.seek(partition,0);// 将消费者的偏移量设置为 0}// 消费消息并打印 Key 和 Offsetwhile(true){
            ConsumerRecords<String,String> records = consumer.poll(100);for(ConsumerRecord<String,String> record : records){
                System.out.println("Key: "+ record.key()+", Offset: "+ record.offset()+", Partition: "+ record.partition());
                System.out.println("Value:"+ record.value());}}}}

本文转载自: https://blog.csdn.net/programmer589/article/details/131754810
版权归原作者 申子辰林 所有, 如有侵权,请联系我们删除。

“大数据篇Kafka消息队列指定Topic打印Key、Value、Offset和Partition”的评论:

还没有评论