背景:
我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢?
外部定时器定时加载实现
1.在open函数中进行定时器的创建和定时加载,这个方法对于所有的RichFunction富函数都适用,包括RichMap,RichFilter,RichSink等,代码如下所示
packagewikiedits.schedule;importorg.apache.flink.api.common.functions.RichFlatMapFunction;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.util.Collector;importorg.apache.flink.util.ExecutorUtils;importjava.util.concurrent.Executors;importjava.util.concurrent.ScheduledExecutorService;importjava.util.concurrent.TimeUnit;publicclassScheduleRichMapFunctionextendsRichFlatMapFunction<String,String>{// 定时任务执行器privatetransientScheduledExecutorService scheduledExecutorService;// 本地变量privateint threshold;@Overridepublicvoidopen(Configuration parameters)throwsException{// 1.从db查询数据初始化本地变量// threshold = DBManager.SELECTSQL.getConfig("threshold");// 2.使用定时任务更新本地内存的配置信息以及更新本地变量threshold的值
scheduledExecutorService =Executors.newScheduledThreadPool(10);
scheduledExecutorService.scheduleWithFixedDelay(()->{// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();// for(ConfigEntity entity : configList){ConfigEntityLocalCache.getInstance().update("key","value");// }// 2.2 更新本地变量threshold的值// threshold = DBManager.SELECTSQL.getConfig("threshold");},0,100,TimeUnit.SECONDS);}@OverridepublicvoidflatMap(String value,Collector<String> out)throwsException{}@Overridepublicvoidclose()throwsException{ExecutorUtils.gracefulShutdown(100,TimeUnit.SECONDS, scheduledExecutorService);}}//本地缓存实现packagewikiedits.schedule;importcom.google.common.cache.Cache;importcom.google.common.cache.CacheBuilder;/**
* 保存Config信息的本地缓存 ---定时同步DB配置表的数据
*/publicclassConfigEntityLocalCache{privatestaticvolatileConfigEntityLocalCache instance =newConfigEntityLocalCache();/**
* 获取本地缓存实例
*/publicstaticConfigEntityLocalCachegetInstance(){return instance;}/** 缓存内存配置项 */privatestaticCache<String,String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();/**
* 更新本地缓存数据
*/publicbooleanupdate(String key,String value){
configCache.put(key, value);returntrue;}/**
* 更新本地缓存数据
*/publicStringgetByKey(String key){return configCache.getIfPresent(key);}}
2.在静态类中通过static语句块创建定时器并定时加载,代码如下
packagewikiedits.schedule;importjava.util.concurrent.Executors;importjava.util.concurrent.ScheduledExecutorService;importjava.util.concurrent.TimeUnit;importcom.google.common.cache.Cache;importcom.google.common.cache.CacheBuilder;/**
* 静态类定时加载DB配置表到本地内存中
*/publicclassStaticLoadUtil{// 定时任务执行器privatestatictransientScheduledExecutorService scheduledExecutorService;publicstaticfinalCache<String,String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();// 通过定时执行器定时同步本地缓存和DB配置表static{
scheduledExecutorService =Executors.newScheduledThreadPool(10);
scheduledExecutorService.scheduleWithFixedDelay(()->{// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();// for(ConfigEntity entity : configList){
configCache.put("key","value");// }// 2.2 更新本地变量threshold的值// threshold = DBManager.SELECTSQL.getConfig("threshold");},0,100,TimeUnit.SECONDS);}/**
* 获取本地缓存
*/publicstaticCache<String,String>getConfigCache(){return configCache;}}
总结:
1.外部定时器可以通过在富函数的open中进行初始化并开始定时执行
2.外部定时器也可以通过创建一个单独的静态类,然后在static模块中进行初始化并开始定时执行
版权归原作者 lixia0417mul2 所有, 如有侵权,请联系我们删除。