Flink的API
Flink提供了多个层次的api供开发者使用,越往上抽象程度越高,使用起来越方便;越往下越底层,使用难度就越大。DataSet已经不提倡使用了,被流批一体的DataStream代替。
Flink的编程模型
Flink的应用程序结构主要包括三部分:Source/Transformation/Sink
Flink的入门案例
需求
使用Flink实现WordCount
编码步骤
- 准备环境-env
- 准备数据-source
- 处理数据-transformation
- 输出结果-sink
- 触发执行
DataSet编码实现
新建maven工程
修改pom文件,引入依赖
导入成功
建立包和类
这个当时没截图了,不过下面结果那的图有里有包和类的结构
编写代码
package cn.edu.hgu.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* 使用Flink中的DataSet实现单词计数
*/
public class WordCount {
public static void main(String[] args) throws Exception {
//1·准备环境-env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//2·准备数据-source
DataSet<String> lineDS = env.fromElements("flink hadoop java hbase",
"hadoop flink 0","hadoop hbase flink");
//3·处理数据-transformation
// 3.1 将每一行数据切分成一个一个的单词,组成一个集合
DataSet<String> wordDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
//参数 s 就是一行行的数据再将每一行切分成一个个的单词
String[] words = s.split(" ");
// 将切分的单词收集起来,发到集合中去
for (String word:words){
collector.collect(word);
}
}
});
// 3.2对集合中的每一个单词记为1,成为一个三元组集合
DataSet<Tuple2<String,Integer>> wordAndOneDS = wordDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
// 此处的s就是进来的一个个单词,再跟一组成一个二元组返回
return Tuple2.of(s,1);
}
});
// 3.3对新的集合按照key(单词名称) 进行分组groupby
UnsortedGrouping<Tuple2<String,Integer>> groupedDS = wordAndOneDS.groupBy(0);//此处的0代表二元组的第一个元素t0
// 3.4对各个组内的数据按照value进行聚合,也就是求和SUM
DataSet<Tuple2<String,Integer>> aggResult = groupedDS.sum(1);//此处1 表示二元组的第二个元素t1
// 3.5对结果进行排序sorted
DataSet<Tuple2<String,Integer>> result = aggResult.sortPartition(1, Order.DESCENDING).setParallelism(1);
//4·输出结果-Sink
result.print();
//5·触发执行
}
}
执行,查看结果
DataStream编码实现
创建类,并编写代码
package cn.edu.hgu.flink;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @description:使用flink的dataStream实现单词计数
*/
public class WordCountDataStream {
public static void main(String[] args) throws Exception {
//1.准备环-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//指定运行环境为流处理模式
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
//指定运行模式为批处理模式
//env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//指定运行模式为自动,既支持流处理也支持批处理
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//2.准备数据-source
DataStream<String> lineDS = env.fromElements("hadoop flink hadoop java hbase ",
"hadoop flink c","hadoop hbase flink");
//3.处理数据-transformation
//3.1 将每一行数据切分成一个个的单词再组成一个集合
DataStream<String> wordDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
//参数s就是一行行的数据,再将每一行切分为一个个的单词
String[] words = s.split(" ");
//将切分单词收集起来,发到集合中
for (String word:words) {
collector.collect(word);
}
}
});
//3.2 对集合中的每一个单词计为1,成为一个二元组
DataStream<Tuple2<String,Integer>> wordAndOneDS = wordDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
//此处的s就是进来的一个个单词再和1组成一个个二元组返回
return Tuple2.of(s,1);
}
});
//3.3 对新的集合按照key(单词名)分组 groupBy
KeyedStream<Tuple2<String,Integer>,String> groupedDS = wordAndOneDS.keyBy(t-> t.f0);//此处的f0代表二元组的第一个元素(索引为0)t0
// 3.4 再对各个组内的数据按照value(1)进行聚合,也就是求和sum
DataStream<Tuple2<String,Integer>> result = groupedDS.sum(1);//此处的1代表二元组的第二个元素
//3.5 对结果进行排序sort
// DataSet<Tuple2<String,Integer>> result = aggResult.sortPartition(1, Order.DESCENDING).setParallelism(1);
//4.输出结果-sink
result.print();
//5.触发执行
env.execute();//DataStream需要调用execte方法来执行
}
}
执行,查看结果
在集群上运行
打jar包
上传服务器
提交时需要指定主类
cn.edu.hgu.flink.WordCountDataSet
per-job模式提交任务-批处理模式
flink run -m yarn-cluster -yjm 1024 -ytm 1024 flink-dataset-demo-1.0-SNAPSHOT.jar
没有指定主类,出现错误提示
重新提交
flink run -m yarn-cluster -yjm 1024 -ytm 1024 -c cn.edu.hgu.flink.WordCountDataSet flink-dataset-demo-1.0-SNAPSHOT.jar
去改yarn-site发现改了
这里就提到内存的事情了,yarn和hadoop以及其他的组件有好多虚拟内存的限制
需要手动更改
修改代码
在yarn的web ui上查看执行情况
再次以流处理模式提交任务
获取主类
flink run -m yarn-cluster -yjm 1024 -ytm 1024 -c cn.edu.hgu.flink.WordCountDataStream flink-dataset-demo-1.0-SNAPSHOT.jar
对代码进行修改,新建一个类,添加存储到hdfs的代码
添加pom的hadoop依赖
编写代码
本地运行测试
重新打包,并上传服务器
指定新的主类,重新提交任务
flink run -m yarn-cluster -yjm 2048 -ytm 2048 -c cn.edu.hgu.flink.WordCountDataStreamYarn flink-dataset-demo-1.0-SNAPSHOT.jar
在yarn的web ui上查看
在hadoop上查看生成的文件
说明执行成功
到这里flink的入门案例wordcount单词计数就结束了,下一文章(3)是fkink重要的流批一体
如遇侵权,请联系删除。
版权归原作者 星欲冷hx 所有, 如有侵权,请联系我们删除。