

5. Detailed examples of Flink sources, transformations, and sinks (Part 1)

Flink series articles

1. Flink 1.12.7/1.13.5: detailed introduction, local installation, deployment, and verification
2. Flink 1.13.5: two deployment modes (Standalone, Standalone HA) and detailed steps to verify the four job-submission methods (the first two, plus session and per-job)
3. Key Flink concepts (API layering, roles, execution flow, execution graphs, and the programming model), introductory DataSet and DataStream examples, and submitting jobs to run on YARN
4. Flink's unified stream and batch processing, a detailed introduction to the 18 transformation operators, and Flink-Kafka sources and sinks
5. Detailed examples of Flink sources, transformations, and sinks (Part 1)
5. Detailed examples of Flink sources, transformations, and sinks (Part 2) - source and transformation examples
5. Detailed examples of Flink sources, transformations, and sinks (Part 3) - sink examples
6. Window, one of Flink's four cornerstones: details and examples (Part 1)
6. Window, one of Flink's four cornerstones: details and examples (Part 2)
7. Time and watermarks, two of Flink's four cornerstones: details and examples (basic watermark usage, a watermark example with Kafka as the source, and accepting data beyond the maximum allowed lateness)
8. State, one of Flink's four cornerstones: concepts, use cases, persistence, and batch processing, with usage and detailed examples of keyed state, operator state, and broadcast state
9. Checkpoint fault tolerance, one of Flink's four cornerstones: details and examples (checkpoint configuration, restart strategies, manual recovery from checkpoints, and savepoints)
10. Detailed examples of Flink sources, transformations, and sinks (Part 2) - source and transformation examples [supplementary examples]
11. Flink's flink-conf.yaml configuration explained in detail (HA, checkpoint, web, security, ZooKeeper, historyserver, workers, zoo.cfg)
12. Detailed examples of ClickHouse as a Flink source and sink

13. Links to the full series of articles introducing the Flink Table API and SQL, with examples




This article introduces the basic usage of sources, transformations, and sinks; the next article will cover custom implementations of each.

I. Source-transformations-sink examples

1. Maven dependencies

All examples below use the following Maven dependencies unless otherwise noted.

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.12.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.8</version>
        <scope>system</scope>
        <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.4</version>
    </dependency>
</dependencies>

2. Source examples

1) Collection-based

import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 */
public class Source_Collection {
    /**
     * Collection-based sources, generally used for learning and testing:
     * 1. env.fromElements(varargs);
     * 2. env.fromCollection(any Collection);
     * 3. env.generateSequence(start, end);
     * 4. env.fromSequence(start, end);
     *
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> ds1 = env.fromElements("i am alanchan", "i like flink");
        DataStream<String> ds2 = env.fromCollection(Arrays.asList("i am alanchan", "i like flink"));
        DataStream<Long> ds3 = env.generateSequence(1, 10); // deprecated, use fromSequence instead
        DataStream<Long> ds4 = env.fromSequence(1, 10);

        // transformation

        // sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();

        // execute
        env.execute();
    }
}

// Console output
3> 3
10> 10
6> 6
1> 1
9> 9
4> 4
2> 2
7> 7
5> 5
14> 1
14> 2
14> 3
14> 4
14> 5
14> 6
14> 7
11> 8
11> 9
11> 10
8> 8
11> i am alanchan
12> i like flink
10> i like flink
9> i am alanchan
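
fromCollection also has an overload that takes an Iterator plus the element class, since the element type cannot be inferred from a bare iterator; the iterator itself must be serializable. Below is a minimal sketch under the same setup, using Flink's built-in, serializable NumberSequenceIterator; the class name Source_Collection_Iterator is made up for illustration:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;

public class Source_Collection_Iterator {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // fromCollection(Iterator, Class): the iterator must be serializable, and the
        // element class must be passed explicitly because it cannot be inferred.
        DataStream<Long> ds = env.fromCollection(new NumberSequenceIterator(1L, 10L), Long.class);

        // fromParallelCollection takes a SplittableIterator and distributes
        // its splits across the parallel subtasks of the source.
        DataStream<Long> dsParallel = env.fromParallelCollection(new NumberSequenceIterator(1L, 10L), Long.class);

        ds.print();
        dsParallel.print();
        env.execute();
    }
}

Note the design difference: fromCollection always runs as a non-parallel source (parallelism 1), while fromParallelCollection splits the iterator across subtasks.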

2) File-based

// Create the test files yourself; their contents appear verbatim in the
// output below, so the files are not provided with this example.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 */
public class Source_File {
    /**
     * env.readTextFile(local/HDFS file or directory); compressed files also work
     *
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> ds1 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.txt");
        DataStream<String> ds2 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/input/distribute_cache_student");
        DataStream<String> ds3 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.tar.gz");
        DataStream<String> ds4 = env.readTextFile("hdfs://server2:8020///flinktest/wc-1688627439219");

        // transformation

        // sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();

        // execute
        env.execute();
    }
}

// Console output
// single file
8> i am alanchan
12> i like flink
1> and you ?
// directory
15> 1,"英文",90
1> 2,"数学",70
11> and you ?
8> i am alanchan
9> i like flink
3> 3,"英文",86
13> 1,"数学",90
5> 1,"语文",50
// tar.gz
12> i am alanchan
12> i like flink
12> and you ?
// hdfs
6> (hive,2)
10> (flink,4)
15> (hadoop,3)
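
readTextFile reads the path once and the job then finishes. If you need to keep watching a directory for new files, Flink 1.12 also offers the lower-level readFile method, which takes a FileProcessingMode and a scan interval. A minimal sketch; the watched directory D:/testdata/input and the class name Source_File_Monitoring are made up for illustration:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class Source_File_Monitoring {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String path = "D:/testdata/input"; // hypothetical directory to watch

        // Rescan the directory every 5000 ms and emit lines from new files
        DataStream<String> ds = env.readFile(
                new TextInputFormat(new Path(path)),
                path,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                5000);

        ds.print();
        env.execute();
    }
}

Be aware that under PROCESS_CONTINUOUSLY a modified file is re-read in its entirety, so appending to watched files breaks exactly-once semantics.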

3) Socket-based

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *         On 192.168.10.42, run "nc -lk 9999" to send data to the given port.
 *         nc is short for netcat, originally a tool for configuring routers;
 *         here we use it to send data to a port.
 *         If the command is missing, install it with: yum install -y nc
 */
public class Source_Socket {
    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation

        // sink
        lines.print();

        // execute
        env.execute();
    }
}

Verification steps:
1. Start the program.
2. Enter test data via nc on 192.168.10.42 (screenshot omitted).
3. Observe the program's console output (screenshot omitted).
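
socketTextStream also has an overload that takes a record delimiter and a retry parameter, which helps when the nc side may go down. A minimal sketch under the same host and port; the class name Source_Socket_Retry is made up for illustration:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Source_Socket_Retry {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // delimiter: separator between records
        // maxRetry: how long (in seconds) to keep retrying while the socket is
        //           down, with one reconnection attempt per second; 0 terminates
        //           immediately, a negative value retries forever
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999, "\n", 30);

        lines.print();
        env.execute();
    }
}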

3. Transformation examples

Count the words in the stream, filtering out the sensitive word vx:alanchanchn.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 */
public class TransformationDemo {
    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] arr = value.split(",");
                for (String word : arr) {
                    out.collect(word);
                }
            }
        });

        DataStream<String> filted = words.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                // returning false for "vx:alanchanchn" drops it from the stream
                return !value.equals("vx:alanchanchn");
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filted
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                });

        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);

        // sink
        result.print();

        // execute
        env.execute();
    }
}

Verification steps:
1. Start nc:
nc -lk 9999
2. Start the program.
3. Enter test data on 192.168.10.42 (screenshot omitted).
4. Observe the program's console output (screenshot omitted).
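
For comparison, the same pipeline can be written with Java 8 lambdas; this is a sketch of an alternative style, not part of the original demo. Because of type erasure, lambdas that emit generic types (the Collector in flatMap, the Tuple2 in map) need explicit returns(...) type hints:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformationLambdaDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = env
                .socketTextStream("192.168.10.42", 9999)
                .flatMap((String line, Collector<String> out) -> {
                    for (String word : line.split(",")) {
                        out.collect(word);
                    }
                })
                .returns(Types.STRING) // needed: the Collector's element type is erased
                .filter(word -> !word.equals("vx:alanchanchn"))
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT)) // needed for the Tuple2 output
                .keyBy(t -> t.f0)
                .sum(1);

        result.print();
        env.execute();
    }
}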

4. Sink examples

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 */
public class SinkDemo {
    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> ds = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.txt");
        System.setProperty("HADOOP_USER_NAME", "alanchan");

        // transformation

        // sink
        // ds.print();
        // ds.print("output tag");
        // The sink parallelism determines the number of output files:
        // each parallel subtask writes its own file.
        ds.writeAsText("hdfs://server2:8020///flinktest/words").setParallelism(1);

        // execute
        env.execute();
    }
}

With parallelism 1, a single output file is written; check the result on HDFS (screenshot omitted).
With parallelism 2, two files are generated (screenshots omitted).
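As an aside, writeAsText is deprecated in Flink 1.12; the recommended streaming file sink is StreamingFileSink, which writes part files per subtask and cooperates with checkpointing. A minimal row-format sketch; the output path hdfs://server2:8020/flinktest/words_sfs and the class name Sink_StreamingFile are made up for illustration:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class Sink_StreamingFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // pending part files are promoted to finished on checkpoints
        env.enableCheckpointing(1000);
        System.setProperty("HADOOP_USER_NAME", "alanchan");

        DataStream<String> ds = env.fromElements("i am alanchan", "i like flink");

        // Row format: each record is encoded as a UTF-8 text line; part files
        // are written under the given base path, one per parallel subtask.
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("hdfs://server2:8020/flinktest/words_sfs"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();

        ds.addSink(sink).setParallelism(1);
        env.execute();
    }
}

Without checkpointing enabled, part files stay in the in-progress/pending state and are never finalized, which is why the sketch turns checkpointing on.
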
This concludes a brief walkthrough of source, transformation, and sink usage examples.


This article is reprinted from: https://blog.csdn.net/chenwewi520feng/article/details/131592673
Copyright belongs to the original author, 一瓢一瓢的饮 alanchan. If there is any infringement, please contact us for removal.
