离线数据抽取
写在前面:
此笔记是本人在备战2022年大数据赛项整理出来的,不涉及国赛涉密内容,如点赞收藏理想,我将会把所有模块的笔记开源分享出来,如有想询问国赛经验的可以关注私聊我,我会一一回复。
1. Scala
Scala简介
Scala 是一门满足现代软件工程师需求的语言;它是一门静态类型语言,支持混合范式;它也是一门运行在 JVM 之上的语言,语法简洁、优雅、灵活。Scala 拥有一套复杂的类型系统,Scala方言既能用于编写简短的解释脚本,也能用于构建大型复杂系统。
Scala基础
1. 数据类型
2. 变量和常量的声明
- 定义变量或者常量的时候,也可以写上返回的类型,一般省略,如:val a:Int = 10
- 常量不可再赋值
/**
* 定义变量和常量
* 变量 :用 var 定义 ,可修改
* 常量 :用 val 定义,不可修改
*/var name ="zhangsan"
println(name)
name ="lisi"
println(name)val gender ="m"// gender = "m"//错误,不能给常量再赋值
注意:scala有个原则就是极简原则,不用写的东西一概不写。
定义变量有两种形式
一种是像上面那样用val修饰另一种是var进行修饰
val 定义的变量不可变相当与java中的final
用表达式进行赋值
Val x=1
Val y=if(1>0)1else-1
混和表达式
Val a =if(x>0)1else “jay”
需要注意的是any是所有的父类,相当于java里的objectelse缺失的表达式
val p=if(x>5)1
3. 类和对象
创建类
class Person{val name ="zhangsan"val age =18def sayName()={"my name is "+ name
}}
创建对象
object Lesson_Class {def main(args: Array[String]):Unit={val person =new Person()
println(person.age);
println(person.sayName())}}
apply方法
/**
* object 单例对象中不可以传参,
* 如果在创建Object时传入参数,那么会自动根据参数的个数去Object中寻找相应的apply方法
*/object Lesson_ObjectWithParam {def apply(s:String)={
println("name is "+s)}def apply(s:String,age:Int)={
println("name is "+s+",age = "+age)}def main(args: Array[String]):Unit={
Lesson_ObjectWithParam("zhangsang")
Lesson_ObjectWithParam("lisi",18)}}
伴生类和伴生对象
class Person(xname :String, xage :Int){var name = Person.name
val age = xage
var gender ="m"defthis(name:String,age:Int,g:String){this(name,age)
gender = g
}def sayName()={"my name is "+ name
}}object Person {val name ="zhangsanfeng"def main(args: Array[String]):Unit={val person =new Person("wagnwu",10,"f")
println(person.age);
println(person.sayName())
println(person.gender)}}
注意点:
- 建议类名首字母大写 ,方法首字母小写,类和方法命名建议符合驼峰命名法。
- scala 中的object是单例对象,相当于java中的工具类,可以看成是定义静态的方法的类。object不可以传参数。另:Trait不可以传参数
- scala中的class类默认可以传参数,默认的传参数就是默认的构造函数。
重写构造函数的时候,必须要调用默认的构造函数。
- class 类属性自带getter ,setter方法。
- 使用object时,不用new,使用class时要new ,并且new的时候,class中除了方法不执行(不包括构造),其他都执行。
- 如果在同一个文件中,object对象和class类的名称相同,则这个对象就是这个类的伴生对象,这个类就是这个对象的伴生类。可以互相访问私有变量。
4. This
5. if else
/** * if else / val age =18 if (age < 18 ){ println(“no allow”) }elseif* (18<=age&&age<=20){ println(“allow with other”) }else{ println(“allow self”) }
6. for ,while,do…while
to和until 的用法(不带步长,带步长区别)
/**
* to和until
* 例:
* 1 to 10 返回1到10的Range数组,包含10
* 1 until 10 返回1到10 Range数组 ,不包含10
*/
println(1 to 10)//打印 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
println(1.to(10))//与上面等价,打印 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
println(1 to (10,2))//步长为2,从1开始打印 ,1,3,5,7,9
println(1.to(10,2))
println(1 until 10)//不包含最后一个数,打印 1,2,3,4,5,6,7,8,9
println(1.until(10))//与上面等价
println(1 until (10,3))//步长为2,从1开始打印,打印1,4,7
在scala中,Range代表的是一段整数的范围,官方有关range的api:
http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.Range
这些底层其实都是Range,Range(1,10,2):1是初始值,10是条件,2是步长,步长也可以为负值,递减。
until和Range是左闭右开,1是包含的,10是不包含。而to是左右都包含。
for循环
/**
* for 循环
*
*/for( i <-1 to 10){
println(i)}//for循环数组val arr=Array(“a”,”b”,”c”)for(i<-arr)
println(i)
创建多层for循环(高级for循环)
//可以分号隔开,写入多个list赋值的变量,构成多层for循环//scala中 不能写count++ count-- 只能写count+var count =0;for(i <-1 to 10; j <-1 until 10){
println("i="+ i +", j="+j)
count +=1}
println(count);//例子: 打印小九九for(i <-1 until 10;j <-1 until 10){if(i>=j){
print(i +" * "+ j +" = "+ i*j+" ")}if(i==j ){
println()}}
- for循环中可以加条件判断,可以使用分号隔开,也可以不使用分号(使用空格)
//可以在for循环中加入条件判断for(i<-1 to 10;if(i%2)==0;if(i ==4)){ println(i)}
- scala中不能使用count++,count—只能使用count = count+1 ,count += 1
- for循环用yield 关键字返回一个集合(把满足条件的i组成一个集合)val result = for(i <- 1 to 100 if(i>50) if(i%2==0)) yield iprintln(result)
- while循环,while(){},do {}while()
//将for中的符合条件的元素通过yield关键字返回成一个集合val list =for(i <-1 to 10;if(i >5))yield i
for( w <- list ){
println(w)}/**
* while 循环
*/var index =0while(index <100){
println("第"+index+"次while 循环")
index +=1}
index =0do{
index +=1
println("第"+index+"次do while 循环")}while(index <100)
加深练习
需求说明:定义一个数组val a1=Array(1,2,3,4,5,6,7,8,9)把其中的偶数取出。
def main(args: Array[String]):Unit={var a1=Array.range(1,10)for(i<-a1 if(i%2==0)){
println(i)}}}
7. 懒加载
Val lazyVal={println(“I am too lazy”);1}
lazy val lazyVal={println(“I am too lazy”);1}
8. Scala方法与函数
Scala方法的定义
有参方法
无参方法
def fun (a:Int, b:Int):Unit={
println(a+b)}
fun(1,1)def fun1 (a :Int, b :Int)= a+b
println(fun1(1,2))
注意点:
- 方法定义语法 用def来定义
- 可以定义传入的参数,要指定传入参数的类型
- 方法可以写返回值的类型也可以不写,会自动推断,有时候不能省略,必须写,比如在递归方法中或者方法的返回值是函数类型的时候。
- scala中方法有返回值时,可以写return,也可以不写return,会把方法中最后一行当做结果返回。当写return时,必须要写方法的返回值。
- 如果返回值可以一行搞定,可以将{}省略不写
- 传递给方法的参数可以在方法中使用,并且scala规定方法的传过来的参数为val的,不是var的。
- 如果去掉方法体前面的等号,那么这个方法返回类型必定是Unit的。这种说法无论方法体里面什么逻辑都成立,scala可以把任意类型转换为Unit.假设,里面的逻辑最后返回了一个string,那么这个返回值会被转换成Unit,并且值会被丢弃。
方法与函数
定义一个方法:
def method(a:Int,b:Int) =a*b val a =2
method(3,5)
定义一个函数:
Val f1=(x:Int,y:Int)=>x+y
f1 (1,2)
匿名函数
(x:Int,y:Int)=>x+y
在函数式编程语言中,函数是“头等公民”,它可以像任何其他数据类型一样被传递和操作,函数可以在方法中传递。
递归方法
/**
* 递归方法
* 5的阶乘
*/def fun2(num :Int):Int={if(num ==1)
num
else
num * fun2(num-1)}
print(fun2(5))
参数有默认值的方法
- 默认值的函数中,如果传入的参数个数与函数定义相同,则传入的数值会覆盖默认值。
- 如果不想覆盖默认值,传入的参数个数小于定义的函数的参数,则需要指定参数名称。
/**
* 包含默认参数值的函数
* 注意:
* 1.默认值的函数中,如果传入的参数个数与函数定义相同,则传入的数值会覆盖默认值
* 2.如果不想覆盖默认值,传入的参数个数小于定义的函数的参数,则需要指定参数名称
*/def fun3(a :Int=10,b:Int)={
println(a+b)}
fun3(b=2)
可变参数的方法
- 多个参数用逗号分开
/**
* 可变参数个数的函数
* 注意:多个参数逗号分开
*/def fun4(elements :Int*)={var sum =0;for(elem <- elements){
sum += elem
}
sum
}
println(fun4(1,2,3,4))
匿名函数
- 有参匿名函数
- 无参匿名函数
- 有返回值的匿名函数
- 可以将匿名函数返回给val定义的值
/**
* 匿名函数
* 1.有参数匿名函数
* 2.无参数匿名函数
* 3.有返回值的匿名函数
* 注意:
* 可以将匿名函数返回给定义的一个变量
*///有参数匿名函数val value1:(Int)=>Unit=(a :Int)=>{
println(a)}
value1(1)//无参数匿名函数val value2 =()=>{
println("我爱学习")}
value2()//有返回值的匿名函数val value3 =(a:Int,b:Int)=>{
a+b
}
println(value3(4,4))
嵌套方法
/**
* 嵌套方法
* 例如:嵌套方法求5的阶乘
*/def fun5(num:Int)={def fun6(a:Int,b:Int):Int={if(a ==1){
b
}else{
fun6(a-1,a*b)}}
fun6(num,1)}
println(fun5(5))
偏应用函数
偏应用函数是一种表达式,不需要提供函数需要的所有参数,只需要提供部分,或不提供所需参数。
/**
* 偏应用函数
*/def log(date :Date, s :String)={
println("date is "+ date +",log is "+ s)}val date =new Date()
log(date ,"log1")
log(date ,"log2")
log(date ,"log3")//想要调用log,以上变化的是第二个参数,可以用偏应用函数处理val logWithDate = log(date,_:String)
logWithDate("log11")
logWithDate("log22")
logWithDate("log33")
高阶函数
函数的参数是函数,或者函数的返回类型是函数,或者函数的参数和函数的返回类型是函数的函数。
- 函数的参数是函数
- 函数的返回是函数
- 函数的参数和函数的返回是函数
/**
* 高阶函数
* 函数的参数是函数 或者函数的返回是函数 或者函数的参数和返回都是函数
*///函数的参数是函数def hightFun(f :(Int,Int)=>Int, a:Int):Int={
f(a,100)}def f(v1 :Int,v2:Int):Int={
v1+v2
}
println(hightFun(f,1))//函数的返回是函数//1,2,3,4相加def hightFun2(a :Int,b:Int):(Int,Int)=>Int={def f2 (v1:Int,v2:Int):Int={
v1+v2+a+b
}
f2
}
println(hightFun2(1,2)(3,4))//函数的参数是函数,函数的返回是函数def hightFun3(f :(Int,Int)=>Int):(Int,Int)=>Int={
f
}
println(hightFun3(f)(100,200))
println(hightFun3((a,b)=>{a+b})(200,200))//以上这句话还可以写成这样//如果函数的参数在方法体中只使用了一次 那么可以写成_表示
println(hightFun3(_+_)(200,200))
柯里化函数
- 高阶函数的简化
- 定义
- 柯里化(Currying)指的是把原来接受多个参数的函数变换成接受一个参数的函数过程,并且返回接受余下的参数且返回结果为一个新函数的技术。
scala柯里化风格的使用可以简化主函数的复杂度,提高主函数的自闭性,提高功能上的可扩张性、灵活性。可以编写出更加抽象,功能化和高效的函数式代码。
//柯理化object KLH {def main(args: Array[String]):Unit={def klh(x:Int)(y:Int)=x*y
val res=klh(3)(_)
println(res(4))}}/**
* 柯里化函数
*/def fun7(a :Int,b:Int)(c:Int,d:Int)={
a+b+c+d
}
println(fun7(1,2)(3,4))
2. Spark
spark简介
Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
在YARN上运行Spark
配置
大部分为
Spark on YARN
模式提供的配置与其它部署模式提供的配置相同。下面这些是为
Spark on YARN
模式提供的配置。
Spark属性
Property NameDefaultMeaningspark.yarn.applicationMaster.waitTries10ApplicationMaster等待Spark master的次数以及SparkContext初始化尝试的次数spark.yarn.submit.file.replicationHDFS默认的复制次数(3)上传到HDFS的文件的HDFS复制水平。这些文件包括Spark jar、app jar以及任何分布式缓存文件/档案spark.yarn.preserve.staging.filesfalse设置为true,则在作业结束时保留阶段性文件(Spark jar、app jar以及任何分布式缓存文件)而不是删除它们spark.yarn.scheduler.heartbeat.interval-ms5000Spark application master给YARN ResourceManager发送心跳的时间间隔(ms)spark.yarn.max.executor.failuresnumExecutors * 2,最小为3失败应用程序之前最大的执行失败数spark.yarn.historyServer.address(none)Spark历史服务器(如host.com:18080)的地址。这个地址不应该包含一个模式(http://)。默认情况下没有设置值,这是因为该选项是一个可选选项。当Spark应用程序完成从ResourceManager UI到Spark历史服务器UI的连接时,这个地址从YARN ResourceManager得到spark.yarn.dist.archives(none)提取逗号分隔的档案列表到每个执行器的工作目录spark.yarn.dist.files(none)放置逗号分隔的文件列表到每个执行器的工作目录spark.yarn.executor.memoryOverheadexecutorMemory * 0.07,最小384分配给每个执行器的堆内存大小(以MB为单位)。它是VM开销、interned字符串或者其它本地开销占用的内存。这往往随着执行器大小而增长。(典型情况下是6%-10%)spark.yarn.driver.memoryOverheaddriverMemory * 0.07,最小384分配给每个driver的堆内存大小(以MB为单位)。它是VM开销、interned字符串或者其它本地开销占用的内存。这往往随着执行器大小而增长。(典型情况下是6%-10%)spark.yarn.queuedefault应用程序被提交到的YARN队列的名称spark.yarn.jar(none)Spark jar文件的位置,覆盖默认的位置。默认情况下,Spark on YARN将会用到本地安装的Spark jar。但是Spark jar也可以HDFS中的一个公共位置。这允许YARN缓存它到节点上,而不用在每次运行应用程序时都需要分配。指向HDFS中的jar包,可以这个参数为"hdfs:///some/path"spark.yarn.access.namenodes(none)你的Spark应用程序访问的HDFS namenode列表。例如,spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032,Spark应用程序必须访问namenode列表,Kerberos必须正确配置来访问它们。Spark获得namenode的安全令牌,这样Spark应用程序就能够访问这些远程的HDFS集群。spark.yarn.containerLauncherMaxThreads25为了启动执行者容器,应用程序master用到的最大线程数spark.yarn.appMasterEnv.EnvironmentVariableName添加通过EnvironmentVariableName指定的环境变量到Application Master处理YARN上的启动。用户可以指定多个该设置,从而设置多个环境变量。在yarn-cluster模式下,这控制Spark driver的环境。在yarn-client模式下,这仅仅控制执行器启动者的环境。
在YARN上启动Spark
确保
HADOOP_CONF_DIR
或
YARN_CONF_DIR
指向的目录包含Hadoop集群的(客户端)配置文件。这些配置用于写数据到dfs和连接到YARN ResourceManager。
有两种部署模式可以用来在YARN上启动Spark应用程序。在yarn-cluster模式下,Spark driver运行在application master进程中,这个进程被集群中的YARN所管理,客户端会在初始化应用程序之后关闭。在yarn-client模式下,driver运行在客户端进程中,application master仅仅用来向YARN请求资源。
和Spark单独模式以及Mesos模式不同,在这些模式中,master的地址由"master"参数指定,而在YARN模式下,ResourceManager的地址从Hadoop配置得到。因此master参数是简单的
yarn-client
和
yarn-cluster
在yarn-cluster模式下启动Spark应用程序。
./bin/spark-submit --class path.to.your.Class --master yarn-cluster [options] <app jar> [app options]
例子:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--num-executors 3 \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
lib/spark-examples*.jar \
10
以上启动了一个YARN客户端程序用来启动默认的 Application Master,然后SparkPi会作为Application Master的子线程运行。客户端会定期的轮询Application Master用于状态更新并将更新显示在控制台上。一旦你的应用程序运行完毕,客户端就会退出。
在yarn-client模式下启动Spark应用程序,运行下面的shell脚本
$ ./bin/spark-shell --master yarn-client
添加其它的jar
在yarn-cluster模式下,driver运行在不同的机器上,所以离开了保存在本地客户端的文件,
SparkContext.addJar
将不会工作。为了使
SparkContext.addJar
用到保存在客户端的文件,在启动命令中加上
--jars
选项。
$ ./bin/spark-submit --class my.main.Class \
--master yarn-cluster \
--jars my-other-jar.jar,my-other-other-jar.jar
my-main-jar.jar
app_arg1 app_arg2
注意事项
在Hadoop 2.2之前,YARN不支持容器核的资源请求。因此,当运行早期的版本时,通过命令行参数指定的核的数量无法传递给YARN。在调度决策中,核请求是否兑现取决于用哪个调度器以及如何配置调度器。
Spark executors使用的本地目录将会是YARN配置(yarn.nodemanager.local-dirs)的本地目录。如果用户指定了
spark.local.dir
,它将被忽略。
--files
和
--archives
选项支持指定带 # 号文件名。例如,你能够指定
--files localtest.txt#appSees.txt
,它上传你在本地命名为
localtest.txt
的文件到HDFS,但是将会链接为名称
appSees.txt
。当你的应用程序运行在YARN上时,你应该使用
appSees.txt
去引用该文件。
如果你在yarn-cluster模式下运行
SparkContext.addJar
,并且用到了本地文件,
--jars
选项允许
SparkContext.addJar
函数能够工作。如果你正在使用 HDFS, HTTP, HTTPS或FTP,你不需要用到该选项。
版权归原作者 woshinsy 所有, 如有侵权,请联系我们删除。