0


Doris全方位教程+应用实例

Impala性能稍领先于presto,但是presto在数据源支持上非常丰富,包括hive、图数据库、传统关系型数据库、Redis等

缺点:这两种对hbase支持的都不好,presto 不支持,但是对hdfs、hive兼容性很好,其实这也是顺理成章的,所以数据源的处理很重要,针对hbase的二级索引查询可以用phoenix,效果也不错

Impala的优缺点

1.2.1 优点

1)基于内存运算,不需要把中间结果写入磁盘,省掉了大量的I/O开销。

2)无需转换为Mapreduce,直接访问存储在HDFS,HBase中的数据进行作业调度,速度快。

3)使用了支持Data locality的I/O调度机制,尽可能地将数据和计算分配在同一台机器上进行,减少了网络开销。

4)支持各种文件格式,如TEXTFILE 、SEQUENCEFILE 、RCFile、Parquet。

5)可以访问hive的metastore,对hive数据直接做数据分析。

1.2.2 缺点

1)对内存的依赖大,且完全依赖于hive。

2)实践中,分区超过1万,性能严重下降。

3)只能读取文本文件,而不能直接读取自定义二进制文件。

每当新的记录/文件被添加到HDFS中的数据目录时,该表需要被刷新

Doris 的架构很简洁,只设 FE(Frontend)、BE(Backend)两种角色、两个进程,不依赖于

外部组件,方便部署和运维,FE、BE 都可线性扩展。

FE(Frontend):存储、维护集群元数据;负责接收、解析查询请求,规划查询计划, 调度查询执行,返回查询结果。

Leader 和 Follower:主要是用来达到元数据的高可用

Observer:用来扩展查询节点,同时起到元数据备份的作用,observer 不参与任何的写入,只参与读取

BE(Backend):负责物理****数据的存储和计算;依据 FE 生成的物理计划,分布式地执行查询。

**MySQL Client **

Doris 借助 MySQL 协议,用户使用任意 MySQL 的 ODBC/JDBC 以及 MySQL 的客户

端,都可以直接访问 Doris。

**Broker **

Broker 为一个独立的无状态进程。封装了文件系统接口,提供 Doris 读取远端存储系统

中文件的能力,包括 HDFS,S3,BOS 等。

**Partition & Tablet **

数据首先被划分成若干个分区(Partition),划分的规则通

常是按照用户指定的分区列进行范围划分

。而在每个分区内,数据被进一

步的按照 Hash 的方式分桶,分桶的规则是要找用户指定的分桶列的值进行 Hash 后分桶。

每个分桶就是一个数据分片(Tablet),也是数据划分的最小逻辑单元。

建表

Doris 支持支持单分区和复合分区两种建表方式。

复合分区:既有分区也有分桶

单分区:只做 HASH 分布,即只分桶。

HLL

1~16385 个字节 。hll 列类型,不需要指定长度和默认值、 长度根据数据的聚合 程度系统内控制,并且 HLL 列只能通过配套的 hll_union_agg 、 Hll_cardinality、hll_hash 进行查询或使用

BITMAP

bitmap 列类型,不需要指定长度和默 认值。表示整型的集合,元素最大支持 到 2^64 - 1

**3.3.2.1 Range Partition **

CREATE TABLE IF NOT EXISTS example_db.expamle_range_tbl

(

user_id LARGEINT NOT NULL COMMENT "用户 id",

date DATE NOT NULL COMMENT "数据灌入日期时间",

timestamp DATETIME NOT NULL COMMENT "数据灌入的时间戳",

city VARCHAR(20) COMMENT "用户所在城市",

age SMALLINT COMMENT "用户年龄",

sex TINYINT COMMENT "用户性别",

last_visit_date DATETIME REPLACE DEFAULT "1970-01-01

00:00:00" COMMENT "用户最后一次访问时间",

cost BIGINT SUM DEFAULT "0" COMMENT "用户总消费",

max_dwell_time INT MAX DEFAULT "0" COMMENT "用户最大停留时间",

min_dwell_time INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"

)

ENGINE=olap

AGGREGATE KEY(user_id, date, timestamp, city, age, sex)

PARTITION BY RANGE(date)

(

PARTITION p201701 VALUES LESS THAN ("2017-02-01"),

PARTITION p201702 VALUES LESS THAN ("2017-03-01"),

PARTITION p201703 VALUES LESS THAN ("2017-04-01")

)

DISTRIBUTED BY HASH(user_id) BUCKETS 16

PROPERTIES

(

"replication_num" = "3",

"storage_medium" = "SSD",

"storage_cooldown_time" = "2018-01-01 12:00:00"

);

分区的删除不会改变已存在分区的范围。删除分区可能出现空洞

**3.3.2.2 List Partition **

CREATE TABLE IF NOT EXISTS example_db.expamle_list_tbl

(

user_id LARGEINT NOT NULL COMMENT "用户 id",

date DATE NOT NULL COMMENT "数据灌入日期时间",

timestamp DATETIME NOT NULL COMMENT "数据灌入的时间戳",

city VARCHAR(20) COMMENT "用户所在城市",

age SMALLINT COMMENT "用户年龄",

sex TINYINT COMMENT "用户性别",

last_visit_date DATETIME REPLACE DEFAULT "1970-01-01

00:00:00" COMMENT "用户最后一次访问时间",

cost BIGINT SUM DEFAULT "0" COMMENT "用户总消费",

max_dwell_time INT MAX DEFAULT "0" COMMENT "用户最大停留时间",

min_dwell_time INT MIN DEFAULT "99999" COMMENT "用户最小停留时

间"

)

ENGINE=olap

AGGREGATE KEY(user_id, date, timestamp, city, age, sex)

PARTITION BY LIST(city)

(

PARTITION p_cn VALUES IN ("Beijing", "Shanghai", "Hong Kong"),

PARTITION p_usa VALUES IN ("New York", "San Francisco"),

PARTITION p_jp VALUES IN ("Tokyo")

)

DISTRIBUTED BY HASH(user_id) BUCKETS 16

PROPERTIES

(

"replication_num" = "3",

"storage_medium" = "SSD",

"storage_cooldown_time" = "2018-01-01 12:00:00"

);

分区值为枚举值

不论分区列是什么类型,在写分区值时,都需要加双引号

分桶列可以是多列,但必须为 Key 列

**多列分区 **

Doris 支持指定多列作为分区列

PARTITION BY RANGE(date, id)

(

PARTITION p201701_1000 VALUES LESS THAN ("2017-02-01", "1000"),

PARTITION p201702_2000 VALUES LESS THAN ("2017-03-01", "2000"),

PARTITION p201703_all VALUES LESS THAN ("2017-04-01")

)

PARTITION BY LIST(id, city)

(

PARTITION p1_city VALUES IN (("1", "Beijing"), ("1", "Shanghai")),

PARTITION p2_city VALUES IN (("2", "Beijing"), ("2", "Shanghai")),

PARTITION p3_city VALUES IN (("3", "Beijing"), ("3", "Shanghai"))

)

AGGREGATE KEY 数据模型中,所有没有指定聚合方式(SUM、REPLACE、MAX、 MIN)的列视为 Key 列。而其余则为 Value 列。

Key 列必须在所有 Value 列之前。

在建表语句的最后 PROPERTIES 中,可以指定以下两个参数:

3.4.3.1** replication_num **

每个 Tablet 的副本数量。默认为 3,建议保持默认即可。

Doris 中副本分布的

原则是,不允许同一个 Tablet 的副本分布在同一台物理机上

即使在同一台物理机上部署了 3 个或更多 BE 实例,如果这些 BE 的 IP 相同,则依然只

能设置副本数为 1

**3.4.3.2 ****storage_medium & storage_cooldown_time **

BE 的数据存储目录可以显式的指定为 SSD 或者 HDD(通过 .SSD 或者 .HDD 后缀

区分)。建

默认初始存储介质可通过 fe 的配置文件 fe.conf 中指定 default_storage_medium=xxx,

如果没有指定,则默认为 HDD。如果指定为 SSD,则数据初始存放在 SSD 上。

如果没有指定 storage_cooldown_time,则默认 30 天后,数据会从 SSD 自动迁移到 HDD

上。如果指定了 storage_cooldown_time,则在到达 storage_cooldown_time 时间后,数据才会

迁移。

ENGINE 的类型是 olap,即默认的 ENGINE 类型。在 Doris 中,只有这个ENGINE 类型是由 Doris 负责数据管理和存储的。其他 ENGINE 类型,如 mysql、broker、es 等等,本质上只是对外部其他数据库或系统中的表的映射,以保证 Doris 可以读取这些数据。

**数据模型 **

Doris 的数据模型主要分为 3 类:Aggregate、Uniq、Duplicate

Aggregate 模型

CREATE TABLE IF NOT EXISTS test_db.example_site_visit2

(

user_id LARGEINT NOT NULL COMMENT "用户 id",

date DATE NOT NULL COMMENT "数据灌入日期时间",

timestamp DATETIME COMMENT "数据灌入时间,精确到秒",

city VARCHAR(20) COMMENT "用户所在城市",

age SMALLINT COMMENT "用户年龄",

sex TINYINT COMMENT "用户性别",

last_visit_date DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",

cost BIGINT SUM DEFAULT "0" COMMENT "用户总消费",

max_dwell_time INT MAX DEFAULT "0" COMMENT "用户最大停留时间",

min_dwell_time INT MIN DEFAULT "99999" COMMENT "用户最小停留时

间"

)

AGGREGATE KEY(user_id, date, timestamp, city, age, sex)

DISTRIBUTED BY HASH(user_id) BUCKETS 10;

分为 Key(维度列)和 Value(指标列)。

没有设置 AggregationType 的称为 Key,设置了 AggregationType 的称为 Value。

对于 Key 列相同的行会聚合成一行,而 Value 列会按照设置的 AggregationType 进行聚合。

SUM:求和

REPLACE:替代,下一批数据中的 Value 会替换之前导入过的行中的 Value。

MAX:保留最大值。

MIN:保留最小值。

数据的聚合,在 Doris 中有如下三个阶段发生:

(1)每一批次数据导入的 ETL 阶段。

(2)底层 BE 进行数据 Compaction 的阶段。该阶段,BE 会对已导入的不同批次

数据进行进一步聚合

(3)数据查询阶段。在数据查询时,对于查询涉及到的数据,会进行对应的聚合。

Uniq 模型

CREATE TABLE IF NOT EXISTS test_db.user

(

user_id LARGEINT NOT NULL COMMENT "用户 id",

username VARCHAR(50) NOT NULL COMMENT "用户昵称",

city VARCHAR(20) COMMENT "用户所在城市",

age SMALLINT COMMENT "用户年龄",

sex TINYINT COMMENT "用户性别",

phone LARGEINT COMMENT "用户电话",

address VARCHAR(500) COMMENT "用户地址",

register_time DATETIME COMMENT "用户注册时间"

)

UNIQUE KEY(user_id, username)

DISTRIBUTED BY HASH(user_id) BUCKETS 10;

保证 Key 的唯一性 获得 Primary Key 唯一性约束。

**Duplicate 模型 **

在某些多维分析场景下,数据既没有主键,也没有聚合需求。Duplicate 数据模型可以

满足这类需求。数据完全按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据 完全相同,也都会保留。

而在建表语句中指定的 DUPLICATE KEY,只是用来指明底层数据按照那些列进行排序

CREATE TABLE IF NOT EXISTS test_db.example_log

(

timestamp DATETIME NOT NULL COMMENT "日志时间",

type INT NOT NULL COMMENT "日志类型",

error_code INT COMMENT "错误码",

error_msg VARCHAR(1024) COMMENT "错误详细信息",

op_id BIGINT COMMENT "负责人 id", s

op_time DATETIME COMMENT "处理时间"

)

DUPLICATE KEY(timestamp, type)

DISTRIBUTED BY HASH(timestamp) BUCKETS 10;

**动态分区 **

分区列 time 类型为 DATE,创建一个动态分区规则。按天分区,只保留最近 7 天的分区,并且预先创建未来 3 天的分区。

create table student_dynamic_partition1

(id int,

time date,

name varchar(50),

age int

)

duplicate key(id,time)

PARTITION BY RANGE(time)()

DISTRIBUTED BY HASH(id) buckets 10

PROPERTIES(

"dynamic_partition.enable" = "true",

"dynamic_partition.time_unit" = "DAY",

"dynamic_partition.start" = "-7",

"dynamic_partition.end" = "3",

"dynamic_partition.prefix" = "p",

"dynamic_partition.buckets" = "10",

"replication_num" = "1"

);

建表时指定:

CREATE TABLE tbl1

(...)

PROPERTIES

(

"dynamic_partition.prop1" = "value1",

"dynamic_partition.prop2" = "value2",

...

)

运行时修改

ALTER TABLE tbl1 SET

(

"dynamic_partition.prop1" = "value1",

"dynamic_partition.prop2" = "value2",

...

)

**Rollup **

在 Doris 中,我们将用户通过建表语句创建出来的表称为 Base 表(Base Table)。Base

表中保存着按用户建表语句指定的方式存储的基础数据。

在 Base 表之上,我们可以创建任意多个 ROLLUP 表。这些 ROLLUP 的数据是基于 Base

表产生的,并且在物理上是独立存储的。

ROLLUP 表的基本作用,在于在 Base 表的基础上,获得更粗粒度的聚合数据。

比如需要查看某个用户的总消费,那么可以建立一个只有 user_id 和 cost 的 rollup

alter table example_site_visit2 add rollup rollup_cost_userid(user_id,cost);

SELECT user_id, sum(cost) FROM example_site_visit2 GROUP BY user_id;

Doris 会自动命中这个 ROLLUP 表,从而只需扫描极少的数据量,即可完成这次聚合查询。

因为 Duplicate 模型没有聚合的语意。所以该模型中的 ROLLUP,已经失去了“上卷” 这一层含义。而仅仅是作为调整列顺序,以命中前缀索引的作用。

是否命中 ROLLUP 完全由 Doris 系统自动决定

ROLLUP 的数据更新与 Base 表是完全同步的。

**前缀索引 **

Doris 不支持在任意列上创建索引。

我们将一行数据的前 36 个字节 作为这行数据的前缀索引。当遇到 VARCHAR 类型时,前缀索引会直接截断。

因为建表时已经指定了列顺序,所以一个表只有一种前缀索引。这对于使用其他不能命中前缀索引的列作为条件进行的查询来说,效率上可能无法满足需求。

因此,我们可以通过创建 ROLLUP 来人为的调整列顺序。

物化视图

查询结果预先存储起来的特殊的表。

Rollup 具有一定的局限性,他不能基于明细模型做预聚合。

物化视图则在覆盖了 Rollup 的功能的同时,还能支持更丰富的聚合函数。所以物化视图其实是 Rollup 的一个超集。

create materialized view store_amt as

select store_id, sum(sale_amt)

from sales_records

group by store_id;

提交完创建物化视图的任务后,Doris就会异步在后台生成物化视图的数据,构建物化视图。

物化视图创建完成后,用户的查询会根据规则自动匹配到最优的物化视图

**删除数据(Delete) **

Doris 目前可以通过两种方式删除数据:DELETE FROM 语句和 ALTER TABLE DROP PARTITION 语句。

delete from student_kafka where id=1;

该语句只能针对 Partition 级别进行删除。如果一个表有多个 partition 含有需要 删除的数据,则需要执行多次针对不同 Partition 的 delete 语句。而如果是没有使用 Partition 的表,partition 的名称即表名。

where 后面的条件谓词只能针对 Key 列,并且谓词之间,只能通过 AND 连接。 如果想实现 OR 的语义,需要执行多条 delete。

数据的真正删除是在 BE 进行数据 Compaction 时进行的。所以执行完 delete 命 令后,并不会立即释放磁盘空间

数据导入

导入(Load)功能就是将用户的原始数据导入到 Doris 中。导入成功后,用户即可通过 Mysql 客户端查询数据。为适配不同的数据导入需求,Doris 系统提供了 6 种不同的导入方 式。每种导入方式支持不同的数据源,存在不同的使用方式(异步,同步)。

Broker Load

源数据在 Broker 可以访问的存储系统中,如 HDFS。 数据量在几十到百 GB 级别。

LOAD LABEL test_db.student_result

(

DATA INFILE("hdfs://my_cluster/student.csv")

INTO TABLE student_result

COLUMNS TERMINATED BY ","

FORMAT AS "csv"

(id, name, age, score)

)

WITH BROKER broker_name

(

#开启了 HA 的写法,其他 HDFS 参数可以在这里指定

"dfs.nameservices" = "my_cluster",

"dfs.ha.namenodes.my_cluster" = "nn1,nn2,nn3",

"dfs.namenode.rpc-address.my_cluster.nn1" = "hadoop1:8020",

"dfs.namenode.rpc-address.my_cluster.nn2" = "hadoop2:8020",

"dfs.namenode.rpc-address.my_cluster.nn3" = "hadoop3:8020",

"dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"

)

PROPERTIES

(

"timeout" = "3600"

);

每个导入任务,都有一个在单 database 内部唯一的 Label。Label 是 用户在导入命令中自定义的名称。通过这个 Label,用户可以查看对应导入任务的执行情况。

Label 的另一个作用,是防止用户重复导入相同的数据。强烈推荐用户同一批次数据使用相同的 label。这样同一批次数据的重复请求只会被接受一次,保证了 At-Most-Once 语 义

**Stream Load **

Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。

目前 Stream Load 支持两个数据格式:CSV(文本)和 JSON。

curl

--location-trusted

-u root

-H "label:123"

-H"column_separator:,"

-T student.csv

-X PUT

http://hadoop1:8030/api/test_db/student_result/_stream_load

3** Routine Load **

当前仅支持从 Kafka 系统进行例行导入

CREATE ROUTINE LOAD test_db.kafka_test ON student_kafka

COLUMNS TERMINATED BY ",",

COLUMNS(id, name, age)

PROPERTIES

(

"desired_concurrent_number"="3",

"strict_mode" = "false"

)

FROM KAFKA

(

"kafka_broker_list"= "hadoop1:9092,hadoop2:9092,hadoop3:9092",

"kafka_topic" = "test_doris1",

"property.group.id"="test_doris_group",

"property.kafka_default_offsets" = "OFFSET_BEGINNING",

"property.enable.auto.commit"="false"

);

**Binlog Load **

增量同步用户在 Mysql 数据库的对数据更新操作的 CDC(Change Data Capture)功能。

INSERT/UPDATE/DELETE 支持。

过滤 Query。

暂不兼容 DDL 语句。

CREATE SYNC test_db.job1

(

FROM test.tbl1 INTO binlog_test

)

FROM BINLOG

(

"type" = "canal", // 目前可支持的对接类型只有 canal 方式

"canal.server.ip" = "hadoop1",

"canal.server.port" = "11111",

"canal.destination" = "doris-load",

"canal.username" = "canal",

"canal.password" = "canal"

);

Insert Into 的使用方式和 MySQL 等数据库中 Insert Into 语句的使用方式类似

**数据导出 **

EXPORT TABLE db1.tbl1

PARTITION (p1,p2)

[WHERE [expr]]

TO "hdfs://host/path/to/export/"

PROPERTIES

(

"label" = "mylabel",

"column_separator"=",",

"columns" = "col1,col2",

"exec_mem_limit"="2147483648",

"timeout" = "3600"

)

WITH BROKER "hdfs"

(

"username" = "user",

"password" = "passwd"

);

**查询结果导出 **

SELECT * FROM example_site_visit

INTO OUTFILE "hdfs://hadoop1:8020/doris-out/broker_a_"

FORMAT AS CSV

PROPERTIES

(

"broker.name" = "broker_name",

"column_separator" = ",",

"line_delimiter" = "\n",

"max_file_size" = "100MB"

);

**Join ****查询 **

**Broadcast Join **

系统默认实现 Join 的方式,是将小表进行条件过滤后,将其广播到大表所在的各个节

点上,形成一个内存 Hash 表,然后流式读出大表的数据进行 Hash Join。

FROM example_site_visit

JOIN [broadcast] example_site_visit2

Shuffle Join****(Partitioned Join)

如果当小表过滤后的数据量无法放入内存的话,此时 Join 将无法完成可以显式指定 Shuffle Join。

即将小表和大表都按照 Join 的 key 进行 Hash,然后进行分布式的 Join

FROM example_site_visit

JOIN [shuffle] example_site_visit2

**Colocation Join **

保证这些表对应的数据分片会落在同一个 be 节点上,那么使得两表再进行 join 的时候,可以通过本地数据进行直接join,减少数据在节点之间的网络传输时间。

建表时两张表的分桶列的类型和数量需要完全一致,并且桶数一致

**Bucket Shuffle Join **

SQL 语句为 A 表 join B 表,并且 join 的等值表达式命中了 A 的数据分布列。而 Bucket Shuffle Join 会根据 A 表的数据分布信息,将 B 表的数据发送到对应的 A 表的数据存储计算节点。

与 Colocate Join 不同,它对于表的数据分布方式并没有侵入性,这对于用户来说是透明的。对于表的数据分布没有强制性的要求,不容易导致数据倾斜的问题

指定 RuntimeFilter 类型

旨在为某些 Join 查询在运行时动态生成过滤条件,来减少扫描的数据量,避免不必要的 I/O 和网络传输,从而加速查询。

set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";

tableEnv.executeSql("CREATE TABLE flink_doris (\n" +

" siteid INT,\n" +

" citycode SMALLINT,\n" +

" username STRING,\n" +

" pv BIGINT\n" +

" ) \n" +

" WITH (\n" +

" 'connector' = 'doris',\n" +

" 'fenodes' = 'hadoop1:8030',\n" +

" 'table.identifier' = 'test_db.table1',\n" +

" 'username' = 'test',\n" +

" 'password' = 'test'\n" +

")\n");

ODBC 外部表

CREATE EXTERNAL RESOURCE oracle_odbc

PROPERTIES (

"type" = "odbc_catalog",

"host" = "192.168.0.1",

"port" = "8086",

"user" = "test",

"password" = "test",

"database" = "test",

"odbc_type" = "oracle",

"driver" = "Oracle 19 ODBC driver"

);

CREATE EXTERNAL TABLE baseall_oracle (

k1 decimal(9, 3) NOT NULL COMMENT "",

k2 char(10) NOT NULL COMMENT "",

k3 datetime NOT NULL COMMENT "",

k5 varchar(20) NOT NULL COMMENT "",

k6 double NOT NULL COMMENT ""

) ENGINE=ODBC

COMMENT "ODBC"

PROPERTIES (

"odbc_catalog_resource" = "oracle_odbc",

"database" = "test",

"table" = "baseall"

);

6)Doris 建 Resource

通过 ODBC_Resource 来创建 ODBC 外表,这是推荐的方式,这样 resource 可以复用。

CREATE EXTERNAL RESOURCE mysql_5_3_11

PROPERTIES (

"host" = "hadoop1",

"port" = "3306",

"user" = "root",

"password" = "000000",

"database" = "test",

"table" = "test_cdc",

"driver" = "MySQL ODBC 5.3.11", --名称要和上面[]里的名称一致

"odbc_type" = "mysql",

"type" = "odbc_catalog")

7)基于 Resource 创建 Doris 外表

CREATE EXTERNAL TABLE test_odbc_5_3_11 (

id int NOT NULL ,

name varchar(255) null

) ENGINE=ODBC

COMMENT "ODBC"

PROPERTIES (

"odbc_catalog_resource" = "mysql_5_3_11", --名称就是 resource 的名称

"database" = "test",

"table" = "test_cdc"

);

CREATE EXTERNAL TABLE es_test (

k1 bigint(20) COMMENT "",

k2 datetime COMMENT "",

k3 varchar(20) COMMENT "",

k4 varchar(100) COMMENT "",

k5 float COMMENT ""

) ENGINE=ELASTICSEARCH // ENGINE 必须是 Elasticsearch

PROPERTIES (

"hosts" =

"http://hadoop1:9200,http://hadoop2:9200,http://hadoop3:9200",

"index" = "test",

"type" = "doc",

"user" = "",

"password" = ""

);

标签: 大数据

本文转载自: https://blog.csdn.net/qq_38646027/article/details/140656588
版权归原作者 豆沙糕 所有, 如有侵权,请联系我们删除。

“Doris全方位教程+应用实例”的评论:

还没有评论