1 消费一个主题
消费topic为first的消息。
public class ConsumerTest{
public void main(string[] args){
// 0 配置
Properties properties = new Properties();
//连接bootstrap . servers
properties.put(ConsumerConfig.BO0TSTRAP_SERVERS_CONFIG , " hadoop102:9092, hadoop103:9092");
//反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//配置消费者组id,必须配置,没有也要配置,不然会抛出异常
properties.put(ConsumerConfig.GROUP_ID_CONFIG , "test" );
// 1 创建一个消费者" ", "hello"
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅主题first,可以订阅多个topic。
ArrayList<String> topics = new ArrayList<>();topics.add( "first" );
kafkaConsumer.subscribe(topics);
// 3 消费数据
while (true){
//每一秒拉取一次数据。
ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
2 消费一个分区
应用场景:当生产者将所有消息发往特定的某个主题分区。
消费first主题0号分区代码:
public class customConsumerPartition {
public static void main(String[ ] args) {
// 0 配置
Properties properties = new Properties();
//连接
properties.put(ConsumerConfig.B00TSTRAP_SERVERS_CONFIG , " hadoop102:9092 , hadoop103:9092");
//反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_ CONFI6,StringDeserializer.class.getName());
//组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG , "test");l
// 1 创建一个消费者
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsume
r<>(properties);
// 2 订阅主题对应的分区
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition( topic: "first", partition: 0))kafkaconsumer.assign(topicPartitions);
// 3 消费数据
while (true){
ConsumerRecords<String,String> consumerReconds = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
system.out.println(consumerRecord);
}
}
}
}
3 消费者组案例
测试同一个主题的分区数据,只能由一个消费者组中的一个消费者进行消费。
创建三个消费者对某一分区进行消费 。
将消费主题中的代码复制三份,由于group id是一样的,所以这三个消费者为同一消费者组。
- 生产者发送消息(方便阅读) 此时消息分布在0,1,2三个分区中。
- 消费结果发现,三个消费者每个消费者消费一个分区的数据。
版权归原作者 SeaDhdhdhdhdh 所有, 如有侵权,请联系我们删除。