生产者生产数据有两种模式:一种是同步模式,一种是异步模式。
同步模式:生产者生产一条数据,就保存一条数据,保存成功后,再生产下一条数据,能够保证数据不丢失,但是效率太低了。
异步模式(采用ack机制):
Properties properties = new Properties(); properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092"); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); for (int i = 1; i <= 600; i++) { kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i)); System.out.println("testkafka"+i); } kafkaProducer.close();
在producer端开启一块buff缓冲,用来缓存数据,缓存一批数据,保存到partition当中.
0:生产者生产数据,不管leader是否保存成功,follower是否同步成功,继续发送下一批数据
1:生产者生产数据,只保证leader保存成功,不管follower是否同步成功,继续发送下一批数据.
-1或者all:生产者生产数据,既要保证leader保存成功,也要保证follower同步成功,继续发送下一批数据.
2.broker端
在Broker端,可以给Topic配置更大的备份因子replication-factors。配置了备份因子后,Kafka会给每个Partition分配多个备份Partition。这些Partiton会尽量平均的分配到多个Broker上。并且,在这些Partiton中,会选举产生Leader Partition和Follower Partition。这样,当Leader Partition发生故障时,其他Follower Partition上还有消息的备份。就可以重新选举产生Leader Partition,继续提供服务。
这样整个集群内的消息不会丢失。
3.消费者
自动提交偏移量改成手动提交偏移量
设置 enable.auto.commit = false , 默认值true,自动提交
手动提交offset
使用kafka的Consumer的类,用方法consumer.commitSync()提交
或者使用spring-kafka的 Acknowledgment类,用方法ack.acknowledge()提交(推荐使用)
版权归原作者 真离谱 所有, 如有侵权,请联系我们删除。