

Spark: From Beginner to Expert


Environment Setup

Preparation

Create the installation directory
  1. mkdir /opt/soft
  2. cd /opt/soft
Download Scala
  1. wget https://downloads.lightbend.com/scala/2.13.10/scala-2.13.10.tgz -P /opt/soft
Extract Scala
  1. tar -zxvf scala-2.13.10.tgz
Rename the Scala directory
  1. mv scala-2.13.10 scala-2
Download Spark
  1. wget https://dlcdn.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3-scala2.13.tgz -P /opt/soft
Extract Spark
  1. tar -zxvf spark-3.4.0-bin-hadoop3-scala2.13.tgz
Rename the Spark directory
  1. mv spark-3.4.0-bin-hadoop3-scala2.13 spark3
Edit the environment variables
  1. vim /etc/profile
  1. export JAVA_HOME=/opt/soft/jdk8
  2. export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
  3. export ZOOKEEPER_HOME=/opt/soft/zookeeper
  4. export HADOOP_HOME=/opt/soft/hadoop3
  5. export HADOOP_INSTALL=${HADOOP_HOME}
  6. export HADOOP_MAPRED_HOME=${HADOOP_HOME}
  7. export HADOOP_COMMON_HOME=${HADOOP_HOME}
  8. export HADOOP_HDFS_HOME=${HADOOP_HOME}
  9. export YARN_HOME=${HADOOP_HOME}
  10. export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
  11. export HDFS_NAMENODE_USER=root
  12. export HDFS_DATANODE_USER=root
  13. export HDFS_SECONDARYNAMENODE_USER=root
  14. export YARN_RESOURCEMANAGER_USER=root
  15. export YARN_NODEMANAGER_USER=root
  16. export HIVE_HOME=/opt/soft/hive3
  17. export HCAT_HOME=/opt/soft/hive3/hcatalog
  18. export SQOOP_HOME=/opt/soft/sqoop-1
  19. export FLUME_HOME=/opt/soft/flume
  20. export HBASE_HOME=/opt/soft/hbase2
  21. export PHOENIX_HOME=/opt/soft/phoenix
  22. export SCALA_HOME=/opt/soft/scala-2
  23. export SPARK_HOME=/opt/soft/spark3
  24. export SPARKPYTHON=/opt/soft/spark3/python
  25. export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$HCAT_HOME/bin:$SQOOP_HOME/bin:$FLUME_HOME/bin:$HBASE_HOME/bin:$PHOENIX_HOME/bin:$SCALA_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin:$SPARKPYTHON
  1. source /etc/profile

Local Mode

Scala / Java
Start
  1. spark-shell

spark local spark-shell

Web UI: http://spark01:4040

spark local spark-shell
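As a quick sanity check (an illustrative snippet, not part of the original walkthrough), you can run a small job directly in the shell; spark-shell already provides the SparkContext as sc:

scala> sc.parallelize(1 to 100).filter(_ % 2 == 0).count()
res0: Long = 50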

Exit
  1. :quit

spark local spark-shell

pyspark
Start
  1. pyspark

spark local pyspark

Web UI: http://spark01:4040

spark local pyspark

Exit
  1. quit() or Ctrl-D

spark local pyspark

Submitting an application in local mode

Run the following from the Spark installation directory:

  1. bin/spark-submit \
  2. --class org.apache.spark.examples.SparkPi \
  3. --master local[2] \
  4. ./examples/jars/spark-examples_2.13-3.4.0.jar \
  5. 10
  1. --class: the main class of the application to run; replace it with the main class of your own program.
  2. --master local[2]: the deployment target; the default is local mode, and the number is how many virtual CPU cores to allocate.
  3. spark-examples_2.13-3.4.0.jar: the jar that contains the application class; in practice, use the jar you built yourself.
  4. The trailing 10 is an argument passed to the program, used here to set the number of tasks for the application.

Standalone Mode

Edit the core configuration file

In the conf directory:

  1. cd /opt/soft/spark3/conf
  1. cp spark-env.sh.template spark-env.sh
  1. vim spark-env.sh
  1. export JAVA_HOME=/opt/soft/jdk8
  2. export HADOOP_HOME=/opt/soft/hadoop3
  3. export HADOOP_CONF_DIR=/opt/soft/hadoop3/etc/hadoop
  4. export JAVA_LIBRARY_PATH=/opt/soft/hadoop3/lib/native
  5. export SPARK_MASTER_HOST=spark01
  6. export SPARK_MASTER_PORT=7077
  7. export SPARK_WORKER_MEMORY=4g
  8. export SPARK_WORKER_CORES=4
  9. export SPARK_MASTER_WEBUI_PORT=6633
Edit the workers file
  1. cp workers.template workers
  1. vim workers
  1. spark01
  2. spark02
  3. spark03
Configure event logging and the history server
  1. cp spark-defaults.conf.template spark-defaults.conf
  1. vim spark-defaults.conf
  1. spark.eventLog.enabled true
  2. spark.eventLog.dir hdfs://lihaozhe/spark-log
  1. hdfs dfs -mkdir /spark-log
  1. vim spark-env.sh
  1. export SPARK_HISTORY_OPTS="
  2. -Dspark.history.ui.port=18080
  3. -Dspark.history.retainedApplications=30
  4. -Dspark.history.fs.logDirectory=hdfs://lihaozhe/spark-log"
Rename the start/stop scripts (so they do not clash with Hadoop's start-all.sh and stop-all.sh on the PATH)
  1. mv sbin/start-all.sh sbin/start-spark.sh
  2. mv sbin/stop-all.sh sbin/stop-spark.sh
Distribute to the other nodes
  1. scp -r /opt/soft/spark3 root@spark02:/opt/soft
  2. scp -r /opt/soft/spark3 root@spark03:/opt/soft
  1. scp -r /etc/profile root@spark02:/etc
  2. scp -r /etc/profile root@spark03:/etc

Reload the environment variables on the other nodes

  1. source /etc/profile
Start
  1. start-spark.sh
  2. start-history-server.sh
Web UI

http://spark01:6633

spark standalone webui

http://spark01:18080

spark standalone history server

Submit a job to the cluster
  1. bin/spark-submit \
  2. --class org.apache.spark.examples.SparkPi \
  3. --master spark://spark01:7077 \
  4. ./examples/jars/spark-examples_2.13-3.4.0.jar \
  5. 10

spark standalone webui

spark standalone history server

HA Mode

Edit the core configuration file

In the conf directory:

  1. cd /opt/soft/spark3/conf
  1. cp spark-env.sh.template spark-env.sh
  1. vim spark-env.sh
  1. export JAVA_HOME=/opt/soft/jdk8
  2. export HADOOP_HOME=/opt/soft/hadoop3
  3. export HADOOP_CONF_DIR=/opt/soft/hadoop3/etc/hadoop
  4. export JAVA_LIBRARY_PATH=/opt/soft/hadoop3/lib/native
  5. SPARK_DAEMON_JAVA_OPTS="
  6. -Dspark.deploy.recoveryMode=ZOOKEEPER
  7. -Dspark.deploy.zookeeper.url=spark01:2181,spark02:2181,spark03:2181
  8. -Dspark.deploy.zookeeper.dir=/spark3"
  9. export SPARK_WORKER_MEMORY=4g
  10. export SPARK_WORKER_CORES=4
  11. export SPARK_MASTER_WEBUI_PORT=6633
  1. hdfs dfs -mkdir /spark3
Edit the workers file
  1. cp workers.template workers
  1. vim workers
  1. spark01
  2. spark02
  3. spark03
Configure event logging and the history server
  1. cp spark-defaults.conf.template spark-defaults.conf
  1. vim spark-defaults.conf
  1. spark.eventLog.enabled true
  2. spark.eventLog.dir hdfs://lihaozhe/spark-log
  1. hdfs dfs -mkdir /spark-log
  1. vim spark-env.sh
  1. export SPARK_HISTORY_OPTS="
  2. -Dspark.history.ui.port=18080
  3. -Dspark.history.retainedApplications=30
  4. -Dspark.history.fs.logDirectory=hdfs://lihaozhe/spark-log"
Rename the start/stop scripts (so they do not clash with Hadoop's start-all.sh and stop-all.sh on the PATH)
  1. mv sbin/start-all.sh sbin/start-spark.sh
  2. mv sbin/stop-all.sh sbin/stop-spark.sh
Distribute to the other nodes
  1. scp -r /opt/soft/spark3 root@spark02:/opt/soft
  2. scp -r /opt/soft/spark3 root@spark03:/opt/soft
  1. scp -r /etc/profile root@spark02:/etc
  2. scp -r /etc/profile root@spark03:/etc

Reload the environment variables on the other nodes

  1. source /etc/profile
Start
  1. start-spark.sh
  2. start-history-server.sh
Web UI

http://spark01:6633

spark ha webui

http://spark01:18080

spark ha history server

Submit a job to the cluster
  1. bin/spark-submit \
  2. --class org.apache.spark.examples.SparkPi \
  3. --master spark://spark01:7077 \
  4. ./examples/jars/spark-examples_2.13-3.4.0.jar \
  5. 10
Submit a job to YARN
  1. bin/spark-submit --master yarn \
  2. --class org.apache.spark.examples.SparkPi ./examples/jars/spark-examples_2.13-3.4.0.jar 10

spark ha webui

spark ha history server

spark-code

spark-core

pom.xml
  1. <?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.lihaozhe</groupId><artifactId>spark-code</artifactId><version>1.0.0</version><properties><jdk.version>1.8</jdk.version><scala.version>2.13.10</scala.version><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><commons-lang3.version>3.12.0</commons-lang3.version><java-testdata-generator.version>1.1.2</java-testdata-generator.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>com.github.binarywang</groupId><artifactId>java-testdata-generator</artifactId><version>${java-testdata-generator.version}</version></dependency><!-- spark-core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.13</artifactId><version>3.4.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.13</artifactId><version>3.4.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.13</artifactId><version>3.4.0</version></dependency><!-- junit-jupiter-api --><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter-api</artifactId><version>5.9.3</version><scope>test</scope></dependency><!-- junit-jupiter-engine --><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter-engine</artifactId><version>5.9.3</version><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.26</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>2.14.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.5</version></dependency><!-- commons-pool2 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.11.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.13</artifactId><version>3.4.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency><!-- commons-lang3 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>${commons-lang3.version}</version></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.11.0</version></dependency></dependencies><build><finalName>${project.artifactId}</finalName><!--<outputDirectory>../package</outputDirectory>--><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.11.0</version><configuration><!-- 设置编译字符编码 --><encoding>UTF-8</encoding><!-- 设置编译jdk版本 
--><source>${jdk.version}</source><target>${jdk.version}</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-clean-plugin</artifactId><version>3.2.0</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-resources-plugin</artifactId><version>3.3.1</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-war-plugin</artifactId><version>3.3.2</version></plugin><!-- 编译级别 --><!-- 打包的时候跳过测试junit begin --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.22.2</version><configuration><skip>true</skip></configuration></plugin><!-- 该插件用于将Scala代码编译成class文件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>4.8.1</version><configuration><scalaCompatVersion>2.13</scalaCompatVersion><scalaVersion>2.13.10</scalaVersion></configuration><executions><execution><goals><goal>testCompile</goal></goals></execution><execution><id>compile-scala</id><phase>compile</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>test-compile-scala</id><phase>test-compile</phase><goals><goal>add-source</goal><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.5.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>
hdfs-conf

Place the HDFS core configuration files core-site.xml and hdfs-site.xml in the resources directory.

The HDFS configuration files included here are those of the test cluster.

Because the production environment differs from the test environment, exclude the HDFS configuration files when the project is packaged; one possible way to do this is sketched below.
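A minimal Maven sketch of one way to exclude them at package time, assuming the files sit directly under src/main/resources (the exact setup depends on the project; a profile that re-adds them for test builds is another option):

<!-- Sketch: keep the test-cluster core-site.xml / hdfs-site.xml out of the packaged jar -->
<build>
  <resources>
    <resource>
      <directory>src/main/resources</directory>
      <excludes>
        <exclude>core-site.xml</exclude>
        <exclude>hdfs-site.xml</exclude>
      </excludes>
    </resource>
  </resources>
</build>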

rdd
Building an RDD from a collection (Parallelized Collections)
package com.lihaozhe.course01;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * Build an RDD from a parallelized collection (Parallelized Collections).
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:18 AM
 */
public class JavaDemo01 {
    public static void main(String[] args) {
        String appName = "RDD";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // basic Spark configuration
        SparkConf conf = new SparkConf().setAppName(appName);
        // run locally
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // data set
            List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
            // create an RDD from the collection (Parallelized Collections)
            JavaRDD<Integer> distData = sc.parallelize(data);
            // bring the data back to the local driver
            List<Integer> result = distData.collect();
            // lambda expression / method reference
            result.forEach(System.out::println);
        }
    }
}
package com.lihaozhe.course01

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Build an RDD from a parallelized collection (Parallelized Collections).
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/17 10:08 AM
 */
object ScalaDemo01 {
  def main(args: Array[String]): Unit = {
    val appName = "rdd"
    // basic Spark configuration
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    val conf = new SparkConf().setAppName(appName)
    // run locally
    conf.setMaster("local")
    // build the SparkContext
    val sc = new SparkContext(conf)
    // data set
    val data = Array(1, 2, 3, 4, 5)
    // create an RDD from the collection (Parallelized Collections)
    val distData = sc.parallelize(data)
    distData.foreach(println)
  }
}
Building an RDD from a local file

words.txt

  1. linux shell
  2. java mysql jdbc
  3. hadoop hdfs mapreduce
  4. hive presto
  5. flume kafka
  6. hbase phoenix
  7. scala spark
  8. sqoop flink
  9. linux shell
  10. java mysql jdbc
  11. hadoop hdfs mapreduce
  12. hive presto
  13. flume kafka
  14. hbase phoenix
  15. scala spark
  16. sqoop flink
  17. base phoenix
  18. scala spark
  19. sqoop flink
  20. linux shell
  21. java mysql jdbc
  22. hadoop hdfs mapreduce
  23. java mysql jdbc
  24. hadoop hdfs mapreduce
  25. hive presto
  26. flume kafka
  27. hbase phoenix
  28. scala spark
  29. java mysql jdbc
  30. hadoop hdfs mapreduce
  31. java mysql jdbc
  32. hadoop hdfs mapreduce
  33. hive presto
  1. packagecom.lihaozhe.course01;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.Arrays;importjava.util.List;/**
  2. * 借助外部文件 External Datasets 构建 RDD
  3. *
  4. * @author 李昊哲
  5. * @version 1.0.0 2023/5/17 上午10:18
  6. */publicclassJavaDemo02{publicstaticvoidmain(String[] args){String appName ="RDD";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
  7. conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 从外部中创建 RDD// External Datasets// 使用本地文件系统JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/words.txt");// 将数据获取到本地driverList<String> lines = javaRDD.collect();// lambda 表达式
  8. lines.forEach(System.out::println);}}}
  1. packagecom.lihaozhe.course01importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * 借助外部文件 External Datasets 构建 RDD
  3. *
  4. * @author 李昊哲
  5. * @version 1.0.0 2023/5/17 上午10:08
  6. */object ScalaDemo02 {def main(args: Array[String]):Unit={val appName ="rdd"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
  7. conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distFile = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/words.txt")
  8. distFile.foreach(println)}}
Building an RDD from an HDFS file
  1. packagecom.lihaozhe.course01;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.List;/**
  2. * 借助外部文件 External Datasets 构建 RDD
  3. *
  4. * @author 李昊哲
  5. * @version 1.0.0 2023/5/17 上午10:18
  6. */publicclassJavaDemo03{publicstaticvoidmain(String[] args){System.setProperty("HADOOP_USER_NAME","root");String appName ="RDD";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
  7. conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 从外部中创建 RDD// External Datasets// 使用本地文件系统// JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");JavaRDD<String> javaRDD = sc.textFile("/data/words.txt");// 将数据获取到本地driverList<String> lines = javaRDD.collect();// lambda 表达式
  8. lines.forEach(System.out::println);}}}
  1. packagecom.lihaozhe.course01importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * 借助外部文件 External Datasets 构建 RDD
  3. *
  4. * @author 李昊哲
  5. * @version 1.0.0 2023/5/17 上午10:08
  6. */object ScalaDemo03 {def main(args: Array[String]):Unit={
  7. System.setProperty("HADOOP_USER_NAME","root")val appName ="rdd"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
  8. conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用HDFS文件系统// val distFile = sc.textFile("hdfs://spark01:8020/data/words.txt")val distFile = sc.textFile("/data/words.txt")
  9. distFile.foreach(println)}}
Transformations and Actions

Transformations (for example map, filter, and groupByKey) lazily describe a new RDD, while actions (for example count, take, and collect) trigger the actual computation; a short sketch of this distinction follows the sample data below.

data.csv

  1. person3,137
  2. person7,193
  3. person7,78
  4. person0,170
  5. person5,145
  6. person5,54
  7. person5,150
  8. person0,102
  9. person0,15
  10. person8,172
  11. person6,177
  12. person5,158
  13. person8,30
  14. person6,184
  15. person5,50
  16. person4,127
  17. person1,197
  18. person3,99
  19. person7,2
  20. person7,51
  21. person9,27
  22. person6,34
  23. person0,18
  24. person7,111
  25. person2,34
  26. person0,80
  27. person3,19
  28. person8,121
  29. person1,38
  30. person3,37
  31. person8,69
  32. person3,116
  33. person5,14
  34. person4,121
  35. person7,13
  36. person8,10
  37. person4,67
  38. person6,177
  39. person8,161
  40. person6,113
  41. person5,161
  42. person3,159
  43. person5,161
  44. person2,88
  45. person3,191
  46. person0,155
  47. person4,55
  48. person6,153
  49. person6,187
  50. person0,41
  51. person3,157
  52. person4,179
  53. person4,95
  54. person1,12
  55. person3,109
  56. person9,24
  57. person9,188
  58. person1,114
  59. person7,9
  60. person7,82
  61. person8,47
  62. person9,153
  63. person7,152
  64. person6,110
  65. person2,73
  66. person8,132
  67. person4,175
  68. person7,153
  69. person9,174
  70. person3,23
  71. person3,103
  72. person9,169
  73. person8,98
  74. person6,62
  75. person2,33
  76. person3,127
  77. person1,91
  78. person6,198
  79. person4,28
  80. person1,182
  81. person0,164
  82. person5,198
  83. person7,22
  84. person0,46
  85. person3,5
  86. person8,140
  87. person3,131
  88. person4,195
  89. person7,86
  90. person0,137
  91. person8,152
  92. person8,154
  93. person7,144
  94. person5,142
  95. person9,147
  96. person1,29
  97. person5,113
  98. person6,173
  99. person6,115
  100. person9,148
  101. person2,114
  102. person7,69
  103. person6,192
  104. person0,113
  105. person5,26
  106. person3,7
  107. person1,2
  108. person6,60
  109. person8,38
  110. person6,19
  111. person4,5
  112. person3,50
  113. person9,179
  114. person2,148
  115. person0,23
  116. person3,121
  117. person9,66
  118. person9,90
  119. person4,166
  120. person7,199
  121. person0,79
  122. person2,157
  123. person5,98
  124. person6,25
  125. person1,100
  126. person4,184
  127. person6,124
  128. person4,183
  129. person3,105
  130. person6,28
  131. person5,141
  132. person6,60
  133. person2,108
  134. person5,171
  135. person7,98
  136. person2,57
  137. person9,18
  138. person8,35
  139. person7,141
  140. person0,180
  141. person2,176
  142. person9,130
  143. person2,26
  144. person0,81
  145. person6,144
  146. person3,33
  147. person4,41
  148. person9,60
  149. person1,99
  150. person4,115
  151. person6,83
  152. person2,90
  153. person7,174
  154. person8,47
  155. person5,62
  156. person0,119
  157. person9,99
  158. person3,125
  159. person3,20
  160. person1,137
  161. person9,74
  162. person6,1
  163. person4,140
  164. person4,122
  165. person1,56
  166. person7,107
  167. person9,131
  168. person7,174
  169. person7,191
  170. person8,31
  171. person4,45
  172. person9,84
  173. person6,38
  174. person9,186
  175. person6,89
  176. person5,87
  177. person9,80
  178. person5,107
  179. person3,175
  180. person8,44
  181. person0,114
  182. person7,63
  183. person3,129
  184. person9,77
  185. person9,86
  186. person9,183
  187. person3,61
  188. person4,104
  189. person2,192
  190. person5,142
  191. person4,124
  192. person5,76
  193. person0,187
  194. person3,38
  195. person7,62
  196. person5,153
  197. person9,149
  198. person7,87
  199. person7,27
  200. person6,88
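Before walking through the individual operators, here is a minimal illustrative sketch (the object name LazyVsAction is made up for this note; the file path follows the examples below) showing that transformations only build up a lineage while actions start the job:

package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative sketch: lazy transformations vs. actions on data.csv
object LazyVsAction {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("lazy-vs-action").setMaster("local")
    val sc = new SparkContext(conf)
    // transformations: nothing runs yet, Spark only records the lineage
    val pairs = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
      .map(line => (line.split(",")(0), line.split(",")(1).toInt))
    val totals = pairs.reduceByKey(_ + _)
    // actions: these trigger the actual computation
    println(s"records = ${pairs.count()}")
    totals.collect().foreach(println)
    sc.stop()
  }
}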
count
  1. packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.Arrays;importjava.util.List;/**
  2. * count 算子
  3. *
  4. * @author 李昊哲
  5. * @version 1.0.0 2023/5/17 上午10:18
  6. */publicclassJavaDemo01{publicstaticvoidmain(String[] args){String appName ="count";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
  7. conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 数据集List<Integer> data =Arrays.asList(0,1,2,3,4,5,6,7,8,9);// 从集合中创建 RDD// Parallelized CollectionsJavaRDD<Integer> distData = sc.parallelize(data);long count = distData.count();System.out.println("count = "+ count);}}}
  1. packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * count 算子
  3. *
  4. * @author 李昊哲
  5. * @version 1.0.0 2023/5/17 上午10:08
  6. */object ScalaDemo01 {def main(args: Array[String]):Unit={val appName ="count"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
  7. conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 数据集val data = Array(0,1,2,3,4,5,6,7,8,9)// 从集合中创建 RDD// Parallelized Collectionsval distData = sc.parallelize(data)val count = distData.count()
  8. println(s"count = ${count}")}}
take
  1. packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.Arrays;importjava.util.List;/**
  2. * take 算子
  3. *
  4. * @author 李昊哲
  5. * @version 1.0.0 2023/5/17 上午10:18
  6. */publicclassJavaDemo02{publicstaticvoidmain(String[] args){String appName ="take";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
  7. conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 数据集List<Integer> data =Arrays.asList(0,1,2,3,4,5,6,7,8,9);// 从集合中创建 RDD// Parallelized CollectionsJavaRDD<Integer> distData = sc.parallelize(data);List<Integer> topList = distData.take(3);
  8. topList.forEach(System.out::println);}}}
  1. packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * take 算子
  3. *
  4. * @author 李昊哲
  5. * @version 1.0.0 2023/5/17 上午10:08
  6. */object ScalaDemo02 {def main(args: Array[String]):Unit={val appName ="take"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
  7. conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 数据集val data = Array(0,1,2,3,4,5,6,7,8,9)// 从集合中创建 RDD// Parallelized Collectionsval distData = sc.parallelize(data)val top = distData.take(3)
  8. top.foreach(println)}}
distinct
  1. packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.Arrays;importjava.util.List;/**
  2. * distinct 算子
  3. *
  4. * @author 李昊哲
  5. * @version 1.0.0 2023/5/17 上午10:18
  6. */publicclassJavaDemo03{publicstaticvoidmain(String[] args){String appName ="distinct";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
  7. conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 数据集List<Integer> data =Arrays.asList(0,1,5,6,7,8,9,3,4,2,4,3);// 从集合中创建 RDD// Parallelized CollectionsJavaRDD<Integer> distData = sc.parallelize(data);JavaRDD<Integer> uniqueRDD = distData.distinct();List<Integer> uniqueList = uniqueRDD.collect();
  8. uniqueList.forEach(System.out::println);}}}
  1. packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * distinct 算子
  3. *
  4. * @author 李昊哲
  5. * @version 1.0.0 2023/5/17 上午10:08
  6. */object ScalaDemo03 {def main(args: Array[String]):Unit={val appName ="distinct"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
  7. conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 数据集val data = Array(0,1,5,6,7,8,9,3,4,2,4,3)// 从集合中创建 RDD// Parallelized Collectionsval distData = sc.parallelize(data)val uniqueData = distData.distinct()
  8. uniqueData.foreach(println)}}
map
  1. packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.Arrays;importjava.util.List;/**
  2. * map 算子
  3. *
  4. * @author 李昊哲
  5. * @version 1.0.0 2023/5/17 上午10:18
  6. */publicclassJavaDemo04{publicstaticvoidmain(String[] args){String appName ="map";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
  7. conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 数据集List<Integer> data =Arrays.asList(1,2,3,4,5);// 从集合中创建 RDD// Parallelized CollectionsJavaRDD<Integer> distData = sc.parallelize(data);JavaRDD<Integer> rs = distData.map(num -> num *2);List<Integer> list = rs.collect();
  8. list.forEach(System.out::println);}}}
  1. packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * map 算子
  3. *
  4. * @author 李昊哲
  5. * @version 1.0.0 2023/5/17 上午10:08
  6. */object ScalaDemo04 {def main(args: Array[String]):Unit={val appName ="map"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
  7. conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 数据集val data = Array(1,2,3,4,5)// 从集合中创建 RDD// Parallelized Collectionsval distData = sc.parallelize(data)val rs = distData.map(_ *2)
  8. rs.foreach(println)}}
flatMap
  1. packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.FlatMapFunction;importjava.util.Arrays;importjava.util.Iterator;importjava.util.List;/**
  2. * flatMap 算子
  3. *
  4. * @author 李昊哲
  5. * @version 1.0.0 2023/5/17 上午10:18
  6. */publicclassJavaDemo05{publicstaticvoidmain(String[] args){String appName ="flatMap";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
  7. conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 数据集List<String> data =Arrays.asList("hadoop hive presto","hbase phoenix","spark flink");// 从集合中创建 RDD// Parallelized CollectionsJavaRDD<String> javaRDD = sc.parallelize(data);// javaRDD.flatMap(new FlatMapFunction<String, Object>() {// @Override// public Iterator<Object> call(String s) throws Exception {// return null;// }// });JavaRDD<String> wordsRdd = javaRDD.flatMap((FlatMapFunction<String,String>) line ->Arrays.asList(line.split(" ")).listIterator());List<String> words = wordsRdd.collect();
  8. words.forEach(System.out::println);}}}
  1. packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * flatMap 算子
  3. *
  4. * @author 李昊哲
  5. * @version 1.0.0 2023/5/17 上午10:08
  6. */object ScalaDemo05 {def main(args: Array[String]):Unit={val appName ="flatMap"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
  7. conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 数据集val data = Array("hadoop hive presto","hbase phoenix","spark flink")// 从集合中创建 RDD// Parallelized Collections// ("hadoop","hive","presto","hbase","phoenix","spark","flink")val rs = data.flatMap(_.split(" "))
  8. rs.foreach(println)}}
filter
  1. packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.Arrays;importjava.util.List;/**
  2. * filter 算子
  3. *
  4. * @author 李昊哲
  5. * @version 1.0.0 2023/5/17 上午10:18
  6. */publicclassJavaDemo06{publicstaticvoidmain(String[] args){String appName ="filter";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
  7. conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 数据集List<Integer> data =Arrays.asList(0,1,2,3,4,5,6,7,8,9);// 从集合中创建 RDD// Parallelized CollectionsJavaRDD<Integer> distData = sc.parallelize(data);JavaRDD<Integer> evenRDD = distData.filter(num -> num %2==0);List<Integer> evenList = evenRDD.collect();
  8. evenList.forEach(System.out::println);}}}
  1. packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * filter 算子
  3. *
  4. * @author 李昊哲
  5. * @version 1.0.0 2023/5/17 上午10:08
  6. */object ScalaDemo06 {def main(args: Array[String]):Unit={val appName ="filter"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
  7. conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 数据集val data = Array(0,1,2,3,4,5,6,7,8,9)// 从集合中创建 RDD// Parallelized Collectionsval distData = sc.parallelize(data)val evenData = distData.filter(_ %2==0)
  8. evenData.foreach(println)}}
groupByKey
  1. packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.PairFunction;importscala.Tuple2;importjava.util.List;/**
  2. * groupByKey 算子
  3. * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析每个人消费的金额数据汇总
  4. *
  5. * @author 李昊哲
  6. * @version 1.0.0 2023/5/17 上午10:18
  7. */publicclassJavaDemo07{publicstaticvoidmain(String[] args){System.setProperty("HADOOP_USER_NAME","root");String appName ="groupByKey";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
  8. conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 从外部中创建 RDD// External Datasets// 使用本地文件系统// JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");// javaRDD.mapToPair(new PairFunction<String, String, Integer>() {// @Override// public Tuple2<String, Integer> call(String s) throws Exception {// return null;// }// });JavaPairRDD<String,Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String,String,Integer>) word ->{String[] words = word.split(",");returnnewTuple2<String,Integer>(words[0],Integer.parseInt(words[1]));});JavaPairRDD<String,Iterable<Integer>> groupRDD = javaPairRDD.groupByKey();List<Tuple2<String,Iterable<Integer>>> collect = groupRDD.collect();
  9. collect.forEach(tup ->{System.out.print(tup._1 +" >>> (");
  10. tup._2.forEach(num ->System.out.print(num +","));System.out.println("\b)");});}}}
  1. packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * groupByKey 算子
  3. * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析每个人消费的金额数据汇总
  4. *
  5. * @author 李昊哲
  6. * @version 1.0.0 2023/5/17 上午10:08
  7. */object ScalaDemo07 {def main(args: Array[String]):Unit={val appName ="groupByKey"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
  8. conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")// distData.foreach(println)// (person3,137)val tupleData = distData.map(line =>(line.split(",")(0), line.split(",")(1)))// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))val groupData = tupleData.groupByKey()
  9. groupData.foreach(println)}}
reduceByKey
  1. packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.Function2;importorg.apache.spark.api.java.function.PairFunction;importscala.Tuple2;importjava.util.List;/**
  2. * reduceByKey 算子
  3. * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析每个人消费的金额数据汇总
  4. *
  5. * @author 李昊哲
  6. * @version 1.0.0 2023/5/17 上午10:18
  7. */publicclassJavaDemo08{publicstaticvoidmain(String[] args){System.setProperty("HADOOP_USER_NAME","root");String appName ="reduceByKey";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
  8. conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 从外部中创建 RDD// External Datasets// 使用本地文件系统// JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");JavaPairRDD<String,Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String,String,Integer>) word ->{String[] words = word.split(",");returnnewTuple2<String,Integer>(words[0],Integer.parseInt(words[1]));});// JavaPairRDD<String, Integer> reduceRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {// @Override// public Integer call(Integer integer, Integer integer2) throws Exception {// return integer + integer2;// }// });JavaPairRDD<String,Integer> reduceRDD = javaPairRDD.reduceByKey((Function2<Integer,Integer,Integer>)Integer::sum);List<Tuple2<String,Integer>> collect = reduceRDD.collect();
  9. collect.forEach(tup ->System.out.println(tup._1 +" >>> "+ tup._2));}}}
  1. packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * reduceByKey 算子
  3. * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客总金额
  4. *
  5. * @author 李昊哲
  6. * @version 1.0.0 2023/5/17 上午10:08
  7. */object ScalaDemo08 {def main(args: Array[String]):Unit={val appName ="reduceByKey"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
  8. conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")// distData.foreach(println)// (person3,137)val tupleData = distData.map(line =>(line.split(",")(0), line.split(",")(1).toInt))// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))val sumData = tupleData.reduceByKey(_ + _)
  9. sumData.foreach(println)}}
mapValues
  1. packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.Function;importorg.apache.spark.api.java.function.PairFunction;importscala.Tuple2;importjava.util.Iterator;importjava.util.List;importjava.util.concurrent.atomic.AtomicInteger;/**
  2. * mapValues 算子
  3. * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 析客单价
  4. *
  5. * @author 李昊哲
  6. * @version 1.0.0 2023/5/17 上午10:18
  7. */publicclassJavaDemo09{publicstaticvoidmain(String[] args){System.setProperty("HADOOP_USER_NAME","root");String appName ="mapValues";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
  8. conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 从外部中创建 RDD// External Datasets// 使用本地文件系统// JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");// javaRDD.mapToPair(new PairFunction<String, String, Integer>() {// @Override// public Tuple2<String, Integer> call(String s) throws Exception {// return null;// }// });JavaPairRDD<String,Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String,String,Integer>) word ->{String[] words = word.split(",");returnnewTuple2<String,Integer>(words[0],Integer.parseInt(words[1]));});JavaPairRDD<String,Iterable<Integer>> groupRDD = javaPairRDD.groupByKey();JavaPairRDD<String,Double> avgRDD = groupRDD.mapValues(v ->{int sum =0;Iterator<Integer> it = v.iterator();AtomicInteger atomicInteger =newAtomicInteger();while(it.hasNext()){Integer amount = it.next();
  9. sum += amount;
  10. atomicInteger.incrementAndGet();}return(double) sum / atomicInteger.get();});List<Tuple2<String,Double>> collect = avgRDD.collect();
  11. collect.forEach(tup ->System.out.println(tup._1 +" >>> "+ tup._2));// Map<String, List<Tuple2<String, Integer>>> listMap = javaPairRDD.collect().stream().collect(Collectors.groupingBy(tup -> tup._1));// Set<Map.Entry<String, List<Tuple2<String, Integer>>>> entries = listMap.entrySet();// Iterator<Map.Entry<String, List<Tuple2<String, Integer>>>> it = entries.iterator();// Map<String, Double> map = new HashMap<>();// while (it.hasNext()) {// Map.Entry<String, List<Tuple2<String, Integer>>> entry = it.next();// Integer sum = entry.getValue().stream().map(tup -> tup._2).reduce(Integer::sum).orElse(0);// long count = entry.getValue().stream().map(tup -> tup._2).count();//// map.put(entry.getKey(), Double.valueOf(sum) / count);// }// map.forEach((name, amount) -> System.out.println(name + " >>> " + amount));}}}
  1. packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * mapValues 算子
  3. * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客单价
  4. *
  5. * @author 李昊哲
  6. * @version 1.0.0 2023/5/17 上午10:08
  7. */object ScalaDemo09 {def main(args: Array[String]):Unit={val appName ="mapValues"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
  8. conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")// distData.foreach(println)// (person3,137)val tupleData = distData.map(line =>(line.split(",")(0), line.split(",")(1).toInt))// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))val groupData = tupleData.groupByKey()
  9. groupData.foreach(println)val avgData = groupData.mapValues(v =>(v.sum.toDouble / v.size).formatted("%.2f"))
  10. avgData.foreach(println)}}
sortByKey
  1. packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.Function2;importorg.apache.spark.api.java.function.PairFunction;importscala.Tuple2;importjava.util.List;/**
  2. * sortByKey reduceByKey 算子
  3. * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析每个人消费的金额数据汇总
  4. *
  5. * @author 李昊哲
  6. * @version 1.0.0 2023/5/17 上午10:18
  7. */publicclassJavaDemo10{publicstaticvoidmain(String[] args){System.setProperty("HADOOP_USER_NAME","root");String appName ="sortByKey reduceByKey";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
  8. conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 从外部中创建 RDD// External Datasets// 使用本地文件系统// JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");JavaPairRDD<String,Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String,String,Integer>) word ->{String[] words = word.split(",");returnnewTuple2<String,Integer>(words[0],Integer.parseInt(words[1]));});// JavaPairRDD<String, Integer> reduceRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {// @Override// public Integer call(Integer integer, Integer integer2) throws Exception {// return integer + integer2;// }// });JavaPairRDD<String,Integer> reduceRDD = javaPairRDD.reduceByKey((Function2<Integer,Integer,Integer>)Integer::sum);JavaPairRDD<String,Integer> soredRDD = reduceRDD.sortByKey(false);List<Tuple2<String,Integer>> collect = soredRDD.collect();
  9. collect.forEach(tup ->System.out.println(tup._1 +" >>> "+ tup._2));}}}
  1. packagecom.lihaozhe.course02;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.PairFunction;importscala.Tuple2;importjava.util.Iterator;importjava.util.List;importjava.util.concurrent.atomic.AtomicInteger;/**
  2. * sortByKey reduceByKey 算子
  3. * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 析客单价
  4. *
  5. * @author 李昊哲
  6. * @version 1.0.0 2023/5/17 上午10:18
  7. */publicclassJavaDemo11{publicstaticvoidmain(String[] args){System.setProperty("HADOOP_USER_NAME","root");String appName ="sortByKey reduceByKey";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");// spark基础配置SparkConf conf =newSparkConf().setAppName(appName);// 本地运行
  8. conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){// 从外部中创建 RDD// External Datasets// 使用本地文件系统// JavaRDD<String> javaRDD = sc.textFile("hdfs://spark01:8020/data/words.txt");JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv");// javaRDD.mapToPair(new PairFunction<String, String, Integer>() {// @Override// public Tuple2<String, Integer> call(String s) throws Exception {// return null;// }// });JavaPairRDD<String,Integer> javaPairRDD = javaRDD.mapToPair((PairFunction<String,String,Integer>) word ->{String[] words = word.split(",");returnnewTuple2<String,Integer>(words[0],Integer.parseInt(words[1]));});JavaPairRDD<String,Iterable<Integer>> groupRDD = javaPairRDD.groupByKey();JavaPairRDD<String,Double> avgRDD = groupRDD.mapValues(v ->{int sum =0;Iterator<Integer> it = v.iterator();AtomicInteger atomicInteger =newAtomicInteger();while(it.hasNext()){Integer amount = it.next();
  9. sum += amount;
  10. atomicInteger.incrementAndGet();}return(double) sum / atomicInteger.get();});JavaPairRDD<String,Double> sortedRDD = avgRDD.sortByKey(false);List<Tuple2<String,Double>> collect = sortedRDD.collect();
  11. collect.forEach(tup ->System.out.println(tup._1 +" >>> "+ tup._2));// Map<String, List<Tuple2<String, Integer>>> listMap = javaPairRDD.collect().stream().collect(Collectors.groupingBy(tup -> tup._1));// Set<Map.Entry<String, List<Tuple2<String, Integer>>>> entries = listMap.entrySet();// Iterator<Map.Entry<String, List<Tuple2<String, Integer>>>> it = entries.iterator();// Map<String, Double> map = new HashMap<>();// while (it.hasNext()) {// Map.Entry<String, List<Tuple2<String, Integer>>> entry = it.next();// Integer sum = entry.getValue().stream().map(tup -> tup._2).reduce(Integer::sum).orElse(0);// long count = entry.getValue().stream().map(tup -> tup._2).count();//// map.put(entry.getKey(), Double.valueOf(sum) / count);// }// map.forEach((name, amount) -> System.out.println(name + " >>> " + amount));}}}
  1. packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * sortByKey educeByKey 算子
  3. * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客总金额
  4. *
  5. * @author 李昊哲
  6. * @version 1.0.0 2023/5/17 上午10:08
  7. */object ScalaDemo10 {def main(args: Array[String]):Unit={val appName ="sortByKey reduceByKey"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
  8. conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")// distData.foreach(println)// (person3,137)val tupleData = distData.map(line =>(line.split(",")(0), line.split(",")(1).toInt))// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))val sumData = tupleData.reduceByKey(_ + _)
  9. sumData.foreach(println)val swapData = sumData.map(_.swap)// 参数 true为升序 false为降序 默认为升序val sortData = swapData.sortByKey(ascending =false)
  10. sortData.foreach(println)}}
  1. packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * sortByKey 算子
  3. * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客单价
  4. *
  5. * @author 李昊哲
  6. * @version 1.0.0 2023/5/17 上午10:08
  7. */object ScalaDemo11 {def main(args: Array[String]):Unit={val appName ="sortByKey reduceByKey"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
  8. conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")// distData.foreach(println)// (person3,137)val tupleData = distData.map(line =>(line.split(",")(0), line.split(",")(1).toInt))// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))val groupData = tupleData.groupByKey()
  9. groupData.foreach(println)val avgData = groupData.mapValues(v =>(v.sum.toDouble / v.size))
  10. avgData.foreach(println)val swapData = avgData.map(_.swap)// 参数 true为升序 false为降序 默认为升序val sortData = swapData.sortByKey(ascending =false)
  11. sortData.foreach(v => println(v._2 +","+ v._1.formatted("%.2f")))}}
sortBy
  1. packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * sortBy educeByKey 算子
  3. * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客总金额
  4. *
  5. * @author 李昊哲
  6. * @version 1.0.0 2023/5/17 上午10:08
  7. */object ScalaDemo12 {def main(args: Array[String]):Unit={val appName ="sortBy reduceByKey"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
  8. conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")// distData.foreach(println)// (person3,137)val tupleData = distData.map(line =>(line.split(",")(0), line.split(",")(1).toInt))// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))val sumData = tupleData.reduceByKey(_ + _)
  9. sumData.foreach(println)// 参数 true为升序 false为降序 默认为升序val sortedData = sumData.sortBy(_._2, ascending =false)
  10. sortedData.foreach(println)}}
  1. packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * sortBy 算子
  3. * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客单价
  4. *
  5. * @author 李昊哲
  6. * @version 1.0.0 2023/5/17 上午10:08
  7. */object ScalaDemo13 {def main(args: Array[String]):Unit={val appName ="sortBy reduceByKey"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
  8. conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")// distData.foreach(println)// (person3,137)val tupleData = distData.map(line =>(line.split(",")(0), line.split(",")(1).toInt))// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))val groupData = tupleData.groupByKey()
  9. groupData.foreach(println)val avgData = groupData.mapValues(v =>(v.sum.toDouble / v.size).formatted("%.2f").toDouble)// 参数 true为升序 false为降序 默认为升序val sortedData = avgData.sortBy(_._2, ascending =false)
  10. sortedData.foreach(println)}}
join
  1. packagecom.lihaozhe.course02importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * join 算子
  3. * 引入数据文件 data.csv 第一列为姓名 第二列为每次消费的订单金额 分析客总金额
  4. *
  5. * @author 李昊哲
  6. * @version 1.0.0 2023/5/17 上午10:08
  7. */object ScalaDemo14 {def main(args: Array[String]):Unit={val appName ="join"// spark基础配置// val conf = new SparkConf().setAppName(appName).setMaster("local")val conf =new SparkConf().setAppName(appName)// 本地运行
  8. conf.setMaster("local")// 构建 SparkContext spark 上下文val sc =new SparkContext(conf)// 从外部中创建 RDD// External Datasets// 使用本地文件系统val distData = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")// distData.foreach(println)// (person3,137)val tupleData = distData.map(line =>(line.split(",")(0), line.split(",")(1).toInt))// (person1,Seq(197, 38, 12, 114, 91, 182, 29, 2, 100, 99, 137, 56))val sumData = tupleData.reduceByKey(_ + _)val groupData = tupleData.groupByKey()val avgData = groupData.mapValues(v =>(v.sum.toDouble / v.size).formatted("%.2f").toDouble)val rsData = sumData.join(avgData)
  9. rsData.foreach(println)}}
WordCount
JavaWordCount
  1. packagecom.lihaozhe.course03;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.FlatMapFunction;importorg.apache.spark.api.java.function.Function2;importorg.apache.spark.api.java.function.PairFunction;importscala.Tuple2;importjava.util.Arrays;/**
  2. * @author 李昊哲
  3. * @version 1.0.0 2023/5/17 下午4:16
  4. */publicclassJavaWordCount{publicstaticvoidmain(String[] args){System.setProperty("HADOOP_USER_NAME","root");String appName ="WordCount";// SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");SparkConf conf =newSparkConf().setAppName(appName);// 本地运行// conf.setMaster("local");try(JavaSparkContext sc =newJavaSparkContext(conf)){JavaRDD<String> javaRDD = sc.textFile("/data/words.txt");JavaRDD<String> wordsRdd = javaRDD.flatMap((FlatMapFunction<String,String>) line ->Arrays.asList(line.split(" ")).listIterator());JavaPairRDD<String,Integer> javaPairRDD = wordsRdd.mapToPair((PairFunction<String,String,Integer>) word ->newTuple2<>(word,1));JavaPairRDD<String,Integer> rs = javaPairRDD.reduceByKey((Function2<Integer,Integer,Integer>)Integer::sum);
  5. rs.saveAsTextFile("/data/result");
  6. sc.stop();}}}
ScalaWordCount
  1. packagecom.lihaozhe.course03importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * @author 李昊哲
  3. * @version 1.0.0 2023/5/17 下午3:51
  4. */object ScalaWordCount01 {def main(args: Array[String]):Unit={
  5. System.setProperty("HADOOP_USER_NAME","root")// val conf = new SparkConf().setAppName("WordCount").setMaster("local")val conf =new SparkConf().setAppName("WordCount")// conf.setMaster("local")val sc =new SparkContext(conf)val content = sc.textFile("/data/words.txt")// content.foreach(println)val words = content.flatMap(_.split(" "))// words.foreach(println)// (love,Seq(love, love, love, love, love))val wordGroup = words.groupBy(word => word)// wordGroup.foreach(println)// (love,5)val wordCount = wordGroup.mapValues(_.size)// wordCount.foreach(println)
  6. wordCount.saveAsTextFile("/data/result");
  7. sc.stop()}}
  1. packagecom.lihaozhe.course03importorg.apache.spark.{SparkConf, SparkContext}/**
  2. * @author 李昊哲
  3. * @version 1.0.0 2023/5/17 下午3:51
  4. */object ScalaWordCount02 {def main(args: Array[String]):Unit={
  5. System.setProperty("HADOOP_USER_NAME","root")// val conf = new SparkConf().setAppName("WordCount").setMaster("local")val conf =new SparkConf().setAppName("WordCount")// conf.setMaster("local")val sc =new SparkContext(conf)val content = sc.textFile("/data/words.txt")// content.foreach(println)val words = content.flatMap(_.split(" "))// words.foreach(println)// (love,Seq(love, love, love, love, love))val wordMap = words.map((_,1))// wordGroup.foreach(println)// (love,5)val wordCount = wordMap.reduceByKey(_ + _)//wordCount.foreach(println)
  6. wordCount.saveAsTextFile("/data/result");
  7. sc.stop()}}
Package and publish the project
  1. mvn package

Upload the jar file to the cluster

Submit on the cluster

standalone client mode: spark-submit --master spark://spark01:7077 --deploy-mode client

standalone cluster mode: spark-submit --master spark://spark01:7077 --deploy-mode cluster

yarn client mode: spark-submit --master yarn --deploy-mode client

yarn cluster mode: spark-submit --master yarn --deploy-mode cluster

  1. spark-submit --master yarn --deploy-mode cluster --class com.lihaozhe.course03.JavaWordCount spark-code.jar
  1. spark-submit --master yarn --deploy-mode cluster --class com.lihaozhe.course03.ScalaWordCount01 spark-code.jar
  1. spark-submit --master yarn --deploy-mode cluster --class com.lihaozhe.course03.ScalaWordCount02 spark-code.jar

sparkSQL

  1. packagecom.lihaozhe.course04importorg.apache.spark.SparkConf
  2. importorg.apache.spark.sql.SparkSession
  3. /**
  4. * 构建 dataFrame
  5. *
  6. * @author 李昊哲
  7. * @version 1.0.0 2023/5/18 上午8:30
  8. */object ScalaDemo01 {def main(args: Array[String]):Unit={// 基础配置val conf =new SparkConf()if(!conf.contains("spark.master")){
  9. conf.setMaster("local")}val spark = SparkSession
  10. .builder().appName("Spark SQL basic example").config(conf).getOrCreate()// 隐式转换importspark.implicits._
  11. // 读取 csv 文件获取 dataFrameval df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")// root// |-- _c0: string (nullable = true)// |-- _c1: string (nullable = true)
  12. df.printSchema()
  13. spark.stop()}}
  1. packagecom.lihaozhe.course04importorg.apache.spark.SparkConf
  2. importorg.apache.spark.sql.SparkSession
  3. /**
  4. * show
  5. *
  6. * @author 李昊哲
  7. * @version 1.0.0 2023/5/18 上午8:30
  8. */object ScalaDemo02 {def main(args: Array[String]):Unit={// 基础配置val conf =new SparkConf()if(!conf.contains("spark.master")){
  9. conf.setMaster("local")}val spark = SparkSession
  10. .builder().appName("Spark SQL basic example").config(conf).getOrCreate()// 隐式转换importspark.implicits._
  11. // 读取 csv 文件获取 dataFrameval df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
  12. df.show()
  13. spark.stop()}}
  1. packagecom.lihaozhe.course04importorg.apache.spark.SparkConf
  2. importorg.apache.spark.sql.SparkSession
  3. /**
  4. * option 是否将第一列作为字段名 header默认值为 false
  5. *
  6. * @author 李昊哲
  7. * @version 1.0.0 2023/5/18 上午8:30
  8. */object ScalaDemo03 {def main(args: Array[String]):Unit={// 基础配置val conf =new SparkConf()if(!conf.contains("spark.master")){
  9. conf.setMaster("local")}val spark = SparkSession
  10. .builder().appName("Spark SQL basic example").config(conf).getOrCreate()// 隐式转换importspark.implicits._
  11. // 读取 csv 文件获取 dataFrameval df = spark.read
  12. .option("header","true")// 是否将第一列作为字段名 header默认值为 false.csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")// root// |-- name: string (nullable = true)// |-- amount: string (nullable = true)
  13. df.printSchema()
  14. df.show()
  15. spark.stop()}}
  1. packagecom.lihaozhe.course04importorg.apache.spark.SparkConf
  2. importorg.apache.spark.sql.SparkSession
  3. /**
  4. * select
  5. *
  6. * @author 李昊哲
  7. * @version 1.0.0 2023/5/18 上午8:30
  8. */object ScalaDemo04 {def main(args: Array[String]):Unit={// 基础配置val conf =new SparkConf()if(!conf.contains("spark.master")){
  9. conf.setMaster("local")}val spark = SparkSession
  10. .builder().appName("Spark SQL basic example").config(conf).getOrCreate()// 隐式转换importspark.implicits._
  11. // 读取 csv 文件获取 dataFrameval df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")// root// |-- _c0: string (nullable = true)// |-- _c1: string (nullable = true)
  12. df.printSchema()
  13. df.select("_c0","_c1").show()
  14. spark.stop()}}
  1. packagecom.lihaozhe.course04importorg.apache.spark.SparkConf
  2. importorg.apache.spark.sql.SparkSession
  3. /**
  4. * withColumnRenamed
  5. *
  6. * @author 李昊哲
  7. * @version 1.0.0 2023/5/18 上午8:30
  8. */object ScalaDemo05 {def main(args: Array[String]):Unit={// 基础配置val conf =new SparkConf()if(!conf.contains("spark.master")){
  9. conf.setMaster("local")}val spark = SparkSession
  10. .builder().appName("Spark SQL basic example").config(conf).getOrCreate()// 隐式转换importspark.implicits._
  11. // 读取 csv 文件获取 dataFrameval df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
  12. df.withColumnRenamed("_c0","name").withColumnRenamed("_c1","amount").printSchema()// root// |-- name: string (nullable = true)// |-- amount: string (nullable = true)
  13. spark.stop()}}
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * col("oldName").cast(dataType).as("newName")
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo06 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the csv file into a DataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount")
    ).show()
    spark.stop()
  }
}
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * topN
 * show(numRows, truncate = false): shows the first 20 rows by default; numRows is the row count,
 * truncate controls whether long values are trimmed (default true; false leaves them untrimmed)
 * first(): the first row
 * take(n): the first n rows
 * head(n): the first n rows
 * tail(n): the last n rows
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo07 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the csv file into a DataFrame
    val df = spark.read
      .option("header", "true") // treat the first line as the column header; header defaults to false
      .csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
    // df.show(5, truncate = false)
    // println(df.first())
    // df.take(3).foreach(println)
    // df.head(3).foreach(println)
    df.tail(3).foreach(println)
    spark.stop()
  }
}
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * where: filter rows by condition (SQL expression string)
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo08 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the csv file into a DataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount")
    ).where("amount > 100").show()
    spark.stop()
  }
}
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * where: filter rows by condition (Column expression)
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo09 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the csv file into a DataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount")
    ).where(col("amount") > 100).show()
    spark.stop()
  }
}
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * filter: filter rows by condition (SQL expression string)
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo10 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the csv file into a DataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount")
    ).filter("amount > 100").show()
    spark.stop()
  }
}
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * filter: filter rows by condition (Column expression)
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo11 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the csv file into a DataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount")
    ).filter(col("amount") > 100).show()
    spark.stop()
  }
}
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * groupBy + count: group and aggregate
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo12 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the csv file into a DataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount")
    ).groupBy(col("name")).count().show()
    spark.stop()
  }
}
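ScalaDemo12 only counts the rows per name. To total the amounts instead, the same select/cast pattern can feed a sum aggregation; the following is a minimal sketch that is not part of the original code and plugs into the same df:

    import org.apache.spark.sql.functions.sum
    // same cast as above, but aggregate with sum instead of count
    df.select(
      col("_c0").cast(StringType).as("name"),
      col("_c1").cast(IntegerType).as("amount")
    ).groupBy(col("name"))
      .agg(sum("amount").as("total_amount"))
      .show()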
package com.lihaozhe.course04

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * SQLContext
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo13 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the csv file into a DataFrame
    val df = spark.read.csv("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // note: withColumnRenamed returns a new DataFrame and the result is discarded here,
    // so the temporary view below still uses the original _c0/_c1 column names
    df.withColumnRenamed("_c0", "name").withColumnRenamed("_c1", "amount")
    // register the DataFrame as a temporary view
    df.createOrReplaceTempView("order_info")
    // obtain the SQLContext
    val sqlContext = spark.sqlContext
    sqlContext.sql("select _c0 as name, _c1 as amount from order_info where _c1 > 100").show(false)
    spark.stop()
  }
}
package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * DataFrame vs. Dataset
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo01 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the csv file into a DataFrame
    val df = spark.read
      .option("header", "true") // treat the first line as the column header; header defaults to false
      .csv("file:///home/lsl/IdeaProjects/spark-code/info.csv")
    val ds = df.as[OrderInfo]
    val rdd = ds.map(orderInfo => (orderInfo.name, orderInfo.amount.toInt)).rdd
    rdd.reduceByKey(_ + _).foreach(println)
    spark.stop()
  }
}

case class OrderInfo(name: String, amount: String)
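For comparison, here is a minimal sketch (not part of the original code) of the same per-name total computed without dropping to the RDD API, reusing the ds value from the course05 ScalaDemo01 above:

    // typed Dataset operators: group by name, convert the amount, reduce per key
    val totals = ds
      .groupByKey(_.name)
      .mapValues(_.amount.toInt)
      .reduceGroups(_ + _) // Dataset[(String, Int)] of (name, total)
    totals.show()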
package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Creating a Dataset
 * spark.read.text returns a DataFrame
 * spark.read.textFile returns a Dataset
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo02 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    // read the same file both ways
    val df = spark.read.text("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    val ds = spark.read.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    spark.stop()
  }
}
package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Data read with spark.read.text or spark.read.textFile comes back as a single String column named value
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo03 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    val df = spark.read.text("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    val ds = spark.read.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    df.show(5, truncate = false)
    ds.show(5, truncate = false)
    spark.stop()
  }
}
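Because both reads expose only that single value column, the fields have to be split out manually when they are needed. A minimal sketch (not in the original), assuming the name,amount layout of data.csv used throughout these examples:

    import org.apache.spark.sql.functions.{col, split}
    // break each "name,amount" line back into typed columns
    val parsed = df
      .withColumn("name", split(col("value"), ",")(0))
      .withColumn("amount", split(col("value"), ",")(1).cast("int"))
      .drop("value")
    parsed.printSchema()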
package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Interoperating with RDDs
 * When there are only a few fields, use reflection on a case class to infer the RDD's schema
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo04 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    // Create an RDD of OrderSchema objects from a text file, convert it to a DataFrame
    val OrderDf = spark.sparkContext
      .textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
      .map(_.split(","))
      .map(attributes => OrderSchema(attributes(0), attributes(1).trim.toInt))
      .toDF()
    // Register the DataFrame as a temporary view
    OrderDf.createOrReplaceTempView("order_info")
    // SQL statements can be run by using the sql methods provided by Spark
    val teenagersDF = spark.sql("SELECT name, amount FROM order_info WHERE amount BETWEEN 100 AND 150")
    teenagersDF.map(teenager => "Name: " + teenager(0)).show(3, truncate = false)
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show(3, truncate = false)
    // No pre-defined encoders for Dataset[Map[K,V]], so define one explicitly;
    // each row is read into a Map keyed by column name.
    // Primitive types and case classes could also use an expression encoder, e.g.
    // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    val array = teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "amount"))).collect()
    array.foreach(println)
    teenagersDF.toJSON.show(3, truncate = false)
    spark.stop()
  }
}

case class OrderSchema(name: String, amount: Int)
package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Interoperating with RDDs
 * When a case class cannot be defined ahead of time, construct the schema programmatically:
 * 1. Create an RDD of Rows from the original RDD;
 * 2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1;
 * 3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo05 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    val sc = spark.sparkContext
    val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // 1. create an RDD of Rows from the original RDD
    val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).toInt))
    // 2. create the schema, represented by a StructType matching the Row structure from step 1
    val struct = StructType(Array(
      StructField("name", StringType, nullable = true),
      StructField("amount", IntegerType, nullable = true)
    ))
    // 3. apply the schema to the RDD of Rows via createDataFrame
    val df = spark.createDataFrame(rowRDD, struct)
    df.printSchema()
    // root
    // |-- name: string (nullable = true)
    // |-- amount: integer (nullable = true)
    spark.stop()
  }
}
package com.lihaozhe.course05

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Interoperating with RDDs
 * When the schema is only known at runtime, construct it programmatically:
 * 1. Create an RDD of Rows from the original RDD;
 * 2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1;
 * 3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo06 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    val sc = spark.sparkContext
    val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    // 1. create an RDD of Rows from the original RDD
    //    (both values stay as strings so the rows match the all-string schema built below)
    val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).trim))
    // 2. create the schema from a string of field names; every field is typed as string
    val schemaString = "name amount"
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    // 3. apply the schema to the RDD of Rows via createDataFrame
    val df = spark.createDataFrame(rowRDD, schema)
    df.printSchema()
    // root
    // |-- name: string (nullable = true)
    // |-- amount: string (nullable = true)
    spark.stop()
  }
}
package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * DataSource: csv → parquet
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo01 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    val df = spark.read.option("header", "true").format("csv")
      .load("file:///home/lsl/IdeaProjects/spark-code/info.csv")
    df.select("name", "amount").write.mode(SaveMode.Overwrite).format("parquet").save("/data/spark/parquet")
    spark.stop()
  }
}
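The parquet output lands on HDFS, so a quick listing (optional) confirms the write, using the same path as the example above:

hdfs dfs -ls /data/spark/parquet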
package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * DataSource: parquet → json
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo02 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    val df = spark.read.option("header", "true").format("parquet").load("/data/spark/parquet")
    df.select("name", "amount").write.mode(SaveMode.Overwrite).format("json").save("/data/spark/json")
    spark.stop()
  }
}
package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * DataSource: JDBC (MySQL) load via format/options
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo03 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://spark03")
      .option("dbtable", "lihaozhe.dujitang")
      .option("user", "root")
      .option("password", "Lihaozhe!!@@1122")
      .load()
    jdbcDF.show()
    spark.stop()
  }
}
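These JDBC examples assume the MySQL driver is on the classpath. If the build does not already declare it, a Maven-style dependency along these lines is needed (the coordinates and version shown here are an assumption; match them to the MySQL server in use):

<dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <version>8.0.33</version>
</dependency>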
package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import java.util.Properties

/**
 * DataSource: JDBC (MySQL) load via jdbc() and Properties
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo04 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "Lihaozhe!!@@1122")
    val jdbcDF = spark.read.jdbc("jdbc:mysql://spark03", "lihaozhe.dujitang", connectionProperties)
    jdbcDF.show()
    spark.stop()
  }
}
package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import java.util.Properties

/**
 * DataSource: JDBC (MySQL) load with a custom schema (customSchema)
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo05 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "Lihaozhe!!@@1122")
    // custom column types for the result schema
    connectionProperties.put("customSchema", "id STRING, content STRING")
    val jdbcDF = spark.read.jdbc("jdbc:mysql://spark03", "lihaozhe.dujitang", connectionProperties)
    jdbcDF.printSchema()
    spark.stop()
  }
}
package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

/**
 * DataSource: JDBC (MySQL) save via format/options
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo06 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    val sc = spark.sparkContext
    val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).toInt))
    val struct = StructType(Array(
      StructField("name", StringType, nullable = true),
      StructField("amount", IntegerType, nullable = true)
    ))
    val df = spark.createDataFrame(rowRDD, struct)
    df.write
      .format("jdbc")
      .option("url", "jdbc:mysql://spark03")
      .option("dbtable", "lihaozhe.data")
      .option("user", "root")
      .option("password", "Lihaozhe!!@@1122")
      .mode(SaveMode.Append)
      .save()
    spark.stop()
  }
}
package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import java.util.Properties

/**
 * DataSource: JDBC (MySQL) save via jdbc() and Properties
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo07 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    val sc = spark.sparkContext
    val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).toInt))
    val struct = StructType(Array(
      StructField("name", StringType, nullable = true),
      StructField("amount", IntegerType, nullable = true)
    ))
    val df = spark.createDataFrame(rowRDD, struct)
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "Lihaozhe!!@@1122")
    df.write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://spark03", "lihaozhe.data", connectionProperties)
    spark.stop()
  }
}
package com.lihaozhe.course06

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import java.util.Properties

/**
 * DataSource: JDBC (MySQL) save with createTableColumnTypes
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 08:30
 */
object ScalaDemo08 {
  def main(args: Array[String]): Unit = {
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()
    // implicit conversions
    import spark.implicits._
    val sc = spark.sparkContext
    val ds = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/data.csv")
    val rowRDD = ds.map(_.split(",")).map(content => Row(content(0), content(1).toInt))
    val struct = StructType(Array(
      StructField("name", StringType, nullable = true),
      StructField("amount", IntegerType, nullable = true)
    ))
    val df = spark.createDataFrame(rowRDD, struct)
    df.write
      .mode(SaveMode.Append)
      .option("createTableColumnTypes", "name VARCHAR(33)")
      .jdbc("jdbc:mysql://spark03", "lihaozhe.data", connectionProperties = new Properties() {
        put("user", "root")
        put("password", "Lihaozhe!!@@1122")
      })
    spark.stop()
  }
}
  1. hdfs dfs -mkdir -p /lihaozhe
  2. hdfs dfs -mkdir -p /hive/info
create database lihaozhe location '/lihaozhe';
  1. use lihaozhe;
create external table `info` (
  name   string comment '姓名',
  amount int    comment '金额'
)
comment '订单表'
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile
location '/hive/info';

load data local inpath '/root/data.csv' into table info;
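As an optional sanity check (not part of the original steps), the loaded rows can be previewed in the Hive CLI before querying them from Spark:

select * from info limit 5;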
package com.lihaozhe.course07

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * spark on hive
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/18 16:07
 */
object ScalaHiveSource {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // basic configuration
    val conf = new SparkConf()
    if (!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    val spark = SparkSession.builder()
      .appName("Spark SQL hive example")
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()
    // implicit conversions
    import spark.implicits._
    import spark.sql
    sql("select * from lihaozhe.info").show()
    spark.stop()
  }
}

Error message: Couldn't create directory /user/hive/resources

Cause
The error comes from the hive.downloaded.resources.dir setting in hive-site.xml:
its value, /user/hive/resources, cannot be created on the local machine.

  • Spark SQL reads hive-site.xml to connect to the metastore database, and hdfs-site.xml plus core-site.xml to connect to HDFS.
  • The /user/hive/resources directory from that configuration is created on the machine running IDEA (the local Windows, macOS, or Linux host), not on the remote server. Because the directory cannot be created on the local filesystem, the error is thrown.

Fix for org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: Couldn't create directory /user/hive/resources
Create the directory on the local filesystem.

macOS or Linux

  1. sudo mkdir -p /user/hive/resources

Windows
Create the directory on the drive that holds the project.
For example, if the project lives on drive D, create d:\user\hive\resources:
md d:\user\hive\resources
(md creates the full multi-level path from a Windows cmd prompt.) Alternatively, the resources directory can be redirected in hive-site.xml, as sketched below.
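A minimal sketch of that configuration override, assuming hive-site.xml sits on the project classpath; the value shown is an assumption and any writable local directory works:

<property>
  <!-- assumption: redirect Hive's downloaded-resources dir to a writable local path -->
  <name>hive.downloaded.resources.dir</name>
  <value>${system:java.io.tmpdir}/hive/resources</value>
</property>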


Reprinted from: https://blog.csdn.net/qq_24330181/article/details/130643549
Copyright belongs to the original author, 李昊哲小课.
