springboot集成flink,写代码学习flink,集成步骤如下:
1、maven引入依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
2、配置文件配置相关参数:
# Flink配置
flink.jobmanager.host=localhost
flink.jobmanager.port=6123
flink.parallelism=1
3、写测试类,代码如下 :
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import java.util.Random;
public class Demo {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建数据源
DataStream<String> stream = env.addSource(new SourceFunction<String>() {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
Random random = new Random();
while (isRunning) {
Thread.sleep(10);
long timestamp = System.currentTimeMillis() - random.nextInt(5) * 1000;
String str = "key" + random.nextInt(10) + "," + timestamp;
ctx.collectWithTimestamp(str, timestamp);
ctx.emitWatermark(new Watermark(timestamp));
}
}
@Override
public void cancel() {
isRunning = false;
}
});
// 将数据源解析成二元组(key, timestamp)
DataStream<Tuple2<String, Long>> parsedStream = stream.map((String line) -> {
String[] parts = line.split(",");
return new Tuple2<>((String)parts[0], Long.parseLong(parts[1]));
}).returns(Types.TUPLE(Types.STRING, Types.LONG));
// 设置事件时间和水位线
DataStream<Tuple2<String, Long>> withTimestampsAndWatermarks = parsedStream
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
@Override
public long extractAscendingTimestamp(Tuple2<String, Long> element) {
return element.f1;
}
});
// 按键值进行分组
KeyedStream<Tuple2<String, Long>, Tuple> keyedStream = withTimestampsAndWatermarks.keyBy(0);
// 每5秒钟统计最近一分钟的数据(使用滚动时间窗口)
WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(1)));
// 进行聚合计算
DataStream<Tuple2<String, Long>> resultStream = windowedStream
.reduce((Tuple2<String, Long> v1, Tuple2<String, Long> v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1));
// 输出结果
resultStream.print();
// 启动作业
env.execute("Demo");
}
}
标签:
spring boot
flink
本文转载自: https://blog.csdn.net/u013558123/article/details/131044753
版权归原作者 香至-人生万事须自为,跬步江山即寥廓。 所有, 如有侵权,请联系我们删除。
版权归原作者 香至-人生万事须自为,跬步江山即寥廓。 所有, 如有侵权,请联系我们删除。