1.示例
(1) 将依赖文件加入pom.xml
<dependency><groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.1.1</version>
</dependency>
(2) 打开虚拟机,配置镜像文件,如果配置过,请忽略此步骤
Centos7更换yum国内源教程_centos7更换yum源-CSDN博客
(3) 安装nc软件
sudo yum install -y nc
(4) 在本地idea项目中,运行 Streaming.java
(5)在虚拟机启动一个nc端口
nc -lk 9999
再在控制台输入
Jack Mary
(6) IDEA控制台运行效果:
(7) 再在12代码后(即val的声明后面)添加代码:
lines.flatMap(.split("\s+")).map((, 1)).reduceByKey(+).print();
运行结果
(8)IDEA打开Streaming2.scala,设置自己的hdfs地址,可以使用以下代码查看在虚拟机,并且更改IDEA第7 12行的代码
hdfs getconf -confKey fs.defaultFS
第10行修改自己的主机名,通过以下代码在虚拟机查看
hostname
使用Maven打包项目
(9) 使用nc创建端口(可以使用另外一个控制台使用nc)
nc -lk 4444
(10) 启动spark项目,如11.1章节所示步骤
cd /app/spark-3.1.1-bin-hadoop3.2/sbin
./start-master.sh
./start-worker.sh spark://192.168.199.6:7077
(11)查看work和master的通信
netstat -tuln | grep 7077
netstat -tuln | grep 8080
若显示的内容不包括127.0.0.0,即运行成功,此部分比较麻烦,如果显示不正确可以修改
export SPARK_MASTER_HOST='0.0.0.0'
export SPARK_WORKER_PORT=8080
export SPARK_WORK_HOST='0.0.0.0'
export SPARK_MASTER_PORT=7077
运行打包的项目,根据自己的位置和端口进行修改
spark-submit --master spark://myserver:7077 --class org.hadoop.spark.Streaming2 /usr/jar/chapter11-1.0.jar hdfs://192.168.199.6:9000/test/ hdfs://192.168.199.6:9000/out001
运行效果,在网页查看
2,DStream
虚拟机打开控制台,输入代码后,输入数据
nc -lk 9999
打开DStreaming.scala,修改19行的ip地址改成自己的,并且运行
3, FileStream
(1)打开FileStreaming.scala文件,使用Maven打包,并且复制到app文件夹下
运行代码,需要修改自己的主机名
spark-submit --master spark://myserver:7077 --class org.hadoop.spark.FileStreaming /app/chapter11-1.0.jar
使用winscp,往app文件目录下面添加文件后
(2) 打开FileStreaming2.scala,按照上进行打包,并且复制到app文件夹下
在虚拟机运行,创建hdfs文件夹,一个控制台
hadoop fs -mkdir -p /test
echo "This is a test file." > /app/testfile.txt
hadoop fs -put /app/testfile.txt /test/
另外一个控制台
spark-submit --master spark://myserver:7077 --class org.hadoop.spark.FIleStreaming2 /app/chapter11-1.0.jar hdfs://192.168.199.6:9000/test/
运行结果
4,窗口函数
(1) 在虚拟机打开9999端口并且监控
nc -lk 9999
(2) 打开文件WindowFun.scal,修改主机名为自己的,并且运行
(3)往虚拟机nc中输入数据
hello world
spark streaming
window function
运行效果: 可以看见在一段时间内输入的内容,被整理在一起了
5 updateStateByKey
(1) 运行ForUpdateStateByKey.java,路径如下:
src/main/java/org/hadoop/spark/ForUpdateStateByKey.java
(2) 运行UpdateStateByKeyClient.scala
版权归原作者 勇敢de大角牛 所有, 如有侵权,请联系我们删除。