0


Doris 入门:基本操作(三)

目录

一、创建用户和数据库

  • 创建用户 - CREATE USER ‘test’ IDENTIFIED BY ‘123456’;- 后续登录就可以直接使用命令登录 - mysql -h 192.168.1.101 -P9030 -utest -p12345
  • 创建数据库并赋予权限 - 初始可以通过 root 或 admin 用户创建数据库 - create database test_db;- 查看数据库 - show databases;- 授权 - grant all on test_db to test;- 注意 - 可以使用 help command 查看语法帮助,不清楚命令全名的话可以使用 ‘help 某一字段’ 进行模糊查询- information_schema 数据库是为了兼容 MySQL 协议而存在的

二、建表

1. 基本概念

  • Doris 数据都以表 Table 的形式进行逻辑上的描述,一张表包括 行 Row 和 列 Column,Row 就是用户一行数据
  • 从表的角度来看,一张 Table 会拆成多个 Tablet,Tablet 会存成多副本,存储在不同的 BE 中,BE 节点上物理数据的可靠性通过多副本来实现,默认 3 副本
  • Tablet 和 Partition - Doris 存储引擎中,用户数据被水平划分为若干数据分片 Tablet,也称为数据分桶,每个 Tablet 包含若干行数据,和其他 Tablet 没有交集,物理上独立存储- 多个 Tablet 在逻辑上归属于不同的分区 Partition,一个 Tablet 只属于一个 Partition,一个 Partition 包含若干个 Tablet,若干个 Partition 组成一个 Table- Tablet 是数据移动、复制等操作的最小物理存储单元,Partition 可以视为逻辑上最小的管理单元,数据导入和删除,都可以针对一个 Partition 进行
  • Doris 存储引擎规则- 用户数据首先被划分成若干个分区 Partition,划分对规则通常是按照用户指定的分区进行范围划分,比如按时间划分- 在每个分区内,数据被进一步按照 Hash 的方式分桶,分桶的规则是要找用户指定的分桶列的值进行 Hash 后分桶,每个分桶就是一个 Tablet,也是最小数据划分逻辑单元- Partition 可以视为逻辑上最小的管理单元,数据的导入与删除,都可以针对一个 Partition 进行- Tablet 直接的数据是没有交集的,独立存储,Tablet 也是数据移动、复制操作的最小物理存储单元

2. 创建表

  • 首先需要切换数据库(use test_db
  • 使用帮助命令可以查看很多案例 help create table
  • 数据类型 - tinyint、smallint、int、bigint、largeint、float、double、decimal(precision,scale)、date、datetime、char(length)、varchar(length)、hll、bitmap、agg_type
  • 建表方式 - 单分区 - 即数据不分区,只做 HASH 分布,也就是分桶- 复合分区 - 第一级为 Partition,即分区,用户可以指定某一纬度列作为分区列(整型、时间类型),并指定分区取值范围- 第二级为 Distribution,即分桶,用户指定一个或多个纬度列以及桶数对数据进行 HASH 分布- 使用复合分区的场景 - 有时间纬度或类似带有有序值的纬度:可以以这类纬度列作为分区列,分区粒度可以根据导入频次、分区数量等进行评估- 历史数据删除需求:可以通过删除历史分区来达到目的,也可以通过在指定分区内发送 DELETE 语句进行数据删除- 解决数据倾斜问题:每个分区可以单独指定分桶数量,如按天分区,当天的数据量差异很大时,可以通过指定分区的分桶数,合理划分不同分区的数据,分桶列建议选择区分度大的列

3. 数据导入

  • 相关文档:https://doris.apache.org/zh-CN/docs/data-operate/import/load-manual
  • 所有导入方式都支持 CSV 数据格式,Broker load 还支持 parquet 和 orc 数据格式
  • Stream load- 用户通过 HTTP 协议提交请求并携带原始数据创建导入,主要用于快速将本地文件或数据流中的数据导入到 Doris,导入命令同步返回导入结果- curl --location-trusted -u root:123456 -H “label:table1_20221121” -H “column_separator:,” -T table1_data http://hybrid01:8030/api/test_db/table1/_stream_load![在这里插入图片描述](https://img-blog.csdnimg.cn/43758e084cfd472b8048d28bf300c488.png)
  • Insert- 类似 MySQL 中的 Insert 语句,Doris 提供 inssert into tbl SELECT …; 的方式从 Doris 表中读取数据并导入到另一张表或 insert into tbl values(…) 插入- insert into table1 values(1,1,‘user1’,10);
  • Broker load- 通过 Broker 进程访问并读取外部数据源导入到 Doris,用户通过 MySQL 协议提交导入任务后,异步执行,show load 命令查看导入结果- 具体语法案例见代码
  • Multi load- 用户通过 HTTP 协议提交多个导入作业,Multi Load 可以保证多个导入作业的原子生效
  • Routine load- 通过 MySQL 协议提交例行导入作业,生成一个常驻线程,不间断从数据源中读取数据并导入到 Doris 中- 目前仅支持 Kafka 进行导入,支持无认证、SSL 认证的 Kafka 集群,支持的格式为 csv 文本格式,每个 message 为一行,行尾不包括换行符- 原理 - FE 通过 JobScheduler 将一个导入任务拆分成若干个 Task,每个 Task 负责导入指定的一部分数据,Task 被 TaskScheduler 分配到指定的 BE 上执行- 在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入,导入完成后向 FE 汇报- FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试- 整个例行导入作业通过不断的产生新的 Task 来完成数据不间断的导入- 具体语法案例见代码
  • 通过 S3 协议直接导入- 用法和 Broker Load 类似

三、代码案例

-- 建表语句案例CREATE[EXTERNAL]TABLE[IFNOTEXISTS][database.]table_name
(column_definition1[, column_definition2,...][, index_definition1[, index_definition2,...]])[ENGINE=[olap|mysql|broker|hive|iceberg]][key_desc][COMMENT"table comment"];[partition_desc][distribution_desc][rollup_index][PROPERTIES ("key"="value",...)][BROKER PROPERTIES ("key"="value",...)]-- 创建表-单分区表CREATETABLE test_db.table1
(
  siteid INTDEFAULT'10',
  citycode SMALLINT,
  username VARCHAR(32)DEFAULT'',
  pv BIGINT SUM DEFAULT'0')
AGGREGATE KEY(siteid, citycode, username)DISTRIBUTEDBYHASH(siteid) BUCKETS 10
PROPERTIES("replication_num"="1");-- insert 导入insertinto table1 values(1,1,'user1',10);insertinto table1 values(1,1,'user1',10);insertinto table1 values(1,2,'user1',10);-- 创建表-复合分区表-- event_day 作为分区列,建立三个分区,每个分区使用 siteid 进行哈希分桶,桶数为10-- p202209: 范围[最小值,2022-10-01]-- p202210: 范围[2022-10-01, 2022-11-01]-- p202211: 范围[2022-11-01, 2022-12-01]-- 左闭右开CREATETABLE test_db.table2
(
    event_day DATE,
    siteid INTDEFAULT'10',
    citycode SMALLINT,
    username VARCHAR(32)DEFAULT'',
    pv BIGINT SUM DEFAULT'0')
AGGREGATE KEY(event_day,siteid,citycode,username)PARTITIONBY RANGE(event_day)(PARTITION p202209 VALUES LESS THAN ('2022-10-01'),PARTITION p202210 VALUES LESS THAN ('2022-11-01'),PARTITION p202211 VALUES LESS THAN ('2022-12-01'))DISTRIBUTEDBYHASH(siteid) BUCKETS 10
PROPERTIES("replication_num"="3");-- Broker 导入,broker_id 就是 broker 的名字,最后一个是能容忍的错误数据量LOAD LABEL test_db.table2
(DATAINFILE("hdfs://hybrid01:8020/data/table2_data.csv")INTOTABLE`table2`COLUMNSTERMINATEDBY","
  FORMAT AS"csv"(
    event_day,siteid,citycode,username,pv
  ))WITH BROKER broker_id
("dfs.nameservices"="my_cluster","dfs.ha.namenodes.my_cluster"="nn1,nn2,nn3","dfs.namenode.rpc-address.my_cluster.nn1"="hybrid01:8020","dfs.namenode.rpc-address.my_cluster.nn2"="hybrid02:8020","dfs.namenode.rpc-address.my_cluster.nn3"="hybrid03:8020","dfs.client.failover.proxy.provider"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
PROPERTIES
("max_filter_ratio"="0.00002");-- Routine load 案例CREATETABLE student_kafka
(
  id int,
  name varchar(50),
  age int)DUPLICATEKEY(id)DISTRIBUTEDBYHASH(id) BUCKETS 10;-- 创建导入任务,任务名字 kafka_job1CREATEROUTINELOAD test_db.kafka_job1 on student_kafka
PROPERTIES
("desired_concurrent_number"="1","strict_mode"="false","format"="json")FROM KAFKA
("kafka_broker_list"="hybrid01:9092,hybrid02:9092,hybrid03:9092","kafka_topic"="test","property.group.id"="test_group_1","property.kafka_default_offsets"="OFFSET_BEGINNING","property.enable.auto.commit"="false");

在这里插入图片描述

标签: 大数据

本文转载自: https://blog.csdn.net/baidu_40468340/article/details/127982933
版权归原作者 AcWare 学习笔记 所有, 如有侵权,请联系我们删除。

“Doris 入门:基本操作(三)”的评论:

还没有评论