0


自定义Flink SourceFunction定时读取数据库

文章目录


前言

Source 是Flink获取数据输入的地方,可以用StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source,也可以通过实现继承 RichSourceFunction 类编写自定义的 sources。Flink提供了多种预定义的 stream source:基于文件、 套接字、集合等source;但没用提供数据库相关的Source。

一、自定义Flink SourceFunction定时读取数据库

有些场景需要定时的读取不断变化的数据库数据作为流数据。本文中的代码实现适用于所有关系数据库。

  • 在构造方法中传递数据库连接参数、定时周期等信息
  • run:在run中定时读取数据库数据并emit到发送到下一节点。
  • cancel: 取消一个 source,running状态改为false将 run 中的循环 emit 元素的行为终止。

二、java代码实现

/**
 * 关系库流数据源 
 *
 */publicclassDbSourceFunctionextendsRichSourceFunction<Row>{privatestaticfinallong serialVersionUID =1L;privatestaticfinalLoggerLOG=LoggerFactory.getLogger(DbSourceFunction.class);privatevolatileboolean isRunning =true;privateString driver =null;//执行周期(秒)privateLong period =null;privateJSONObject conf;privateDataBaseType baseType;publicDbFullSourceFunction(JSONObject conf,DataBaseType baseType){this.conf = conf;this.baseType = baseType;this.driver = baseType.getDriverClassName();// 执行周期
        period = conf.getLong("period");//周期单位String unit = conf.getString("executionWay","seconds");if(period !=null&& period >0){//根据时间单位转换为秒
            period =FuntionUtil.getSeconds(unit, period);}}@Overridepublicvoidopen(Configuration parameters)throwsException{super.open(parameters);}@Overridepublicvoidrun(SourceContext<Row> ctx)throwsException{while(isRunning){String querySql = conf.getString(Key.QUERY_SQL);List<JSONObject> columnList = conf.getList(Key.COLUMN);int len = columnList.size();Connection connect =null;PreparedStatement ps =null;ResultSet rs =null;try{while(connect ==null){try{
                        connect =getConnection();if(connect !=null){break;}}catch(Exception w){LOG.error("获取连接异常", w.getMessage());}}
                ps = connect.prepareStatement(querySql);try{
                    rs = ps.executeQuery();while(rs.next()){Row row =newRow(len);for(int i =0; i < len; i++){JSONObject column = columnList.get(i);Integer columnType = column.getInt(Key.COLUMN_TYPE);//将ResultSet数据转换为Flink RowRowSetFieldUtil.rowSetFieldResultSet(row, rs, i, columnType, baseType);}// 发送结果
                        ctx.collect(row);}}catch(Exception e){LOG.error("查询出现异常",e);if(ps !=null){
                        ps.close();}if(connect !=null){
                        connect.close();}}}catch(Exception e){LOG.error("查询数据异常", e);throw e;}finally{if(rs !=null){
                    rs.close();}if(ps !=null){
                    ps.close();}if(connect !=null){
                    connect.close();}}if(period ==null|| period <=0){
                isRunning =false;}else{Long takeTime =(end - start)/1000;//去掉执行消耗时间LOG.error("sleep time:"+(period - takeTime));TimeUnit.SECONDS.sleep(period - takeTime);}}}@Overridepublicvoidcancel(){
        isRunning =false;}privateConnectiongetConnection(){Connection connection =null;try{String username = conf.getString(Key.USERNAME);String password = conf.getString(Key.PASSWORD);
            password =PubFunction.decryptStr(password);String jdbcUrl = conf.getString(String.format("%s[0]",Key.JDBC_URL));// 创建连接
            connection =DriverManager.getConnection(jdbcUrl, username, password);}catch(Exception e){LOG.error("get connection occur exception", e);thrownewRuntimeException("get connection occur exception", e);}return connection;}}

总结

完整代码请点击下载自定义Flink SourceFunction定时读取数据库java代码下载。

标签: 数据库 flink

本文转载自: https://blog.csdn.net/gwc791224/article/details/135527878
版权归原作者 shandongwill 所有, 如有侵权,请联系我们删除。

“自定义Flink SourceFunction定时读取数据库”的评论:

还没有评论