0


Kafka(二)【文件存储机制 & 生产者】

一、Kafka 文件存储机制

    Kafka中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。

    topic 只是逻辑上的概念,而 partition 是物理上的概念。每个partition对应于一个log文件,该 log 文件中存储的就是 producer 生产的数据。Producer生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset 。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。

    由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个 partition 分为多个 segment 。每个 segment 对应两个文件 ——“.index”文件和“.log”文件(一个存储当前文件的索引范围,一个存储真正的数据)。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,like 这个 topic 有三个分区,则其对应的文件夹为 like-0,like-1,like-2:
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

这种存储规则很像我们 Spark Shuffle 阶段产生的文件的存储规则,顺便回忆一下 Spark 的 Shuffle 阶段:

    Shuffle 过程中每个 Map 任务会产生两个文件,即数据文件和索引文件。其中,数据文件是存储当前 Map 任务的输出结果,而索引文件中则存储了数据文件中的数据的分区信息。下一个阶段的 Reduce 任务就是根据索引文件来获取属于自己处理的那个分区的数据。

    其实 Spark 常见的有两种 Shuffle 策略:HashShuffle 和 SortShuffle ,但是这属于 Spark 调优的内容了,现在不必深究。

二、Kafka 生产者

1、生产者消息发送流程

    我们知道,Kafka 的三大组成部分:Producer、Broker、Consumer。接下来我们要学习的部分就是 Kafka 的生产者是如何把数据发送给 Kafka 集群(Broker)的。

1.1、发送原理

    在消息发送的过程中,涉及到了**两个线程——main线程和Sender线程**。在main线程中创建了**一个双端队列RecordAccumulator**。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

main 线程:

  • 拦截器是可选的(一般用 flume 的拦截器,配置更简单)
  • 序列化器(用的是Kafka自己的序列化器,而不是Java的Serializable,因为Java的序列化器太繁重)
  • 分区器(决定数据往哪个分区存储)
  • RecordAccumulator 默认 32MB(内存),分区器会把数据发送到这里,一个分区会创建一个队列,方便数据管理(都是在内存中完成的),队列中每份数据的大小(batch.size)默认是16K

sender线程:

  • 只有数据积累到 batch.size 之后,sender 才会发送数据,默认16K
  • 如果队列中的一份数据迟迟满不了(达不到 batch.size),sender 会等待 linger.ms 后发送这份数据,默认是 0ms
  • NetworkClient:我们发送数据的请求会被放到一个队列(InFlightRequests 队列,通过源码可以看到其底层数据结构Map<String, Deque<ClientRequest>> 以 broker 为 key,队列中放的是一个个发送请求)中去,当第一个请求发出后未得到响应仍然会继续发送第二个请求,但是这个队列的大小默认是 5,也就是说,如果五个请求都发出去后且都没有得到响应,那么就不可以再发出请求,因为默认每个 broker 最多缓存5个请求。
  • Sender线程通过Selector把上面的请求和数据一起发送到Kafka集群,当数据发送到Kafka集群后,Kafka集群会进行副本的复制并返回对应的acks信息。acks有三种应答级别,分别是ACK=0、ACK=1和ACK=all或ACK=-1(默认级别)。 ACK=0:生产者在消息发送后不会等待来自服务器的任何确认。这意味着生产者无法知道消息是否成功存储在Kafka集群中。这个级别提供了最高的吞吐量,但在可靠性方面是最低的,因为可能会丢失消息。ACK=1:生产者会等待直到消息的领导者副本(Leader Replica)确认接收到消息。一旦领导者副本存储了消息,生产者会收到一个确认。这个级别在性能和数据可靠性之间提供了一个平衡。但如果领导者副本在确认后发生故障,而消息还未复制到追随者副本(Follower Replicas),则消息可能会丢失。ACK=all或ACK=-1(默认级别):生产者会等待消息被所有的同步副本(ISR,In-Sync Replicas)确认。这意味着只有当所有的同步副本都已经接收并存储了消息,生产者才会收到一个确认。这个级别提供了最高的数据可靠性,但可能会牺牲一些性能,因为需要等待所有副本的确认。
  • 如果数据发送成功,就把 InFlighRequests 队列中的请求缓存给清除掉,并且把对应 RecordAccuulator 中的数据清除掉。
  • 如果数据发送失败,会进行重试,重试的次数默认是(Intege.MAX_VALUE),也就是死磕到底,直到发送成功为止。

参数名称

描述

bootstrap.servers

生产者连接集群所需的broker地址清单。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置1个或者多个,中间用逗号隔开。注意这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。

key.serializer和value.serializer

指定发送消息的key和value的序列化类型。一定要写全类名。

buffer.memory

RecordAccumulator缓冲区总大小,默认32m。

batch.size

缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。

linger.ms

如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。

acks

0:生产者发送过来的数据,不需要等数据落盘应答。

1:生产者发送过来的数据,Leader收到数据后应答。

-1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。

max.in.flight.requests.per.connection

允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。

retries

当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。

如果设置了重试,还想保证消息的有序性,需要设置

MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。

retry.backoff.ms

两次重试之间的时间间隔,默认是100ms。

enable.idempotence

是否开启幂等性,默认true,开启幂等性。

compression.type

生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。

支持压缩类型:none、gzip、snappy、lz4和zstd。

2、异步发送 API

send 方法有两种传参方式,一种有回调函数,一种不带回调函数。

2.1、普通异步发送

    所谓的异步发送指的是外部的数据使用异步的方式吧数据发送到 RecordAccumulator 的内存队列当中去,异步的体现就是外部数据发送到队列中后,并不会等待 RecordAccumulator 把数据传给 Broker 节点并返回成功的消息才继续发送;而是直接把数据扔到 RecordAccumulator 的内存队列后就撒手不管了。
案例演示

导入依赖:

<dependencies>
     <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>3.0.0</version>
     </dependency>
</dependencies>

注意:使用 kafka 前必须启动 zookeeper,不然报错无法使用。

public class CustomProducer {

    public static void main(String[] args) {

        Properties properties = new Properties();

        // 连接集群 bootstrap.servers 多写几个主机地址 防止一个客户端挂掉
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的 key 和 value 的序列化类型 key.serialize
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serializer");
        // 这两个是等价的
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1. 创建 Kafka 生产者对象
        // 需要指定键值的类型
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2. 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("like","test"+i));
        }

        // 3. 关闭资源
        kafkaProducer.close();
    }
}

我们可以看到,一条消息就是一个 ProducerRecord 对象,不管你的消息多么短,哪怕是一个标点,它也会被包装进一个对象里面去。

运行结果:

    我们发现,我们的编程步骤和我们上面Kafka的发送原理中生产者的发送步骤是一致的,只不过我们这里没有设置拦截器和分区器,这是一个最简单的 Kafka 生产者程序了。

2.2、带回调函数的异步发送

现在我们来使用带回调函数的异步发送:

只需要修改send方法即可:

    for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("like", "test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });
        }

运行结果:

这种带回调函数的方法可以让我们知道数据更多的元数据信息(比如主题、分区...)。

2.3、同步发送 API

所谓的同步发送,就是外部数据发送到 RecordAccumulator 的内存队列后,必须等待数据被 Selector 发送到 Broker 并返回响应信息才能继续发送。代码的实现很简单,只需要在send方法之后加一个 get() 即可:

for (int i = 0; i < 5; i++) {
      kafkaProducer.send(new ProducerRecord<>("like","test"+i)).get();
}

3、生产者分区

3.1、分区的好处

分区的好处我们太清楚了,之前的 Hadoop、Spark、Flink 都有分区的概念,在 Shuffle 的时候、在 keyBy 的时候,分区的好处显然可以增加并行度,提高我们数据的处理效率;可以负载均衡,不会出现服务器涝的涝死,旱的旱死。

所以,从大数据的存储和计算的角度来看,分区有这么两种好处:

  1. 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上,合理控制分区的任务,可以实现负载均衡。
  2. 提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区为单位消费数据。

3.2、生产者发送消息的分区策略

(1)默认的分区器 DefaultPartitioner

我们可以看到,在初始化 ProducerRecord 时,有 6 种初始化的传参方式:

接下来我们就利用上面的回调函数可以返回数据的元数据信息,来测试一下是不是像它说的这样:

1、指定分区:

for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("like", 0,"","test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });
        }

运行结果:

topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 0

面试题:如何把一张 MySQL 表的数据都放到一个 Kafka 的分区当中去?

答:生产者在发送数据时指定数据的分区为 表名 。

2、不指定分区,只指定 key:

kafkaProducer.send(new ProducerRecord<>("like", ""+i,"test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });

我们指定 key 为 i (i 的值为0,1,2,3,4),运行结果:

topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 1

3,不指定 partition 也不指定 key。这种方式其实我们上面在学异步发送的时候已经演示过了,它默认会等这一批都满了(16K)或者达到 linger.ms(默认0ms)才会发送。

我们这里在循环的时候,让线程睡眠 2ms,这样就不会因为默认的 linger.ms 太短立即发送,导致数据量太小,发送太快看不出来粘性分区的特点:

for (int i = 0; i < 25; i++) {
            kafkaProducer.send(new ProducerRecord<>("like", ""+i,"test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });
            Thread.sleep(2);
        }

运行结果:

topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 0
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 0
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 1

我们可以看到,粘性分区是这样的:它每次的分区和上一次的分区是不一样的,并且每次的数据尽可能会放到一个分区去。

3.3、自定义分区器

在工作当中,一些特殊场景我们现有的分区策略是无法实现的,这就需要我们自定义来实现分区器了。

1)需求
    实现一个分区器,发送过来的数据如果包含 "大傻春" 就发往 0 号分区,否则发往 1 号分区。
2)实现步骤
  1. 实现 Partitioner 接口

  2. 重写 partition() 方法

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String val = value.toString();
        if (val.contains("大傻春"))
            return 0;
        return 1;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

使用自定义分区器:

// 关联自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

// 发送数据
for (int i = 0; i < 5; i++) {
    ProducerRecord<String, String> record;
    if (i==3)
        record = new ProducerRecord<>("like", "" + i, "大傻春");
    else
        record = new ProducerRecord<>("like", ""+i,"test" + i);

kafkaProducer.send(record, (RecordMetadata recordMetadata, Exception e)-> {
    if (e == null){ // 如果异常为空 说明正常执行
           System.out.println("topic:"+recordMetadata.topic()+",partition:"+recordMetadata.partition());
       }
    });
}

4、生产经验

4.1、生产者如何提高吞吐量

    我们外部的数据是放到 RecordAccumulator 的内存队列当中等待发送的,队列中每份数据的大小(batch.size)默认是16K,但我们知道,如果数据迟迟不能达到 batch.size 的话,会根据默认的配置 linger.ms (默认是 0ms)来发送。但事实上,0ms就意味着不需要数据达到 batch.size 也就是 batch.size 这一配置形同虚设不起作用,也就是只要它察觉到有数据就立即发送,这样的效率其实并不高。

    为了提高我们的吞吐量,我们当然需要调整这两个配置的参数大小:
  • batch.size :内存队列中每个批次的大小,默认 16K

  • linger.ms:等待时间,修改为 5-100ms

      当然这个参数的值我们需要慎重考虑,就像我们 Flink 当中水位线允许迟到的时间一样,不能说为了保证数据的迟到率最低,就把等待时间设置为几秒,那样 Flink 辛辛苦苦实现的毫秒级延迟有啥用呢。
    
  • compression.type:压缩 snappy

  • RecordAccumulator:缓冲区大小,修改为 64 MB

public class CustomProducerParameters {
    public static void main(String[] args) {

        // 0. 配置信息
        Properties properties = new Properties();
        // 连接 kafka
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
        // key 和 value 的序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // batch.size 单位: KB
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        // linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
        // 压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy"); //默认none,可配置值gzip、snappy、lz4和zstd
        // 缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432*2); // 修改为64MB

        // 1. 创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 2. 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("like","value"+i));
        }

        // 3. 关闭资源
        kafkaProducer.close();
    }
}

4.2、数据可靠性

    数据的可靠性,指的其实就是当 Selector 把数据发送到 Broker 之后,是否等待响应(ack)之后再发送数据。

我们知道,ack 的值有三种:

  1. ack = 0:发送完直接走,不用等响应,可靠性差,但是效率高。(一般不用)
  2. ack = 1:发送完需要等待 leader 节点响应才能继续发送下一个数据,可靠性中等,效率中等。
  3. ack = -1:发送完需要等待所有节点(leader 和 其他 fllower)响应才能继续发送下一个数据。可靠性高,效率低。

    在生产环境中,acks=0 很少用;acks=1:一般用于传输普通日志,允许丢个别数据;acks=-1:一般用于传输和钱有关的数据,用于对可靠性的要求比较高的场景。

完全可靠性在这种情况下尽管概率特别低,但是仍然不能排除,我们会在下面的数据去重去学习。

代码中配置 ACK 级别:

// acks
properties.put(ProducerConfig.ACKS_CONFIG,"1");
// 重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,3);

4.3、数据去重

4.3.1、数据传递语义

就像我们 Flink 中的时间语义,Kafka 生产者也有它的数据传递语义:

  • 至少一次(At least once)= ACK级别设置为1 + 分区副本大于等于2 + ISR里应答的最小副本数大于等于2
  • 最多一次(At most once)= ACK级别设置为0
  • 总结:- Al least once 可以保证数据不丢失,但是不能保证数据不重复;- At most once 可以保证数据不重复,但是不能保证数据不丢失。
  • 精确一次(Exactly once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不能丢失。

Kafka 0.11 版本后,引入了一个重要的特性:幂等性和事务。

4.3.2、幂等性
  • 幂等性就是指 Producer 不论向 Broker 发送多少次重复数据,Broker 端都只会持久化一条,保证了不重复。
  • 精确一次(Exactly once)= 幂等性 + 至少一次(ack = -1 + 分区副本数 >= 2 + ISR 最小副本数 >= 2)

    重复数据的判断标准就是数据的三个属性必须唯一(PID、Partition、SeqNumber),其中 PID 是 Kafka 每次启动自动生成的,Partition是分区号,SeqNumber 是一个单调递增的数。

    幂等性只能保证数据在单分区内不会重复。

配置幂等性

enable.idempotence 默认就是 true,flase 关闭。

4.3.3、生产者事务

我们知道,幂等性只能保证数据在单分区内不会重复,但是还是不能保证绝对的唯一,比如 Kafka 挂掉了需要重启,那么重启之后之前数据的 PID 就失效了,所以当有重复的数据时,并不能识别出来。这就需要事务的方式来解决了。

说明:开启事务必须开启幂等性。

其实,Kafka 实现精确一次的保证机制和我们 Flink 在保证端到端一致性时输出端的保证方式是很相似的(幂等写入和事务写入)

Kafka 的事务一共如下 5 个 API:

// 1初始化事务
void initTransactions();

// 2开启事务
void beginTransaction() throws ProducerFencedException;

// 3在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;

// 4提交事务
void commitTransaction() throws ProducerFencedException;

// 5放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

测试:

public class CustomProducerTransaction {

    public static void main(String[] args) {

        Properties properties = new Properties();

        // 连接集群 bootstrap.servers 多写几个主机地址 防止一个客户端挂掉
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的 key 和 value 的序列化类型 key.serialize
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serializer");
        // 这两个是等价的
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 指定事务id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_01");

        // 1. 创建 Kafka 生产者对象
        // 需要指定键值的类型
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        kafkaProducer.initTransactions();

        kafkaProducer.beginTransaction();

        try {
            // 2. 发送数据
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("like","test"+i));
            }
            kafkaProducer.commitTransaction();
        }catch (Exception e){
            kafkaProducer.abortTransaction();   // 终止事务
        }finally {
            // 3. 关闭资源
            kafkaProducer.close();
        }
    }
}

注意:一定要记得手动指定事务id(保证唯一)。我们要把发送数据的代码写进 try-catch 中,如果有异常那么久终止事务。

4.4、数据有序

    数据有序的保证一直是流处理领域的一个问题,就像我们的 Flink 通过 水位线和Barrier 对齐算法保证数据容错和有序性。我们 Kafka 中也是一样的,单分区的话我们数据当然是有序的,但因为是多分区,所以分区和分区间的数据不能保证哪个数据先被读取,所以说分区间数据的顺序是无序的。

    至于多分区要做到有序,可以把每个分区的数据在消费者这里进行排序,但是这样的效率不是很高,所以我们 Kafka 一般都是只保证单分区有序,个人认为,Kafka只要做到数据的尽量有序就可以了,反正Kafka的数据到下游传递时,一般也都是并行读取,比如 Flink 读取 Kafka 的数据就支持多个 Sink 算子,所以数据到了 Flink 的多个算子链里会出现乱序。

    单分区内数据有序,但是不一定发送过去仍然是有序的,这就需要给它增加一些条件了,也就是接下来要学习的用幂等性解决数据乱序。

4.5、数据乱序

Kafka 1.x 版本之后保证数据单分区有序的条件:

  • 未开启幂等性 - max.in.flight.requests.per.connection 设置为 1(相当于 NetworkClient 中只能存在一个请求,那数据肯定是有序的,都不要使用幂等性)
  • 开启幂等性 - max.in.flight.requests.per.connection 需要设置为 <= 5(启用幂等性之后,Kafka 服务端会缓存最近的 5 个request元数据,所以无论如何,都可以保证最近的5个request的数据都是有序的。而且这个配置的值不能>5,因为Kafka服务端最多缓存5个请求)

解释

    为什么一定可以保证单分区内数据有序呢,因为幂等性就是(ProducerID,Partition,SeqNumber),其中 SeqNumber 要保证单调递增,对应我们的request,如果生产者的 RecoedAccumulator 中某个broker对应的请求队列中 request1和 request2 发送成功,但是request3 却失败了,那么当然会去重试,但是此时request4和request5也发送过去了怎么办。因为我们开启了幂等性,所以当 request1和request2 发送到Kafka的服务端(broker)之后,因为它们是不重复且有序的,所以会立即落盘,但是我们的 request3 由于发送失败,此时 request4和request5又发送过来了,但是由于 request4 和 request5 并不满足幂等性要求,所以不会落盘,而是留在内存当中,所以只有当 request3 再次到来之后,满足幂等性并落盘之后,request4和request5才能落盘。        

标签: kafka 分布式

本文转载自: https://blog.csdn.net/m0_64261982/article/details/135680471
版权归原作者 让线程再跑一会 所有, 如有侵权,请联系我们删除。

“Kafka(二)【文件存储机制 & 生产者】”的评论:

还没有评论