0


配置Kafka发送大消息

Apache Kafka是一个强大开源、分布式容错的事件流平台。然而,当我们使用Kafka发送大于配置的大小限制的消息时,它会给出一个错误。前文介绍了Spring集成Kafka,本文研究如何使用Kafka发送大消息。

问题描述

Kafka配置限制允许发送消息大小,默认为1M。然而,如果需要发送大消息,需要调整这些参数,本文使用Kafka2.5,在进入配置部分之前,首先需要安装Kafka。

安装

这里搭建单节点kafka代理,生产者应用发送消息给指定主题,该主题为单分区主题。

我们看到整个过程涉及多个环节,kafka生产者、kafka代理、主题、kafka消费者。因此,所有这些配置需要调整,以适用大消息传输。我们的目标是调整参数能够发送20M大消息。

Kafka生产者配置

第一步是消息产生地,我们使用Spring Kafka从应用发送消息给Kafka服务器。因此,首先需要更新

max.request.size

属性。详细细节可参考官方文档,该有效值为常量,对于Kafka Client库中的ProducerConfig.MAX_REQUEST_SIZE_CONFIG 定义,需要增加Spring Kafka依赖。我们配置其值为

20971520

字节:

public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();

    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");

    return new DefaultKafkaProducerFactory<>(configProps);
}

Kafka主题配置

生产者发送消息给Kafka代理上的主题。因此,接下来选哟配置kafka主题,这意味着需要更新

max.message.bytes

属性,其默认值为1M。

该参数控制kafka压缩后(如果启用了压缩)最大记录批次大小,详细内容参考官方文档。我们可以通过cli命令手动配置该属性:

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic longMessage --partitions 1 \
--replication-factor 1 --config max.message.bytes=20971520 

当然也可以通过Kafka Client进行配置:

public NewTopic topic() {
    NewTopic newTopic = new NewTopic(longMsgTopicName, 1, (short) 1);
    Map<String, String> configs = new HashMap<>();
    configs.put("max.message.bytes", "20971520");
    newTopic.configs(configs);

    return newTopic;
}

要发送大消息,至少需要配置这两个参数。

Kafka 代理配置

一个可选配置属性为:

message.max.bytes

,用于允许所有在代理上的主题接收大于1M的消息。该属性控制kafka压缩后(如何启用了压缩)允许的最大记录批次大小,详细内容参考官方文档。

通过在server.properties配置文件中增加下列属性:

message.max.bytes=20971520

另外,该属性将使用

message.max.bytes

max.message.bytes

两者中的最大值。

Kafka消费者配置

下面讨论Kafka消费端配置属性。虽然这些变化对消费大消息不是必须的,为了避免消费端程序性能问题,最好也调整相应参数:

  • max.partition.fetch.bytes: 该属性限制从主题分区能消费的字节数,详细内容可参考官方文档。在Kafka Client 库中对应为ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG常量。
  • fetch.max.bytes: 该属性限制消费者从kafka服务器获取的字节数,kafka能够监听多个分区,详细内容参考官方文档。在Kafka Client 库中对应为ConsumerConfig.FETCH_MAX_BYTES_CONFIG 常量。

因此对配置消费者,需要创建KafkaConsumerFactory。另外需要说明的是:应该配置比topic/broker配置较大的值。

public ConsumerFactory<String, String> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");

    return new DefaultKafkaConsumerFactory<>(props);
}

这里配置值于前面一样,是因为创建的主题仅有一个分区。但是

FETCH_MAX_BYTES_CONFIG

值应该高于

MAX_PARTITION_FETCH_BYTES_CONFIG

。当消费者监听多个分区时,

FETCH_MAX_BYTES_CONFIG

表示从多个分区获取消息的大小,而

MAX_PARTITION_FETCH_BYTES_CONFIG

表示从单个分区获取消息的大小。

其他选项

前面提及配置Kafka生产者、主题、代理和Kafka消费者中的不同参数,以实现发送、接收大消息。单通常应该避免使用Kafka发送大消息,处理大消息会消耗生产者和消费者更多的CPU和内存,最终在一定程度上限制了它们处理其他任务的能,还有可能导致终端用户高延迟问题。

还有其他一些调优方式:

  • Kafka生产者提供了压缩消息选项,它支持不同的压缩方法。我们可以compression.type property属性配置。
  • 可以将大消息存储在共享存储位置,并通过Kafka消息发送该位置。这可能是更快的方式,具有最小的处理开销。
  • 另一种选择是在生产者端将大消息拆分为每个大小为1KB的小消息。之后,我们可以使用分区键将所有这些消息发送到单个分区,以确保正确的顺序。然后在消费者端可以从较小消息重构为完整大消息。

总结

在本文中,我们介绍了配置调优Kafka选项以发送大于1MB的大消息。包括生产者端、主题、代理服务和消费者端的配置选项。其中一些选项是强制配置,一些是可选配置,虽然消费者配置是可选的,但可以避免负面的性能影响。最后,我们还介绍了发送大消息的其他可能选项。

内容参考:[Send Large Messages With Kafka](Send Large Messages With Kafka)

标签: kafka java 分布式

本文转载自: https://blog.csdn.net/neweastsun/article/details/127052911
版权归原作者 梦想画家 所有, 如有侵权,请联系我们删除。

“配置Kafka发送大消息”的评论:

还没有评论