0


FlinkToMySql两阶段提交

版本

Flink1.10

实现

  1. 继承TwoPhaseCommitSinkFunction
  2. 实现invoke()、beginTransaction()、preCommit()、commit()、abort()等方法

流程

beginTransaction---->preCommit,同时开启下一次beginTransaction---->commit---->preCommit,同时开启下一次beginTransaction---->commit。。。。。。
大概理解是下边这个流程
beginTransaction,invoke,preCommit,commit(第一次checkpoint)
---------------------------------------------------->beginTransaction,invoke,preCommit,commit(第二次checkpoint)
---------------------------------------------------------------------------------------------------------->beginTransaction,invoke,preCommit,commit(第三次checkpoint)

注意:
1、如果commit期间有异常,则重试commit方法。
2、如果invoke期间有异常,则回滚到上一个checkpoint,执行一下上一个checkpoint的commit方法(很奇怪,没理解这里),然后进行重试后续的数据。

代码测试

publicabstractclassMyTwoPhaseCommitSinkFunction<IN>extendsTwoPhaseCommitSinkFunction<IN,List<IN>,Void>{publicMyTwoPhaseCommitSinkFunction(){super((TypeSerializer<List<IN>>)newKryoSerializer<IN>((Class<IN>)List.class,newExecutionConfig()),VoidSerializer.INSTANCE);}@Overrideprotectedvoidinvoke(List<IN> list,IN value,Context context)throwsException{System.err.println("invoke开始......"+list);
        list.add(value);System.err.println("invoke结束......"+list);}@OverrideprotectedList<IN>beginTransaction()throwsException{System.err.println("beginTransaction......");returnnewArrayList<>();}@OverrideprotectedvoidpreCommit(List<IN> list)throwsException{System.err.println("preCommit......"+list);}@Overrideprotectedvoidabort(List<IN> list){System.err.println("abort......"+list);}}
publicclassApp{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(newFsStateBackend("file:///C:/Jian_Coding/Flink/Study/checkpoint"));SingleOutputStreamOperator<String> sourceDS = env.addSource(MyKafkaUtil.getKafkaSource()).name("kafkaSourceFun");
        sourceDS.addSink(newMyTwoPhaseCommitSinkFunction<String>(){@Overrideprotectedvoidcommit(List<String> list){/*                long l = System.currentTimeMillis();
                if(l%2==0){
                    int i = 1/0;
                }*/System.err.println("commit......"+list);}});
        env.execute();}}
标签: java 开发语言

本文转载自: https://blog.csdn.net/qq_42009405/article/details/125053461
版权归原作者 今天好好洗头了嘛 所有, 如有侵权,请联系我们删除。

“FlinkToMySql两阶段提交”的评论:

还没有评论