大数据编程实验四:SparkStreaming编程
文章目录
一、实验目的与要求
- 通过实验掌握Spark Streaming的基本编程方法
- 熟悉利用Spark Streaming处理来自不同数据源的数据
- 熟悉DStream的各种转换操作
- 熟悉把DStream的数据输出保存到文本文件或MySQL数据库中
二、实验内容
- 参照教材示例,利用Spark Streaming对不同类型数据源的数据进行处理
- 参照教材示例,完成DStream的两种有状态转换操作
- 参照教材示例,完成把DStream的数据输出保存到文本文件或MySQL数据库中
三、实验步骤
1、利用Spark Streaming对不同类型数据源的数据进行处理
- 文件流首先在虚拟机中打开第一个终端作为数据流终端,创建一个logfile目录:
cd /usr/local/spark/mycodemkdir streamingcd streamingmkdir logfile
然后我们打开第二个终端作为流计算终端,在我们创建的目录下面新建一个py程序:vim FileStreaming.py
然后输入如下代码:from pyspark import SparkContext, SparkConffrom pyspark.streaming import StreamingContextconf = SparkConf()conf.setAppName('TestDStream')conf.setMaster('local[2]')sc = SparkContext(conf = conf)ssc = StreamingContext(sc,10)lines = ssc.textFileStream('file:///usr/local/spark/mycode/streaming/logfile')words = lines.flatMap(lambda line: line.split(' '))wordCounts = words.map(lambda x :(x,1)).reduceByKey(lambda a,b:a+b)wordCounts.pprint()ssc.start()ssc.awaitTermination()
保存该文件并执行如下命令:/usr/local/spark/bin/spark-submit FileStreaming.py
然后我们进入数据流终端,在logfile目录下新建一个log2.txt文件,然后往里面输入一些英文语句后保存退出,再次切换到流计算终端,就可以看见打印出单词统计信息了。 - 套接字流我们继续在流计算端的streaming目录下创建一个socket目录,然后在该目录下创建一个DataSourceSocket.py程序:
mkdir socketcd socketvim NetworkWordCount.py
并在py程序中输入如下代码:from __future__ import print_functionimport sysfrom pyspark import SparkContextfrom pyspark.streaming import StreamingContextif __name__ =="__main__":iflen(sys.argv)!=3:print("Usage: NetworkWordCount.py <hostname> <port>",file=sys.stderr) exit(-1) sc = SparkContext(appName="PythonStreamingNetworkWordCount") ssc = StreamingContext(sc,1) lines = ssc.socketTextStream(sys.argv[1],int(sys.argv[2])) counts = lines.flatMap(lambda line: line.split(" "))\ .map(lambda word:(word,1))\ .reduceByKey(lambda a, b: a+b) counts.pprint() ssc.start() ssc.awaitTermination()
我们再在数据流终端启动Socket服务器端:nc-lk8888
然后我们再进入流计算终端,执行如下代码启动流计算:/usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 8888
然后我们在数据流终端内手动输入一行英文句子后回车,多输入几次,流计算终端就会不断执行词频统计并打印出信息。 - RDD队列流我们继续在streaming目录下新建rddqueue目录并在该目录下创建py程序:
mkdir rddqueuecd rddqueue/vim RDDQueueStreaming.py
然后在py文件中输入如下代码:import timefrom pyspark import SparkContextfrom pyspark.streaming import StreamingContextif __name__ =="__main__": sc = SparkContext(appName="PythonStreamingQueueStream") ssc = StreamingContext(sc,2)#创建一个队列,通过该队列可以把RDD推给一个RDD队列流 rddQueue =[]for i inrange(5): rddQueue +=[ssc.sparkContext.parallelize([j for j inrange(1,1001)],10)] time.sleep(1)#创建一个RDD队列流 inputStream = ssc.queueStream(rddQueue) mappedStream = inputStream.map(lambda x:(x %10,1)) reducedStream = mappedStream.reduceByKey(lambda a, b: a + b) reducedStream.pprint() ssc.start() ssc.stop(stopSparkContext=True, stopGraceFully=True)
保存退出后再执行如下命令:/usr/local/spark/bin/spark-submit RDDQueueStreaming.py
2、完成DStream的两种有状态转换操作
- DStream无状态转换操作上面的词频统计程序NetworkWordCount就采取了无状态转换操作。
- DStream有状态转换操作我们在socket目录下创建WindowedNetworkWordCount.py程序并输入如下代码:
from __future__ import print_functionimport sysfrom pyspark import SparkContextfrom pyspark.streaming import StreamingContextif __name__ =="__main__":if len(sys.argv)!=3: print("Usage: WindowedNetworkWordCount.py <hostname> <port>", file=sys.stderr) exit(-1) sc = SparkContext(appName="PythonStreamingWindowedNetworkWordCount") ssc = StreamingContext(sc, 10) ssc.checkpoint("file:///usr/local/spark/mycode/streaming/socket/checkpoint") lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) counts = lines.flatMap(lambda line: line.split(" "))\ .map(lambda word: (word, 1))\. reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10) counts.pprint() ssc.start() ssc.awaitTermination()
然后我们在数据流终端执行如下命令启动服务器:cd /usr/local/spark/mycode/streaming/socket/nc-lk6666
然后再在流计算终端运行我们刚写的代码:/usr/local/spark/bin/spark-submit WindowedNetworkWordCount.py localhost 6666
在数据流终端输入英文就可以看见统计结果了。
3、完成把DStream的数据输出保存到MySQL数据库中
我们首先启动MySQL数据库:
systemctl start mysqld.service
mysql -u root -p
然后创建spark数据库和wordcount表:
mysql> create database spark;
mysql> use spark;
mysql> create table wordcount (word char(20), count int(4));
然后再在终端安装python连接MySQL的模块:
pip3 install PyMySQL
然后我们在streaming目录下新建stateful目录并在该目录下创建py文件:
mkdir stateful
cd stateful/
vim NetworkWordCountStatefulDB.py
并在py文件中输入如下代码:
from __future__ import print_function
import sys
import pymysql
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ =="__main__":iflen(sys.argv)!=3:print("Usage: NetworkWordCountStateful <hostname> <port>",file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
ssc = StreamingContext(sc,1)
ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful")# RDD with initial state (key, value) pairs
initialStateRDD = sc.parallelize([(u'hello',1),(u'world',1)])defupdateFunc(new_values, last_sum):returnsum(new_values)+(last_sum or0)
lines = ssc.socketTextStream(sys.argv[1],int(sys.argv[2]))
running_counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word:(word,1))\
.updateStateByKey(updateFunc, initialRDD=initialStateRDD)
running_counts.pprint()defdbfunc(records):
db = pymysql.connect("localhost","root","123456","spark")
cursor = db.cursor()defdoinsert(p):
sql ="insert into wordcount(word,count) values ('%s', '%s')"%(str(p[0]),str(p[1]))try:
cursor.execute(sql)
db.commit()except:
db.rollback()for item in records:
doinsert(item)deffunc(rdd):
repartitionedRDD = rdd.repartition(3)
repartitionedRDD.foreachPartition(dbfunc)
running_counts.foreachRDD(func)
ssc.start()
ssc.awaitTermination()
然后我们新建一个数据源终端并执行如下命令:
cd /usr/local/spark/mycode/streaming/stateful/
nc-lk5555
然后再在我们的流计算终端运行我们该编写的代码:
/usr/local/spark/bin/spark-submit NetworkWordCountStatefulDB.py localhost 5555
然后就可以把词频统计的结果写入MySQL中了。
版权归原作者 -北天- 所有, 如有侵权,请联系我们删除。