0


kafka-consumer-消费者代码实例

1 消费一个主题

消费topic为first的消息。

public class ConsumerTest{
    public void main(string[] args){
        // 0 配置
        Properties properties = new Properties();
        //连接bootstrap . servers
        properties.put(ConsumerConfig.BO0TSTRAP_SERVERS_CONFIG , " hadoop102:9092,                     hadoop103:9092");
        //反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,        StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组id,必须配置,没有也要配置,不然会抛出异常
        properties.put(ConsumerConfig.GROUP_ID_CONFIG , "test" );

        // 1 创建一个消费者" ", "hello"
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2 订阅主题first,可以订阅多个topic。
        ArrayList<String> topics = new ArrayList<>();topics.add( "first" );
        kafkaConsumer.subscribe(topics);
        
        // 3 消费数据
        while (true){
            //每一秒拉取一次数据。
            ConsumerRecords<String,String> consumerRecords =      kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

2 消费一个分区

应用场景:当生产者将所有消息发往特定的某个主题分区。

消费first主题0号分区代码:

public class customConsumerPartition {
    public static void main(String[ ] args) {
        // 0 配置
        Properties properties = new Properties();
        //连接
        properties.put(ConsumerConfig.B00TSTRAP_SERVERS_CONFIG , " hadoop102:9092 , hadoop103:9092");
        //反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,    StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_    CONFI6,StringDeserializer.class.getName());
        //组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG , "test");l
        // 1 创建一个消费者
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsume
r<>(properties);
        // 2 订阅主题对应的分区
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition( topic: "first", partition: 0))kafkaconsumer.assign(topicPartitions);

        // 3 消费数据
        while (true){
            ConsumerRecords<String,String> consumerReconds =     kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
                system.out.println(consumerRecord);
            }
        }
    }
}

3 消费者组案例

测试同一个主题的分区数据,只能由一个消费者组中的一个消费者进行消费。

创建三个消费者对某一分区进行消费 。

将消费主题中的代码复制三份,由于group id是一样的,所以这三个消费者为同一消费者组。

  1. 生产者发送消息(方便阅读) 此时消息分布在0,1,2三个分区中。
  2. 消费结果发现,三个消费者每个消费者消费一个分区的数据。
标签: kafka

本文转载自: https://blog.csdn.net/weixin_43119856/article/details/128041001
版权归原作者 SeaDhdhdhdhdh 所有, 如有侵权,请联系我们删除。

“kafka-consumer-消费者代码实例”的评论:

还没有评论