MaxCompute
关键字
odps (Open Data Platform and Service)
分区
分区表是指拥有分区空间的表,即在创建表时指定表内的一个或者某几个字段作为分区列。分区表实际就是对应分布式文件系统上的独立的文件夹,一个分区对应一个文件夹,文件夹下是对应分区所有的数据文件。
使用限制
- 单表分区层级最多为6级。
- 单表分区数最大值为60000个。
- 单次查询允许查询最多的分区个数为10000个。
- STRING分区类型的分区值不支持使用中文。
基本语法
join
left join
right join
union 并集
求两个数据集的并集,即将两个数据集合并成一个数据集。必须所有字段都一样
intersect 交集
求两个数据集的交集,即输出两个数据集均包含的记录。
except 补集
求第二个数据集在第一个数据集中的补集,即输出第一个数据集包含而第二个数据集不包含的记录。
left anti join 取左表在右表中没有数据
当join条件不成立时,返回左表中的数据。如果左表中满足指定条件的某行数据没有在右表中出现过,则此行保留在结果集中。在MaxCompute中,与left anti join类似的操作为not in subquery
left semi join 取左表在右表中出现的数据
当join条件成立时,返回左表中的数据。如果左表中满足指定条件的某行数据在右表中出现过,则此行保留在结果集中。在MaxCompute中,与left semi join类似的操作为in subquery
set odps.sql.allow.fullscan=true;
select * from sale_detail;--返回结果。
+------------+-------------+-------------+------------+------------+| shop_name | customer_id | total_price | sale_date | region |+------------+-------------+-------------+------------+------------+| s1 | c1 |100.1|2013| china || s2 | c2 |100.2|2013| china || s3 | c3 |100.3|2013| china |+------------+-------------+-------------+------------+------------+
select * from sale_detail_sj;--返回结果。
+------------+-------------+-------------+------------+------------+| shop_name | customer_id | total_price | sale_date | region |+------------+-------------+-------------+------------+------------+| s1 | c1 |100.1|2013| china || s2 | c2 |100.2|2013| china || s5 | c2 |100.2|2013| china || s2 | c2 |100.2|2013| china |+------------+-------------+-------------+------------+------------+
select * from sale_detail a left semi join sale_detail_sj b on a.total_price=b.total_price;--返回结果
+------------+-------------+-------------+------------+------------+| shop_name | customer_id | total_price | sale_date | region |+------------+-------------+-------------+------------+------------+| s2 | c2 |100.2|2013| china || s1 | c1 |100.1|2013| china |+------------+-------------+-------------+------------+------------+
select * from sale_detail a left anti join sale_detail_sj b on a.total_price=b.total_price;--返回结果
+------------+-------------+-------------+------------+------------+| shop_name | customer_id | total_price | sale_date | region |+------------+-------------+-------------+------------+------------+| s3 | c3 |100.3|2013| china |+------------+-------------+-------------+------------+------------+
mapjoin
整个JOIN过程包含Map、Shuffle和Reduce三个阶段。通常情况下,join操作在Reduce阶段执行表连接。
mapjoin在Map阶段执行表连接,而非等到Reduce阶段才执行表连接,可以缩短大量数据传输时间,提升系统资源利用率,从而起到优化作业的作用。
在对大表和一个或多个小表执行join操作时,mapjoin会将您指定的小表全部加载到执行join操作的程序的内存中,在Map阶段完成表连接从而加快join的执行速度。
此外,MaxCompute SQL不支持在普通join的on条件中使用不等值表达式、or等逻辑复杂的join条件,但是在mapjoin中可以进行上述操作。
语法
/+ mapjoin(<table_name>) /,多张表操作 /+ mapjoin(a,b,c)/
select /*+ mapjoin(a) */
a.shop_name,
a.total_price,
b.total_price
from sale_detail_sj a join sale_detail b
on a.total_price < b.total_price or a.total_price + b.total_price <500;
distinctMapjoin
Join两侧的表数据量要求不同,大表侧数据在10 TB以上,中表侧数据在[1 GB, 100 GB]范围内。
语法
-- 推荐,指定shard_count(replica_count默认为1)
/*+distmapjoin(a(shard_count=5))*/-- 推荐,指定shard_count和replica_count
/*+distmapjoin(a(shard_count=5,replica_count=2))*/-- distmapjoin多个小表
/*+distmapjoin(a(shard_count=5,replica_count=2),b(shard_count=5,replica_count=2)) */-- distmapjoin和mapjoin混用
/*+distmapjoin(a(shard_count=5,replica_count=2)),mapjoin(b)*/
使用示例
insert OVERWRITE table tmall_dump_lasttable partition (ds='20211130')
select /*+ distmapjoin(t2(shard_count=35)) */ t1.*
from
(
select nid, doc, type
from search_ods.dump_lasttable where ds='20211203'
)t1
join
(
select distinct item_id
from tbcdm.dim_tb_itm
where ds='20211130'
and bc_type='B'
and is_online='Y')t2
on t1.nid=t2.item_id;
task执行流程
- 输入数据:对文本进行分片,将每片内的数据作为单个Map Worker的输入。分片完毕后,多个Map Worker便可以同时工作。在正式执行Map前,需要将输入数据进行分片。所谓分片,就是将输入数据切分为大小相等的数据块,每一块作为单个MapWorker的输入被处理,以便于多个Map Worker同时工作。
- Map阶段:每个Map Worker在读入各自的数据后,进行计算处理,最终输出给Reduce。MapWorker在输出数据时,需要为每一条输出数据指定一个Key,这个Key值决定了这条数据将会被发送给哪一个ReduceWorker。Key值和Reduce Worker是多对一的关系,具有相同Key的数据会被发送给同一个ReduceWorker,单个Reduce Worker有可能会接收到多个Key值的数据。
- 在进入Reduce阶段之前,MapReduce框架会对数据按照Key值排序,使得具有相同Key的数据彼此相邻。如果您指定了合并操作(Combiner),框架会调用Combiner,将具有相同Key的数据进行聚合。Combiner的逻辑可以由您自定义实现。与经典的MapReduce框架协议不同,在MaxCompute中,Combiner的输入、输出的参数必须与Reduce保持一致,这部分的处理通常也叫做洗牌(Shuffle)。
- Reduce阶段:进入Reduce阶段,相同Key的数据会传送至同一个Reduce Worker。同一个ReduceWorker会接收来自多个Map Worker的数据。每个ReduceWorker会对Key相同的多个数据进行Reduce操作。最后,一个Key的多条数据经过Reduce的作用后,将变成一个值。
- 输出结果数据。
操作步骤
- 输入数据:对文本进行分片,将每片内的数据作为单个Map Worker的输入。
- Map阶段:Map处理输入,每获取一个数字,将数字的Count设置为1,并将此<Word,Count>对输出,此时以Word作为输出数据的Key。
- Shuffle>合并排序:在Shuffle阶段前期,首先对每个Map Worker的输出,按照Key值(即Word值)进行排序。排序后进行Combiner操作,即将Key值(Word值)相同的Count累加,构成一个新的<Word,Count>对。此过程被称为合并排序。
- Shuffle>分配Reduce:在Shuffle阶段后期,数据被发送到Reduce。
- Reduce Worker收到数据后依赖Key值再次对数据排序。 Reduce阶段:每个Reduce Worker对数据进行处理时,采用与Combiner相同的逻辑,将Key值(Word值)相同的Count累加,得到输出结果。
- 输出结果数据。
logview示例
伏羲任务
M 读取数据
J join数据
R 聚合数据
C 动态调度
调度模式
离线执行
一体式执行
混合执行
1.减少j任务
关联小表,使用mapjoin
关联中表,使用distmapjoin
full join中如果有小表,使用left outer join,left anti join替代
2.提高并发度
调整m任务:在表别名后增加/+split_size(128)/
调整r任务:set odps.sql.reduce.instancces = 1000;
调整j任务:set odps.sql.joiner,instances = 1000;
3.合理设计链路
减少不必要的落库。
关键数据加工链路,减少不必要的节点
关联历史全量数据,确定业务是否合理
版权归原作者 川成先生 所有, 如有侵权,请联系我们删除。