点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(正在更新!)
章节内容
上节完成了如下的内容:
- DataStreamAPI介绍
- 基于文件、Socket、基于集合
- 编写代码进行测试
- Kafka连接器
非并行源
基本介绍
在 Apache Flink 中,非并行源(Non-Parallel Source)是一种特殊的源操作(Source Operator),它的最大并行度被限制为 1。这意味着,无论 Flink 集群中有多少个 Task Manager 和 Slot,该源操作都只能在一个并行实例中运行。这通常用于处理那些不适合并行化的任务或需要集中处理的工作。
主要特点
- 单线程执行:非并行源只能在一个线程中执行,因此不会受益于并行化带来的性能提升。适合需要顺序处理或依赖全局状态的场景。
- 全局状态管理:因为是单线程执行,非并行源可以方便地管理全局状态,而不需要像并行源那样处理多个并行实例间的状态同步问题。
- 实现简单:对于某些简单的数据源,如单个文件读取器、时间戳生成器等,非并行源的实现相对简单,不需要处理复杂的并行和分片逻辑。
使用场景
- 时间戳生成:当需要在流处理作业中引入事件时间(Event Time)时,可以使用一个非并行源来生成时间戳。
- 控制输入:如从一个全局唯一的数据源(例如一个集中式消息队列)读取数据时,通常使用非并行源来确保顺序处理。
- 测试与调试:在开发和调试阶段,非并行源可以用于生成简单的测试数据流。
示例代码
// 创建一个非并行的自定义源publicclassMyNonParallelSourceimplementsSourceFunction<String>{privatevolatileboolean isRunning =true;@Overridepublicvoidrun(SourceContext<String> ctx)throwsException{while(isRunning){
ctx.collect("Non-Parallel Source Data");Thread.sleep(1000);// 模拟数据产生的延迟}}@Overridepublicvoidcancel(){
isRunning =false;}}// 在作业中使用非并行源DataStream<String> stream = env.addSource(newMyNonParallelSource()).setParallelism(1);
在上述示例中,MyNonParallelSource 是一个简单的自定义非并行源,每秒生成一条字符串数据,并且通过 setParallelism(1) 明确指定其并行度为 1。
注意事项
- 性能限制:由于非并行源仅在单个线程中执行,如果数据量较大或需要高吞吐量,可能成为系统的瓶颈。
- 容错与恢复:Flink 提供了检查点机制(Checkpointing)来保证故障恢复时的状态一致性。在使用非并行源时,确保源的状态可以在故障恢复时正确重放。
NoParallelSource
packageicu.wzk;importorg.apache.flink.streaming.api.functions.source.SourceFunction;publicclassNoParallelSourceimplementsSourceFunction<String>{privateLong count =1L;privateboolean running =true;@Overridepublicvoidrun(SourceContext<String> ctx)throwsException{while(running){
count ++;
ctx.collect(String.valueOf(count));Thread.sleep(1000);}}@Overridepublicvoidcancel(){
running =false;}}
NoParallelSourceTest
packageicu.wzk;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment;publicclassNoParallelSourceTest{publicstaticvoidmain(String[] args){StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> data = env.getJavaEnv().addSource(newNoParallelSource());
data.print();
env.execute("NoParallelSourceTest");}}
运行结果
3>24>35>46>57>68>71>82>93>104>115>126>137>14
运行过程的截图如下所示:
并行源
基本介绍
在 Apache Flink 中,并行源(Parallel Source)是一种可以在多个并行实例中运行的数据源操作。这种源操作允许通过分配多个任务槽(Task Slot)来并行地读取数据,从而提高数据处理的吞吐量和性能。与非并行源相比,并行源更适合处理大规模、可分割的数据源,如分布式文件系统、消息队列、数据库分片等。
主要特点
- 多实例执行:并行源可以通过多个并行实例执行,每个实例处理源数据的一个分片。这种架构允许利用集群中的多个计算资源,从而大大提高数据处理能力。
- 分片处理:并行源通常会将数据源分成多个分片(shard)或分区(partition),每个分片由不同的并行实例处理。这样可以将大量的数据分摊到多个并行实例上,实现更高的处理效率。
- 状态管理:每个并行实例通常会管理自己的状态,而不是像非并行源那样管理全局状态。Flink 提供了状态后端和检查点机制,帮助管理和恢复并行源的状态。
- 横向扩展:由于并行源可以在多个实例中运行,因此随着集群资源的增加(例如增加 Task Manager 和 Slot 的数量),并行源的处理能力也会随之增加。
使用场景
- 分布式文件系统读取:从 HDFS、S3 等分布式文件系统中读取数据时,通常使用并行源将文件分块并分配给不同的并行实例处理。
- 消息队列消费:从 Kafka、RabbitMQ 等消息队列中消费消息时,通常使用并行源来同时处理多个分区的数据。
- 数据库读取:当从分片数据库(例如 MySQL 分片、Cassandra 等)读取数据时,使用并行源可以让多个实例并行读取不同分片的数据。
示例代码
Flink 提供了一些内置的并行源,例如 KafkaSource、Flink’s FileSource 等,这里以 KafkaSource 为例:
// 使用 Flink 内置的 Kafka SourceProperties properties =newProperties();
properties.setProperty("bootstrap.servers","localhost:9092");
properties.setProperty("group.id","flink-group");FlinkKafkaConsumer<String> kafkaSource =newFlinkKafkaConsumer<>("topic-name",newSimpleStringSchema(),
properties
);// 设置 Kafka Source 的并行度DataStream<String> stream = env.addSource(kafkaSource).setParallelism(4);
注意事项
- 数据分区一致性:在使用并行源时,需要确保数据源可以合理分区,并且每个并行实例只处理其分配的分区数据,避免数据重复处理或遗漏。
- 状态恢复:当并行源需要保存状态时,确保状态的正确管理,以便在故障恢复时可以正确地恢复各个并行实例的状态。
- 负载均衡:确保各个并行实例间的负载均衡,避免某些实例过载,而其他实例闲置。
ParallelSource
packageicu.wzk;importorg.apache.flink.streaming.api.functions.source.ParallelSourceFunction;publicclassParallelSourceimplementsParallelSourceFunction<String>{privatelong count =1L;privateboolean running =true;@Overridepublicvoidrun(SourceContext<String> ctx)throwsException{while(running){
count ++;
ctx.collect(String.valueOf(count));Thread.sleep(1000);}}@Overridepublicvoidcancel(){
running =false;}}
ParallesSourceTest
packageicu.wzk;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment;publicclassParallelSourceTest{publicstaticvoidmain(String[] args){StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> data = env.getJavaEnv().addSource(newParallelSource());
data.print();
env.execute("ParallelSourceTest");}}
运行结果
可以看到运行的速度是非常快的
4>25>21>22>28>23>26>27>26>35>38>37>34>33>32>31>36>4
运行的对应的截图如下所示:
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。