1、概述
1)作用
自定义多并行的Source,即Source的并行度可以是1到多个。
2)实现
1.继承RichParallelSourceFunction,重写run()方法。
2、代码实现
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Random;
public class CustomerParallelSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> nums = env.addSource(new ParallelSourceFunc());
System.out.println("自定义ParallelSourceFunc得到的DataStream的并行度为:" + nums.getParallelism());
nums.print();
env.execute();
}
private static class ParallelSourceFunc extends RichParallelSourceFunction<String> {
private boolean flag = true;
public ParallelSourceFunc() {
System.out.println("构造方法执行了!!!!!!!!!!!");
}
/**
* 先调用Open方法
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
System.out.println(indexOfThisSubtask + ": Open方法被调用了");
}
/**
* open方法调用完后再调用run方法
* run方法task启动后会执行一次
* 如果run方法一直不退出,就是一个无限的数据流
* 如果数据读取完了,run方法退出,就是一个有限的数据流,Source退出,job也停止了
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<String> ctx) throws Exception {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
System.out.println(indexOfThisSubtask + " : Run方法被调用了");
Random random = new Random();
//获取当前SubTask的Index
while (flag) {
int i = random.nextInt(100);
ctx.collect(indexOfThisSubtask + " --> " + i);
Thread.sleep(1000);
}
}
/**
* task cancel会执行一次
*/
@Override
public void cancel() {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
System.out.println(indexOfThisSubtask + " : Cancel方法被调用了~~~~~");
flag = false;
}
/**
* 如果人为将job cancel先调用cancel方法再调用close方法
* 如果没有将job人为的cancel,任务停掉前一定会调用close方法
*
* @throws Exception
*/
@Override
public void close() throws Exception {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
System.out.println(indexOfThisSubtask + " : Close方法被调用了");
}
}
}
本文转载自: https://blog.csdn.net/m0_50186249/article/details/122156015
版权归原作者 猫猫爱吃小鱼粮 所有, 如有侵权,请联系我们删除。
版权归原作者 猫猫爱吃小鱼粮 所有, 如有侵权,请联系我们删除。