

Flink Dimension-Table Async Lookup: Implementation and Troubleshooting

Background

This article is based on Flink 1.13.3 and the HBase connector shipped with the VVR build of the Flink compute engine.
The exact Maven dependency is:

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-cloudhbase</artifactId>
    <version>1.13-vvr-4.0.7</version>
</dependency>

When running dimension-table (lookup) queries against cloudHbase with the VVR connector, we found synchronous lookups to be very slow, so we decided to implement the lookup asynchronously.

While running the job we hit an NPE; the full stack trace is:

2022-06-08 15:01:05
java.lang.Exception: Could not complete the stream element: Record @ (undef) : org.apache.flink.table.data.binary.BinaryRowData@d8014011.
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.completeExceptionally(AsyncWaitOperator.java:382)
    at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture.completeExceptionally(AsyncLookupJoinRunner.java:253)
    at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture$DelegateResultFuture.completeExceptionally(AsyncLookupJoinRunner.java:275)
    at org.apache.flink.table.runtime.collector.TableFunctionResultFuture.completeExceptionally(TableFunctionResultFuture.java:61)
    at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:121)
    at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture.complete(AsyncLookupJoinRunner.java:219)
    at org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture.accept(DelegatingResultFuture.java:48)
    at org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture.accept(DelegatingResultFuture.java:32)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.connector.xx.cloudhbase.source.AsyncHBaseLRURowFetcher.lambda$fetchResult$6(AsyncHBaseLRURowFetcher.java:223)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.NullPointerException
    at TableCalcMapFunction$8.flatMap(Unknown Source)
    at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
    ... 15 more

Conclusions first

  • When the HBase connector in the VVR build of the Flink compute engine converts HBase results into RowData, it has a multi-threading problem, and that is what causes the NPE.
  • Compared with the Asynchronous I/O for External Data Access pattern, we do not need to extend RichAsyncFunction and implement asyncInvoke; implementing an *eval(CompletableFuture<Collection<RowData>> future, RowData rowData)* method is enough, because Flink wraps it for us during code generation (see the side topic at the end).

Analysis

  • Initial investigation: we first located line 119 of AsyncLookupJoinWithCalcRunner, shown below:
@Override
public void complete(Collection<RowData> result) {
    if (result == null || result.size() == 0) {
        joinConditionResultFuture.complete(result);
    } else {
        for (RowData row : result) {
            try {
                calc.flatMap(row, calcCollector);
            } catch (Exception e) {
                joinConditionResultFuture.completeExceptionally(e);
            }
        }
        joinConditionResultFuture.complete(calcCollector.collection);
    }
}

At first we suspected that calc was null; further investigation showed that was not the problem.

  • Further investigation: we then re-located the problem to line 223 of our own class AsyncHBaseLRURowFetcher:
RowData rowData = readHelper.convertToRow(result);
if (cache != null) {
    resultFuture.complete(Collections.singletonList(rowData));
    cache.put(rowKey, rowData);
} else {
    resultFuture.complete(Collections.singletonList(rowData));
}

Testing showed that rowData was, surprisingly, null, which means the row passed into calc.flatMap(row, calcCollector) was null. Doesn't calc handle null at all? (calc is an object produced by Flink's codegen.) We printed the generated code:

public class TableCalcMapFunction$8
          extends org.apache.flink.api.common.functions.RichFlatMapFunction {

        private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$6;
        org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(1);

        public TableCalcMapFunction$8(Object[] references) throws Exception {
          typeSerializer$6 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
        }

        @Override
        public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
          
        }

        @Override
        public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {
          org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;
          
          org.apache.flink.table.data.binary.BinaryStringData field$5;
          boolean isNull$5;
          org.apache.flink.table.data.binary.BinaryStringData field$7;
          
          isNull$5 = in1.isNullAt(0);
          field$5 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
          if (!isNull$5) {
            field$5 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
          }
          field$7 = field$5;
          if (!isNull$5) {
            field$7 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$6.copy(field$7));
          }

          if (isNull$5) {
            out.setField(0, null);
          } else {
            out.setField(0, field$7);
          }
                    
          c.collect(out);
        }

        @Override
        public void close() throws Exception {
          
        }
        
      }
    

It really does not handle null: the generated flatMap calls in1.isNullAt(0) right away, so a null input row immediately throws an NPE. Does that mean Flink's codegen is broken? No.
Going back to the line RowData rowData = readHelper.convertToRow(result), we found that the method has a multi-threading problem:

this.rowDataGenerator.start();
...

return this.rowDataGenerator.end();

The implementation of this.rowDataGenerator.start() is:

this.rowData = genericRowData()

And the internal implementation of this.rowDataGenerator.end() is:

 GenericRowData localRowData = this.rowData;
 this.rowData = null;
 return localRowData;

Both methods operate on the same shared rowData field, which is clearly unsafe under concurrency: if thread A calls start() and, before it reaches end(), thread B calls end(), then B takes the row and sets the field to null, so A's own end() returns null, which is exactly the null row we observed.

So the fix is to rewrite this part so that a fresh object is created on every call instead of reusing a shared field; that sidesteps the race and resolves the NPE.
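A minimal sketch of that idea (with a hypothetical class name and hypothetical column family / qualifier, since the real readHelper code is not shown here): build a brand-new GenericRowData on every call instead of passing this.rowData through rowDataGenerator.start()/end():

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class SafeHBaseReadHelper {

    // hypothetical column family and qualifier, for illustration only
    private final byte[] family = Bytes.toBytes("cf");
    private final byte[] qualifier = Bytes.toBytes("value");

    public RowData convertToRow(Result result) {
        // a fresh, thread-confined row per call: no shared mutable state,
        // so concurrent HBase callback threads cannot null out each other's row
        GenericRowData row = new GenericRowData(2);
        row.setField(0, StringData.fromBytes(result.getRow()));
        byte[] value = result.getValue(family, qualifier);
        row.setField(1, value == null ? null : StringData.fromBytes(value));
        return row;
    }
}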

A side topic

How does Flink SQL implement the asynchronous lookup internally? Following Asynchronous I/O for External Data Access we would be expected to extend RichAsyncFunction and implement its asyncInvoke method, yet all we implemented was an *eval(CompletableFuture<Collection<RowData>> future, RowData rowData)* method.

This is explained by the AsyncLookupJoinRunner class:

public class AsyncLookupJoinRunner extends RichAsyncFunction<RowData, RowData> {
    ...
    private final GeneratedFunction<AsyncFunction<RowData, Object>> generatedFetcher;
    ...
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.fetcher = generatedFetcher.newInstance(getRuntimeContext().getUserCodeClassLoader());
        FunctionUtils.setFunctionRuntimeContext(fetcher, getRuntimeContext());
        FunctionUtils.openFunction(fetcher, parameters);
        ...
    }

    @Override
    public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) throws Exception {
        JoinedRowResultFuture outResultFuture = resultFutureBuffer.take();
        // the input row is copied when object reuse in AsyncWaitOperator
        outResultFuture.reset(input, resultFuture);

        // fetcher has copied the input field when object reuse is enabled
        fetcher.asyncInvoke(input, outResultFuture);
    }

Here generatedFetcher holds the code produced by Flink's codegen. Printing the code inside generatedFetcher gives:

public class LookupFunction$3
          extends org.apache.flink.streaming.api.functions.async.RichAsyncFunction {

        private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$1;
        private transient org.apache.flink.connector.xx.cloudhbase.source.AsyncLookupFunctionWrapper function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3;

        public LookupFunction$3(Object[] references) throws Exception {
          typeSerializer$1 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
          function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3 = (((org.apache.flink.connector.xx.cloudhbase.source.AsyncLookupFunctionWrapper) references[1]));
        }

        

        @Override
        public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
          
          function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
                 
        }

        @Override
        public void asyncInvoke(Object _in1, org.apache.flink.streaming.api.functions.async.ResultFuture c) throws Exception {
          org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;
          
          org.apache.flink.table.data.binary.BinaryStringData field$0;
          boolean isNull$0;
          org.apache.flink.table.data.binary.BinaryStringData field$2;
          
          isNull$0 = in1.isNullAt(2);
          field$0 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
          if (!isNull$0) {
            field$0 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(2));
          }
          field$2 = field$0;
          if (!isNull$0) {
            field$2 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$1.copy(field$2));
          }
          
          if (isNull$0) {
            c.complete(java.util.Collections.emptyList());
          } else {
            org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture delegates = new org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture(c);
            function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.eval(
              delegates.getCompletableFuture(),
              isNull$0 ? null : ((org.apache.flink.table.data.binary.BinaryStringData) field$2));
          }
          
        }

        @Override
        public void close() throws Exception {
          
          function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.close();
                 
        }

      }

From this we can see:

org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture delegates = new org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture(c);
function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.eval(
    delegates.getCompletableFuture(),
    isNull$0 ? null : ((org.apache.flink.table.data.binary.BinaryStringData) field$2));
  • A DelegatingResultFuture is used to bridge from a CompletableFuture to Flink's ResultFuture (the DelegatingResultFuture.accept frames in the stack trace come from exactly this bridge), which is why the method we implement takes a CompletableFuture; a simplified sketch of this adapter follows this list.
  • function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3 wraps the object of the class we wrote ourselves (which must extend AsyncTableFunction), so all we need to implement is a single *eval(CompletableFuture<Collection<RowData>> future, RowData rowData)* method, as in the second sketch below.
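Based on the generated code above and the DelegatingResultFuture.accept frames in the stack trace, the adapter works roughly as follows; this is a simplified sketch, not the exact source:

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Sketch of the CompletableFuture -> ResultFuture bridge: eval(...) completes the
// CompletableFuture, and whenComplete forwards the records (or the exception)
// to the operator's ResultFuture.
public class DelegatingResultFutureSketch<OUT> implements BiConsumer<Collection<OUT>, Throwable> {

    private final ResultFuture<OUT> delegatedResultFuture;
    private final CompletableFuture<Collection<OUT>> completableFuture;

    public DelegatingResultFutureSketch(ResultFuture<OUT> delegatedResultFuture) {
        this.delegatedResultFuture = delegatedResultFuture;
        this.completableFuture = new CompletableFuture<>();
        this.completableFuture.whenComplete(this);
    }

    public CompletableFuture<Collection<OUT>> getCompletableFuture() {
        return completableFuture;
    }

    @Override
    public void accept(Collection<OUT> records, Throwable error) {
        if (error != null) {
            delegatedResultFuture.completeExceptionally(error);
        } else {
            delegatedResultFuture.complete(records);
        }
    }
}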
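And on the user side, a minimal sketch of such a lookup function (hypothetical class name and dummy lookup logic; in VVR the object is additionally wrapped in AsyncLookupFunctionWrapper, but the eval signature the generated code calls stays the same):

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;

// Only an eval(CompletableFuture, key) method is needed: the generated LookupFunction$3
// extracts the join key from the probe row and calls eval with a CompletableFuture.
public class MyAsyncLookupFunction extends AsyncTableFunction<RowData> {

    @Override
    public void open(FunctionContext context) throws Exception {
        // create the asynchronous HBase client / connection here
    }

    public void eval(CompletableFuture<Collection<RowData>> future, StringData rowKey) {
        // issue the asynchronous lookup; a dummy single-column row stands in for the real query
        CompletableFuture
                .supplyAsync(() -> {
                    GenericRowData row = new GenericRowData(1);
                    row.setField(0, rowKey);
                    return (RowData) row;
                })
                .whenComplete((row, err) -> {
                    if (err != null) {
                        future.completeExceptionally(err);
                    } else {
                        future.complete(Collections.singletonList(row));
                    }
                });
    }
}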

This article is reposted from: https://blog.csdn.net/monkeyboy_tech/article/details/125226486
The copyright belongs to the original author 鸿乃江边鸟. In case of infringement, please contact us for removal.
