0


【Flink】flink入门案例(2)

Flink的API

Flink提供了多个层次的api供开发者使用,越往上抽象程度越高,使用起来越方便;越往下越底层,使用难度就越大。DataSet已经不提倡使用了,被流批一体的DataStream代替。

Flink的编程模型

Flink的应用程序结构主要包括三部分:Source/Transformation/Sink

Flink的入门案例

需求

使用Flink实现WordCount

编码步骤

  1. 准备环境-env
  2. 准备数据-source
  3. 处理数据-transformation
  4. 输出结果-sink
  5. 触发执行

DataSet编码实现

新建maven工程

修改pom文件,引入依赖

导入成功

建立包和类

这个当时没截图了,不过下面结果那的图有里有包和类的结构

编写代码

package cn.edu.hgu.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * 使用Flink中的DataSet实现单词计数
*/
public class WordCount {
    public static void main(String[] args) throws Exception {
    //1·准备环境-env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    //2·准备数据-source
        DataSet<String> lineDS = env.fromElements("flink hadoop java hbase",
                "hadoop flink 0","hadoop hbase flink");
    //3·处理数据-transformation
//        3.1 将每一行数据切分成一个一个的单词,组成一个集合
        DataSet<String> wordDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                //参数 s 就是一行行的数据再将每一行切分成一个个的单词
                String[] words = s.split(" ");
                // 将切分的单词收集起来,发到集合中去
                for (String word:words){
                    collector.collect(word);
                }
            }
        });
//        3.2对集合中的每一个单词记为1,成为一个三元组集合
        DataSet<Tuple2<String,Integer>> wordAndOneDS = wordDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                // 此处的s就是进来的一个个单词,再跟一组成一个二元组返回
                return Tuple2.of(s,1);
            }
    });
//        3.3对新的集合按照key(单词名称) 进行分组groupby
        UnsortedGrouping<Tuple2<String,Integer>> groupedDS = wordAndOneDS.groupBy(0);//此处的0代表二元组的第一个元素t0
//        3.4对各个组内的数据按照value进行聚合,也就是求和SUM
        DataSet<Tuple2<String,Integer>> aggResult = groupedDS.sum(1);//此处1 表示二元组的第二个元素t1
//        3.5对结果进行排序sorted
        DataSet<Tuple2<String,Integer>> result = aggResult.sortPartition(1, Order.DESCENDING).setParallelism(1);
        //4·输出结果-Sink
        result.print();
    //5·触发执行
    }
}

执行,查看结果

DataStream编码实现

创建类,并编写代码

package cn.edu.hgu.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @description:使用flink的dataStream实现单词计数
 */
public class WordCountDataStream {
    public static void main(String[] args) throws Exception {
        //1.准备环-env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //指定运行环境为流处理模式
        //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        //指定运行模式为批处理模式
        //env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        //指定运行模式为自动,既支持流处理也支持批处理
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2.准备数据-source
        DataStream<String> lineDS = env.fromElements("hadoop flink hadoop java hbase ",
                "hadoop flink c","hadoop hbase flink");

        //3.处理数据-transformation
        //3.1 将每一行数据切分成一个个的单词再组成一个集合
        DataStream<String> wordDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                //参数s就是一行行的数据,再将每一行切分为一个个的单词
                String[] words = s.split(" ");
                //将切分单词收集起来,发到集合中
                for (String word:words) {
                    collector.collect(word);
                }
            }
        });
        //3.2 对集合中的每一个单词计为1,成为一个二元组
        DataStream<Tuple2<String,Integer>> wordAndOneDS = wordDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                //此处的s就是进来的一个个单词再和1组成一个个二元组返回
                return Tuple2.of(s,1);
            }
        });
        //3.3 对新的集合按照key(单词名)分组 groupBy
        KeyedStream<Tuple2<String,Integer>,String> groupedDS = wordAndOneDS.keyBy(t-> t.f0);//此处的f0代表二元组的第一个元素(索引为0)t0
        // 3.4 再对各个组内的数据按照value(1)进行聚合,也就是求和sum
        DataStream<Tuple2<String,Integer>> result = groupedDS.sum(1);//此处的1代表二元组的第二个元素
        //3.5 对结果进行排序sort
        //  DataSet<Tuple2<String,Integer>> result = aggResult.sortPartition(1, Order.DESCENDING).setParallelism(1);
        //4.输出结果-sink
        result.print();
        //5.触发执行
        env.execute();//DataStream需要调用execte方法来执行

    }
}

执行,查看结果

在集群上运行

打jar包

上传服务器

提交时需要指定主类

cn.edu.hgu.flink.WordCountDataSet

per-job模式提交任务-批处理模式

flink run -m yarn-cluster -yjm 1024 -ytm 1024 flink-dataset-demo-1.0-SNAPSHOT.jar

没有指定主类,出现错误提示

重新提交

flink run -m yarn-cluster -yjm 1024 -ytm 1024 -c cn.edu.hgu.flink.WordCountDataSet flink-dataset-demo-1.0-SNAPSHOT.jar

去改yarn-site发现改了

这里就提到内存的事情了,yarn和hadoop以及其他的组件有好多虚拟内存的限制

需要手动更改

修改代码

在yarn的web ui上查看执行情况

再次以流处理模式提交任务

获取主类

flink run -m yarn-cluster -yjm 1024 -ytm 1024 -c cn.edu.hgu.flink.WordCountDataStream flink-dataset-demo-1.0-SNAPSHOT.jar

对代码进行修改,新建一个类,添加存储到hdfs的代码

添加pom的hadoop依赖

编写代码

本地运行测试

重新打包,并上传服务器

指定新的主类,重新提交任务

flink run -m yarn-cluster -yjm 2048 -ytm 2048 -c cn.edu.hgu.flink.WordCountDataStreamYarn flink-dataset-demo-1.0-SNAPSHOT.jar

在yarn的web ui上查看

在hadoop上查看生成的文件

说明执行成功

到这里flink的入门案例wordcount单词计数就结束了,下一文章(3)是fkink重要的流批一体

如遇侵权,请联系删除。

标签: hadoop 大数据 flink

本文转载自: https://blog.csdn.net/hx1156477702/article/details/127160039
版权归原作者 星欲冷hx 所有, 如有侵权,请联系我们删除。

“【Flink】flink入门案例(2)”的评论:

还没有评论