Apache Kafka是一个强大开源、分布式容错的事件流平台。然而,当我们使用Kafka发送大于配置的大小限制的消息时,它会给出一个错误。前文介绍了Spring集成Kafka,本文研究如何使用Kafka发送大消息。
问题描述
Kafka配置限制允许发送消息大小,默认为1M。然而,如果需要发送大消息,需要调整这些参数,本文使用Kafka2.5,在进入配置部分之前,首先需要安装Kafka。
安装
这里搭建单节点kafka代理,生产者应用发送消息给指定主题,该主题为单分区主题。
我们看到整个过程涉及多个环节,kafka生产者、kafka代理、主题、kafka消费者。因此,所有这些配置需要调整,以适用大消息传输。我们的目标是调整参数能够发送20M大消息。
Kafka生产者配置
第一步是消息产生地,我们使用Spring Kafka从应用发送消息给Kafka服务器。因此,首先需要更新
max.request.size
属性。详细细节可参考官方文档,该有效值为常量,对于Kafka Client库中的ProducerConfig.MAX_REQUEST_SIZE_CONFIG 定义,需要增加Spring Kafka依赖。我们配置其值为
20971520
字节:
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
return new DefaultKafkaProducerFactory<>(configProps);
}
Kafka主题配置
生产者发送消息给Kafka代理上的主题。因此,接下来选哟配置kafka主题,这意味着需要更新
max.message.bytes
属性,其默认值为1M。
该参数控制kafka压缩后(如果启用了压缩)最大记录批次大小,详细内容参考官方文档。我们可以通过cli命令手动配置该属性:
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic longMessage --partitions 1 \
--replication-factor 1 --config max.message.bytes=20971520
当然也可以通过Kafka Client进行配置:
public NewTopic topic() {
NewTopic newTopic = new NewTopic(longMsgTopicName, 1, (short) 1);
Map<String, String> configs = new HashMap<>();
configs.put("max.message.bytes", "20971520");
newTopic.configs(configs);
return newTopic;
}
要发送大消息,至少需要配置这两个参数。
Kafka 代理配置
一个可选配置属性为:
message.max.bytes
,用于允许所有在代理上的主题接收大于1M的消息。该属性控制kafka压缩后(如何启用了压缩)允许的最大记录批次大小,详细内容参考官方文档。
通过在server.properties配置文件中增加下列属性:
message.max.bytes=20971520
另外,该属性将使用
message.max.bytes
和
max.message.bytes
两者中的最大值。
Kafka消费者配置
下面讨论Kafka消费端配置属性。虽然这些变化对消费大消息不是必须的,为了避免消费端程序性能问题,最好也调整相应参数:
max.partition.fetch.bytes
: 该属性限制从主题分区能消费的字节数,详细内容可参考官方文档。在Kafka Client 库中对应为ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG常量。fetch.max.bytes
: 该属性限制消费者从kafka服务器获取的字节数,kafka能够监听多个分区,详细内容参考官方文档。在Kafka Client 库中对应为ConsumerConfig.FETCH_MAX_BYTES_CONFIG 常量。
因此对配置消费者,需要创建KafkaConsumerFactory。另外需要说明的是:应该配置比topic/broker配置较大的值。
public ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
return new DefaultKafkaConsumerFactory<>(props);
}
这里配置值于前面一样,是因为创建的主题仅有一个分区。但是
FETCH_MAX_BYTES_CONFIG
值应该高于
MAX_PARTITION_FETCH_BYTES_CONFIG
。当消费者监听多个分区时,
FETCH_MAX_BYTES_CONFIG
表示从多个分区获取消息的大小,而
MAX_PARTITION_FETCH_BYTES_CONFIG
表示从单个分区获取消息的大小。
其他选项
前面提及配置Kafka生产者、主题、代理和Kafka消费者中的不同参数,以实现发送、接收大消息。单通常应该避免使用Kafka发送大消息,处理大消息会消耗生产者和消费者更多的CPU和内存,最终在一定程度上限制了它们处理其他任务的能,还有可能导致终端用户高延迟问题。
还有其他一些调优方式:
- Kafka生产者提供了压缩消息选项,它支持不同的压缩方法。我们可以
compression.type property
属性配置。 - 可以将大消息存储在共享存储位置,并通过Kafka消息发送该位置。这可能是更快的方式,具有最小的处理开销。
- 另一种选择是在生产者端将大消息拆分为每个大小为1KB的小消息。之后,我们可以使用分区键将所有这些消息发送到单个分区,以确保正确的顺序。然后在消费者端可以从较小消息重构为完整大消息。
总结
在本文中,我们介绍了配置调优Kafka选项以发送大于1MB的大消息。包括生产者端、主题、代理服务和消费者端的配置选项。其中一些选项是强制配置,一些是可选配置,虽然消费者配置是可选的,但可以避免负面的性能影响。最后,我们还介绍了发送大消息的其他可能选项。
内容参考:[Send Large Messages With Kafka](Send Large Messages With Kafka)
版权归原作者 梦想画家 所有, 如有侵权,请联系我们删除。