一、实时计算:
1.大数据离线实时框架 流程:(采集,存储,处理,应用)
- 做项目的流程:第一步:数据采集,第二步:数据存储,第三步:数据处理,最后就是数据应用(对最后结果进行使用)
2.数据和业务?
- 以前的系统都是对事务进行处理的,事务可以理解为一些事情。增删改查
- 从数据挖掘价值推动决策
- 数据时效性:数据的价值随着时间延迟迅速降低。
- 越快越好,越快越有竞争优势,大数据实时化——实时计算
- spark Streaming底层也是批处理,job由时间决定的。
3.离线计算和实时计算的区别
- 离线计算是用户启动的,数据已经准备好了,已完成采集,等待数据,加载数据,处理结果。
- 实时计算要等待数据过来,才会立马处理。是事件驱动
4.主流实时计算框架对比
** 模型API保证次数容错机制延时吞吐量批流统一业务模式**易用性StormNative组合式At-least-onceRecord ACKs★★★★不支持需要其他框架★Spark StreamingMirco-batching声明式Exectly-onceRDD Checkpoint★(批处理)慢★★★支持需要其他框架★★Apache FlinkNative组合式Exectly-onceCheckpoint★★★★★★支持需要其他框架★★
- Apache Flink 组合式:source,transfrom(转换),sink
- 保证次数:2.1、At-least-once:至少一次2.2、Exectly-once:完全一次
- Checkpoint(容错),失败了可以找到最新的状态恢复。
- 状态:状态跟时间有关系,某一时刻的结果。状态是做容错考虑的。
5.Spark Streaming微批处理和Flink流式处理
- 微批处理可能会导致雪崩
- 流式处理一条一条处理数据,会导致某一时间段延迟会很大,但不会导致雪崩。
二、flink概念
1.什么是flink?
- Flink是一个分布式实时计算框架。用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
- 有状态:有状态计算是将当前批次结果加上上一批次计算的结果。
- 无界流 有定义流的开始,但没有定义流的结束(没有边界)。它们会无休止地产生数据。流处理。
- 有界流 有定义流的开始,也有定义流的结束。有界流(假如1000条数据),即可批处理(一次处理一批),也可以流处理(一条一条处理)。
- 任意规模进行计算:指的是flink主从节点。jobmanager:driver端;task manager:executor端。
2.flink特性
3.Flink底层原理
spark与flink底层原理对比
- spark底层基于mapreduce,要先执行map端再执行reduce端,延迟高。
- flink底层结构重新设计,持续流。
4.flink在流处理和批处理上的source
5.flink并行度
- 一个任务对应一个并行度,每个并行度对应一个槽位
- 并行度根据吞吐量决定的,Task Slot数量 是由任务中最大的并行度决定,Task的数量由并行度以及有无Shuffle一起决定
- flink并行度设置的集中方式
1、通过env设置,不推荐,如果需要台调整并行度得修改代码重新打包提交任务2、每个算子可以单独设置并行度,视实际情况决定,一般不常用3、还可以在提交任务的时候指定并行度,最常用 比较推荐的方式web UI:填写parallelism输入框即可设置,优先级:算子本身的设置 > env做的全局设置 > 提交任务时指定的 > 配置文件flink-conf.yaml
- Flink中的任务被分为多个并行任务来执行,其中每个并行的实例处理一部分数据。这些并行实例的数量被称为并行度。我们在实际生产环境中可以从四个不同层面设置并行度,并行度直接决定了Flink程序需要申请多少资源
6.事件时间
事件时间:数据自带的时间(是由数据自带的时间去触发窗口计算) 处理时间:数据时间以系统时间为准。
为了使用事件时间语义,Flink 应用程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳。其通常通过使用
TimestampAssigner
API 从元素中的某个字段去访问/提取时间戳。
时间戳的分配与 watermark 的生成是齐头并进的,其可以告诉 Flink 应用程序事件时间的进度。其可以通过指定
WatermarkGenerator
来配置 watermark 的生成方式。
7.窗口
窗口三大类:时间,计算,会话
stream
.keyBy(...) <- 仅 keyed 窗口需要
.window(...) <- 必填项:"assigner"
[.trigger(...)] <- 可选项:"trigger" (省略则使用默认 trigger)
[.evictor(...)] <- 可选项:"evictor" (省略则不使用 evictor)
[.allowedLateness(...)] <- 可选项:"lateness" (省略则为 0)
[.sideOutputLateData(...)] <- 可选项:"output tag" (省略则不对迟到数据使用 side output)
.reduce/aggregate/apply() <- 必填项:"function"
[.getSideOutput(...)] <- 可选项:"output tag"
7.1滚动窗口
滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。 比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建
7.2滑动窗口
与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。
比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据
8、Flink 中的Watermark是什么概念,起到什么作用
- Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制, 本质上是一种时间戳。 一般来讲Watermark经常和Window一起被用来处理乱序事件。
- 水位线默认等于最新数据的时间戳,水位线只能增长不能降低。
- 由于数据在传输的过程中可能会乱序,为了解决乱序问题,可以将水位线前移,延迟窗口的计算,避免数据丢失
三、flink代码
1.flink core
public class Demo01StreamWordCount {
/**
* flink整体代码大致分为四块
* 第一部分:构建flink环境
* 第二部分:构建第一个DStream
* 第三部分:DStream之间的转换
* 第四部分:打印得到结果进行保存
*/
public static void main(String[] args) throws Exception {
//1.构建flink环境
//ExecutionEnvironment.getExecutionEnvironment();执行环境。获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.setBufferTimeout(200);
//2.通过Socket(套接字)模拟无界流环境,方便flink处理
//虚拟机启动:nc-lk 10086 //cpu核数有关
//从source构建第一个DataStream
DataStream<String> lineDS = env.socketTextStream("master", 10086);
System.out.println("lineDS并行度:"+lineDS.getParallelism());
//统计每个单词的数量
//第一步:将每行数据的每个单词进行扁平化处理
//<String, Integer>输入数据类型和输出数据类型
DataStream<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
/**
*
* @param line DS中的一条数据
* @param out 通过collect方法将数据发送到下游
* @throws Exception
*/
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
for (String word : line.split(",")) {
out.collect(word);
}
}
});
System.out.println("wordsDS并行度:"+wordsDS.getParallelism());
// 第二步:将每个单词变成 KV格式,V置为1
DataStream<Tuple2<String, Integer>> wordKVDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
System.out.println("wordKVDS并行度:"+wordKVDS.getParallelism());
// 第三步:按每一个单词进行分组
// keyBy之后数据流会进行分组,相同的key会进入同一个线程中被处理
// 传递数据的规则:hash取余(线程总数,默认CPU的总线程数)原理
KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordKVDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
return tuple2.f0;
}
});
System.out.println("keyedDS并行度:" + keyedDS.getParallelism());
// 第四步:对1进行聚合sum
DataStream<Tuple2<String, Integer>> wordCutDS = keyedDS.sum(1);
System.out.println("wordCntDS并行度:" + wordCutDS.getParallelism());
//打印结果:将DS中的内容sink到控制台
wordsDS.print();
env.execute();
}
}
public class Demo02BatchWordCount {
public static void main(String[] args) throws Exception {
//1.构建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置flink的处理方式:默认是流处理
/**
* BATCH:批处理,只能处理有界流,底层是MR模型,可以进行预聚合
* STREAMING:流处理,可以处理有界流,也可以处理无界流,底层是持续流
* AUTOMATIC:自动判断,当所有的Source都是有界流则使用BATCH,当Source有一个是无界流则会使用STREAMING模式
*/
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//2.获得第一个DS
// 通过readTextFile可以基于文件构建有界流
DataStream<String> wordsFileDS = env.readTextFile("flink/data/words.txt");
//3.DS之间的转换
// Flink处理逻辑传入的方式
// new XXXFunction 使用匿名内部类
DataStream<String> wordsDS = wordsFileDS.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
for (String word : line.split(",")) {
// 将每个单词发送到下游
out.collect(word);
}
}
});
// 使用lambada表达式
/**
* ()->{}
* 通过 -> 分隔,左边是函数的参数,右边是函数实现的具体逻辑
*/
DataStream<String> wordDS = wordsFileDS.flatMap((line, out) -> {
for (String word : line.split(",")) {
out.collect(word);
}
}, Types.STRING);
// 第二步:将每个单词变成 KV格式,V置为1
DataStream<Tuple2<String, Integer>> wordKVDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
// 第三步:按每一个单词进行分组
// keyBy之后数据流会进行分组,相同的key会进入同一个线程中被处理
// 传递数据的规则:hash取余(线程总数,默认CPU的总线程数)原理
KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordKVDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
return tuple2.f0;
}
});
KeyedStream<Tuple2<String, Integer>, String> keyDS = wordKVDS.keyBy(kv -> kv.f0, Types.STRING);
// 第四步:对1进行聚合sum
DataStream<Tuple2<String, Integer>> wordCutDS = keyDS.sum(1);//索引,第二个位置
wordCutDS.print();
//4.最终结果的处理
env.execute();
}
}
public class Demo03Parallelism {
public static void main(String[] args) throws Exception {
/*
* 如何设置并行度?
* 1、考虑吞吐量
* 有聚合操作的任务:1w条/s 一个并行度
* 无聚合操作的任务:10w条/s 一个并行度
* 2、考虑集群本身的资源
* Task的数量由并行度以及有无Shuffle一起决定
*
* Task Slot数量 是由任务中最大的并行度决定
* TaskManager的数量由配置文件中每个TaskManager设置的Slot数量及任务所需的Slot数量一起决定
*/
// FLink 并行度设置的集中方式
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1、通过env设置,不推荐,如果需要调整并行度得修改代码重新打包提交任务
// env.setParallelism(2);
DataStreamSource<String> ds = env.socketTextStream("master", 8888);
// 2、每个算子可以单独设置并行度,视实际情况决定,一般不常用
SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = ds.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
.setParallelism(4);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordCntDS2P = kvDS.keyBy(kv -> kv.f0)
.sum(1)
.setParallelism(2);
// 如果算子不设置并行度则以全局为准
wordCntDS2P.print();
// 3、还可以在提交任务的时候指定并行度,最常用 比较推荐的方式
// 命令行:flink run 可以通过 -p 参数设置全局并行度
// web UI:填写parallelism输入框即可设置,优先级:算子本身的设置 > env做的全局设置 > 提交任务时指定的 > 配置文件flink-conf.yaml
env.execute();
}
}
public class Demo04EventTime {
public static void main(String[] args) throws Exception {
// 事件时间:数据本身自带的时间
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 并行度
env.setParallelism(1);
/*
数据格式:单词,时间戳(很大的整数,Long类型)
a,1722233813000
a,1722233814000
a,1722233815000
*/
DataStreamSource<String> wordTsDS = env.socketTextStream("master", 8888);
SingleOutputStreamOperator<Tuple2<String, Long>> mapDS = wordTsDS
.map(line -> Tuple2.of(line.split(",")[0], Long.parseLong(line.split(",")[1])), Types.TUPLE(Types.STRING, Types.LONG));
// 指定数据的时间戳,告诉Flink,将其作为事件时间进行处理
//assignTimestampsAndWatermarks 分配时间戳和水印
//WatermarkStrategy 水印策略
SingleOutputStreamOperator<Tuple2<String, Long>> assDS = mapDS
.assignTimestampsAndWatermarks(//分配时间戳和生成水印
WatermarkStrategy
// // 单调递增时间戳策略,不考虑数据乱序问题
// .<Tuple2<String, Long>>forMonotonousTimestamps()
/**
* 容忍5s的数据乱序到达,本质上将水位线前移5s,缺点:导致任务延时变大
* 水位线:某个线程中所接收到的数据中最大的时间戳
* forBoundedOutOfOrderness 有界乱序数据
* forMonotonousTimestamps 单调时间戳
* Duration 持续时间
*/
.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
// 可以提取数据的某一部分作为事件时间
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> t2, long recordTimestamp) {
return t2.f1;
}
})
);
// 不管是事件时间还是处理时间都需要搭配窗口操作一起使用
assDS.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT))
.keyBy(t2 -> t2.f0)
// 窗口触发的条件:水位线超过了窗口的结束时间
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1)
.print();
env.execute();
}
}
2.flink source
2.1、基于集合的source
public class Demo01ListSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 本地集合Source
ArrayList<String> arrList = new ArrayList<>();
arrList.add("flink");
arrList.add("flink");
arrList.add("flink");
arrList.add("flink");
arrList.add("flink");
arrList.add("flink");
// 有界流
DataStream<String> lineDS = env.fromCollection(arrList);
lineDS.print();
env.execute();
}
}
2.2、基于文件的source
public class Demo02FileSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 历史版本读文件的方式,有界流
DataStream<String> oldFileDS = env.readTextFile("flink/data/words.txt");
// oldFileDS.print();
// 新版本加载文件的方式:FileSource,默认是有界流
FileSource<String> fileSource = FileSource
.forRecordStreamFormat(
new TextLineInputFormat()
, new Path("flink/data/words.txt")
)
.build();
// 从Source加载数据构建DS
DataStream<String> fileSourceDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");
// fileSourceDS.print();
// 将读取文件变成无界流
FileSource<String> fileSource2 = FileSource
.forRecordStreamFormat(
new TextLineInputFormat()
, new Path("flink/data/words")
)
// 类似Flume中的spool dir,可以监控一个目录下文件的变化
.monitorContinuously(Duration.ofSeconds(4))
.build();
DataStream<String> fileSourceDS2 = env.fromSource(fileSource2,WatermarkStrategy.noWatermarks(),"fileSource2");
fileSourceDS2.print();
env.execute();
}
}
2.3、基于自定义的source
public class Demo03MySource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> mySourceDS = env.addSource(new MySource());
mySourceDS.print();
env.execute();
}
}
class MySource implements SourceFunction<String>{
// Source启动时会执行
// run方法如果会结束,则Source会得到一个有界流
// run方法如果不会结束,则Source会得到一个无界流
@Override
public void run(SourceContext<String> ctx) throws Exception {
System.out.println("run方法启动了");
// ctx 可以通过collect方法向下游发送数据
long cnt=0l;
while (true){
// ctx.collect(cnt+"");
cnt++;
// 休眠一会
Thread.sleep(1000);
}
}
// Source结束时会执行
@Override
public void cancel() {
System.out.println("Source结束了");
}
}
3.flink transformation
算子数据流转换,数据转换的各种操作,将数据转换计算成想要的数据。
3.1、Map:输入一个元素同时输出一个元素
public class Demo01Map {
public static void main(String[] args) throws Exception {
// 传入一条数据返回一条数据
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> ds = env.socketTextStream("master", 8888);
// 1、使用匿名内部类
DataStream<Tuple2<String, Integer>> mapDS = ds.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
// mapDS.print();
// 2、使用lambda表达式
DataStream<Tuple2<String, Integer>> mapDS2 = ds.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));
mapDS2.print();
env.execute();
}
}
3.2、flatMap:输入一个元素同时产生零个、一个或多个元素
public class Demo02FlatMap {
public static void main(String[] args) throws Exception {
// 传入一条数据返回多条数据,类似UDTF函数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> ds = env.socketTextStream("master", 8888);
// 1、使用匿名内部类
SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : line.split(",")) {
out.collect(Tuple2.of(word, 1));
}
}
});
// flatMapDS.print();
// 2、使用lambda表达式
SingleOutputStreamOperator<Tuple> flatMapDS2 = ds.flatMap((line, out) -> {
for (String word : line.split(",")) {
out.collect(Tuple2.of(word, 1));
}
}, Types.TUPLE(Types.STRING, Types.INT));
flatMapDS2.print();
env.execute();
}
}
3.3、filter:过滤,为每个元素执行一个布尔 function
public class Demo03Filter {
public static void main(String[] args) throws Exception {
// 过滤数据,注意返回值必须是布尔类型,返回true则保留数据,返回false则过滤数据
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> ds = env.socketTextStream("master", 8888);
// 只输出大于10的数字
SingleOutputStreamOperator<String> ftDS = ds.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return Integer.parseInt(value) > 10;
}
});
// ftDS.print();
ds.filter(value->Integer.parseInt(value)>10).print();
env.execute();
}
}
3.4、keyBy:分组,具有相同 key 的记录都分配到同一个分区
public class Demo04KeyBy {
public static void main(String[] args) throws Exception {
// 用于就数据流分组,让相同的Key进入到同一个任务中进行处理,后续可以跟聚合操作
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> ds = env.socketTextStream("master", 8888);
KeyedStream<String, String> keyByDS = ds.keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return value;
}
});
// keyByDS.print();
//lambda表达式
ds.keyBy(value->value,Types.STRING).print();
ds.keyBy(value->value.toLowerCase(),Types.STRING).print();
// ds.keyBy(String::toLowerCase,Types.STRING).print();
env.execute();
}
}
3.5、 reduce:在相同 key 的数据流上“滚动”执行 reduce
public class Demo05Reduce {
public static void main(String[] args) throws Exception {
// 用于对KeyBy之后的数据流进行聚合计算
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> ds = env.socketTextStream("master", 8888);
// 统计班级的平均年龄
SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> kvDS = ds.map(line -> {
String[] split = line.split(",");
String clazz = split[0];
int age = Integer.parseInt(split[1]);
return Tuple3.of(clazz, age, 1);
}, Types.TUPLE(Types.STRING, Types.INT, Types.INT));
KeyedStream<Tuple3<String, Integer, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0, Types.STRING);
keyByDS.reduce(new ReduceFunction<Tuple3<String, Integer, Integer>>() {
@Override
public Tuple3<String, Integer, Integer> reduce(Tuple3<String, Integer, Integer> v1, Tuple3<String, Integer, Integer> v2) throws Exception {
return Tuple3.of(v1.f0,v1.f1+v2.f1,v1.f2+v2.f2);
}
}).map(t3->Tuple2.of(t3.f0,(double)t3.f1/t3.f2),Types.TUPLE(Types.STRING,Types.DOUBLE))
.print();
keyByDS.reduce((v1,v2)->Tuple3.of(v1.f0,v1.f1 + v2.f1, v1.f2 + v2.f2)).print();
env.execute();
}
}
3.6、window:可以在已经分区的 KeyedStreams 上定义 Window
public class Demo06Window {
public static void main(String[] args) throws Exception {
// Flink窗口操作:时间、计数、会话
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> ds = env.socketTextStream("master", 8888);
SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = ds.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING,Types.INT));
// 每隔5s钟统计每个单词的数量 ---> 滚动窗口实现
SingleOutputStreamOperator<Tuple2<String, Integer>> winDS = kvDS.keyBy(kv -> kv.f0, Types.STRING)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);
// winDS.print();
// 每隔5s钟统计最近10s内的每个单词的数量 ---> 滑动窗口实现
kvDS.keyBy(kv->kv.f0,Types.STRING)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
.sum(1).print();
env.execute();
}
}
3.7、union:将两个或多个数据流联合来创建一个包含所有流中数据的新流
public class Demo07Union {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> ds01 = env.socketTextStream("master", 8888);
DataStream<String> ds02 = env.socketTextStream("master", 6666);
DataStream<String> unionDS = ds01.union(ds02);
// union 就是将两个相同结构的DS合并成一个DS
unionDS.print();
env.execute();
}
}
3.8、process:处理函数
public class Demo08Process {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> ds01 = env.socketTextStream("master", 5555);
SingleOutputStreamOperator<Object> pDS = ds01.process(new ProcessFunction<String, Object>() {
/*
* 每进来一条数据就会执行一次
* value :一条数据
* ctx:可以获取任务执行时的信息
* out:用于输出数据
*/
@Override
public void processElement(String value, ProcessFunction<String, Object>.Context ctx, Collector<Object> out) throws Exception {
// 通过processElement实现Map算子操作
out.collect(Tuple2.of(value, 1));
// 通过processElement实现flatMap算子操作
for (String word : value.split(",")) {
out.collect(word);
}
// 通过processElement实现filter算子操作
if ("flink".equals(value)) {
out.collect("flink yes");
}
}
});
pDS.print();
}
}
4.flink sink
4.1、写入文件
public class Demo01FileSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> linesDS = env.socketTextStream("master", 8888);
// 构建FileSink
FileSink<String> flinkSink = FileSink.forRowFormat(
new Path("flink/data/fileFlink"),
new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofSeconds(10))
.withInactivityInterval(Duration.ofSeconds(10))
.withMaxPartSize(MemorySize.ofMebiBytes(1))
.build())
.build();
linesDS.sinkTo(flinkSink);
env.execute();
}
}
4.2、自定义sink
public class Demo02MySink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ArrayList<String> arrList = new ArrayList<>();
arrList.add("flink");
arrList.add("flink");
arrList.add("flink");
arrList.add("flink");
DataStreamSource<String> ds = env.fromCollection(arrList);
ds.addSink(new MySinkFunction());
env.execute();
}
}
class MySinkFunction implements SinkFunction<String>{
@Override
public void invoke(String value,Context context) throws Exception {
System.out.println("进入了invoke方法");
// invoke 每一条数据会执行一次
// 最终数据需要sink到哪里,就对value进行处理即可
System.out.println(value);
}
}
5. flink window
5.1、时间窗口
public class Demo01TimeWindow {
/*
* 时间窗口:滚动、滑动
* 时间类型:处理时间、事件时间
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<MyEvent> myDS = env.socketTextStream("master", 8888)
.map(new MapFunction<String, MyEvent>() {
@Override
public MyEvent map(String value) throws Exception {
String[] split = value.split(",");
return new MyEvent(split[0], Long.parseLong(split[1]));
}
});
// 基于处理时间的滚动、滑动窗口
myDS.map(e-> Tuple2.of(e.getWord(),1), Types.TUPLE(Types.STRING,Types.INT))
.keyBy(t2->t2.f0)
// 滚动窗口 每隔5s统计一次
// .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
// 滑动窗口 每隔5s统计最近10s内的数据
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.sum(1);
// 基于事件时间的滚动、滑动窗口
SingleOutputStreamOperator<MyEvent> assDS = myDS.assignTimestampsAndWatermarks(
WatermarkStrategy
.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getTs())
);
SingleOutputStreamOperator<Tuple2<String, Integer>> eventDS = assDS.map(e -> Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT))
.keyBy(t2 -> t2.f0)
// 滚动窗口,由于水位线前移了5s,整体有5s的延时
// .window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 滑动窗口
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.sum(1);
// processDS.print();
eventDS.print();
env.execute();
}
}
5.2、会话窗口
public class Demo02SessionWindow {
public static void main(String[] args) throws Exception {
// 会话窗口:当一段时间没有数据,那么就认定此次会话结束并触发窗口的执行
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<MyEvent> myDS = env.socketTextStream("master", 8888)
.map(new MapFunction<String, MyEvent>() {
@Override
public MyEvent map(String value) throws Exception {
String[] split = value.split(",");
return new MyEvent(split[0], Long.parseLong(split[1]));
}
});
// 基于处理时间的会话窗口
SingleOutputStreamOperator<Tuple2<String, Integer>> processSessionDS = myDS.map(e -> Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT))
.keyBy(t2 -> t2.f0)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.sum(1);
// 基于事件时间的会话窗口
// 指定水位线策略并提供数据中的时间戳解析规则
SingleOutputStreamOperator<MyEvent> assDS = myDS.assignTimestampsAndWatermarks(WatermarkStrategy
.<MyEvent>forMonotonousTimestamps()
.withTimestampAssigner((e, ts) -> e.getTs())
);
SingleOutputStreamOperator<Tuple2<String, Integer>> eventSessionDS = assDS.map(e -> Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT))
.keyBy(t2 -> t2.f0)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.sum(1);
// eventSessionDS.print();
// processSessionDS.print();
env.execute();
}
}
5.3、计算窗口
public class Demo03CountWindow {
public static void main(String[] args) throws Exception {
// 计数窗口:滚动、滑动
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> ds = env.socketTextStream("master", 8888);
ds.map(word-> Tuple2.of(word,1), Types.TUPLE(Types.STRING,Types.INT))
.keyBy(t2->t2.f0)
.countWindow(5) // 每同一个key的5条数据会统计一次
// .countWindow(10,5)// 每隔同一个key的5条数据统计最近10条数据
.sum(1)
.print();
env.execute();
}
}
四、kafka 搭建及使用
1、Kafka简介
kafka是一个高吞吐的分布式消息系统。生产者负责生产数据 消费者负责消费数据
Kafka架构
- 生产者、Broker、消费者、ZK;
- 注意:Zookeeper中保存Broker id和消费者offsets等信息,但是没有生产者信息
2、kafka特点
高性能:单节点支持上千个客户端,百MB/s吞吐; 持久性:消息直接持久化在普通磁盘上且性能好; 分布式:数据副本冗余、流量负载均衡、可扩展; 很灵活:消息长时间持久化+Client维护消费状态。
3、Kafka性能好的原因
kafka写磁盘是顺序的,所以不断的往前产生,不断的往后写 kafka还用了sendFile的0拷贝技术,提高速度 而且还用到了批量读写,一批批往里写,64K为单位。
4、副本数设定、日志保存时间
一般我们设置成2个或3个,很多企业设置为2个;副本的优势:提高可靠性;副本劣势:增加了网络IO传输。默认保存7天;生产环境建议3天
5、Kafka搭建
5.1、上传解压修改环境变量
解压
tar -zxvf kafka_2.11-1.0.0.tgz -C /usr/local/soft
mv kafka_2.11-1.0.0 kafka-1.0.0
配置环境变量
vim /etc/profile
export KAFKA_HOME=/usr/local/soft/kafka-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
5.2、修改配置文件
vim config/server.properties
broker.id=0 每一个节点broker.id 要不一样
zookeeper.connect=master:2181,node1:2181,node2:2181/kafka
log.dirs=/usr/local/soft/kafka-1.0.0/data 数据存放的位置
5.3、将kafka文件同步到node1,node2
同步kafka文件
scp -r kafka-1.0.0/ node1:pwd
scp -r kafka-1.0.0/ node2:pwd
修改node1、node2中的/etc/profile,增加Kafka环境变量
export KAFKA_HOME=/usr/local/soft/kafka-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin
在ndoe1和node2中执行source
source /etc/profile
5.4、修改node1和node2中的broker.id
vim config/server.properties
# node1
broker.id=1
# node2
broker.id=2
5.5、启动kafka
1、需要启动zookeeper, kafka使用zk保存元数据
需要在每个节点中执行启动的命令
zkServer.sh start
查看启动的状体
zkServer.sh status
2、启动kafka,每个节点中都要启动(去中心化的架构)
-daemon后台启动
kafka-server-start.sh -daemon /usr/local/soft/kafka-1.0.0/config/server.properties
测试是否成功
#生产者
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic topic01
消费者
--from-beginning 从头消费, 如果不在执行消费的新的数据
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic topic01
6、 kafka使用
6.1、创建topic
在生产和消费数据时,如果topic不存在会自动创建一个分区为1,副本为1的topic
--replication-factor ---每一个分区的副本数量, 同一个分区的副本不能放在同一个节点,副本的数量不能大于kafak集群节点的数量
--partition --分区数, 根据数据量设置
--zookeeper zk的地址,将topic的元数据保存在zookeeper中
kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181/kafka --replication-factor 2 --partitions 3 --topic topic01
6.2、查看topic描述信息
kafka-topics.sh --describe --zookeeper master:2181,node1:2181,node2:2181/kafka --topic topic01
6.3、获取所有topic
kafka-topics.sh --list --zookeeper master:2181,node1:2181,node2:2181/kafka
6.4、创建控制台生产者
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic topic01
6.5、创建控制台消费者
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node3:9092 --from-beginning --topic topic01
版权归原作者 Act-F 所有, 如有侵权,请联系我们删除。