

Flink SQL Optimizer Query Rewriting: Analyzing and Solving Doubled UDF Invocations

Flink SQL is arguably the brightest star in the real-time data-warehouse space, and its unified stream-batch design deserves much of the credit. Given how well Flink SQL performs in this domain, it is worth understanding its behavior in ETL scenarios in depth. This article focuses on a UDF usage scenario in which the optimizer's SQL rewriting causes the UDF to be invoked multiple times per row; we analyze why this happens and present countermeasures.

1. Reproducing the Scenario

We illustrate the problem with a concrete case, once again using the simulated data source from the "Online Bookstore Real-Time Data Warehouse" learning project. For more details on the data source, see the author's related blog posts.

  1. We take a real-time dimension-table join as the example. First, the UDF definition:

```java
private static JedisUtil jedis;

@Override
public void open(FunctionContext context) throws Exception {
    super.open(context);
    jedis = JedisUtil.getInstance();
}

/**
 * Flink dimension-table lookup UDF.
 *
 * @param nkey      dimension key
 * @param timestamp event timestamp used for version selection
 */
public String eval(String nkey, Long timestamp) {
    // 1. Build the key of the Redis sorted set
    String key = "dim_data:" + nkey;
    // 2. Fetch all version members for this key from the sorted set
    Set<String> values = jedis.SORTSETS.zrange(key, 0, -1);
    // 3. Pick the matching version: the latest one not newer than the event timestamp
    String versionKey = null;
    for (String value : values) {
        String[] split = value.split(":");
        if (Long.valueOf(split[2]) > timestamp) {
            break;
        } else {
            versionKey = value;
        }
    }
    // 4. Build the key of the hash that holds the version data
    String hashKey = null;
    if (versionKey != null) {
        hashKey = "versions:" + versionKey;
    }
    // 5. Fetch the version data
    Map<String, String> versionValue = jedis.HASHS.hgetAll(hashKey);
    // 6. Return the result as a JSON string
    String result = JSONUtils.getJSONObjectFromMap(versionValue).toJSONString();
    // *** This log line is used to track how many times the UDF is called ***
    System.out.println("Log => " + System.currentTimeMillis() + " : " + result);
    return result;
}
```

This UDF lets the order-detail source look up the corresponding book dimension data in Redis. The line `System.out.println("Log => " + System.currentTimeMillis() + " : " + result);` tracks the number of UDF invocations: in theory, every call to the UDF prints exactly one log record.
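The version-selection loop in step 3 can be exercised in isolation. Below is a minimal plain-Java sketch (no Redis involved; the class and member names are illustrative, and the member format `<table>:<id>:<effective_timestamp>` is an assumption inferred from the `split(":")[2]` access in the UDF). It assumes members arrive in ascending timestamp order, as `ZRANGE` returns them when the score is the timestamp:

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class VersionPicker {

    /**
     * Picks the latest version whose effective timestamp is not newer than the
     * event timestamp, mirroring the loop in the UDF. Members are assumed to be
     * sorted ascending by timestamp (ZRANGE order by score); the member format
     * "<table>:<id>:<effective_timestamp>" is a hypothetical example layout.
     */
    public static String pick(Set<String> members, long eventTs) {
        String versionKey = null;
        for (String member : members) {
            String[] parts = member.split(":");
            if (Long.parseLong(parts[2]) > eventTs) {
                break;              // every later member is even newer, stop scanning
            }
            versionKey = member;    // remember the newest valid version so far
        }
        return versionKey;          // null if no version existed at eventTs yet
    }

    public static void main(String[] args) {
        Set<String> members = new LinkedHashSet<>();
        members.add("dim_book:1:100");
        members.add("dim_book:1:200");
        members.add("dim_book:1:300");
        // An order updated at ts=250 should see the version effective since ts=200.
        System.out.println(pick(members, 250L));
    }
}
```

Note the `null` case: if the event timestamp predates every version (as in `pick(members, 50L)`), the UDF's `hashKey` stays `null`, which the original code then passes to `hgetAll` unguarded.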
  2. The Flink SQL main program is as follows:

```java
public static void redisUDFDimQueryWithSubQueryDemo() {
    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment);
    executionEnvironment.setParallelism(1);
    tableEnvironment.getConfig().set("table.exec.source.idle-timeout", "3s");

    // Register the UDF
    tableEnvironment.createTemporarySystemFunction("dim_product_with_versions", DimProductWithVersionsForRedis.class);

    // Define the order-detail source table
    tableEnvironment.executeSql("create table tbl_order_detail(\n"
            + " order_book_id int comment 'order detail ID',\n"
            + " order_id int comment 'order ID',\n"
            + " book_id int comment 'book ID',\n"
            + " book_number int comment 'ordered quantity',\n"
            + " original_price double comment 'original amount',\n"
            + " actual_price double comment 'paid amount',\n"
            + " discount_price double comment 'discount amount',\n"
            + " create_time string comment 'order time',\n"
            + " update_time bigint comment 'update timestamp'\n"
            + ") with (\n"
            + " 'connector' = 'kafka',\n"
            + " 'topic' = 'tbl_order_detail',\n"
            + " 'properties.bootstrap.servers' = 'localhost:9092',\n"
            + " 'properties.group.id' = 'testGroup',\n"
            + " 'scan.startup.mode' = 'latest-offset',\n"
            + " 'format' = 'json',\n"
            + " 'json.fail-on-missing-field' = 'false',\n"
            + " 'json.ignore-parse-errors' = 'true'\n"
            + " );");

    // Print the SQL execution plan
    String explainSql = tableEnvironment.explainSql("select\n"
            + " order_book_id,\n"
            + " order_id,\n"
            + " book_id,\n"
            + " json_value(dim_product,'$.price') as price,\n"
            + " json_value(dim_product,'$.book_name') as book_name\n"
            + "from (\n"
            + " select\n"
            + "  order_book_id as order_book_id,\n"
            + "  order_id as order_id,\n"
            + "  book_id as book_id,\n"
            + "  book_number as book_number,\n"
            + "  original_price as original_price,\n"
            + "  actual_price as actual_price,\n"
            + "  discount_price as discount_price,\n"
            + "  create_time as create_time,\n"
            + "  update_time as update_time,\n"
            + "  dim_product_with_versions(concat('dim_book:',cast(book_id as string)),cast(update_time as bigint)) as dim_product\n"
            + " from tbl_order_detail\n"
            + ") tmp\n"
            + "where json_value(dim_product,'$.price') > 5\n"
            + ";");
    System.out.println("SQL Explain Plan: ");
    System.out.println(explainSql);

    // Define the ETL logic: every order-detail record queries the Redis
    // dimension table for its book dimension data
    tableEnvironment.executeSql("select\n"
            + " order_book_id,\n"
            + " order_id,\n"
            + " book_id,\n"
            + " json_value(dim_product,'$.price') as price,\n"
            + " json_value(dim_product,'$.book_name') as book_name\n"
            // … (the remainder of the listing is truncated in the source post)
```
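Although the source post is cut off at this point, the doubling the title refers to can be illustrated. The outer query references `dim_product` three times (the `price` column, the `book_name` column, and the `WHERE` predicate). When the optimizer merges the subquery's projection into the outer query, each of those references is replaced by the defining expression, i.e. by a fresh UDF call. The following plain-Java sketch simulates the call counts of the two plans; it has no Flink dependency, and the class, method, and return values are illustrative stand-ins, not Flink's actual planner output:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class UdfCallCountDemo {

    // Stand-in for the dim_product_with_versions UDF; every invocation is counted.
    static final AtomicInteger CALLS = new AtomicInteger();

    static String dimProduct(String key, long ts) {
        CALLS.incrementAndGet();
        return "{\"price\":\"9.9\",\"book_name\":\"Flink Basics\"}"; // dummy payload
    }

    // Plan as written: the subquery materializes dim_product once per row,
    // and the outer SELECT columns and WHERE predicate reuse the stored value.
    static int callsWithMaterializedSubquery() {
        CALLS.set(0);
        String dim = dimProduct("dim_book:1", 1000L); // evaluated once
        String pricePredicate = dim;                  // where json_value(dim_product,'$.price') > 5
        String priceColumn    = dim;                  // select json_value(dim_product,'$.price')
        String nameColumn     = dim;                  // select json_value(dim_product,'$.book_name')
        return CALLS.get();
    }

    // Plan after the optimizer merges the projections: every reference to
    // dim_product becomes its defining expression, so the UDF runs once
    // per reference.
    static int callsAfterProjectionMerge() {
        CALLS.set(0);
        String pricePredicate = dimProduct("dim_book:1", 1000L); // WHERE predicate
        String priceColumn    = dimProduct("dim_book:1", 1000L); // price column
        String nameColumn     = dimProduct("dim_book:1", 1000L); // book_name column
        return CALLS.get();
    }

    public static void main(String[] args) {
        System.out.println(callsWithMaterializedSubquery()
                + " vs " + callsAfterProjectionMerge());
    }
}
```

The planner considers this duplication safe because the UDF is assumed deterministic. A commonly cited mitigation, discussed in the Flink documentation's coverage of function determinism, is to override `isDeterministic()` in the `ScalarFunction` to return `false`, which discourages the planner from duplicating the expression during projection merging; the trade-off is that it can also block other optimizations, so treat it as an option to verify against your Flink version's explain output rather than a universal fix.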
Tags: flink sql, data warehouse

This article is reposted from: https://blog.csdn.net/wen811651208/article/details/138476490
Copyright belongs to the original author, GawynKing. In case of infringement, please contact us for removal.
