最近接一个任务,任务发到我这里的时候,已经由同事查了两天,没查出原因。 查了两天的问题转给我来查,当然是因为我能力强 (死马当活马医)。
了解任务背景
业务中发送了大量的kafka消息,因此为了避免kafka的历史消息堆积导致磁盘占满,业务会修改某几个topic的 消息保留时间。测试发现,页面上把topicA消息的保留时间改为只保留一天,但仍能查到超过一天的消息,也就是修改某个topic的消息保留时间,但没有其效果。(排查、分析过程比较长,可以直接跳过看总结)
排查
先按常规步骤试一下,KAFKA-UI上直接将retain时间改为1天中:Topics-[业务topic的名字]-右上角按钮-EditSetting-Time to retain data (in ms) 。
修改后观察到kafka日志确实有打印
ConfigRsource ... name='topicA' ... set configuration retention.ms to 86400000
的字样。 紧接着5分钟(
log.retention.check.interval.ms
)之内,又打印
partition=topicA ... Deleting
和
Deleted ... /data/topicA-0.log.deleted
。 使用
du -sm
查看
/data
目录,确实变小了,日志里面提到的文件也被删掉了。Kafka-ui页面上查的数据也是一天左右(有几条消息超出了十分钟左右)。
到此,我以为就完了。高兴的跟同事反馈:“你看,这个没问题。” 同事一脸不信:“我试了那么久都不行,你一下就行了?”(可能是我天选之人吧) 我拉着同事一起查看日志,他也信了,说着让我再试试其他topic,这次用系统的上接口来调用一下。我想着, 有道理, 也有可能是我们的使用的KAFKA API没调通导致的。
继续排查
打开我们自己的系统页面,点击配置,把topicA、topicB改为只保留一天。 观察到日志里打印 topicB的配置被修改成一天(topicA前面已经改了,这次没打日志也说得过去),紧接着是对topicB的删除。 事情是那么顺利,我不急不忙的打开kafka-ui查看topicB的数据:当前时间是下午两点,数据只保留到昨天的12点后的数据。嗯,只有一天。嗯? 但为何不是只保留有昨天2点后的数据? 可能清除的时候有点误差吧,再观察观察,反正5分钟(
log.retention.check.interval.ms
)后又要清理。
… 1个小时过去了 …
没等来topicB的二次清理。倒是topicA不定时的清理了几次。
我一边bing,一边KIMI搜索:KAFKA日志删除策略。
然后KIMI告诉我:
Kafka 的日志删除策略是通过 log.cleanup.policy 配置参数来控制的,该参数定义了 Kafka 如何处理不再需要的日志数据。以下是几种常见的日志删除策略:
delete:
这是最基本的删除策略,根据时间或大小来删除旧的日志分段。
当基于时间的删除配置(如 log.retention.hours)生效时,超过保留期限的日志分段将被删除。
compact:
这是一种针对键(key)的删除策略,适用于带有键的消息。
它保留了每个键的最新值,删除了旧值,但保留的消息数量不会超过 retention.bytes 配置的大小。
delete,compact(组合策略):
这种策略结合了 delete 和 compact。
它首先对消息进行压缩,删除具有相同键的重复消息,然后根据时间或大小删除旧的日志分段。
unified(适用于 Kafka 2.8 及更高版本):
这种策略是 Kafka 2.8 版本引入的新日志清理策略,旨在简化日志管理。
它提供了一种统一的日志删除方法,无论是否有键,都只保留最新的记录。
日志删除策略的配置示例:
log.cleanup.policy=delete
或者,对于需要压缩和删除的组合策略:
log.cleanup.policy=delete,compact
请注意,日志删除操作是异步执行的,Kafka 有一个后台线程定期检查并执行删除任务。此外,日志删除策略是 topic 级别的配置,可以为每个 topic 单独设置。
日志删除策略的执行还受到以下参数的影响:
log.retention.hours:基于时间的日志保留策略,设置日志文件的保留时间(小时)。类似参数还有log.retention.ms、log.retention.mintues,优先级为**ms>minutes>hours**。可以单独对某个topicsh设置,优先级大于全局的配置。(此处有修改,非KIMI原文)
log.retention.bytes:基于大小的日志保留策略,设置kafka日志最大大小,超过该大小将被清理。可以单独对某个topicsh设置,优先级大于全局的配置。(此处有修改,非KIMI原文)
log.retention.check.interval.ms:日志清理检查的频率。
合理配置这些参数对于管理 Kafka 集群的存储使用和性能至关重要。不当的配置可能导致数据丢失或存储空间的不必要增长。
这么长,你也懒得看,总结一下就是类似于log4j日志清理一样的策略:日志时间**超过指定保留时间(
log.retention.xx
)就删除,日志大小超过指定大小(
log.segement.bytes
)就删除**。二者满足其一即可。
看到这里, 我就猜测是不是上面两个参数没生效?在什么时候retention.ms被还原(超过一天)了?
(问了KIMI后)使用
kafka-configs.sh --bootstrap-server [kafkaIP:port] --describe --topic topicB
查看topicB的配置:
retention.ms=86400000 ... retention.bytes=-1
,
retention.ms=86400000
是只保留一天的数据,
retention.bytes=-1
的意思是,topicA 也是如此,没有什么不一样。
猜想
既然topicA和topicB的配置都一样,为何topicA能被反复清理,topicB只清理了一次?①为何topicA清理的次数也不固定?②
生成这些疑惑的时候,我又发现了一些其他现象:
1.目前topicB都不会有新数据进来。
2.topicA的数据比topicB多十倍的数据。
是不是 没有新数据导致的问题①?
数据量是不是清理频次的条件之一,导致了问题②?
- 新数据会触发清理吗? 我为topicB投递了几条新数据,并观察了一段时间(n个5分钟过后),并没有清理topicB。
- 数据量是不是清理频次的条件之一,导致了问题②? 之前查过topicA的配置
retention.bytes=-1
,因此不应该是数据量/大小 导致的。
再次问KIMI,会有哪些原因导致不清理/清理不及时,它跟我说,有系统负载高、日志分段没关闭等,逐一尝试并没有效果。甚至我怀疑是KAFKA的BUG,毕竟我用的版本不是最新的,但一番搜索无果。在我快放弃的时候,我在用bing国际版秀了一把英语
kafka log delete check interval bug
(关键词尽量堆上),看到一篇Kafka Log Retention – Delete Policy,嗯,很多英文,丢给KIMI分析:“xxxUrl 总结一下删除的逻辑”,它就给我输出了下面的内容:
保留策略:Kafka 不会在消息被消费后立即删除它们。删除日志保留是一种清理策略,当旧的日志分段在时间或大小上超过设定的阈值时,这些分段将被清除或删除。
你应该也有注意到了, 这里说到一个概念:日志分段。
日志分段(segment)
日志分段文件:Kafka 将每个主题的每个分区的数据存储在一系列日志分段中。每个日志分段由两个文件组成:日志数据文件(.log)和索引文件(.index)
新的猜想
这里我想起我在查看kafka的日志目录的时候,观察到的 topicA-0/xx.log,就是kafka的日志数据文件,它就是日志分段的一部分。到此,我合理怀疑,kafka清理策略依据的时间,就是取的log的最后写入时间。
验证猜想
观察到topicB的目录下只有一个log文件,它最后写入时间是12号下午4点,当前时间是12号下午5点。
因此我将topicA的消息保留时间改为30分钟。2分钟后,topicB开始清理,然后删除完成。kafka-ui查看数据,也没有昨天的数据了。
结合任务分析
验证了kafka清理策略依据的时间,就是取的log的最后写入时间。回过头来分析一开始观察到的现象,【1.保留有超过指定时间的数据】、【2.topicB的数据长时间不清理】,原因就是topicB的数据量小,超过1天的数据集中在1个分段文件中,而分段文件的最后写入时间,是今天。那么该segment不会被清理。
保证磁盘不堆积
分析到这里,对于我来说, 就够了,因为我们的系统是保证磁盘数据不会持续堆积,验证了kafka的删除策略是生效了的,即使保留有超过1天的数据,也不影响。因为当数据量小的时候,就多了1个segment而已。当数据量大的时候,会生成多个segment,会及时的在第二天被删除,如果担心堆积,可以结合
log.retention.bytes
去设置。
保证业务数据的有效性
如果有的朋友说,我的业务就是要求保证不包含指定时间外的数据该怎么办?都看到这里, 我就好心的告诉你参数吧
log.roll.ms
,它的作用是超过指定时间生成新的segment。比如需要保证只要一天的数据,那么把它改成12小时,这样最多只会保留(24小时–36小时内)的数据,尽可能的保证了业务。
总结
- Kafka 会基于空间大小判断,超过
log.retention.bytes
大小的数据将被删除。可以基于topic设置高优先级的配置。 - Kafka 会基于时间判断,只保留
log.retention.ms
时间内的分段(segment)文件。可以基于topic设置高优先级的配置。Kafka不会单独针对某条消息去删除,只会删除整个分段(segment)。 - Kakfa可以基于时间(
log.roll.ms
)和大小(log.retention.bytes
)来生成新的分段。可以通过控制这两个参数,来减小Kafka删除整个分段保留的消息时间的正确性。
参考
根据提供的链接 Kafka Log Retention – Delete Policy – cloudoses,以下是对 Kafka 删除逻辑的总结:
保留策略:Kafka 不会在消息被消费后立即删除它们。删除日志保留是一种清理策略,当旧的日志分段在时间或大小上超过设定的阈值时,这些分段将被清除或删除。
默认配置:Kafka 默认只按时间限制保留消息,并没有默认的大小限制。因此,需要适当配置集群和主题,使用 log.retention.bytes(集群范围)或主题级别的 retention.bytes 设置。
时间限制配置:保留时间可以通过集群范围的 log.retention.ms、log.retention.minutes 或 log.retention.hours 来设置,优先级从低到高。如果同时指定了多个,最小的单位将优先生效。
大小限制配置:对于按大小限制的保留,集群范围的 log.retention.bytes 或主题级别的 retention.bytes 应用于每个分区。例如,一个主题有 10 个分区,每个分区配置了 1GB 的 retention.bytes,则该主题最多保留 10GB 的数据。如果有副本,比如常见的副本因子为 3,则该主题的总存储需求估计为 30GB。
清理策略:清理策略应用于整个日志分段,而不是分段中单独的消息。日志清理器会检查分段年龄(最后一个消息写入到分段的时间戳)是否超过了保留时间限制。
删除保留政策的工作方式:日志清理器会检查是否有日志分段符合删除条件。值得注意的是,即使是活动的分段(即还有消费者正在使用的数据)也可能在某些边缘情况下被删除。
日志分段滚动:log.segment.bytes 定义了单个日志文件的最大大小,log.roll.ms 或 log.roll.hours 定义了即使日志文件未满也会创建新分段的最大时间,以确保日志清理可以进行。
清理检查频率:log.retention.check.interval.ms 定义了日志清理器检查是否有日志需要删除的频率。
集群和主题配置:表中列出了集群或 broker 级别的配置和主题覆盖配置,以及它们的默认值和描述。
请注意,Kafka 的日志清理操作是异步进行的,并且受多种配置参数影响。合理配置这些参数对于管理 Kafka 集群的存储使用和性能至关重要。
Kafka 的日志分段是 Kafka 存储机制中的关键概念,用于高效地管理大量数据。以下是 Kafka 日志分段的基本简述:
日志分段文件:Kafka 将每个主题的每个分区的数据存储在一系列日志分段中。每个日志分段由两个文件组成:日志数据文件(.log)和索引文件(.index)。
日志数据文件:实际存储消息数据的地方。消息以追加的方式写入,当文件达到一定大小时,会创建新的日志分段。
索引文件:包含日志数据文件中消息的索引信息,允许快速定位消息的偏移量和物理位置。
分段大小:日志分段的大小由 log.segment.bytes 配置参数决定,默认大小通常为 1GB。
滚动机制:当当前日志分段文件达到最大大小时,Kafka 会创建一个新的日志分段继续写入消息。
时间基础的滚动:除了大小,日志分段还可以基于时间进行滚动,由 log.roll.ms 或 log.roll.hours 参数控制。
保留策略:日志分段的保留基于 log.retention.bytes 和 log.retention.hours 等参数,可以配置每个日志分段的保留时间和大小。
清理操作:Kafka 定期检查并清理超出保留策略的日志分段,释放磁盘空间。
日志索引:Kafka 使用索引来加速消息的查找,包括偏移量索引和时间戳索引。
事务日志:对于事务性消息,Kafka 还使用 .txnindex 索引文件来支持事务性操作。
文件命名:日志分段文件按照 <topic-name>-<partition-id>-<segment-index>.log 格式命名。
副本和分区:每个日志分段在 Kafka 集群中的多个 broker 上以副本的形式存在,以提供数据的冗余和高可用性。
日志分段是 Kafka 高效存储和消息管理的核心,使得 Kafka 能够处理高吞吐量的数据流。
版权归原作者 thewindkee 所有, 如有侵权,请联系我们删除。