1、概述
1)作用
flatMap是将数据先map再打平,输入一个元素,可以输出0到多个元素
2)使用
1.匿名内部类
2.lambda表达式
3.实现FlatMapFunction接口
4.继承RichFlatMapFunction
2、代码实现
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Demonstrates four ways of passing a flatMap function to a Flink data stream:
 * an anonymous inner class, a lambda expression, a class implementing
 * {@link FlatMapFunction}, and a class extending {@link RichFlatMapFunction}.
 *
 * <p>Every variant splits each incoming line on a single space character and
 * emits the resulting pieces one by one.
 */
public class MyFlatmapDemo {

    /**
     * Builds a local streaming job that reads lines from a socket, applies the
     * four flatMap variants to the same source, prints each result stream and
     * then runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Local environment with the web UI; "rest.port" is set to 8081.
        Configuration webUiConfig = new Configuration();
        webUiConfig.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(webUiConfig);

        // Source: text lines read from a socket on localhost:8888.
        DataStreamSource<String> socketLines = env.socketTextStream("localhost", 8888);

        // 1. Anonymous inner class.
        SingleOutputStreamOperator<String> viaAnonymousClass = socketLines.flatMap(
                new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) throws Exception {
                        String[] tokens = value.split(" ");
                        for (int i = 0; i < tokens.length; i++) {
                            out.collect(tokens[i]);
                        }
                    }
                });

        // 2. Lambda expression. The explicit parameter types fix the output type
        //    for the compiler; returns(String.class) declares it to Flink.
        SingleOutputStreamOperator<String> viaLambda = socketLines
                .flatMap((String value, Collector<String> out) -> {
                    String[] tokens = value.split(" ");
                    for (int i = 0; i < tokens.length; i++) {
                        out.collect(tokens[i]);
                    }
                })
                .returns(String.class);

        // 3. Named class implementing the FlatMapFunction interface.
        SingleOutputStreamOperator<String> viaInterfaceImpl =
                socketLines.flatMap(new MyFlatmapFunc());

        // 4. Named class extending RichFlatMapFunction.
        SingleOutputStreamOperator<String> viaRichFunction =
                socketLines.flatMap(new MyRichFlatMapFunc());

        // Print every result stream, in the same order the operators were created.
        viaAnonymousClass.print();
        viaLambda.print();
        viaInterfaceImpl.print();
        viaRichFunction.print();

        env.execute();
    }
}
/**
 * A {@link FlatMapFunction} that breaks each input string into pieces separated
 * by a single space and emits every piece as its own output element.
 */
class MyFlatmapFunc implements FlatMapFunction<String, String> {

    /**
     * Splits {@code value} on the space character and forwards each piece to
     * {@code out}.
     *
     * @param value the input line
     * @param out   collector receiving one element per piece
     * @throws Exception declared by the interface; not thrown explicitly here
     */
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        final String[] pieces = value.split(" ");
        for (int idx = 0; idx < pieces.length; idx++) {
            out.collect(pieces[idx]);
        }
    }
}
/**
 * A {@link RichFlatMapFunction} that breaks each input string into pieces
 * separated by a single space and emits every piece as its own output element.
 * It also overrides the {@code open} and {@code close} hooks of the rich
 * function base class.
 */
class MyRichFlatMapFunc extends RichFlatMapFunction<String, String> {

    /**
     * Delegates to the superclass and then prints a fixed message to standard
     * output.
     *
     * @param parameters configuration handed to the function
     * @throws Exception if the superclass hook fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        System.out.println("可以在rich方法中创建状态和定时器");
    }

    /**
     * Delegates to the superclass; no additional cleanup is performed.
     *
     * @throws Exception if the superclass hook fails
     */
    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Splits {@code value} on the space character and forwards each piece to
     * {@code out}.
     *
     * @param value the input line
     * @param out   collector receiving one element per piece
     * @throws Exception declared by the base class; not thrown explicitly here
     */
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        final String[] pieces = value.split(" ");
        for (int idx = 0; idx < pieces.length; idx++) {
            out.collect(pieces[idx]);
        }
    }
}
3、执行结果
1)输入测试数据
nc -lk 8888
hello world
2)控制台输出执行结果
3> hello
3> world
5> hello
5> world
8> hello
8> world
7> hello
7> world
本文转载自: https://blog.csdn.net/m0_50186249/article/details/122312668
版权归原作者 猫猫爱吃小鱼粮 所有, 如有侵权,请联系我们删除。