0


kafka学习(第三部分)

  1. 问题:简单写一下消费测的示例代码?

public static void main(String[] args) throws InterruptedException {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.132:9091,192.168.239.132:9092,192.168.239.132:9093");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer");//这个是指定消费组,必须。

    KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(properties);
    consumer.subscribe(Collections.singletonList("test")); //监听主题,可以是多个。
    String log = "topic:%s,partition:%s,offset:%s,消息体:%s";
    while (true) {
        ConsumerRecords<String, Object> consumerRecords = consumer.poll(100);
        consumerRecords.forEach(record -> {
            System.out.println(String.format(log, record.topic(), record.partition(), record.offset(), record.value()));
        });
        TimeUnit.SECONDS.sleep(1);
    }
}
  1. 问题:消费端的重分配是什么概念?

答:消费端的重分配是,当增加消费者后,会将主题的分片重新分配给不同的消费者。

  • 比如原先消费者1,匹配分片为0,1,2。

  • 当添加消费者2时,触发重新分配,消费者1匹配分片0,1。消费者2匹配分片2。

  • 当消费者数量大于分区数量时,多出来的消费者对接不上片区。

  1. 问题:kafka消费者的分配策略是如何配置的?

答:代码如下。

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());

默认应该是Range,还有RoundRobin、Sticky等,其他的建议自己查询一下。

  • Range是分区的,尽量将相连的放在一起。

  • RoundRobin,循环。

  • Sticky,是粘性策略。

  1. 问题:kafka中的重试队列和死信队列有吗?

答:kafka没有提供相应的设计。

  • kafka中的重试队列是我们通过业务实现的,自己新加几个重试主题,消费者消费失败后就将消息发给重试队列。

  • kafka中的死信队列也是我们自己实现的,重试队列中的数据也处理失败后可以放到死信主题中,然后定时靠运维人员查看。

  • 死信队列是最终交给人来处理的消息的队列。

  1. 问题:kafka消费者是如何消费消息的?

答:消费者会启动一个循环任务,不断去集群中获取消息。这个是业务方,也就是我们来完成的。

  • kafka建议单线程执行消费。如果需要多线程,建议将消息手机到后重新分发。

  • 消费者每次去请求是按批获取的,之后每消费一个会提交一次消费地址通知给kafka集群。

  • 这里可能由于消费不及时导致消息在kafka集群中堆积,可以通过一下两个参数优化。

  • fetch.max.bytes,每次拉取的最大字节数,默认 Default: 52428800(50 m)。个人理解这个是一个阈值,当拉取时超过这个阈值才截断。

  • max.poll.records,一次拉取的最大消息条数,默认是500条。

  1. 问题:kafka中的消息是会持久化保存的,那消费者如何了解消费信息的?

答:消费信息是在broker中是通过消费位移来储存的,就是消费到那个位置了。

  • 消费位移现在是存在__consumer_offsets主题中的。

  • 消费位移的key是topic+partition+consumer group组合。

  • __consumer_offsets中有50个分区。一般kafka不允许消费该主题消息,当然有配置。

  • 消费者消费后会将消费后的位移反馈给broker。

  1. 问题:kafka消费者是如何提交消费位移的?

答:默认是自动化提交。

  • 涉及两个值是否开启默认提交和提交的时间间隔。

  • enable.auto.commit,是否开启自动提交。默认是true。

  • auto.commit.interval.ms,提交间隔,单位是ms,默认值是5000。

  • 默认提交的消费位移是离提交时间最近的一次拉取时的最大位移。

  • 自动提交有一定的安全问题,很有可能出现消息丢失。因为当你拉取到位移为0-10的消息后,消费到位移为5的消息的时候达到自动提交时间会将位移10作为消费位移提交给broker,但其实你并没有完成消费。此时发生宕机的话,再重启重新拉取数据是从10往后拉取,5-10位移没有消费的数据就丢失了。

  • 也可以设置手动提交。

  • 先配置consumer,自动提交为FALSE。

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
  • 再配置手动提交,有两种。

  • 手动提交拉取到的最后位移。

while (true) {
    ConsumerRecords<String, Object> consumerRecords = consumer.poll(100);
    //A
    consumerRecords.forEach(record -> {
        System.out.println(String.format(log, record.topic(), record.partition(), record.offset(), record.value()));
    });
    TimeUnit.SECONDS.sleep(1);
    //B
    consumer.commitAsync();
}
  • 可以放在A和B处,但A处是同样存在信息丢失的风险,B处可能存在消息重复的风险。相对于消息重复,我们业务上更害怕消息丢失。

  • 手动提交指定位移。

while (true) {
    ConsumerRecords<String, Object> consumerRecords = consumer.poll(100);
    consumerRecords.forEach(record -> {
        System.out.println(String.format(log, record.topic(), record.partition(), record.offset(), record.value()));
        consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata")));
    });
    TimeUnit.SECONDS.sleep(1);
}
  • 选择是否指定位移提交需要考虑我们的消费速度:如果消费速度快建议不指定,会造成消费者和broker频繁交互。如果消费速度满,可以指定位移提交,可以大幅度降低消息丢失或重复的可能性。

  • 还有一个异步位移提交,使用可能性不高。就是监听tomcat停止时间来提交,这个实际业务中使用的不多。

  • 这里还有个问题是自动提交日志记录会很多,因为默认5s发一次,如果不处理有可能出现将硬盘资源消耗完的情况。为解决这个问题,kafka提供了日志压缩技术,因为消费位移最近一次的记录是有效的,之前的数据其实都是没用来,可以将这部分数据删除。删除后,日志文件自然就变小了。

  1. 问题:kafka消费者时如何重新指定消费位移的?

答:可以指定消费位移从开始或者最后或者指定消费位移位置。代码如下:

 KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(properties);
consumer.subscribe(Collections.singletonList("test"));

/*
 * 第一个参数是主题名称,第二个参数是分区编号。 
 */
TopicPartition test = new TopicPartition("test", 0);

//从开始
consumer.seekToBeginning(Collections.singletonList(test));
//从最后
consumer.seekToEnd(Collections.singletonList(test));
//指定位置
/*
 * 第二个参数是指定的消费位移,当然需要在消费的时候详细记录消费位移。 
 */
consumer.seek(test, 20);
  1. 问题:kafka的分区再均衡是什么概念?

答:分区再均衡就是添加一个消费者或者一个消费者奔溃后,分区会按照指定的分区策略完成再分配。这里需要注意的是再均衡发生的时机:

  • 调整分区数量。

  • 添加一个消费者。

  • 一个消费者关闭或者崩溃。

  1. 问题:kafka的消息是如何储存的?

答:我们先找到储存地址的如何配置的,在简单说明一下储存结构。

  • 储存地址是在broker的system.properties中的logs.dir属性配置的。

  • 打开目录后如下述所示,进入test-0,test主题0分区的数据。

drwxr-xr-x    2 0        0            204 Mar 10 09:02 test-0
drwxr-xr-x    2 0        0            178 Mar 10 09:02 test-1
drwxr-xr-x    2 0        0            178 Mar 10 09:02 test-2
...
drwxr-xr-x    2 0        0            167 Mar 10 03:27 __consumer_offsets-7
drwxr-xr-x    2 0        0            141 Mar  9 08:31 __consumer_offsets-8
drwxr-xr-x    2 0        0            141 Mar  9 08:31 __consumer_offsets-9
  • 打开对应结构后,可以返现主题+分区编号的目录,进入后发现有如下内容:
ls -nh

-rw-r--r--    1 0        0          10.0M Mar 10 09:02 00000000000000000000.index
-rw-r--r--    1 0        0            566 Mar  9 09:00 00000000000000000000.log
-rw-r--r--    1 0        0          10.0M Mar 10 09:02 00000000000000000000.timeindex
-rw-r--r--    1 0        0             10 Mar 10 08:11 00000000000000000014.snapshot
-rw-r--r--    1 0        0              8 Mar 10 09:02 leader-epoch-checkpoint
-rw-r--r--    1 0        0             43 Mar  9 14:21 partition.metadata
  • 其中 00000000000000000000 这个是分段segment,一般一个segment会保存1G数据。

  • 名字是储存的首个数据的偏移量的。

  • .log文件是储存的消息。

  • .index文件储存的是索引文件,为了快速过滤需要的消息。一般用在读取消息的时候。

  • 这里储存的是应该是内存地址,所以可以加快读取时的找寻速度。

  • .timeindex消息的时间索引文件。

  • .snapshot分区的快照信息。

  • 大小可以在server.properties中通过属性log.segment.bytes来指定,默认是:1073741824,1G。

  • leader-epoch-checkpoint 是 epoch 纪元的文件。

  1. 问题:kafka的消息是可以重复消费,那么消息删除一定是必须的,kafka有相关的配置和管理吗?

答:有,kafka提供了三种主要的删除策略:基于时间、基于大小和基于起始偏移量。

  • 基于时间的配置项是:og.retention.hours、log.retention.minutes、log.retention.ms

  • 基于大小的配置项是:log.retention.bytes,默认值是-1,单位是字节数。

  1. 问题:kafka未来速度用来零拷贝技术,能简单介绍一下零拷贝吗?

答:零拷贝现在主要由两种技术方向:sendfile和mmdp,底层用的是DMA技术。

标签: kafka

本文转载自: https://blog.csdn.net/tianqiuhao/article/details/129448578
版权归原作者 田秋浩 所有, 如有侵权,请联系我们删除。

“kafka学习(第三部分)”的评论:

还没有评论