目录
课程目标一 分布式编程的基本能力
操作HDFS的命令序列
操作HBase的命令序列
MapReduce程序
代码示例1 values求和
mapper.py
#! /usr/bin/python3
import sys
def main():
# 从标准输入流中接受数据行,对每一行调用mapper函数来处理
for line in sys.stdin:
line = line.strip()
mapper(line)
# 每行分割为一个个单词,用word表示
# hadoop streaming要求用"键\t值"形式输出键值对
def mapper(line):
words = line.split(' ')
for word in words:
if len(word.strip()) == 0:
continue
print("%s\t%s" % (word, 1))
if __name__ == '__main__':
main()
reducer.py
#! /usr/bin/python3
import sys
from operator import itemgetter
# 对values求和,并按"单词\t词频"的形式输出。
def reducer(k, values):
print("%s\t%s" % (k, sum(values)))
def main():
current_key = None
values = []
_key, _value = '', 0
for line in sys.stdin:
line = line.strip()
_key, _value = line.split('\t', 1)
_value = eval(_value)
if current_key == _key:
values.append(_value)
else:
if current_key:
reducer(current_key, values)
values = []
values.append(_value)
current_key = _key
# 不要忘记最后一个键值对
if current_key == _key:
reducer(current_key, values)
if __name__ == '__main__':
main()
代码示例2 文件去重(使用set)
mapper.py
#! /usr/bin/python3
import sys
def main():
for line in sys.stdin:
line = line.strip()
mapper(line)
def mapper(line):
########## Begin ###############
# items = line.split('\n')
for item in line:
key,value = item.split()
print("%s\t%s" % (key,value))
########### End #############
if __name__ == '__main__':
main()
reducer.py
#! /usr/bin/python3
import sys
def reducer(k, values):
############ Begin ################
value = sorted(list(set(values)))
for v in value:
print("%s\t%s" % (k,v))
############ End ################
def group():
"""将框架排序后的<k1,v1>,<k1,v2>,<k2,v3><k2,v4> 包装为
<k1,[v1,v2]>,<k2,[v3,v4]>后提交reduce函数执行"""
cur_key = None
last_key = None
value = None
values = []
for line in sys.stdin:
try: #如果不是键值对
last_key, value = line.strip().split("\t", 1)
except:
continue
if not cur_key: #输入的是第一条记录
cur_key = last_key
values.append(value)
elif cur_key == last_key: # 输入的键值没变化
values.append(value)
else: # 输入的是一个新键,表明前一个键的值都已输完
reducer(cur_key, values)
cur_key = last_key
values = []
values.append(value)
else: # 所有的记录都已处理完,将最后一个键值对交reduce处理
reducer(cur_key, values)
if __name__ == '__main__':
group()
代码示例3 挖掘关系(排列组合)
mapper.py
#! /usr/bin/python3
import sys
def mapper(line):
############### Begin ############
child,parent=line.split()
print("%s\t-%s" % (child,parent))
print("%s\t+%s" % (parent,child))
############### End #############
def main():
for line in sys.stdin:
line = line.strip()
if line.startswith('child'):
pass
else:
mapper(line)
if __name__ == '__main__':
main()
reducer.py
#! /usr/bin/python3
import sys
def reducer(k, values):
############## Begin ################
gp=[]
gc=[]
for v in values:
if v.startswith("-"):
gp.append(v[1:])
else:
gc.append(v[1:])
for i in gc:
for j in gp:
print("%s\t%s" % (i,j))
############## End #################
SparkRDD
创建RDD
1.集合并行化创建RDD(parallelize)
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc=SparkContext("local","Simple App")
# 2.创建一个1到8的列表List
data=[1,2,3,4,5,6,7,8]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,主要作用是:收集 rdd 的数据内容作为数组返回
result=rdd.collect()
# 5.打印 rdd 的内容
print(result)
# 6.停止 SparkContext
sc.stop()
#********** End **********#
2.读取外部数据集创建RDD(textFile)
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == '__main__':
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc=SparkContext("local","Simple App")
# 文本文件 RDD 可以使用创建 SparkContext 的t extFile 方法。此方法需要一个 URI的 文件(本地路径的机器上,或一个hdfs://,s3a://等URI),并读取其作为行的集合
# 2.读取本地文件,URI为:/root/wordcount.txt
distFile=sc.textFile("/root/wordcount.txt")
# 3.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,主要作用是:收集 rdd 的数据内容以数组形式返回
result=distFile.collect()
# 4.打印 rdd 的内容
print(result)
# 5.停止 SparkContext
sc.stop()
#********** End **********#
RDD算子
1.map案例(rdd套函数,返回list)
sc = SparkContext("local", "Simple App")
data = [1,2,3,4,5,6]
rdd = sc.parallelize(data)
print(rdd.collect())
rdd_map = rdd.map(lambda x: x * 2)
print(rdd_map.collect())
2.mapPartitions案例
def f(iterator):
list = []
for x in iterator:
list.append(x*2)
return list
if __name__ == "__main__":
sc = SparkContext("local", "Simple App")
data = [1,2,3,4,5,6]
rdd = sc.parallelize(data)
print(rdd.collect())
partitions = rdd.mapPartitions(f)
print(partitions.collect())
3.flatMap案例(解嵌套)
sc = SparkContext("local", "Simple App")
data = [["m"], ["a", "n"]]
rdd = sc.parallelize(data)
print(rdd.collect())
flat_map = rdd.flatMap(lambda x: x)
print(flat_map.collect())
4.filter案例
def filter(self, f):
"""
Return a new RDD containing only the elements that satisfy a predicate.
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4]
"""
def func(iterator):
return filter(fail_on_stopiteration(f), iterator)
return self.mapPartitions(func, True)
5.distinct案例
sc = SparkContext("local", "Simple App")
data = ["python", "python", "python", "java", "java"]
rdd = sc.parallelize(data)
print(rdd.collect())
distinct = rdd.distinct()
print(distinct.collect())
6.sortBy案例
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("c",1),("b",1)]
rdd = sc.parallelize(data)
by = rdd.sortBy(lambda x: x)
print(by.collect())
7.sortByKey案例
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("c",1),("b",1)]
rdd = sc.parallelize(data)
key = rdd.sortByKey()
print(key.collect())
8.mapValues案例
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("b",1)]
rdd = sc.parallelize(data)
values = rdd.mapValues(lambda x: x + 2)
print(values.collect())
9.reduceByKey案例
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("b",1)]
rdd = sc.parallelize(data)
print(rdd.reduceByKey(lambda x,y:x+y).collect())
10.常用算子
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext('local','Simple App')
# 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List
data = [1, 3, 5, 7, 9, 8, 6, 4, 2]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.收集rdd的所有元素并print输出
print(rdd.collect())
# 5.统计rdd的元素个数并print输出
print(rdd.count())
# 6.获取rdd的第一个元素并print输出
print(rdd.first())
# 7.获取rdd的前3个元素并print输出
print(rdd.take(3))
# 8.聚合rdd的所有元素并print输出
print(rdd.reduce(lambda x,y : x + y))
# 9.停止 SparkContext
sc.stop()
# ********** End **********#
11.WordCount - 词频统计
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
"""
需求:对本地文件系统URI为:/root/wordcount.txt 的内容进行词频统计
"""
# ********** Begin **********#
sc = SparkContext("local","app");
rdd = sc.textFile("/root/wordcount.txt")
li = rdd.flatMap(lambda x : str(x).split(" ")).map(lambda x : (x,1)).reduceByKey(lambda x,y:x + y).sortBy(lambda x : x[1],ascending=False).collect();
print(li)
# ********** End **********#
12.Friend Recommendation - 好友推荐
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
def word_couple(word1, word2):
if hash(word1) > hash(word2):
return word1 + '_' + word2
return word2 + '_' + word1
def relations(items):
result = []
for i in range(1, len(items)):
result.append((word_couple(items[0], items[i]), 0))
for j in range(i+1, len(items)):
result.append((word_couple(items[i], items[j]), 1))
return result
def fun2(x):
values = tuple(x[1])
return ((x[0], 0) if min(values)==0 else (x[0], sum(values)))
if __name__ == "__main__":
"""
需求:对本地文件系统URI为:/root/friend.txt 的数据统计间接好友的数量
"""
# ********** Begin **********#
sc = SparkContext("local", "friend recommendation")
src = sc.textFile("/root/friend.txt").map(lambda x:x.strip().encode('utf-8').split(" "))
rdd = src.flatMap(relations).reduceByKey(lambda x,y:0 if x==0 or y==0 else x+y).filter(lambda x:x[1]>0)
rdd = rdd.sortBy(lambda x:x)
print(rdd.collect())
# ********** End **********#
Spark SQL
DataFrame创建
基于RDD转换
1.直接createDataFrame,提供一个list做schema
2.使用StructType创建schema对象
3.用RDD算子toDF(不用createDataFrame)
读取文本文件转换(写操作)
df = spark.read.csv("path_to_input_file.csv", header=True, inferSchema=True)
正则表达式
regexp_extract和regexp_replace
spark SQL案例
# coding=utf-8
from pyspark.sql import SparkSession
#**********Begin**********#
#创建SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL ") \
.master("local")\
.getOrCreate()
#读取/root/jun.json中数据
df = spark.read.json("/root/jun.json").coalesce(1)
#创建视图
df.createOrReplaceTempView("table1")
#统计出全球飞行速度排名前三的战斗机
out=spark.sql("select cast(regexp_replace(regexp_extract(`最大飞行速度`,'[\\\d,\\\.]+',0),',','') as float) as speed,`名称` from table1 order by cast(regexp_replace(regexp_extract(`最大飞行速度`,'[\\\d,\\\.]+',0),',','') as float) DESC limit 3")
#保存结果
out.write.mode("overwrite").format("csv").save("/root/airspark")
#**********End**********#
spark.stop()
# coding=utf-8
from pyspark.sql import SparkSession
#**********Begin**********#
#创建SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL ") \
.master("local")\
.getOrCreate()
#读取/root/jun.json中数据
df = spark.read.json("/root/jun.json").coalesce(1)
#创建视图
df.createOrReplaceTempView("table1")
#统计出全球各研发单位研制的战斗机在全球所有战斗机中的占比
out=spark.sql("select concat(round(count(`研发单位`)*100/(select count(*) as num from table1 where `研发单位` is not null and `名称`is not null ),2),'%') as ratio, `研发单位` from table1 where `研发单位` is not null and `名称`is not null group by `研发单位`")
#保存结果
out.write.mode("overwrite").format("csv").save("/root/airspark")
#**********End**********#
spark.stop()
# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession
if __name__ =='__main__':
spark = SparkSession.builder.master("local").appName("demo").getOrCreate()
#**********begin**********#
df = spark.read.option("header",True).option("delimiter","\t").csv("/root/data.csv")
df.createTempView("data")
spark.sql("""
select regexp_replace(TRIP_ID,'\\\W+','') as TRIP_ID ,
regexp_replace(CALL_TYPE,'\\\W+','') as CALL_TYPE ,
regexp_replace(ORIGIN_CALL,'\\\W+','') as ORIGIN_CALL ,
regexp_replace(TAXI_ID,'\\\W+','') as TAXI_ID ,
regexp_replace(ORIGIN_STAND,'\\\W+','') as ORIGIN_STAND ,
regexp_replace(TIMESTAMP,'\\\W+','') as TIMESTAMP ,
regexp_replace(POLYLINE,'\\\W+','') as POLYLINE
from data
""").show()
#**********end**********#
spark.stop()
课程目标二 设计大数据解决方案的能力
数据存储方案
关系数据库、文件系统、分布式文件系统和分布式数据库是数据存储和管理的不同方法,每种方法都有其特定的用途、优缺点和最佳实践场景。以下是它们的一些区别和优缺点:
关系数据库(RDBMS)
优点:
- 结构化数据:数据以表格的形式存储,每行和列都有明确定义的数据类型。
- 事务支持:支持ACID(原子性、一致性、隔离性、持久性)事务,确保数据完整性。
- 数据完整性:通过主键、外键和约束等机制保证数据的准确性和一致性。
- 查询效率:使用SQL进行复杂查询,优化器可以高效执行查询。
- 成熟技术:广泛的社区支持,大量的工具和资源。
缺点:
- 扩展性:水平扩展(增加更多节点)比较困难,通常采用垂直扩展(增强单个节点的能力)。
- 性能瓶颈:在处理大规模数据集时可能会遇到性能瓶颈。
- 成本:需要专业的DBA和维护成本。
文件系统
优点:
- 简单性:文件系统易于理解和使用,不需要复杂的数据库管理系统。
- 灵活性:可以存储任何类型的数据,不需要预定义的数据模型。
- 便携性:文件可以在不同的系统和应用程序之间轻松传输。
缺点:
- 数据一致性:缺乏数据一致性和完整性的保障机制。
- 数据冗余:容易导致数据冗余和重复。
- 查询效率:对于复杂的数据查询和分析,效率低下。
分布式文件系统(DFS)
优点:
- 可扩展性:可以跨多个节点存储和处理大量数据。
- 容错性:数据在多个节点上复制,提高了数据的可靠性和容错性。
- 高性能:通过并行处理提高数据访问速度。
缺点:
- 复杂性:需要复杂的系统来管理数据的一致性和复制。
- 延迟:跨网络访问数据可能会增加延迟。
- 数据一致性:在某些情况下,可能需要额外的机制来保证数据一致性。
分布式数据库
优点:
- 可扩展性:可以水平扩展,通过增加更多节点来处理更大的数据集。
- 高可用性:数据在多个节点上复制,提高了系统的可用性和容错性。
- 负载均衡:可以跨多个节点分布查询负载。
- 灵活性:可以结合关系数据库和非关系数据库的优点。
缺点:
- 复杂性:需要复杂的架构来管理数据的一致性、复制和分区。
- 成本:可能需要更多的硬件和维护成本。
- 数据一致性:在分布式系统中保持数据一致性是一个挑战。
对于需要高度事务性和数据一致性的应用,关系数据库可能是最佳选择;而对于需要处理大规模数据集和高吞吐量的应用,分布式文件系统或分布式数据库可能更合适。
数据计算框架
集中式计算、分布式计算、批计算、交互式计算和流计算是不同的计算范式,它们在数据处理、系统架构和应用场景上有所不同。以下是它们的区别和优缺点:
集中式计算(Centralized Computing)
定义:所有计算任务都在一个中心节点或服务器上执行。
优点:
- 简化管理:所有硬件和软件都集中在一个地点,易于管理和维护。
- 性能:单个强大的服务器可以提供高性能的计算能力。
缺点:
- 可扩展性:难以水平扩展以处理更大的工作负载。
- 容错性:单点故障可能导致整个系统的计算能力丧失。
- 成本:需要昂贵的高性能硬件。
分布式计算(Distributed Computing)
定义:计算任务分布在多个物理或逻辑上分离的节点上执行。
优点:
- 可扩展性:通过增加更多节点来提高计算能力。
- 容错性:节点间的数据复制提高了系统的可靠性。
- 成本效益:可以使用普通硬件构建大规模集群。
缺点:
- 复杂性:需要复杂的系统来管理数据一致性和节点通信。
- 网络依赖:对网络延迟和带宽有依赖。
批计算(Batch Computing)
定义:处理大量数据集合的计算任务,通常不需要即时响应。
优点:
- 效率:适合大规模数据处理,可以优化资源使用。
- 可靠性:可以在计算过程中处理节点故障。
缺点:
- 延迟:不适合需要即时结果的任务。
- 资源占用:在批处理窗口期间可能占用大量资源。
交互式计算(Interactive Computing)
定义:需要即时响应的计算任务,用户与系统之间有直接交互。
优点:
- 响应性:系统需要快速响应用户请求。
- 灵活性:用户可以根据实时反馈进行决策。
缺点:
- 资源限制:可能无法处理大规模数据集。
- 扩展性:难以扩展以支持大量并发用户。
流计算(Stream Computing)
定义:处理连续数据流的计算任务,数据是实时到达的。
优点:
- 实时性:能够对数据流进行实时分析和处理。
- 窗口操作:可以对数据流的特定窗口进行计算。
缺点:
- 复杂性:需要复杂的系统来处理数据的连续性和状态管理。
- 容错性:在流处理中处理故障和数据完整性更具挑战性。
总结
- 集中式计算适合管理简单、计算资源集中的场景。
- 分布式计算适合需要高可扩展性和容错性的场景。
- 批计算适合大规模数据处理,不需要即时响应的场景。
- 交互式计算适合需要即时反馈和决策支持的场景。
- 流计算适合需要实时处理连续数据流的场景。
分析题及答案
供电企业的SCADA系统每隔15分钟自动采集一次用户用电数据,为了减小数据库的存储压力,每个月都要导出上月的数据。营销部门想根据这些数据分析用户的用电习惯。 从数据库导出的数据文件随着时间不断增加,需要用合适的方式存储下来。第一种方案是存储到部署在单台服务器上的大型数据库Oracle里,第二种方案是存储到Hadoop集群的分布式文件系统HDFS里。你倾向于选择哪种方案,说说你的理由。
在这种情况下,我倾向于选择将数据存储到Hadoop集群的分布式文件系统(HDFS)中。以下是选择HDFS的几个主要理由:
1. 可扩展性
- HDFS:Hadoop集群可以轻松地通过增加更多的节点来扩展存储容量,非常适合处理随着时间增长的数据量。HDFS设计之初就是为了处理大规模数据集。
- Oracle:虽然Oracle是一个强大的数据库系统,但在单台服务器上部署时,其存储容量受限于单个服务器的硬件限制,水平扩展能力有限。
2. 成本效益
- HDFS:使用Hadoop集群可以利用普通的硬件构建一个成本效益高的存储解决方案,特别是在数据量迅速增长时。
- Oracle:维护一个大型的Oracle数据库可能需要昂贵的硬件和专业的数据库管理员,长期来看成本较高。
3. 数据处理能力
- HDFS:与Hadoop生态系统中的其他组件(如MapReduce、Hive、Pig等)结合使用,HDFS可以提供强大的数据处理能力,适合进行大规模数据分析。
- Oracle:虽然Oracle提供了强大的SQL处理能力,但在处理非结构化数据和大规模数据集时,可能不如Hadoop生态系统灵活。
4. 容错性
- HDFS:HDFS天生具有高容错性,数据会自动复制到多个节点,即使某些节点失败,数据也不会丢失。
- Oracle:虽然Oracle也有数据备份和恢复机制,但在单点故障的情况下,可能不如分布式系统那样具有即时的容错能力。
5. 灵活性
- HDFS:HDFS可以存储任何格式的数据,包括结构化、半结构化和非结构化数据,适合多种数据分析需求。
- Oracle:Oracle主要用于存储结构化数据,对于非结构化数据的处理能力有限。
6. 性能
- HDFS:对于大规模数据的读写操作,HDFS可以利用集群的并行处理能力,提供高性能。
- Oracle:在单台服务器上,Oracle的性能可能受到硬件资源的限制,特别是在数据量非常大时。
结论
考虑到供电企业SCADA系统产生的数据量和增长趋势,以及营销部门对数据分析的需求,HDFS提供了一个更加灵活、可扩展、成本效益高的解决方案。HDFS能够更好地适应数据量的增长,同时提供了强大的数据处理和分析工具,这对于从大量用户用电数据中提取有价值的信息至关重要。
AIchatOS2
版权归原作者 qianzizzz 所有, 如有侵权,请联系我们删除。