1.map实现数据清洗
package demo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.api.common.functions.MapFunction;
import java.text.SimpleDateFormat;
/**
 * Step 1: cleans raw user records with a {@code map} transformation.
 */
public class Step1 {
    /**
     * Shared formatter for the registration time. {@code DateTimeFormatter} is immutable and
     * thread-safe, so a single instance replaces the {@code SimpleDateFormat} that used to be
     * allocated for every record. The JVM default time zone is resolved once, when this class
     * is loaded. Fully-qualified names are used so that no additional import is required.
     */
    private static final java.time.format.DateTimeFormatter REGISTRATION_TIME_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
                    .withZone(java.time.ZoneId.systemDefault());

    /**
     * Converts every comma-separated input line into a human-readable record.
     *
     * <p>The first field (user ID) is passed through unchanged, the second field is mapped to
     * "男" when it equals {@code "0"} and to "女" otherwise, and the third field is parsed as
     * epoch milliseconds and rendered as {@code yyyy-MM-dd HH:mm:ss} in the JVM default
     * time zone.
     *
     * @param dataStreamSource the source stream of raw lines
     * @return a stream of {@code userId,gender,formattedTime} lines
     */
    public static DataStream<String> mapTest(DataStreamSource<String> dataStreamSource) {
        SingleOutputStreamOperator<String> result = dataStreamSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                if (fields.length < 3) {
                    // Fail with the offending line instead of a bare ArrayIndexOutOfBoundsException.
                    throw new IllegalArgumentException(
                            "Expected at least 3 comma-separated fields but got: " + value);
                }
                String userId = fields[0];
                String gender = "0".equals(fields[1]) ? "男" : "女";
                // A primitive long avoids the needless boxing of the original Long local.
                long timestamp = Long.parseLong(fields[2]);
                String dateString = REGISTRATION_TIME_FORMAT.format(java.time.Instant.ofEpochMilli(timestamp));
                return userId + "," + gender + "," + dateString;
            }
        });
        return result;
    }
}
2.flatmap完成单词切割
package demo;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.util.Collector;
/**
 * Step 2: splits lines of text into individual words with a {@code flatMap} transformation.
 */
public class Step2 {
    /**
     * Splits every input line on runs of whitespace and emits each word as its own element.
     *
     * <p>Empty tokens are skipped: {@code String.split} returns an empty first element for a
     * line that starts with whitespace and a single empty element for an empty line, and
     * neither of those is a word.
     *
     * @param dataStreamSource the source stream of text lines
     * @return a stream containing one element per non-empty word
     */
    public static DataStream<String> FlatMapTest(DataStreamSource<String> dataStreamSource) {
        DataStream<String> result = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String word : value.split("\\s+")) {
                    if (!word.isEmpty()) {
                        out.collect(word);
                    }
                }
            }
        }).returns(Types.STRING);
        return result;
    }
}
3.filter完成数据清洗之过滤
package demo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
/**
 * Step 3: drops old registrations with a {@code filter} transformation.
 */
public class Step3 {
    /** Earliest registration year that is kept; records from earlier years are removed. */
    private static final int MIN_REGISTRATION_YEAR = 2015;

    /**
     * Removes every record whose registration year is lower than 2015.
     *
     * <p>Input lines look like {@code 17,女,2016-02-21 20:21:17}, i.e. user ID, user gender and
     * the time at which the user registered on the platform.
     *
     * <p>The requirement is to remove years strictly lower than 2015, so records from 2015
     * itself are kept. The previous predicate ({@code > 2015}) dropped them as well.
     *
     * @param dataStreamSource the source stream of records
     * @return a stream containing only the records registered in 2015 or later
     */
    public static DataStream<String> FilterTest(DataStreamSource<String> dataStreamSource) {
        DataStream<String> filteredDataStream = dataStreamSource.filter(line -> {
            // The third field holds the registration time; its leading "yyyy" part is the year.
            String[] parts = line.split(",");
            String registrationTime = parts[2];
            int registrationYear = Integer.parseInt(registrationTime.split("-")[0]);
            return registrationYear >= MIN_REGISTRATION_YEAR;
        });
        return filteredDataStream;
    }
}
4.reduce 完成累加操作与求最大值操作
package demo;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.ReduceOperator;
/**
 * Step 4: aggregates a data set of integers with {@code reduce}.
 */
public class Step4 {
    /**
     * Adds up all elements of the data set using {@code reduce}.
     *
     * @param ds the integers to accumulate, e.g. (1, 2, 3, 4, ...)
     * @return the reduce operator that yields the sum of all elements
     */
    public static ReduceOperator<Integer> ReduceTest(DataSource<Integer> ds) {
        // Pairwise addition folds the whole data set into its sum.
        return ds.reduce(Integer::sum);
    }

    /**
     * Finds the largest element of the data set using {@code reduce}.
     *
     * @param ds the integers to inspect, e.g. (1, 2, 3, 4, ...)
     * @return the reduce operator that yields the maximum element
     */
    public static ReduceOperator<Integer> ReduceTest2(DataSource<Integer> ds) {
        // Keeping the larger value of each pair leaves the overall maximum.
        return ds.reduce(Integer::max);
    }
}
5.综合案例 - 词频统计
package demo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * Step 5: word-frequency count built from {@code flatMap}, {@code groupBy} and {@code sum}.
 */
public class Step5 {
    /**
     * Counts how often each word occurs in the data set.
     *
     * <p>Input lines are whitespace-separated words, for example:
     * <pre>
     * area book business
     * case child company country day eye
     * </pre>
     *
     * <p>Lines are split on runs of whitespace and empty tokens are skipped. The previous
     * single-character split ({@code "\\s"}) produced empty "words" for consecutive or leading
     * whitespace and for blank lines, and those were counted under the key {@code ""}.
     *
     * @param ds the data set of text lines
     * @return the aggregate operator that yields one {@code (word, count)} tuple per distinct word
     */
    public static AggregateOperator<Tuple2<String, Integer>> WordCountTest(DataSource<String> ds) {
        AggregateOperator<Tuple2<String, Integer>> result = ds
                // Turn every line into (word, 1) tuples.
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s+")) {
                        if (!word.isEmpty()) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                // The lambda's generic output type is erased, so declare it explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // Group by the word (tuple field 0) ...
                .groupBy(0)
                // ... and add up the per-occurrence counts (tuple field 1).
                .sum(1);
        return result;
    }
}
版权归原作者 qq_65975926 所有, 如有侵权,请联系我们删除。