Flink系列之:动态发现新增分区
为了在不重新启动 Flink 作业的情况下处理主题扩展或主题创建等场景,可以将 Kafka 源配置为在提供的主题分区订阅模式下定期发现新分区。要启用分区发现,请为属性partition.discovery.interval.ms设置一个非负值。
一、动态发现新增分区
flink程序增加自动发现分区参数:
- flink.partition-discovery.interval-millis是一个配置属性,用于设置Flink作业中的分区发现间隔时间(以毫秒为单位)。
- 在Flink作业中,数据源(例如Kafka或文件系统)的分区可能会发生变化。为了及时感知分区的变化情况,并根据变化进行相应的处理,Flink提供了分区发现机制。
- flink.partition-discovery.interval-millis配置属性用于设置Flink作业在进行分区发现时的间隔时间。Flink作业会定期检查数据源的分区情况,如果发现分区发生了变化(例如增加或减少了分区),Flink会相应地调整作业的并行度或重新分配任务来适应新的分区情况。
- 通过调整flink.partition-discovery.interval-millis的值,可以控制Flink作业进行分区发现的频率。较小的间隔时间可以实时感知到分区变化,但可能会增加作业的开销;较大的间隔时间可以减少开销,但可能导致较长时间的延迟。
- 需要注意的是,flink.partition-discovery.interval-millis的默认值是5分钟(300000毫秒),可以根据具体需求进行调整。
二、Flink SQL动态发现新增分区
参数:scan.topic-partition-discovery.interval
CREATETABLE KafkaTable (`event_time`TIMESTAMP(3) METADATA FROM'timestamp',`partition`BIGINT METADATA VIRTUAL,`offset`BIGINT METADATA VIRTUAL,`user_id`BIGINT,`item_id`BIGINT,`behavior` STRING
)WITH('connector'='kafka','topic'='user_behavior','properties.bootstrap.servers'='localhost:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='csv');
Connector Options:
OptionRequiredDefaultTypeDescriptionscan.topic-partition-discovery.intervaloptional(none)Duration消费者定期发现动态创建的Kafka主题和分区的时间间隔。
三、Flink API动态发现新增分区
参数:partition.discovery.interval.ms
Java
KafkaSource.builder().setProperty("partition.discovery.interval.ms","10000");// discover new partitions per 10 seconds
Python
KafkaSource.builder() \
.set_property("partition.discovery.interval.ms","10000")# discover new partitions per 10 seconds
版权归原作者 最笨的羊羊 所有, 如有侵权,请联系我们删除。