0


大数据技术及应用期末总结

目录


课程目标一 分布式编程的基本能力

操作HDFS的命令序列

75c33b4944ad4b1ab3ed6a5f8b03c409.png

9030be49b138430ea8510e84b935a251.png

9457f34314704bc093cb8cedbc3a11cc.png

29038276733f4c7ab035b6aeab38d316.png

4fc0bd4be55f4223a20a061d8c497f32.png

c8305d5aa7904a13adbcecbb96b72a98.png

操作HBase的命令序列

0bf1206933854036beaf4d84172637bc.png56f37e8f251346bfa4be851cc319a09d.png

84d27dcb630a4de9823eb01baee7cf64.png

MapReduce程序

代码示例1 values求和

mapper.py

#! /usr/bin/python3
import sys
def main():
    # 从标准输入流中接受数据行,对每一行调用mapper函数来处理
    for line in sys.stdin:
        line = line.strip()
        mapper(line)
# 每行分割为一个个单词,用word表示
# hadoop streaming要求用"键\t值"形式输出键值对
def mapper(line):
    words = line.split(' ')
    for word in words:
        if len(word.strip()) == 0:
            continue
        print("%s\t%s" % (word, 1))
if __name__ == '__main__':
    main()

reducer.py

#! /usr/bin/python3
import sys
from operator import itemgetter
# 对values求和,并按"单词\t词频"的形式输出。
def reducer(k, values):
    print("%s\t%s" % (k, sum(values)))
def main():
    current_key = None
    values = []
    _key, _value = '', 0
    for line in sys.stdin:
        line = line.strip()
        _key, _value = line.split('\t', 1)
        _value = eval(_value)
        if current_key == _key:
            values.append(_value)
        else:
            if current_key:
                reducer(current_key, values)
                values = []
            values.append(_value)
            current_key = _key
    # 不要忘记最后一个键值对
    if current_key == _key:
        reducer(current_key, values)
if __name__ == '__main__':
    main()

代码示例2 文件去重(使用set)

mapper.py

#! /usr/bin/python3
import sys

def main():
    for line in sys.stdin:
        line = line.strip()
        mapper(line)

def mapper(line):
    ########## Begin  ###############
#     items = line.split('\n')
     for item in line:
         key,value = item.split()
         print("%s\t%s" % (key,value))
   ###########  End    #############
if __name__ == '__main__':
    main()

reducer.py

#! /usr/bin/python3

import sys

def reducer(k, values):
    ############  Begin   ################
    value = sorted(list(set(values)))
    for v in value:
       print("%s\t%s" % (k,v))
    ############   End    ################

def group():
    """将框架排序后的<k1,v1>,<k1,v2>,<k2,v3><k2,v4> 包装为
    <k1,[v1,v2]>,<k2,[v3,v4]>后提交reduce函数执行"""
    cur_key = None      
    last_key = None
    value = None
    values = []
    for line in sys.stdin:
        try:   #如果不是键值对
            last_key, value = line.strip().split("\t", 1)
        except:
            continue
        if not cur_key: #输入的是第一条记录
            cur_key = last_key
            values.append(value)
        elif cur_key == last_key:  # 输入的键值没变化
            values.append(value)
        else:                     # 输入的是一个新键,表明前一个键的值都已输完
            reducer(cur_key, values)
            cur_key = last_key
            values = []
            values.append(value)
    else:    # 所有的记录都已处理完,将最后一个键值对交reduce处理
        reducer(cur_key, values)

if __name__ == '__main__':
    group()

代码示例3 挖掘关系(排列组合)

mapper.py

#! /usr/bin/python3
  
import sys

def mapper(line):
    ###############  Begin   ############
    child,parent=line.split()
    print("%s\t-%s" % (child,parent))
    print("%s\t+%s" % (parent,child))
    ###############   End    #############

def main():
    for line in sys.stdin:
        line = line.strip()
        if line.startswith('child'):
            pass
        else:
            mapper(line)

if __name__ == '__main__':
    main()

reducer.py

#! /usr/bin/python3

import sys

def reducer(k, values):
    ##############    Begin    ################
    gp=[]
    gc=[]
    for v in values:
        if v.startswith("-"):
            gp.append(v[1:])
        else:
            gc.append(v[1:])
    for i in gc:
        for j in gp:
            print("%s\t%s" % (i,j))
    ##############   End      #################

SparkRDD

创建RDD

1.集合并行化创建RDD(parallelize)

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
if __name__ == "__main__":
    #********** Begin **********#
 
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc=SparkContext("local","Simple App")
 
    # 2.创建一个1到8的列表List
    data=[1,2,3,4,5,6,7,8]
 
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
 
    # 4.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,主要作用是:收集 rdd 的数据内容作为数组返回
    result=rdd.collect()

    # 5.打印 rdd 的内容
    print(result)

    # 6.停止 SparkContext
    sc.stop()
    #********** End **********#

2.读取外部数据集创建RDD(textFile)

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
if __name__ == '__main__':
    #********** Begin **********#
 
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc=SparkContext("local","Simple App")
 
    # 文本文件 RDD 可以使用创建 SparkContext 的t extFile 方法。此方法需要一个 URI的 文件(本地路径的机器上,或一个hdfs://,s3a://等URI),并读取其作为行的集合
    # 2.读取本地文件,URI为:/root/wordcount.txt
    distFile=sc.textFile("/root/wordcount.txt")
 
    # 3.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,主要作用是:收集 rdd 的数据内容以数组形式返回
    result=distFile.collect()
 
    # 4.打印 rdd 的内容
    print(result)
 
    # 5.停止 SparkContext
    sc.stop()
 
    #********** End **********#

RDD算子

1d903b6c543346688887b89278b0f4d2.png

9c08c6cadeba44479bff9034eb5d2378.png

1.map案例(rdd套函数,返回list)

sc = SparkContext("local", "Simple App")
    data = [1,2,3,4,5,6]
    rdd = sc.parallelize(data)
    print(rdd.collect())
    rdd_map = rdd.map(lambda x: x * 2)
    print(rdd_map.collect())

2.mapPartitions案例

def f(iterator):
    list = []
    for x in iterator:
        list.append(x*2)
    return list
if __name__ == "__main__":
    sc = SparkContext("local", "Simple App")
    data = [1,2,3,4,5,6]
    rdd = sc.parallelize(data)
    print(rdd.collect())
    partitions = rdd.mapPartitions(f)
    print(partitions.collect())

3.flatMap案例(解嵌套)

sc = SparkContext("local", "Simple App")
    data = [["m"], ["a", "n"]]
    rdd = sc.parallelize(data)
    print(rdd.collect())
    flat_map = rdd.flatMap(lambda x: x)
    print(flat_map.collect())

4.filter案例

def filter(self, f):
        """
        Return a new RDD containing only the elements that satisfy a predicate.
        >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
        >>> rdd.filter(lambda x: x % 2 == 0).collect()
        [2, 4]
        """
        def func(iterator):
            return filter(fail_on_stopiteration(f), iterator)
        return self.mapPartitions(func, True)

5.distinct案例

sc = SparkContext("local", "Simple App")
    data = ["python", "python", "python", "java", "java"]
    rdd = sc.parallelize(data)
    print(rdd.collect())
    distinct = rdd.distinct()
    print(distinct.collect())

6.sortBy案例

sc = SparkContext("local", "Simple App")
    data = [("a",1),("a",2),("c",1),("b",1)]
    rdd = sc.parallelize(data)
    by = rdd.sortBy(lambda x: x)
    print(by.collect())

7.sortByKey案例

 sc = SparkContext("local", "Simple App")
    data = [("a",1),("a",2),("c",1),("b",1)]
    rdd = sc.parallelize(data)
    key = rdd.sortByKey()
    print(key.collect())

8.mapValues案例

sc = SparkContext("local", "Simple App")
    data = [("a",1),("a",2),("b",1)]
    rdd = sc.parallelize(data)
    values = rdd.mapValues(lambda x: x + 2)
    print(values.collect())

9.reduceByKey案例

sc = SparkContext("local", "Simple App")
    data = [("a",1),("a",2),("b",1)]
    rdd = sc.parallelize(data)
    print(rdd.reduceByKey(lambda x,y:x+y).collect())

10.常用算子

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口

    sc = SparkContext('local','Simple App')
    # 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List

    data = [1, 3, 5, 7, 9, 8, 6, 4, 2]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.收集rdd的所有元素并print输出
    print(rdd.collect())

    # 5.统计rdd的元素个数并print输出

    print(rdd.count())

    # 6.获取rdd的第一个元素并print输出
    print(rdd.first())

    # 7.获取rdd的前3个元素并print输出
    print(rdd.take(3))

    # 8.聚合rdd的所有元素并print输出
    print(rdd.reduce(lambda x,y : x + y))

    # 9.停止 SparkContext

    sc.stop()
    # ********** End **********#

11.WordCount - 词频统计

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":

    """
        需求:对本地文件系统URI为:/root/wordcount.txt 的内容进行词频统计
    """
    # ********** Begin **********#
    sc = SparkContext("local","app");
    rdd = sc.textFile("/root/wordcount.txt")
    li = rdd.flatMap(lambda x : str(x).split(" ")).map(lambda x : (x,1)).reduceByKey(lambda x,y:x + y).sortBy(lambda x : x[1],ascending=False).collect();
    print(li)

    # ********** End **********#

12.Friend Recommendation - 好友推荐

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
def word_couple(word1, word2):
    if hash(word1) > hash(word2):
        return word1 + '_' + word2
    return word2 + '_' + word1
 
def relations(items):
    result = []
    for i in range(1, len(items)):
        result.append((word_couple(items[0], items[i]), 0))
        for j in range(i+1, len(items)):
            result.append((word_couple(items[i], items[j]), 1))
    return result
 
def fun2(x):
    values = tuple(x[1])
    return ((x[0], 0) if min(values)==0 else (x[0], sum(values)))
 
if __name__ == "__main__":
    """
        需求:对本地文件系统URI为:/root/friend.txt 的数据统计间接好友的数量
    """
    # ********** Begin **********#
    sc = SparkContext("local", "friend recommendation")
    src = sc.textFile("/root/friend.txt").map(lambda x:x.strip().encode('utf-8').split(" "))
    rdd = src.flatMap(relations).reduceByKey(lambda x,y:0 if x==0 or y==0 else x+y).filter(lambda x:x[1]>0)
    rdd = rdd.sortBy(lambda x:x)
    print(rdd.collect())
    # ********** End **********#

Spark SQL

DataFrame创建

基于RDD转换

1.直接createDataFrame,提供一个list做schema

f9819e2771aa4ffaa341333d072b8d52.png

2.使用StructType创建schema对象

86763216c8e647c698a561d55ae0cee6.png

3.用RDD算子toDF(不用createDataFrame)

703117471d2b402ba5c8e1c2865038df.png

读取文本文件转换(写操作)

983c814f07e24a9aadbb4c228a134fa8.png

df = spark.read.csv("path_to_input_file.csv", header=True, inferSchema=True)

b953c22af9604512a02f6d6f19d59bc5.png

1b516bfe5df143a5a0e2dab9e1432621.png

正则表达式

055432304de54c7cb3ebe78f81f35eee.png

17e385b0d9424649bcb54865005b588a.png

regexp_extract和regexp_replace

c9aaddfa60c0446eace89aaaea1e72dc.png

a9572b3de5a84256b6de03f637666ea6.png

spark SQL案例

# coding=utf-8
from pyspark.sql import SparkSession
#**********Begin**********#
#创建SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL ") \
    .master("local")\
    .getOrCreate()
#读取/root/jun.json中数据
df = spark.read.json("/root/jun.json").coalesce(1)
#创建视图
df.createOrReplaceTempView("table1")
#统计出全球飞行速度排名前三的战斗机
out=spark.sql("select cast(regexp_replace(regexp_extract(`最大飞行速度`,'[\\\d,\\\.]+',0),',','') as float) as speed,`名称` from table1  order by cast(regexp_replace(regexp_extract(`最大飞行速度`,'[\\\d,\\\.]+',0),',','') as float)  DESC limit 3")
#保存结果
out.write.mode("overwrite").format("csv").save("/root/airspark")
#**********End**********#
spark.stop()
# coding=utf-8
from pyspark.sql import SparkSession
#**********Begin**********#
#创建SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL ") \
    .master("local")\
    .getOrCreate()
#读取/root/jun.json中数据
df = spark.read.json("/root/jun.json").coalesce(1)
#创建视图
df.createOrReplaceTempView("table1")
#统计出全球各研发单位研制的战斗机在全球所有战斗机中的占比
out=spark.sql("select concat(round(count(`研发单位`)*100/(select count(*) as num from table1 where `研发单位` is not null and `名称`is not null ),2),'%') as ratio, `研发单位` from table1  where  `研发单位` is not null and `名称`is not null group by  `研发单位`")
#保存结果
out.write.mode("overwrite").format("csv").save("/root/airspark")
#**********End**********#
spark.stop()
# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession
if __name__ =='__main__':
    spark = SparkSession.builder.master("local").appName("demo").getOrCreate()
    #**********begin**********#
    df = spark.read.option("header",True).option("delimiter","\t").csv("/root/data.csv")
    df.createTempView("data")
    spark.sql("""
    select regexp_replace(TRIP_ID,'\\\W+','') as TRIP_ID ,
        regexp_replace(CALL_TYPE,'\\\W+','') as CALL_TYPE ,
        regexp_replace(ORIGIN_CALL,'\\\W+','') as ORIGIN_CALL ,
        regexp_replace(TAXI_ID,'\\\W+','') as TAXI_ID ,
        regexp_replace(ORIGIN_STAND,'\\\W+','') as ORIGIN_STAND ,
        regexp_replace(TIMESTAMP,'\\\W+','') as TIMESTAMP ,
        regexp_replace(POLYLINE,'\\\W+','') as POLYLINE
    from data
    """).show()
    #**********end**********#
    spark.stop()

课程目标二 设计大数据解决方案的能力

数据存储方案

关系数据库、文件系统、分布式文件系统和分布式数据库是数据存储和管理的不同方法,每种方法都有其特定的用途、优缺点和最佳实践场景。以下是它们的一些区别和优缺点:

关系数据库(RDBMS)

优点

  1. 结构化数据:数据以表格的形式存储,每行和列都有明确定义的数据类型。
  2. 事务支持:支持ACID(原子性、一致性、隔离性、持久性)事务,确保数据完整性。
  3. 数据完整性:通过主键、外键和约束等机制保证数据的准确性和一致性。
  4. 查询效率:使用SQL进行复杂查询,优化器可以高效执行查询。
  5. 成熟技术:广泛的社区支持,大量的工具和资源。

缺点

  1. 扩展性:水平扩展(增加更多节点)比较困难,通常采用垂直扩展(增强单个节点的能力)。
  2. 性能瓶颈:在处理大规模数据集时可能会遇到性能瓶颈。
  3. 成本:需要专业的DBA和维护成本。

文件系统

优点

  1. 简单性:文件系统易于理解和使用,不需要复杂的数据库管理系统。
  2. 灵活性:可以存储任何类型的数据,不需要预定义的数据模型。
  3. 便携性:文件可以在不同的系统和应用程序之间轻松传输。

缺点

  1. 数据一致性:缺乏数据一致性和完整性的保障机制。
  2. 数据冗余:容易导致数据冗余和重复。
  3. 查询效率:对于复杂的数据查询和分析,效率低下。

分布式文件系统(DFS)

优点

  1. 可扩展性:可以跨多个节点存储和处理大量数据。
  2. 容错性:数据在多个节点上复制,提高了数据的可靠性和容错性。
  3. 高性能:通过并行处理提高数据访问速度。

缺点

  1. 复杂性:需要复杂的系统来管理数据的一致性和复制。
  2. 延迟:跨网络访问数据可能会增加延迟。
  3. 数据一致性:在某些情况下,可能需要额外的机制来保证数据一致性。

分布式数据库

优点

  1. 可扩展性:可以水平扩展,通过增加更多节点来处理更大的数据集。
  2. 高可用性:数据在多个节点上复制,提高了系统的可用性和容错性。
  3. 负载均衡:可以跨多个节点分布查询负载。
  4. 灵活性:可以结合关系数据库和非关系数据库的优点。

缺点

  1. 复杂性:需要复杂的架构来管理数据的一致性、复制和分区。
  2. 成本:可能需要更多的硬件和维护成本。
  3. 数据一致性:在分布式系统中保持数据一致性是一个挑战。

对于需要高度事务性和数据一致性的应用,关系数据库可能是最佳选择;而对于需要处理大规模数据集和高吞吐量的应用,分布式文件系统或分布式数据库可能更合适。

数据计算框架

集中式计算、分布式计算、批计算、交互式计算和流计算是不同的计算范式,它们在数据处理、系统架构和应用场景上有所不同。以下是它们的区别和优缺点:

集中式计算(Centralized Computing)

定义:所有计算任务都在一个中心节点或服务器上执行。

优点

  • 简化管理:所有硬件和软件都集中在一个地点,易于管理和维护。
  • 性能:单个强大的服务器可以提供高性能的计算能力。

缺点

  • 可扩展性:难以水平扩展以处理更大的工作负载。
  • 容错性:单点故障可能导致整个系统的计算能力丧失。
  • 成本:需要昂贵的高性能硬件。

分布式计算(Distributed Computing)

定义:计算任务分布在多个物理或逻辑上分离的节点上执行。

优点

  • 可扩展性:通过增加更多节点来提高计算能力。
  • 容错性:节点间的数据复制提高了系统的可靠性。
  • 成本效益:可以使用普通硬件构建大规模集群。

缺点

  • 复杂性:需要复杂的系统来管理数据一致性和节点通信。
  • 网络依赖:对网络延迟和带宽有依赖。

批计算(Batch Computing)

定义:处理大量数据集合的计算任务,通常不需要即时响应。

优点

  • 效率:适合大规模数据处理,可以优化资源使用。
  • 可靠性:可以在计算过程中处理节点故障。

缺点

  • 延迟:不适合需要即时结果的任务。
  • 资源占用:在批处理窗口期间可能占用大量资源。

交互式计算(Interactive Computing)

定义:需要即时响应的计算任务,用户与系统之间有直接交互。

优点

  • 响应性:系统需要快速响应用户请求。
  • 灵活性:用户可以根据实时反馈进行决策。

缺点

  • 资源限制:可能无法处理大规模数据集。
  • 扩展性:难以扩展以支持大量并发用户。

流计算(Stream Computing)

定义:处理连续数据流的计算任务,数据是实时到达的。

优点

  • 实时性:能够对数据流进行实时分析和处理。
  • 窗口操作:可以对数据流的特定窗口进行计算。

缺点

  • 复杂性:需要复杂的系统来处理数据的连续性和状态管理。
  • 容错性:在流处理中处理故障和数据完整性更具挑战性。

总结

  • 集中式计算适合管理简单、计算资源集中的场景。
  • 分布式计算适合需要高可扩展性和容错性的场景。
  • 批计算适合大规模数据处理,不需要即时响应的场景。
  • 交互式计算适合需要即时反馈和决策支持的场景。
  • 流计算适合需要实时处理连续数据流的场景。

分析题及答案

供电企业的SCADA系统每隔15分钟自动采集一次用户用电数据,为了减小数据库的存储压力,每个月都要导出上月的数据。营销部门想根据这些数据分析用户的用电习惯。 从数据库导出的数据文件随着时间不断增加,需要用合适的方式存储下来。第一种方案是存储到部署在单台服务器上的大型数据库Oracle里,第二种方案是存储到Hadoop集群的分布式文件系统HDFS里。你倾向于选择哪种方案,说说你的理由。

在这种情况下,我倾向于选择将数据存储到Hadoop集群的分布式文件系统(HDFS)中。以下是选择HDFS的几个主要理由:

1. 可扩展性

  • HDFS:Hadoop集群可以轻松地通过增加更多的节点来扩展存储容量,非常适合处理随着时间增长的数据量。HDFS设计之初就是为了处理大规模数据集。
  • Oracle:虽然Oracle是一个强大的数据库系统,但在单台服务器上部署时,其存储容量受限于单个服务器的硬件限制,水平扩展能力有限。

2. 成本效益

  • HDFS:使用Hadoop集群可以利用普通的硬件构建一个成本效益高的存储解决方案,特别是在数据量迅速增长时。
  • Oracle:维护一个大型的Oracle数据库可能需要昂贵的硬件和专业的数据库管理员,长期来看成本较高。

3. 数据处理能力

  • HDFS:与Hadoop生态系统中的其他组件(如MapReduce、Hive、Pig等)结合使用,HDFS可以提供强大的数据处理能力,适合进行大规模数据分析。
  • Oracle:虽然Oracle提供了强大的SQL处理能力,但在处理非结构化数据和大规模数据集时,可能不如Hadoop生态系统灵活。

4. 容错性

  • HDFS:HDFS天生具有高容错性,数据会自动复制到多个节点,即使某些节点失败,数据也不会丢失。
  • Oracle:虽然Oracle也有数据备份和恢复机制,但在单点故障的情况下,可能不如分布式系统那样具有即时的容错能力。

5. 灵活性

  • HDFS:HDFS可以存储任何格式的数据,包括结构化、半结构化和非结构化数据,适合多种数据分析需求。
  • Oracle:Oracle主要用于存储结构化数据,对于非结构化数据的处理能力有限。

6. 性能

  • HDFS:对于大规模数据的读写操作,HDFS可以利用集群的并行处理能力,提供高性能。
  • Oracle:在单台服务器上,Oracle的性能可能受到硬件资源的限制,特别是在数据量非常大时。

结论

考虑到供电企业SCADA系统产生的数据量和增长趋势,以及营销部门对数据分析的需求,HDFS提供了一个更加灵活、可扩展、成本效益高的解决方案。HDFS能够更好地适应数据量的增长,同时提供了强大的数据处理和分析工具,这对于从大量用户用电数据中提取有价值的信息至关重要。

AIchatOS2

标签: 大数据 python

本文转载自: https://blog.csdn.net/2301_79775973/article/details/143591574
版权归原作者 qianzizzz 所有, 如有侵权,请联系我们删除。

“大数据技术及应用期末总结”的评论:

还没有评论