0


头歌大数据实训(总结)

1.HDFS:

1.1常用命令:

1.操作命令:

1.1创建文件夹
hdfs dfs -mkdir [-p] <文件路径>

例如,新建文件夹 /202012721、/202012721/dir1。命令如下:

hdfs dfs -mkdir /202012721
hdfs dfs -mkdir -p /202012721/dir1
  • [-p]:表示如果父目录不存在,先创建父目录。
1.2 列出指定的文件和目录
hdfs dfs -ls [-d][-h][-R] <文件路径>

[-d]:返回 path。

[-h]:h 指 human-readble,表示按照人性化的单位显示文件大小,比如文件显示为 10 MB,而不会显示 10240 KB。

[-R]:级联显示 paths 下的文件。

例如,列出根目录下的文件或目录。命令如下:

hdfs dfs -ls /
1.3新建文件
hdfs dfs -touchz <存放路径>

例如,在主目录 /202012721/input 下创建大小为 0 00 的空文件 file。命令如下:

hdfs dfs -touchz /202012721/input.txt
hdfs dfs -ls /202012721
1.4上传文件
hdfs dfs -put [-f] [-p] <本地路径> <HDFS上的路径>

例如,在本地创建一个文件

data.txt

并传到 HDFS 的

/202012721

目录下命令如下:

hdfs dfs -put data.txt /202012721/data.txt
1.5将本地文件移动到 HDFS
hdfs dfs -moveFromLocal <本地文件路径> <HDFS路径> 

例如,我们把本地文件 data2.txt 移动到 HDFS 的 /202012721 命令如下:

hdfs dfs -moveFromLocal data2.txt /202012721 
1.6下载文件
hdfs dfs -get [-p] <HDFS路径> <本地文件路径>

例如,将 HDFS 中的 /202012721/data.txt 文件下载并保存为本地的 ~/local_data.txt:

hdfs dfs -get /202012721/data.txt ~/local_data.txt
1.7查看文件
hdfs dfs -cat [-ignoreCrc] <文件路径>

[-ignoreCrc]:忽略循环检验失败的文件。

hdfs dfs -text [-ignoreCrc] <文件路径>

text 不仅可以查看文本文件,还可以查看压缩文件和 Avro 序列化的文件。

hdfs dfs -tail [-f] <文件路径>

[-f]:动态更新显示数据。

tail 查看的是最后 1 KB 的文件(Linux 上的 tail 默认查看最后 10 行记录)。

例如,查看 /202012721/data.txt。命令如下:

hdfs dfs -cat /202012721/data.txt
1.8追写文件
hdfs dfs -appendToFile <本地路径> <目标路径>

该命令将 localsrc 指向的本地文件内容写入目标文件 dst。

例如,在本地根目录下新建 data3.txt 文件,内容为 hadoop,追加到文件 /202012721/data.txt 中。命令如下:

hdfs dfs -appendToFile data3.txt /202012721/data.txt
hdfs dfs -cat /202012721/data.txt
1.9删除目录或者文件
hdfs dfs -rm [-f] [-r] <文件路径>

[-f]:如果要删除的文件不存在,不显示错误信息。

[-r/R]:级联删除目录下所有的文件和子目录文件。

例如,删除 HDFS 中的 /202012721/data2.txt 文件。命令如下:

hdfs dfs -rm /202012721/data2.txt
1.10显示占用的磁盘空间大小
hdfs dfs -du [-s] [-h] <文件路径>

[-s]:显示指定目录下文件总的大小。

[-h]:h 指 human-readble,表示按照人性化的单位显示文件大小,比如文件显示为 10 MB,而不会显示 10240 KB。

例如,显示 HDFS 根目录中 202012721 文件夹下所有文件的大小。命令如下:

hdfs dfs -du /202012721
1.11HDFS 中的文件复制
hdfs dfs -cp [-f] [-p | -p[topax]] <原路径> <目标路径>

[-f]:如果目标文件存在,将强行覆盖。

[-p]:将保存文件的属性。

例如,将 HDFS 中的 /202012721/data.txt 复制为 /202012721/data_copy.txt。命令如下:

hdfs dfs -cp /202012721/data.txt /202012721/data_copy.txt
1.12 HDFS 中的文件移动
hdfs dfs -mv <原路径> <目标路径>

例如,将 HDFS 中的 /202012721/data_copy.txt 移动(也可理解为改名)为 /202012721/data2.txt。命令如下:

hdfs dfs -mv /202012721/data_copy.txt /202012721/data2.txt
1.13管理-报告文件系统的基本信息和统计信息
hdfs dfsadmin -report
1.14查看拓扑
hdfs dfsadmin -printTopology

1.2例题:

任务描述
本关任务:使用 Hadoop 命令来操作分布式文件系统。

编程要求
在右侧命令行中启动 Hadoop ,进行如下操作。

在 HDFS 中创建 /usr/output/ 文件夹;
在本地创建 hello.txt 文件并添加内容:“ HDFS 的块比磁盘的块大,其目的是为了最小化寻址开销。”;
将 hello.txt 上传至 HDFS 的 /usr/output/ 目录下;
删除 HDFS 的 /user/hadoop 目录;
将 Hadoop 上的文件 hello.txt 从 HDFS 复制到本地 /usr/local 目录。

预期输出:

HDFS的块比磁盘的块大,其目的是为了最小化寻址开销。

HDFS的块比磁盘的块大,其目的是为了最小化寻址开销。

代码:

start-dfs.sh
hadoop fs -mkdir /usr
hadoop fs -mkdir /usr/output
vim hello.txt
hadoop fs -put hello.txt /usr/output
hadoop fs -rm -r /user/hadoop
hadoop fs -copyToLocal /usr/output/hello.txt /usr/local

HDFS系统初体验:

start-dfs.sh
hadoop fs -mkdir /task
hadoop fs -ls /
touch task.txt
vim task.txt
 
hello educoder

hello educoder输入完后,退出vim文本编辑器后再输入下面的内容

hadoop fs -put task.txt /task
hadoop fs -cat /task/task.txt

2.HBase:

2.1常用命令:

1. 启动HBase

start-hbase.sh

2. 关闭hbase

stop-hbase.sh

3. 进入客户端

./hbase shell

4. 退出客户端命令

quit

5. 查看namespace

list_namespace

6. 创建namespace

create_namespace "namespace"

7. 删除namespace

drop_namespace "namespace"

8.表操作

1. 查看所有表

hbase(main):024:0> list

2. 查看某个namespace下所有的表

hbase(main):027:0> list_namespace_tables "testns"

3. 创建表

hbase(main):027:0> create "namespace:表名","列族1","列族2"

hbase(main):027:0> create "testns:t_person","info","edu"

4. 查看表结构

hbase(main):027:0> desc "testns:t_person"

5. 删除表和禁用表

hbase(main):027:0> disable "namespace:表"

hbase(main):027:0> drop "namespace:表"

9.数据增删改查

1. 添加数据

put "namespace:表","rowkey","列族1:列名1","值"

hbase(main):007:0> put 'testns:t_person','1001','info:name','zhangsan'

2. 根据rowkey查找数据

get "namespace:表名","rowkey"

hbase(main):015:0> get 'testns:t_person','1001'

3. scan查询表中所有数据

hbase(main):015:0> scan 'testns:t_person'

4. scan 查询表中前2条数据

hbase(main):015:0> scan "restns:t_person",{LIMIT=>2}

5. 使用start row和end row范围查找

hbase(main):015:0> scan "testns:t_person",{STARTROW=>'1001',STOPROW=>'1003'}

6. 使用start row和limit查找

hbase(main):015:0> scan "testns:t_person",{STARTROW=>'1001',LIMIT=>2}

7. 修改数据

put "namespace:表名","rowkey","列族:列名","值"

8. 删除数据

delete "namespace:表","rowkey","列族:列名"

9. 删除某个rowkey对应的数据

deleteall "namespace:表","rowkey"

10. 统计表中所有数据

count "namespace:表"

11. 清空表中的所有数据

truncate "namespace:表"

12. 创建表

hbase(main):013:0> create "testns:user","info"

13. 修改版本数

hbase(main):013:0> alter "testns:user",{NAME=>'INFO',VERSIONS=>2}

14. 查看多版本

hbase(main):013:0> get "testns:user","10001",{COLUMN=>'info:name',VERSIONS=>3}

例题:

创建表:

创建test表,然后继续在HBase中创建两张表,表名分别为:dept,emp,列都为:data

输出:

describe 'test' Table test is ENABLED test describe 'dept' Table dept is ENABLED dept describe 'emp' Table emp is ENABLED emp

启动:

hbase shell

建表:

# 创建表 test
create 'test', 'data'
 
# 创建表 dept
create 'dept', 'data'
 
# 创建表 emp
create 'emp', 'data'

添加数据、删除数据、删除表:

建表插数据:

create 'mytable', 'data'
put 'mytable', 'row1', 'data:1', 'zhangsan'
put 'mytable', 'row2', 'data:2', 'zhangsanfeng'
put 'mytable', 'row3', 'data:3', 'zhangwuji'

3.MapReduce

Map/Reduce程序总共分为两块即:Map,Reduce,Map负责处理输入文件的内容。 mapper方法,它以空格为分隔符将一行切分为若干tokens,之后,输出< <word>, 1> 形式的键值对。

Reducer中的reduce方法 仅是将每个key出现的次数求和。 hadoop-streaming只来自各个mapper的键值对按照键排序,不会合并,因此我们需要自己将排序后的键值对合并成<键, 值列表>的形式后再发给reduce程序处理,recuder.py中的main()就是为了实现这个功能。

例题:

1.成绩统计

使用MapReduce计算班级每个学生的最好成绩,输入文件路径为/user/test/input,请将计算后的结果输出到/user/test/output/目录下。

#! /usr/bin/python3

import sys

def main():
    for line in sys.stdin:
        line = line.strip()
        mapper(line)

# 使用name,age分别表示姓名和年龄
def mapper(line):
    ##########  begin      ############
    group = line.split('\\n')
    for people in group:
        if len(people.strip()) == 0:
            continue
        name, age = people.split(' ')
    ###########  End  #################

        print("%s\t%s" % (name, age))

if __name__ == '__main__':
    main()
#! /usr/bin/python3

import sys
from operator import itemgetter
# 找出values的最大值,并按name\tmax_age的形式输出。
def reducer(k, values):
    ##### Begin #########

    print("%s\t%s" % (k,max(values)))
    #####  End  #########

############################################# 
# mapper的输出经过分区处理后,数据行按照键排序;   
# 具有相同键的行排在一起;                      
# hadoop-streaming不会合并相同键的各个值。      
# 下面的代码将相同键的各个值放到同一个列表中,     
# 并调用reducer函数实现找出每个人的最大年龄并输出;
# 输出格式为:姓名\年龄                        
#############################################

def main():
    current_name = None
    ages = []
    name, age = '', 0
    for line in sys.stdin:
        line = line.strip()
        name, age = line.split('\t', 1)
        age = int(age)

        if current_name == name:
            ages.append(age)
        else:
            if current_name:
                reducer(current_name, ages)
                ages = []
            
            ages.append(age)
            current_name = name
    # 不要忘记最后一个人
    if current_name == name:
        reducer(current_name, ages)

if __name__ == '__main__':
    main()

2.文件内容合并去重

对于两个输入文件,即文件file1和文件file2,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件file3。 为了完成文件合并去重的任务,你编写的程序要能将含有重复内容的不同文件合并到一个没有重复的整合文件,规则如下: 第一列按学号排列; 学号相同,按x,y,z排列; 输入文件路径为:/user/tmp/input/; 输出路径为:/user/tmp/output/。 注意:输入文件后台已经帮你创建好了,不需要你再重复创建。

#! /usr/bin/python3
import sys

def main():
    for line in sys.stdin:
        line = line.strip()
        mapper(line)

def mapper(line):
    ########## Begin  ###############
    items = line.split('\\n') 
    
    for item in items:
        key,value = item.split() 
        print("%s\t%s" % (key,value))
   ###########  End    #############
if __name__ == '__main__':
    main()
#! /usr/bin/python3

import sys

def reducer(k, values):
    ############  Begin   ################

    value = sorted(list(set(values)))
    for v in value:
        print("%s\t%s" % (k,v))
    ############   End    ################

def main():
    current_key = None
    values = []
    akey, avalue = None, None
    for line in sys.stdin:
        line = line.strip()
        akey, avalue = line.split('\t')
        
        if current_key == akey:
            values.append(avalue)
        else:
            if current_key:
                reducer(current_key, values)
                values = []
  
            values.append(avalue)
            current_key = akey
    
    if current_key == akey:
        reducer(current_key, values)

if __name__ == '__main__':
    main()
        

3.挖掘父子关系

你编写的程序要能挖掘父子辈关系,给出祖孙辈关系的表格。规则如下: 孙子在前,祖父在后; 输入文件路径:/user/reduce/input; 输出文件路径:/user/reduce/output。

#! /usr/bin/python3
  
import sys

def mapper(line):
    ###############  Begin    ############
    items = line.split("\\n")
    for item in items:
        child,parent = item.split()
        print("%s\t%s" % (child,"p-"+parent))
        print("%s\t%s" % (parent,"c-"+child))
    ###############   End     #############

def main():
    for line in sys.stdin:
        line = line.strip()
        if line.startswith('child'):
            pass
        else:
            mapper(line)

if __name__ == '__main__':
    main()
#! /usr/bin/python3

import sys

def reducer(k, values):
    ##############    Begin    ################ 
    grandc = []
    grandp = []
    for value in values:
        if value[:2] == 'c-':
            grandc.append(value[2:])
        elif value[:2] == 'p-':
            grandp.append(value[2:])
    
    for c in grandc:
        for p in grandp:
            print("%s\t%s" % (c,p))
    ##############   End    #################

def main():
    current_key = None
    values = []
    akey, avalue = None, None
    print("grand_child\tgrand_parent")
    for line in sys.stdin:
        line = line.strip()

        try:
            akey, avalue = line.split('\t')
        except:
            continue
        if current_key == akey:
            values.append(avalue)
        else:
            if current_key:
                reducer(current_key, values)
                values = []
            values.append(avalue)
            current_key = akey
    if current_key == akey:
        reducer(current_key, values)

if __name__ == '__main__':
    main()

4.数据清洗

1.去掉字段“上映天数”中带有“零点场”、“点映”、“展映”和“重映”的电影数据;

2.以字段“上映天数”和“当前日期”为依据,在尾列添加一个“上映日期”(releaseDate)的字段,该字段值为“当前日期”减去“上映天数”+1(格式为:2020-10-13。例如:若字段“上映天数”的值为“上映2天”,字段“当前日期”为“2020-10-10”,则字段“上映日期”的值为“2020-10-09”)。如果字段“上映天数”为空,则字段“上映日期”的值设为“往期电影”。注意:若字段“上映天数”的值为“上映首日”,则字段“上映日期”的值应设为“当前日期”的值;

3.对字段“当日综合票房”和字段“当前总票房”的数据值进行处理,以“万”为单位,通过数字形式展示,若原数据中有“万”等表示量级的汉字,需去掉量级汉字(如:“1.5亿”需转换为“15000”,“162.4万”转换为“162.4”)。转换时需注意精度缺失问题(如:使用 double、float 等类型做算术运算时,1.14 亿转换为万时,结果会出现 11399.999999999998 万,可以使用 BigDecimal 类来处理这类精度缺失问题),字段“当日综合票房”和字段“当前总票房”最后保留两位小数;

4.清洗完的数据集设置为 1 个分区后,存储到 /root/files 目录下,分隔方式为 \t。

** 注意:不要改变字段原本的顺序。 **

数据集说明

本数据集是电影票房数据,包含十个字段的信息,数据集的字段含义说明如下:

盛世秧歌 7.06万 <0.1% 107 <0.1% 22 47.3% 1757.1万 展映 2019-10-16
决胜时刻 70.83万 0.6% 561 0.1% 34 21.6% 1.14亿 上映25天 2019-10-14
天池水怪 5.43万 <0.1% 55 <0.1% 30 -- 75.1万 2020-10-06

清洗后数据展示:

决胜时刻 70.83 0.6% 561 0.1% 34 21.6% 11400.0 上映25天 2019-10-14 2019-09-20
天池水怪 5.43 <0.1% 55 <0.1% 30 -- 75.1 2020-10-06 往期电影

注意事项

1.清洗数据后共多少行;

2.字段“上映日期”添加成功;

3.字段“当日综合票房”和字段“当前总票房”的单位处理成功。

启动HDFS:

start-dfs.sh
# dbhelper.py
 
import pymysql
import sys
import codecs
 
class DBHelper:
 
    def get_connection():
        # 根据题目提供的凭据建立到mysql服务器的连接"conn",注意字符集指定为"utf8mb4"
        ########  Begin   ############
        conn = pymysql.connect(host='localhost',port=3306,\
                        user='root',passwd='123123',\
                        charset='utf8mb4',db='mydb')
        ########  End    ############    
        return conn
 
    @classmethod
    def get_region(cls):
        conn = cls.get_connection()
        regions = dict()
        with conn.cursor() as cur:
            #从数据库中查询所有的省市代码和省市名称,并保存到字典regions中。
            ############  Begin ###################
            cur.execute("select CodeNum,Address from allregion")
 
            for s in cur.fetchall():
                regions[s[0]] = s[1]           
            ############  End    #################
        conn.close()
        return regions
 
    @classmethod
    def get_userphones(cls):
        conn = cls.get_connection()
        userphones = dict()
        with conn.cursor() as cur:
        #从数据库中查询所有的电话号码和对应的姓名,并保存到字典userphones中。
        ############  Begin ###################
            cur.execute("select phone,trueName from userphone")
 
            for t in cur.fetchall():
                userphones[t[0]] = t[1] 
        ############  End    #################
        conn.close()
        return userphones
 
def main():
    sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
    region = DBHelper.get_region()
    users = DBHelper.get_userphones() 
 
if __name__ == '__main__':
    main()
#! /usr/bin/python3
#
# mapper.py
import sys
from dbhelper import DBHelper
import codecs
import time
 
# 获取“省市代码:省市名称”项并保存在字典regions中;
# 获取“电话号码:姓名”项并保存在字典userphones中。
regions = DBHelper.get_region()
userphones = DBHelper.get_userphones()
 
def main():
    # 正确输出utf-8编码的汉字
    sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
    for line in sys.stdin:
        line = line.strip()
        mapper(line)
 
def mapper(line):
    # 输出形如“邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市”的字符串
    # 本题不需要reduce阶段,输出题目要求的内容即可,不需要使用“键\t值”的形式。
    ##########  begin      ##############
    items = line.split(',')
 
    caller = userphones.get(items[0])
    reciever = userphones.get(items[1])
    begin_time = int(items[2])
    end_time = int(items[3])
    caller_address = regions.get(items[4])
    reciever_address = regions.get(items[5])
 
    print(caller,reciever,sep=',',end=',')
    print(','.join(items[:2]),end=',')
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(begin_time)),end=',')
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(end_time)),end=',')
    print(str(end_time - begin_time),end=',')
    print(caller_address,reciever_address,sep=',')
    ###########  End  #################
 
if __name__ == '__main__':
    main()

总结:

map函数:

def mapper(line):
    words = line.split(' ')
    for word in words:
        if len(word.strip()) == 0:
            continue
        print("%s\t%s" % (word, 1))
if __name__ == '__main__':
    main()

reducer函数:

def reducer(k, values):
    print("%s\t%s" % (k, sum(values)))

4.Spark

1.Spark任务提交

jar包所在位置: /root/project.jar

主类:Student

提交模式:local

#!/bin/bash
 
cp -r  Spark/SparkRDD/target/project.jar /root
cd /opt/spark/dist/bin
#********** Begin **********#
./spark-submit \
--master local \
--class Student \
/root/project.jar
#********** End **********#

计算圆周率:

cd /opt/spark/dist/bin
./spark-submit 
--master local 
--class org.apache.spark.examples.SparkPi
/opt/spark/dist/examples/jars/spark-examples_2.11-2.2.0.jar

./spark-submit

--master 本地模式

--class 程序运行的主类名

xxx.jar

2.sparkRDD

RDD五大特性:

一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。

一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个 RDD都会实现 compute 函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的 RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

1.集合并行化创建RDD:

SparkContext创建;

sc = SparkContext("local", "Simple App")

说明:"local" 是指让Spark程序本地运行,"Simple App" 是指Spark程序的名称,这个名称可以任意(为了直观明了的查看,最好设置有意义的名称)。

集合并行化创建RDD:

data = [1,2,3,4]
rdd = sc.parallelize(data)

collect算子:在驱动程序中将数据集的所有元素作为数组返回(注意数据集不能过大):

rdd.collect()

停止SparkContext

sc.stop()
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
if __name__ == "__main__":
    #********** Begin **********#
 
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
  sc = SparkContext("local", "Simple RDD App")
 
  # 2.创建一个1到8的列表List
  data = list(range(1, 9))
 
  # 3.通过 SparkContext 并行化创建 rdd
  rdd = sc.parallelize(data)
 
  # 4.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,在后续内容中将会详细说明,主要作用是:收集 rdd 的数据内容
  rdd_content = rdd.collect()
 
  # 5.打印 rdd 的内容
  print(rdd_content)
 
  # 6.停止 SparkContext
  sc.stop()
 
 
    #********** End **********#

2.读取外部数据集创建RDD

文本文件RDD可以使用创建SparkContex的textFile方法。此方法需要一个 URI的文件(本地路径的机器上,或一个hdfs://,s3a:// 等 URI),并读取其作为行的集合。这是一个示例调用:

distFile = sc.textFile("data.txt")
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
if __name__ == '__main__':
    #********** Begin **********#
 
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")
    # 文本文件 RDD 可以使用创建 SparkContext 的textFile 方法。此方法需要一个 URI的 文件(本地路径的机器上,或一个hdfs://,s3a://等URI),并读取其作为行的集合
    # 2.读取本地文件,URI为:/root/wordcount.txt
    raw = sc.textFile("/root/wordcount.txt")
    rdd = raw.map(lambda x:x)
    # 3.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,在后续内容中将会详细说明,主要作用是:收集 rdd 的数据内容
    rdd.collect()
    # 4.打印 rdd 的内容
    print(rdd.collect())
    # 5.停止 SparkContext
    sc.stop()
    #********** End **********#

3.Transformation - map

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
if __name__ == "__main__":
    #********** Begin **********#
 
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")
    # 2.创建一个1到5的列表List
    List = [1,2,3,4,5]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(List)
    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())
 
    """
    使用 map 算子,将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下:
    需求:
        偶数转换成该数的平方
        奇数转换成该数的立方
    """
 
    # 5.使用 map 算子完成以上需求
    rdd_map = rdd.map(lambda x:(x*x if (x%2==0) else x*x*x))
    # 6.使用rdd.collect() 收集完成 map 转换的元素
    print(rdd_map.collect())
    # 7.停止 SparkContext
    sc.stop()
 
    #********** End **********#

4.Transformation - mapPartitions

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
#********** Begin **********#
def f(iterator):
    list = []
    for x in iterator:
        list.append((x,len(x)))
    return list
#********** End **********#
 
if __name__ == "__main__":
    #********** Begin **********#
    
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")
    # 2. 一个内容为("dog", "salmon", "salmon", "rat", "elephant")的列表List
    data = ["dog", "salmon", "salmon", "rat", "elephant"]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())
 
    """
    使用 mapPartitions 算子,将 rdd 的数据 ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的规则进行转换操作,规则如下:
    需求:
        将字符串与该字符串的长度组合成一个元组,例如:
        dog  -->  (dog,3)
        salmon   -->  (salmon,6)
    """
 
    # 5.使用 mapPartitions 算子完成以上需求
    partitions = rdd.mapPartitions(f)
    # 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素
    print(partitions.collect())
    # 7.停止 SparkContext
    sc.stop()
 
    #********** End **********#

5.Transformation - filter

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
if __name__ == "__main__":
    #********** Begin **********#
 
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")
    # 2.创建一个1到8的列表List
    data = [1,2,3,4,5,6,7,8]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())
 
    """
    使用 filter 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的规则进行转换操作,规则如下:
    需求:
        过滤掉rdd中的奇数
    """
    # 5.使用 filter 算子完成以上需求
    rdd_filter = rdd.filter(lambda x:x%2==0)
    # 6.使用rdd.collect() 收集完成 filter 转换的元素
    print(rdd_filter.collect())
    # 7.停止 SparkContext
    sc.stop()
 
    #********** End **********#

6.Transformation - flatMap

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
if __name__ == "__main__":
       #********** Begin **********#
       
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")
    # 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表List
    data = [[1,2,3],[4,5,6],[7,8,9]]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())
    """
        使用 flatMap 算子,将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下:
        需求:
            合并RDD的元素,例如:
                            ([1,2,3],[4,5,6])  -->  (1,2,3,4,5,6)
                            ([2,3],[4,5],[6])  -->  (1,2,3,4,5,6)
        """
    # 5.使用 filter 算子完成以上需求
    flat_map = rdd.flatMap(lambda x:x)
    # 6.使用rdd.collect() 收集完成 filter 转换的元素
    print(flat_map.collect())
    # 7.停止 SparkContext
    sc.stop()
    #********** End **********#

7.Transformation - distinct

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
if __name__ == "__main__":
    #********** Begin **********#
 
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")
 
    # 2.创建一个内容为(1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1)的列表List
    data = [1,2,3,4,5,6,5,4,3,2,1]
 
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
 
    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())
 
    """
       使用 distinct 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素去重,例如:
                        1,2,3,3,2,1  --> 1,2,3
                        1,1,1,1,     --> 1
       """
    # 5.使用 distinct 算子完成以上需求
    a = rdd.distinct()
 
    # 6.使用rdd.collect() 收集完成 distinct 转换的元素
    print(a.collect())
 
    # 7.停止 SparkContext
    sc.stop()
 
    #********** End **********#

8.Transformation - sortBy

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
if __name__ == "__main__":
    # ********** Begin **********#
 
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")
 
    # 2.创建一个内容为(1, 3, 5, 7, 9, 8, 6, 4, 2)的列表List
    data = [1,3,5,7,9,8,6,4,2]
 
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
 
    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())
 
 
    """
       使用 sortBy 算子,将 rdd 的数据 (1, 3, 5, 7, 9, 8, 6, 4, 2) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素排序,例如:
            5,4,3,1,2  --> 1,2,3,4,5
       """
    # 5.使用 sortBy 算子完成以上需求
    a = rdd.sortBy(lambda x:x)
 
    # 6.使用rdd.collect() 收集完成 sortBy 转换的元素
    print(a.collect())
 
    # 7.停止 SparkContext
    sc.stop()
 
    #********** End **********#

9.Transformation - sortByKey

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
if __name__ == "__main__":
    # ********** Begin **********#
 
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")
 
    # 2.创建一个内容为[(B',1),('A',2),('C',3)]的列表List
    data = [("B",1),("A",2),("C",3)]
 
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
 
    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())
 
    """
       使用 sortByKey 算子,将 rdd 的数据 ('B', 1), ('A', 2), ('C', 3) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素排序,例如:
            [(3,3),(2,2),(1,1)]  -->  [(1,1),(2,2),(3,3)]
       """
    # 5.使用 sortByKey 算子完成以上需求
    a = rdd.sortByKey()
 
    # 6.使用rdd.collect() 收集完成 sortByKey 转换的元素
    print(a.collect())
 
    # 7.停止 SparkContext
    sc.stop()
 
    # ********** End **********#

10.Transformation - mapValues

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
if __name__ == "__main__":
    # ********** Begin **********#
 
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")
 
    # 2.创建一个内容为[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表List
    data = [("1",1),("2",2),("3",3),("4",4),("5",5)]
 
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
 
    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())
 
    """
           使用 mapValues 算子,将 rdd 的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下:
           需求:
               元素(key,value)的value进行以下操作:
                                                偶数转换成该数的平方
                                                奇数转换成该数的立方
    """
    # 5.使用 mapValues 算子完成以上需求
    a = rdd.mapValues(lambda x:x*x if x%2==0 else x*x*x)
 
    # 6.使用rdd.collect() 收集完成 mapValues 转换的元素
    print(a.collect())
 
    # 7.停止 SparkContext
    sc.stop()
 
    # ********** End **********#

11.Transformations - reduceByKey

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
if __name__ == "__main__":
    # ********** Begin **********#
 
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")
 
    # 2.创建一个内容为[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表List
    data = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]
 
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
 
    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())
 
    """
          使用 reduceByKey 算子,将 rdd 的数据[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的规则进行转换操作,规则如下:
          需求:
              元素(key-value)的value累加操作,例如:
                                                (1,1),(1,1),(1,2)  --> (1,4)
                                                (1,1),(1,1),(2,2),(2,2)  --> (1,2),(2,4)
    """
    # 5.使用 reduceByKey 算子完成以上需求
    a = rdd.reduceByKey(lambda x,y:x+y)
 
    # 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素
    print(a.collect())
 
    # 7.停止 SparkContext
    sc.stop()
 
    # ********** End **********#

12.WordCount - 词频统计

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
if __name__ == "__main__":
 
    """
        需求:对本地文件系统URI为:/root/wordcount.txt 的内容进行词频统计
    """
    # ********** Begin **********#
 
    sc = SparkContext("local","pySpark")
    rdd = sc.textFile("/root/wordcount.txt")
    values = rdd.flatMap(lambda x:str(x).split(" ")).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).sortBy(lambda x:tuple(x)[1],False)
    print(values.collect())
 
    # ********** End **********#

13.Actions - 常用算子

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
if __name__ == "__main__":
    # ********** Begin **********#
 
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")
    # 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List
    data = [1, 3, 5, 7, 9, 8, 6, 4, 2]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.收集rdd的所有元素并print输出
    print(rdd.collect())
    # 5.统计rdd的元素个数并print输出
    print(rdd.count())
    # 6.获取rdd的第一个元素并print输出
    print(rdd.first())
    # 7.获取rdd的前3个元素并print输出
    print(rdd.take(3))
    # 8.聚合rdd的所有元素并print输出
    print(rdd.reduce(lambda x,y:x+y))
    # 9.停止 SparkContext
    sc.stop()
 
    # ********** End **********#

14.Friend Recommendation - 好友推荐

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
 
def word_couple(word1, word2):
    if hash(word1) > hash(word2):
        return word1 + '_' + word2
    return word2 + '_' + word1
 
def relations(items):
    result = []
    for i in range(1, len(items)):
        result.append((word_couple(items[0], items[i]), 0))
        for j in range(i+1, len(items)):
            result.append((word_couple(items[i], items[j]), 1))
    return result
 
def fun2(x):
    values = tuple(x[1])
    return ((x[0], 0) if min(values)==0 else (x[0], sum(values)))
 
if __name__ == "__main__":
    """
        需求:对本地文件系统URI为:/root/friend.txt 的数据统计间接好友的数量
    """
    # ********** Begin **********#
    sc = SparkContext("local", "friend recommendation")
    src = sc.textFile("/root/friend.txt").map(lambda x:x.strip().encode('utf-8').split(" "))
    rdd = src.flatMap(relations).reduceByKey(lambda x,y:0 if x==0 or y==0 else x+y).filter(lambda x:x[1]>0)
    print(rdd.collect())
 
    # ********** End **********#

3.Spark SQL

1.创建 SparkSession;

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.sql.crossJoin.enabled", "true") \
    .master("local") \
    .getOrCreate()

2.读取所给 Json 数据创建 DataFrame;

df =spark.read.json("/jun.json")

3.创建视图;

df.createOrReplaceTempView("table1")

4.编写 sql 语句计算指标;

sqlDF = spark.sql("sql语句")

使用SparkSQL统计战斗机飞行性能

sql 语句参考处理思路:

最大飞行速度字段特征:带有分隔符的数字和中文组成,如 1,438.4 千米每小时

第一步:使用 regexp_extract 通过正则表达式提取中文前的所有数字。

regexp_extract函数:

regexp_extract(str, regexp, [idx])

参数说明:

str是被解析的字符串或字段名;

regexp 是正则表达式;

idx是返回结果,取表达式的哪一部分默认值为1;

0表示把整个正则表达式对应的结果全部返回;

1表示返回正则表达式中第一个()对应的结果,以此类推 。

示例:

select regexp_extract('hitdecisiondlist','(i)(.*?)(e)',0) ;

得到的结果为: itde

select regexp_extract('hitdecisiondlist','(i)(.*?)(e)',1) ;

得到的结果为: i

本例可使用如下表达式提取:

(regexp_extract(最大飞行速度,'[\d,.]+',0)

需要注意的是当把该表达式加入spark.sql("sql语句")时,\d和.需要转义(\)。

第二步:使用 regexp_replace 函数去除千位分隔符,。

regexp_replace 函数:

regexp_replace(str, pattern, replacement)[source]

参数说明:

str:待搜索的字符串表达式。

pattern:匹配内容的正则表达式。

replacement:替换用的字符串表达式。

本例可使用如下表达式将,替换为'':

regexp_replace(regexp_extract(最大飞行速度,'[\\d,\.]+',0),'\,','')

第三步:使用 CAST 函数将字符串转化为 double 用于排序。

本例可使用如下表达式:

cast(replace(regexp_extract(最大飞行速度,'[\\d,\.]+',0),',','') as float)

第四步:使用降序输出前三。

第五步:将处理结果保存到本地目录;

sqlDF.write.format("csv").save("保存路径")

第六步:停止 SparkSession。

spark.stop()

统计出全球飞行速度排名前三的战斗机。 本实训提供一份全球战斗机相关指标参数的 Json 数据(数据在 /root/jun.json)。

统计出指标后将结果以 csv 格式保存到 /root/airspark 目录。

# coding=utf-8
from pyspark.sql import SparkSession
#**********Begin**********#
#创建SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL ") \
    .master("local")\
    .getOrCreate()
#读取/root/jun.json中数据
df = spark.read.json("/root/jun.json").coalesce(1)
#创建视图
df.createOrReplaceTempView("table1")
#统计出全球飞行速度排名前三的战斗机
out=spark.sql("select cast(regexp_replace(regexp_extract(`最大飞行速度`,'[\\\d,\\\.]+',0),'\\\,','') as float) as speed,`名称` from table1  order by cast(regexp_replace(regexp_extract(`最大飞行速度`,'[\\\d,\\\.]+',0),'\\\,','') as float)  DESC limit 3")
#保存结果
out.write.mode("overwrite").format("csv").save("/root/airspark")
#**********End**********#
spark.stop()

使用SparkSQL统计各个研发单位研制战斗机占比

# coding=utf-8

from pyspark.sql import SparkSession

#**********Begin**********#

#创建SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL ") \
    .master("local")\
    .getOrCreate()
#读取/root/jun.json中数据
df = spark.read.json("/root/jun.json").coalesce(1)
#创建视图
df.createOrReplaceTempView("table1")
#统计出全球各研发单位研制的战斗机在全球所有战斗机中的占比
out=spark.sql("select   concat(round(count(`研发单位`)*100/(select count(`研发单位`) as num from table1 where `研发单位` is not null and `名称`is not null ),2),'%') as ratio, `研发单位` from table1  where  `研发单位` is not null and `名称`is not null group by  `研发单位`")
#保存结果
out.write.mode("overwrite").format("csv").save("/root/airspark")
#**********End**********#

spark.stop()

SparkSQL 读取 CSV

spark = SparkSession.builder.appName("demo").master("local").getOrCreate()
spark.read.option("header", True).option("delimiter", "CSV分隔符").csv("csv路径")

option参数说明:

header为true:将 CSV 第一行数据作为头部信息,换一句来说,就是将 CSV 的第一行数据作为 SparkSQL 表的字段

delimiter:分隔符,例如,CSV 文件默认以英文逗号进行字段分隔,那么 delimiter 为英文逗号,如果文件以分号进行字段分隔,那么 delimiter 为分号

SparkSQL 内置字符串处理函数

正则表达式

正则表达式的限定符

将出租车轨迹数据规整化,清洗掉多余的字符串,并使用 DataFrame.show() 打印输出。

CSV 文件是以 \t 进行字段分隔,文件路径为 /root/data.csv

# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession
if __name__ =='__main__':
    spark = SparkSession.builder.master("local").appName("demo").getOrCreate()
    #**********begin**********#
    df = spark.read.option("header",True).option("delimiter","\t").csv("/root/data.csv")
    df.createTempView("data")
    spark.sql("""
    select regexp_replace(TRIP_ID,'\\\W+','') as TRIP_ID ,
        regexp_replace(CALL_TYPE,'\\\W+','') as CALL_TYPE ,
        regexp_replace(ORIGIN_CALL,'\\\W+','') as ORIGIN_CALL ,
        regexp_replace(TAXI_ID,'\\\W+','') as TAXI_ID ,
        regexp_replace(ORIGIN_STAND,'\\\W+','') as ORIGIN_STAND ,
        regexp_replace(TIMESTAMP,'\\\W+','') as TIMESTAMP ,
        regexp_replace(POLYLINE,'\\\W+','') as POLYLINE
    from data
    """).show()
    #**********end**********#
    spark.stop()

SparkSql UDF

UDF对表中的单行进行转换,以便为每行生成单个对应的输出值。例如,大多数 SQL环境提供UPPER函数返回作为输入提供的字符串的大写版本。

用户自定义函数可以在Spark SQL中定义和注册为UDF,并且可以关联别名,这个别名可以在后面的SQL查询中使用。作为一个简单的示例,我们将定义一个 UDF来将以下JSON数据中的温度从摄氏度(degrees Celsius)转换为华氏度(degrees Fahrenheit):

python自定义udf函数:

df = sqlContext.read.json("temperatures.json")
 
df.registerTempTable("citytemps")
# Register the UDF with our SQLContext
 
sqlContext.registerFunction("CTOF", lambda degreesCelsius: ((degreesCelsius * 9.0 / 5.0) + 32.0))
sqlContext.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()

注意,Spark SQL定义了UDF1到UDF22共22个类,UDF最多支持22 个输入参数。上面的例子中使用UDF1来处理我们单个温度值作为输入。如果我们不想修改Apache Spark的源代码,对于需要超过22个输出参数的应用程序我们可以使用数组或结构作为参数来解决这个问题,如果你发现自己用了UDF6 或者更高UDF类你可以考虑这样操作。

# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession
import json

if __name__ == '__main__' :
    spark = SparkSession.builder.master("local").appName("demo").getOrCreate()
    #**********begin**********#
    df = spark.read.option("header",True).option("delimiter","\t").csv("/root/data2.csv")
    df.createTempView("data")
    spark.sql("select TRIP_ID,CALL_TYPE,ORIGIN_CALL, TAXI_ID, ORIGIN_STAND, from_unixtime(TIMESTAMP,'yyyy-MM-dd') as TIME ,POLYLINE from data").show()
    spark.udf.register("timeLen", lambda x: {
        (len(json.loads(x)) - 1) * 15 if len(json.loads(x)) > 0 else 8
    })
    spark.udf.register("startLocation", lambda x: {
        str(json.loads(x)[0]) if len(json.loads(x)) > 0 else ""
    })
    spark.udf.register( "endLocation", lambda x: {
        str(json.loads(x)[len(json.loads(x)) - 1]) if len(json.loads(x)) > 0 else ""
    })
    df.createTempView("data2")
    res=spark.sql("select TRIP_ID,CALL_TYPE,ORIGIN_CALL,TAXI_ID,ORIGIN_STAND,from_unixtime(TIMESTAMP,'yyyy-MM-dd') as TIME, POLYLINE, timeLen(POLYLINE) as TIMELEN, startLocation(POLYLINE) as STARTLOCATION, endLocation(POLYLINE) as ENDLOCATION from data2")
    res.createTempView("data3")
    res.show()
    spark.sql("select CALL_TYPE,TIME,count(1) as NUM from data3 group by TIME,CALL_TYPE order by CALL_TYPE,TIME").show()
    #**********end**********#

5.样题:

命令题.

将/root/data/file2.txt的内容追加到HDFS上/user/2024/input/data.txt的文件尾部,并显示data.txt的内容。

1.将 /root/data/file2.txt 的内容追加到 HDFS 上 /user/2024/input/data.txt 的文件尾部 在命令行中执行以下命令:

hadoop fs -appendToFile /root/data/file2.txt hdfs://your-hdfs-namenode:port/user/2024/input/data.txt

这里的 your-hdfs-namenode:port 需要替换为你实际的 HDFS 名称节点地址和端口号。

2.显示 data.txt 的内容

hadoop fs -cat hdfs://your-hdfs-namenode:port/user/2024/input/data.txt

同样要将 your-hdfs-namenode:port 替换为实际的 HDFS 名称节点相关信息。

程序题:

银行客户数据包括流水号,客户编号,姓名,信用积分,地区,性别,年龄,年限,存贷款,产品数,有本行信用卡,活跃用户,收入,已流失,各数据项之间用半角逗号,分隔。数据示例如下:1,15634602,Hargrave,619,France,Female,42,2,0,1,1,1,101348.88,12,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,017,15737452,Romeo,653,Germany,Male,58,1,132602.88,1,1,0,5097.67,1

请统计各性别客户的数量,结果按照客户数量降序排列。预期输出格式如下:[('Female',60), ('Male',31)]

MapReduce:

from mrjob.job import MRJob

class GenderCountMRJob(MRJob):

    def mapper(self, _, line):
        fields = line.split(',')
        gender = fields[4]
        yield gender, 1

    def reducer(self, gender, counts):
        yield None, (gender, sum(counts))

    def reducer_sort(self, _, gender_count_pairs):
        sorted_pairs = sorted(gender_count_pairs, key=lambda x: x[1], reverse=True)
        for pair in sorted_pairs:
            yield pair

if __name__ == '__main__':
    GenderCountMRJob.run()

Spark RDD:

from pyspark import SparkContext

sc = SparkContext("local", "GenderCountSparkRDD")

data_rdd = sc.textFile("your_data_file.txt")

gender_count_rdd = data_rdd.map(lambda line: line.split(',')).map(lambda fields: (fields[4], 1)) \
                          .reduceByKey(lambda a, b: a + b)

sorted_gender_count_rdd = gender_count_rdd.map(lambda x: (x[1], x[0])).sortByKey(ascending=False) \
                                          .map(lambda x: (x[1], x[0]))

result = sorted_gender_count_rdd.collect()

for row in result:
    print(row)

sc.stop()

Spark SQL:

from pyspark.sql import SparkSession
from pyspark.sql.functions import count

spark = SparkSession.builder.appName("GenderCountSparkSQL").getOrCreate()

data_df = spark.read.csv("your_data_file.txt", header=False, inferSchema=True)
data_df = data_df.withColumnRenamed("_c0", "id").withColumnRenamed("_c1", "customer_id") \
                .withColumnRenamed("_c2", "name").withColumnRenamed("_c3", "credit_score") \
                .withColumnRenamed("_c4", "region").withColumnRenamed("_c5", "gender") \
                .withColumnRenamed("_c6", "age").withColumnRenamed("_c7", "years") \
                .withColumnRenamed("_c8", "deposit_loan").withColumnRenamed("_c9", "product_count") \
                .withColumnRenamed("_c10", "has_credit_card").withColumnRenamed("_c11", "active_user") \
                .withColumnRenamed("_c12", "income").withColumnRenamed("_c13", "has_churned")

gender_count_df = data_df.groupBy("gender").agg(count("gender").alias("count"))

sorted_gender_count_df = gender_count_df.orderBy("count", ascending=False)

result = sorted_gender_count_df.collect()

for row in result:
    print((row["gender"], row["count"]))

spark.stop()

在上述代码中,your_data_file.txt 需要替换为实际存储银行客户数据的文件名。

分析题:

供电企业的SCADA系统每隔15分钟自动采集一次用户用电数据,为了减小数据库的存储压力,每个月都要导出上月的数据。营销部门想根据这些数据分析用户的用电习惯。从数据库导出的数据文件随着时间不断增加,需要用合适的方式存储下来。第一种方案是存储到部署在单台服务器上的大型数据库Oracle里,第二种方案是存储到Hadoop集群的分布式文件系统HDFS里。你倾向于选择哪种方案,说说你的理由。

框架分析:

倾向于选择将数据存储到 Hadoop 集群的分布式文件系统 HDFS 里,理由如下:

数据量与可扩展性方面

HDFS 优势:供电企业的用电数据会随着时间不断增加,数据量庞大。HDFS 是为大规模数据存储而设计的分布式文件系统,能够轻松应对海量数据的存储需求。它可以通过在集群中添加更多的节点来线性扩展存储容量,方便适应数据量的持续增长。

Oracle 局限:虽然 Oracle 是大型数据库,但其部署在单台服务器上时,存储容量受限于该服务器的硬件配置。当数据量增长到一定程度,单台服务器可能面临存储空间不足的问题,扩展存储相对复杂且成本较高。

数据处理与分析方面

HDFS 优势:结合 Hadoop 生态系统,在 HDFS 存储数据后,可以方便地利用如 MapReduce、Spark 等分布式计算框架对数据进行高效处理和分析。这些框架能充分发挥集群的计算能力,并行处理大量用电数据以分析用户用电习惯,适合处理大规模数据的场景。 Oracle 局限:在单台服务器上的 Oracle 数据库进行大规模数据处理时,可能会受到服务器性能瓶颈的限制,如 CPU、内存等资源的不足。处理海量用电数据以分析用户用电习惯可能会耗费大量时间,效率相对较低。

成本效益方面

HDFS 优势:Hadoop 集群可以基于相对廉价的商用硬件搭建,通过增加节点来提升存储和计算能力,在大规模数据存储和处理场景下,具有较好的成本效益。

Oracle 局限:Oracle 数据库通常需要购买商业许可证,并且对于单台服务器配置要求较高以满足不断增长的数据存储和处理需求,这可能导致较高的硬件采购和软件许可成本。

综合考虑,对于供电企业不断增长的大量用电数据以及后续要进行的用户用电习惯分析需求,存储到 Hadoop 集群的分布式文件系统 HDFS 里是更合适的方案。

计算步骤分析:

存储到 Oracle 数据库(单台服务器)的计算步骤分析

**数据导入步骤: **

1.首先需要建立与 Oracle 数据库的连接,配置好相应的数据库连接参数,如主机地址、端口号、用户名、密码、数据库实例名等。

2.将从 SCADA 系统导出的每月用电数据文件,按照 Oracle 数据库规定的格式进行整理(可能涉及到数据类型转换、字段映射等操作),确保数据能正确导入数据库表中。

3.使用数据库提供的导入工具(如 SQL*Loader 等)或通过编写 SQL 插入语句,将整理好的数据逐行插入到预先创建好的用于存储用电数据的表中。

数据分析步骤

1.当营销部门要分析用户用电习惯时,需要在 Oracle 数据库中编写复杂的 SQL 查询语句。例如,要分析不同时间段用户的用电量变化,可能需要通过分组查询(GROUP BY)、条件筛选(WHERE)、聚合函数(如 SUM、AVG 等)等来提取相关数据。

2.若要进行更深入的跨时间段、跨用户群体等复杂分析,可能还需要编写存储过程、函数等来实现,这涉及到对数据库编程知识的熟练运用。

3.由于数据存储在单台服务器上的 Oracle 数据库中,在执行这些查询和分析操作时,受限于服务器的硬件资源(如 CPU、内存等)。如果数据量庞大,查询可能会耗费较长时间,甚至可能因为资源不足导致查询失败或性能严重下降。

**存储到 HDFS(Hadoop 集群)的计算步骤分析 **

数据导入步骤:

1.首先要确保 Hadoop 集群正常运行,并且与 SCADA 系统所在的环境能够进行数据传输。 2.将从 SCADA 系统导出的每月用电数据文件,直接上传到 HDFS 的指定目录下。可以使用 Hadoop 提供的命令行工具(如 hadoop fs -put 等)或在相关应用程序中调用 HDFS 的 API 来完成数据上传操作。

数据分析步骤:

1.在 HDFS 存储数据后,可以利用 Hadoop 生态系统中的分布式计算框架进行数据分析。比如使用 MapReduce 或 Spark 等。

以 MapReduce 为例:

1.首先要编写 MapReduce 程序,在 Mapper 阶段,将从 HDFS 读取的用电数据文件中的每一行数据进行解析,提取出与分析用户用电习惯相关的字段(如用户 ID、用电量、用电时间等),并以键值对的形式输出。例如,以用户 ID 为键,用电量和用电时间等信息为值。

2.在 Reducer 阶段,对 Mapper 输出的具有相同键(即同一用户)的键值对进行汇总处理,比如计算该用户在不同时间段的总用电量、平均用电量等,从而分析出用户的用电习惯。

以 Spark 为例:

1.可以先创建 SparkContext 或 SparkSession 来初始化 Spark 环境,然后通过 Spark 的 API 将存储在 HDFS 中的用电数据文件读取为 RDD(弹性分布式数据集)或 DataFrame(数据框)形式。

2.接着利用 Spark 提供的丰富的函数和操作符,如分组(groupBy)、聚合(agg)、窗口函数(window functions)等,对数据进行处理和分析,以得出用户的用电习惯相关的结论。 由于 Hadoop 集群是由多个节点组成的分布式系统,这些分布式计算框架可以充分利用集群的计算资源,并行处理大量的用电数据,大大提高了数据分析的效率,即使面对海量数据,也能相对快速地完成分析任务。

从计算步骤分析来看,存储到 HDFS 并结合分布式计算框架进行数据分析,在处理供电企业大量用电数据以分析用户用电习惯时,具有更好的灵活性、可扩展性和效率,相比之下更适合这种大规模数据处理的场景。

标签: 大数据 学习 笔记

本文转载自: https://blog.csdn.net/weixin_74146322/article/details/143366990
版权归原作者 Jayen H 所有, 如有侵权,请联系我们删除。

“头歌大数据实训(总结)”的评论:

还没有评论