

A Hands-On Look at Stream-Batch Unification in Flink 1.14.3

Preface

Flink has been talking up stream-batch unification since 1.10, and 1.14 is said to be a milestone, so I took it for a spin.

Changes

DataSet is gone

As I vaguely recall, the old Flink 1.8 looked a lot like Spark, likewise split into DataStream for stream processing and DataSet for batch processing. In the new version:

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.SortPartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkDatasetDemo1 {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> data = env.fromElements("hehe", "haha", "哈哈", "哈哈"); // old versions returned DataSet
        String[] str1 = {"hehe1", "haha1", "哈哈1", "哈哈1"};
        DataSource<String> data1 = env.fromElements(str1); // old versions returned DataSet

        AggregateOperator<Tuple2<String, Integer>> result = data.flatMap(new FlatMapFunction1())
                .groupBy(0)
                .sum(1);
        result.print();

        System.out.println("**************************");

        SortPartitionOperator<Tuple2<String, Integer>> result1 = data1.flatMap(new FlatMapFunction2())
                .groupBy(0)
                .sum(1)
                .sortPartition(1, Order.DESCENDING);
        result1.print();
    }

    private static class FlatMapFunction1 implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String cell : value.split("\\s+")) {
                out.collect(Tuple2.of(cell, 1));
            }
        }
    }

    private static class FlatMapFunction2 implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] split = value.split("\\s+");
            for (int i = 0; i < split.length; i++) {
                out.collect(new Tuple2<>(split[i], 1));
            }
        }
    }
}

After running it:

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.utils.PlanGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
(hehe,1)
(haha,1)
(哈哈,2)
**************************
(哈哈1,2)
(hehe1,1)
(haha1,1)

Process finished with exit code 0

The results are of course unchanged, but the DataSet I remembered is gone, replaced by DataSource. Drilling into it:

package org.apache.flink.api.java.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.NonParallelInput;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.OperatorInformation;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.SplitDataProperties;
import org.apache.flink.configuration.Configuration;

/**
 * An operation that creates a new data set (data source). The operation acts as the data set on
 * which to apply further transformations. It encapsulates additional configuration parameters, to
 * customize the execution.
 *
 * @param <OUT> The type of the elements produced by this data source.
 */
@Public
public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {

    private final InputFormat<OUT, ?> inputFormat;

    private final String dataSourceLocationName;

    private Configuration parameters;

    private SplitDataProperties<OUT> splitDataProperties;

    // --------------------------------------------------------------------------------------------

    /**
     * Creates a new data source.
     *
     * @param context The environment in which the data source gets executed.
     * @param inputFormat The input format that the data source executes.
     * @param type The type of the elements produced by this input format.
     */
    public DataSource(
            ExecutionEnvironment context,
            InputFormat<OUT, ?> inputFormat,
            TypeInformation<OUT> type,
            String dataSourceLocationName) {
        super(context, type);

        this.dataSourceLocationName = dataSourceLocationName;

        if (inputFormat == null) {
            throw new IllegalArgumentException("The input format may not be null.");
        }

        this.inputFormat = inputFormat;

        if (inputFormat instanceof NonParallelInput) {
            this.parallelism = 1;
        }
    }

    /**
     * Gets the input format that is executed by this data source.
     *
     * @return The input format that is executed by this data source.
     */
    @Internal
    public InputFormat<OUT, ?> getInputFormat() {
        return this.inputFormat;
    }

    /**
     * Pass a configuration to the InputFormat.
     *
     * @param parameters Configuration parameters
     */
    public DataSource<OUT> withParameters(Configuration parameters) {
        this.parameters = parameters;
        return this;
    }

    /** @return Configuration for the InputFormat. */
    public Configuration getParameters() {
        return this.parameters;
    }

    /**
     * Returns the {@link org.apache.flink.api.java.io.SplitDataProperties} for the {@link
     * org.apache.flink.core.io.InputSplit}s of this DataSource for configurations.
     *
     * <p>SplitDataProperties can help to generate more efficient execution plans.
     *
     * <p><b> IMPORTANT: Incorrect configuration of SplitDataProperties can cause wrong results!
     * </b>
     *
     * @return The SplitDataProperties for the InputSplits of this DataSource.
     */
    @PublicEvolving
    public SplitDataProperties<OUT> getSplitDataProperties() {
        if (this.splitDataProperties == null) {
            this.splitDataProperties = new SplitDataProperties<OUT>(this);
        }
        return this.splitDataProperties;
    }

    // --------------------------------------------------------------------------------------------

    protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
        String name =
                this.name != null
                        ? this.name
                        : "at " + dataSourceLocationName
                                + " (" + inputFormat.getClass().getName() + ")";
        if (name.length() > 150) {
            name = name.substring(0, 150);
        }

        @SuppressWarnings({"unchecked", "rawtypes"})
        GenericDataSourceBase<OUT, ?> source =
                new GenericDataSourceBase(
                        this.inputFormat, new OperatorInformation<OUT>(getType()), name);
        source.setParallelism(parallelism);
        if (this.parameters != null) {
            source.getParameters().addAll(this.parameters);
        }
        if (this.splitDataProperties != null) {
            source.setSplitDataProperties(this.splitDataProperties);
        }
        return source;
    }
}

Following the hierarchy down:

package org.apache.flink.api.java.operators;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

/**
 * Base class of all operators in the Java API.
 *
 * @param <OUT> The type of the data set produced by this operator.
 * @param <O> The type of the operator, so that we can return it.
 */
@Public
public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<OUT> {}

And one level further:

package org.apache.flink.api.java;

import org.apache.flink.annotation.Public;
// ... intermediate imports omitted ...
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * A DataSet represents a collection of elements of the same type.
 *
 * <p>A DataSet can be transformed into another DataSet by applying a transformation as for example
 *
 * <ul>
 *   <li>{@link DataSet#map(org.apache.flink.api.common.functions.MapFunction)},
 *   <li>{@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)},
 *   <li>{@link DataSet#join(DataSet)}, or
 *   <li>{@link DataSet#coGroup(DataSet)}.
 * </ul>
 *
 * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
 */
@Public
public abstract class DataSet<T> {}

The new version has dropped direct work on DataSet; batch processing now goes through the brand-new DataSource!!!

[Image: class hierarchy in the IDE — DataSource extends Operator, which extends DataSet]
So what we actually use now is DataSource, a subclass of Operator, which itself is a subclass of DataSet.
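Since DataSource still is-a DataSet through that hierarchy, it can be used anywhere a DataSet is expected — a trivial sketch to make the point:

DataSet<String> ds = env.fromElements("hehe", "haha"); // the DataSource<String> upcasts implicitly to DataSet<String>
ds.print(); // all the familiar DataSet operations remain available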

DataStream now has implementation classes

A source that mocks one record per second:

package com.zhiyong.flinkStudy;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.Random;

/**
 * @program: study
 * @description: Flink WordCount source producing one record per second
 * @author: zhiyong
 * @create: 2022-03-17 00:06
 **/
public class WordCountSource1ps implements SourceFunction<String> {

    private boolean needRun = true;

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (needRun) {
            ArrayList<String> result = new ArrayList<>();
            for (int i = 0; i < 20; i++) {
                result.add("zhiyong" + i);
            }
            sourceContext.collect(result.get(new Random().nextInt(20)));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        needRun = false;
    }
}

The DataStream program:

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

import java.util.Collection;

/**
 * @program: study
 * @description: Flink DataStream demo
 * @author: zhiyong
 * @create: 2022-03-17 00:06
 **/
public class FlinkDataStreamDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1); // avoids the insufficient-network-buffers error

        SingleOutputStreamOperator<Tuple2<String, Integer>> result1 = env.addSource(new WordCountSource1ps())
                .flatMap(new FlatMapFunction1())
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);

        DataStream<Tuple2<String, Integer>> result2 = env.addSource(new WordCountSource1ps())
                .flatMap(new FlatMapFunction1())
                .keyBy(0) // deprecated method
                .sum(1);

//        SingleOutputStreamOperator<Tuple2<String, Integer>> result3 = env.addSource(new WordCountSource1ps())
//                .flatMap(new FlatMapFunction1())
//                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
//                    @Override
//                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
//                        return value.f0;
//                    }
//                })
//                .window(new WindowAssigner<Tuple2<String, Integer>, Window>() {
//                    @Override
//                    public Collection<Window> assignWindows(Tuple2<String, Integer> element, long timestamp, WindowAssignerContext context) {
//                        return null;
//                    }
//
//                    @Override
//                    public Trigger<Tuple2<String, Integer>, Window> getDefaultTrigger(StreamExecutionEnvironment env) {
//                        return null;
//                    }
//
//                    @Override
//                    public TypeSerializer<Window> getWindowSerializer(ExecutionConfig executionConfig) {
//                        return null;
//                    }
//
//                    @Override
//                    public boolean isEventTime() {
//                        return false;
//                    }
//                })
//                .sum(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> result4 = env.addSource(new WordCountSource1ps())
                .flatMap(new FlatMapFunction1())
                .keyBy(0) // keyBy(int) is deprecated
                .timeWindow(Time.seconds(30)) // timeWindow is deprecated
                .sum(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> result5 = env.addSource(new WordCountSource1ps())
                .flatMap(new FlatMapFunction1())
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum(1);

//        result1.print();
//        result2.print();
//        result3.print();
//        result4.print();
        result5.print();
        env.execute("Without this line the job never runs; the process just ends with exit code 0");
    }

    public static class FlatMapFunction1 implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String cell : value.split("\\s+")) {
                out.collect(Tuple2.of(cell, 1));
            }
        }
    }
}

After running with result1.print():

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
10> (zhiyong16,1)
30> (zhiyong7,1)
32> (zhiyong14,1)
33> (zhiyong3,1)
29> (zhiyong12,1)
2> (zhiyong15,1)
10> (zhiyong16,2)
2> (zhiyong15,2)
10> (zhiyong16,3)
30> (zhiyong7,2)
17> (zhiyong18,1)
30> (zhiyong7,3)
35> (zhiyong19,1)
35> (zhiyong19,2)
4> (zhiyong4,1)
18> (zhiyong5,1)
35> (zhiyong8,1)
18> (zhiyong5,2)
25> (zhiyong11,1)
23> (zhiyong2,1)
23> (zhiyong2,2)
25> (zhiyong11,2)
35> (zhiyong8,2)
18> (zhiyong5,3)
35> (zhiyong19,3)
35> (zhiyong8,3)
33> (zhiyong3,2)
35> (zhiyong19,4)
35> (zhiyong8,4)
35> (zhiyong8,5)
4> (zhiyong0,1)
23> (zhiyong2,3)
32> (zhiyong14,2)
10> (zhiyong10,1)
25> (zhiyong11,3)
35> (zhiyong17,1)
35> (zhiyong17,2)
32> (zhiyong14,3)

Process finished with exit code 130

The new class SingleOutputStreamOperator:

[Image: the SingleOutputStreamOperator class opened in the IDE]

The official description:

SingleOutputStreamOperator represents a user defined transformation applied on a DataStream with one predefined output type.
Type parameters: <T> – The type of the elements in this stream.

Clearly it is a subclass of DataStream.

If you hit this error:

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
    at akka.dispatch.OnComplete.internal(Future.scala:300)
    at akka.dispatch.OnComplete.internal(Future.scala:297)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
    at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
    at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
    at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    ... 4 more
Caused by: java.io.IOException: Insufficient number of network buffers: required 37, but only 3 available. The total number of network buffers is currently set to 2048 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction','taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:386)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:364)
    at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:279)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:151)
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.setup(BufferWritingResultPartition.java:95)
    at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:969)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:664)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:750)

Process finished with exit code 1

That happens because network resources are insufficient; the simplest fix is to lower the parallelism so the job needs fewer network buffers:

env.setParallelism(1);
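Alternatively, instead of lowering the parallelism, you can give the local MiniCluster more network memory through the exact keys named in the error message. A minimal sketch, assuming the program runs from the IDE; the sizes here are illustrative, not tuned values:

Configuration conf = new Configuration(); // org.apache.flink.configuration.Configuration
conf.setString("taskmanager.memory.network.min", "256mb");    // key quoted in the error message
conf.setString("taskmanager.memory.network.max", "1gb");      // key quoted in the error message
conf.setString("taskmanager.memory.network.fraction", "0.2"); // key quoted in the error message
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);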

After that, result2.print() produces output normally:

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
(zhiyong3,1)
(zhiyong0,1)
(zhiyong14,1)
(zhiyong3,2)
(zhiyong18,1)
(zhiyong1,1)
(zhiyong13,1)
(zhiyong19,1)
(zhiyong13,2)
(zhiyong4,1)
(zhiyong3,3)
(zhiyong9,1)
(zhiyong0,2)
(zhiyong12,1)
(zhiyong10,1)
(zhiyong6,1)
(zhiyong19,2)
(zhiyong18,2)
(zhiyong15,1)
(zhiyong6,2)
(zhiyong4,2)
(zhiyong16,1)
(zhiyong15,2)
(zhiyong6,3)
(zhiyong10,2)
(zhiyong4,3)

Process finished with exit code 130

Assigning the subclass straight to its parent type and calling the parent's methods generally causes no trouble.
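For example (a trivial sketch): since SingleOutputStreamOperator extends DataStream, no explicit cast is even needed:

// result1 is a SingleOutputStreamOperator<Tuple2<String, Integer>>
DataStream<Tuple2<String, Integer>> asParent = result1; // implicit upcast
asParent.print(); // only the parent's methods are visible through this reference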

Next, let's try DataStream windows:

SingleOutputStreamOperator<Tuple2<String, Integer>> result4 = env.addSource(new WordCountSource1ps())
        .flatMap(new FlatMapFunction1())
        .keyBy(0) // keyBy(int) is deprecated
        .timeWindow(Time.seconds(30)) // timeWindow is deprecated
        .sum(1);

[Image: the IDE strikes through the deprecated keyBy/timeWindow calls]

It still works, but both methods are deprecated. Drilling into timeWindow:

/**
     * Windows this {@code KeyedStream} into tumbling time windows.
     *
     * <p>This is a shortcut for either {@code .window(TumblingEventTimeWindows.of(size))} or {@code
     * .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic set
     * using {@link
     * org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
     *
     * @param size The size of the window.
     * @deprecated Please use {@link #window(WindowAssigner)} with either {@link
     *     TumblingEventTimeWindows} or {@link TumblingProcessingTimeWindows}. For more information,
     *     see the deprecation notice on {@link TimeCharacteristic}
 */
@Deprecated
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        return window(TumblingProcessingTimeWindows.of(size));
    } else {
        return window(TumblingEventTimeWindows.of(size));
    }
}

Clearly the timeWindow operator is no longer recommended.

result3's window operator, which news up a raw WindowAssigner, is obviously a bit unwieldy. The source also states you can use .window(TumblingEventTimeWindows.of(size)) or .window(TumblingProcessingTimeWindows.of(size)) instead, i.e. a tumbling event-time window or a tumbling processing-time window.
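Rewriting result4 the recommended way — a sketch following the deprecation notice above, using a processing-time tumbling window (TumblingProcessingTimeWindows lives in org.apache.flink.streaming.api.windowing.assigners):

SingleOutputStreamOperator<Tuple2<String, Integer>> result4New = env.addSource(new WordCountSource1ps())
        .flatMap(new FlatMapFunction1())
        .keyBy(new KeySelector<Tuple2<String, Integer>, String>() { // replaces the deprecated keyBy(0)
            @Override
            public String getKey(Tuple2<String, Integer> value) {
                return value.f0;
            }
        })
        .window(TumblingProcessingTimeWindows.of(Time.seconds(30))) // replaces timeWindow(Time.seconds(30))
        .sum(1);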

Drilling into WindowAssigner:

package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;

import java.io.Serializable;
import java.util.Collection;

/**
 * A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
 *
 * <p>In a window operation, elements are grouped by their key (if available) and by the windows to
 * which it was assigned. The set of elements with the same key and window is called a pane. When a
 * {@link Trigger} decides that a certain pane should fire the {@link
 * org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied to produce output
 * elements for that pane.
 *
 * @param <T> The type of elements that this WindowAssigner can assign windows to.
 * @param <W> The type of {@code Window} that this assigner assigns.
 */
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Returns a {@code Collection} of windows that should be assigned to the element.
     *
     * @param element The element to which windows should be assigned.
     * @param timestamp The timestamp of the element.
     * @param context The {@link WindowAssignerContext} in which the assigner operates.
     */
    public abstract Collection<W> assignWindows(
            T element, long timestamp, WindowAssignerContext context);

    /** Returns the default trigger associated with this {@code WindowAssigner}. */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Returns a {@link TypeSerializer} for serializing windows that are assigned by this {@code
     * WindowAssigner}.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    /**
     * Returns {@code true} if elements are assigned to windows based on event time, {@code false}
     * otherwise.
     */
    public abstract boolean isEventTime();

    /**
     * A context provided to the {@link WindowAssigner} that allows it to query the current
     * processing time.
     *
     * <p>This is provided to the assigner by its containing {@link
     * org.apache.flink.streaming.runtime.operators.windowing.WindowOperator}, which, in turn, gets
     * it from the containing {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
     */
    public abstract static class WindowAssignerContext {

        /** Returns the current processing time. */
        public abstract long getCurrentProcessingTime();
    }
}

The WindowAssigner base class has these subclasses:

[Image: the subclasses of WindowAssigner shown in the IDE]

Besides the two tumbling windows there are, of course, two sliding windows.

A quick try of a sliding window:

SingleOutputStreamOperator<Tuple2<String, Integer>> result5 = env.addSource(new WordCountSource1ps())
        .flatMap(new FlatMapFunction1())
        .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
            @Override
            public Object getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        })
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .sum(1);

Output:

(zhiyong17,1)
(zhiyong17,1)
(zhiyong7,1)
(zhiyong15,1)
(zhiyong19,1)
(zhiyong4,2)
(zhiyong4,2)
(zhiyong0,1)
(zhiyong15,1)
(zhiyong7,1)
(zhiyong9,2)
(zhiyong18,1)
(zhiyong19,1)
(zhiyong8,1)

Process finished with exit code 130

The operator APIs have changed a great deal. The deprecated old APIs still sort of work for now, but we will certainly have to get used to the new ones; the old ones may stop working in some future version.

DSL (Table API) changes

While building the settings object for the execution environment, I found that the famously dominant BlinkPlanner has actually been deprecated!!!

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner() // useBlinkPlanner() is deprecated
        .inStreamingMode()
        .build();

[Image: useBlinkPlanner() shown struck through as deprecated in the IDE]

Drilling in, the source says:

/**
 * @deprecated The old planner has been removed in Flink 1.14. Since there is only one
 *     planner left (previously called the 'blink' planner), this setting will throw an
 *     exception.
 */
@Deprecated
public Builder useOldPlanner() {
    throw new TableException(
            "The old planner has been removed in Flink 1.14. "
                    + "Please upgrade your table program to use the default "
                    + "planner (previously called the 'blink' planner).");
}

/**
 * Sets the Blink planner as the required module.
 *
 * <p>This is the default behavior.
 *
 * @deprecated The old planner has been removed in Flink 1.14. Since there is only one
 *     planner left (previously called the 'blink' planner), this setting is obsolete and
 *     will be removed in future versions.
 */
@Deprecated
public Builder useBlinkPlanner() {
    return this;
}

Well now: both planner settings are on the way out.

package com.zhiyong.flinkStudy;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$; // enables $("fieldName")

/**
 * @program: study
 * @description: stream-batch unification with the Table API
 * @author: zhiyong
 * @create: 2022-03-17 23:52
 **/
public class FlinkTableApiDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flink 1.14 no longer needs a planner setting
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<String> data = env.addSource(new WordCountSource1ps());

        Table table1 = tableEnv.fromDataStream(data, "word"); // deprecated
        Table table1_1 = table1.where($("word").like("%5%"));
        System.out.println("tableEnv.explain(table1_1) = " + tableEnv.explain(table1_1)); // deprecated

        tableEnv.toAppendStream(table1_1, Row.class).print("table1_1"); // deprecated
        System.out.println("env.getExecutionPlan() = " + env.getExecutionPlan());
        env.execute();
    }
}

After running it:

tableEnv.explain(table1_1) = == Abstract Syntax Tree ==
LogicalFilter(condition=[LIKE($0, _UTF-16LE'%5%')])
+- LogicalTableScan(table=[[Unregistered_DataStream_1]])

== Optimized Physical Plan ==
Calc(select=[word], where=[LIKE(word, _UTF-16LE'%5%')])
+- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[word])

== Optimized Execution Plan ==
Calc(select=[word], where=[LIKE(word, _UTF-16LE'%5%')])
+- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[word])

env.getExecutionPlan() = {"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":4,"type":"SourceConversion(table=[Unregistered_DataStream_1], fields=[word])","pact":"Operator","contents":"SourceConversion(table=[Unregistered_DataStream_1], fields=[word])","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":5,"type":"Calc(select=[word], where=[LIKE(word, _UTF-16LE'%5%')])","pact":"Operator","contents":"Calc(select=[word], where=[LIKE(word, _UTF-16LE'%5%')])","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]},{"id":6,"type":"SinkConversionToRow","pact":"Operator","contents":"SinkConversionToRow","parallelism":1,"predecessors":[{"id":5,"ship_strategy":"FORWARD","side":"second"}]},{"id":7,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":36,"predecessors":[{"id":6,"ship_strategy":"REBALANCE","side":"second"}]}]}
table1_1:8> +I[zhiyong5]
table1_1:9> +I[zhiyong5]
table1_1:10> +I[zhiyong5]
table1_1:11> +I[zhiyong15]
table1_1:12> +I[zhiyong15]

Process finished with exit code 130

In a browser, open:

https://flink.apache.org/visualizer/

Paste the JSON string printed above into the site's text box and click Draw:

[Image: the JSON pasted into the text box of the Flink plan visualizer]

You can see the DAG:

[Image: the DAG rendered by the visualizer]

The Table API still works fine, but far too many methods are deprecated:

[Image: a long list of deprecated Table API methods in the IDE]

For example the deprecated fromDataStream:

/**
 * Converts the given {@link DataStream} into a {@link Table} with specified field names.
 *
 * <p>There are two modes for mapping original fields to the fields of the {@link Table}:
 *
 * <p>1. Reference input fields by name: All fields in the schema definition are referenced by
 * name (and possibly renamed using an alias (as). Moreover, we can define proctime and rowtime
 * attributes at arbitrary positions using arbitrary names (except those that exist in the
 * result schema). In this mode, fields can be reordered and projected out. This mode can be
 * used for any input type, including POJOs.
 *
 * <p>Example:
 *
 * <pre>{@code
 * DataStream<Tuple2<String, Long>> stream = ...
 * // reorder the fields, rename the original 'f0' field to 'name' and add event-time
 * // attribute named 'rowtime'
 * Table table = tableEnv.fromDataStream(stream, "f1, rowtime.rowtime, f0 as 'name'");
 * }</pre>
 *
 * <p>2. Reference input fields by position: In this mode, fields are simply renamed. Event-time
 * attributes can replace the field on their position in the input data (if it is of correct
 * type) or be appended at the end. Proctime attributes must be appended at the end. This mode
 * can only be used if the input type has a defined field order (tuple, case class, Row) and
 * none of the {@code fields} references a field of the input type.
 *
 * <p>Example:
 *
 * <pre>{@code
 * DataStream<Tuple2<String, Long>> stream = ...
 * // rename the original fields to 'a' and 'b' and extract the internally attached timestamp into an event-time
 * // attribute named 'rowtime'
 * Table table = tableEnv.fromDataStream(stream, "a, b, rowtime.rowtime");
 * }</pre>
 *
 * @param dataStream The {@link DataStream} to be converted.
 * @param fields The fields expressions to map original fields of the DataStream to the fields
 *     of the {@link Table}.
 * @param <T> The type of the {@link DataStream}.
 * @return The converted {@link Table}.
 * @deprecated use {@link #fromDataStream(DataStream, Expression...)}
 */
@Deprecated
<T> Table fromDataStream(DataStream<T> dataStream, String fields);

Or the deprecated explain:

/**
 * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
 * the result of the given {@link Table}.
 *
 * @param table The table for which the AST and execution plan will be returned.
 * @deprecated use {@link Table#explain(ExplainDetail...)}.
 */
@Deprecated
String explain(Table table);

And the deprecated toAppendStream:

/**
 * Converts the given {@link Table} into an append {@link DataStream} of a specified type.
 *
 * <p>The {@link Table} must only have insert (append) changes. If the {@link Table} is also
 * modified by update or delete changes, the conversion will fail.
 *
 * <p>The fields of the {@link Table} are mapped to {@link DataStream} fields as follows:
 *
 * <ul>
 *   <li>{@link Row} and {@link org.apache.flink.api.java.tuple.Tuple} types: Fields are mapped
 *       by position, field types must match.
 *   <li>POJO {@link DataStream} types: Fields are mapped by field name, field types must match.
 * </ul>
 *
 * @param table The {@link Table} to convert.
 * @param clazz The class of the type of the resulting {@link DataStream}.
 * @param <T> The type of the resulting {@link DataStream}.
 * @return The converted {@link DataStream}.
 * @deprecated Use {@link #toDataStream(Table, Class)} instead. It integrates with the new type
 *     system and supports all kinds of {@link DataTypes} that the table runtime can produce.
 *     The semantics might be slightly different for raw and structured types. Use {@code
 *     toDataStream(DataTypes.of(TypeInformation.of(Class)))} if {@link TypeInformation} should
 *     be used as source of truth.
 */
@Deprecated
<T> DataStream<T> toAppendStream(Table table, Class<T> clazz);

After replacing them with the new methods the source recommends:

package com.zhiyong.flinkStudy;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.flink.table.api.Expressions.$; // enables $("fieldName")

/**
 * @program: study
 * @description: stream-batch unification with the Table API
 * @author: zhiyong
 * @create: 2022-03-17 23:52
 **/
public class FlinkTableApiDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flink 1.14 no longer needs a planner setting
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<String> data = env.addSource(new WordCountSource1ps());

        System.out.println("***********new methods**************");

        ArrayList<String> strings = new ArrayList<>();
        strings.add("f0"); // must be f0
        List<DataType> dataTypes = new ArrayList<DataType>();
        dataTypes.add(DataTypes.STRING());
        Schema schema = Schema.newBuilder().fromFields(strings, dataTypes).build();
        List<Schema.UnresolvedColumn> columns = schema.getColumns();
        for (Schema.UnresolvedColumn column : columns) {
            System.out.println("column = " + column);
        }

        Table table2 = tableEnv.fromDataStream(data, schema);

        Table table2_1 = table2.where($("f0").like("%5%")); // must be f0
        System.out.println("table2_1.explain() = " + table2_1.explain(ExplainDetail.JSON_EXECUTION_PLAN));

        tableEnv.toDataStream(table2_1, Row.class).print("table2_1");

        System.out.println("env.getExecutionPlan() = " + env.getExecutionPlan());
        env.execute();
    }
}

The field name must be "f0"; I haven't yet had time to dig through the source to see exactly why. Anything else throws:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to find a field named 'word' in the physical data type derived from the given type information for schema declaration. Make sure that the type information is not a generic raw type. Currently available fields are: [f0]
    at org.apache.flink.table.catalog.SchemaTranslator.patchDataTypeFromColumn(SchemaTranslator.java:327)
    at org.apache.flink.table.catalog.SchemaTranslator.patchDataTypeFromDeclaredSchema(SchemaTranslator.java:314)
    at org.apache.flink.table.catalog.SchemaTranslator.createConsumingResult(SchemaTranslator.java:213)
    at org.apache.flink.table.catalog.SchemaTranslator.createConsumingResult(SchemaTranslator.java:158)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromStreamInternal(StreamTableEnvironmentImpl.java:294)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.java:232)
    at com.zhiyong.flinkStudy.FlinkTableApiDemo1.main(FlinkTableApiDemo1.java:65)

Process finished with exit code 1
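Presumably the physical data type derived from a DataStream<String> is a single column named f0, so the declared schema has to match that column. If you want a friendlier name, one workaround — a sketch of my own, not from the original demo — is to rename the column after conversion via Table.as:

Table table2 = tableEnv.fromDataStream(data).as("word"); // rename the physical column f0 to word
Table table2_1 = table2.where($("word").like("%5%"));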

Batch processing with the DSL (Table API)

We've done streaming; next, batch.

Since batch no longer uses DataSet directly but DataSource instead, the following operators are gone:

tableEnv.fromDataSet(data1); // in old Flink, data1 was a DataSet instance; this API created a Table from a DataSet
tableEnv.toDataSet(table1);  // in old Flink, table1 was a Table instance; this API converted it back to a DataSet

If, for the DSL approach, you build the TableEnvironment like this:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

it ends up rather underwhelming: the resulting tableEnv instance exposes very few usable methods.
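That said, for a self-contained pipeline the pure TableEnvironment can still run a batch job end to end — a minimal sketch in batch mode (the inline values are illustrative):

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// fromValues builds an inline table; its single column is named f0 by default
Table t = tableEnv.fromValues("hehe1", "haha1", "哈哈1", "哈哈1");
t.where($("f0").like("%h%"))
        .execute() // submits the batch job
        .print();  // prints the result table to stdout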

But the following approach works:

package com.zhiyong.flinkStudy;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @program: study
 * @description: stream-batch unification with the Flink DSL
 * @author: zhiyong
 * @create: 2022-03-18 01:48
 **/
public class FlinkTableApiDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);

        String[] str1 = {"hehe1", "haha1", "哈哈1", "哈哈1"};

        Table table1 = streamTableEnv.fromValues(str1);
        Table table1_1 = table1.where($("f0").like("%h%"));

        DataStream<Row> batchTable1 = streamTableEnv.toDataStream(table1_1);
        batchTable1.print();

        System.out.println("*************************");

        DataStreamSource<String> dataStream2 = env.fromElements(str1);
        Table table2 = streamTableEnv.fromDataStream(dataStream2);
        Table table2_1 = table2.where($("f0").like("%哈%"));
        DataStream<Row> batchTable2 = streamTableEnv.toDataStream(table2_1);
        batchTable2.print();

        env.execute();
    }
}

After running it:

log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
*************************
20> +I[哈哈1]
21> +I[哈哈1]
20> +I[hehe1]
21> +I[haha1]

Process finished with exit code 0

As you can see, Flink 1.14.3 can process batch data directly in streaming fashion, unlike old Flink 1.8, which kept stream and batch apart. The batch environment and its APIs still exist, but they are mostly deprecated and may never be needed again. In practice, at the Table API (DSL) level in Flink 1.14.3 no distinction is required: convert everything to DataStream, and the DataStream side doesn't care whether a Table originated in a streaming or a batch environment.

Stream-batch unification with DataStream

package com.zhiyong.flinkStudy;

import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @program: study
 * @description: stream-batch unification with Flink SQL
 * @author: zhiyong
 * @create: 2022-03-21 22:32
 **/
public class FlinkSqlApiDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> data1 = env.addSource(new WordCountSource1ps());

        String inputPath = "E:/study/flink/data/test1";
        DataStreamSource<String> data2 = env.readTextFile(inputPath);

        data1.print("data1");
        data2.print("data2");

        env.execute();
    }
}

After running it:

data1> zhiyong19
data2> 好
data2> 喜欢
data2> 数码宝贝
data2> 宝宝 宝贝
data2> 宝贝 好 喜欢
data2> 123
data2> 123
data2> 123
data2> 哈哈 haha
data2> hehe 呵呵 呵呵 呵呵 呵呵
data2> hehe
data1> zhiyong17
data1> zhiyong7
data1> zhiyong7
data1> zhiyong5
data1> zhiyong11
data1> zhiyong18
data1> zhiyong14
data1> zhiyong13
data1> zhiyong5
data1> zhiyong8

Process finished with exit code 130

As you can see, in Flink 1.14.3 plain DataStream is all you need: batch or stream, everything is simply treated as a stream.
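For bounded inputs like the text file above, the DataStream API can also be switched explicitly into batch execution — a sketch of the unified runtime mode (introduced around Flink 1.12; only legal when every source is bounded):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// org.apache.flink.api.common.RuntimeExecutionMode
env.setRuntimeMode(RuntimeExecutionMode.BATCH); // same program, but scheduled stage by stage like a batch job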

Stream-batch unification with the DSL (Table API)

package com.zhiyong.flinkStudy;

import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @program: study
 * @description: stream-batch unification with Flink SQL
 * @author: zhiyong
 * @create: 2022-03-21 22:32
 **/
public class FlinkSqlApiDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> data1 = env.addSource(new WordCountSource1ps());

        String inputPath = "E:/study/flink/data/test1";
        DataStreamSource<String> data2 = env.readTextFile(inputPath);

        Table streamTable = streamTableEnv.fromDataStream(data1);
        Table batchTable = streamTableEnv.fromDataStream(data2);

        Table streamTable1 = streamTable.where($("f0").like("%2%"));
        Table batchTable1 = batchTable.where($("f0").like("%2%"));

        DataStream<Row> s1 = streamTableEnv.toDataStream(streamTable1);
        DataStream<Row> s2 = streamTableEnv.toDataStream(batchTable1);

        s1.print();
        s2.print();

        env.execute();
    }
}

After running it:

+I[123]
+I[123]
+I[123]
+I[zhiyong2]
+I[zhiyong12]
+I[zhiyong12]
+I[zhiyong12]
+I[zhiyong12]
+I[zhiyong12]

Process finished with exit code 130

So when using Flink, as long as the computation logic is the same, one set of operators serves both cases, with no deliberate stream/batch split. Old Flink 1.8 required two separate programs; from Flink 1.14.3 at the latest, it doesn't. Higher code reuse means far less dev, debug and subsequent ops work! On this point Spark, for now, is probably left trailing.

Stream-batch unification with SQL

SQL is a higher-level wrapper over Table, better suited to business developers who don't need to care how the platform components work underneath (the so-called SQL Boys). Since the Table layer is already stream-batch unified, the SQL layer necessarily is too.

package com.zhiyong.flinkStudy;

import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @program: study
 * @description: stream-batch unification with Flink SQL
 * @author: zhiyong
 * @create: 2022-03-21 22:32
 **/
public class FlinkSqlApiDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> data1 = env.addSource(new WordCountSource1ps());

        String inputPath = "E:/study/flink/data/test1";
        DataStreamSource<String> data2 = env.readTextFile(inputPath);

        Table streamTable = streamTableEnv.fromDataStream(data1);
        Table batchTable = streamTableEnv.fromDataStream(data2);

        Table streamTable1 = streamTable.where($("f0").like("%2%"));
        Table batchTable1 = batchTable.where($("f0").like("%2%"));

        Table t1 = streamTableEnv.sqlQuery("SeLeCt UPPER(f0) frOm " + streamTable1);
        Table t2 = streamTableEnv.sqlQuery("SeLeCt UPPER(f0) frOm " + batchTable1);

        DataStream<Row> s1 = streamTableEnv.toDataStream(t1);
        DataStream<Row> s2 = streamTableEnv.toDataStream(t2);

        s1.print();
        s2.print();

        env.execute();
    }
}

After running it:

+I[123]
+I[123]
+I[123]
+I[ZHIYONG2]
+I[ZHIYONG2]
+I[ZHIYONG2]
+I[ZHIYONG12]
+I[ZHIYONG12]
+I[ZHIYONG12]
+I[ZHIYONG12]
+I[ZHIYONG12]
+I[ZHIYONG2]
+I[ZHIYONG12]

Process finished with exit code 130

This shows again that in Flink 1.14.3 a single API achieves stream-batch unification at the SQL level too. With SQL-level unification, the SQL Boys writing business code need to care even less about the underlying implementation. Technological progress keeps lowering the technical bar for business staff... which is not a bad thing.
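Concatenating the Table object into the SQL string works because the Table's toString() registers it in the catalog under a generated name; a more explicit equivalent (a sketch — the view name "words" is made up here) registers a temporary view first:

streamTableEnv.createTemporaryView("words", streamTable1);
Table t1 = streamTableEnv.sqlQuery("SELECT UPPER(f0) FROM words");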

Summary

In Flink 1.14.3, stream-batch unification is achievable at every level: top-level SQL, the DSL just below it, and the mid-level DataStream. SQL sits on the DSL and the DSL sits on DataStream; both SQL and DSL calls yield Table objects, and in Flink 1.14.3 Table and DataStream convert back and forth seamlessly, which is extremely convenient. Compared with Flink 1.8 the API has changed drastically. That probably doesn't affect the SQL Boys much, but it is a real obstacle for platform and component developers, who must invest time and effort studying the changes. Measured against a unified stream/batch API and the reduced development, testing and operations workload, though, the price is worth paying.

For platform and component developers who understand the internals, Flink 1.14 is a milestone that fully deserves the name. As for the SQL Boys, whether they get it or not hardly matters. DSL users dislike SQL scripts thousands of lines long; SQL Boys dislike top-to-bottom imperative code. Zebras with white stripes on black and zebras with black stripes on white may not understand each other any time soon. The way of heaven does not contend, yet it excels at winning.


Reposted from: https://blog.csdn.net/qq_41990268/article/details/123649447
Copyright belongs to the original author 虎鲸不是鱼. In case of infringement, please contact us for removal.
