0


十八、Flink自定义多并行Source

1、概述

1)作用

自定义多并行的Source,即Source的并行度可以是1到多个。

2)实现

1.继承RichParallelSourceFunction,重写run()方法。

2、代码实现
  1. import org.apache.flink.configuration.Configuration;
  2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
  5. import java.util.Random;
  6. public class CustomerParallelSource {
  7. public static void main(String[] args) throws Exception {
  8. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  9. DataStreamSource<String> nums = env.addSource(new ParallelSourceFunc());
  10. System.out.println("自定义ParallelSourceFunc得到的DataStream的并行度为:" + nums.getParallelism());
  11. nums.print();
  12. env.execute();
  13. }
  14. private static class ParallelSourceFunc extends RichParallelSourceFunction<String> {
  15. private boolean flag = true;
  16. public ParallelSourceFunc() {
  17. System.out.println("构造方法执行了!!!!!!!!!!!");
  18. }
  19. /**
  20. * 先调用Open方法
  21. *
  22. * @param parameters
  23. * @throws Exception
  24. */
  25. @Override
  26. public void open(Configuration parameters) throws Exception {
  27. int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
  28. System.out.println(indexOfThisSubtask + ": Open方法被调用了");
  29. }
  30. /**
  31. * open方法调用完后再调用run方法
  32. * run方法task启动后会执行一次
  33. * 如果run方法一直不退出,就是一个无限的数据流
  34. * 如果数据读取完了,run方法退出,就是一个有限的数据流,Source退出,job也停止了
  35. *
  36. * @param ctx
  37. * @throws Exception
  38. */
  39. @Override
  40. public void run(SourceContext<String> ctx) throws Exception {
  41. int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
  42. System.out.println(indexOfThisSubtask + " : Run方法被调用了");
  43. Random random = new Random();
  44. //获取当前SubTask的Index
  45. while (flag) {
  46. int i = random.nextInt(100);
  47. ctx.collect(indexOfThisSubtask + " --> " + i);
  48. Thread.sleep(1000);
  49. }
  50. }
  51. /**
  52. * task cancel会执行一次
  53. */
  54. @Override
  55. public void cancel() {
  56. int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
  57. System.out.println(indexOfThisSubtask + " : Cancel方法被调用了~~~~~");
  58. flag = false;
  59. }
  60. /**
  61. * 如果人为将job cancel先调用cancel方法再调用close方法
  62. * 如果没有将job人为的cancel,任务停掉前一定会调用close方法
  63. *
  64. * @throws Exception
  65. */
  66. @Override
  67. public void close() throws Exception {
  68. int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
  69. System.out.println(indexOfThisSubtask + " : Close方法被调用了");
  70. }
  71. }
  72. }
标签: flink apache big data

本文转载自: https://blog.csdn.net/m0_50186249/article/details/122156015
版权归原作者 猫猫爱吃小鱼粮 所有, 如有侵权,请联系我们删除。

“十八、Flink自定义多并行Source”的评论:

还没有评论