大数据面试指南
java
1、 排序,冒泡,选择
冒泡:
比较相邻两个数
双重循环,外层控制排序的轮次,内层控制比较的次数
选择:
假设一个最小值的索引,每次比较后,会更新最小值的索引,并更新最小值的值
也是双重循环,外层控制比较的轮次,内层控制比较的次数
2 、二分查找
首先确定中间值,然后和目标值比较,如果中间值大于目标值,则目标值在左边,更新中间值继续比较,直到中间值等于目标值,如果中间值小于目标值,则目标值在右边,如果中间值等于目标值,则找到目标值,结束。
3、 hashmap和hashtable的区别? hashmap的底层原理。
相同点:都是以键值对的方式存储数据
不同点:
hashmap是线程不安全的,允许键和值为null
hashtable是线程安全的,不允许键和值为null
底层原理:jdk1.7之前都是哈希表,即数组+链表,jdk1.8之后hashmap是数组+链表+红黑树
4、 arraylist和linklsit的区别?
相同点:都是线程不安全的
不同点:
arraylist的底层是数组,增删慢,查询快,效率高
linklist的底层是双链表,查询慢,增删快,效率高
5、 stringbuilder和stringbuffer的区别?
相同的点:底层都是字符数组,都是动态扩容
不同点:
stringbuilder是线程安全的,通过synchronized进行同步处理,效率低,多用于多线程
stringbuffer是线程不安全的,不进行同步处理,效率高
6 、String的特性
字符串一旦被创建就不能改变,
Java程序中的字符串都是该类的实例
因为String的对象不可变,所以它们可以共享(常量池)
字符串可以被看做一个字符数组
7 、实现多线程的几种方式?
三种方式
1 继承Thread类,重写run方法,创建线程对象,调用start方法
2 实现Runnable接口,重写run方法,创建线程对象,调用start方法
3 实现Callable接口,重写call方法,创建线程对象,调用start方法
8、 锁?单例?
- 锁
1、 锁是用于控制对共享资源的访问,防止多个线程同时访问造成数据不一致和竞争条件的问题
synchronized锁用于同步方法和代码块2、锁的种类
独占锁:可以进行读写操作
共享锁:只有读的权限
互斥锁:同一时间只允许一个访问者对其访问
读写锁:写只允许一个,读可以有多个
- 单例模式
2、单例模式分为两种:都是确保某个类只有一个实例,并提供全局访问
饿汉模式:实例在类的加载时被创建,线程安全
懒汉模式:调用获取对象的方法时被创建,线程不安全,多线程下使用会创建对个实例,可以考虑使用同步锁(synchronized)解决,但锁的粒度太大,会影响性能
9 、java中的修饰区别?public,static等
public可以在任何类中访问
protected:只能在本包和子类中访问,一般用于继承
default:只能在本包中访问
private:只能在本类中访问
static:用于创建类变量和类方法,它是类不是实例
final:用于声明常量,防止方法的重写和类的继承
10、 jvm中对象的生命周期?
1、加载
类的加载器类加载器通过类的全限定名找到并加载类的字节码文件。加载完成后,JVM会为每个类生成一个唯一的Class对象,并存储在方法区中
2、验证
虚拟机对加载的类进行各种验证,以确保类文件的字节流符合JVM规范,例如验证字节码的格式、语义是否正确,以及是否符合安全规范
3、准备
JVM为类的静态变量分配内存,并设置默认初始值。
4、解析
虚拟机将常量池内的符号引用替换为直接引用。解析阶段是可选的,某些虚拟机可以延迟到使用时才进行解析。
5、初始化
JVM执行类构造器 () 方法,初始化类的静态变量和静态代码块。这是类加载过程中的最后一步,
且是线程安全的,确保类的静态变量在多线程环境中正确初始化。
6、使用
对象的生命周期在程序中可以通过引用持续使用
7、卸载
类卸载是指从方法区中删除类的类型信息、常量、静态变量等数据,并且释放加载的类字节码。
类卸载通常发生在类加载器被回收时,而类加载器回收的条件是无法访问到该类的实例,也就是无法到达的对象。
11、 gc机制?
负责回收不再被程序引用的对象,从而释放内存空间,避免内存泄漏和碎片化。GC机制的主要目标是优化内存使用和提高程序的性能。
12、 时间复杂度,空间复杂度
时间复杂度是指算法执行所需的时间与输入规模之间的关系。常见的时间复杂度符号包括 O(1)、O(n)、O(n^2)、O(log n)、O(n log n) 等。
空间复杂度是指算法在执行过程中所需的存储空间与输入规模之间的关系。常见的空间复杂度符号包括 O(1)、O(n)、O(n^2) 等。
终极问题: Java熟悉吗?
java的基本使用没有问题,比如大数据中我们也经常使用大数据框架的一些javaAPI,比如hbase读写,经常会用到。
在实际的工作中,我们也经常使用scala或者其他语言进行开发。
你能胜任Java岗位吗?
我非常有信心能够胜任Java部门的工作。首先,我在大学期间学习了Java编程,并且在多个项目中使用过Java,积累了一定的开发经验。
其次,我对编程语言和技术栈的学习和掌握能力较强,能够迅速上手并适应新的开发环境和工作需求。
此外,大数据开发中也涉及到很多Java相关的技术,例如Hadoop的MapReduce编程、Spark的核心编程API等,
这些经验都让我对Java有了更深入的理解。无论是在大数据部门还是Java部门,我都愿意接受新的挑战,并为公司贡献我的力量。”
Linux&Shell
1. Linux常用高级命令
序号命令命令解释1top查看内存2df -h查看磁盘存储情况3iotop查看磁盘IO读写(yum install iotop安装)4iotop -o直接查看比较高的磁盘读写程序5netstat -tunlp | grep 端口号查看端口占用情况6uptime查看报告系统运行时长及平均负载7ps -aux查看进程
2. 用Shell写过哪些脚本
- 集群启动,分发脚本
- 数仓与mysql的导入导出
- 数仓层级内部的导入
- Spark任务启动脚本
- FlinkSQL任务执行脚本
MySQL
1.什么是关系型数据库?
关系型数据库(RDB,Relational Database)就是一种建立在关系模型的基础上的数据库。关系模型表明了数据库中所存储的数据之间的联系(一对一、一对多、多对多)。
关系型数据库中,数据都被存放在了各种表中(比如用户表),表中的每一行就存放着一条数据(比如一个用户的信息)
大部分关系型数据库都使用 SQL 来操作数据库中的数据。并且,大部分关系型数据库都支持事务的四大特性(ACID)。
2.什么是SQL?
SQL 是一种结构化查询语言(Structured Query Language),专门用来与数据库打交道,目的是提供一种从数据库中读写数据的简单有效的方法。
几乎所有的主流关系数据库都支持 SQL ,适用性非常强。并且,一些非关系型数据库也兼容 SQL 或者使用的是类似于 SQL 的查询语言。
SQL 可以帮助我们:
- 新建数据库、数据表、字段;
- 在数据库中增加,删除,修改,查询数据;
- 新建视图、函数、存储过程;
- 对数据库中的数据进行简单的数据分析;
- 搭配 Hive、Spark SQL、Flink SQL进行大数据分析;
- 搭配 SQL Flow 做机器学习;
SQL主要分为四类:
- DDL(Data Definition Language)数据定义语言用来定义数据库对象:数据库,表,列等。关键字:create,drop,alter等
- DML(Data Manipulation Language)数据操纵语言用来对数据库中表的数据进行增删改。关键字:insert.delete,update等
- DQL(Data Query Language)数据查询语言用来查询数据库中表的记录(数据)。关键字:select,where等
- DCL(Data Control Language)数据控制语言用来定义数据库的访问权限和安全级别,及创建用户。关键字:GRANT,REVOKE等
3.什么是事务?
设想一个场景,这个场景中需要插入多条相关联的数据到数据库,不幸的是,这个过程可能会遇到下面这些问题:
- 数据库中途突然因为某些原因挂掉了。
- 客户端突然因为网络原因连接不上数据库了。
- 并发访问数据库时,多个线程同时写入数据库,覆盖了彼此的更改。
- ……
上面的任何一个问题都可能会导致数据的不一致性。为了保证数据的一致性,系统必须能够处理这些问题。事务就是我们抽象出来简化这些问题的首选机制。事务的概念起源于数据库,目前,已经成为一个比较广泛的概念。
简而言之:事务是逻辑上的一组操作,要么都执行,要么都不执行。
事务最经典也经常被拿出来说例子就是转账了。假如小明要给小红转账 1000 元,这个转账会涉及到两个关键操作,这两个操作必须都成功或者都失败。
- 将小明的余额减少 1000 元
- 将小红的余额增加 1000 元。
事务会把这两个操作就可以看成逻辑上的一个整体,这个整体包含的操作要么都成功,要么都要失败。这样就不会出现小明余额减少而小红的余额却并没有增加的情况。
4.事务的四大特征ACID
- 原子性(Atomicity)> 原子性是指事务包含的所有操作要么全部成功,要么全部失败回滚。
- 一致性(Consistency)> 一致性是指事务必须使数据库从一个一致性状态变换到另一个一致性状态,也就是说一个事务执行之前和执行之后都必须处于一致性状态。> > 拿转账来说,假设用户A和用户B两者的钱加起来一共是5000,那么不管A和B之间如何转账,转几次账,事务结束后两个用户的钱相加起来应该还得是5000,这就是事务的一致性。
- 隔离性(Isolation)> 隔离性是当多个用户并发访问数据库时,比如操作同一张表时,数据库为每一个用户开启的事务,不能被其他事务的操作所干扰,多个并发事务之间要相互隔离。> > 即要达到这么一种效果:对于任意两个并发的事务T1和T2,在事务T1看来,T2要么在T1开始之前就已经结束,要么在T1结束之后才开始,这样每个事务都感觉不到有其他事务在并发地执行。
- 持久性(Durability)> 持久性是指一个事务一旦被提交了,那么对数据库中的数据的改变就是永久性的,即便是在数据库系统遇到故障的情况下也不会丢失提交事务的操作。> > 例如在使用JDBC操作数据库时,在提交事务方法后,提示用户事务操作完成,当程序执行完成直到看到提示后,就可以认定事务以及正确提交,即使这时候数据库出现了问题,也必须要将事务完全执行完成,否则就会造成看到提示事务处理完毕,但是数据库因为故障而没有执行事务的重大错误。
5.MySQL中的约束
对表中的数据进行限定,保证数据的正确性、有效性和完整性
- 非空约束:not null,值不能为null
- 唯一约束:unique,值不能重复
- 主键约束:primary key
- 外键约束:foreign key,让表与表产生关系,从而保证数据的正确性
6.数据库的设计
1.多表之间的关系
- 一对一(了解)如:人和身份证分析:一个人只有一个身份证,一个身份证只能对应一个人实现方式:一对一关系实现,可以在任意一方添加唯一外键指向另一方的主键。
- 一对多(多对一)如:部门和员工分析:一个部门有多个员工,一个员工只能对应一个部门实现方式:在多的一方建立外键,指向一另一方的主键。
- 多对多如:学生和课程分析:一个学生可以选择很多门课程,一个课程也可以被很多学生选择实现方式:多对多关系实现需要借助第三张中间表。中间表至少包含两个字段,这两个字段作为第三张表的外键,分别指向两张表的主键。
2.数据库设计的范式
设计数据库时,需要遵循的一些规范。要遵循后边的范式要求,必须先遵循前边的所有范式要求。
基本表及其字段之间的关系, 应尽量满足第三范式。
但是,满足第三范式的数据库设计,往往不是最好的设计。
为了提高数据库的运行效率,常常需要降低范式标准:适当增加冗余,达到以空间换时间的目的。
在实际开发中最为常见的设计范式有三个:
- 1.第一范式(确保每列保持原子性)> 第一范式是最基本的范式。如果数据库表中的所有字段值都是不可分解的原子值,就说明该数据库表满足了第一范式。> > 第一范式的合理遵循需要根据系统的实际需求来定。比如某些数据库系统中需要用到“地址”这个属性,本来直接将“地址”属性设计成一个数据库表的字段就行。但是如果系统经常会访问“地址”属性中的“城市”部分,那么就非要将“地址”这个属性重新拆分为省份、城市、详细地址等多个部分进行存储,这样在对地址中某一部分操作的时候将非常方便。这样设计才算满足了数据库的第一范式,如下表所示。上表所示的用户信息遵循了第一范式的要求,这样在对用户使用城市进行分类的时候就非常方便,也提高了数据库的性能。
- 2.第二范式(确保表中的每列都和主键相关)> 第二范式在第一范式的基础之上更进一层。> > 第二范式需要确保数据库表中的每一列都和主键相关,而不能只与主键的某一部分相关(主要针对联合主键而言)。> > 也就是说在一个数据库表中,一个表中只能保存一种数据,不可以把多种数据保存在同一张数据库表中。比如要设计一个订单信息表,因为订单中可能会有多种商品,所以要将订单编号和商品编号作为数据库表的联合主键,如下表所示。订单信息表这样就产生一个问题:这个表中是以订单编号和商品编号作为联合主键。这样在该表中商品名称、单位、商品价格等信息不与该表的主键相关,而仅仅是与商品编号相关。所以在这里违反了第二范式的设计原则。而如果把这个订单信息表进行拆分,把商品信息分离到另一个表中,把订单项目表也分离到另一个表中,就非常完美了。如下所示。这样设计,在很大程度上减小了数据库的冗余。如果要获取订单的商品信息,使用商品编号到商品信息表中查询即可。
- 3.第三范式(确保每列都和主键列直接相关,而不是间接相关)> 第三范式需要确保数据表中的每一列数据都和主键直接相关,而不能间接相关。比如在设计一个订单数据表的时候,可以将客户编号作为一个外键和订单表建立相应的关系。而不可以在订单表中添加关于客户其它信息(比如姓名、所属公司等)的字段。如下面这两个表所示的设计就是一个满足第三范式的数据库表。这样在查询订单信息的时候,就可以使用客户编号来引用客户信息表中的记录,也不必在订单信息表中多次输入客户信息的内容,减小了数据冗余。
ClickHouse
1. 简单介绍一下ClickHouse
ClickHouse的全称是Click Stream,Data WareHouse,简称ClickHouse。
ClickHouse 是近年来备受关注的开源列式数据库管理系统,主要用于数据分析 (OLAP)领域。通过向量化执行以及对CPU底层指令集(SIMD)的使用,它可以对海量数据进行并行处理,从而加快数据的处理速度。
ClickHouse从 OLAP 场景需求出发,定制开发了一套全新的高效列式存储引擎,并且实现了数据有序存储、主键索引、稀疏索引、数据 Sharding、数据 Partitioning、TTL、主备复制等丰富功能。
2. ClickHouse 有哪些应用场景
- 绝大多数请求都是用于读访问的;
- 数据需要以大批次(大于 1000 行)进行更新,而不是单行更新;
- 数据只是添加到数据库,没有必要修改;
- 读取数据时,会从数据库中提取出大量的行,但只用到一小部分列;
- 表很“宽”,即表中包含大量的列;
- 查询频率相对较低(通常每台服务器每秒查询数百次或更少);
- 对于简单查询,允许大约 50 毫秒的延迟;
- 列的值是比较小的数值和短字符串(例如,每个 URL只有 60 个字节);
- 在处理单个查询时需要高吞吐量(每台服务器每秒高达数十亿行);
- 不需要事务;
- 数据一致性要求较低;
- 每次查询中只会查询一个大表。除了一个大表,其余都是小表;
- 查询结果显著小于数据源。即数据有过滤或聚合。返回结果不超过单个服务器内存。
3. ClickHouse具有哪些特点
- 支持完备的SQL操作
- 列式存储与数据压缩
- 向量化执行引擎
- 关系型模型(与传统数据库类似)
- 丰富的表引擎
- 并行处理
- 在线查询
- 数据分片
4. ClickHouse有哪些劣势
- 不支持真正的
delete/update
操作,不支持transactions
(事物) - 不支持二级索引
- 不支持高并发查询,官方建议
100 QPS
> ClickHouse是并行计算,单个查询就可以跑满多个CPU核心,而不像MySQL单个查询单线程执行。 - 需要批量写入,频繁的单条写入会带来写入问题> ClickHouse存储结构有点类LSM,每次的insert基本都会生成一个文件目录,后台线程Merge目录文件,如果频繁写入,> 后台线程就会Merge不过来,产生>
> Too many parts>
> 异常。建议每秒不超过一次写入,并且是Batch写入。 - 有限的SQL语法支持,JOIN语法也比较另类,暂时不支持窗口函数
5. ClickHouse的架构
- ClickHouse 采用典型的分组式的分布式架构,包括:- Shard:集群内划分为多个分片或分组(Shard 0 … Shard N),通过 Shard的线性扩展能力,支持海量数据的分布式存储计算。- Node:每个 Shard 内包含一定数量的节点(Node,即进程),同一 Shard 内的节点互为副本,保障数据可靠。ClickHouse 中副本数可按需建设,且逻辑上不同 Shard 内的副本数可不同。- ZooKeeper Service:集群所有节点对等,节点间通过 ZooKeeper 服务进行分布式协调
6. ClickHouse为何如此之快
- 将硬件性能发挥到极致> 基于将硬件功效最大化的目的,ClickHouse会在内存中进行GROUP BY,并且使用HashTable装载数据。
- 算法方面精益求精> 在ClickHouse的底层实现中,经常会面对一些重复的场景,例如字符串子串查询、数组排序、使用HashTable等。对于不同的场景会用不同的算法。
- 勇于尝鲜,不行就换> 除了字符串之外,其余的场景也与它类似,ClickHouse会使用最合适、最快的算法。如果世面上出现了号称性能强大的新算法,ClickHouse团队会立即将其纳入并进行验证。如果效果不错,就保留使用;如果性能不尽人意,就将其抛弃。
- 特定场景,特殊优化> 针对同一个场景的不同状况,选择使用不同的实现方式,尽可能将性能最大化。关于这一点,其实在前面介绍字符串查询时,针对不同场景选择不同算法的思路就有体现了。类似的例子还有很多,例如去重计数uniqCombined函数,会根据数据量的不同选择不同的算法:当数据量较小的时候,会选择Array保存;当数据量中等的时候,会选择HashSet;而当数据量很大的时候,则使用HyperLogLog算法。> 对于数据结构比较清晰的场景,会通过代码生成技术实现循环展开,以减少循环次数。接着就是大家熟知的大杀器—向量化执行了。SIMD被广泛地应用于文本转换、数据过滤、数据解压和JSON转换等场景。相较于单纯地使用CPU,利用寄存器暴力优化也算是一种降维打击了。
- 持续测试,持续改进> 如果只是单纯地在上述细节上下功夫,还不足以构建出如此强大的ClickHouse,还需要拥有一个能够持续验证、持续改进的机制。由于Yandex的天然优势,ClickHouse经常会使用真实的数据进行测试,这一点很好地保证了测试场景的真实性。与此同时,ClickHouse也是我见过的发版速度最快的开源软件了,差不多每个月都能发布一个版本。没有一个可靠的持续集成环境,这一点是做不到的。正因为拥有这样的发版频率,ClickHouse才能够快速迭代、快速改进。
- 行存储和列存储> 分析场景中,我们一般会读大量的行而取少量的列,在列式存储结构下,我们只需要取对应的列数据就可以,不参与计算的列完全不会被扫描到,这会极大的降低磁盘 IO 的消耗。
- 数据压缩> 基于列式存储的结构,同一列中的数据属于同一类型,压缩效果会更加显著。列存储往有着高达十倍甚至更高的压缩比,节省了大量的存储空间,降低了存储成本。
- 向量化执行引擎> SIMD(Single Instruction Multiple Data)即单条指令操作多条数据,它是通过数据并行以提高性能的一种方式,可以简单理解为在寄存器层面对程序中的数据做并行处理,Clickhouse 在能够提升计算效率的地方大量使用了 SIMD,通过使用 SIMD,基本上能带来几倍的性能提升,像阿里云的 PolarDB-X 也引入了向量化执行引擎,为表达式计算带来了几十倍的性能提升。
- 多线程与分布式> 分布式领域存在一条定律,计算移动比数据移动更加划算,这也是其核心所在,将数据的计算直接发放到数据所在的服务器,多机并行处理,再把最终的结果汇集在一起;另外 Clickhouse 也通过线程级别并行的方式为效率进一步提速,极致去利用服务器的资源。
- 多样化的表引擎> ClickHouse 提供了大量的数据引擎,分为数据库引擎、表引擎> MergeTree 作为家族系列最基础的表引擎,主要有以下特点:> > > - 存储的数据按照主键排序,允许创建稀疏索引,从而加快数据查询速度;> - 支持分区,可以通过 PRIMARY KEY语句指定分区字段> - 支持数据副本> - 支持数据采样
7、表引擎
1、mergetree:主要表引擎,支持索引和数据分区,适用于大数据计算
2、log:Log表引擎用于存储日志和其他机器数据。不支持索引和数据分区。适用于需要快速访问和计算的小数据量场景。
3、Memory:Memory表引擎将所有数据存储在RAM中,适用于临时表和小数据量。适用于需要快速访问和计算的小数据量场景。
4、Distributed:Distributed表引擎用于在多个节点间分布查询和数据。它自动分发查询和数据到所有节点。适用于需要在多个节点间进行数据分析的分布式场景。
redis
概念:一种开源的、内存中的数据结构存储系统
数据类型:String:字符串
list:有序的字符串
hash:键值对
set:无序唯一的字符串集合
zset:有序唯一的字符串集合
两种持久化机制:快照、AOF 只追加日志文件
穿透:大量的用户访问缓存和数据库都不存在的数据
解决方法:缓存空值;布隆过滤器:在缓存层增加布隆过滤器,过滤掉不存在的请求。
雪崩:缓存系统中大量缓存数据在同一时间过期,导致大量请求直接访问数据库,造成数据库压力骤增,甚至导致数据库崩溃。
解决方法:设置缓存失效时间随机化;提前加载热点数据到缓存中
击穿:某些热点数据在缓存失效的一瞬间,有大量并发请求同时访问这些热点数据,由于缓存未命中,所有请求都会直接访问数据库,导致数据库压力过大。
解决方法:缓存永不过期,定时更新缓存;互斥锁,保证只有一个请求去加载数据并更新缓存,其他请求等待。
Hadoop
1. Hadoop常用端口号
作用hadoop2.xHadoop3.x访问HDFS端口500709870访问MR执行情况端口80888088历史服务器1988819888客户端访问集群端口90008020
2. HDFS读流程和写流程
- 读流程
- Client 向 NameNode 发起 RPC 请求,来确定请求文件 block 所在的位置;
- NameNode 会视情况返回文件的部分或者全部 block 列表,对于每个 block,NameNode 都会返回含有该 block 副本的 DataNode 地址;
- 这些返回的 DataNode 地址,会按照集群拓扑结构得出 DataNode 与客户端的距离,然后进行排序,排序两个规则:网络拓扑结构中距离Client近的排靠前;心跳机制中超时汇报的 DataNode 状态为 STALE,这样的排靠后;
- Client 选取排序靠前的 DataNode 来读取 block,如果客户端本身就是 DataNode,那么将从本地直接获取数据;底层上本质是建立 Socket Stream(FSDataInputStream),重复的调用父类 DataInputStream 的 read 方法,直到这个块上的数据读取完毕;
- 当读完列表的 block 后,若文件读取还没有结束,客户端会继续向 NameNode 获取下一批的 block 列表;
- 读取完一个 block 都会进行 checksum 验证,如果读取 DataNode 时出现错误,客户端会通知 NameNode,然后再从下一个拥有该 block 副本的 DataNode 继续读。
- read 方法是并行的读取 block 信息,不是一块一块的读取;NameNode 只是返回 Client 请求包含块的 DataNode 地址,并不是返回请求块的数据;
- 最终读取来所有的 block 会合并成一个完整的最终文件。
- 写流程
- 总结:
写:
1、client通过rpc通信框架向namenode发送写的请求,namenode会检查用户是否有上传文件的权限,磁盘空间是否足够以及文件的路径是否存在。
2、 如果满足条件,namenode会针对这个文件创建一个Entry对象,并返回成功的状态给DFS,DFS接收到成功的状态后,会创建FSDataOutputStream对象给客户端使用。
3、然后向namenode获取文件存储在HDFS所需要的所有的datanode的节点,namenode会针对这个文件产生的block块分配datanode节点,并且datanode之间会建立pipline通道为了将来传输数据的时候更快。
4、namenode会将该文件block块分配给datanode的信息返回给客户端,客户端根据机架感知与最近的datanode建立联系,客户端将一个block默认分为2048个packet进行发送,每次发送一个packet,直到一个block块中的所有的packet发送完毕,每发送完一个block块会返回一个确认值给客户端,当最后一个block块发送完毕时,会释放FSDataOutputStream对象,关闭datanode之间的pipline通道,文件上传完毕。
3. MapReduce流程
计算流程是: map阶段 —> shuffle阶段 —> reduce阶段
- map阶段:在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务,默认一个block对应一个切片。一个切片对应一个MapTask, 切片完成后开始执行自定义map代码逻辑
- shuffle阶段:
- Mapper任务结束后产生<K2,V2>的输出,这些输出先存放在缓存中,每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spil l.percent),一个后台线程就把内容写到(spill)Linux本地磁盘中的指定目录(mapred.local.dir)下的新建的一个溢出写文件。
- 写磁盘前,要进行partition、sort和combine等操作。通过分区,将不同类型的数据分开处理,之后对不同分区的数据进行排序,如果有Combiner,还要对排序后的数据进行combine。等最后记录写完,将全部溢出文件合并为一个分区且排序的文件。
- Reduce阶段:当MapTask执行完成之后开始执行ReduceTask - 执行ReduceTask之前会先从Map端拉取- 从map端复制来的数据首先写到reduce端的缓存中,同样缓存占用到达一定阈值后会将数据写到磁盘中,同样会进行partition、combine、排序等过程。如果形成了多个磁盘文件还会进行合并,最后一次合并的结果作为reduce的输入而不是写入到磁盘中。- 最后将合并后的结果作为输入传入Reduce任务中。- 最后就是Reduce过程了,在这个过程中产生了最终的输出结果,并将其写到HDFS上。
- mapreduce的总结
将一个文件上传到hdfs中,默认会按照每128M切分一个block块,在map任务之前会对数据进行逻辑切分,大小也是128M,根据切片产生map任务,
map任务以键值对的方式读取数据,数据会进入到环形缓冲区(基于内存设计处来的数据结构)中,根据reduce的数量和数据本身键值对的值进行分区编号,编号的作用会决定数据会被那个reduce所拉取,编号之后会进快速排序,当数据写入到环形缓冲区达到80%时,数据会以小文件的方式溢写到磁盘中,然后对小文件进行合并,合并的算法是归并排序,
产生map任务的结果文件,reduce端会根据编号将相同编号的map任务的结果文件拉取到同一个reduce当中,将分区编号去掉,进行归并排序,并执行reduce逻辑,产生reduce端结果文件
3. Shuffle优化
- 配置方面: 1. 增大map阶段的缓冲区大小。2. map阶段输出结果使压缩;压缩算法使用lzo。3. 增加reduce阶段copy数据线程数。4. 增加副本数,从而提高计算时的数据本地化。
- 程序方面: 1. 在不影响计算结果的情况下建议使用combiner。2. 输出结果的序列化类型尽量选择占用字节少的类型。
- 架构方面: 1. 将http改为udp,因为http还要进行3次握手操作。
4. Yarn工作机制
1、client向yarn提交job作业,首先找ResourceManager分配资源,
2、ResourceManager开启一个Container,在Contatiner中运行一个Applicationmanager
3、Applicationmanager找一台nodemanager启动Applicationmaster,计算任务所需的资源
4、Applicationmaster向Applicationmanager申请运行任务所需的资源
5、Resourcescheduler将资源封装发给Applicationmaster
6、Applicationmaster将获取到的资源分配给各个nodemanager
7、各个nodemanger得到任务和资源开始执行map task
8、maptask执行结束后,开始执行reduce task
9、map task和reduce task将执行的结果反馈给Applicationmaster
10、Applicationmaster将任务执行的结果反馈给Applicationmanager
5. Yarn调度器
1)Hadoop调度器重要分为三类:
- FIFO 、Capacity Scheduler(容量调度器)和Fair Sceduler(公平调度器)。
- Apache默认的资源调度器是容量调度器;
- CDH默认的资源调度器是公平调度器。
2)区别:
- FIFO调度器:支持单队列 、先进先出 生产环境不会用。
- 容量调度器:支持多队列,保证先进入的任务优先执行。
- 公平调度器:支持多队列,保证每个任务公平享有队列资源。
3)在生产环境下怎么选择?
- 大厂:如果对并发度要求比较高,选择公平,要求服务器性能必须OK;
- 中小公司,集群服务器资源不太充裕选择容量。
4)在生产环境怎么创建队列?
- 调度器默认就1个default队列,不能满足生产要求。
- 按照框架:hive /spark/ flink 每个框架的任务放入指定的队列(企业用的不是特别多)
- 按照业务模块:登录注册、购物车、下单、业务部门1、业务部门2
5)创建多队列的好处?
- 因为担心员工不小心,写递归死循环代码,把所有资源全部耗尽。
- 实现任务的降级使用,特殊时期保证重要的任务队列资源充足。
6. Hadoop宕机
- 如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)
- 如果写入文件过快造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。例如,可以调整Flume每批次拉取数据量的大小参数batchsize
7. Hadoop解决数据倾斜方法
1)提前在map进行combine,减少传输的数据量
- 在Mapper加上combiner相当于提前进行reduce,即把一个Mapper中的相同key进行了聚合,减少shuffle过程中传输的数据量,以及Reducer端的计算量。
- 如果导致数据倾斜的key大量分布在不同的mapper的时候,这种方法就不是很有效了。
2)导致数据倾斜的key 大量分布在不同的mapper
(1)局部聚合加全局聚合。
- 第一次在map阶段对那些导致了数据倾斜的key 加上1到n的随机前缀,这样本来相同的key 也会被分到多个Reducer中进行局部聚合,数量就会大大降低。
- 第二次mapreduce,去掉key的随机前缀,进行全局聚合。
- 思想:二次mr,第一次将key随机散列到不同reducer进行处理达到负载均衡目的。第二次再根据去掉key的随机前缀,按原key进行reduce处理。
- 这个方法进行两次mapreduce,性能稍差。
(2)增加Reducer,提升并行度
- JobConf.setNumReduceTasks(int)
(3)实现自定义分区
- 根据数据分布情况,自定义散列函数,将key均匀分配到不同Reducer
8. 集群资源分配参数(项目中遇到的问题)
- 集群有30台机器,跑mr任务的时候发现5个map任务全都分配到了同一台机器上,这个可能是由于什么原因导致的吗?
- 解决方案:yarn.scheduler.fair.assignmultiple 这个参数 默认是开的,需要关掉
Hive
1. Hive的架构
- 概念
hive是基于Hadoop的一个数据仓库工具,将结构化数据文件映射成数据库表,并且提供SQL查询功能,可以将SQL语句转化成mapreduce执行
1、用户接口:client,jdbc/odbc,web/ui
2、元数据:表名、表所属的数据库(默认是 default)、表的拥有者、列/分区字段、 表的类型(是否是外部表)、表的数据所在目录等;默认存储在自带的derby数据库中,推荐存储在MySQL数据库
3、hadoop:使用hdfs进行存储,使用mapreduce进行计算
4、driver:
解析器:将 SQL 字符串转换成抽象语法树 AST,这一步一般都用第 三方工具库完成,比如 antlr;对 AST 进行语法分析,比如表是否存在、字段是否存在、SQL 语义是否有误。
编译器:将 AST 编译生成逻辑执行计划
优化器:对逻辑执行计划进行优化
执行器:把逻辑执行计划转换成可以运行的物理计划
2. Hive和数据库比较
Hive和数据库除了拥有类似的查询语言,再无类似之处
- 数据存储位置 - Hive存储在HDFS上,数据库将数据保存在块设备或者本地文件系统中
- 数据更新 - Hive默认不支持update,delete操作,需要开始事务配置。一般场景不建议使用跟新。mysql支持更新删除操作。如果在hive中需要update。可以insert into 新表 select 字段1,字段2,if(更新条件,返回,否则返回)from 旧表
- 执行延迟 - Hive执行延迟较高,数据库的执行延迟较低。这个是有条件的,在数据规模较小时,在数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。
- 数据规模 - Hive支持很大规模的数据计算,数据库可以支持的数据规模较小。
- SQL语法区别: 1 hive不支持where后面跟子查询。 2 hive支持建表分区操作,mysql不支持。 3 group by。hive中sql,select 中的维度字段,必须出现在group by 后面。mysql语法可以不用。 省份,城市,确诊病例 select pro,city,sum(quezhen) s from t group by pro,city 4 sort by,distribute by,group by 5 mapjoin ,mysql没有。
3. 内部表和外部表
内部表和外部表的区别在于元数据和原始数据
- 删除数据时: - 内部表:元数据和原始数据全部删除- 外部表:只删除元数据
- 在公司生产环境下,什么时候创建内部表,什么时候创建外部表? - 在公司中绝大多数场景都是创建外部表- 自己使用的临时表,才会创建内部表
4. 4个By的区别
在生产环境中order by用的较少,容易导致OOM
在生产环境中sort by+distribute by用的多
- order by:全局排序,只有一个reduce
- sort by:分区内有序
- distribute by:类似于MR中partition,进行分区,结合sort by使用
- cluster by:当distribute by和sort by字段相同的时候,可以使用cluster by方式。cluster by除了具有distribute by的功能外还兼具了sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC。
5. 系统函数
- date_add、date_sub函数(加减日期)
- next_day函数(周指标相关)
- date_format函数(根据格式整理日期)
- last_day函数(求当月最后一天日期)
- collect_set函数
- get_json_object解析json函数
- NVL(表达式1,表达式2) - 如果表达式1为空值,NVL返回值为表达式2的值,否则返回表达式1的值。
6. 自定义UDF、UDTF函数
- 在项目中是否自定义过UDF、UDTF函数,以及用他们处理了什么问题,及自定义步骤? > 自定义函数:> UDF:一对一,继承UDF类,重写evaluate()> UDAF:多对一,sum、count、max、min、avg,实现UDAF接口, init、iterate、terminatePartial、merge、terminate这几个函数> UDTF:一对多,将一行数据拆分成多行,继承GenericUDTF类,重写initlizer()、process()、close()。
- 为什么要定义UDF、UDTF? - 因为自定义函数,可以自己埋点Log打印日志,出错或者数据异常,方便调试- 有些需求SQL无法直接处理,可以使用代码辅助解决
7. 窗口函数
- Rank- Rank()排序相同时会重复,总数不会变 - rank() over(partiition by regionX order by nameX desc) as tn- 1 93;2 90;2 90;4 89 排名不是连续的,相同的分数是同名次,前 100 名只有 100 个- dense_rank()排序相同时会重复,总数会减少 - dense_rank() over()- 1 93;2 90;2 90;3 89 排名是连续的,相同的分数是同名次,前 100 名可能多于 100 个- row_number()会根据顺序计算 - row_number() over(partition by regionX order by nameX desc) as tn- 1 93;2 90;3 90 排名是连续的,相同的分数会有排名先后,前 100 名只有 100 个
- over():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化- current row:当前行- n preceding:往前n行数据- n following:往后n行数据- UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING表示到后面的终点- lag(col,n):往前第n行数据- lead(col,n):往后第n行数据- NTILE(n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从1开始,对于每一行,NTILE返回此行所属的组的编号。注意:n必须为int类型。
- 手写TopN
# 例如 统计班级前3名select tt1.id ,tt1.name ,tt1.clazz ,tt1.sum_score ,tt1.rnfrom(select t1.id ,t1.name ,t1.clazz ,t2.sum_score ,row_number()over(partitionby clazz orderby t2.sum_score desc)as rn from students t1 leftjoin(select id ,sum(score)as sum_score from score groupby id ) t2 on t1.id = t2.id) tt1 where tt1.rn<=3;
8. Hive优化
- MapJoin- 如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即在reduce阶段完成join。容易发生数据倾斜,可以用MapJoin把小表全部加载到内存在map端进行join,避免reduce处理。
- 行列过滤- 列处理:在select中,只拿需要的列,如果有,尽量使用分区过滤,少用select *- 行处理:在分区裁剪中,当使用外关联时,如果将副表的过滤条件写在where后面,那么就会先全表关联,之后在过滤。
- 列式存储
- 采用分区技术 -避免全表扫描,提高查询效率。一般数据量比较大的表,要建分区,一般使用日期作为分区字段。
- 合理设置Map数- mapred.min.split.size:指的是数据的最小分割单元大小- mapred.max.split.size:指的是数据的最大分割单元大小- 通过调整max可以起到调整map数的作用,减小max可以增大map数,增大max可以减少map数- 需要提醒的是,直接调整mapred.map.tasks这个参数是没有效果的。
- 合理设置Reduce数> reduce个数并不是越多越好- 过的的启动和初始化reduce也会消耗时间和资源- 另外有多少个reduce就会输出多少个文件,如果生成了很多个小文件,那么如果这些小文件作为下一次任务的输入,则也会出现小文件过多的问题- 在设置reduce个数的时候也需要考虑这两个原则;处理大数据量利用合适的reduce数,使单个的reduce任务处理数据量大小要合适
- 小文件如何产生的?- 动态分区插入数据,产生大量的小文件,从而导致map数量剧增- reduce数量越多,小文件也越多(reduce的个数和输出文件是对应的)- 数据源本身就包含大量的小文件。
- 小文件解决方案1. 在Map执行前合并小文件,减少Map数 - CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。2. merge - SET hive.merge.mapfiles = true; – 默认true,在map-only任务结束时合并小文件- SET hive.merge.mapredfiles = true; – 默认false,在map-reduce任务结束时合并小文件- SET hive.merge.size.per.task = 268435456; – 默认256M- SET hive.merge.smallfiles.avgsize = 16777216; – 当输出文件的平均大小小于16m该值时,启动一个独立的map-reduce任务进行文件merge3. 开启JVM重用 - set mapreduce.job.jvm.numtasks=10
- 开启map端combiner(在不影响最终业务逻辑)- set hive.map.aggr=true;
- 压缩(选择快的)> 设置map端输出、中间结果压缩(不完全是解决数据倾斜的问题,但是减少了IO读写和网络传输,能提高很多效率)- set hive.exec.compress.intermediate=true --启用中间数据压缩- set mapreduce.map.output.compress=true --启用最终数据压缩- set mapreduce.map.outout.compress.codec=…; --设置压缩方式
- 采用tez引擎或者spark引擎
9. Hive解决数据倾斜方法
- 数据倾斜的本质原因 key值分布不均匀,key重复的比较多。一般在group by,join,distinct容易发生倾斜。
- 数据倾斜长什么样
- 怎么产生的数据倾斜1. 不同数据类型关联产生数据倾斜- 情形:比如用户表中user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join操作时。- 后果:处理此特殊值的reduce耗时;只有一个reduce任务。默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配到一个Reducer中。- 解决方案:把数据类型转换成字符串类型32+-
select * from users aleft outer join logs bon a.usr_id = cast(b.user_id as string)
2. 控制空值分布> 在生产环境经常会用大量空值vcxz数据进入到一个reduce中去,导致数据倾斜- 解决方案:自定义分区,将为空的key转变为字符串加随机数或纯随机数,将因空值而造成倾斜的数据分不到多个Reducer。- 注意:对于异常值如果不需要的话,最好是提前在where条件里过滤掉,这样可以使计算量大大减少 - 解决数据倾斜的方法1. group by> group by优于distinct group- 采用sum() group by 的方式来替换count(distinct) 完成计算2. mapjoin3. 开启数据倾斜时负载均衡> 思想:先随机分发并处理,再按照key group by来分发处理> > 操作:当选项设置为true,生成的查询计划会有两个MRJob> > 第一个MRJob中,map的输出结果集合会随机分布到reduce中,每个reduce在做部分聚合操作,并输出结果,这样的处理结果是相同的group by key有可能被分发到不同的reduce中,从而达到负载均衡的目的> > 第二个MRJob在根据预处理的数据结果按照group by key分不到reduce中(这个可以保证相同的原始group by key被分布到同一个reduce中),最后完成最终的聚合操作。- set hive.groupby.skewindata=true;4. 总结> 产生的原因:key值分布不均匀,建表考虑不周全,业务数据本身的特性> 解决方案:> 1、小表join大表,使用mapjoin小表,或者修改hive的配置自动开启mapjoin,set hive.auto.convert.join=true;> 2、大表join大表,a、对空key进行过滤,b、对空key进行转换,对空key的值进行随机赋值> 3、在join的时候,尽可能选择相同的连接键> 4、map-side聚合,set hive.map.aggr=true;
10. Hive里边字段的分隔符用的是什么?为什么用\t?有遇到过字段里边有\t的情况吗,怎么处理的?
- hive 默认的字段分隔符为ascii码的控制符\001(^A),建表的时候用fields terminated by ‘\001’。注意:如果采用\t或者\001等为分隔符,需要要求前端埋点和javaEE后台传递过来的数据必须不能出现该分隔符,通过代码规范约束。一旦传输过来的数据含有分隔符,需要在前一级数据中转义或者替换(ETL)。
11. Tez引擎优点
- Tez可以将多个有依赖的作业转换为一个作业,这样只需写一次HDFS,且中间节点较少,从而大大提升作业的计算性能
- MR、Tez、Spark计算引擎的区别: - MR引擎:多job串联,基于磁盘,落盘的地方比较多,虽然慢,但一定能跑出结果,一般处理周、月、年指标- Spark引擎:虽然在shuffle过程中也落盘,但是并不是所有的算子都需要shuffle,尤其是多算子过程,中间过程不落盘,而且有DAG有向无环图,兼顾了可靠性和效率。一般处理天指标。- Tez引擎:完全基于内存,如果数据量特别大,请慎重,容易OOM。一般用于快速出结果,数据量比较小的场景。
12. MySQL元数据备份
- MySQL之元数据备份(项目中遇到的问题) - 元数据备份 - 如数据损坏,可能整个集群无法运行,至少要保证每日零点之后备份到其它服务器两个副本- MySQL utf8超过字节数问题 - MySQL的utf8编码最多存储3个字节,当数据中存在表情号、特色符号时会占用超过3个字节数的字节,那么会出现错误 Incorrect string value: ‘\xF0\x9F\x91\x91\xE5\xB0…’- 解决办法:将utf8修改为utf8mb4 - 首先修改库的基字符集和数据库排序规则- 再使用 SHOW VARIABLES LIKE ‘%char%’; 命令查看参数- 确保character_set_client、character_set_server、character_set_connection、character_set_database这几个参数的值为utf8mb4如果不是使用set命令进行修改,如set character_set_server = utf8mb4;
13. Union与Union all区别
- union会将联合的结果集去重,效果较union all差
- union all 不会对结果集去重,所以效率高
14. 数据清洗怎么做的?怎么用spark做数据清洗
数据清洗的目的是为了保证数据质量,包括数据的完整性、唯一性、一致性、合法性和权威性。数据清洗的结果是对各种脏数据进行对应的处理方式,从而得到标准的、干净的、连续的数据,提供给数据统计和数据挖掘使用。
- 解决数据的完整性问题 - 通过其他信息补全- 通过前后数据补全- 如果实在无法补全,虽然可惜,但是还是要剔除掉进行统计。如果后续其他分析还需要这没必要删除。
- 解决数据唯一性问题 - 根据主键进行去除,去除重复数据- 制定一系列规则,保证根据某种规则下只保存一条数据。
- 解决数据权威性的问题 - 选择最权威的数据作为统计和挖掘
- 解决合法性的问题 - 设定判定规则,通过特定的规则来判断字段或者值来确定数据是否需要被清洗
15. Hive分区分桶的区别
Hive分区:是指按照数据表的某列或者某些列分为多个区,区从形式上可以理解为文件夹,比如我们要收集某个大型网站的日志数据,一个网站每天的日志数据存在同一张表上,由于每天会生成大量的日志,导致数据表的内容巨大,在查询时进行全表扫描耗费的资源非常多。那其实这个情况下,我们可以按照日期对数据表进行分区,不同日期的数据存放在不同的分区,在查询时只要指定分区字段的值就可以直接从该分区查找。
Hive分桶:分桶是相对分区进行更细粒度的划分。分桶将整个数据内容安装某列属性值得hash值进行区分,如要安装name属性分为3个桶,就是对name属性值的hash值对3取摸,按照取模结果对数据分桶。如取模结果为0的数据记录存放到一个文件,取模为1的数据存放到一个文件,取模为2的数据存放到一个文件。
- 分桶对数据的处理比分区更加的细化,分区针对的是数据的存储路径,分桶针对的是数据文件
- 分桶是按照hash值进行切分的,相对来说比较公平,分区是按照列的值划分,容易造成数据倾斜
- 分桶、分区不干扰,分区表可以划分为分桶表
16. Hive的执行顺序
- from --> join on --> where --> group by --> 聚合函数 -->having -->select -->distinct–> order by
17. Hive和HBase的区别和联系
- 联系 - HBase和Hive都是架构在hadoop之上的,都是用HDFS作为底层存储
- 区别 - Hive是建立在Hadoop之上为了减少MapReduce jobs编写工作的批处理系统,HBase是为了支持弥补Hadoop对实时操作的缺陷的项目 。总的来说,hive是适用于离线数据的批处理,hbase是适用于实时数据的处理。- Hive本身不存储和计算数据,它完全依赖于HDFS存储数据和MapReduce处理数据,Hive中的表纯逻辑。- hbase是物理表,不是逻辑表,提供一个超大的内存hash表,搜索引擎通过它来存储索引,方便查询操作。- 由于HDFS的不可随机读写,hive是不支持随机写操作,而hbase支持随机写入操作。- HBase只支持简单的键查询,不支持复杂的条件查询
18. Spark on Hive和Hive on Spark的区别
- Spark on Hive> 顾名思义即将Spark构建在Hive之上,Spark需要用到Hive- 就是通过Spark SQL加载Hive的配置文件,获取到Hive的metastore信息,进而获得metastore,但底层运行的还是Spark RDD- Spark SQL获取到metastore之后就可以取访问Hive表的数据- 接下来就可以通过Spark SQL来操作Hive表中存储的数据
- Hive on Spark> 顾名思义即将Hive构建在Spark之上,Hive需要用到Spark- Hive的底层默认计算引擎从MapReduce改为Spark
19. Hive数据格式和压缩格式
行存储的特点:查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中的一个值,其余的值都在相邻的地方,因此此时行存储查询的速度快
列存储的特点:因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量,每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法
- 数据格式 > textfile:默认格式,不支持切片,可以使用load加载> sequencefile:二进制格式,其具有使用方便、可分割、可压缩的特点> rcfile:行列式文件,首先将表分为几个行组,对每个行组内的数据进行按列存储,每一列的数据都是分开存储> orcfile:对rcfile的优化,是列式存储,文件是可切分(Split)的,支持各种复杂的数据类型(list,map)> parquet:列存储
- 存储和压缩 > Gzip:不支持切片> Bzip2:Hadoop自带支持切片> LZO:支持切片> snappy:Hadoop本身不支持,需要安装,不支持切片
20. 连续登陆问题
在电商、物流和银行可能经常会遇到这样的需求:统计用户连续交易的总额、连续登陆天数、连续登陆开始和结束时间、间隔天数等
- 建表语句
createtable deal_tb(
id string
,datestr string
,amount string
)row format delimited fieldsterminatedby',';
- 计算逻辑
注意每个用户每天可能会有多条交易记录
- 先按用户和日期分组求和,使每个用户每天只有一条数据
select id
,datestr
,sum(amount)as sum_amount
from deal_tb
groupby id,datestr
- 根据用户ID分组按日期排序,将日期和分组序号相减得到连续登陆的开始日期,如果开始日期相同说明连续登陆
select tt1.id
,tt1.datestr
,tt1.sum_amount
,date_sub(tt1.datestr,rn)as grp
from(select t1.id
,t1.datestr
,t1.sum_amount
,row_number()over(partitionby id orderby datestr)as rn
from(select id
,datestr
,sum(amount)as sum_amount
from deal_tb
groupby id,datestr
) t1
) tt1
- 统计用户连续交易的总额、连续登陆天数、连续登陆开始和结束时间、间隔天数
select ttt1.id
,ttt1.grp
,round(sum(ttt1.sum_amount),2)as sc_sum_amount
,count(1)as sc_days
,min(ttt1.datestr)as sc_start_date
,max(ttt1.datestr)as sc_end_date
,datediff(ttt1.grp,lag(ttt1.grp,1)over(partitionby ttt1.id orderby ttt1.grp))as iv_days
from(select tt1.id
,tt1.datestr
,tt1.sum_amount
,date_sub(tt1.datestr,rn)as grp
from(select t1.id
,t1.datestr
,t1.sum_amount
,row_number()over(partitionby id orderby datestr)as rn
from(select id
,datestr
,sum(amount)as sum_amount
from deal_tb
groupby id,datestr
) t1
) tt1
) ttt1
groupby ttt1.id,ttt1.grp;
- 精简版
select t1.id
,t1.grp
,round(sum(t1.sum_amount),3)as total_amount -- 连续交易总额,count(1)as total_days -- 连续登录天数,min(datestr)as start_date -- 连续登录开始的时间,max(datestr)as end_date -- 连续登录结束的时间,datediff(t1.grp,lag(t1.grp,1)over(partitionby t1.id orderby t1.grp))as interval_days -- 间隔天数from(select id
,datestr
,round(sum(amount),3)as sum_amount
,date_sub(datestr,row_number()over(partitionby id orderby datestr))as grp
from deal_tb
groupby id,datestr
) t1 groupby t1.id,t1.grp;
- 结果
1 2019-02-07 13600.23 3 2019-02-08 2019-02-10 NULL
1 2019-02-08 2991.650 5 2019-02-12 2019-02-16 1
1 2019-02-09 1510.8 2 2019-02-18 2019-02-19 1
1 2019-02-10 537.71 1 2019-02-21 2019-02-21 1
2 2019-02-07 13600.23 3 2019-02-08 2019-02-10 NULL
2 2019-02-08 3026.649 4 2019-02-12 2019-02-15 1
2 2019-02-10 1510.8 2 2019-02-18 2019-02-19 2
2 2019-02-11 537.71 1 2019-02-21 2019-02-21 1
3 2019-02-07 13600.23 3 2019-02-08 2019-02-10 NULL
3 2019-02-08 2730.04 5 2019-02-12 2019-02-16 1
3 2019-02-09 1510.8 2 2019-02-18 2019-02-19 1
3 2019-02-10 537.71 1 2019-02-21 2019-02-21 1
数据仓库
1. 分层设计
数仓分层阿里整体分为了5层,分别是ODS,DWD,DIM,DWS,ADS
- ODS(Operational Data Store)1. 面向主题的”数据运营层,也叫ODS层,是最接近数据源中数据的一层,数据源中的数据,经过抽取、洗净、传输,也就说传说中的 ETL 之后,装入本层。本层的数据,总体上大多是按照源头业务系统的分类方式而分类的。2. 一般来讲,为了考虑后续可能需要追溯数据问题,因此对于这一层就不建议做过多的数据清洗工作,原封不动地接入原始数据即可,至于数据的去噪、去重、异常值处理等过程可以放在后面的DWD层来做。
- DWD(Data Warehouse Detail)1. 该层一般保持和ODS层一样的数据粒度,并且提供一定的数据质量保证。同时,为了提高数据明细层 的易用性,该层会采用一些维度退化手法,将维度退化至事实表中,减少事实表和维表的关联。2. 另外,在该层也会做一部分的数据聚合,将相同主题的数据汇集到一张表中,提高数据的可用性。
- DWS(Data Warehouse Service)1. 又称数据集市或宽表。按照业务划分,如流量、订单、用户等,生成字段比较多的宽表,用于提供后 续的业务查询,OLAP分析,数据分发等。2. 一般来讲,该层的数据表会相对比较少,一张表会涵盖比较多的业务内容,由于其字段较多,因此一 般也会称该层的表为宽表。在实际计算中,如果直接从DWD或者ODS计算出宽表的统计指标,会存在计 算量太大并且维度太少的问题,因此一般的做法是,在DWM层先计算出多个小的中间表,然后再拼接成 一张DWS的宽表。由于宽和窄的界限不易界定,也可以去掉DWM这一层,只留DWS层,将所有的数据在 放在DWS亦可。
- DIM(Dimension)1. 维表层主要包含两部分数据: 1. 高基数维度数据:一般是用户维度表、商品维度表类似的维度表。数据量可能是千万级或者上亿级别。2. 低基数维度数据:一般是配置表,比如枚举值对应的中文含义,或者日期维表。数据量可能是个位数或者几千几万。
- ADS(Application Data Service)- 数据应用层ADS(Application Data Service):存放数据产品个性化的统计指标数据。根据DWD和DWS层加工生成。
2. 事实表有那些类型
- 事实表有三种类型:事务事实表、周期事实表和累积快照事实表
- 事务事实表也称原子事实表,描述业务过程,跟踪控件或时间上某点的度量事件,保存的是最原子的数据
- 周期快照事实表以一个周期为时间间隔,来记录事实,一般周期可以是每天、每周、每月、每年等
- 累积快照事实表用来描述过程开始和结束之间的关键步骤事件,覆盖过程的整个生命周期,通常具有多个日期字段来记录关键时间点;当过程随着生命周期不断变化时,记录也会随着过程的变化而被修改
- 三种事实表对比
3. 数仓建模的三大范式
- 第一范式:属性不可在分割
- 第二范式:所有非主属性都完全依赖于主关键字
- 第三范式:非主关键字不能依赖于其他非主关键字。即非主关键字之间不能有函数(传递)依赖关系
4. 数仓模型规范
- 禁止逆向调用
- 避免同层调用
- 优先使用公共层
- 避免跨层调用
5. 数仓中数据模型
- 星型模型 - 在数据仓库建模中,星型模型是维度建模中的一种选择方式。星型模型是以一个事实表和一组维度表组合而成,并且以事实表为中心,所有的维度表直接与事实表相连。
- 雪花型模型 - 雪花模型也是维度建模中的另一种选择,它是对星型模型的扩展,雪花模型的维度表可以拥有其他的维度表,并且维度表与维度表之间是相互关联的。因此,雪花模型相比星型模型更规范一些。但是,由于雪花模型需要关联多层的维度表,因此,性能也比星型模型要低,所以一般不是很常用。
- 星座模型 - 维表是共享状态的,可以被多个事实表关联使用,这种模式可以看做星型模式的汇集,因而称作星系模式或者事实星座模式
6. ODS层做哪些事情
- 保持数据原貌,不做任何修改
- 压缩采用LZO,压缩比是100g数据压缩完10g左右
- 创建分区表
7. DWD层做哪些事情
- 数据清洗 - 空值去除- 过滤核心字段无意义的数据,比如订单表中订单id为null,支付表中支付id为空- 将用户行为宽表和业务表进行数据一致性处理
- 脱敏 - 对手机号、身份证号等敏感数据脱敏
- 维度退化 - 对业务数据传过来的表进行维度退化和降维。(商品一级二级三级、省市县、年月日)
- 压缩LZO
- 列式存储parquet
8. 为什么要设计数据分层?
- 清晰数据结构:每一个数据分层都有它的作用域和职责,在使用表的时候能更方便地定位和理解
- 减少重复开发:规范数据分层,开发一些通用的中间层数据,能够减少极大的重复计算
- 统一数据口径:通过数据分层,提供统一的数据出口,统一对外输出的数据口径
- 复杂问题简单化:将一个复杂的任务分解成多个步骤来完成,每一层解决特定的问题
9. 数据仓库的定义
- 首先,用于支持决策,面向分析型数据处理;其次,对多个异构的数据源有效集成,集成后按照主题进行重组,并包含历史数据,而且存放在数据仓库中的数据一般不再修改。
- 数据仓库(Data Warehouse)是一个面向主题的(subject oriented)、集成的(integrated)、相对稳定的(non-volatile)、反应历史变化(time variant)的数据集合,用于支持管理决策(decision making support)。
10. 数据仓库和数据库的区别?
从目标、用途、设计来说
- 数据库是面向事物处理的,数据是由日常的业务产生的,常更新;数据仓库是面向主题的,数据来源多样,经过一定的规则转换得到,用来分析。
- 数据库一般用来存储当前事务性数据,如交易数据;数据仓库一般存储的历史数据。
- 数据库的设计一般是符合三范式的,有最大的精确度和最小的冗余度,有利于数据的插入;数据仓库的设计一般不符合三范式,有利于查询
11. 事实表设计流程
- 选择业务 - 在明确了业务需求以后,接下来需要进行详细的需求分析,对业务的整个生命周期进行分 析,明确关键的业务步骤,从而选择与需求有关的业务过程。业务过程通常使用行为动词 表示业务执行的活动
- 声明粒度 - 粒度的声明是事实表建模非常重要的一步,意味着确定事实表的每一行所表示的业务含义, 粒度传递的是与事实表度量有关的细节层次。明确的粒度能确保对事实表中行的意思的理 解不会产生混淆,保证所有的事实按照同样的细节层次记录
- 确定维度 - 完成粒度声明以后,也就意味着确定了主键,对应的维度组合以及相关的维度字段就可以 确定了,应该选择能够描述清楚业务过程所处的环境的维度信息
- 确定事实 - 事实可以通过回答“过程的度量是什么”来确定。应该选择与业务过程有关的所有事实, 且事实的粒度要与所声明的事实表的粒度一致。事实有可加性、半可加性、非可加性三 种类型,需要将不可加性事实分解为可加的组件
- 冗余维度 - 在大数据的事实表模型设计中,考虑更多的是提高下游用户的使用效率,降低数据获取 的复杂性,减少关联的表数量。所以通常事实表中会冗余方便下游用户使用的常用维度, 以实现对事实表的过滤查询、控制聚合层次、排序数据以及定义主从关系等操作
数据中台
1. 专有名词解释
- 业务过程:- 指在业务中发生的最小单元的行为或事务,不可再拆分事件,比如创建订单,浏览网页等等。业务过程产生的行为明细, 比如支付了一笔订单,浏览了某个网页,最终都会汇集到事实表中,而大部分情况下,事实表都会聚焦于某个特定的业务 过程。
- 原子指标:- 原子指标是对指标统计口径、具体算法的一个抽象
- 业务限定:- 在进行数据分析时,有时需要根据业务规则来定义一些限定条件,以此来过滤出符合条件的记录。比如要统计【PC端访客 数】这个指标,则需要先创建【PC端】这个业务限定。后续,这个业务限定可以用于其他相同规则的指标定义,比如 【PC端浏览次数】等。
- 派生指标:- 派生指标即常见的统计指标,为保证统计指标标准、规范、无二义性地生成,基于四部分生成:原子指标(明确统计口径, 即计算逻辑)、业务限定(统计的业务范围,筛选出符合业务规则的记录)、统计周期(统计的时间范围,比如最近一天, 最近30天等)、统计粒度(统计分析的对象或视角,定义数据需要汇总的程度,可理解为聚合运算时的分组条件,粒度是 维度的一个组合,比如某个指标是某个卖家在某个省份的成交额,那么粒度就是卖家、地区这两个维度的组合)。
- 逻辑架构
- 指标体系生成
原子指标+统计周期+统计粒度+业务限定=派生指标
2. 数据中台建设方法论
企业的发展,往往伴随着业务更多元化,而与此同时企业在积极推进业务数据化,因此越来越多的企业伴随着各个垂直业务的发展,形成了一个个垂直的数据中心,如何打通这些数据并且以统一的标准进行建设,以达到技术降本、应用提效、业务赋能的目标,是众多企业面临的问题。阿里巴巴提出的数据中台模式正是为解决这些问题而生,并通过实践形成了统一全域数据体系,实现了计算存储累计过亿的成本降低、响应业务效率多倍提升、为业务快速创新提供坚实保障。以下将以OneData、OneID、OneService三大核心方法论介绍数据中台的理念。
- OneData - 数据标准化:数据规范定义从业务源头标准化;- 技术内核工具化:规范定义、建模研发、调度运维;- 元数据驱动智能化:从半自动化飞跃到智能化规划计算和存储。
- OneID - 技术驱动数据连接:基于超强ID识别技术连接数据;- 技术内核工具化:超强ID识别、高效标签生产;- 业务驱动技术价值化:从孤岛变化到高质量数据进化到高价值数据。- OneID技术在业务中的主要应用于消除数据孤岛、提高营销触达、精确人群圈选以及360客户特征分析等四大主要场景。
- OneService - 主题式数据服务:主题逻辑表屏蔽复杂物理表;- 统一多样化数据服务:一般查询加OLAP分析配合在线服务;- 跨源数据服务:屏蔽多种异构数据源
Flume
1.Flume工作机制是什么
- Flume 用于从大量不同的源有效的收集、聚合、移动大量日志数据进行集中式存储。
- Flume 的核心是 Agent,Agent 中包括 Source、Channel 和 Sink。Agent 是最小的独立运行单位。在 Agent 中数据流向为 Source>Channel>Sink。
- Source 负责收集数据,传递给 Channel。支持多种收集方式,比如 Avro、Thrift、Spooling Directory、Taildir、Kafka、HTTP 等。其中 Taildir Source:观察指定的文件,并在监测到添加的每个文件的新行后几乎实时的尾随它们;Spooling Directory Source:监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点,拷贝到 spool 目录下的文件不可以再打开编辑,并且 spool 目录下不可包含相应的子目录。
- Channel 作为数据通道,接受 Source 的数据并储存,传递给 Sink。Channel 中的数据会在被 Sink 消费前一直保存,等 Sink 成功把数据发送到下一跳 Channel 或最终目的地后才会删除缓存的数据。支持多种类型的 Channel,包括 Memory、JDBC、Kafka、File 等。其中Memory Channel 可以实现高速的吞吐,但是无法保证数据的完整性;File Channel 是一个持久化的隧道,它持久化所有的事件,并将其存储到磁盘中。
- Sink 消费 Channel 中的数据,传递到下一跳 Channel 或最终目的地,完成后将数据从Channel 中移除。支持多种类型的 Sink,包括 HDFS、Hive、Hbase、Kafka 等。Sink 在设置存储数据的时候,可以向文件系统、数据库、hadoop 存数据;在日志数据比较少时,可以将数据存储在文件系统中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到 Hadoop 中,便于日后进行相应的数据分析。
- Flume 支持多个 Agent 相连,形成多极 Agent。此时上一级 Sink 和下一集 Source 都必须使用 Avro 协议。使用多级 Flume 可以实现日志的聚合,第一层 Agent 接收日志,第二层Agent 统一处理。
- Flume 还支持将一个流从一个 Source 扇出到多个 Channel。有两种模式的扇出,复制和复用。在复制流程中,事件被发送到所有配置的通道。在复用的情况下,事件仅发送到合格信道的子集。
2. Flume组成、Put事务、Take事务
- taildir source- 断点续传、多目录- 哪个flume版本产生的? - Apache1.7、CDH1.6- 没有断点续传功能时怎么做的? - 自定义- taildir挂了怎么办 - 不会丢数据:断点续传- 怎么处理重复数据 - 不处理 - 生产环境下通常不处理,因为会影响传输效率- 处理 - 自身:在taildirsource里面增加自定义事务- 找兄弟:下一级处理(hive dwd sparkstreaming flink布隆)、去重手段(groupby、开窗取窗口第一条、redis)- taildir source 是否支持递归遍历文件夹读取文件? - 不支持- 自定义:递归遍历文件夹 +读取文件
- file channel、memory channel、kafka channel- file channel- 数据存储于磁盘,优势:可靠性高;劣势:传输速度低- 默认容量:100万event- FileChannel可以通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量- memory channel- 数据存储于内存,优势:传输速度快;劣势:可靠性差- 默认容量:100个event- kafka channel- 数据存储于Kafka,基于磁盘,优势:可靠性高- 传输速度:kafka channel > memory channel+kafka sink 原因省去了sink- 生产环境如何选择- 如果下一级是kafka,优先选择kafka channel- 如果是金融、对钱要求准确的公司,选择file channel- 如果就是普通的日志,通常可以选择memory channel
- HDFS sink- 设置时间1小时、大小128m、event个数(0禁止)- hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0
- 事务- Source到Channel是Put事务- Channel到Sink是Take事务
5. Flume采集数据会丢失吗?
- 如果是FileChannel不会,Channel存储可以存储在File中,数据传输自身有事务。如果是MemoryChannel有可能丢。
Sqoop
1. Sqoop参数
/opt/module/sqoop/bin/sqoop import \
--connect \
--username \
--password \
--target-dir \
--delete-target-dir \
--num-mappers \
--fields-terminated-by \
--query "$2" ' and $CONDITIONS;'
2. Sqoop的工作原理是什么
- hadoop 生态圈上的数据传输工具。 可以将关系型数据库的数据导入非结构化的 hdfs、hive 或者 hbase 中,也可以将 hdfs 中的数据导出到关系型数据库或者文本文件中。 使用的是 mr 程序来执行任务,使用 jdbc 和关系型数据库进行交互。
- import 原理:通过指定的分隔符进行数据切分,将分片传入各个 map 中,在 map 任务中在每行数据进行写入处理没有 reduce。
- export 原理:根据要操作的表名生成一个 java 类,并读取其元数据信息和分隔符对非结构化的数据进行匹配,多个 map 作业同时执行写入关系型数据库
3. Sqoop导入导出Null存储的一致性问题
- Hive中的Null在底层是以“\N”来存储,而MySQL中的Null在底层就是Null,为了保证数据两端的一致性。在导出数据时采用–input-null-string和–input-null-non-string两个参数。导入数据时采用–null-string和–null-non-string。
4. Sqoop数据导出一致性问题
5. Sqoop底层运行的任务是什么
- 只有Map阶段,没有Reduce阶段的任务。默认是4个MapTask。
6. Sqoop一天导入多少数据
- 100万日活=>10万订单,1人10条,每天1g左右业务数据
- Sqoop每天将1G的数据量导入到数仓。
7. Sqoop数据导出的时候一次执行多长时间
- 每天晚上00:30开始执行,Sqoop任务一般情况40 -50分钟的都有。取决于数据量(11:11,6:18等活动在1个小时左右)。
8. Sqoop数据导出Parquet
Ads层数据用Sqoop往MySql中导入数据的时候,如果用了orc(Parquet)不能导入,需转化成text格式
- 创建临时表,把Parquet中表数据导入到临时表,把临时表导出到目标表用于可视化
- Sqoop里面有参数,可以直接把Parquet转换为text
- ads层建表的时候就不要建Parquet表
DataX
1. 简单介绍一下DataX
- DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。(这是一个单机多任务的ETL工具)
2. DataX架构
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
- Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
- Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
- Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
3. DataX运行流程
- DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
- DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
- 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
- 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
- DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
4. DataX的使用
基于官网提供的JSON格式配置文件模版,选择对应的读写插件,根据实际情况修改相关配置,最后使用dataX命令提交运行即可
5. 数据采集流程和同步场景
Zookeeper
1. 选举机制
- 半数机制:2n+1,安装奇数台
- 10台服务器:3台
- 20台服务器:5台
- 100台服务器:11台
- 台数多,好处:提高可靠性;坏处:影响通信延时
2. 常用命令
- ls、get、create
3. Paxos算法(扩展
- 分布式系统中的节点通信存在两种模型:共享内存(Shared memory)和消息传递(Messages passing)。基于消息传递通信模型的分布式系统,不可避免的会发生以下错误:进程可能会慢、被杀死或者重启,消息可能会延迟、丢失、重复,在基础Paxos场景中,先不考虑可能出现消息篡改即拜占庭错误的情况。Paxos算法解决的问题是在一个可能发生上述异常的分布式系统中如何就某个值达成一致,保证不论发生以上任何异常,都不会破坏决议的一致性。
4. CAP法则
- CAP法则:强一致性、高可用性、分区容错性;
- Zookeeper符合强一致性、高可用性!
Kafka
1. Kafka架构
- 生产者、Broker、消费者、ZK;
- 注意:Zookeeper中保存Broker id和消费者offsets等信息,但是没有生产者信息
2. Kafka的机器数量
- Kafka机器数量=2(峰值生产速度副本数/100)+ 1
3. 副本数设定
- 一般我们设置成2个或3个,很多企业设置为2个。
- 副本的优势:提高可靠性;副本劣势:增加了网络IO传输
4. Kafka压测
- Kafka官方自带压力测试脚本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。
5. Kafka日志保存时间
- 默认保存7天;生产环境建议3天
6. Kafka中数据量计算
- 每天总数据量100g,每天产生1亿条日志, 10000万/24/60/60=1150条/每秒钟
- 平均每秒钟:1150条
- 低谷每秒钟:50条
- 高峰每秒钟:1150条*(2-20倍)=2300条-23000条
- 每条日志大小:0.5k-2k(取1k)
- 每秒多少数据量:2.0M-20MB
7. Kafka的硬盘大小
- 每天的数据量100g * 2个副本 * 3天 / 70%(30%预留空间)
8. Kafka监控
- 公司自己开发的监控器,有实力的公司;
- 开源的监控器:KafkaManager、KafkaMonitor、KafkaEagle
9. Kakfa分区数
- 创建一个只有1个分区的topic
- 测试这个topic的producer吞吐量和consumer吞吐量。
- 假设他们的值分别是Tp和Tc,单位可以是MB/s。
- 然后假设总的目标吞吐量是Tt,那么分区数=Tt / min(Tp,Tc)
- 例如:producer吞吐量=20m/s;consumer吞吐量=50m/s,期望吞吐量100m/s;
- 分区数=100 / 20 =5分区
- 分区数一般设置为:3-10个
10. Kafka挂掉
- Flume记录
- 日志有记录
- 短期没事
11. Kafka数据丢失
- Ack=0,producer发送一次就不再发送了,不管是否发送成功。可能会丢数据
- Ack=1,producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。这里有一个地方需要注意,这个副本必须是leader副本。只有leader副本成功写入了,producer才会认为消息发送成功。如果leader成功写入后,还没来得及把数据同步到follower节点就挂了,这时候消息就丢失了。
- Ack=-1,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。
- 只有当Ack=-1并且分区有多个副本的时候才能保证数据不丢失。
12. Kafka数据重复
- 幂等性+ack-1+事务
- Kafka数据重复,可以在下一级:SparkStreaming、flink或者hive中dwd层去重,去重的手段:分组、按照id开窗只取第一个值;
13. Kafka数据积压
- 如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)
- 如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。
14. Kafka参数优化
- Broker参数配置(server.properties)
# 1、日志保留策略配置
# 保留三天,也可以更短 (log.cleaner.delete.retention.ms)
log.retention.hours=72
# 2、Replica相关配置
# 默认副本1个
default.replication.factor:1
# 3、网络通信延时
# 当集群之间网络不稳定时,调大该参数
replica.socket.timeout.ms:30000
# 如果网络不好,或者kafka集群压力较大,会出现副本丢失,然后会频繁复制副本,导致集群压力更大,此时可以调大该参数
replica.lag.time.max.ms= 600000
- Producer优化(producer.properties)
compression.type:none
#默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和Broker的存储压力。
- Kafka内存调整(kafka-server-start.sh)
# 默认内存1个G,生产环境尽量不要超过6个GexportKAFKA_HEAP_OPTS="-Xms4g -Xmx4g"
15. Kafka高效读写数据
- Kafka本身是分布式集群,同时采用分区技术,并发度高。
- 顺序写磁盘,Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。
- 批量读写
- 零拷贝技术
14. Kafka数据有序性
- 单分区内有序
- 多分区,分区与分区间无序
15、重点总结:
1、kafka中的topic可以被多个消费者消费吗
可以,可以通过单播模式(只有一个消费者组)和广播模式实现(多个消费者组)
2、Kafka读取数据快的原因
1、顺序写入到磁盘
2、分区并行处理
3、零拷贝,允许数据在磁盘和网络之间直接传输,不需要通过用户空间进行复制,提高数据的传输速率
4、充分利用page cache
5、数据压缩
6、批处理 3、可以删除某个topic吗?
可以删除
4、Kafka的特点
高吞吐量,持久化数据存储,可扩展性
5、怎么保证Kafka中的数据是有序?
全局有序:只有一个生产者,一个分区,一个消费者,使用单线程
局部有序:只需要在发消息的时候指定Partition Key,Kafka对其进行Hash计算,
根据计算结果决定放入哪个Partition。这样Partition Key相同的消息会放在同一个Partition。
此时,Partition的数量仍然可以设置多个,提升Topic的整体吞吐量。
6、如何保证Kafka消费数据不重复?
幂等性,确保Kafka多次发送同一条消息,只有一个成功
事务,生产者允许多个消息写入Kafka时,要么全部成功要么全部失败,确保数据的唯一性
ack 机制+副本
ack=0,生产者只负责生产数据,不负责数据是否写入成功,不能保证数据是否丢失,性能好
ack=1(默认),当主分区写入成功,就返回成功
ack=-1/all,生产者生产数据必须等待到所有的副本都同步成功,才返回成功,不会丢失数据,性能差
HBase
1. HBase存储结构
2. RowKey设计原则
- rowkey长度原则
- rowkey散列原则 1. Hash2. 时间戳反转3. 加盐
- rowkey唯一原则
3. Phoenix二级索引
对于HBase而言,如果想精确地定位到某行记录,唯一的办法是通过rowkey来查询。如果不通过rowkey来查找数据,就必须逐行地比较每一列的值,即全表扫瞄。对于较大的表,全表扫瞄的代价是不可接受的。但是,很多情况下,需要从多个角度查询数据。例如,在定位某个人的时候,可以通过姓名、身份证号、学籍号等不同的角度来查询,要想把这么多角度的数据都放到rowkey中几乎不可能(业务的灵活性不允许,对rowkey长度的要求也不允许)
二级索引本质上是建立行键与列值的映射关系
- 全局索引::适用于读多写少的业务场景,因为所有对数据表的更新操作,都会引起索引表的更新
- 本地索引:适用于写操作频繁以及空间受限制的场景,索引数据和数据表的数据存放在相同的服务器中,这样避免了在写操作的时候往不同服务器的索引表中写索引带来的额外开销0
4. Hbase读写流程
- 读流程
- 首先从 ZooKeeper 找到 meta 表的 region 位置,然后读取 hbase:meta 表中的数据, hbase:meta 表中存储了用户表的 region 信息
- 根据要查询的 namespace 、表名和 rowkey 信息,找到写入数据对应的 Region 信息
- 找到这个 Region 对应的 RegionServer ,然后发送请求
- 查找对应的 Region
- 先从 MemStore 查找数据,如果没有,再从 BlockCache 上读取
- 如果BlockCache 没有,再从storeFile读取
- 写流程
- 首先从 ZooKeeper 找到 hbase:meta 表的 Region 位置,然后读取 hbase:meta 表中的数据, hbase:meta 表中存储了用户表的 Region 信息
- 根据 namespace 、表名和 rowkey 信息找到写入数据对应的 Region 信息
- 找到这个 Region 对应的 RegionServer ,然后发送请求
- 把数据分别写到 HLog ( WriteAheadLog )和 MemStore 各一份
- MemStore 达到阈值后把数据刷到磁盘,生成 StoreFile 文件
- 删除 HLog 中的历史数据
5. Hbase为什么写比读快
- Hbase底层的存储引擎为LSM-Tree(Log-Structure‘d Merge-Tree)。LSM核心思想的核心就是放弃部分读能力,换取写入的最大化能力。LSM Tree它的核心思路其实非常简单,就是假定内存足够大,因此不需要每次有数据更新就必须将数据写入到磁盘中,而可以先将最新的数据驻留在内存中,等到积累到最后多之后,再使用归并排序的方式将内存内的数据合并追加到磁盘队尾(因为所有待排序的树都是有序的,可以通过合并排序的方式快速合并到一起)。另外,写入时候将随机写入转换成顺序写,数据写入速度也很稳定。/?
- 读取的时候稍微麻烦,需要合并磁盘中历史数据和内存中最近修改操作,所以写入性能大大提升,读取时可能需要先看是否命中内存,否则需要访问较多的磁盘文件。极端的说,基于LSM树实现的HBase的写性能比MySQL高了一个数量级,读性能低了一个数量级。
Scala
1. 函数式编程
- 高阶函数、匿名函数、函数柯里化、函数参数以及函数至简原则。
2. 集合
- List Map Set Tuple
- 可变集合,不可变集合
- 常用的方法,(map,flatmap,filter,take,last,head)
3. 隐式转换
- 掌握隐式方法、隐式参数、隐式类,
- 隐式转换的作用是可以给对象动态增加新的方法
4. 模式匹配
- 可以匹配基本数据类型,字符串,枚举,样例类,类型
Spark
1. RDD的五大特性
- RDD由很多partition构成,有多少partition就对应有多少task
- 算子实际上是作用在每一个分区上,
- RDD之间有依赖关系,宽依赖和窄依赖,用于切分Stage
- Spark默认是hash分区,ByKey类的算子只能作用在kv格式的rdd上
- Spark为task的计算提供了最佳的计算位置,移动计算而不是移动数据
2. Spark常用算子
- map
- flatMap
- filter
- Union
- join
- groupBy
- groupByKey
- reudceByKey
- sortByKey
- foreach
- foreachPaetition
- saveAsTextFile
3. reduceByKey和groupByKey的区别
- reduceByKey会在map端做预聚合,可以减少shuffle过程中传输的数据量,提高执行效率,groupByKey不能做预聚合
- 在某些业务场景reduceByKey没办法实现,需要使用groupByKey
- 尽量使用reduceByKey代替groupByKey
4. Spark缓存
缓存级别
级别使用空间CPU时间是否在内存中是否在磁盘上备注MEMORY_ONLY高低是否MEMORY_ONLY_2高低是否数据存2份MEMORY_ONLY_SER_2低高是否数据序列化,数据存2份MEMORY_AND_DISK高中等部分部分如果数据在内存中放不下,则溢写到磁盘MEMORY_AND_DISK_2高中等部分部分数据存2份MEMORY_AND_DISK_SER低高部分部分MEMORY_AND_DISK_SER_2低高部分部分数据存2份DISK_ONLY低高否是DISK_ONLY_2低高否是数据存2份NONE不缓存OFF_HEAP堆外内存缓存级别选择
Spark 的存储级别的选择,核心问题是在内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择
- 如果使用 MEMORY_ONLY 存储在内存中的 RDD / DataFrame 没有发生溢出,那么就选择默认的存储级别。默认存储级别可以最大程度的提高 CPU 的效率,可以使在 RDD / DataFrame 上的操作以最快的速度运行。
- 如果内存不能全部存储 RDD / DataFrame ,那么使用 MEMORY_ONLY_SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快。
- 除了在计算该数据集的代价特别高,或者在需要过滤大量数据的情况下,尽量不要将溢出的数据存储到磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。
- 如果想快速还原故障,建议使用多副本存储级别 MEMORY_ONLY_2 / MEMORY_ONLY_SER_2 。所有的存储级别都通过重新计算丢失的数据的方式,提供了完全容错机制。但是多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务继续运行。
5. Spark部署方式
- Local::运行在一台机器上,测试使用。
- Standalone:构建一个基于Mster+Slaves的资源调度集群,Spark任务提交给Master运行。是Spark自身的一个调度系统。
- Yarn: Spark客户端直接连接Yarn,不需要额外构建Spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。yarn-clientDriver在本地启动,在本地可以看到详细日志,如果在本地启动太多Spark任务会导致本地网卡流量剧增,不适合上线使用
- Mesos:国内大环境比较少用。
6. Spark任务调度和资源调度流程
- Yarn-client模式
- 重试机制:如果task执行失败taskscheduler会重试3次,如果还失败,DAGscheduler会重试4次 如果是因为shuffle过程中文件找不到的异常,taskscheduler不负责重试task,而是由DAGscheduler重试上一个stage
- 推测执行:如果有的task执行很慢,taskscheduler会在发生一个一摸一样的task到其它节点中执行,让多个task竟争,谁先执行完成以谁的结果为准
- 资源调度
yarn-client为例
1、在本地启动Driver
2、Driver向ResourceManager申请资源
3、RM分配一个节点启动ApplicationMaster
4、AM向RM申请启动Executor
5、RM随机分配一批节点启动Executor
6、Executor反向注册给Driver
- 任务调度
1、当遇到一个Action算子时,开始任务调度
2、构建DAG有向无环图
3、将DAG传递给DAGScheduler
4、DAGScheduler根据宽窄依赖切分Stage
5、DAGScheduler将Stage以taskSet的形式发送给TaskScheduler
6、TaskScheduler将task发送到Executor中执行,会尽量将task发送到数据所在的节点执行
7. Spark Shuffle
Spark的两种核心Shuffle(HashShuffle与SortShuffle)的工作流程(包括未优化的HashShuffle、优化的HashShuffle、普通的SortShuffle与bypass的SortShuffle)(重点)
- 未经优化的HashShuffle:
- 优化后的Shuffle
- 普通的SortShuffle:
- 当 shuffle read task 的 数 量 小 于 等 于 spark.shuffle.sort.bypassMergeThreshold 参数的值时(默认为 200),就会启用 bypass 机制。
7. Repartition和Coalesce区别
- 关系:两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
- 区别:repartition一定会发生shuffle,coalesce根据传入的参数来判断是否发生shuffle,一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce
8. cache和checkpoint区别
- 都是做RDD持久化的
- cache:内存,不会截断血缘关系,使用计算过程中的数据缓
- checkpoint:磁盘,截断血缘关系,在checkpoint之前必须没有任何任务提交才会生效,checkpoint过程会额外提交一次任务。
9. 广播变量和累加器
- 累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。而广播变量用来高效分发较大的对象。
- 共享变量出现的原因:1. 通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响Driver中的对应变量。2. Spark的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。
10. 当Spark涉及到数据库的操作时,如何减少Spark运行中的数据库连接数
- 使用foreachPartition代替foreach,在foreachPartition内获取数据库的连接
11. 简述SparkSQL中RDD、DataFrame、DataSet三者的区别与联系?
- RDD优点:1. 编译时类型安全2. 编译时就能检查出类型错误3. 面向对象的编程风格4. 直接通过类名点的方式来操作数据缺点:1. 序列化和反序列化的性能开销2. 无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化。3. GC的性能开销,频繁的创建和销毁对象, 势必会增加GC
- DataFrame1. DataFrame引入了schema和off-heap2. schema : DataFrame每一行的数据, 结构都是一样的,这个结构就存储在schema中。 Spark通过schema就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了。
- DataSet1. DataSet结合了RDD和DataFrame的优点,并带来的一个新的概念Encoder。2. 当序列化数据时,Encoder产生字节码与off-heap进行交互,能够达到按需访问数据的效果,而不用反序列化整个对象。Spark还没有提供自定义Encoder的API,但是未来会加入。
- 相互转换
12. Spark Sql默认并行度
- 参数spark.sql.shuffle.partitions 决定 默认并行度200
13. Spark优化
- 代码优化1. 避免创建重复的RDD2. 尽可能复用同一个RDD3. 对多次使用的RDD进行持久化 - 默认情况下,性能最高的当然是MEMORY_ONLY- 如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用 MEMORY_ONLY_SER级4. 使用高性能的算子 - 使用reduceByKey/aggregateByKey替代groupByKey- 使用foreachPartitions替代foreach Action算子- 使用filter之后进行coalesce操作5. 广播大变量 - 会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如 100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提 升性能6. 使用Kryo优化序列化性能7. 将reduce jon 转化成map join - Spark sql 中实现方式 /*+broadcast(a) */- RDD中将小表广播
- 参数优化1. –num-executors 50 \ executor的数量2. –executor-cores 2 \ executor的核数3. –executor-memory 4G \ executor的内存4. –driver-memory 2G \ driver的内存5. –conf spark.sql.shuffle.partitions=200 \ spark sql shuffle之后的分区数6. –conf spark.storage.memoryFraction=0.6 \ 用于缓存的内存占比7. –conf spark.shuffle.memoryFraction=0.2 \ 用于shuffle的内存占比8. –conf spark.locality.wait=10 \ task在executor中执行之前的等待时间,默认3秒9. –conf spark.yarn.executor.memoryOverhead=2048 \ 堆外内存10. –conf spark.network.timeout=120s \ spark网络连接的超时时间
14. Spark数据倾斜
- Spark中的数据倾斜,表现主要有下面几种:> 数据倾斜产生的原因:1、数据分布不均,2,同时产生了shuffle1. Executor lost,OOM,Shuffle过程出错;
- lDriver OOM;
- 单个Executor执行时间特别久,整体任务卡在某个阶段不能结束;
- 正常运行的任务突然失败
- 数据倾斜优化 1. 使用Hive ETL预处理数据 - 适用场景:导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表执行某个分析操作,那么比较适合使用这种技术方案。- 实现思路:此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。- 方案实现原理:这种方案从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。- 方案优缺点: 1. 优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark作业的性能会大幅度提升。2. 缺点:治标不治本,Hive ETL中还是会发生数据倾斜。- 方案实践经验:在一些Java系统与Spark结合使用的项目中,会出现Java代码频繁调用Spark作业的场景,而且对Spark作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次Java调用Spark作业时,执行速度都会很快,能够提供更好的用户体验。- 项目实践经验:在美团·点评的交互式用户行为分析系统中使用了这种方案,该系统主要是允许用户通过Java Web系统提交数据分析统计任务,后端通过Java提交Spark作业进行数据分析统计。要求Spark作业速度必须要快,尽量在10分钟以内,否则速度太慢,用户体验会很差。所以我们将有些Spark作业的shuffle操作提前到了Hive ETL中,从而让Spark直接使用预处理的Hive中间表,尽可能地减少Spark的shuffle操作,大幅度提升了性能,将部分作业的性能提升了6倍以上。2. 过滤少数导致倾斜的key - 方案适用场景:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。- 方案实现思路:如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时,动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。- 方案实现原理:将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生数据倾斜。- 方案优缺点: 1. 优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。2. 缺点:适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。- 方案实践经验:在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天Spark作业在运行的时候突然OOM了,追查之后发现,是Hive表中的某一个key在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个key之后,直接在程序中将那些key给过滤掉。3. 提高shuffle操作的并行度 - 方案适用场景:如果我们必须要对数据倾斜迎难而上,那么建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案。- 方案实现思路:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,默认是200,对于很多场景来说都有点过小。- 方案实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。- 方案优缺点: 1. 优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。2. 缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。- 方案实践经验:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用最简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。4. 双重聚合 (局部聚合+全局聚合) - 方案适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。- 方案实现思路:这个方案的核心实现思路就是进行两阶段聚合:第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。- 方案实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。- 方案优缺点: 1. 优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。2. 缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。5. 将reduce join转为map join - 方案适用场景:在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。- 方案实现思路:不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量,广播给其他Executor节点;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。- 方案实现原理:普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。具体原理如下图所示。- 方案优缺点: 1. 优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。2. 缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。如果我们广播出去的RDD数据比较大,比如10G以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。6. 采样倾斜key并分拆join操作 - 方案适用场景:两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五”,那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。- 方案实现思路:对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀;而不会导致倾斜的大部分key形成另外一个RDD。接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0
n的前缀;不会导致倾斜的大部分key也形成另外一个RDD。再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。而另外两个普通的RDD就照常join即可。最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。- 方案实现原理:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。具体原理见下图。- 方案优缺点: 1. 优点:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容。避免了占用过多内存。2. 缺点:如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。7. 使用随机前缀和扩容RDD进行join - 方案实现思路:该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。然后将该RDD的每条数据都打上一个n以内的随机前缀。同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0n的前缀。最后将两个处理后的RDD进行join即可。
14. Spark Streaming背压机制
- 把spark.streaming.backpressure.enabled 参数设置为ture,开启背压机制后Spark Streaming会根据延迟动态去kafka消费数据,上限由spark.streaming.kafka.maxRatePerPartition参数控制,所以两个参数一般会一起使用
Flink
1. 简单介绍一下 Flink】、
- Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。并且 Flink 提供了数据分布、容错机制以及资源管理等核心功能。Flink提供了诸多高抽象层的API以便用户编写分布式任务:
- DataSet API(已过时), 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。
- DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。
- Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。
- 此外,Flink 还针对特定的应用领域提供了领域库,例如: Flink ML,Flink 的机器学习库,提供了机器学习Pipelines API并实现了多种机器学习算法。 Gelly,Flink 的图计算库,提供了图计算的相关API及多种图计算算法实现。根据官网的介绍,Flink 的特性包含:
2. Flink相比传统的Spark Streaming区别?
这个问题是一个非常宏观的问题,因为两个框架的不同点非常之多。但是在面试时有非常重要的一点一定要回答出来:Flink 是标准的实时处理引擎,基于事件驱动。而 Spark Streaming 是微批(Micro-Batch)的模型。
- 架构模型Spark Streaming 在运行时的主要角色包括:Master、Worker、Driver、Executor,Flink 在运行时主要包含:Jobmanager、Taskmanager和Slot。
- 任务调度Spark Streaming 连续不断的生成微小的数据批次,构建有向无环图DAG,Spark Streaming 会依次创建 DStreamGraph、JobGenerator、JobScheduler。Flink 根据用户提交的代码生成 StreamGraph,经过优化生成 JobGraph,然后提交给 JobManager进行处理,JobManager 会根据 JobGraph 生成 ExecutionGraph,ExecutionGraph 是 Flink 调度最核心的数据结构,JobManager 根据 ExecutionGraph 对 Job 进行调度。
- 时间机制Spark Streaming 支持的时间机制有限,只支持处理时间。 Flink 支持了流处理程序在时间上的三个定义:处理时间、事件时间、注入时间。同时也支持 watermark 机制来处理滞后数据。
- 容错机制对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰好一次处理语义。Flink 则使用两阶段提交协议来解决这个问题。
3. Flink技术栈
- 自下而上,每一层分别代表:Deploy 层:该层主要涉及了Flink的部署模式,在上图中我们可以看出,Flink 支持包括local、Standalone、Cluster、Cloud等多种部署模式。Runtime 层:Runtime层提供了支持 Flink 计算的核心实现,比如:支持分布式 Stream 处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务。API层:API 层主要实现了面向流(Stream)处理和批(Batch)处理API,其中面向流处理对应DataStream API,面向批处理对应DataSet API,后续版本,Flink有计划将DataStream和DataSet API进行统一。Libraries层:该层称为Flink应用框架层,根据API层的划分,在API层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类。面向流处理支持:CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作);面向批处理支持:FlinkML(机器学习库)、Gelly(图处理)。
4. Flink 的运行必须依赖 Hadoop组件吗
- Flink可以完全独立于Hadoop,在不依赖Hadoop组件下运行。但是做为大数据的基础设施,Hadoop体系是任何大数据框架都绕不过去的。Flink可以集成众多Hadooop 组件,例如Yarn、Hbase、HDFS等等。例如,Flink可以和Yarn集成做资源调度,也可以读写HDFS,或者利用HDFS做检查点。
5. 你们的Flink集群规模多大
- 大家注意,这个问题看起来是问你实际应用中的Flink集群规模,其实还隐藏着另一个问题:Flink可以支持多少节点的集群规模?在回答这个问题时候,可以将自己生产环节中的集群规模、节点、内存情况说明,同时说明部署模式(一般是Flink on Yarn),除此之外,用户也可以同时在小集群(少于5个节点)和拥有 TB 级别状态的上千个节点上运行 Flink 任务。
6. Flink的基础编程模型了解吗
- 上图是来自Flink官网的运行流程图。通过上图我们可以得知,Flink 程序的基本构建是数据输入来自一个 Source,Source 代表数据的输入端,经过 Transformation 进行转换,然后在一个或者多个Sink接收器中结束。数据流(stream)就是一组永远不会停止的数据记录流,而转换(transformation)是将一个或多个流作为输入,并生成一个或多个输出流的操作。执行时,Flink程序映射到 streaming dataflows,由流(streams)和转换操作(transformation operators)组成。
7. Flink集群有哪些角色?各自有什么作用?
- Flink 程序在运行时主要有 TaskManager,JobManager,Client三种角色。其中JobManager扮演着集群中的管理者Master的角色,它是整个集群的协调者,负责接收Flink Job,协调检查点,Failover 故障恢复等,同时管理Flink集群中从节点TaskManager。TaskManager是实际负责执行计算的Worker,在其上执行Flink Job的一组Task,每个TaskManager负责管理其所在节点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向JobManager汇报。Client是Flink程序提交的客户端,当用户提交一个Flink程序时,会首先创建一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Flink集群中处理,所以Client需要从用户提交的Flink程序配置中获取JobManager的地址,并建立到JobManager的连接,将Flink Job提交给JobManager。
8. 说说 Flink 资源管理中 Task Slot 的概念
- 在Flink架构角色中我们提到,TaskManager是实际负责执行计算的Worker,TaskManager 是一个 JVM 进程,并会以独立的线程来执行一个task或多个subtask。为了控制一个 TaskManager 能接受多少个 task,Flink 提出了 Task Slot 的概念。简单的说,TaskManager会将自己节点上管理的资源分为不同的Slot:固定大小的资源子集。这样就避免了不同Job的Task互相竞争内存资源,但是需要主要的是,Slot只会做内存的隔离。没有做CPU的隔离。
- Flink程序多个task可以共享同一个Slot,有利于资源最大化利用
8. Flink 的常用算子
- Flink 最常用的常用算子包括:Map:DataStream → DataStream,输入一个参数产生一个参数,map的功能是对输入的参数进行转换操作。Filter:过滤掉指定条件的数据。KeyBy:按照指定的key进行分组。Reduce:用来进行结果汇总合并。Window:窗口函数,根据某些特性将每个key的数据进行分组(例如:在5s内到达的数据)
9. Flink的并行度
- Flink中的任务被分为多个并行任务来执行,其中每个并行的实例处理一部分数据。这些并行实例的数量被称为并行度。我们在实际生产环境中可以从四个不同层面设置并行度:1. 操作算子层面(Operator Level)2. 执行环境层面(Execution Environment Level)3. 客户端层面(Client Level)4. 系统层面(System Level)
- 需要注意的优先级:算子层面>环境层面>客户端层面>系统层面
- 并行度直接决定了Flink程序需要申请多少资源
10. Flink重启策略
- 固定延迟重启策略(Fixed Delay Restart Strategy)
- 故障率重启策略(Failure Rate Restart Strategy)
- 没有重启策略(No Restart Strategy)
- Fallback重启策略(Fallback Restart Strategy)
11. Flink广播变量
- Flink是并行的,计算过程可能不在一个 Slot 中进行,那么有一种情况即:当我们需要访问同一份数据。那么Flink中的广播变量就是为了解决这种情况。我们可以把广播变量理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。
12. Flink窗口
滚动窗口
滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。 比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建(如下图所示)。
滑动窗口
与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。
比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。
会话窗口
会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。
全局窗口
全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在你指定了自定义的 trigger 时有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据
13. 说说Flink中的状态存储
- Flink在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。选择的状态存储策略不同,会影响状态持久化如何和 checkpoint 交互。Flink提供了三种状态存储方式:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。
- Flink1.15改版了,新版本改成了两种, 1. HashMapStateBackend:先将状态放在TaskManager的内存在,checkpoint触发时将状态持久化道hdfs中,状态大小会受到内存的限制,效率较高2. EmbeddedRocksDBStateBackend:先将数据保存在TaskManager本地RocksDB数据库中,checkpoint触发时将状态持久化道hdfs中
14. Flink中的时间有哪几类
- Flink 中的时间和其他流式计算系统的时间一样分为三类:事件时间,摄入时间,处理时间三种。如果以 EventTime 为基准来定义时间窗口将形成EventTimeWindow,要求消息本身就应该携带EventTime。如果以 IngesingtTime 为基准来定义时间窗口将形成 IngestingTimeWindow,以 source 的systemTime为准。如果以 ProcessingTime 基准来定义时间窗口将形成 ProcessingTimeWindow,以 operator 的systemTime 为准。
15. Flink 中的Watermark是什么概念,起到什么作用
- Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制, 本质上是一种时间戳。 一般来讲Watermark经常和Window一起被用来处理乱序事件。
- 水位线默认等于最新数据的时间戳,水位线只能增长不能降低。
- 由于数据在传输的过程中可能会乱序,为了解决乱序问题,可以将水位线前移,延迟窗口的计算,避免数据丢失
16. TableEnvironment
- TableEnvironment是Table API和SQL集成的核心概念。这个类主要用来:
- 在内部catalog中注册表
- 注册外部catalog
- 执行SQL查询
- 注册用户定义(标量,表或聚合)函数
- 持有对ExecutionEnvironment或StreamExecutionEnvironment的引用
17. Flink SQL的实现原理是什么?是如何实现 SQL 解析的呢
- 首先 Flink SQL解析是基于Apache Calcite这个开源框架
- 基于此,一次完整的SQL解析过程如下: 1. 用户使用对外提供Stream SQL的语法开发业务应用2. 用calcite对StreamSQL进行语法检验,语法检验通过后,转换成calcite的逻辑树节点;最终形成calcite的逻辑计划3. 采用Flink自定义的优化规则和calcite火山模型、启发式模型共同对逻辑树进行优化,生成最优的Flink物理计划4. 对物理计划采用janino codegen生成代码,生成用低阶API DataStream 描述的流应用,提交到Flink平台执行
18. Flink是如何支持批流一体的
- 所谓的流批统一就是同一套Api既能做流处理由能做批处理
- 在新版Flink中已经删除了DataSetApi ,统一使用DataStream Api/Table Api进行流处理和批处理1. DataStreamApi
# 批处理env.setRuntimeMode(RuntimeExecutionMode.BATCH);# 流处理 (默认)env.setRuntimeMode(RuntimeExecutionMode.STREAMING);# 根据数据源自动选择env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);# 注意,批处理模式只能用于处理有界流
2. Table Api-- 流处理(默认)SET'execution.runtime-mode'='streaming';-- 批处理SET'execution.runtime-mode'='batch';
19. Flink是如何做到高效的数据交换的
- 在一个Flink Job中,数据需要在不同的task中进行交换,整个数据交换是有 TaskManager 负责的,TaskManager 的网络组件首先从缓冲buffer中收集records,然后再发送。Records 并不是一个一个被发送的,二是积累一个批次再发送,batch 技术可以更加高效的利用网络资源。默认是32KB,200毫秒
20. Flink是如何做容错的
- Flink 实现容错主要靠强大的CheckPoint机制和State机制。Checkpoint 负责定时制作分布式快照、对程序中的状态进行备份;State 用来存储计算过程中的中间状态。
21. Flink 分布式快照的原理是什么
- Flink的分布式快照是根据Chandy-Lamport算法量身定做的。简单来说就是持续创建分布式数据流及其状态的一致快照。
- 核心思想是在 input source 端插入 barrier,控制 barrier 的同步来实现 snapshot 的备份和 exactly-once 语义。
22. 确保精确一次(exactly once)
- 当流处理应用程序发生错误的时候,结果可能会产生丢失或者重复。Flink 根据你为应用程序和集群的配置,可以产生以下结果:1. Flink 不会从快照中进行恢复(at most once)2. 没有任何丢失,但是你可能会得到重复冗余的结果(at least once)3. 没有丢失或冗余重复(exactly once)
- Flink 通过回退和重新发送 source 数据流从故障中恢复,当理想情况被描述为精确一次时,这并不意味着每个事件都将被精确一次处理。相反,这意味着 每一个事件都会影响 Flink 管理的状态精确一次。
- Barrier 只有在需要提供精确一次的语义保证时需要进行对齐(Barrier alignment)。如果不需要这种语义,可以通过配置
CheckpointingMode.AT_LEAST_ONCE
关闭 Barrier 对齐来提高性能。 - 为了实现端到端的精确一次,以便 sources 中的每个事件都仅精确一次对 sinks 生效,必须满足以下条件:1. 你的 sources 必须是可重放的,并且2. 你的 sinks 必须是事务性的(或幂等的)
23. 说说 Flink的内存管理是如何做的?
- Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上。此外,Flink大量的使用了堆外内存。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。Flink 为了直接操作二进制数据实现了自己的序列化框架。理论上Flink的内存管理分为三部分:
- Network Buffers:这个是在TaskManager启动的时候分配的,这是一组用于缓存网络数据的内存,每个块是32K,默认分配2048个,可以通过“taskmanager.network.numberOfBuffers”修改
- Memory Manage pool:大量的Memory Segment块,用于运行时的算法(Sort/Join/Shuffle等),这部分启动的时候就会分配。下面这段代码,根据配置文件中的各种参数来计算内存的分配方法。(heap or off-heap,这个放到下节谈),内存的分配支持预分配和lazy load,默认懒加载的方式。
- User Code,这部分是除了Memory Manager之外的内存用于User code和TaskManager本身的数据结构。
24. Flink中的Window出现了数据倾斜,你有什么解决办法?
- window产生数据倾斜指的是数据在不同的窗口内堆积的数据量相差过多。本质上产生这种情况的原因是数据源头发送的数据量速度不同导致的。出现这种情况一般通过两种方式来解决:1. 在数据进入窗口前做预聚合2. 重新设计窗口聚合的key
25. Flink中在使用聚合函数 GroupBy、Distinct、KeyBy 等函数时出现数据热点该如何解决?
- 数据倾斜和数据热点是所有大数据框架绕不过去的问题。处理这类问题主要从3个方面入手:
- 在业务上规避这类问题- 例如一个假设订单场景,北京和上海两个城市订单量增长几十倍,其余城市的数据量不变。这时候我们在进行聚合的时候,北京和上海就会出现数据堆积,我们可以单独数据北京和上海的数据。
- Key的设计上- 把热key进行拆分,比如上个例子中的北京和上海,可以把北京和上海按照地区进行拆分聚合。
- 参数设置- Flink 1.9.0 SQL(Blink Planner) 性能优化中一项重要的改进就是升级了微批模型,即 MiniBatch。原理是缓存一定的数据后再触发处理,以减少对State的访问,从而提升吞吐和减少数据的输出量。
26. Flink任务延迟高,想解决这个问题,你会如何入手?
- 在Flink的后台任务管理中,我们可以看到Flink的哪个算子和task出现了反压。最主要的手段是资源调优和算子调优。资源调优即是对作业中的Operator的并发数(parallelism)、CPU(core)、堆内存(heap_memory)等参数进行调优。作业参数调优包括:并行度的设置,State的设置,checkpoint的设置。
27. Flink是如何处理反压的?
- 当下游消费的速率比上游生产的速率小时flink会发生反压,反压发生之后会向上游传递,一直到source task,source task会降低拉取数据的速度
- 解决反压 1. 增加资源,提高并行度2. 优化代码,减少状态使用3. 如果将数据写入到mysql导致的反压,可以增加batch的大小和开始预聚合4. 如果是将数据写入kafka导致反压,可以增加kafka的分区数,提高并行度
28. Flink Job的提交流程
- 用户提交的Flink Job会被转化成一个DAG任务运行,分别是:StreamGraph、JobGraph、ExecutionGraph,Flink中JobManager与TaskManager,JobManager与Client的交互是基于Akka工具包的,是通过消息驱动。整个Flink Job的提交还包含着ActorSystem的创建,JobManager的启动,TaskManager的启动和注册。
项目架构
1. 阿里云大数据架构nnnj
- 离线实时一体化数仓
- 基于云上提供的近乎无限计算和存储资源,结合云原生数据仓库MaxCompute、实时计算Flink版、交互式分析MC-Hologres以及数据开发与治理DataWorks,打造一体化的新一代数据仓库架构,同时满足离线和实时分析需求。
- 实时数仓架构
- 整合阿里云实时计算Flink版+交互式分析MC-Hologres两大飞天大数据实时计算+分析引擎利器,实现新一代实时分析数据仓库架构。
2. 开源大数据架构
离线大数据架构
Lambda 架构随着大数据应用的发展,人们逐渐对系统的实时性提出了要求,为了计算一些实时指标,就在原来离线数仓的基础上增加了一个实时计算的链路,并对数据源做流式改造(即把数据发送到消息队列),实时计算去订阅消息队列,直接完成指标增量的计算,推送到下游的数据服务中去,由数据服务层完成离线&实时结果的合并。
Kappa 架构- Kappa 架构可以认为是 Lambda 架构的简化版(只要移除 lambda 架构中的批处理部分即可)
3. 框架版本选型
- Apache:运维麻烦,组件间兼容性需要自己调研。(一般大厂使用,技术实力雄厚,有专业的运维人员)
- CDH6.3.2:国内使用最多的版本,但CM不开源,但其实对中、小公司使用来说没有影响(建议使用)10000美金一个节点 CDP7.0
- HDP:开源,可以进行二次开发,但是没有CDH稳定,国内使用较少
4. 服务器选型
服务器使用物理机还是云主机
- 机器成本考虑1. 物理机:以128G内存,20核物理CPU,40线程,8THDD和2TSSD硬盘,单台报价4W出头,惠普品牌。一般物理机寿命5年左右。2. 云主机,以阿里云为例,差不多相同配置,每年5W
- 运维成本考虑:1. 物理机:需要有专业的运维人员(1万*13个月)、电费(商业用户)、安装空调2. 云主机:很多运维工作都由阿里云已经完成,运维相对较轻松
- 企业选择1. 金融有钱公司和阿里没有直接冲突的公司选择阿里云(上海)2. 中小公司、为了融资上市,选择阿里云,拉倒融资后买物理机。3. 有长期打算,资金比较足,选择物理机。
5. 集群规模
6. 数仓分层
- ODS层做了哪些事?1. 保持数据原貌,不做任何修改2. 压缩采用LZO,压缩比是100g数据压缩完10g左右。3. 创建分区表
- DWD层做了哪些事?1. 数据清洗- 空值去除- 过滤核心字段无意义的数据,比如订单表中订单id为null,支付表中支付id为空- 将用户行为宽表和业务表进行数据一致性处理2. 清洗的手段- hive- Spark sql- Flink- rdd3. 脱敏- 对手机号、身份证号等敏感数据脱敏4. 维度退化- 对业务数据传过来的表进行维度退化和降维。(商品一级二级三级、省市县、年月日)5. 压缩LZO6. 列式存储parquet
- DWS层做了哪些事1. DWS层有3-5张宽表(处理100-200个指标 70%以上的需求) - 具体宽表名称:用户行为宽表,用户购买商品明细行为宽表,商品宽表,购物车宽表,物流宽表、登录注册、售后等。2. 哪个宽表最宽?大概有多少个字段? - 最宽的是用户行为宽表。大概有60-100个字段3. 具体用户行为宽表字段名称 - 评论、打赏、收藏、关注–商品、关注–人、点赞、分享、好价爆料、文章发布、活跃、签到、补签卡、幸运屋、礼品、金币、电商点击、gmv
元数据管理(Atlas血缘系统)
- Atlas 是一个可伸缩和可扩展的核心基础治理服务集合 ,使企业能够有效地和高效地满足 Hadoop 中的合规性要求,并允许与整个企业数据生态系统的集成。
- Apache Atlas为组织提供开放式元数据管理和治理功能,用以构建其数据资产目录,对这些资产进行分类和管理,并为数据科学家,数据分析师和数据治理团队提供围绕这些数据资产的协作功能。
- 参考连接:https://www.cnblogs.com/mantoudev/p/9986408.html
数据质量监控(Griffin)
- 为什么要做数据质量监控1. 数据不一致企业早期没有进行统一规划设计,大部分信息系统是逐步迭代建设的,系统建设时间长短各异,各系统数据标准也不同。企业业务系统更关注业务层面,各个业务系统均有不同的侧重点,各类数据的属性信息设置和要求不统一。2. 数据不完整由于企业没有统一的录入工具和数据出口,业务系统不需要的信息就不录,造成同样的数据在不同的系统有不同的属性信息,数据完整性无法得到保障。3. 数据不合规没有统一的数据管理平台和数据源头,数据全生命周期管理不完整,同时企业各信息系统的数据录入环节过于简单且手工参与较多,就数据本身而言,缺少是否重复、合法、对错等校验环节,导致各个信息系统的数据不够准确,格式混乱,各类数据难以集成和统一,没有质量控制导致海量数据因质量过低而难以被利用,且没有相应的数据管理流程。4. 数据不可控企业各单位和部门关注数据的角度不一样,缺少一个组织从全局的视角对数据进行管理,导致无法建立统一的数据管理标准、流程等,相应的数据管理制度、办法等无法得到落实。同时,企业基础数据质量考核体系也尚未建立,无法保障一系列数据标准、规范、制度、流程得到长效执行。5. 数据冗余各个信息系统针对数据的标准规范不一、编码规则不一、校验标准不一,且部分业务系统针对数据的验证标准严重缺失,造成了企业顶层视角的数据出现“一物多码”、“一码多物”等现象。
- 建设方法
- 质量监管平台建设,主要包含如下8大流程步骤:
- 质量需求:发现数据问题;信息提报、收集需求;检核规则的需求等;
- 提炼规则:梳理规则指标、确定有效指标、检核指标准确度和衡量标准;
- 规则库构建:检核对象配置、调度配置、规则配置、检核范围确认、检核标准确定等;
- 执行检核:调度配置、调度执行、检核代码;
- 问题检核:检核问题展示、分类、质量分析、质量严重等级分类等;
- 分析报告:数据质量报告、质量问题趋势分析,影响度分析,解决方案达成共识;
- 落实处理:方案落实执行、跟踪管理、解决方案Review及标准化提炼;
- 知识库体系形成:知识经验总结、标准方案沉淀、知识库体系建设。
- 监控指标1. 单表数据量监控一张表的记录数在一个已知的范围内,或者上下浮动不会超过某个阈值SQL结果:var 数据量 = select count()from 表 where 时间等过滤条件报警触发条件设置:如果数据量不在[数值下限, 数值上限], 则触发报警同比增加:如果((本周的数据量 - 上周的数据量)/上周的数据量100)不在 [比例下线,比例上限],则触发报警环比增加:如果((今天的数据量 - 昨天的数据量)/昨天的数据量*100)不在 [比例下线,比例上限],则触发报警报警触发条件设置一定要有。如果没有配置的阈值,不能做监控日活、周活、月活、留存(日周月)、转化率(日、周、月)GMV(日、周、月)复购率(日周月)2. 单表空值检测某个字段为空的记录数在一个范围内,或者占总量的百分比在某个阈值范围内目标字段:选择要监控的字段,不能选“无”SQL结果:var 异常数据量 = select count(*) from 表 where 目标字段 is null单次检测:如果(异常数据量)不在[数值下限, 数值上限],则触发报警3. 单表重复值检测一个或多个字段是否满足某些规则目标字段:选择要监控的字段,group by 这里的字段列表后,没有重复单次检测:如果(异常数据量)不在[数值下限, 数值上限], 则触发报警4. 单表值域检测一个或多个字段没有重复记录目标字段:选择要监控的字段,支持多选检测规则:填写“目标字段”要满足的条件。其中$1表示第一个目标字段,$2表示第二个目标字段,以此类推。上图中的“检测规则”经过渲染后变为“delivery_fee = delivery_fee_base+delivery_fee_extra”阈值配置与“空值检测”相同5. 跨表数据量对比主要针对同步流程,监控两张表的数据量是否一致SQL结果:count(本表) - count(关联表)阈值配置与“空值检测”相同
数据治理
- 包括:数据质量管理、元数据管理、权限管理(ranger sentry)
- CDH cloudmanager; HDP ambari
- 数据治理是一个复杂的系统工程,涉及到企业和单位多个领域,既要做好顶层设计,又要解决好统一标准、统一流程、统一管理体系等问题,同时也要解决好数据采集、数据清洗、数据对接和应用集成等相关问题。
- 数据治理实施要点主要包含数据规划、制定数据标准、整理数据、搭建数据管理工具、构建运维体系及推广贯标六大部分,其中数据规划是纲领、制定数据标准是基础、整理数据是过程、搭建数据管理工具是技术手段、构建运维体系是前提,推广贯标是持续保障。
数据湖
数据湖(Data Lake)是一个存储企业的各种各样原始数据的大型仓库,其中的数据可供存取、处理、分析及传输。
目前,Hadoop是最常用的部署数据湖的技术,所以很多人会觉得数据湖就是Hadoop集群。数据湖是一个概念,而Hadoop是用于实现这个概念的技术。
数据仓库****数据湖主要处理历史的、结构化的数据,而且这些数据必须与数据仓库事先定义的模型吻合。能处理所有类型的数据,如结构化数据,非结构化数据,半结构化数据等,数据的类型依赖于数据源系统的原始数据格式。非结构化数据(语音、图片、视频等)数据仓库分析的指标都是产品经理提前规定好的。按需分析数据。根据海量的数据,挖掘出规律,反应给运营部门。拥有非常强的计算能力用于处理数据
面试说明
简历
- 基本信息
- 专业技能:这个技术特点+可以干什么+延申1 熟悉java,可以进行相关api使用。熟悉scala可以进行spark相关编程编写。 2 熟悉Linux常用命令,shell基本编程。可以进行相关脚本的开发和部署。 3 熟悉关系型数据mysql,小型缓存队列redis。进行常规的数据源对接。 4 熟练搭建hadoop生态体系的集群组件以及cdh版本。可以进行简单的日常维护。 5 熟悉mapreduce原理及相关编程,熟悉hdfs数据分布式存储及相关操作。 6 熟悉数据仓库建模,可以使用hive进行离线数仓的建设及大规模数据的存储设计,熟练使用Hive-SQL进行相关业务指标的统计分析 7 熟练使用hbase,kafka进行相关实时应用场景的开发。熟悉hbase的原理及协处理器的开发。 8 熟悉数据集成工具,可以使用Sqoop,flume,datax,cannal做数据同步和ETL相关操作, 将关系型数据库和数仓之间进行数据打通。可以进行全量和增量数据同步。 9 熟悉Saprk-Core/Spark——SQL可以进行离线分析,可以使用Spark-Streanming进行实时数据分析 10 熟悉机器学习常用算法,如朴素贝叶斯,支持向量机,逻辑回归等,可以使用python进行相关建模分析。 11 熟悉Flink的集群的搭建,熟悉Flink种TableAPI,Window&TIme的使用,可以进行流式计算场景的分析开发。 12 熟悉数据中台相关概念,熟悉阿里云常用的大数据组件,如Maxcompute、Dataworks、Datahub、RDS相关使用。
- 项目背景- 项目的流程【1,2,3,但是拒绝都按照数仓分层去写。按照自己的理解,去叙述项目流程。用到的专业技术名称+使用这些技术做了什么+具体的场景指标】- 你负责部分:数仓的相关指标分析,使用hive进行分析,sparkSQL。- 遇到的问题:
- 自我评价:忽略。
- 简历名称:姓名+大数据开发+城市.pdf
- 投简历:boss,拉钩,智联,51,新安人才网。
- 搜索的关键词: 大数据,hadoop,spark,数据仓库,ETL。多投简历。
- 手机端:沟通。 介绍:您好+你会的东西+期望有面试机会。
- 联系方式:
面试
一般第一轮是电话面试。
目的:拿offer。
基本基调:自信+自然+流畅。
请做下自我介绍:
1 基本信息+会的东西+客套。
2 当面试官问你不会的问题。
3 不要和面试官争论
4 学校的成绩怎么?
5 死问java和mysql怎么办?
准备:背书包,纸质的笔记本,简历若干份。
前台:填表格。基本信息。
很有礼貌:不要抖腿。不要浮夸。
面试过程种,主动学习的能力。
1 你平时是怎么学习的。博客,技术帖子,技术视频。官网。
2 遇到困难怎么解决的。多和组长汇报你的工作,体现你的工作态度。
3 你有什么问题想要问我?
1 假设有机会参加我们公司工作,我具体要负责哪些工作内容呢。
2 我们这个组大数据开发有多少人呢,我要负责的开发内容是是什么
加这个面试官的微信。
人事聊:
1 有没有对象?
2 为什么来杭州?
3 能不能加班??
4 能不能出差??
5 能不能接受外包和驻场。
6 期望的薪水?实习3-5K。转正期望可以8k。
版权归原作者 尔等都不懂涐 所有, 如有侵权,请联系我们删除。