0


大数据自学路线笔记 持续更新中

首先说明 面试 多记原理和流程 生产当中多记API 学习到新的方法或者是机制不懂的可以打开翻译一下中文意思 大多数都是见名知意的 包括自己写代码时 命名是一样

大数据前言

大数据三大特征 数据存储(Apache Hadoop HDFS、Apache HBase、Apache Kudu、云平台)
、数据计算(Apache Hadoop MapReduce 、Apache Spark、Apache Flink)、数据传输(Apache Kafka、Apache Pulsar、Apache Sqoop)
(2008年贡献给Apache软件基金会开源了)Hadoop=大数据 元老级数据栈 细分为三部分 存储的HDFS、计算的MapReduce、YARN资源调度。

路线

大数据路线:java -> linux(os6) -> shell -> hadoop (2.7.2) -> hive(1.3.1) -> kafka -> hbase(1.3.1) -> spark -> flink

hbase和kafka和hive顺序可调换 也可以hive-hbase-kafka

HADOOP三大组件

HDFS

naodenode 主角色 元数据 edits 和 fsimage元数据的备份 datanode负责数据存储 secondaryNamenode辅助处理元数据合并

HDFS基本操作

创建 hadoop fs** -mkdir **文件夹名字 合并
上传hadoop fs **-put **提交文件路径
追加hadoop fs -appendToFile 添加文件 添加入的文件
查看hadoop fs -cat 文件名
下载hadoop fs -get 下载的文件 本地路径
复制hadoop fs -cp
重命名hadoop fs -mv 文件 新文件名
删除 hadoop fs -rm -r 文件名
配置了垃圾桶将会删到垃圾桶里
hdfs 存储原理 block备份

hdfs io流

客户端向namenode请求上传文件 响应可上传文件 请求上传的block 请求返回datanode 返回节点 建立block传输 datanode答应成功 传输数据
客户端向namenode请求下载文件 namenode返回目标文件元数据 读取数据block 传输数据

nn和2n

fsimage edits
datanode工作机制
datanode 启动后向namenode注册 一小时上报所有block信息 心跳每秒一次心跳返回有namenode给该datanode的命令 超10半没有心跳则认为该节点不可用

MapReduce(简称MR)

**mapper reduce driver **

mapreduce 工作机制

maptesk 工作机制

read阶段--------------------------------------------------------------mapper阶段--------------collect收集阶段-----------------------------------溢写阶段--------------------------combine阶段
待处理数据 ----》提交信息job 开启maptask maptask 读数据 返回kv进mapper-----》环形缓冲区 分区排序 区内排序(100m 80%)----》溢写磁盘 分区区内有序=======》merge归并排序

reducetask阶段

copy阶段==========merge阶段sort阶段=====reduce阶段
拷贝阶段=========》分组排序==========》reduce
数据分布不均匀可能在reduce阶段产生数据倾斜

maptask shuffer工作机制

环形缓冲区、溢写分区区内排序、归并排序、分区合并压缩输出 、拷贝、归并排序除了mapper和reduce以外
//序列化 反序列化
//分区 实现partition接口
//排序 实现WritableComparable接口
//output
//map join
mapper 数据的切分
reduce 数据的合并
driver job
//reduce join
map端 存储标记 俩张表
reduce 切分 替换内容
容易出现数据倾斜问题

//map join

增加map端的工作量 减少reduce端的压力 缓存文件 加载数据 设置reducetask数量为0setNumReduceTasks(0) set up读取缓存文件 获取 切割 缓存到集合 关闭流 distributedcachedrive缓存文件
etl 数据清洗 主要是细心
压缩 io多就选择压缩 计算多就不用 mapper前 要可切分的压缩 mapper和reduce之间 选择压缩速度快的 减少shuffer流程 reduce后 调优配置参数修改

YARN

YARN工作机制

mr程序提交哦啊到客户端所在的节点 ---》申请一个Application application 提交资源路径----》提交job运行所需要的资源-----》 resourcemanager 将用户请求初始化成一个task 放在fifo调度队列当中====》nodemanager 创建容器(container)----》nodemanager 查看有几个切片 申请几个maptask---》mrappmaster发送脚本-------》向mr再申请俩个容器 运行reducetask程序

资源调度器

**fifo **
队列 先进先出
**公平调度器 **(资源大可选择公平调度器)
缺额排序 缺额最大的优先运行
多队列
公平贡献所有资源
capacity(容量 ,默认)(并发但并发度没公平高)
每个队列采用fifo调度策略
对同一用户提交的作业进行资源限制
选择一个比值最小的队列
其次考虑提交时间 用户限制内存 对内任务排序(可调优)

任务推测执行

作业完成时间为最慢任务完成时间
条件:每个task只有一个备份任务 job完成task 必须不少于5%
打开参数mapred-site.xml默认打开的
不能开启: 有严重数据倾斜,特殊任务 比如向数据库写数据
estimatedRuntime (60)=(currentTimestamp-taskStartTime)(6)/progress(10%)(0)
estimatedEndtime(60) =estimatedRunTime(60) + taskSartime(0)
estimatedEndtime (16)=currentTimestamp(6)+averageRuntime(10) (1)MR总是选择estimatedEndtime -estimatedEndtime 差值最大的任务,并之后为其启动备份任务
(2)防止资源浪费,设置备份任务上限
(3)空间换时间 集群资源紧缺的情况下合理使用该机制

Hadoop企业优化

MR跑的慢的原因

1、计算机性能:

CPU、内存、磁盘、网络
2、I/O优化:

数据倾斜、map和reduce数设置不合理、map运行时间过长reduce等待太久、小文件过多、不可切割超大文件、spill(溢写次数过多)

mapreduce优化方法

数据输入(合并小文件、采用combineTextInputFormat作为输入解决大量小文件问题)、

map阶段(减少溢写的次数、调节环形缓冲区的大小op.sort.mb增大内存百分比80%,减少merge合并次数、在map之后不影响业务的情况下采用combine处理 减少IO)、

reduce阶段(合理设置map和reduce数量、设置map和redcue共存、规避使用reduce、合理设置reduce buffer缓冲区)、

IO传输(采用压缩方式 snappy和LZO压缩编码器、使用SequenceFile二进制文件)、

数据倾斜(抽样和范围分区 抽样结果集来预设分区边界、自定义分区、combine(聚合并精简数据)、采用mapjoin 尽量避免reduce join)、

调优参数(map、reduce 、yran的一些内存、cpu参数、数量)

小文件解决方法:

Hadoop Archive(放入缓存打包小文件,减少namenode使用)、Sequence File (K,V合并小文件)、CombinFileInputFormat(多个文件合并成一个单独的Split,考虑存储的位置)、开启JVM重用会减少45的运行时间(线程池)

HIVE

create database databasename;
use databasenames;
create tables name(name string , number int , gender varchar);row format ...
(没有)external 都是内部表 翻译 外部的 没有指明外部的都默认是内部表
外部表更加安全 只删除数据 不删除元数据
员工和外包人员的区别
全部用hive操作时用内部表 和其他交互时用外部表

以什么分割:row format delimited fields terminated by '\t' 默认\001
还有location 必须指定 只能指定 文件夹

数据导入

load data inpath ''into table tablename;
load data local inpath ' ' into table tablename;

导出

insert overwrite 【local】 directory 'path' select ....;
bin/hive -e sql语句 >(重定向)
bin/hive -f sql脚本 > (重定向)

修改表

alter table

复杂函数

array <type>collection item terminated by '' 下标从0开始
map<key,value> collection items terminated by '' 每个键值对之间的分隔符 map keys terminated by '' key和values之间的分隔符
struct struct<name:string ,age:int> collection terminated by ''

操作

查询 基本语法:

【select
from 从什么表
where 过滤
group by 分组
having 处理后的数据过滤
order by 排序
limit 输出行数 】
分组 group by
join 是俩张表的横向拼接 再查询
union 是表的竖向拼接 要注意俩张表的列字段个数和字段类型对应
抽样 分桶抽 tablesample(bucket 3 into 10 on rowname) 十份 三个桶 hash求余 抽 tablesample(bucket 3 into 10 on rand()) 完全随机 tablesample(1 percent) 百分比抽 tablesample(1k)大小抽

开窗函数(也称为窗口函数)

一个一个窗子 开多少 就多少个窗口的范围进行计算 基于值每行数据的窗口为上一行到当前行 基于值的是 窗口为 【值-1,当前行】
基于行 聚合函数 over( 聚合函数 row between xx preceding and xxx ) unbound无限 cuttent 当前 fllowing向下 值 聚合函数 over(聚合函数 between xxxx and xxxx);
分区 over (partition by xxx order by xxx row between xxxxx and xxxxx); 一般partition by 和order by不会省略
常用:last frist lag_....... 、隔一行、rank函数

分区

partition by() 可指定多个分区
动态分区:(1)动态分区功能总开关(默认true,开启)set hive.exec.dynamic.partition=true insert set hive.exec.dynamic.partition.mode = nonstrict; 最多一千个分区 map reduce100 一条100000当查询结果为空时且进行动态分区时,是否抛出异常,默认false。
hive.error.on.empty.partition=false

分桶

clustered by () into num buckets 文件分块 分成几个文件

文件格式

txt ORC 头 orc 中间 索引 stri 块 尾部 读物尾部信息找 默认十万一个索引 parquest跟orc差不多

压缩

hive对数据进行压缩 orc建表时声明就行 计算过程中 压缩格式 snappy速度快 不支持切分 他本身就是块 可以用一个map处理

设置参数 set mapper reduce中间 计算过程中 压缩格式 snappy速度快 不支持切分 他本身就是块 可以用一个map处理

调优

Yarn节点分配给container 使用的内存、CPU、单个最大、最小的内存 看条件调节
MR maptask内存、cpu大小 reduce 申请的contain内存大小和cpu

执行计划Explain

mapper tree 和 reduce tree
explain format执行计划 注意看explain 执行计划

分组聚合优化

map-side聚合

总开关 判断是否适合聚合 set hive.map.aggr = true;
set hive.map.aggr.hash.min.reduction=1;

join优化

common join算法 map join 大表join小表 没有reduce 小表本地去跑读取小表数据 上传到缓存(可压缩)

**bucket map join **打破了map join只能大表join小表的限制 可大表join小标 不过要参与的表均为分桶表,关联字段为分桶字段且其中一张表为另一张表的几倍才可以

SMB map join 和bucket map join 的要求差不多 不同的是 sort merge join算法
map join 优化 hint提示 自动 或者手动触发

不过 一般都是通过全局配置参数 site配置文件 个别单独一个select调整

数据倾斜

某个key数据远超其他大量相同的key发往同一个reduce导致reduce所需时间远超其他reduce
分组导致的数据倾斜

**由于group by 字段分布不均导致倾斜进入reduce **

解决:
1、Map-Side聚合 启用map-side聚合
set hive.map.aggr=true;
set hive.map.aggr.hash.min.reduction=0.5;
set hive.groupby.mapaggr.checkinterval=100000;
2、skew-GroupBy 优化 hive自带的 启用优化即可
set hive.groupby.skewindata=true;

由join导致的数据倾斜 关联字段导致数据倾斜

解决:
1、map join
--启动Map Join自动转换
set hive.auto.convert.join=true;
set hive.mapjoin.smalltable.filesize=250000;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;

2、**skew join **专门解决大表和大表 倾斜的
--启用skew join优化
set hive.optimize.skewjoin=true;
--触发skew join的阈值,若某个key的行数超过该参数值,则触发
set hive.skewjoin.key=100000;

MR任务并行度

map端的并行度 是由切片决定的 基本上不调 除非小文件 CombineHiveInputFormat,资源充足的情况下考虑增大map的并行度

set hive.auto.convert.join=true;

reduce并行度

set mapreduce.job.reduces=1;

小文件优化
set hive.merge.mapredfiles=true;

其他优化:

CBO优化

CBO在hive的MR引擎下主要用于join的优化,例如多表join的join顺序。

--开启cbo优化
set hive.cbo.enable=true;
--为了测试效果更加直观,关闭map join自动转换

谓词下推

谓词下推(predicate pushdown)是指,尽量将过滤操作前移,以减少后续计算步骤的数据量。
--是否启动谓词下推(predicate pushdown)优化

set hive.optimize.ppd = true;
矢量化查询

Hive的矢量化查询,可以极大的提高一些典型查询场景(例如scans, filters, aggregates, and joins)下的CPU使用效率。
set hive.vectorized.execution.enabled=true;

** Fetch抓取**

Fetch抓取是指,Hive中对某些情况的查询可以不必使用MapReduce计算。

--是否在特定场景转换为fetch 任务
--设置为none表示不转换
--设置为minimal表示支持select *,分区字段过滤,Limit等
--设置为more表示支持select 任意字段,包括函数,过滤,和limit等
set hive.fetch.task.conversion=more;

** 本地模式**

Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。
--开启自动转换为本地模式

set hive.exec.mode.local.auto=true;
--设置local MapReduce的最大输入数据量,当输入数据量小于这个值时采用local MapReduce的方式,默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
--设置local MapReduce的最大输入文件个数,当输入文件个数小于这个值时采用local MapReduce的方式,默认为4
set hive.exec.mode.local.auto.input.files.max=10;

并行执行

有些Stage是可以并行执行的

--启用并行执行优化
set hive.exec.parallel=true;
--同一个sql允许最大并行度,默认为8
set hive.exec.parallel.thread.number=8;

严格模式

分区表不使用分区过滤
使用order by没有limit过滤
笛卡尔积

Zookeeper

观察者 监控和链接
main() connect listen
选举机制 fillder lowder
暂时 永久
zdone

Hbase

noSQL 海量
row key 行键 column 列 列族(可多个) 行切块

DDL(表操作)

create '名字','列族' 必须要有列族 创建 list 查看 alter 修改表属性{} drop表 先要让其下线disable table_name
命名空间 相当于数据库 但是只能用create 不能use create_namespace '命名空间名字:表名' ,'列族' drop和数据库一样 不过还得先下线表 disable '命名空间名字 :table_name' drop '命名空间:表名' 再drop_namesapce '命名空间'

DML(数据操作)

put '命名空间' 'rowkey' '列族:列名' '值'

scan扫描全表

get获取 表里的数据

put 相同 就值不同的话就是覆盖
多版本 alter 'table_name',{NAME=>'列族名',VERSIONS=>num} 注意 name和versions 必须大写 如果需要版本的数量大于写入数量 则展示写入数量 否则多的覆盖展示版本数量 {COLUMN=>'列族 ,列名',VERSIONS=>3}

详细架构

首先是依赖于hdfs DataNode Hbase 依赖于 Zookeeper 所以 hbse和hdfs要先起, HMaster 与 Zookeeper交互 给他分担 客户端与表中间的操作, HReginServer 维护Regin 。。。切分的列块,HReginServer里面包含HLog预写入日志 (防止内存当中数据丢失的)
zk是用于客户端与 HRgion 列族在物理上称为Store 存储在Mem Store 、 Store File (HFile格式) 都要存储在HDFS上 与hdfs

写流程

(HBase 读比写慢的框架)

客户端请求meta表(元数据表)所在的RegionServer(在哪个机器存的) 返回 mate所在的地址hadoop102 请求meta表 返回 meta,获取RS 客户端缓存下来 发送put请求 先写入到预写入日志WAL/HLog 再写到memstore/内存 向客户端发送ack存储完成 hbase Flush流程 内存根据条件刷写在HDFS

客户端请求meta所在的regionServer(在哪个机器存的) 返回 mate所在的地址 hadoop102 请求meta表 返回 meta,获取RS 客户端缓存下来 发送get请求 同时读内存和磁盘数据 把内存磁盘的数据放在black cache 再合并向客户端返回时间戳大的

Compact

Minor Compact 小 合并 几个小的合并
Major Compact 全局 所有文件合并成一个 正真删除 drop标记不会立即删除 只是操作内存 合并之后删除 最少4个合并
数据的真正删除

split 切分

128M 切分。。。。。直到10G regin (预分区 均匀放 他的逻辑切分会发生数据倾斜 尽量不使用多列族 产生小文件 控制数据多个列族的增长速度就行 )

idear 增删改查 和HBASE 略

Hbase 和Hvie的区别

1、Hive是一个分析框架 (Hive数据仓库)方便使用HQL去管理和查询

2、 用于数据分析和清洗

3、引擎默认是**基于HDFS\MR **引擎有多个 还有spark

4、Hive存储在DataNode上 编写的HSQL 语句终将转化MapRedcue代码执行

**HBase **
1、Hbase 面向列族存储的非关系型数据库

2、用于存储结构化和非结构化的数据

3、基于HDFS数据 结构化存储HFile,存放在DataNode中 ,以region形式管理

4、延迟低接入在线业务使用 Hbase可以直线单线存储
Hbase 数据库 同时提高高效的速度访问

关联hive和hbase hive来操作hbase

变量设置和jar包即可 注意不能load进Hive所关联的那张表 注意 hbase中表已经存在的话 hive关联只能创建外部表 external 外部表 (例如有数据存在 突然有一天要进行数据分析 hive用于数据分析的 hbase 类似与数据库 会报错 内部表删除 元数据也会被删除 数据改变 hive也可以查到

Hbase优化 (rowkey设计五颗星重要!!!!!)

高可用 (hbase自带)HRegionServer 抢资源

Kafka

是一个分布式的基于发布/订阅模式的消息队列

(好处:解藕 、可恢复性、缓冲(生产大于消费)、灵活|峰值处理(动态 增加减少服务器)、异步通信) 消息中间件 例如班长 班主任
消息队列 点对点 :一对一

发布订阅模式

一对多

1、消费者主动拉取 (资源浪费 要维护一个一直问有没有)

2、 订阅形式 队列推给消费者 kafka基于拉取

kafka架构

**生产者 **

kafka集群(Broker 有分区和副本 Partition | Leader、follower)

zookeeper

**消费者 **

操作

创建
topics.sh kafka的增删改查的主题都用它
bin 下面的kafka-topics.sh脚本 --list命令 基于zookeeper 要连的机器 主题名字 分区数 副本数
bin/kafka-topics.sh --list --zookeeper hadoop103:2181
bin/kafka-topics.sh --create --zookeeper hadoop103:2181 --topic first --partitions 2 --replication-factor2
bin/kafka-topics.sh --delete --zookeeper hadoop102:2181 --topic frist
bin/kafka-topics.sh --describe -zookeeper hadoop103:2181 --topic frist
console-producer.sh 链接的主题 直接链kafka 链接机器
bin/kafka-console-producer.sh --topic first --broker-list hadoop103:9092
回车之后是一个箭头 就表示 可以发消息了 >
在另外一台机器上开启消费者
bin/kafka-console-consumer.sh --topic first --zookeeper hadoop103:2181 --from-beginning
就可以收到103发的消息了
加上参数就可以读到103生产者发送的一开始消息离线也可以接收

kafka存储

.index 快速定位 二分查找 分片和索引机制 1g一个索引 偏移量
.log
生产者 分区策略
好处:扩展 、高并发

kafka生产者

生产者分区的原则:

producer

将producer 封装成一个ProducerRecord对象

数据可靠性保证

topic的每个分区收到producer发送的数据后需要向producer发送ack收到曲儿 否则会重新发送数据
kafka副本同步策略是:区别完成同步才发送ack
ISR 为了完成ack 如果follower长时间没有向leader同步数据则该follower将被t出ISRLeader发生故障之后重新选举新的Leader 选择是时间的 time.max.ms

ack机制

ack=0 时:producer 不等待borker的ack 当borker故障时容易数据丢失
ack=1时:producer 等待borker的ack partition的leader落盘成功之后返回ack 如果在follower同步完成之前leader故障 那么将会丢失数据
ack =-1producer 等待 borker的ack等待patition的leader和ISR里面的follower区别落盘成功完成之后才返回ack。如果在follower同步完成后 broker发送ack之前leader发送故障 那么会造成数据重复 同步完成 leader挂掉 没有发送ack 又选举一个新的leader producer又发了一次

数据一致性问题

Log文件当中HW和LEO
LEO:每个副本最后一个offset
HW:所有副本重最小的LEO
leader
HW LEO
0 1 2 3 4 5 6 7 8 9 10 12 13 14
HW LEO
0 1 2 3 4 5 6 7 8
HW LEO
0 1 2 3 4 5 6 7 8 9 10
当 follower的LEO追上leader之后 可以重新加入ISR 相当于复活。
leader 故障 为了保证数据的一致性 follower高于hw的部分截掉 相当于是统一长度
保证的是数据的一直性 保证用户消费的一致性 并不能保证数据丢不丢

Exactly Once 语义

当ack设置为-1时候 数据不丢失 即At Least Once ack设置为0生产者每条消息只能被发送一次 不重复 At Most Once
At Least Once + 幂等性 = Exactly Once 既不重复也不丢失(设置即可)

kafka消费者consumer

采用pull拉的模式从broker读取数据
push推的方式很难适应消费者 速率不同的消费者 发送速率是由broker决定的 pull的不同之处是如果kafka中没有数据那么消费者可能会陷入循环中

消费者分区策略:

consumer 可以订阅多个主题订阅多个分区 消费者个数变化会触发重新分配
1、roundrobin 轮询机制 订阅相同的主题可以考虑这个
2、range 范围机制 不同主题用range
offset(补偿,抵销,抵消)维护
consumer会出现宕机 恢复之后要回到故障之前的位置继续消费 offset 保存在kafka本地和、zookeeper

offset

消费者组、主题、分区
不同组的消费者可以消费同一个分区
集群的写任意一个名字都行 那个hadoop103

高效读写(了解即可)

1、顺序写磁盘
2、零复制技术
3、分布式分区 并行

Zookeeper在Kafka中的作用

管理broker上下线
分区副本的分配 leader的选举 (选举规则抢资源controller)
分区分配策略
range以主题划分 先找到谁订阅了这个主题 要是消费者只有一个 那就得把主题里的区别分区内容给他 要是一个组内超过一个 轮询分

Kafka当中的事务

producer事务 引入一个组件保证生产者的数据精准写道开发集群里
consumer事务 offset保证消费者精准消费一次

API生产者流程

生产者api 异步发送 俩个线程 main和sender 一个线程共享变量RecordAccumulator
producer
main线程 producer sand()方法 Interceptors拦截器 serializer序列化 partitioner 分区器
RecordAccumulator(共享变量)

sander线程

总结

**生产者 kafka 消费者 **
首先broker里面有主题 分区 主题是虚拟概念 分区是物理 分区为消费生产提高并行度 防止topic挂掉 有副本 leader(生产消费) following(备份)之分 当leader 挂掉之后 ISR选举新的leader HW(保证消费者消费一致性 存储数据的一致性 不能解决重复不重复的问题) LEO 防止挂掉数据重复和丢失问题 生产者当中ack机制
**生产者 **
分区规则 指定分区就进指定分区 没有指定就是hash分区 什么都没指定 轮询(论着来)ack保证数据丢不丢失 0 表示不等待返回值 1leader能接收到 -1 isr当中所有副本 流程异步发送 俩个线程 main和sender 一个线程共享变量RecordAccumulator Interceptors拦截器 serializer序列化 partitioner 分区器
消费者
消费者组 消费者组里面的不同消费者不能消费同一个分区 offset 按照 组 主题 分区 分区分配策略 range 以主题划分 和 roundrobin 轮询

spark

基础介绍

spark分布式计算引擎 基于MR
分布式:单机 单进程 单节点 ; 伪分布式 多进程 单节点 分布式 多进程 多节点
分布式计算的核心是切分数据 减少数据规模
spark也是集群中心化 管理者很少做事 都是手下做事 没有备份 干活的叫Executor执行者 Driver管理者;驱动器
离线数据分析 部分实时数据 大数据运算优化 spark 三大spark core spark sql spark streaming
系统:完整的计算机程序 HDFS KAFKA 框架:不完整的计算机程序MR 核心的功能逻辑已经弄完但是业务相关的代码未开发Kafka
引擎:spark核心功能
利用spark提供的环境/入口将业务逻辑与spark 核心Core链接在一起 spark调度资源完成计算操作(过程:写好业务逻辑配置好环境提交给spark 完成数据计算流程)

saprk是基于mr开发的
mr java 不适于进行大量的数据处理
saprk scala 适合大量数据处理

处理方式

hadoo只考虑单一的计算操作结果放在file 当中
spark计算结果在内存当中

部署

YARN+SPARK|FINAK SPARK+SPARK
spark+yarn配置

Spark有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。
yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出。
yarn-cluster:Driver程序运行在由ResourceManager启动的APPMaster,
**俩种运行模式的区别 dirver的位置 **

Spark Core (核心)

RDD

做弹性分布式数据集 (分布式计算模型
1、一定是一个对象
2、一定是封装了大量的方法和属性(计算逻辑)
3、一定需要适合进行分布式处理(减少数据规模、并行计算 分区的概念 数据减少 分区的概念
数据结构:采用特殊的组织结构和管理数据

数据模型:User 对象 、Student对象 Teacher 对象 、、、、、
array_list、HashMap ListArray、LinkedList、String = char[]

中心化集群基础架构:主从Master-Slave
RDD模型可以封装数据的处理逻辑,但是逻辑不能太复杂
RDD的功能调用不会马上执行
多个RDD的功能是如何组合在一起的:IO输入输出流

map

map映射 对单值数据进行处理
将指定的值转为其他的值的场景
数据处理过程中不改变原始数据
说白了 new 一个新的RDD 不改变原来数据
调用封装好的map方法
map 方法就是将传入的a转为B 但是没限制a和b的关系。 a和b可以没有关系
什么样的数据类型 流到另外一个管道的数据类型
java1.8来源于scala 由马丁引入 ,代码能省就省 简化 如果@FunctionalInterface 就可以用函数编程 简化
return ; {}代码只有一行可 ()参数只有一个 可以省略 参数和箭头 参数在逻辑中只使用了一次
RDD的默认分区和上一个一样 分区编号也不变
kafka 当中类似于队列分区 分区内有序 分区间无序
线程不同的时候顺序不一定 你做你的 我做我的
RDD采用 数据处理完了再执行下一个
RDD不会保存数据
RDD是一个数据模型 : 一定是一个对象 可以new出来
spark在数据处理中,要求同一个组的数据必须在同一个分区中
打乱数据分区重新组合 被称为shuffle 一个分区可以存放多个组 分完区之后 再下一步
当前group by 会将数据保存在磁盘文件中 保证数据全部分完组之后进行后续操作 shuffle操作一定会落盘 在磁盘中等待 spark不保存数据资源浪费 shuffle操作的方法都有改变分区的能力

distinct

:分布式去重 采用了分组 和shuffle的处理方式 具备分区的能力 可以传参
按照指定的规则进行排序
对每个数据进行标记进行排序
可以传三个参数 排序规则,升序降序,分区
俩个毫无关系的kv 2元组
spark RDD对整体的处理 单值处理
个体 称为 kv
单值数据可以转为kv

groupBy

按照规定的数组进行分组
给一个标记
groupBy的结果 (0,【1,2】)
所以 结果就是kv类型的数据
将分组和聚合简化

按照相同的key来分组聚合 调用Integer 相加方法 计算的基本思想 :俩俩计算
计算过程中真正影响性能的是shuffle
增加磁盘读写的缓冲区
如果不影响最终结果的化 磁盘读写的数据越少性能越高

groupByKey

:按照k对v进行分组

reduceByKey

按照k对v进行俩俩聚合

sortByKey

按照k排序 自定义分区数
方法返回值为整形
大于o的整型 那么当前对象比其他对象大
小于o的整型 那么当前对象比其他对象小
一般 return this.age - o.age;

combine

:预聚合,reduceByKey可以在shuffle之前 预聚合 减少落盘数据量,所以性能能极大提高
需要分组聚合的功能 需要优先考虑reduceByKey

sortByKey

只支持key排序 可以套用其他方法排序 mapToPair

coalesce

改变分区 合并分区 缩减分区 没有shuffle 数据不会被改变 扩大分区没法改变 分区没有意义可以传参 true 打开shuffle扩大分区

repartition
重新分区

rdd.repartition(3); 就是coalsece 传参true 缩小coalsesce 扩大用repartition

RDD算子

判断是行动算子 还是转换算子 看返回值 .var

阀门 打开水才可以流到 管子里面
将旧的RDD通过方法转换成新的RDD 装修者模式
Spark分布式计算中 组合功能是在Driver端完成的 功能的执行是在Executor

RDD案例 (思想 代码略)

首先花时间进行数据需求分析 ---》然后开发原则(数据清洗)----》代码实现

RDD依赖

Spark中相邻的俩个RDD之间存在的依赖关系 连续的依赖关系被称为血缘关系
Spark中的每一个RDD都保存了血缘和依赖关系
本质上并不是RDD对象的关系,说的是RDD对象中分区数据的关系
窄依赖 如果计算上游的RDD的一个分区的数据被下游RDD的一个分区所独享
宽依赖 如果计算上游的RDD的一个分区的数据被下游RDD的多个分区所独享
依赖关系和任务数量,阶段数量

作业 job

行动算子执行时,会触发作业的执行(activejob)
任务(task):每个Executor执行的计算单元 task数量就是每个阶段分区之和
阶段(stage):一个RDD的计算流程默认就是一个完整阶段 但如果计算流程中有shuffle,那么流程就会一分为二,分开的每一段就被称为stage阶段 前一个阶段不执行完 后一个阶段不允许执行
阶段的数量和shuffle依赖的数量有关:1+shuffle依赖数量
任务分区的数据应该设定为多少?简单可以理解为就是资源核数,一旦推荐分区数量为资源核数的2-3倍

持久化

也相当于是优化
因为 spark不保存数据 不能回流 只能根据依赖 从头开始rdd
RDD流程 流过了就没了 数据是不会保存的
所以要用缓冲区/持久化 cache将数据保留到pairrdd那会 cache是保存在内存中 直接进行下一步groupBy
file放在磁盘中
stringIntegerJavaPairRDD1.cache(); stringIntegerJavaPairRDD1.persist(StorageLevel.MEMORY_ONLY()); 是相等的
stringIntegerJavaPairRDD1.persist(StorageLevel.MEMORY_ONLY());// 更加安全 and disk 内存不够就放磁盘 ser_2 几份
俩个进程 内存文件也不能共享 spark持久化 只对当前的应用程序有效

检查点

stringIntegerJavaPairRDD.checkpoint(); // **要设置检查点 **推荐HDFS
为了数据的安全 从头再跑一遍 所以性能比较低 保证数据
为了提高效率 spark推荐在检查点之前 执行cache方法 将数据保存 联合使用
cache 方法会在血缘关系中增加依赖关系。
stringIntegerJavaPairRDD.checkpoint 会切断血缘
所有的shuffle效率是非常低的 所以spark为了提升shuffle的算子的性能,每个shuffle算子都是含有缓存
RDD executor端没法拉取dirver端的数据 除非使用拉取的方法 例如 collect foreach循环是在Executor端 执行的 那么结果没法拉回到diver端

广播变量

数据拉取的单位是task 如果数据不是以task为传输单位而是以executor单位传输 那么性能会提高
RDD不能以executor单位传输
默认数据传输以task为单位 如果想要Executor为单位 要引用特殊的数据模型 broadcast
数据格式:JSON
1\每一行就是一个JSON格式的数据,而且表示一个对象,对象必须包含在{}当中
2、对象中的属性必须使用,号隔开
3、每个属性和属性名之间用冒号隔开
TODD RDD功能和方法,数据源的数据,格式不是我们学习的重点,使用非常的繁琐
所以在特殊场景当中,spark对数据处理进行了封装所以要学sparkSQL SparkSQL就是对Spark

RDD的封装

结构介绍

SparkSQL

SparkSQL介绍

Spark Core:核心模块
数据环境:JavaSparkContext
数据模型:JavaRDD
Spark SQL:结构化数据处理模块
SQL:为了数据库数据访问开发语言
Spark封装模块的目的就是在结构化数据的场景,处理起来方便
结构化数据:特殊结构数据=》(table,json)
半结构化数据 xml,html
非结构化数据 压缩文件、图形文件,视频音频

MR运行慢-> Spark

MR开发慢 ->Hive -> SQL

Spark | SQL

selecl from where group join

SQT (join) -> Hive -> MR (xxxx)
SQL (join) -> XXXXX -> RDD (yyyyy)

Spark | Hive => Shark => Spark On Hive => SparkSQL => Spark -> SQL -> RDD
-> llive On Spark > Hive-> SQL ->RDD(数据仓库)

spark SQL环境配置

spark SQL中对数据模型也进行了封装:RDD->Dataset 数据集 SQL返回的resultSet 返回数据集
接收文件数据源时。会将文件中的一行数据封装为Row对象
数据模型转换成表

使用SQL文的方式操作数据
Table or view not found: user; 没有找到user这个表 所以要将数据模型转换试图表

数据类型转换为二维的结构(行,列),可以通过SQL文进行访问
视图:是表的查询结果集,表可以增加,修改,删除,查询。
视图不能增加,不能修改,只能查询
当前版本的JDK不适合开发SQL

Spark SQL环境转换

TODD 环境之间的转变
Core: SparkContext->SparkSession
new JavaSparkContext(sparksql);
SQL:SparkSession -> SparkContext
SparkContext sparkContext = sparkSQL.sparkContext();
sparkContext.parallelize()
SQL:SparkSession ->Core: JavaSparkContext
SparkContext sparkContext = sparkSQL.sparkContext();
JavaSparkContext jsc= new JavaSparkContext(sparkContext);

UDAF AND UDF

UDAF是所有的数据产生一个数据集 需要一个缓冲区 用于临时存放数据
TODD UDAF 可以转换为UDF使用
udaf需要传入俩个参数 第一个参数表示UDAF对象
第二个参数表示UDAF 输入的类型
String sql = "select 'Name:'+ name from user";
String sql = "select concat('Name:',name) from user";
String sql = "select 'Name:'||name from user";
不同sql语言之间 不太一样
Spark =》Spark On Hive =》SparkSQL =》Spark parse SQL
》Hive On Spark =》数据仓库=》Hive parse SQL
TODD SparkSQL提供了一种特殊的方式 可以在SQL当中增加自定义方法来实现复杂的逻辑
UDF函数是每一行数据都会调用一次函数
register 方法传递三个参数
第一个参数表示SQL中的方法名
第二个参数表示逻辑:IN =>OUT
第三个参数表示返回的数据类型 ;DataType类型数据 ,需要使用Scala语法操作 需要特殊的使用方式

CSV文件

csv 以逗号隔开的数据格式
sparkSQL.read().csv("data/user.csv").show();
header 设置表头

JSON文件

TODD 读取什么数据格式就传什么格式的文件
JSON :JavaScript Object Notation
对象:{}
数组:【】
json文件整个数据格式符合json格式 而不是一行
列式存储 : 统计快 查询慢
行式存储: 查询快 统计慢
TODD SparkSQL其实就是Spark Core RDD的封装。RDD文件读取文件采用的Hadoop 按行读取
SparkSQL只需要保证JSON文件中一行数据符合JSON格式即可,无需整个文件符合

MySQL交互

     Dataset<Row> jdbc = sparkSQL.read().jdbc("jdbc:mysql://hadoop103:3306/test?useSSL=false", "spark_test", properties);
     jdbc.show();
     jdbc.write()
             .jdbc("jdbc:mysql://hadoop103:3306/test?useSSL=false", "spark_test1", properties);
     jdbc.show();  
Hive交互

System.setProperty("HADOOP_USER_NAME","root");
SparkSession sparkSQL = SparkSession
.builder()
.enableHiveSupport()// 添加hive支持
.master("local[*]")
.appName("sparkSQL")
.getOrCreate();
sparkSQL.sql("show tables").show();
sparkSQL.sql("create table user_info3(name String,age bigint)");
sparkSQL.sql("insert into table user_info3 values('zhangsan',10)");
sparkSQL.sql("select * from user_info3").show();

Spark Streaming

在特定场合下,Spark Core(RDD)的封装
Streaming:Steam + ming 翻译:流 +动作进行时 流处理
数据源源不断的流转过来
Spark Streaming实际上就是将无界的数据流切分成有界的数据流 方便运算
有界数据流 有界数据流才可以做计算
无界数据量 不可以做计算
SprkStreaming底层还是SparkCore,就是在流式数据处理中进行的封装
从数据处理方式的角度
流式数据处理:一个数据一个的处理
微批量数据处理:一小批一小批的数据处理
批量数据处理:一批数据一批数据的处理
从数据处理延迟的角度
实时数据处理:数据处理的延迟以毫秒为单位
离线数据处理:数据处理的延迟以小时,天为单位
Spark是批量数据处理,离线数据处理框架
SparkStreaming其实就是对Spark的封装
SparkStreaming是微批量,准实时数据处理
SparkStreamimg使用离散化流 (不连续的数据
Spark Core:RDD的方法被称为算子
Spark Streaming:DStream 的离散化流
生产环境中:窗口操作主要应用于这样子的需求,最近n时间,每个m时间数据变化
例如:最近一个小时每十分钟气温变化趋势

Function

DStream 确实就是对RDD的封装,但是不是所有的方法都进行了封装。有一些方法不能使用 :sortBy,sortByKey 如果特定场合下,或需要使用这些方法,那么就要将DStream转为RDD使用 自己感觉没有就转为RDD 使用即可

离散化流

DStream 确实就是对RDD的封装,但是不是所有的方法都进行了封装。有一些方法不能使用 :sortBy,sortByKey 如果特定场合下,或需要使用这些方法,那么就要将DStream转为RDD使用 自己感觉没有就转为RDD 使用即可离散化流 类似于行动算子 调用就执行 必须要有输出 mysql 控制台 。。。。 否则报错

窗口

开窗函数 范围 差不多的概念 window 方法可以改变窗口的数据范围(默认数据采集周期) window 可以传递俩个参数 第一个参数 表示窗口的数据范围(时间 第二个参数 表示窗口的移动幅度,可以不用传递,默认就是采集周期 注意 ; 数据窗口范围扩大,但是窗口移动幅度不变,数据有可能会有重复 范围和窗口 一致 数据不会有重复 数据窗口的范围幅度要小 可能会丢失数据

关闭

MySql JDBC : Table->close //Redis: Map ->KV //ZK:ZNode //HDFS:path

关闭SparkStreaming的时候需要在程序运行过程中通过外部操作进行关闭

javaStreamingContext.close();//强制

javaStreamingContext.stop(true,true);//优雅

内存

堆外内存:JVM管理不了的内存,称为堆外内存(Unsafe)
JVM管理的内存 堆内内存

Flink

介绍

是一个框架和分布式处理引擎,用于对有界和无界的数据流进行状态计算| 离线数据是有界的流;实时数据是无界的流
低延迟、高吞吐、结果具有良好的兼容性
电商、市场营销、物联网、电信基站等均需要处理流数据
分层API SQL->DataStream-》Function
和Spark做对比
数据模型:Spark采用RDD模型,SparkStreaming实际上也就是一组组的小批数据处理RDD的集合
Flink基本数据模型是数据流,以及事件Event序列
Spark是批计算,将DAG划分为不同的stage,一个完成之后才可以计算下一个
Flink是标准的流执行模式,一个事件在一个节点处理完之后可以直接发到下一个节点处理

运行模式

yarn模式

session 会话模式
cd /opt/en/hadoop-2.7.2/etc/hadoop
bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
./bin/flink run -c com.atguigu.bidata.flinktest.Flink04_Wcout_ParameterTool -p 3 /opt/en/flink/FlinkTutorial-1.0-SNAPSHOT.jar --host hadoop103 --port 8888

job 工作模式

./flink run –m yarn-cluster -c com.atguigu.bidata.flinktest.Flink04_Wcout_ParameterTool /opt/en/flink/FlinkTutorial-1.0-SNAPSHOT.jar --host hadoop103 –port 8888

flink四大组件

JobManager作业:控制一个任务应用程序执行的主进程 。接收任务程序 转为执行图 向Resource申请资源 申请好之后分发到TaskManager 协调作用 例如检查点的协调
TaskManager任务管理器 工作进程(干活的) TaskManage当中的定义的插槽 solt数量决定了多少个线程 TaskManager 与JobManager进行心跳之类的交互 之外也可以和他同一运行程序的TaskManage交换数据 每个solt之间也可以交换数据
ResourceManager资源管理器
Dispacher分发器 接口 桥梁的作用

Job流程

提交应用                启动并提交应用给                请求slots                            启动 |发出提供slots(插槽 )

app -------------》Dispacher------------------------》JobManager-------------------》向ResourceManager《---------------------------------》TaskManager(JobManage提供slots、TaskManage提交要在slots中执行的任务。最后就是TaskManage之间交换数据)
注册slots

Yarn上的提交流程

Flink Client 首先上传Flink的Jar包和配置HDFS 提交Job到ResourceManager 启动 ApplicationMaster开启NodeManager 申请Container容器 启动TaskManager

任务调度的原则

首先编写代码 生成数据流图 根据客户端 提交到flink集群 提交job 到JobManager JobManager 返回状态 结果。 JobManager分配开的每一个任务 都会在每个TaskManager下的solt执行 多线程执行 JobManager和TaskManager心跳之类的交互 TaskManager内数据会向下流动。

并行度

代码、-p、集群默认配置 ;并行执行任务的程度。每个任务后面都可以设置并行度。一个特定算子的子任务个数被称为并行度。所有算子中最大的并行度就是stream的并行度。
solt就是执行一个独立任务独立线程所需要的最小的资源 执行具有前后关系可以在一个solt里面资源共享 不具备前后关系的任务不分在一个solt里面 不同的共享组不同的solt

如何实现并行?
设置并行度,分发到不同的solts下运行
并行的任务 需要多少个solt?
分组 最大的solt数量

流图-》工作图-》执行图-》物理执行图

数据传输形式

不同算子之间 并行度不同
one to one stream维护着分区以及元素顺序窄依赖 (flat map 、map)| (洗牌) stream的分区会发生改变 类似于spark 轮询 shuffle过程 宽依赖 flink当中洗牌

任务链

相同并行度的one to one操作 Flink这样相连的算子链接在一起形成task 原来的算子成为里面的subtask one to one 并行度相同俩个条件缺一不可 加上同一共享组 不想合并也可以增加分区操作 或者是断开 还有就是环境里面全局禁用 前不断后断 调用方法 开始一个新的任务链合并

Flink API

Flink 主要分为四部:enviroment-》source-》transform->sink

1、配置执行环境 设置并行度 本地 提交远程
2、Source
3、Kafka
Kafka配置
UDF 自定义数据方法
实现SourceFunction 重写run和cancel方法 还有就是定义一个Bean来封装数据

Transform

Base

Map、FlatMap、Filter 都是one to one 的模式 没有分区 shuffle 打乱 /洗牌操作

KeyBy

将流拆分为不同的分区 :通过hash cord 求hash值 例如 10%3.。。。。1 分一分区 13%1.。。。。。1 分在一分区 。特点 相同key的数据一定在同一分区里面 不同的key也可能在同一分区里min

min和minby

区别 min:值 minby 最小值包含的那条值
基本/简单算子 map 形状 flatmap 拆分/打散 filter 过滤

reduce 分组

split and select

分流,按照字段进行分流

connect

合流 将老的转为元组类型 与低温流链接合并之后输出一个状态信息
一国俩制

重写方法 每个流输出不同结果
注意仅仅只能是一国俩制 map1 map2

union

联合多条流

Sink

Kafka 配置

JDBC链接和执行sql语句 自定义方法即可

数据类型

基础数据类型 Tupes 元组 bean对象 List、Arrays、Maps

window窗口

(范围 将无界的数据流切为有界流 数据分发到有限大小的桶bucket进行分析 一段时间
类型:时间窗口 Time Window 包含 滚动时间窗口 Tumbling time 固定的时间 没有重叠 时间对齐 左闭右开 固定的 size
滑动时间窗口 Sliding Windows 固定的窗口和滑动的间隔组成 长度固定可以叠加 类似于滑动的窗户 特殊的滚动窗口 看步长属于哪几个窗口 看设置的时间
会话时间窗口 时间不对齐 一段时间没有收到新的消息就沈城新的窗口 最小时间间隔 基于时间
计数窗口

Count Window 滚动计数窗口 滑动计数窗口

全局窗口

global window

窗口函数

俩大类 增量聚合函数 ReduceFunction 来一条计算一条 聚合操作

全窗口函数

所有数据收集起来 等到计算的时候会遍历所有数据 例如计算平均值或者是百分比
api 注意 window函数 必须在keyby使用之后才可以使用

滚动窗口

.window(TumblingEventTimeWindows.of(Time.seconds(15)));
滑动 .timeWindow(Time.seconds(10),Time.seconds(5));
会话窗口 .window(EventTimeSessionWindows.withGap(Time.minutes(10)));
滚动计数窗口 .countWindow(5);
滑动计数窗口 .countWindow(10,2);
增量聚合窗口 aggregate
全局窗口 把所有的数据收集起来最后计算 apply

时间语义

Event Time 事件创建的时间
Ingestion Time:数据进入Flink的时间
Processing Time:执行操作算子的本地系统时间
乱序数据的影响 数据迟到状态
flink以EventTimem模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子
由于网络、分布式、会导致乱序
watermark 调慢所有时间 (延迟) 还有迟到的就使用window函数
允许数据迟到的时间和迟到数据的处理
迟到数据处理 时间语义基本上配合window使用
首先设置环境事件时间Event 然后配置时间戳和watermark 全局延迟时间 然后keyBy使用window 设置开窗时间 设置允许迟到时间 和迟到数据的处理

注意 并行度的设置 数据轮询分区 不影响后面的数据

状态管理

flink中的状态:算子的状态(Operatior State) 键控状态(Keyed State)状态后端(State Backends) 来一个处理一个的算子 被称为无状态算子/任务
可以认为是一个本地变量,可以被任务的业务访问逻辑范围 flink会进行状态管理 包括状态一致性、故障处理以及高效存储和访问
状态是跟特定的算子关联在一起的
算子需要注册其状态
总的来说,有俩种类型的状态:
算子状态(Operatior State)
算子状态的作用范围限定算子
键控状态(Keyed State)
根据输入流当中定义的key来维护和访问
列表状态(List State):一组
联合列表状态(Union List State):发生故障如何恢复
广播状态:(Broadcast State) 如果一个算子有多任务 多个任务状态又相同 那这种特殊情况就可以使用广播状态
值状态 Value State vlues 必须在richfunction里面 因为要获取运行状态
列表状态 List State list
映射状态 Map State k v
聚合状态 Reducing state & Aggregating State

状态后端

State Backends
状态存储访问和维护 组件-----可插入组件:状态后端 State Backends
状态后端主要负责本地状态管理,以及检测点

三种状态后端

MemoryStateBackend

内存级别状态后端 放在TaskManager的JVM堆内存上 将checkpoint存储在JobManager内存中 快速、低延迟、但不稳定

FsStateBackend

将checkpoint存在远程持久化文件系统上 FlieSystem上 ,而对于本地状态跟MemoryStateBackend一样

BocksDBStateBackend

将所有状态序列号之后尺寸在本地的RocksDB中存储
可在配置文件中设置
也可以代码中设置

Flink底层Process API

绝招 包含前面 没法解决的问题的方法
例如可以创建watermark
包含RichFunction里面的功能
能够创建闹钟 分流等操作 包含状态等

Flink当中的容错机制

一致性检查点checkpoint:一致性: 数据处理到哪里的/状态就可以 所有任务在某个时间点的一份快照是所有任务都恰好处理完一个相同的输入数据的时候 元数据

从检查点恢复

flink会定期保存状态的一致检查点 发生故障的时候flink会使用最近的检查点来一致恢复应用程序的状态并重新启动处理流程 开始消费并且处理检查点到发生故障之前的所有数据
这种检查点的保存和恢复机制可以为程序状态提供精准一次的一致性,因为所有算子都会保存检查点并恢复所有状态,这样依赖所有的输入流都会被重置到检查点完成时的状态 注意source 偏移量

Flink检查点算法(流 不可能保存全部数据)

1种简单的想法: 暂停应用保存状态到检查点,再恢复应用 (类似于拍集体照 全部手头事情暂停 拍照 慢。。。。 排队 之类的)
Flink的改进实现 (要快 低延迟)
基于Chandy-Lamport算法的分布式快照 自己保存自己的快照 分离开来
将检查点的保存和数据处理分离开,不暂停整个应用
第二步就是从checkpoint种读取数据,将状态重置
从检查点 重新启动应用程序后,其内部状态与检查点完成时的状态完全相同

检查点分界线:checkpoint barrier

flink检查点算法用到了一种称为分界线的特殊数据形式 哟过来把一条流数据按照 不同的检查点分开,基于分界线之前数据状态(state backends)改变 分界线所在检查点中 否则之后的检查点

保存点(save point)

标签: 大数据 笔记

本文转载自: https://blog.csdn.net/m0_55987538/article/details/139195915
版权归原作者 twm! 所有, 如有侵权,请联系我们删除。

“大数据自学路线笔记 持续更新中”的评论:

还没有评论