一、并行度
设置并行度的方式:
1、在代码中设置:env.setParallelism(并行度数量) (优先级高,会将代码并行度定死)
2、在提交任务是通过参数设置 -p (推荐使用)如下例:
flink run -t yarn-session -p 3 -Dyarn.application.id=application_1717039073374_0004 -c com.shujia.flink.core.Demo1StreamWordCount flink-1.0.jar
3、在配置文件中统一设置
4、每一个算子可以单独设置并行度
flink共享资源:
1、flink需要资源的数量和task数量无关
2、一个并行度对应一个资源(slot)
3、上游task的下游task共享同一个资源
图解:
图中所示,共有5个task,并行度为2
如果我们按照一个槽位一个task的情况部署的话TaskManager部署图如下:
这样需要五个槽位,很浪费资源,所以就需要共享资源槽(上游的task与下游的task可以在同一个槽共用资源)减少资源浪费,充分利用资源,如下图所示:
并行度设置原则:
1、实时计算的任务并行度取决于数据的吞吐量
2、聚合计算(有shuffle)的代码一个并行度大概一秒可以处理10000条数据左右
3、非聚合计算是,一个并行度大概一秒可以处理10万条左右
二、事件时间
数据中有一个时间字段,使用数据的时间字段触发计算,代替真实的时间,可以反应数据真实发生的顺序,计算更有意义。
1、概念与图解
事件时间(Event Time):数据产生的时间
接收时间(Ingestion Time):数据到达flink接收的时间
处理时间(Processing Time):数据被flink算子处理的时间(默认是处理时间)
2、引用原因
flink是分布式处理,可能会出现事件a先进入flink,事件b后进入flink,但是事件b先被处理,而事件b后被处理的情况,从而发送乱序。
3、水位线图解
上图中事件4发生在第二个窗口位于第一个窗口结束时间之后,此时事件4将会被遗漏,为了防止此类情况发生,可以将水位线前移解决,如下图所示。
此时水位线还位于第一个窗口,当水位线位于10s时第一个窗口才会结束。
水位线对齐图解:
当数据是多并行时,下游算子收到上游多个水位线的时候取小的,当上游水位线相同时结束窗口。
4、代码实现EventTime案例
事件数据如下:
java,1685433130000
java,1685433131000
java,1685433132000
java,1685433134000
java,1685433135000
java,1685433137000
java,1685433139000
第一步,解析数据:
DataStream<Tuple2<String, Long>> tsDS = linesDS.map(line -> {
String[] split = line.split(",");
String word = split[0];
long ts = Long.parseLong(split[1]);
return Tuple2.of(word, ts);
}, Types.TUPLE(Types.STRING, Types.LONG));
第二步,指定时间字段与水位线:
DataStream<Tuple2<String, Long>> assDS = tsDS
.assignTimestampsAndWatermarks(
WatermarkStrategy
//指定水位线生产策略,水位线等于最新一条数据的时间戳,如果数据乱序可能会丢失数据
//.<Tuple2<String, Long>>forMonotonousTimestamps()
//水位线前移时间(数据最大乱序时间)
.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
//指定时间字段
.withTimestampAssigner((event, ts) -> event.f1)
);
第三步,完成需求每隔五秒统计单词数量(设计滚动窗口):
DataStream<Tuple2<String, Integer>> kvDS = assDS
.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS
.keyBy(kv -> kv.f0);
//TumblingEventTimeWindows:滚动的事件时间窗口
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
.window(TumblingEventTimeWindows.of(Time.seconds(5)));
版权归原作者 巡|山 所有, 如有侵权,请联系我们删除。