Spark from Beginner to Mastery

Environment Setup

Preparation

Create the installation directory
mkdir /opt/soft
cd /opt/soft
Download Scala
wget https://downloads.lightbend.com/scala/2.13.10/scala-2.13.10.tgz -P /opt/soft
Extract Scala
tar -zxvf scala-2.13.10.tgz
Rename the Scala directory
mv scala-2.13.10 scala-2
Download Spark
wget https://dlcdn.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3-scala2.13.tgz -P /opt/soft
Extract Spark
tar -zxvf spark-3.4.0-bin-hadoop3-scala2.13.tgz
Rename the Spark directory
mv spark-3.4.0-bin-hadoop3-scala2.13 spark3
Edit the environment variables
vim /etc/profile
export JAVA_HOME=/opt/soft/jdk8
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export ZOOKEEPER_HOME=/opt/soft/zookeeper

export HADOOP_HOME=/opt/soft/hadoop3

export HADOOP_INSTALL=${HADOOP_HOME}
export HADOOP_MAPRED_HOME=${HADOOP_HOME}
export HADOOP_COMMON_HOME=${HADOOP_HOME}
export HADOOP_HDFS_HOME=${HADOOP_HOME}
export YARN_HOME=${HADOOP_HOME}
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop

export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

export HIVE_HOME=/opt/soft/hive3
export HCAT_HOME=/opt/soft/hive3/hcatalog

export SQOOP_HOME=/opt/soft/sqoop-1

export FLUME_HOME=/opt/soft/flume

export HBASE_HOME=/opt/soft/hbase2

export PHOENIX_HOME=/opt/soft/phoenix

export SCALA_HOME=/opt/soft/scala-2

export SPARK_HOME=/opt/soft/spark3
export SPARKPYTHON=/opt/soft/spark3/python

export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$HCAT_HOME/bin:$SQOOP_HOME/bin:$FLUME_HOME/bin:$HBASE_HOME/bin:$PHOENIX_HOME/bin:$SCALA_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin:$SPARKPYTHON
source /etc/profile

Local Mode

spark-shell (Scala / Java)
Start
spark-shell

Web UI: http://spark01:4040

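Before quitting, a quick sanity check can be run directly in the shell (a minimal example; sc is the SparkContext that spark-shell creates for you):

sc.parallelize(1 to 100).reduce(_ + _)   // sums 1..100 and prints 5050
sc.parallelize(Seq("spark", "hadoop", "spark")).map((_, 1)).reduceByKey(_ + _).collect()   // word counts, e.g. Array((spark,2), (hadoop,1))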

Exit
:quit

pyspark
Start
pyspark

Web UI: http://spark01:4040

Exit
quit() or Ctrl-D

Submitting an application in local mode

Run the following from the Spark installation directory:

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.13-3.4.0.jar \
10
  1. --class is the main class of the application to run; replace it with your own application's main class (an example using this tutorial's own word-count jar follows below).
  2. --master local[2] runs in local mode (the default); the number is how many virtual CPU cores to allocate.
  3. spark-examples_2.13-3.4.0.jar is the jar containing the application class; in practice, point this at the jar you built yourself.
  4. The trailing 10 is a command-line argument passed to the application; for SparkPi it sets the number of tasks.
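For example, once the spark-code.jar built later in this tutorial is available, the Scala word-count class could be submitted locally like this (the jar path is illustrative):

bin/spark-submit \
--class com.lihaozhe.course03.ScalaWordCount02 \
--master local[2] \
/opt/soft/spark-code.jar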

Standalone Mode

Write the core configuration files

In the conf directory:

cd /opt/soft/spark3/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
export JAVA_HOME=/opt/soft/jdk8
export HADOOP_HOME=/opt/soft/hadoop3
export HADOOP_CONF_DIR=/opt/soft/hadoop3/etc/hadoop
export JAVA_LIBRARY_PATH=/opt/soft/hadoop3/lib/native

export SPARK_MASTER_HOST=spark01
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_MEMORY=4g
export SPARK_WORKER_CORES=4
export SPARK_MASTER_WEBUI_PORT=6633
Edit the workers file
cp workers.template workers
vim workers
spark01
spark02
spark03
Configure the history log
cp spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://lihaozhe/spark-log
hdfs dfs -mkdir /spark-log
vim spark-env.sh
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080 
-Dspark.history.retainedApplications=30 
-Dspark.history.fs.logDirectory=hdfs://lihaozhe/spark-log"
Rename the start/stop scripts (so they do not clash with Hadoop's start-all.sh and stop-all.sh)
mv sbin/start-all.sh sbin/start-spark.sh
mv sbin/stop-all.sh sbin/stop-spark.sh
Distribute to the other nodes
scp -r /opt/soft/spark3 root@spark02:/opt/soft
scp -r /opt/soft/spark3 root@spark03:/opt/soft
scp -r /etc/profile root@spark02:/etc
scp -r /etc/profile root@spark03:/etc

Refresh the environment variables on the other nodes

source /etc/profile
Start
start-spark.sh
start-history-server.sh
Web UI

http://spark01:6633

Spark standalone Web UI

http://spark01:18080

Spark standalone history server

Submit a job to the cluster
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://spark01:7077 \
./examples/jars/spark-examples_2.13-3.4.0.jar \
10


HA Mode

Write the core configuration files

In the conf directory:

cd /opt/soft/spark3/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
export JAVA_HOME=/opt/soft/jdk8
export HADOOP_HOME=/opt/soft/hadoop3
export HADOOP_CONF_DIR=/opt/soft/hadoop3/etc/hadoop
export JAVA_LIBRARY_PATH=/opt/soft/hadoop3/lib/native

SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER 
-Dspark.deploy.zookeeper.url=spark01:2181,spark02:2181,spark03:2181 
-Dspark.deploy.zookeeper.dir=/spark3"
export SPARK_WORKER_MEMORY=4g
export SPARK_WORKER_CORES=4
export SPARK_MASTER_WEBUI_PORT=6633
hdfs dfs -mkdir /spark3
Edit the workers file
cp workers.template workers
vim workers
spark01
spark02
spark03
Configure the history log
cp spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://lihaozhe/spark-log
hdfs dfs -mkdir /spark-log
vim spark-env.sh
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080 
-Dspark.history.retainedApplications=30 
-Dspark.history.fs.logDirectory=hdfs://lihaozhe/spark-log"
Rename the start/stop scripts (so they do not clash with Hadoop's start-all.sh and stop-all.sh)
mv sbin/start-all.sh sbin/start-spark.sh
mv sbin/stop-all.sh sbin/stop-spark.sh
Distribute to the other nodes
scp -r /opt/soft/spark3 root@spark02:/opt/soft
scp -r /opt/soft/spark3 root@spark03:/opt/soft
scp -r /etc/profile root@spark02:/etc
scp -r /etc/profile root@spark03:/etc

Refresh the environment variables on the other nodes

source /etc/profile
Start (for HA, also start a standby Master on a second node, for example by running sbin/start-master.sh on spark02)
start-spark.sh
start-history-server.sh
Web UI

http://spark01:6633

Spark HA Web UI

http://spark01:18080

Spark HA history server

Submit a job to the cluster
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://spark01:7077 \
./examples/jars/spark-examples_2.13-3.4.0.jar \
10
Submit a job to YARN
bin/spark-submit --master yarn \
--class org.apache.spark.examples.SparkPi ./examples/jars/spark-examples_2.13-3.4.0.jar 10


spark-code

spark-core

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.lihaozhe</groupId>
  <artifactId>spark-code</artifactId>
  <version>1.0.0</version>
  <properties>
    <jdk.version>1.8</jdk.version>
    <scala.version>2.13.10</scala.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <commons-lang3.version>3.12.0</commons-lang3.version>
    <java-testdata-generator.version>1.1.2</java-testdata-generator.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>com.github.binarywang</groupId>
      <artifactId>java-testdata-generator</artifactId>
      <version>${java-testdata-generator.version}</version>
    </dependency>
    <!-- spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.13</artifactId>
      <version>3.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.13</artifactId>
      <version>3.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.4.0</version>
    </dependency>
    <!-- junit-jupiter-api -->
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-api</artifactId>
      <version>5.9.3</version>
      <scope>test</scope>
    </dependency>
    <!-- junit-jupiter-engine -->
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-engine</artifactId>
      <version>5.9.3</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.26</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>2.14.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.3.5</version>
    </dependency>
    <!-- commons-pool2 -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.11.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.13</artifactId>
      <version>3.4.0</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.33</version>
    </dependency>
    <!-- commons-lang3 -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>${commons-lang3.version}</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.11.0</version>
    </dependency>
  </dependencies>
  <build>
    <finalName>${project.artifactId}</finalName>
    <!--<outputDirectory>../package</outputDirectory>-->
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.11.0</version>
        <configuration>
          <!-- Source encoding for compilation -->
          <encoding>UTF-8</encoding>
          <!-- JDK version for compilation -->
          <source>${jdk.version}</source>
          <target>${jdk.version}</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-clean-plugin</artifactId>
        <version>3.2.0</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-resources-plugin</artifactId>
        <version>3.3.1</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-war-plugin</artifactId>
        <version>3.3.2</version>
      </plugin>
      <!-- Compilation level -->
      <!-- Skip JUnit tests when packaging -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.22.2</version>
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>
      <!-- This plugin compiles Scala sources to class files -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>4.8.1</version>
        <configuration>
          <scalaCompatVersion>2.13</scalaCompatVersion>
          <scalaVersion>2.13.10</scalaVersion>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>testCompile</goal>
            </goals>
          </execution>
          <execution>
            <id>compile-scala</id>
            <phase>compile</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>test-compile-scala</id>
            <phase>test-compile</phase>
            <goals>
              <goal>add-source</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.5.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
hdfs-conf

Place the HDFS core configuration files core-site.xml and hdfs-site.xml in the resources directory.

The HDFS configuration files included here are the test-cluster configuration.

Because the production environment differs from the test environment, exclude the HDFS configuration files when packaging the project.

RDD
Building an RDD from a collection (Parallelized Collections)
package com.lihaozhe.course01;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * Build an RDD from a parallelized collection (Parallelized Collections)
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 10:18 AM
 */
public class JavaDemo01 {
    public static void main(String[] args) {
        String appName = "RDD";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // Basic Spark configuration
        SparkConf conf = new SparkConf().setAppName(appName);
        // Run locally
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Data set
            List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
            // Create an RDD from the collection (Parallelized Collections)
            JavaRDD<Integer> distData = sc.parallelize(data);
            // Bring the data back to the local driver
            List<Integer> result = distData.collect();
            // Lambda expression
            result.forEach(System.out::println);
        }
    }
}
package com.lihaozhe.course01

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Build an RDD from a parallelized collection (Parallelized Collections)
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 10:08 AM
 */
object ScalaDemo01 {
  def main(args: Array[String]): Unit = {
    val appName = "rdd"
    // Basic Spark configuration
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // Run locally
    conf.setMaster("local")
    // Build the SparkContext
    val sc = new SparkContext(conf)
    // Data set
    val data = Array(1, 2, 3, 4, 5)
    // Create an RDD from the collection (Parallelized Collections)
    val distData = sc.parallelize(data)
    distData.foreach(println)
  }
}
Building an RDD from a local file

words.txt

linux shell
java mysql jdbc
hadoop hdfs mapreduce
hive presto
flume kafka
hbase phoenix
scala spark
sqoop flink
linux shell
java mysql jdbc
hadoop hdfs mapreduce
hive presto
flume kafka
hbase phoenix
scala spark
sqoop flink
base phoenix
scala spark
sqoop flink
linux shell
java mysql jdbc
hadoop hdfs mapreduce
java mysql jdbc
hadoop hdfs mapreduce
hive presto
flume kafka
hbase phoenix
scala spark
java mysql jdbc
hadoop hdfs mapreduce
java mysql jdbc
hadoop hdfs mapreduce
hive presto
packagecom.lihaozhe.course01;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.Arrays;importjava.util.List;/**
 * 借助外部文件 External Datasets 构建 RDD
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */publicclassJavaDemo02{publicstaticvoidmain(String[] args){String appName ="RDD";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
        conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 从外部中创建 RDD// External Datasets// 使用本地文件系统JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/words.txt");// 将数据获取到本地driverList<String> lines = javaRDD.collect();// lambda 表达式
            lines.forEach(System.out::println);}}}
packagecom.lihaozhe.course01importorg.apache.spark.{SparkConf, SparkContext}/**
 * 借助外部文件 External Datasets 构建 RDD
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */object ScalaDemo02 {def main(args: Array[String]):Unit={val appName ="rdd"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
    conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distFile = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/words.txt")
    distFile.foreach(println)}}
Building an RDD from an HDFS file
packagecom.lihaozhe.course01;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.List;/**
 * 借助外部文件 External Datasets 构建 RDD
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */publicclassJavaDemo03{publicstaticvoidmain(String[] args){System.setProperty("HADOOP_USER_NAME","root");String appName ="RDD";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
        conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 从外部中创建 RDD// External Datasets// 使用本地文件系统// JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");JavaRDD<String> javaRDD = sc.textFile("/data/words.txt");// 将数据获取到本地driverList<String> lines = javaRDD.collect();// lambda 表达式
            lines.forEach(System.out::println);}}}
packagecom.lihaozhe.course01importorg.apache.spark.{SparkConf, SparkContext}/**
 * 借助外部文件 External Datasets 构建 RDD
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */object ScalaDemo03 {def main(args: Array[String]):Unit={
    System.setProperty("HADOOP_USER_NAME","root")val appName ="rdd"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
    conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用HDFS文件系统// val distFile = sc.textFile("hdfs://spark01:8020/data/words.txt")val distFile = sc.textFile("/data/words.txt")
    distFile.foreach(println)}}
Transformation and Action Operators
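Transformations such as map, filter, and reduceByKey only describe a new RDD and are evaluated lazily; actions such as count, collect, and saveAsTextFile are what actually trigger a job. A minimal sketch (independent of the data.csv file below, assuming an existing SparkContext sc as in spark-shell):

val nums = sc.parallelize(1 to 10)      // build an RDD
val doubled = nums.map(_ * 2)           // transformation: nothing runs yet
val even = doubled.filter(_ % 4 == 0)   // transformation: still lazy
println(even.count())                   // action: triggers the computation and prints 5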

data.csv

person3,137
person7,193
person7,78
person0,170
person5,145
person5,54
person5,150
person0,102
person0,15
person8,172
person6,177
person5,158
person8,30
person6,184
person5,50
person4,127
person1,197
person3,99
person7,2
person7,51
person9,27
person6,34
person0,18
person7,111
person2,34
person0,80
person3,19
person8,121
person1,38
person3,37
person8,69
person3,116
person5,14
person4,121
person7,13
person8,10
person4,67
person6,177
person8,161
person6,113
person5,161
person3,159
person5,161
person2,88
person3,191
person0,155
person4,55
person6,153
person6,187
person0,41
person3,157
person4,179
person4,95
person1,12
person3,109
person9,24
person9,188
person1,114
person7,9
person7,82
person8,47
person9,153
person7,152
person6,110
person2,73
person8,132
person4,175
person7,153
person9,174
person3,23
person3,103
person9,169
person8,98
person6,62
person2,33
person3,127
person1,91
person6,198
person4,28
person1,182
person0,164
person5,198
person7,22
person0,46
person3,5
person8,140
person3,131
person4,195
person7,86
person0,137
person8,152
person8,154
person7,144
person5,142
person9,147
person1,29
person5,113
person6,173
person6,115
person9,148
person2,114
person7,69
person6,192
person0,113
person5,26
person3,7
person1,2
person6,60
person8,38
person6,19
person4,5
person3,50
person9,179
person2,148
person0,23
person3,121
person9,66
person9,90
person4,166
person7,199
person0,79
person2,157
person5,98
person6,25
person1,100
person4,184
person6,124
person4,183
person3,105
person6,28
person5,141
person6,60
person2,108
person5,171
person7,98
person2,57
person9,18
person8,35
person7,141
person0,180
person2,176
person9,130
person2,26
person0,81
person6,144
person3,33
person4,41
person9,60
person1,99
person4,115
person6,83
person2,90
person7,174
person8,47
person5,62
person0,119
person9,99
person3,125
person3,20
person1,137
person9,74
person6,1
person4,140
person4,122
person1,56
person7,107
person9,131
person7,174
person7,191
person8,31
person4,45
person9,84
person6,38
person9,186
person6,89
person5,87
person9,80
person5,107
person3,175
person8,44
person0,114
person7,63
person3,129
person9,77
person9,86
person9,183
person3,61
person4,104
person2,192
person5,142
person4,124
person5,76
person0,187
person3,38
person7,62
person5,153
person9,149
person7,87
person7,27
person6,88
count
packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.Arrays;importjava.util.List;/**
 * count 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */publicclassJavaDemo01{publicstaticvoidmain(String[] args){String appName ="count";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
        conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 数据集List<Integer> data =Arrays.asList(0,1,2,3,4,5,6,7,8,9);// 从集合中创建 RDD// Parallelized CollectionsJavaRDD<Integer> distData = sc.parallelize(data);long count = distData.count();System.out.println("count = "+ count);}}}
packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
 * count 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */object ScalaDemo01 {def main(args: Array[String]):Unit={val appName ="count"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
    conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 数据集val data = Array(0,1,2,3,4,5,6,7,8,9)// 从集合中创建 RDD// Parallelized Collectionsval distData = sc.parallelize(data)val count = distData.count()
    println(s"count = ${count}")}}
take
packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.Arrays;importjava.util.List;/**
 * take 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */publicclassJavaDemo02{publicstaticvoidmain(String[] args){String appName ="take";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
        conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 数据集List<Integer> data =Arrays.asList(0,1,2,3,4,5,6,7,8,9);// 从集合中创建 RDD// Parallelized CollectionsJavaRDD<Integer> distData = sc.parallelize(data);List<Integer> topList = distData.take(3);
            topList.forEach(System.out::println);}}}
packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
 * take 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */object ScalaDemo02 {def main(args: Array[String]):Unit={val appName ="take"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
    conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 数据集val data = Array(0,1,2,3,4,5,6,7,8,9)// 从集合中创建 RDD// Parallelized Collectionsval distData = sc.parallelize(data)val top = distData.take(3)
    top.foreach(println)}}
distinct
packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.Arrays;importjava.util.List;/**
 * distinct 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */publicclassJavaDemo03{publicstaticvoidmain(String[] args){String appName ="distinct";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
        conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 数据集List<Integer> data =Arrays.asList(0,1,5,6,7,8,9,3,4,2,4,3);// 从集合中创建 RDD// Parallelized CollectionsJavaRDD<Integer> distData = sc.parallelize(data);JavaRDD<Integer> uniqueRDD = distData.distinct();List<Integer> uniqueList = uniqueRDD.collect();
            uniqueList.forEach(System.out::println);}}}
packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
 * distinct 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */object ScalaDemo03 {def main(args: Array[String]):Unit={val appName ="distinct"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
    conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 数据集val data = Array(0,1,5,6,7,8,9,3,4,2,4,3)// 从集合中创建 RDD// Parallelized Collectionsval distData = sc.parallelize(data)val uniqueData = distData.distinct()
    uniqueData.foreach(println)}}
map
packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.Arrays;importjava.util.List;/**
 * map 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */publicclassJavaDemo04{publicstaticvoidmain(String[] args){String appName ="map";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
        conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 数据集List<Integer> data =Arrays.asList(1,2,3,4,5);// 从集合中创建 RDD// Parallelized CollectionsJavaRDD<Integer> distData = sc.parallelize(data);JavaRDD<Integer> rs = distData.map(num -> num *2);List<Integer> list = rs.collect();
            list.forEach(System.out::println);}}}
packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
 * map 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */object ScalaDemo04 {def main(args: Array[String]):Unit={val appName ="map"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
    conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 数据集val data = Array(1,2,3,4,5)// 从集合中创建 RDD// Parallelized Collectionsval distData = sc.parallelize(data)val rs = distData.map(_ *2)
    rs.foreach(println)}}
flatMap
packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.FlatMapFunction;importjava.util.Arrays;importjava.util.Iterator;importjava.util.List;/**
 * flatMap 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */publicclassJavaDemo05{publicstaticvoidmain(String[] args){String appName ="flatMap";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
        conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 数据集List<String> data =Arrays.asList("hadoop hive presto","hbase phoenix","spark flink");// 从集合中创建 RDD// Parallelized CollectionsJavaRDD<String> javaRDD = sc.parallelize(data);// javaRDD.flatMap(new FlatMapFunction<String, Object>() {//     @Override//     public Iterator<Object> call(String s) throws Exception {//        return null;//    }// });JavaRDD<String> wordsRdd = javaRDD.flatMap((FlatMapFunction<String,String>) line ->Arrays.asList(line.split(" ")).listIterator());List<String> words = wordsRdd.collect();
            words.forEach(System.out::println);}}}
packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
 * flatMap 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */object ScalaDemo05 {def main(args: Array[String]):Unit={val appName ="flatMap"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
    conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 数据集val data = Array("hadoop hive presto","hbase phoenix","spark flink")// 从集合中创建 RDD// Parallelized Collections// ("hadoop","hive","presto","hbase","phoenix","spark","flink")val rs = data.flatMap(_.split(" "))
    rs.foreach(println)}}
filter
packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.Arrays;importjava.util.List;/**
 * filter 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */publicclassJavaDemo06{publicstaticvoidmain(String[] args){String appName ="filter";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
        conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 数据集List<Integer> data =Arrays.asList(0,1,2,3,4,5,6,7,8,9);// 从集合中创建 RDD// Parallelized CollectionsJavaRDD<Integer> distData = sc.parallelize(data);JavaRDD<Integer> evenRDD = distData.filter(num -> num %2==0);List<Integer> evenList = evenRDD.collect();
            evenList.forEach(System.out::println);}}}
packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
 * filter 算子
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */object ScalaDemo06 {def main(args: Array[String]):Unit={val appName ="filter"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
    conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 数据集val data = Array(0,1,2,3,4,5,6,7,8,9)// 从集合中创建 RDD// Parallelized Collectionsval distData = sc.parallelize(data)val evenData = distData.filter(_ %2==0)
    evenData.foreach(println)}}
groupByKey
packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.PairFunction;importscala.Tuple2;importjava.util.List;/**
 * groupByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析每个人消费的金额数据汇总
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */publicclassJavaDemo07{publicstaticvoidmain(String[] args){System.setProperty("HADOOP_USER_NAME","root");String appName ="groupByKey";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
        conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 从外部中创建 RDD// External Datasets// 使用本地文件系统// JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");// javaRDD.mapToPair(new PairFunction<String, String, Integer>() {//     @Override//     public Tuple2<String, Integer> call(String s) throws Exception {//         return null;//    }// });JavaPairRDD<String,Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String,String,Integer>) word ->{String[] words = word.split(",");returnnewTuple2<String,Integer>(words[0],Integer.parseInt(words[1]));});JavaPairRDD<String,Iterable<Integer>> groupRDD = javaPairRDD.groupByKey();List<Tuple2<String,Iterable<Integer>>> collect = groupRDD.collect();
            collect.forEach(tup ->{System.out.print(tup._1 +" >>> (");
                tup._2.forEach(num ->System.out.print(num +","));System.out.println("\b)");});}}}
packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
 * groupByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析每个人消费的金额数据汇总
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */object ScalaDemo07 {def main(args: Array[String]):Unit={val appName ="groupByKey"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
    conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")// distData.foreach(println)// (person3,137)val tupleData = distData.map(line =>(line.split(",")(0), line.split(",")(1)))// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))val groupData = tupleData.groupByKey()
    groupData.foreach(println)}}
reduceByKey
packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.Function2;importorg.apache.spark.api.java.function.PairFunction;importscala.Tuple2;importjava.util.List;/**
 * reduceByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析每个人消费的金额数据汇总
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */publicclassJavaDemo08{publicstaticvoidmain(String[] args){System.setProperty("HADOOP_USER_NAME","root");String appName ="reduceByKey";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
        conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 从外部中创建 RDD// External Datasets// 使用本地文件系统// JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");JavaPairRDD<String,Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String,String,Integer>) word ->{String[] words = word.split(",");returnnewTuple2<String,Integer>(words[0],Integer.parseInt(words[1]));});// JavaPairRDD<String, Integer> reduceRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {//     @Override//     public Integer call(Integer integer, Integer integer2) throws Exception {//         return integer + integer2;//     }// });JavaPairRDD<String,Integer> reduceRDD = javaPairRDD.reduceByKey((Function2<Integer,Integer,Integer>)Integer::sum);List<Tuple2<String,Integer>> collect = reduceRDD.collect();
            collect.forEach(tup ->System.out.println(tup._1 +" >>> "+ tup._2));}}}
packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
 * reduceByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客总金额
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */object ScalaDemo08 {def main(args: Array[String]):Unit={val appName ="reduceByKey"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
    conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")// distData.foreach(println)// (person3,137)val tupleData = distData.map(line =>(line.split(",")(0), line.split(",")(1).toInt))// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))val sumData = tupleData.reduceByKey(_ + _)
    sumData.foreach(println)}}
mapValues
packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.Function;importorg.apache.spark.api.java.function.PairFunction;importscala.Tuple2;importjava.util.Iterator;importjava.util.List;importjava.util.concurrent.atomic.AtomicInteger;/**
 * mapValues 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 析客单价
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */publicclassJavaDemo09{publicstaticvoidmain(String[] args){System.setProperty("HADOOP_USER_NAME","root");String appName ="mapValues";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
        conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 从外部中创建 RDD// External Datasets// 使用本地文件系统// JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");// javaRDD.mapToPair(new PairFunction<String, String, Integer>() {//     @Override//     public Tuple2<String, Integer> call(String s) throws Exception {//         return null;//    }// });JavaPairRDD<String,Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String,String,Integer>) word ->{String[] words = word.split(",");returnnewTuple2<String,Integer>(words[0],Integer.parseInt(words[1]));});JavaPairRDD<String,Iterable<Integer>> groupRDD = javaPairRDD.groupByKey();JavaPairRDD<String,Double> avgRDD = groupRDD.mapValues(v ->{int sum =0;Iterator<Integer> it = v.iterator();AtomicInteger atomicInteger =newAtomicInteger();while(it.hasNext()){Integer amount = it.next();
                    sum += amount;
                    atomicInteger.incrementAndGet();}return(double) sum / atomicInteger.get();});List<Tuple2<String,Double>> collect = avgRDD.collect();
            collect.forEach(tup ->System.out.println(tup._1 +" >>> "+ tup._2));//            Map<String, List<Tuple2<String, Integer>>> listMap = javaPairRDD.collect().stream().collect(Collectors.groupingBy(tup -> tup._1));//            Set<Map.Entry<String, List<Tuple2<String, Integer>>>> entries = listMap.entrySet();//            Iterator<Map.Entry<String, List<Tuple2<String, Integer>>>> it = entries.iterator();//            Map<String, Double> map = new HashMap<>();//            while (it.hasNext()) {//                Map.Entry<String, List<Tuple2<String, Integer>>> entry = it.next();//                Integer sum = entry.getValue().stream().map(tup -> tup._2).reduce(Integer::sum).orElse(0);//                long count = entry.getValue().stream().map(tup -> tup._2).count();////                map.put(entry.getKey(), Double.valueOf(sum) / count);//            }//            map.forEach((name, amount) -> System.out.println(name + " >>> " + amount));}}}
packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
 * mapValues 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客单价
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */object ScalaDemo09 {def main(args: Array[String]):Unit={val appName ="mapValues"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
    conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")// distData.foreach(println)// (person3,137)val tupleData = distData.map(line =>(line.split(",")(0), line.split(",")(1).toInt))// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))val groupData = tupleData.groupByKey()
    groupData.foreach(println)val avgData = groupData.mapValues(v =>(v.sum.toDouble / v.size).formatted("%.2f"))
    avgData.foreach(println)}}
sortByKey
packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.Function2;importorg.apache.spark.api.java.function.PairFunction;importscala.Tuple2;importjava.util.List;/**
 * sortByKey reduceByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析每个人消费的金额数据汇总
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */publicclassJavaDemo10{publicstaticvoidmain(String[] args){System.setProperty("HADOOP_USER_NAME","root");String appName ="sortByKey reduceByKey";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
        conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 从外部中创建 RDD// External Datasets// 使用本地文件系统// JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");JavaPairRDD<String,Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String,String,Integer>) word ->{String[] words = word.split(",");returnnewTuple2<String,Integer>(words[0],Integer.parseInt(words[1]));});// JavaPairRDD<String, Integer> reduceRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {//     @Override//     public Integer call(Integer integer, Integer integer2) throws Exception {//         return integer + integer2;//     }// });JavaPairRDD<String,Integer> reduceRDD = javaPairRDD.reduceByKey((Function2<Integer,Integer,Integer>)Integer::sum);JavaPairRDD<String,Integer> soredRDD = reduceRDD.sortByKey(false);List<Tuple2<String,Integer>> collect = soredRDD.collect();
            collect.forEach(tup ->System.out.println(tup._1 +" >>> "+ tup._2));}}}
packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.PairFunction;importscala.Tuple2;importjava.util.Iterator;importjava.util.List;importjava.util.concurrent.atomic.AtomicInteger;/**
 * sortByKey reduceByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 析客单价
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:18
 */publicclassJavaDemo11{publicstaticvoidmain(String[] args){System.setProperty("HADOOP_USER_NAME","root");String appName ="sortByKey reduceByKey";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
        conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 从外部中创建 RDD// External Datasets// 使用本地文件系统// JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");// javaRDD.mapToPair(new PairFunction<String, String, Integer>() {//     @Override//     public Tuple2<String, Integer> call(String s) throws Exception {//         return null;//    }// });JavaPairRDD<String,Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String,String,Integer>) word ->{String[] words = word.split(",");returnnewTuple2<String,Integer>(words[0],Integer.parseInt(words[1]));});JavaPairRDD<String,Iterable<Integer>> groupRDD = javaPairRDD.groupByKey();JavaPairRDD<String,Double> avgRDD = groupRDD.mapValues(v ->{int sum =0;Iterator<Integer> it = v.iterator();AtomicInteger atomicInteger =newAtomicInteger();while(it.hasNext()){Integer amount = it.next();
                    sum += amount;
                    atomicInteger.incrementAndGet();}return(double) sum / atomicInteger.get();});JavaPairRDD<String,Double> sortedRDD = avgRDD.sortByKey(false);List<Tuple2<String,Double>> collect = sortedRDD.collect();
            collect.forEach(tup ->System.out.println(tup._1 +" >>> "+ tup._2));//            Map<String, List<Tuple2<String, Integer>>> listMap = javaPairRDD.collect().stream().collect(Collectors.groupingBy(tup -> tup._1));//            Set<Map.Entry<String, List<Tuple2<String, Integer>>>> entries = listMap.entrySet();//            Iterator<Map.Entry<String, List<Tuple2<String, Integer>>>> it = entries.iterator();//            Map<String, Double> map = new HashMap<>();//            while (it.hasNext()) {//                Map.Entry<String, List<Tuple2<String, Integer>>> entry = it.next();//                Integer sum = entry.getValue().stream().map(tup -> tup._2).reduce(Integer::sum).orElse(0);//                long count = entry.getValue().stream().map(tup -> tup._2).count();////                map.put(entry.getKey(), Double.valueOf(sum) / count);//            }//            map.forEach((name, amount) -> System.out.println(name + " >>> " + amount));}}}
packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
 * sortByKey  educeByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客总金额
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */object ScalaDemo10 {def main(args: Array[String]):Unit={val appName ="sortByKey reduceByKey"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
    conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")// distData.foreach(println)// (person3,137)val tupleData = distData.map(line =>(line.split(",")(0), line.split(",")(1).toInt))// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))val sumData = tupleData.reduceByKey(_ + _)
    sumData.foreach(println)val swapData = sumData.map(_.swap)// 参数 true为升序 false为降序 默认为升序val sortData = swapData.sortByKey(ascending =false)
    sortData.foreach(println)}}
packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
 * sortByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客单价
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */object ScalaDemo11 {def main(args: Array[String]):Unit={val appName ="sortByKey reduceByKey"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
    conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")// distData.foreach(println)// (person3,137)val tupleData = distData.map(line =>(line.split(",")(0), line.split(",")(1).toInt))// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))val groupData = tupleData.groupByKey()
    groupData.foreach(println)val avgData = groupData.mapValues(v =>(v.sum.toDouble / v.size))
    avgData.foreach(println)val swapData = avgData.map(_.swap)// 参数 true为升序 false为降序 默认为升序val sortData = swapData.sortByKey(ascending =false)
    sortData.foreach(v => println(v._2 +","+ v._1.formatted("%.2f")))}}
sortBy
packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
 * sortBy  educeByKey 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客总金额
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */object ScalaDemo12 {def main(args: Array[String]):Unit={val appName ="sortBy reduceByKey"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
    conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")// distData.foreach(println)// (person3,137)val tupleData = distData.map(line =>(line.split(",")(0), line.split(",")(1).toInt))// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))val sumData = tupleData.reduceByKey(_ + _)
    sumData.foreach(println)// 参数 true为升序 false为降序 默认为升序val sortedData = sumData.sortBy(_._2, ascending =false)
    sortedData.foreach(println)}}
packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
 * sortBy 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客单价
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */object ScalaDemo13 {def main(args: Array[String]):Unit={val appName ="sortBy reduceByKey"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
    conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")// distData.foreach(println)// (person3,137)val tupleData = distData.map(line =>(line.split(",")(0), line.split(",")(1).toInt))// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))val groupData = tupleData.groupByKey()
    groupData.foreach(println)val avgData = groupData.mapValues(v =>(v.sum.toDouble / v.size).formatted("%.2f").toDouble)// 参数 true为升序 false为降序 默认为升序val sortedData = avgData.sortBy(_._2, ascending =false)
    sortedData.foreach(println)}}
join
packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
 * join 算子
 * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客总金额
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 上午10:08
 */object ScalaDemo14 {def main(args: Array[String]):Unit={val appName ="join"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
    conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")// distData.foreach(println)// (person3,137)val tupleData = distData.map(line =>(line.split(",")(0), line.split(",")(1).toInt))// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))val sumData = tupleData.reduceByKey(_ + _)val groupData = tupleData.groupByKey()val avgData = groupData.mapValues(v =>(v.sum.toDouble / v.size).formatted("%.2f").toDouble)val rsData = sumData.join(avgData)
    rsData.foreach(println)}}
WordCount
JavaWordCount
packagecom.lihaozhe.course03;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.FlatMapFunction;importorg.apache.spark.api.java.function.Function2;importorg.apache.spark.api.java.function.PairFunction;importscala.Tuple2;importjava.util.Arrays;/**
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 下午4:16
 */publicclassJavaWordCount{publicstaticvoidmain(String[] args){System.setProperty("HADOOP_USER_NAME","root");String appName ="WordCount";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");SparkConf conf =newSparkConf().setAppName(appName);// 本地运行// conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){JavaRDD<String> javaRDD = sc.textFile("/data/words.txt");JavaRDD<String> wordsRdd = javaRDD.flatMap((FlatMapFunction<String,String>) line ->Arrays.asList(line.split(" ")).listIterator());JavaPairRDD<String,Integer> javaPairRDD = wordsRdd.mapToPair((PairFunction<String,String,Integer>) word ->newTuple2<>(word,1));JavaPairRDD<String,Integer> rs = javaPairRDD.reduceByKey((Function2<Integer,Integer,Integer>)Integer::sum);
            rs.saveAsTextFile("/data/result");
            sc.stop();}}}
ScalaWordCount
packagecom.lihaozhe.course03importorg.apache.spark.{SparkConf, SparkContext}/**
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 下午3:51
 */object ScalaWordCount01 {def main(args: Array[String]):Unit={
    System.setProperty("HADOOP_USER_NAME","root")// val conf = new SparkConf().setAppName("WordCount").setMaster("local")val conf =new SparkConf().setAppName("WordCount")// conf.setMaster("local")val sc =new SparkContext(conf)val content = sc.textFile("/data/words.txt")// content.foreach(println)val words = content.flatMap(_.split(" "))// words.foreach(println)// (love,Seq(love, love, love, love, love))val wordGroup = words.groupBy(word => word)// wordGroup.foreach(println)// (love,5)val wordCount = wordGroup.mapValues(_.size)// wordCount.foreach(println)
    wordCount.saveAsTextFile("/data/result");
    sc.stop()}}
packagecom.lihaozhe.course03importorg.apache.spark.{SparkConf, SparkContext}/**
 * @author 李昊哲
 * @version 1.0.0  2023/5/17 下午3:51
 */object ScalaWordCount02 {def main(args: Array[String]):Unit={
    System.setProperty("HADOOP_USER_NAME","root")// val conf = new SparkConf().setAppName("WordCount").setMaster("local")val conf =new SparkConf().setAppName("WordCount")// conf.setMaster("local")val sc =new SparkContext(conf)val content = sc.textFile("/data/words.txt")// content.foreach(println)val words = content.flatMap(_.split(" "))// words.foreach(println)// (love,Seq(love, love, love, love, love))val wordMap = words.map((_,1))// wordGroup.foreach(println)// (love,5)val wordCount = wordMap.reduceByKey(_ + _)//wordCount.foreach(println)
    wordCount.saveAsTextFile("/data/result");
    sc.stop()}}
Package and publish the project
mvn package

Upload the jar file to the cluster

Submit on the cluster

Standalone client mode: spark-submit --master spark://spark01:7077 --deploy-mode client

Standalone cluster mode: spark-submit --master spark://spark01:7077 --deploy-mode cluster

YARN client mode: spark-submit --master yarn --deploy-mode client

YARN cluster mode: spark-submit --master yarn --deploy-mode cluster

Note that all spark-submit options, including --deploy-mode, must come before the application jar:

spark-submit --master yarn --deploy-mode cluster --class com.lihaozhe.course03.JavaWordCount spark-code.jar
spark-submit --master yarn --deploy-mode cluster --class com.lihaozhe.course03.ScalaWordCount01 spark-code.jar
spark-submit --master yarn --deploy-mode cluster --class com.lihaozhe.course03.ScalaWordCount02 spark-code.jar

sparkSQL

packagecom.lihaozhe.course04importorg.apache.spark.SparkConf
importorg.apache.spark.sql.SparkSession

/**
 * 构建 dataFrame
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */object ScalaDemo01 {def main(args: Array[String]):Unit={// 基础配置val conf =new SparkConf()if(!conf.contains("spark.master")){
      conf.setMaster("local")}val spark = SparkSession
      .builder().appName("Spark SQL basic example").config(conf).getOrCreate()// 隐式转换importspark.implicits._
    // 读取 csv 文件获取 dataFrameval df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")// root//   |-- _c0: string (nullable = true)//   |-- _c1: string (nullable = true)
    df.printSchema()
    spark.stop()}}
packagecom.lihaozhe.course04importorg.apache.spark.SparkConf
importorg.apache.spark.sql.SparkSession

/**
 * show
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */object ScalaDemo02 {def main(args: Array[String]):Unit={// 基础配置val conf =new SparkConf()if(!conf.contains("spark.master")){
      conf.setMaster("local")}val spark = SparkSession
      .builder().appName("Spark SQL basic example").config(conf).getOrCreate()// 隐式转换importspark.implicits._
    // 读取 csv 文件获取 dataFrameval df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
    df.show()
    spark.stop()}}
packagecom.lihaozhe.course04importorg.apache.spark.SparkConf
importorg.apache.spark.sql.SparkSession

/**
 * option 是否将第一列作为字段名 header默认值为 false
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */object ScalaDemo03 {def main(args: Array[String]):Unit={// 基础配置val conf =new SparkConf()if(!conf.contains("spark.master")){
      conf.setMaster("local")}val spark = SparkSession
      .builder().appName("Spark SQL basic example").config(conf).getOrCreate()// 隐式转换importspark.implicits._
    // 读取 csv 文件获取 dataFrameval df = spark.read
      .option("header","true")// 是否将第一列作为字段名 header默认值为 false.csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")// root//   |-- name: string (nullable = true)//   |-- amount: string (nullable = true)
    df.printSchema()
    df.show()
    spark.stop()}}
packagecom.lihaozhe.course04importorg.apache.spark.SparkConf
importorg.apache.spark.sql.SparkSession

/**
 * select
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */object ScalaDemo04 {def main(args: Array[String]):Unit={// 基础配置val conf =new SparkConf()if(!conf.contains("spark.master")){
      conf.setMaster("local")}val spark = SparkSession
      .builder().appName("Spark SQL basic example").config(conf).getOrCreate()// 隐式转换importspark.implicits._
    // 读取 csv 文件获取 dataFrameval df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")// root//   |-- _c0: string (nullable = true)//   |-- _c1: string (nullable = true)
    df.printSchema()
    df.select("_c0","_c1").show()
    spark.stop()}}
packagecom.lihaozhe.course04importorg.apache.spark.SparkConf
importorg.apache.spark.sql.SparkSession

/**
 * withColumnRenamed
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */object ScalaDemo05 {def main(args: Array[String]):Unit={// 基础配置val conf =new SparkConf()if(!conf.contains("spark.master")){
      conf.setMaster("local")}val spark = SparkSession
      .builder().appName("Spark SQL basic example").config(conf).getOrCreate()// 隐式转换importspark.implicits._
    // 读取 csv 文件获取 dataFrameval df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.withColumnRenamed("_c0","name").withColumnRenamed("_c1","amount").printSchema()// root//   |-- name: string (nullable = true)//   |-- amount: string (nullable = true)

    spark.stop()}}
packagecom.lihaozhe.course04importorg.apache.spark.SparkConf
importorg.apache.spark.sql.SparkSession
importorg.apache.spark.sql.functions.colimportorg.apache.spark.sql.types.{IntegerType, StringType}/**
 * col("原字段名").cast(数据类型).as("新字段名"),
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */object ScalaDemo06 {def main(args: Array[String]):Unit={// 基础配置val conf =new SparkConf()if(!conf.contains("spark.master")){
      conf.setMaster("local")}val spark = SparkSession
      .builder().appName("Spark SQL basic example").config(conf).getOrCreate()// 隐式转换importspark.implicits._
    // 读取 csv 文件获取 dataFrameval df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount"),).show()
    spark.stop()}}
packagecom.lihaozhe.course04importorg.apache.spark.SparkConf
importorg.apache.spark.sql.SparkSession

/**
 * topN
 * show(n,truncate = false)默认显示前20条记录 numRows 记录数 truncate 显示结果是否裁剪 默认值为true为裁剪 false为不裁剪
 * first() 第一条记录
 * take(n)
 * head(n)
 * tail(n) 最后n条记录
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */object ScalaDemo07 {def main(args: Array[String]):Unit={// 基础配置val conf =new SparkConf()if(!conf.contains("spark.master")){
      conf.setMaster("local")}val spark = SparkSession
      .builder().appName("Spark SQL basic example").config(conf).getOrCreate()// 隐式转换importspark.implicits._
    // 读取 csv 文件获取 dataFrameval df = spark.read
      .option("header","true")// 是否将第一列作为字段名 header默认值为 false.csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")// df.show(5,truncate = false)// println(df.first())// df.take(3).foreach(println)// df.head(3).foreach(println)
    df.tail(3).foreach(println)
    spark.stop()}}
packagecom.lihaozhe.course04importorg.apache.spark.SparkConf
importorg.apache.spark.sql.SparkSession
importorg.apache.spark.sql.functions.colimportorg.apache.spark.sql.types.{IntegerType, StringType}/**
 * where 按条件查询
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 上午8:30
 */object ScalaDemo08 {def main(args: Array[String]):Unit={// 基础配置val conf =new SparkConf()if(!conf.contains("spark.master")){
      conf.setMaster("local")}val spark = SparkSession
      .builder().appName("Spark SQL basic example").config(conf).getOrCreate()// 隐式转换importspark.implicits._
    // 读取 csv 文件获取 dataFrameval df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount"),).where("amount > 100").show()
    spark.stop()}}
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * where: filter rows by a Column condition
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo09 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the csv file into a DataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount")
    ).where(col("amount") > 100).show()
    spark.stop()
  }
}
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * filter: filter rows by a condition expressed as a SQL string
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo10 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the csv file into a DataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount")
    ).filter("amount > 100").show()
    spark.stop()
  }
}
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * filter: filter rows by a Column condition
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo11 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the csv file into a DataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount")
    ).filter(col("amount") > 100).show()
    spark.stop()
  }
}
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * groupBy / count: group and aggregate
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo12 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the csv file into a DataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount")
    ).groupBy(col("name")).count().show()
    spark.stop()
  }
}
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * SQLContext
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo13 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the csv file into a DataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // note: withColumnRenamed returns a new DataFrame; the SQL below aliases the original _c0/_c1 columns instead
    df.withColumnRenamed("_c0", "name").withColumnRenamed("_c1", "amount")
    // register the DataFrame as a temporary view
    df.createOrReplaceTempView("order_info")
    // obtain the SQLContext
    val sqlContext = spark.sqlContext
    sqlContext.sql("select _c0 as name, _c1 as amount from order_info where _c1 > 100").show(false)
    spark.stop()
  }
}
package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * DataFrame and Dataset
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo01 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the csv file into a DataFrame
    val df = spark.read
      .option("header", "true") // whether to use the first row as column names; header defaults to false
      .csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
    val ds = df.as[OrderInfo]
    val rdd = ds.map(orderInfo => (orderInfo.name, orderInfo.amount.toInt)).rdd
    rdd.reduceByKey(_ + _).foreach(println)
    spark.stop()
  }
}

case class OrderInfo(name: String, amount: String)
package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Create a Dataset
 * spark.read.text returns a DataFrame
 * spark.read.textFile returns a Dataset
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo02 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    // text yields a DataFrame, textFile yields a Dataset[String]
    val df = spark.read.text("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    val ds = spark.read.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    spark.stop()
  }
}
package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Data read directly via spark.read.text or spark.read.textFile has a single String column named value
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo03 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the csv file as plain text
    val df = spark.read.text("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    val ds = spark.read.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.show(5, truncate = false)
    ds.show(5, truncate = false)
    spark.stop()
  }
}
package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Interoperating with RDDs
 * When there are only a few fields, use reflection to infer the RDD schema
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo04 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._

    // Create an RDD of OrderSchema objects from a text file, convert it to a DataFrame
    val orderDf = spark.sparkContext
      .textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
      .map(_.split(","))
      .map(attributes => OrderSchema(attributes(0), attributes(1).trim.toInt))
      .toDF()
    // Register the DataFrame as a temporary view
    orderDf.createOrReplaceTempView("order_info")
    // SQL statements can be run by using the sql methods provided by Spark
    val teenagersDF = spark.sql("SELECT name, amount FROM order_info WHERE amount BETWEEN 100 AND 150")
    teenagersDF.map(teenager => "Name: " + teenager(0)).show(3, truncate = false)
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show(3, truncate = false)
    // No pre-defined encoders for Dataset[Map[K,V]], define explicitly
    // reads one row at a time and packs it into a Map
    implicit val mapEncoder: org.apache.spark.sql.Encoder[Map[String, Any]] =
      org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    // primitive types and case classes can also be defined as implicit encoders, e.g.
    // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
    val array = teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "amount"))).collect()
    array.foreach(println)
    teenagersDF.toJSON.show(3, truncate = false)
    spark.stop()
  }
}

case class OrderSchema(name: String, amount: Int)
package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Interoperating with RDDs
 * Programmatically specifying the schema:
 * 1. Create an RDD of Rows from the original RDD;
 * 2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1;
 * 3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo05 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._

    val sc = spark.sparkContext
    val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // 1. Create an RDD of Rows from the original RDD
    val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).toInt))
    // 2. Create the schema represented by a StructType matching the structure of the Rows
    val struct = StructType(Array(
      StructField("name", StringType, nullable = true),
      StructField("amount", IntegerType, nullable = true)
    ))
    // 3. Apply the schema to the RDD of Rows via createDataFrame
    val df = spark.createDataFrame(rowRDD, struct)
    df.printSchema()
    // root
    //  |-- name: string (nullable = true)
    //  |-- amount: integer (nullable = true)
    spark.stop()
  }
}
package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Interoperating with RDDs
 * Programmatically specifying the schema, building the StructType from a schema string:
 * 1. Create an RDD of Rows from the original RDD;
 * 2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1;
 * 3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo06 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    val sc = spark.sparkContext
    val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // 1. Create an RDD of Rows from the original RDD
    //    (both fields are kept as strings to match the all-string schema below)
    val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).trim))
    // 2. Create the schema represented by a StructType built from a schema string
    val schemaString = "name amount"
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    // 3. Apply the schema to the RDD of Rows via createDataFrame
    val df = spark.createDataFrame(rowRDD, schema)
    df.printSchema()
    // root
    //  |-- name: string (nullable = true)
    //  |-- amount: string (nullable = true)
    spark.stop()
  }
}
package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * DataSource: csv to parquet
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo01 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    val df = spark.read
      .option("header", "true")
      .format("csv")
      .load("file:///home/lsl/IdeaProjects/spark-code/info.csv")
    df.select("name", "amount")
      .write
      .mode(SaveMode.Overwrite)
      .format("parquet")
      .save("/data/spark/parquet")
    spark.stop()
  }
}
package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * DataSource: parquet to json
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo02 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    val df = spark.read
      .option("header", "true")
      .format("parquet")
      .load("/data/spark/parquet")
    df.select("name", "amount")
      .write
      .mode(SaveMode.Overwrite)
      .format("json")
      .save("/data/spark/json")
    spark.stop()
  }
}
package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * DataSource JDBC MySQL Load
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo03 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://spark03")
      .option("dbtable", "lihaozhe.dujitang")
      .option("user", "root")
      .option("password", "Lihaozhe!!@@1122")
      .load()
    jdbcDF.show()
    spark.stop()
  }
}
package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import java.util.Properties

/**
 * DataSource JDBC MySQL
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo04 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "Lihaozhe!!@@1122")
    val jdbcDF = spark.read
      .jdbc("jdbc:mysql://spark03", "lihaozhe.dujitang", connectionProperties)
    jdbcDF.show()
    spark.stop()
  }
}
package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import java.util.Properties

/**
 * DataSource JDBC MySQL: custom column types
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo05 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "Lihaozhe!!@@1122")
    // custom column types for the loaded schema
    connectionProperties.put("customSchema", "id STRING, content STRING")
    val jdbcDF = spark.read
      .jdbc("jdbc:mysql://spark03", "lihaozhe.dujitang", connectionProperties)
    jdbcDF.printSchema()
    spark.stop()
  }
}
package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

/**
 * DataSource JDBC MySQL Save (format("jdbc") + options)
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo06 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    val sc = spark.sparkContext
    val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).toInt))
    val struct = StructType(Array(
      StructField("name", StringType, nullable = true),
      StructField("amount", IntegerType, nullable = true)
    ))
    val df = spark.createDataFrame(rowRDD, struct)
    df.write
      .format("jdbc")
      .option("url", "jdbc:mysql://spark03")
      .option("dbtable", "lihaozhe.data")
      .option("user", "root")
      .option("password", "Lihaozhe!!@@1122")
      .mode(SaveMode.Append)
      .save()
    spark.stop()
  }
}
package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import java.util.Properties

/**
 * DataSource JDBC MySQL Save (write.jdbc with Properties)
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo07 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    val sc = spark.sparkContext
    val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).toInt))
    val struct = StructType(Array(
      StructField("name", StringType, nullable = true),
      StructField("amount", IntegerType, nullable = true)
    ))
    val df = spark.createDataFrame(rowRDD, struct)
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "Lihaozhe!!@@1122")
    df.write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://spark03", "lihaozhe.data", connectionProperties)
    spark.stop()
  }
}
package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import java.util.Properties

/**
 * DataSource JDBC MySQL Save with createTableColumnTypes
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 8:30 AM
 */
object ScalaDemo08 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    val sc = spark.sparkContext
    val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).toInt))
    val struct = StructType(Array(
      StructField("name", StringType, nullable = true),
      StructField("amount", IntegerType, nullable = true)
    ))
    val df = spark.createDataFrame(rowRDD, struct)
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "Lihaozhe!!@@1122")
    df.write
      .mode(SaveMode.Append)
      .option("createTableColumnTypes", "name VARCHAR(33)")
      .jdbc("jdbc:mysql://spark03", "lihaozhe.data", connectionProperties)
    spark.stop()
  }
}
hdfs dfs -mkdir -p /lihaozhe
hdfs dfs -mkdir -p /hive/info
create database lihaozhe location '/lihaozhe';
use lihaozhe;
create external table `info` (
    name   string comment 'name',
    amount int    comment 'amount'
) comment 'order table'
  row format delimited
  fields terminated by ','
  lines terminated by '\n'
  stored as textfile
  location '/hive/info';
load data local inpath '/root/data.csv' into table info;
package com.lihaozhe.course07

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * spark on hive
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/18 4:07 PM
 */
object ScalaHiveSource {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL hive example")
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    import spark.sql

    sql("select * from lihaozhe.info").show()
    spark.stop()
  }
}

Error message: Couldn't create directory /user/hive/resources
sparksql hive
Cause
The error occurs because the directory /user/hive/resources, configured by the hive.downloaded.resources.dir property in hive-site.xml, cannot be created on the local machine.
sparksql hive

  • Spark SQL connects to the metastore database using the settings in hive-site.xml, and connects to HDFS via hdfs-site.xml and core-site.xml.
  • The /user/hive/resources directory from the configuration is created on the machine running IDEA (your local Windows, macOS, or Linux host), not on the remote server. Because the directory cannot be created on the local file system, the error is thrown.

Fix for org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: Couldn't create directory /user/hive/resources
Create the directory on the local file system:

macOS or Linux

sudo mkdir -p /user/hive/resources

Windows
Create the directory on the drive where the project lives.
For example, if the project is on drive D, create d:\user\hive\resources:
md d:\user\hive\resources
(md creates nested directories in Windows cmd)
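
An alternative worth trying is to point the resources directory at a locally writable path instead of creating /user/hive/resources by hand. The sketch below is only an assumption-based example: the object name ScalaHiveResourcesWorkaround and the path /tmp/hive/resources are made up for illustration, and whether an override of hive.downloaded.resources.dir passed through the SparkSession builder is honored depends on your Spark/Hive setup, so treat it as a sketch rather than a guaranteed fix.

package com.lihaozhe.course07

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// hypothetical workaround sketch, not part of the original course code
object ScalaHiveResourcesWorkaround {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession
      .builder()
      .appName("Spark SQL hive example")
      .config(conf)
      // assumption: override the Hive resources dir with a path the local user can create
      .config("hive.downloaded.resources.dir", "/tmp/hive/resources")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("select * from lihaozhe.info").show()
    spark.stop()
  }
}

If the override is not picked up by your Hive client, fall back to creating the directory manually as shown above.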

This article is reposted from: https://blog.csdn.net/qq_24330181/article/details/130643549
Copyright belongs to the original author, 李昊哲小课. If there is any infringement, please contact us for removal.
