Kafka生产消费实战-JAVA
文章目录
生产者代码
publicstaticvoidmain(String[] args){Properties prop =newProperties();// 指定broker地址
prop.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");// 消息序列化
prop.put("key.serializer",StringSerializer.class.getName());
prop.put("value.serializer",StringSerializer.class.getName());// 创建生产者KafkaProducer producer =newKafkaProducer<String,String>(prop);// f发送数据String topic ="hello";
producer.send(newProducerRecord<String,String>(topic,"hello kafka producer"));// close
producer.close();}
消费者代码
publicstaticvoidmain(String[] args){Properties prop =newProperties();
prop.put("bootstrap.servers","192.168.52.100:9092,192.168.52.101:9092,192.168.52.102:9092");// 反序列化
prop.put("key.deserializer",StringDeserializer.class.getName());
prop.put("value.deserializer",StringDeserializer.class.getName());// 指定消费者组
prop.put("group.id","con-1");KafkaConsumer<String,String> consumer =newKafkaConsumer<String,String>(prop);Collection<String> topics =newArrayList<>();
topics.add("hello");// 订阅指定的topic
consumer.subscribe(topics);while(true){// 消费数据ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofSeconds(1));for(ConsumerRecord message: consumerRecords
){System.out.println(message);}}}
消费者代码扩展
// 开启自动提交功能,默认是开启
prop.put("enable.auto.commit","true");// 自动提交时间间隔
prop.put("auto.commit.interval.ms","5000");// 先根据group.id指定的消费者组查询保存的offset信息// 如果找到了,说明之前消费过该消费组的消息,则根据之前保存的offset继续消费// 如果没有找到,说明是第一次消费,或者说是之前的offset对应的数据已经不存在了,此时就会根据auto.offset.reset 的值执行不同的消费逻辑// earliest:从最早的数据开始消费,从头开始// latest : 最新的数据开始消费-默认的策略// none : 抛出异常// 在实时计算的场景下,建议设置为latest// 这个参数只会在消费者第一次消费或者对应的offset没有数据的时候才会生效
prop.put("auto.offset.reset","latest");
Consumer消费offset查询
- kafka0.9之前,消费的offset信息是保存在zookeeper中,0.9之后使用了新的消费API,消费者的信息会保存在kafka里面的_consumer_offsets这个topic中
- 如何查询保存在kafka中的consumer的offset信息?
# 查询消费者信息[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-consumer-groups.sh --list --bootstrap-server hadoop01:9092
con-1
# 消费组描述[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server hadoop01:9092 --group con-1
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
con-1 hello 2110 consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1 consumer-con-1-1
con-1 hello 3110 consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1 consumer-con-1-1
con-1 hello 1000 consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1 consumer-con-1-1
con-1 hello 0110 consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1 consumer-con-1-1
con-1 hello 4220 consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1 consumer-con-1-1
Consumer消费顺序
- 当一个消费者消费一个partition的时候,消费的数据顺序和此partition数据的生产顺序是一致的
- 当一个消费者消费多个partition的时候,消费者按照partition的顺序,首先消费一个partition,当消费完一个partition最新的数据后再消费其它partition的数据
总之,如果一个消费者消费多个partition,只能保证消费者的数据顺序在一个partition内有序
Kafka的三种语义
- 至少一次:at-least-once,有可能对数据重复处理
// 将自动提交设置为false
prop.put("enable.auto.commit","false");// 手动提交
consumer.commitAsync();
- 至多一次:at-most-once,默认实现
- 仅此一次:exactly-once
本文转载自: https://blog.csdn.net/Grady00/article/details/136671676
版权归原作者 拉霍拉卡 所有, 如有侵权,请联系我们删除。
版权归原作者 拉霍拉卡 所有, 如有侵权,请联系我们删除。