0


2024.1.15 Spark 阶段原理,八股,面试题

1. 简述什么是Spark?

spark是一款大数据统一分析引擎,底层数据结构是RDD

2. 简述Spark的四大特点

速度快(线程,基于内存的rdd,高效api)

易用性(多种语言python,java,scala等)

通用性(有sparksql,mlib等安装包)

兼容性(适配不同的资源调度,存储工具,可运行在多个系统中)

3. 简述Spark比Mapreduce执行效率高的原因

mapreduce是基于进程执行的,消耗资源多运算慢;

使用磁盘进行计算,反复IO读写效率低下;

API较为原始低级,实现复杂的API需要写很复杂的代码;

Spark基于线程做数据处理,创建时所需要的资源更少,运行速度更快;

引入了新的数据结构-RDD弹性分布式数据集,使得Spark可以基于内存进行数据处理,读写速度相较于磁盘更快

Spark提供了更丰富的编程API,能够轻松的实现功能开发;

4. 简述Spark on Yarn的两种部署模式的区别和特点

两种方式分别是client客户端模式和cluster集群模式

两种方法的本质区别是driver进程运行的地方不一样

Client部署方式:Driver进程运行在你提交程序的那台机器上

    优点是日志和运行结果都输出到了提交的那台机器上,方便查看结果

    缺点是Driver进程和Yarn集群可能不在同一个集群中,会导致Driver进程和Excutor进程间进行数据交换的时候,效率较低

    场景一般在开发和测试环境中使用

Cluster部署方式:Driver进程运行在集群中某台节点上

    优点是Driver进程和Yarn集群在同一个集群中,Driver进程和Excutor进程间进行数据交换的时候效率比较高

    缺点是查看日志与运行结果需要在18080或者8088的页面中进行查看

    一般在生产环境中使用

5. Spark底层工作原理是怎样的

    DAGScheduler:DAG调度器,将job任务形成DAG有向无环图和划分Stage阶段;

            TaskScheduler:Task调度器,将Task线程分配给具体的Executor执行;

以client on spark 为例

  1. 提交spark程序,在哪里提交程序,就在哪里启动Driver进程;

  2. 由于Driver进程是java与scala语言编写的,无法直接执行python代码,所以需要将创建SparkContext对象的代码基于PY4J转为java后再创建对象;

     2.1 Driver进程启动后,底层PY4J创建SparkContext顶级对象,同时还会创建DAGscheduler和TaskSchduler;                
    
  3. Driver连接Master,根据资源的配置,向master申请资源来创建Executor

  4. Master接收到资源申请,进行资源分配,分配的原则采用FIFO先进先出规则,制定资源分配方案并返回给Driver;

  5. Driver连接到对应的worker从节点上,占用相应的资源,通知worker启动Excutor,启动后将信息反向注册回Driver;

  6. Driver开始处理代码

     6.1 Driver加载RDD相关的算子,根据算子间的依赖绘制DAG有向无环图和划分Stage阶段(一个Spark应用程序遇到Action算子后,就会触发一个Job任务的产生,Job任务会将它所依赖的所有算子都加载进来,形成一个Stage; 接着从Action算子从后往前进行回溯,遇到窄依赖就将算子放在同一个Stage当中; 如果遇到宽依赖,就经历shuffle阶段,划分形成新的Stage,最后一直回溯完成)
    
     6.2 之后Driver需要确定任务分配给哪些Excutor进行执行,首先确定每个Stage阶段有多少个Task线程,将众多的Task线程放到Taskset集合中,DAG调度器将TaskSet集合给到Task调度器,Task调度器拿到Taskset集合以后,将Task分配给到具体Executor执行,底层是基于SchedulerBackend调度队列来实现的,
    
     6.3 Driver通知对应的Executor进程来执行相应的任务
    
     6.4 Executor开始执行具体的任务,因执行的是python函数,因此会调用服务器上的python解释器,将py函数和输入数据传输到python解释器,执行完后会将数据返回给Executor进程;
    
     6.5 Executor在运行过程中,会判断是否需要将结果返回给Driver进程,如果需要就返回,如不需要就直接输出;
    
     6.6 Driver会定时检查多个Executor的执行状态,直到所有的Executor执行完成,就认为任务运行结束;
    
  7. Driver调用sparkContext.stop()代码,通知Master回收资源,整个程序运行结束;

6. RDD算子分成了哪几类,各自的特点是什么?

分成两类: Transformation算子和Action算子
Transformation算子:返回值是一个新的RDD;该算子运行后会不会立即执行,需要配合Action算子触发。
Action算子:返回值是None或者非RDD数据类型;算子运行后会立即执行,并且把之前的Transformation算子一并运行。

7. RDD的五大特性和五大特点

五大特性:

    1. RDD由一系列分区组成

    2. RDD计算相当于对RDD每个分区做计算

    3. RDD之间有宽窄依赖;

    4. KeyValue型RDD可以自定义分区;

    5. 尽量让计算程序靠近数据源,移动数据不如移动计算程序;

五大特点:

    1. 分区:RDD的分区是逻辑上的分区,并不是直接对数据进行分区操作,因为RDD本身不存储数据;

    2. 只读:RDD只读的,对其进行增加改变本质都是创建新的RDD

    3. 依赖: 存在宽窄依赖

    4. 缓存: 如果在程序中多次使用同一个RDD可以将其缓存起来,该RDD只有第一次计算时会根据血缘关系得到分区的数据.

    5. checkpoint检查点: 与缓存类似,但可以持久化保存.

8. RDD中的重分区算子,以及各自特点?

重分区算子有:reparation , coalesce , Partitions by

repartition:调整RDD的分区数,得到一个新RDD,既可以增大也可以减小分区数,但都会触发shuffle.

coalesce: 默认只能减小分区,减小的过程中不会触发shuffle,如果将参数2的shuffle改为True也可以增大分区,但会触发shuffle

partitions by: 主要是针对kv类型的RDD进行重分区操作,可以增大也可以减少,但都会shuffle,用户自定义函数fn来指定分区方案.

9. mapPartitions和foreachPartitions分区算子,相对map和foreach有什么优点?

优点:每次都对一整个分区进行操作,减少分区调用操作的次数,减少资源消耗,而且可以对分区内数据批量操作,提高效率.适用于文件的打开和关闭、数据库的连接和关闭等有反复消耗资源的操作.

10. 简述Spark持久化中缓存和checkpotin检查点的区别

区别有4点:

  1. 主要作用

     缓存是为了提升Spark程序运算效率.
    
     检查点是为了提升Spark程序容错性.
    
  2. 存储位置

     缓存存储在内存或磁盘中,或Executor的堆外内存中.
    
     检查点存储在磁盘或Hdfs中
    
  3. 生命周期

     缓存在程序结束或手动调用unpersist后被删除
    
     检查点可永久保存在HDFS上,除非手动删除
    
  4. 血缘关系

     缓存不会切断RDD的血缘关系,因为缓存是不稳定的,如果发生故障,可以从头运行RDD
    
     检查点会切断RDD的血缘关系,因为保存在安全的HDFS上,认为不会丢失
    

一同使用时先设置缓存再设置检查点,可以减少一次IO过程

11. 简述DAG和Stage形成过程

在spark的运行过程中,遇到了一个action算子会生成一个job

一个job将action算子和其依赖的其他算子聚合起来形成一个stage

然后会向前回溯rdd算子,如果没有shuffle阶段(产生宽依赖)就把他们

放在一个stage里面,如果有shuffle发生(产生宽依赖)就会划分一个新的

stage放置这些算子

12. 简述Job调度流程

1- Driver进程启动后,底层PY4J创建SparkContext顶级对象。在创建该对象的过程中,还会创建另外两个对象,分别是: DAGScheduler和TaskScheduler
DAGScheduler: DAG调度器。将Job任务形成DAG有向无环图和划分Stage的阶段
TaskScheduler: Task调度器。将Task线程分配给到具体的Executor执行

2- 一个Spark程序遇到一个Action算子就会触发产生一个Job任务。SparkContext将Job任务给到DAG调度器,拿到Job任务后,会将Job任务形成DAG有向无环图和划分Stage的阶段。并且会确定每个Stage阶段有多少个Task线程,会将众多的Task线程放到TaskSet的集合中。DAG调度器将TaskSet集合给到Task调度器

3- Task调度器拿到TaskSet集合以后,将Task分配给到给到具体的Executor执行。底层是基于SchedulerBackend调度队列来实现的。

4- Executor开始执行任务。并且Driver会监控各个Executor的执行状态,直到所有的Executor执行完成,就认为任务运行结束

13. 简述SparkSQL和Hive的对比

  1. hive只能写SQL,spark既可以写SQL又可以写代码

  2. hive有元数据存储metastore,spark需要手动维护元数据

  3. hive的运行基于磁盘,spark的运行基于内存

  4. hive的计算引擎是MapReduce,spark基于spark RD

相同点:

1都是大数据分布式处理架构

2都是处理结构化数据

3都可以用yarn做集群资源调度

14. 创建得到DataFrame的方式有哪些,适用于什么场景?

1.通过RDD得到一个DataFrame

场景:RDD可以存储任意结构的数据;而DataFrame只能处理二维表数据。在使用Spark处理数据的初期,可能输入进来的数据是半结构化或者是非结构化的数据,那么我可以先通过RDD对数据进行ETL处理成结构化数据,再使用开发效率高的SparkSQL来对后续数据进行处理分析。

2.内部初始化数据得到DataFrame

场景:一般用在开发和测试中。因为只能处理少量的数据

  1. 读取外部文件得到DataFrame

text , json , csv

csv常设置的参数有path\header\sep\inferschema\encoding等

15. SparkSQL中数据清洗的API有哪些,各自作用是什么?

Drop duplicate 去重: 没有指定subset就默认一整行完全一样才会删除,指定了就将范围限定到指定字段.

dropna去空 : 没有指定参数,那只要有一个是null的那整行就被删除, 可以指定参数thresh,只有当null数量大于了thresh数才会删除整行.

fillna填充替换 : DF.fillna(value={"name":"未知姓名","age":100}).show() , value必须传递参数,用于填充缺失值,subset限定缺失值替换范围。如果不是字典,那么只会替换字段类型匹配的空值,最常用的是value传递字典的形式。

16. 设置SparkSQL的shuffle分区数的方式有哪几种?

shuffle分区数量默认200个,手动调整的方式如下:

  1. 全局设置 : spark.sql.shuffle.partitions 数量

  2. 动态分区: 在客户端通过submit命令提交的时候,动态设置shuffle分区数量,部署上线时,基于spark-submit提交运行的时候:

                             ./spark-submit --conf"spark.sql.shuffle.partition=数量"
    
  3. 写死 : SparkSession.conf.set('spark.sql.shuffle.partitions',数量)

17. 简述基于Pandas实现UDF和UDAF函数的步骤?

sparksql原生只能udf,借助第三方工具可以实现udaf

UDF步骤:

1.创建python自定义函数

    要求输入类型和返回值类型都必须是pandas中的series类型

2.注册进SparkSQL

    方式一:udf对象 = spark.udf.register(参数1,参数2)

    方式二:udf对象 = F.pandas_udf(参数1,参数2)

    方式三:语法糖装饰器写法

            @F.pandas_udf(returnTyep = 返回值SparkSQL的数据类型)放置到对应的python函数上

3.在代码中使用


UDAF步骤:

1.创建python自定义函数

    要求输入参数类型是pandas中的Series对象,返回python中的标量数据类型

2.注册进SparkSQL

    方式一: udf对象= spark.udf.resiter(参数1,参数2)

    方式二: udf对象 = F.pandas_udf(参数1,参数2)

    方式三: 语法糖装饰器写法

            @ F.pandas_udf(returnType= 返回值sparksql的数据类型) 放置到对应的python函数上

3.在代码中使用

18. 简述SParkSQL函数的分类

  1. UDF 一进一出,split等

  2. UDAF 多进一出, 聚合函数 count,sum等

  3. UDTF 一进多出, 表生成函数, explode炸裂函数等

19. 简述SparkSQL底层工作流程

如何将sparksql翻译成rdd的,基于catalys优化器来实施

  1. 当catalys接收到客户端的代码,会先校验语法,通过后会根据执行顺序,生成未解析的逻辑计划(ats抽象语法树)

  2. 对于ats抽象语法树加入元数据信息,确定一共涉及到哪些字段,字段的类型是什么,以及表其他相关元数据信息,加入元数据信息后,得到了未优化的逻辑计划.

  3. 对未优化的逻辑计划执行优化操作,优化是通过优化器来执行的,在优化器匹配相对应的优化规则,Sparksql底层提供了一两百个优化规则,如:

     谓词下推:也叫作断言下推,将数据过滤操作提前到数据扫描的时候进行,减少后续操作的数据量,提升效率;
    
     列式裁剪:不加载与数据分析无关的字段,减少后续处理的数据量,提升效率;
    
  4. 由于优化规则很多,导致会得到多个优化的逻辑计划,在转换为物理执行计划的过程中,会根据成本模型(运行耗时,资源消耗等)得到一个最优的物理执行计划;

  5. 将物理执行计划通过code generation(代码生成器),转换成Spark RDD的代码;

  6. 最后就是将SparkRDD代码部署到集群上运行

20. 简述消息队列的应用场景

  1. 应用解耦合

  2. 异步处理

  3. 限流削峰

  4. 消息驱动系统

21. 简述Kafka的架构

架构中的角色:

  1. Producer生产者:负责将信息/数据发送到kafka中

  2. consumer消费者:负责将信息从kafka中取出

  3. broker : kafka集群中的节点,节点与节点之间没有主从之分

  4. topic : 主题,是业务层面对消息进行分类的,一个topic可以有多个分区,分区数量没有限制

  5. partiton分区: 一个分区可以有多个副本,副本的数量不超过broker集群的数量

  6. leader 主副本: leader主副本会主动将信息副本发送到follwer从副本上

  7. Follower从副本: Follower从副本被动接收Leader主副本发送过来的信息副本

  8. Zookeeper:用来管理Kafka集群,管理信息的元数据

  9. ISR同步列表: 存储和Leader主副本信息差距最小的一个副本,当leader主副本无法对外提供服务的时候,会从该ISR列表中选择一个Follwer从副本变成Leader主副本,对外提供服务

22. 简述Kafka 之所以具有高速的读写性能,主要有哪几个原因

  1. 顺序写入磁盘 相比于随机写入,顺序写入磁盘的性能更高,减少了磁盘寻址时间,减少了IO操作的次数,提高了写入的速度

  2. 零拷贝技术 直接从数据源进行读取,不用经过中间的数据拷贝过程,减少了网络IO和磁盘IO等过程,提升了效率

  3. 分布式架构 分布式的架构实现了扩展与负载均衡,提高了整体的吞吐量和并发处理能力

23. 简述Kafka的分区和副本机制

分区机制:

    1- 避免单台服务器容量的限制,分多个区可以避免单个分区的数据过大导致服务器无法存储

    2- 提升topic的吞吐量,数据读写速度,利用多台服务器的数据读写能力,网络等资源

    3- 分区的数量没有限制,但尽量不要超过kafka集群中broker节点个数的3倍

副本机制:

    1-通过多副本机制,提升数据的安全性,但副本过多会导致冗余过多

    2- 副本数量有限制,不可超过kafka集群中broker节点个数,推荐分区的副本数量为1-3个

24. 简述kafka中生产者数据分发策略

生产者产生的消息,是如何保存到具体分区上的

    1- 随机分发策略

            将消息发到到随机的某个分区上,还是发送到Leader主副本上。Python支持,Java不支持

            当在发送数据的时候, 如果只传递了topic 和 value,没有指定key的时候, 那么此时就采用随机策略

    2- 指定分区策略

            将消息发到指定的分区上面。Python支持,Java支持

            当在发送数据的时候, 如果指定了partition参数, 表示的采用指定分区的方案, 分区的编号从0开始

            当指定了partition的参数后, 与DefaultPartitioner没有任何的关系

    3- Hash取模策略

            对消息的key先取Hash值,再和分区数取模。Python支持,Java支持

            当在发送数据的时候, 如果传递了topic 和 value 以及key的时候, 那么此时就是采用hash取模策略

            注意: 相同key的返回的hash值是一致的, 同样对应分区也是同一个。也就是要注意数据倾斜的问题

    4- 轮询策略

            在kafka的2.4及以上版本,已经更名为粘性分发策略,python不支持,java支持

    5- 自定义分发策略

            Python支持,Java支持

            参考源代码DefaultPartitioner模仿写即可

JAVA中的轮询分发策略 和 粘性分发策略介绍

    1- 轮询分发策略:kafka老版本的策略,当生产数据的时候,只有value但是没有key的时候,采用轮询

                优点: 可以保证每个分区拿到的数据基本是一样,因为是一个一个的轮询的分发
                 缺点: 如果采用异步发送方式,意味着一批数据发送到broker端,由于是轮询策略,会将这一批数据拆分为多个小的批次,分别再写入到不同的分区里面去,

                            写入进去以后,每个分区都会给予响应,会影响写入效率。
 

    2- 粘性分发策略: kafka2.4版本及以上的策略,当生产数据的时候,只有value但是没有key的时候,采用粘性分发策略

                优点: 在发送数据的时候,首先会随机的选取一个分区,然后尽可能将数据分发到这个分区上面去,也就是尽可能粘着这个分区。该分发方式,

                            在异步发送的操作中,效率比较高。
                 缺点: 在数据发送特别快的时候,可能会导致某个分区的数据比其他分区数据多很多,造成大量的数据集中在一个分区上面

25. 简述消息存储机制和查询机制

消息存储机制

Kafka集群中的消息存储在一组称为“分区”的逻辑日志上。每个主题可以分成多个分区,每个分区都有一个唯一的标识符和一组不断增加的有序消息。这些分区可以分布在不同的Kafka节点上,以实现负载均衡和可伸缩性。

在每个Kafka节点上,每个分区都被存储为一个或多个文件(称为日志段),这些文件包含了该分区的所有消息。当消息被写入时,它们会被追加到最后一个日志段。当日志段达到一定大小(通过broker端参数log.segment.bytes进行配置)或时间(通过broker端参数log.segment.ms进行配置)时,将会创建一个新的日志段,原来的日志段将会被关闭。这种设计使得Kafka能够高效地追加消息,并且可以轻松地删除旧数据,同时保证消息的持久性和可靠性。此外,Kafka还提供了复制机制来确保数据的容错性和高可用性。

1-xx.log和xx.index它们的作用是什么? 答: xx.log: 称之为segment片段文件,也就是一个Partition分区的数据,会被分成多个segment(log)片段文件进行存储。 xx.index: 称之为索引文件,该文件的作用是用来加快对xx.log文件内容检索的速度

2-xx.log和xx.index文件名称的意义? 答: 这个数字是xx.log文件中第一条消息的offset(偏移量)。offset偏移量从0开始编号。

3-为什么一个Partition分区的数据要分成多个xx.log(segment片段文件)文件进行存储? 答: 1- 如果一个文件的数据量过大,打开和关闭文件都非常消耗资源 2- 在一个大的文件中,检索内容也会非常消耗资源 3- Kafka只是用来临时存储消息数据。会定时将过期数据删除。如果数据放在一个文件中,删除的效率低;如果数据分成了多个segment片段文件进行存储,删除的时候只需要判断segment文件最后修改时间,如果超过了保留时间,就直接将整个segment文件删除。该保留时间是通过server.properties文件中的log.retention.hours=168进行设置,默认保留168小时(7天)


查询机制:消费者在消费的时候,是如何找到对应offset偏移量的消息的

查询步骤:

1- 首先先确定要读取哪个xx.log(segment片段)文件。368776该offset的消息在368769.log文件中

2- 查询xx.log对应的xx.index,查询该条消息的物理偏移量范围

3- 根据消息的物理偏移量范围去读取xx.log文件(底层是基于磁盘的顺序读取)

4- 最终就获取到了具体的消息内容

26. Kafka消费者的负载均衡机制

Kafka集群中每分钟新产生400条数据,下游的一个消费者每分钟能够处理400条数据。

随着业务发展,Kafka集群中每分钟新产生1200条数据,下游的一个消费者每分钟能够处理400条数据。 答:会导致broker中积压的消息条数越来越多,造成消息处理不及时。可以增加消费者数量,并且将这些消费者放到同一个消费组当中

随着业务发展,Kafka集群中每分钟新产生1600条数据,下游的一个消费者每分钟能够处理400条数据。 答:会导致broker中积压的消息条数越来越多,造成消息处理不及时。再增加消费组中消费者的个数已经无法解决问题。

如何解决: 1- 增加消费组中消费者的个数 2- 提高下游消费者对消息的处理效率

Kafka消费者的负载均衡机制

1- 在同一个消费组中,消费者的个数最多不能超过Topic的分区数。如果超过了,就会有一些消费者处于闲置状态,消费不到任何数据。

2- 在同一个消费组中,一个Topic中一个分区的数据,只能被同个消费组中的一个消费者所消费,不能被同个消费组中多个消费者所消费。但是一个消费组内的一个消费者可以消费多个分区的数据。也就是分区和消费者的对应关系,多对一

3- 不同的消费组中的消费者,可以对一个Topic的数据同时消费,也就是不同消费组间没有任何关系。也就是Topic的数据能够被多个消费组中的消费者重复消费。

补充:

查看消费组中有多少个消费者,用来避免消费者个数超过分区个数。

./kafka-consumer-groups.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --group g_1 --members --describe

27. Kafka如何保证数据不丢失

生产者保证数据不丢失:

    生产者端将消息发给Kafka集群后,broker要给生产者响应信息,响应原理就是ack机制

    ack机制有三个参数,分别是 0,1,-1

    0:生产给到集群,生产者不等待不接收broker返回的响应信息

    1: 生产给到集群,集群中的分区对应leader主副本所在的broker给生产者返回响应信息

    -1: 生产给到集群,集群中的分区对应的所有副本给生产者返回响应信息

    效率级别 0>1>-1

    安全级别 -1>1>0

            根据安全和效率的要求选择ack参数配置

Broker端如何保证数据不丢失:

    broker通过多副本机制保证数据不丢失,同时需要生产者将ack设置为-1,安全级别最高

消费端如何保证数据不丢失:

    消费者消费消息的步骤:

    1- 消费者首先连接到kafka集群中,进行消息消费

    2- Kafka集群接收到消费者的请求后,会根据消费组id,查找上次消费消息对应的offset偏移量

    3- 如果没有查找到offset,消费者默认从topic最新的地方开始消费

    4- 如果有查找到offset,会从上次消费到的offset地方继续进行消费

            4.1- 首先先确定要读取的这个偏移量在哪个segment文件当中

            4.2- 查询这个segment文件对应的index文件,根据offset确定这个消息在log文件的什么位置,也就是确定消息的物理偏移量

            4.3- 读取log文件,查询对应范围内的数据即可

            4.4- 获取最终的消息数据

    5- 消费者在消费的过程中,底层有个线程会定时的将消费的offset提交给到kafka集群,kafka集群会更新对应的offset的值;

28. Kafka中消费者如何对数据仅且只消费一次?

1- 将消费者的 enable.auto.commit 属性设置为 false,并手动管理消费者的偏移量。这样可以确保消费者在处理完所有消息后才更新偏移量,避免重复消费数据。也就是将消息的消费、消息业务处理代码、offset提交代码放在同一个事务当中。

2- 使用幂等生产者或事务性生产者来确保消息只被发送一次。这样可以避免重复发送消息,从而避免消费者重复消费数据。

3- 在消息中加入唯一的ID

29. 结构化流中Sink输出模式有哪几类,各自特点是什么?

输出模式有三种:append , complete, update

    append模式:

                    只支持追加,不支持聚合和排序,每次只打印追加的内容

                    适用于对数据进行累加计算的场景

    complete模式:

                    每一次都是全量处理,因为数据量大,所以必须聚合,也可以支持排序

                    适用于需要获取完整数据集的场景,不适用无限数据流的场景

    update模式:

                    就是支持聚合的append,有聚合操作,只会输出有变化和新增的内容,不支持排序

                    适用场景需要跟踪数据的变化,实时监控指标的更新等

30. 结构化流中Sink输出终端常见的有哪几类,各自特点是什么?

1- console sink: 将结果数据输出到控制台。主要是用在测试中,并且支持3种输出模式

2- File sink: 输出到文件。将结果数据输出到某个目录下,形成文件数据。只支持append模式

3- foreach sink 和 foreachBatch sink: 将数据进行遍历处理。遍历后输出到哪里,取决于自定义函数。并且支持3种输出模式

4- memory sink: 将结果数据输出到内存中。主要目的是进行再次的迭代处理。数据大小不能过大。支持append模式和complete模式

5- Kafka sink: 将结果数据输出到Kafka中。类似于Kafka中的生产者角色。并且支持3种输出模式

31. 结构化流如何处理延迟到来的数据?

Spark结构化流可以通过设置水印(watermark)来处理延迟到来的数据。

水印是一种基于时间的度量,表示数据流中已经处理的最新时间。可以将水印理解为一个延迟阈值,表示在当前时间点之前的所有数据都已经到达,而在此之后的数据可能还未到达。Spark结构化流会根据水印来判断哪些数据已经过期,从而进行数据清理和聚合操作。

具体来说,可以通过以下步骤来设置水印:

  1. 定义事件时间字段:在创建数据流时,需要指定事件时间字段,即数据中表示事件时间的列。
  2. 设置水印生成规则:使用withWatermark()方法来设置水印生成规则,该方法需要指定一个时间间隔作为水印的生成周期。
  3. 处理延迟数据:在数据处理过程中,可以使用window()方法来对数据进行窗口操作,同时使用trigger()方法来设置触发器,以便在水印到达窗口结束时间后触发数据处理。

32. 处理小文件的操作

常规处理小文件的办法:
1- 大数据框架提供的现有的工具或者命令
1.1- hadoop fs -getmerge /input/small_files/*.txt /output/merged_file.txt
1.2- hadoop archive -archiveName myhar.har -p /small_files /big_files

2- 可以通过编写自定义的代码,将小文件读取进来,在代码中输出的时候,输出形成大的文件

wholeTextFiles: 读取小文件。
1-支持本地文件系统和HDFS文件系统。参数minPartitions指定最小的分区数。
2-通过该方式读取文件,会尽可能使用少的分区数,可能会将多个小文件的数据放到同一个分区中进行处理。
3-一个文件要完整的存放在一个元组中,也就是不能将一个文件分成多个进行读取。文件是不可分割的。
4-RDD分区数量既受到minPartitions参数的影响,同时受到小文件个数的影响


本文转载自: https://blog.csdn.net/m0_49956154/article/details/135574814
版权归原作者 白白的wj 所有, 如有侵权,请联系我们删除。

“2024.1.15 Spark 阶段原理,八股,面试题”的评论:

还没有评论