0


flink中使用外部定时器实现定时刷新

背景:

我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢?

外部定时器定时加载实现

1.在open函数中进行定时器的创建和定时加载,这个方法对于所有的RichFunction富函数都适用,包括RichMap,RichFilter,RichSink等,代码如下所示

packagewikiedits.schedule;importorg.apache.flink.api.common.functions.RichFlatMapFunction;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.util.Collector;importorg.apache.flink.util.ExecutorUtils;importjava.util.concurrent.Executors;importjava.util.concurrent.ScheduledExecutorService;importjava.util.concurrent.TimeUnit;publicclassScheduleRichMapFunctionextendsRichFlatMapFunction<String,String>{// 定时任务执行器privatetransientScheduledExecutorService scheduledExecutorService;// 本地变量privateint threshold;@Overridepublicvoidopen(Configuration parameters)throwsException{// 1.从db查询数据初始化本地变量//        threshold = DBManager.SELECTSQL.getConfig("threshold");// 2.使用定时任务更新本地内存的配置信息以及更新本地变量threshold的值
        scheduledExecutorService =Executors.newScheduledThreadPool(10);
        scheduledExecutorService.scheduleWithFixedDelay(()->{// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();//            for(ConfigEntity entity : configList){ConfigEntityLocalCache.getInstance().update("key","value");//            }// 2.2 更新本地变量threshold的值//            threshold = DBManager.SELECTSQL.getConfig("threshold");},0,100,TimeUnit.SECONDS);}@OverridepublicvoidflatMap(String value,Collector<String> out)throwsException{}@Overridepublicvoidclose()throwsException{ExecutorUtils.gracefulShutdown(100,TimeUnit.SECONDS, scheduledExecutorService);}}//本地缓存实现packagewikiedits.schedule;importcom.google.common.cache.Cache;importcom.google.common.cache.CacheBuilder;/**
 * 保存Config信息的本地缓存 ---定时同步DB配置表的数据
 */publicclassConfigEntityLocalCache{privatestaticvolatileConfigEntityLocalCache instance =newConfigEntityLocalCache();/**
     * 获取本地缓存实例
     */publicstaticConfigEntityLocalCachegetInstance(){return instance;}/** 缓存内存配置项 */privatestaticCache<String,String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();/**
     * 更新本地缓存数据
     */publicbooleanupdate(String key,String value){
        configCache.put(key, value);returntrue;}/**
     * 更新本地缓存数据
     */publicStringgetByKey(String key){return configCache.getIfPresent(key);}}

2.在静态类中通过static语句块创建定时器并定时加载,代码如下

packagewikiedits.schedule;importjava.util.concurrent.Executors;importjava.util.concurrent.ScheduledExecutorService;importjava.util.concurrent.TimeUnit;importcom.google.common.cache.Cache;importcom.google.common.cache.CacheBuilder;/**
 * 静态类定时加载DB配置表到本地内存中
 */publicclassStaticLoadUtil{// 定时任务执行器privatestatictransientScheduledExecutorService scheduledExecutorService;publicstaticfinalCache<String,String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();// 通过定时执行器定时同步本地缓存和DB配置表static{
        scheduledExecutorService =Executors.newScheduledThreadPool(10);
        scheduledExecutorService.scheduleWithFixedDelay(()->{// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();// for(ConfigEntity entity : configList){
            configCache.put("key","value");// }// 2.2 更新本地变量threshold的值// threshold = DBManager.SELECTSQL.getConfig("threshold");},0,100,TimeUnit.SECONDS);}/**
     * 获取本地缓存
     */publicstaticCache<String,String>getConfigCache(){return configCache;}}

总结:

1.外部定时器可以通过在富函数的open中进行初始化并开始定时执行

2.外部定时器也可以通过创建一个单独的静态类,然后在static模块中进行初始化并开始定时执行

标签: flink 大数据

本文转载自: https://blog.csdn.net/lixia0417mul2/article/details/133716692
版权归原作者 lixia0417mul2 所有, 如有侵权,请联系我们删除。

“flink中使用外部定时器实现定时刷新”的评论:

还没有评论