头歌 (Educoder) Flink Transformation (Introduction to Data Transformations)

1. map for data cleansing

package demo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.api.common.functions.MapFunction;

import java.text.SimpleDateFormat;

public class Step1 {

    /**
     * Stream record format: user ID, gender flag, registration timestamp in epoch milliseconds.
     * Requirement: map the gender flag to a readable label and the timestamp to a date string.
     *
     * @param dataStreamSource the source stream
     * @return DataStream<String>
     */
    public static DataStream<String> mapTest(DataStreamSource<String> dataStreamSource) {
        SingleOutputStreamOperator<String> result = dataStreamSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                String userId = fields[0];
                // Flag "0" maps to "男" (male); anything else to "女" (female)
                String gender = fields[1].equals("0") ? "男" : "女";
                // Render the epoch-millisecond timestamp as yyyy-MM-dd HH:mm:ss
                Long timestamp = Long.parseLong(fields[2]);
                String dateString = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(timestamp);
                return userId + "," + gender + "," + dateString;
            }
        });
        return result;
    }
}
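
To try this locally, a minimal sketch along these lines should work; the Step1Demo class name, the sample record, and the printed output are illustrative assumptions, not part of the exercise:

package demo;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Step1Demo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hypothetical sample record: userId,genderFlag,epochMillis
        DataStreamSource<String> source = env.fromElements("17,0,1456058477000");
        Step1.mapTest(source).print(); // e.g. 17,男,2016-02-21 20:41:17 (depends on the JVM time zone)
        env.execute("map demo");
    }
}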

2. flatMap for splitting lines into words

package demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.util.Collector;

public class Step2 {

    /**
     * Splits each incoming line on whitespace and emits one word per record.
     *
     * @param dataStreamSource the source stream
     * @return DataStream<String>
     */
    public static DataStream<String> FlatMapTest(DataStreamSource<String> dataStreamSource) {
        DataStream<String> result = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                // Split on one or more whitespace characters
                String[] words = value.split("\\s+");
                for (String word : words) {
                    out.collect(word);
                }
            }
        }).returns(Types.STRING);
        return result;
    }

}
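
A quick local check, again with made-up input (the Step2Demo class is an assumption for illustration):

package demo;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Step2Demo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The double space shows that "\\s+" swallows runs of whitespace
        DataStreamSource<String> source = env.fromElements("hello flink  world");
        Step2.FlatMapTest(source).print(); // emits hello, flink, world as separate records
        env.execute("flatMap demo");
    }
}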

3. filter for data cleansing by filtering records

package demo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;

public class Step3 {

    /**
     * Stream record format:
     * 17,女,2016-02-21 20:21:17 ---> user ID, gender, time the user registered on the platform
     * Requirement:
     * filter out records whose registration year is earlier than 2015 (2015 itself is kept).
     *
     * @param dataStreamSource the source stream
     * @return DataStream<String>
     */
    public static DataStream<String> FilterTest(DataStreamSource<String> dataStreamSource) {
        DataStream<String> filteredDataStream = dataStreamSource.filter(line -> {
            // Parse the registration year out of the date field
            String[] parts = line.split(",");
            String timestamp = parts[2];
            int registrationYear = Integer.parseInt(timestamp.split("-")[0]);
            // Keep records registered in 2015 or later, per the requirement above
            return registrationYear >= 2015;
        });
        return filteredDataStream;
    }

}
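
A local sanity check; the sample records are invented to match the documented format:

package demo;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Step3Demo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.fromElements(
                "17,女,2016-02-21 20:21:17",  // kept: registered in 2016
                "3,男,2014-06-01 08:00:00");  // dropped: registered before 2015
        Step3.FilterTest(source).print();     // only the 2016 record is printed
        env.execute("filter demo");
    }
}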

4. reduce for accumulation and finding the maximum

package demo;

import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.ReduceOperator;

public class Step4 {

    /**
     * Requirement: use reduce to sum all elements.
     *
     * @param ds DataSource<Integer> (1, 2, 3, 4, ...)
     * @return ReduceOperator<Integer>
     */
    public static ReduceOperator<Integer> ReduceTest(DataSource<Integer> ds) {
        // Pairwise addition folds the whole data set into a single sum
        ReduceOperator<Integer> result = ds.reduce((value1, value2) -> value1 + value2);
        return result;
    }

    /**
     * Requirement: use reduce to find the maximum value.
     *
     * @param ds DataSource<Integer> (1, 2, 3, 4, ...)
     * @return ReduceOperator<Integer>
     */
    public static ReduceOperator<Integer> ReduceTest2(DataSource<Integer> ds) {
        // Keeping the larger of each pair yields the global maximum
        ReduceOperator<Integer> result = ds.reduce((value1, value2) -> Math.max(value1, value2));
        return result;
    }

}
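
Because Step4 uses the batch DataSet API, a local run goes through ExecutionEnvironment instead, and print() itself triggers execution. The Step4Demo class and the input values are illustrative:

package demo;

import org.apache.flink.api.java.ExecutionEnvironment;

public class Step4Demo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Step4.ReduceTest(env.fromElements(1, 2, 3, 4)).print();   // 10
        Step4.ReduceTest2(env.fromElements(1, 2, 3, 4)).print();  // 4
    }
}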

5. Comprehensive example - word count

package demo;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class Step5 {

    /**
     * Requirement: use flatMap, groupBy, and sum to count word frequencies.
     *
     * @param ds DataSource<String>, formatted like:
     *           area book business
     *           case child company country day eye
     * @return AggregateOperator<Tuple2<String, Integer>>
     */
    public static AggregateOperator<Tuple2<String, Integer>> WordCountTest(DataSource<String> ds) {
        // flatMap splits each line into words and emits (word, 1) tuples;
        // returns(...) restores the tuple type information erased from the lambda
        AggregateOperator<Tuple2<String, Integer>> result = ds.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String word : line.split("\\s+")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT))
          // groupBy(0) groups by the word (tuple field 0)
          .groupBy(0)
          // sum(1) adds up the counts (tuple field 1) within each group
          .sum(1);

        return result;
    }

}
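
And a local run of the word count, with invented input lines (Step5Demo is an illustrative name):

package demo;

import org.apache.flink.api.java.ExecutionEnvironment;

public class Step5Demo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Step5.WordCountTest(env.fromElements("area book business", "book area"))
             .print(); // e.g. (area,2) (book,2) (business,1), one tuple per line
    }
}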

Tags: flink, big data

Reposted from: https://blog.csdn.net/qq_65975926/article/details/139152160
Copyright belongs to the original author, qq_65975926. In case of infringement, please contact us for removal.
