Flink series articles
1、Flink 1.12.7/1.13.5: detailed introduction, local installation, deployment and verification
2、Flink 1.13.5: two deployment modes (Standalone, Standalone HA) and four job-submission modes (the first two, plus session and per-job), with detailed verification steps
3、Key Flink concepts (API layers, roles, execution flow, execution graph and programming model), detailed introductory DataSet and DataStream examples, and submitting jobs to run on YARN
4、Flink's unified stream-batch processing, the 18 transformation operators in detail, and Flink's Kafka source and sink
5、Detailed examples of Flink source, transformations and sink (Part 1)
5、Detailed examples of Flink source, transformations and sink (Part 2) - source and transformation examples
5、Detailed examples of Flink source, transformations and sink (Part 3) - sink examples
6、Window, one of Flink's four cornerstones, explained with detailed examples (Part 1)
6、Window, one of Flink's four cornerstones, explained with detailed examples (Part 2)
7、Flink's four cornerstones: Time and Watermark explained with detailed examples (basic watermark usage, a watermark example with Kafka as the source, and receiving data that exceeds the maximum allowed lateness)
8、Flink's four cornerstones: State concepts, use cases, persistence and batch processing, with usage and detailed examples of keyed state, operator state and broadcast state
9、Flink's four cornerstones: the Checkpoint fault-tolerance mechanism explained with examples (checkpoint configuration, restart strategies, manually restoring from checkpoints and savepoints)
10、Detailed examples of Flink source, transformations and sink (Part 2) - source and transformation examples [supplementary examples]
11、Detailed explanation of the flink-conf.yaml configuration (HA configuration, checkpoint, web, security, zookeeper, historyserver, workers, zoo.cfg)
12、Detailed examples of ClickHouse as a Flink source and sink
13、Links to the series of articles on Flink's Table API and SQL, with introductions and examples
This article introduces the basic usage of source, transformations and sink; the next article will cover their custom implementations.
一、Source-transformations-sink examples
1、Maven dependencies
All of the examples below use the following Maven dependencies unless otherwise noted.
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.12.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.8</version>
        <scope>system</scope>
        <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.4</version>
    </dependency>
</dependencies>
2、Source examples
1)、Collection-based
import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class Source_Collection {
    /**
     * Generally used for learning and testing:
     * 1. env.fromElements(varargs);
     * 2. env.fromCollection(any collection);
     * 3. env.generateSequence(start, end);
     * 4. env.fromSequence(start, end);
     *
     * @param args collection-based sources
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> ds1 = env.fromElements("i am alanchan", "i like flink");
        DataStream<String> ds2 = env.fromCollection(Arrays.asList("i am alanchan", "i like flink"));
        DataStream<Long> ds3 = env.generateSequence(1, 10); // deprecated, use fromSequence instead
        DataStream<Long> ds4 = env.fromSequence(1, 10);

        // transformation

        // sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();

        // execute
        env.execute();
    }
}

// Run results (the N> prefix is the index of the parallel subtask)
3> 3
10> 10
6> 6
1> 1
9> 9
4> 4
2> 2
7> 7
5> 5
14> 1
14> 2
14> 3
14> 4
14> 5
14> 6
14> 7
11> 8
11> 9
11> 10
8> 8
11> i am alanchan
12> i like flink
10> i like flink
9> i am alanchan
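The N> prefix also explains why the order differs between runs: each subtask prints independently. If you want deterministic, ordered output while testing, a minimal sketch (the class name is ours for illustration) is to force a single subtask:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Source_Collection_Ordered {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // with one subtask the sequence is produced and printed in order,
        // and print() omits the "N>" prefix entirely
        env.setParallelism(1);
        env.fromSequence(1, 10).print();
        env.execute();
    }
}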
2)、File-based
// Create the test files yourself; their contents show up verbatim in the run results below, so they are not provided here
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class Source_File {
    /**
     * env.readTextFile(local/HDFS file or directory); compressed files are also supported
     *
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> ds1 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.txt");
        DataStream<String> ds2 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/input/distribute_cache_student");
        DataStream<String> ds3 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.tar.gz");
        DataStream<String> ds4 = env.readTextFile("hdfs://server2:8020///flinktest/wc-1688627439219");

        // transformation

        // sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();

        // execute
        env.execute();
    }
}

// Run results
// single file
8> i am alanchan
12> i like flink
1> and you ?
// directory
15> 1,"英文",90
1> 2,"数学",70
11> and you ?
8> i am alanchan
9> i like flink
3> 3,"英文",86
13> 1,"数学",90
5> 1,"语文",50
// tar.gz
12> i am alanchan
12> i like flink
12> and you ?
// hdfs
6> (hive,2)
10> (flink,4)
15> (hadoop,3)
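readTextFile reads its input once and the source then finishes. If the job should keep watching a directory and ingest files as they appear, the lower-level env.readFile with FileProcessingMode.PROCESS_CONTINUOUSLY can be used instead; a minimal sketch under that assumption (class name is ours, directory path reused from above for illustration):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class Source_File_Continuous {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String dir = "D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/input";
        // re-scan the directory every 5 seconds and read files that appear or change
        DataStream<String> lines = env.readFile(
                new TextInputFormat(new Path(dir)), dir,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 5000);

        lines.print();
        env.execute();
    }
}

Note that when a watched file is modified, PROCESS_CONTINUOUSLY re-reads its entire contents, so records may be duplicated; the mode trades per-file exactly-once semantics for continuous ingestion.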
3)、Socket-based
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 * On 192.168.10.42, run "nc -lk 9999" to send data to the given port.
 * nc is short for netcat, originally a tool for configuring routers; here we use it to push data to a port.
 * If the command is missing, install it with: yum install -y nc
 */
public class Source_Socket {
    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation

        // sink
        lines.print();

        // execute
        env.execute();
    }
}
Verification steps:
1、Start the program first
2、Type input lines into the nc session on 192.168.10.42
3、Watch the application's console output
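For example (the input and output here are illustrative, and the subtask prefixes will vary between runs), a session might look like this:

# on 192.168.10.42
nc -lk 9999
i am alanchan
i like flink

# application console
8> i am alanchan
12> i like flink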
3、Transformation examples
Count the words in the stream while excluding the sensitive word vx:alanchanchn.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 *
 */
public class TransformationDemo {
    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] arr = value.split(",");
                for (String word : arr) {
                    out.collect(word);
                }
            }
        });

        DataStream<String> filted = words.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                // returning false for "vx:alanchanchn" filters it out
                return !value.equals("vx:alanchanchn");
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filted
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                });

        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);

        // sink
        result.print();

        // execute
        env.execute();
    }
}
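The anonymous inner classes above can also be written as lambdas. Because Java erases generic type parameters from lambdas, Flink then needs explicit type hints via returns(...); a functionally equivalent sketch of the transformation chain, assuming the same env and lines as above:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.Collector;

// split each incoming line on commas
DataStream<String> words = lines
        .flatMap((String value, Collector<String> out) -> {
            for (String word : value.split(",")) {
                out.collect(word);
            }
        })
        .returns(Types.STRING); // type hint: generics are erased from the lambda

// drop the sensitive word, then count per word
SingleOutputStreamOperator<Tuple2<String, Integer>> result = words
        .filter(word -> !word.equals("vx:alanchanchn"))
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .keyBy(t -> t.f0)
        .sum(1);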
Verification steps
1、Start nc
nc -lk 9999
2、Start the application
3、Enter test data on 192.168.10.42
4、Watch the application's console output
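For example (illustrative data), entering the comma-separated line

hello,flink,vx:alanchanchn,hello

in the nc session should produce running counts on the console along the lines of (subtask prefixes vary and are omitted here):

(hello,1)
(flink,1)
(hello,2)

vx:alanchanchn never appears in the output, because the filter removes it before counting.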
4、Sink example
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 */
public class SinkDemo {
    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> ds = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.txt");
        System.setProperty("HADOOP_USER_NAME", "alanchan");

        // transformation

        // sink
        // ds.print();
        // ds.print("tag"); // print with an identifying prefix
        // The parallelism determines the number of output files:
        // each parallel subtask writes its own file
        ds.writeAsText("hdfs://server2:8020///flinktest/words").setParallelism(1);

        // execute
        env.execute();
    }
}
With parallelism 1, a single output file is written; check the result on HDFS.
With parallelism 2, two files are generated, one per subtask.
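To make the parallelism-to-file mapping explicit in code, a minimal sketch (WriteMode.OVERWRITE is our addition so reruns succeed; note that writeAsText is deprecated in recent Flink releases in favor of the file sink connectors):

import org.apache.flink.core.fs.FileSystem.WriteMode;

// with parallelism 2, "words" becomes a directory holding one
// output file per subtask (named "1" and "2")
ds.writeAsText("hdfs://server2:8020///flinktest/words", WriteMode.OVERWRITE)
        .setParallelism(2);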
The above briefly introduced usage examples of source, transformations and sink.