0


技术周总结 2024.07.01~07.07(Spark & Scala)

文章目录

一、07.01 周一

1.1)问题01:scala怎么实现 一个变量作为函数的入参以及返回值

要保持返回的结果变量名为

myVariable

,可以使用

case class

来实现。示例如下:

caseclass MyVariable(myVariable:Int, result:Int)// 函数接受一个变量作为入参,并返回一个包含原始变量名和处理后的值的 case class 实例def processVariable(input:Int): MyVariable ={val processedValue = input *2
    MyVariable(input, processedValue)}val myVariable =10val result = processVariable(myVariable)
println(result.myVariable)// 输出:10
println(result.result)// 输出:20

1.2)问题02:park中DataFrame使用 createView的用法

要将DataFrame注册为临时视图,可以使用

createOrReplaceTempView

方法或

createOrReplaceGlobalTempView

方法。示例如下:

// 创建临时视图
dataFrame.createOrReplaceTempView("temp_view_name")

// 创建全局临时视图
dataFrame.createOrReplaceGlobalTempView("global_temp_view_name")

这样就可以通过SQL语句或Spark SQL操作这些视图。

1.3)问题03:scala中三元运算符

在Scala中,没有类似于其他语言中的三元运算符(例如

condition ? value1 : value2

)。但是,可以使用

if-else

表达式来实现类似的功能。以下是一个示例:

val condition = true
val result = if (condition) "value1" else "value2"
println(result)

在这个示例中,如果

condition

true

,则

result

将被赋值为

"value1"

,否则为

"value2"

1.4) 问题04:伴生对象和伴生类

在 Scala 中,伴生对象和伴生类可以互相访问对方的私有成员。伴生对象和伴生类是同名且在同一文件中的类和对象。

实例

class CompanionExample {privateval privateField:String="I am private in class"}object CompanionExample {def accessPrivateField(example: CompanionExample):String={
    example.privateField  // 伴生对象可以访问类的 private 成员}}object Main {def main(args: Array[String]):Unit={val example =new CompanionExample
    println(CompanionExample.accessPrivateField(example))}}

1.5)问题05:spark-shell的使用

https://www.hadoopdoc.com/spark/spark-shell

spark-shell

查看页面: http://localhost:4040/jobs/

1.6) 问题06:mybatis中 like语句怎么写

在 MyBatis 中,可以使用

${}

占位符来直接拼接字符串,而不使用

CONCAT

函数。下面是一个示例:

<selectid="selectUsersByName"parameterType="string"resultType="User">
    SELECT * FROM users
    WHERE name LIKE '%' || #{searchString} || '%'
</select>

在这个示例中,我们使用

||

运算符来直接拼接字符串,实现在

name

字段中进行模糊匹配,而无需使用

CONCAT

函数。

二、07.02 周二

2.1)问题01:常见的 spark DataFrame的算子有哪些?分类是怎样的

Spark DataFrame 的常见算子可以分为以下几类:

  1. 转换算子(Transformation):- select(): 选择指定的列。- filter(): 根据条件筛选行。- groupBy(): 按列分组。- agg(): 对分组后的数据进行聚合操作。- join(): 连接两个 DataFrame。- sort(): 对数据进行排序。- withColumn(): 添加新列或替换现有列。- drop(): 删除列。- distinct(): 获取唯一值。
  2. 行动算子(Action):- show(): 显示 DataFrame 的内容。- collect(): 将 DataFrame 中的数据收集到本地。- count(): 计算 DataFrame 中的行数。- take(): 获取前几行数据。- foreach(): 对每行数据执行指定操作。- write(): 将 DataFrame 写入外部存储。
  3. I/O 算子:- read(): 从外部数据源读取数据。- write(): 将 DataFrame 写入外部存储。
  4. 窗口函数(Window Functions):- over(): 定义窗口规范,用于在分组数据上执行聚合和排序操作。

这些算子可以帮助在 Spark DataFrame 上进行数据转换、操作和分析,实现复杂的数据处理任务。

三、07.03 周三

3.1)问题01:scala中想把入参修改后,再返回,都有哪些实现方式?

在 Scala 中,函数参数默认是不可变的。这意味着你不能直接在函数内部修改参数的值并返回它们。不过,你可以通过多种方式实现修改参数并返回的效果。以下是几种常见的方法:

1. 使用返回值

最直接的方法是通过返回一个新的值或对象。你可以创建一个新的对象或修改副本,然后返回它。

def modifyAndReturn(x:Int):Int={val modifiedX = x +1
  modifiedX
}val result = modifyAndReturn(5)// result = 6

对于更复杂的数据结构,比如集合或自定义对象,可以返回一个新的修改后的实例:

caseclass Person(name:String, age:Int)def modifyPerson(person: Person): Person ={
  person.copy(age = person.age +1)}val person = Person("Alice",25)val modifiedPerson = modifyPerson(person)// modifiedPerson = Person("Alice", 26)

2. 使用可变对象(Mutable Objects)

虽然 Scala 倾向于使用不可变对象,但你可以使用可变对象来实现参数修改。比如使用

var

或可变集合(如

ArrayBuffer

)。

def modifyArray(arr: Array[Int]): Array[Int]={
  arr(0)= arr(0)+1
  arr
}val array = Array(1,2,3)val modifiedArray = modifyArray(array)// modifiedArray = Array(2, 2, 3)

3. 使用元组(Tuples)

你可以返回一个元组,包含多个修改后的参数值。

def modifyParams(x:Int, y:Int):(Int,Int)={(x +1, y +1)}val(newX, newY)= modifyParams(5,10)// newX = 6, newY = 11

4. 使用隐式类(Implicit Classes)

你可以使用隐式类来增强现有类型的功能,使其支持链式调用。

object IntOps {implicitclass RichInt(val x:Int)extendsAnyVal{def increment:Int= x +1}}import IntOps._

val newX =5.increment  // newX = 6

5. 使用辅助类或对象

如果你需要修改多个参数并返回它们,可以使用辅助类或对象来封装这些参数。

caseclass Params(x:Int, y:Int)def modifyParams(params: Params): Params ={
  params.copy(x = params.x +1, y = params.y +1)}val params = Params(5,10)val newParams = modifyParams(params)// newParams = Params(6, 11)

6. 使用变长参数(Varargs)

如果你需要修改和返回变长参数,可以将它们转换为集合,然后返回修改后的集合。

def modifyParams(params:Int*): Seq[Int]={
  params.map(_ +1)}val newParams = modifyParams(1,2,3)// newParams = Seq(2, 3, 4)

总结

Scala 提供了多种方式来实现修改参数并返回的功能。最常见的方法是通过返回新的值或对象,这符合函数式编程的理念,并且可以避免副作用。根据具体需求,你可以选择最合适的方法来实现你的目标。

3.2)spark中的 DataFrame的操作有哪些优化措施

在 Apache Spark 中,优化 DataFrame 操作可以显著提高数据处理的效率和性能。以下是一些常见的优化措施:

1. 使用 Spark SQL Catalyst Optimizer

Catalyst 是 Spark SQL 的查询优化器,它可以自动进行多种优化,例如谓词下推、列裁剪等。尽可能使用 SQL 风格的查询或 DataFrame API,而不是 RDD 操作,因为 Catalyst 只能优化前者。

2. 缓存和持久化

对频繁使用的中间结果进行缓存或持久化,以避免重复计算。

val df = spark.read.parquet("hdfs://path/to/data")
df.cache()// 或者 df.persist(StorageLevel.MEMORY_AND_DISK)

3. 使用列式存储格式

使用列式存储格式(如 Parquet 或 ORC)来存储数据,这样可以提高磁盘 I/O 性能和数据压缩率。

val df = spark.read.parquet("hdfs://path/to/parquet")

4. 谓词下推(Predicate Pushdown)

在读取数据时,尽量利用数据源支持的谓词下推功能,以减少读取的数据量。

val df = spark.read.parquet("hdfs://path/to/data").filter("age > 30")

5. 列裁剪(Column Pruning)

只选择需要的列,避免读取和处理不必要的数据。

val df = spark.read.parquet("hdfs://path/to/data").select("name","age")

6. 使用广播变量(Broadcast Variables)

对于较小的 DataFrame,可以使用广播变量,将其复制到每个节点以减少数据传输开销。

val smallDf = spark.read.parquet("hdfs://path/to/small_data")val broadcastedDf = broadcast(smallDf)val largeDf = spark.read.parquet("hdfs://path/to/large_data")val joinedDf = largeDf.join(broadcastedDf,"key")

7. 使用适当的分区(Partitioning)

通过合理的分区策略,可以平衡任务的负载,减少数据倾斜。

val df = spark.read.parquet("hdfs://path/to/data")
df.repartition(100, $"keyColumn")// 按照 "keyColumn" 列重新分区

8. 避免过度分区和少量分区

  • 过度分区会导致过多的小任务,增加调度开销。
  • 少量分区会导致任务负载不均,某些任务可能会变得非常重,从而拖慢整体进度。

9. 合理使用

coalesce

repartition
  • repartition 用于增加或减少分区,通常用于增加分区。
  • coalesce 用于减少分区,通常用于减少分区,并且比 repartition 更高效。
val df = spark.read.parquet("hdfs://path/to/data")val repartitionedDf = df.repartition(100)// 增加分区val coalescedDf = df.coalesce(50)// 减少分区

10. 启用动态分区裁剪(Dynamic Partition Pruning)

在 Spark SQL 中启用动态分区裁剪,以在执行时减少不必要的分区扫描。

spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled","true")

11. 使用高效的 UDF(User-Defined Functions)

尽量避免使用 UDF,因为它们可能会破坏 Catalyst 的优化。使用内置函数或 SQL 表达式来实现相同的功能。

12. 调整并行度配置参数

调整以下 Spark 配置参数以优化并行度:

  • spark.sql.shuffle.partitions:默认值为 200,可以根据数据量调整。
  • spark.default.parallelism:调整为集群核心数的 2-3 倍。
spark.conf.set("spark.sql.shuffle.partitions","300")

13. 合理使用 window 函数

对于需要进行分组排序的操作,合理使用 window 函数可以提高性能。

importorg.apache.spark.sql.expressions.Window

val windowSpec = Window.partitionBy("partitionColumn").orderBy("orderColumn")val df = df.withColumn("rank", rank().over(windowSpec))

14. 避免重复计算

将中间结果缓存或持久化,避免重复计算。例如,多个步骤使用同一个 DataFrame 的结果。


## 3.3)spark中是两个表各自生成DataFrame后再join的效率高,还是两个表先join生成一个整体的sql,整体生成一个DateFrame的效率高?

在 Spark 中,通常情况下,将两个表各自生成 DataFrame 后再进行 join 操作的效率会更高。这是因为:

1. **分布式计算优化**:Spark 的 Catalyst 优化器可以对 DataFrame 操作进行优化,包括 join 操作。将两个表各自生成 DataFrame 后再 join,可以利用 Catalyst 优化器的各种优化策略,例如广播 join、排序 merge join 等。

2. **延迟计算(Lazy Evaluation)**:Spark 的 DataFrame API 是延迟计算的,这意味着只有在触发行动操作(如 `count`、`collect` 等)时,Spark 才会实际执行计算。在此之前,Spark 会尽可能优化整个计算计划。如果先生成 DataFrame,再进行 join 操作,Spark 有更多机会优化执行计划。

3. **分区和分布控制**:在生成 DataFrame 之后,可以对其进行分区和重新分区,以优化 join 操作的性能。例如,可以使用 `repartition` 或 `coalesce` 来调整分区数,从而减少数据倾斜和网络传输。

4. **利用缓存**:将中间结果缓存起来,可以避免重复计算,提高性能。如果先生成 DataFrame,可以在 join 之前缓存数据。
   ```scala
   val df1 = spark.read.json("table1.json").cache()
   val df2 = spark.read.json("table2.json").cache()
   val joinedDF = df1.join(df2, "key")

不过,具体的性能还取决于很多因素,比如数据规模、表的结构、集群配置等。建议在实际应用中进行性能测试和调优,以选择最优的方案。

以下是两个方法的示例:

  1. 各自生成 DataFrame 后再 joinval df1 = spark.read.json("table1.json")val df2 = spark.read.json("table2.json")val joinedDF = df1.join(df2,"key")
  2. 整体生成一个 SQL,再生成 DataFrameval sqlQuery ="SELECT * FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key"val joinedDF = spark.sql(sqlQuery)

一般情况下,第一个方法更容易被 Spark 的优化器优化,但具体选择哪种方法,还是建议进行实际的性能测试。

3.3)sparkUI怎么看

Spark UI 是一个非常有用的工具,用于监视和调试 Spark 应用程序的执行。以下是如何访问和使用 Spark UI 的步骤:

1. 启动 Spark 应用程序

当你启动一个 Spark 应用程序时,无论是通过 Spark Shell、Spark Submit 还是其他方式,Spark UI 都会自动启动并在一个特定的端口上提供服务。默认情况下,Spark UI 在 4040 端口上运行。

例如,通过 Spark Shell 启动一个应用程序:

spark-shell

2. 访问 Spark UI

在浏览器中打开以下 URL:

http://localhost:4040

如果 4040 端口已经被占用,Spark 会使用下一个可用的端口(例如 4041、4042 等)。你可以在启动日志中找到实际使用的端口。

3. Spark UI 的主要组件

一旦你打开 Spark UI,你会看到几个标签页,每个标签页提供了不同的信息:

  • Jobs:显示所有的 Spark 任务(Job),包括每个任务的状态、开始时间、完成时间、执行时间等。
  • Stages:显示所有的阶段(Stage),包括每个阶段的详细信息和执行情况。
  • Storage:显示缓存和持久化的 RDD 和 DataFrame 的信息。
  • Environment:显示 Spark 应用程序的环境信息,包括 Spark 配置、系统属性、类路径等。
  • Executors:显示所有的执行器(Executor)的信息,包括每个执行器的内存使用情况、磁盘使用情况、任务执行情况等。
  • SQL:显示所有执行的 SQL 查询的详细信息。

4. 查看详细信息

你可以点击每个任务(Job)或阶段(Stage)来查看更详细的信息。例如,在 “Stages” 标签页中,点击某个阶段的 ID,可以查看该阶段的详细执行计划、任务(Task)列表、每个任务的执行时间、输入输出数据量等。

5. 监控运行中的应用

如果你的应用程序正在运行,你可以实时监控其执行情况。Spark UI 会动态更新任务和阶段的状态,帮助你了解应用程序的进展和性能。

6. 查看历史应用

如果你的应用程序已经完成,你可以通过 Spark 的历史服务器查看过去的应用程序执行情况。启动 Spark 历史服务器的方法如下:

./sbin/start-history-server.sh

然后在浏览器中访问:

http://localhost:18080

在历史服务器中,你可以查看所有已经完成的应用程序的详细执行情况。

通过 Spark UI,你可以深入了解 Spark 应用程序的执行情况,发现性能瓶颈,优化代码,提高应用程序的效率。

参考链接:
https://www.cnblogs.com/xing901022/p/6445254.html
SparkUI怎么看

3.4)SparkSession.sql()会实际触发查询操作

val sql ="""
CREATE
TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
  "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
  "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
  "user"="$YOUR_DORIS_USERNAME",
  "password"="$YOUR_DORIS_PASSWORD"
);
"""
context.sparkSession.sql(sql)

这段代码使用Doris插件在Spark中创建一个临时视图.
这个地方sql查询的数据量的大小会影响 DataFrame创建的速度

在这段代码中,实际的数据查询操作是在 context.sparkSession.sql(sql) 这一行中发生的,而不是在创建临时视图时。因此,SQL查询的数据量大小会影响DataFrame创建的速度。

四、07.04 周四

4.1) 问题01:怎么设置log4j日志的格式中包含输出的代码所在的行号

要在日志格式中包含输出的代码所在的行号,可以使用

%L

占位符。修改

log4j.appender.stdout.layout.ConversionPattern

log4j.appender.logfile.layout.ConversionPattern

的配置如下所示:

log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m (%L)%n
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m (%L)%n

这样设置后,日志输出的格式中会包含代码所在的行号信息。

五、07.06 周六

5.1) 问题01: 嵌入式处理器 PowerPC750 ,Cortex-M3,MIPS32 24K,帮忙介绍下吧

好的,我来简单介绍一下这几种常见的嵌入式处理器:

  1. PowerPC750:- 属于 PowerPC 架构,由 IBM、Motorola 和 Apple 共同开发。- 32位处理器,主频范围从233MHz到750MHz不等。- 采用超标量流水线架构,具有较强的计算性能。- 广泛应用于工业控制、航天航空、游戏机等领域。
  2. Cortex-M3:- 属于 ARM Cortex-M 系列处理器,专为嵌入式应用设计。- 32位 RISC 结构,主频可达120MHz。- 低功耗、低成本,适用于需要高性能和实时性的嵌入式系统。- 广泛应用于工业控制、医疗设备、家用电器等领域。
  3. MIPS32 24K:- 属于 MIPS 架构,由 MIPS Technologies 公司开发。- 32位处理器,主频范围从200MHz到800MHz不等。- 采用5级流水线,具有较高的处理性能。- 低功耗、低成本,适用于嵌入式系统和消费电子产品。- 广泛应用于路由器、机顶盒、游戏机等领域。

总的来说,这三种处理器都是主流的嵌入式处理器,拥有各自的特点和应用领域。PowerPC750 和 MIPS32 24K 主要面向高性能嵌入式应用,而 Cortex-M3 则更适合低功耗和实时性要求较高的嵌入式系统。具体选择时需要根据具体的应用场景和性能需求进行权衡。

5.2)问题02: Python列表中的两个冒号的含义是什么

在 Python 中,列表中的两个冒号 : 通常用于表示列表的切片操作。具体来说,stack[::-1] 表示从列表的末尾开始,以步长为-1(即反向)的方式获取整个列表的切片。
切片操作可以用来获取列表的子列表、修改列表的部分元素、实现序列的反转等。在这个例子中,通过 stack[::-1] 可以得到 stack 列表的逆序版本。
例如,对于一个列表 list = [1, 2, 3, 4, 5],执行 list[::-1] 将得到 [5, 4, 3, 2, 1]。
这种切片操作的语法是 list[start🔚step],其中:
 start 和 end 是可选的索引,表示切片的起始和结束位置(包含起始位置,不包含结束位置)。如果省略 start,则默认从开头开始;如果省略 end,则默认到结尾结束。
 step 是可选的步长,为正数时表示从左到右取值,为负数时表示从右到左取值,为 0 时表示取单个元素。
通过合理使用切片操作,可以方便地对列表进行各种操作和处理。

5.3)问题03: 那磁盘中的柱面,磁头,扇区都是什么

磁盘(硬盘)是一种数据存储设备,其工作原理和结构比较复杂,涉及多个关键组件和概念。以下是对柱面(Cylinder)、磁头(Head)、扇区(Sector)等术语的解释:

1. 柱面(Cylinder)

柱面是硬盘中的一个逻辑结构。硬盘通常由多个盘片(Platters)组成,每个盘片的两面都有磁道(Track)。所有盘片在相同位置的磁道集合被称为一个柱面。柱面是一个三维概念,可以认为它是沿着硬盘轴心垂直的一个柱形结构。

  • 磁道(Track):每个盘片被划分为若干同心圆,这些同心圆就是磁道。
  • 柱面(Cylinder):所有盘片上具有相同半径的磁道集合。

通过柱面的概念,可以减少磁头在不同盘片间的移动,因为在同一个柱面内,磁头只需上下移动,而不需要沿着盘片的表面移动。

2. 磁头(Head)

磁头是读写硬盘数据的部件。每个盘片的两面都有一个磁头,用于读取和写入数据。磁头悬浮在盘片表面上方,通过电磁感应来读取和写入数据。

  • 数量:磁头的数量等于盘片数乘以2(因为每个盘片有两个面)。
  • 移动:所有磁头通过一个共同的机械臂(通常称为磁头臂或滑动臂)进行移动。当机械臂移动时,所有磁头会同步移动到相应的磁道上。

3. 扇区(Sector)

扇区是硬盘上最小的物理存储单位。每个磁道被划分为若干个扇区,每个扇区通常存储512字节或4096字节的数据。

  • 扇区大小:早期硬盘的扇区大小通常为512字节,但现代硬盘通常使用4096字节的扇区大小(也叫高级格式化)。
  • 寻址:硬盘数据通过称为LBA(逻辑块地址,Logical Block Addressing)的方法进行寻址,每个扇区都有一个唯一的LBA编号。

磁盘的物理结构

硬盘的物理结构可以通过以下几个层次来理解:

  1. 盘片(Platters):硬盘由多个盘片组成,每个盘片上有磁性材料涂层。
  2. 磁道(Track):每个盘片分为多个同心圆环,称为磁道。
  3. 柱面(Cylinder):相同半径的磁道在所有盘片上的集合。
  4. 扇区(Sector):每个磁道被进一步划分为多个扇区,每个扇区存储一定数量的数据。

数据访问过程

当需要读取或写入数据时,磁盘控制器会执行以下步骤:

  1. 确定柱面:根据数据的逻辑地址,确定要访问的柱面。
  2. 移动磁头:将磁头臂移动到相应的柱面位置。
  3. 选择磁头:选择对应盘片上的读写磁头。
  4. 确定扇区:在选定的磁道上找到指定的扇区。
  5. 读取/写入数据:执行读取或写入操作。

示例

假设我们有一个硬盘,包含两个盘片(即四个盘面),每个盘片有2000个磁道,每个磁道有100个扇区,每个扇区存储512字节。

  • 盘片数:2
  • 磁头数:4(每个盘片2个面)
  • 磁道数:2000
  • 每磁道扇区数:100
  • 每扇区字节数:512

如果我们要访问存储在第一个盘片、第1000个磁道、第50个扇区的数据,磁盘控制器会:

  1. 将磁头移动到第1000个磁道(柱面)。
  2. 选择第一个盘片的读写磁头。
  3. 找到第1000个磁道上的第50个扇区。
  4. 读取或写入512字节的数据。
标签: spark scala 大数据

本文转载自: https://blog.csdn.net/xinkuaile/article/details/140107563
版权归原作者 打破砂锅问到底007 所有, 如有侵权,请联系我们删除。

“技术周总结 2024.07.01~07.07(Spark & Scala)”的评论:

还没有评论