一、问题背景
使用 Kafka Producer 向指定 Topic 发送消息时,默认的消息大小限制为 1M。如果消息过大,需要调整哪些参数来实现消息的成功发送?
首先,我们需要熟悉发送消息的流程,以便了解各参数的作用环节。
二、Producer发送消息流程
Producer 发送消息涉及三个主要部分:主线程、消息缓冲区(ProducerAccumulator)和发送线程(Sender)。
主线程会将消息按照不同分区分别发送到消息缓冲区的指定队列中。
如果队列中的消息达到指定大小或指定时间,则唤醒发送线程将消息发送给服务端。
在这个过程中,有三个重要参数影响消息发送:Producer 端的 max.request.size、batch.size 和 Broker 端的 max.message.bytes。
接下来我们将结合消息发送的流程了解三个参数的作用。
三、参数作用
- 单条消息的大小限制
(1) 消息由Main线程发送到缓冲区时,KafkaProducer 会在 KafkaProducer#ensureValidRecordSize方法中检查单条消息是否超过max.request.size。
(2) 如果单条消息超过 max.request.size,则会报错。
record-size = 2000
max.request.size = 1000
2. 缓冲区的消息批次大小
(1) 接下来消息会发送到缓冲区ProducerAccumulator。缓冲区会为每个分区创建一个双端队列以存放对应分区的信息。基于微批传输的设计,队列会使用ProducerBatch来存放一批消息。
batch.size决定了ProducerBatch的大小。
batch.size则取{用户配置的batch.size,单条消息}的最大值。
注:如前所述,用户配置的batch.size可能会小于单条消息大小,在这种情况下,消息仍可能会正常发送。
record.size = 100000
batch.size = 8000
(2)在kafka.log.Log#analyzeAndValidateRecords会限制batch.size要小于max.message.bytes。
测试表明,如果batch.size超过max.message.bytes 则会报MESSAGE_TOO_LARGE错误
3. Sender线程中消息批次的发送
当批次大小达到达到batch.size或linger.ms后,会唤醒sender线程发送数据
kafka中各个分区的leader partition会在Broker端的不同节点。在Sender线程中,会为每个节点创建一个请求。
所以Sender线程从缓冲区ProducerAccumulator拉取数据时,会再次按照节点分组,将发往同一个节点的不同分区的batch打包在一起。
在Sender#sendProducerData方法中会限制 发送到单个节点的消息大小要低于max.request.size。
故对于batch.size和max.request.size两个参数,一般建议后者大于前者,以便在sender的一个请求中能发送较多的batch。
故综上所述,如果消息体过大,则一般调整max.request.size和max.message.bytes即可,batch.size则保持默认即可。
四、参考
官网
彻底搞懂 Kafka 消息大小相关参数设置的规则
版权归原作者 HuailiShang 所有, 如有侵权,请联系我们删除。