总结


导入方式 方式 协议 导入执行 流程
Insert INTO Doris中表数据的转换 同步导入 -
Broker load 外部数据源(HDFS/S3)
+Broker进程
异步导入 Mysql协议 BE通过Broker拉取数据,
会对数据预处理
image.png
Stream load 本地文件/数据流 同步导入 HTTP协议 Coordinator BE将数据分发给Excutor BE image.png
Multi load 多个导入作业 原子性 HTTP协议
Routine load 外部数据源+常驻线程 异步导入 Mysql协议 不断生成task,在BE上获取kafka数据,基于一批数据进行Stream Load; image.png
Binlog load Mysql Binlog数据 Mysql CDC
(Change Data Capture)
通过Canal进行 image.png
DataX 离线数据源
Reader插件
Writer插件
image.png
Spark Load 外部数据源(HDFS/S3)
+Broker进程
+Spark ETL
异步导入 Mysql协议 复用Broker Load功能,
增加Spark预处理功能
image.png

【数据导入】

Insert Into

INSERT - 百度数据仓库DORIS | 百度智能云文档
Insert Into 语句的使用方式和 MySQL 等数据库中 Insert Into 语句的使用方式类似。但在 Doris 中,所有的数据写入都是一个独立的导入作业。所以这里将 Insert Into 也作为一种导入方式介绍

语法

1
2
INSERT INTO table_name [partition_info] [WITH LABEL label]
[col_list] [query_stmt] [VALUES];

Broker Load

适用场景

  1. 源数据在Broker可以访问到的系统中(比如HDFS)
  2. 数据量不是特别大,几十-百GB

    原理

    image.png
    image.png

    语法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    LOAD LABEL db_name.label_name
    (data_desc, ...)
    WITH BROKER broker_name broker_properties
    [PROPERTIES (key1=value1, ... )]
    * data_desc:
    DATA INFILE ('file_path', ...)
    [NEGATIVE]
    INTO TABLE tbl_name
    [PARTITION (p1, p2)]
    [COLUMNS TERMINATED BY separator ]
    [(col1, ...)]
    [PRECEDING FILTER predicate]
    [SET (k1=f1(xx), k2=f2(xx))]
    [WHERE predicate]
    * broker_properties:
    (key1=value1, ...)
    image.png
    Label:导入任务标识,一批数据最好用一个避免任务失败无法查询。

    数据描述类参数 data_desc

    negative 取反导入
    partition 指定待导入表的partition信息
    set column mapping 设置列函数变化
    preceding filter predicate 过滤原始数据
    where predicate 在 data_desc 中的 WHERE 语句中负责过滤已经完成 transform 的数据

导入作业参数

timeout

1
2
3
推荐超时时间:
总文件大小(MB) / 用户 Doris 集群最慢导入速度(MB/s) > timeout > ((总文件大
小(MB) * 待导入的表及相关 Roll up 表的个数) / (10 * 导入并发数) )

max_filter_ratio 导入任务的最大容忍率

1
max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )

exec_mem_limit 内存限制
strict_mode 对于导入过程中的列类型转换进行严格过滤

image.png

merge_type
image.png

示例

创建表

1
2
3
4
5
6
7
8
9
create table student_result
(
id int ,
name varchar(50),
age int ,
score decimal(10,4)
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10;

导入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
LOAD LABEL test_db.student_result
(
DATA INFILE("hdfs://my_cluster/student.csv")
INTO TABLE `student_result`
COLUMNS TERMINATED BY ","
FORMAT AS "csv"
(id, name, age, score)
)
WITH BROKER broker_name
( #
开启了 HA 的写法,其他 HDFS 参数可以在这里指定
"dfs.nameservices" = "my_cluster",
"dfs.ha.namenodes.my_cluster" = "nn1,nn2,nn3",
"dfs.namenode.rpc-address.my_cluster.nn1" = "hadoop1:8020",
"dfs.namenode.rpc-address.my_cluster.nn2" = "hadoop2:8020",
"dfs.namenode.rpc-address.my_cluster.nn3" = "hadoop3:8020",
"dfs.client.failover.proxy.provider" =
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProx
yProvider"
)
PROPERTIES
(
"timeout" = "3600"
);

查看导入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
show load order by createtime desc limit 1\G

*************************** 1. row ***************************
JobId: 76391
Label: label1
State: FINISHED
Progress: ETL:N/A; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15;
dpp.norm.ALL=28133376
TaskInfo: cluster:N/A; timeout(s):10800;
max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:46:44
LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
URL:
http://192.168.1.1:8040/api/_load_error_log?file=__shard_4/error_
log_insert_stmt_4bb00753932c491aa6da6e2725415317_4bb00753932c491a_a6da6e2725415317
JobDetails: {"Unfinished backends":{"9c3441027ff948a0-
8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"
All backends":{"9c3441027ff948a0-
8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}

【具体参数含义见PDF】

取消导入

1
2
3
CANCEL LOAD
[FROM db_name]
WHERE LABEL=”load_label”;

Stream Load

Stream load 是一个同步的导入方式,用户通过**发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。 **Stream load 同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。

适用场景

Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。
目前 Stream Load 支持两个数据格式: CSV(文本)和 JSON。

原理

image.png
Stream load 中, Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。
用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。
导入的最终结果由 Coordinator BE 返回给用户。

语法

1
2
curl --location-trusted -u user:passwd [-H ""...] -T data.file -
XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load

image.png

具体参数:PDF P63

Routine Load

例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能

适用场景

image.png
image.png

原理

image.png

image.png

语法

1
2
3
4
5
6
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]

【具体参数见PDF P67】

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE ROUTINE LOAD test_db.kafka_test ON student_kafka
COLUMNS TERMINATED BY ",",
COLUMNS(id, name, age)
PROPERTIES
( "
desired_concurrent_number"="3",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list"= "hadoop1:9092,hadoop2:9092,hadoop3:9092",
"kafka_topic" = "test_doris1",
"property.group.id"="test_doris_group",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.enable.auto.commit"="false"
);

查看HELP SHOW ROUTINE LOAD
修改HELP ALTER ROUTINE LOAD
作业控制 HELP STOP ROUTINE LOAD; HELP PAUSE ROUTINE LOAD; 以及 HELP RESUME ROUTINE LOAD

Binlog Load

Binlog Load 提供了一种使 Doris 增量同步用户在 Mysql 数据库的对数据更新操作的CDC(Change Data Capture) 功能

适用场景

INSERT/UPDATE/DELETE 支持。
过滤 Query。
暂不兼容 DDL 语句。

原理

在第一期的设计中, Binlog Load 需要依赖 canal 作为中间媒介,让 canal 伪造成一个从节点去获取 Mysql 主节点上的 Binlog 并解析,再由 Doris 去获取 Canal 上解析好的数据,主要涉及 Mysql 端、 Canal 端以及 Doris 端,总体数据流向如下:
image.png
image.png
【具体内容 PDF P75】

语法

1
2
3
4
5
6
7
CREATE SYNC [db.]job_name
(
channel_desc,
channel_desc
...
)
binlog_desc

示例

image.png

Spark Load

背景:Doris中现有的导入方式中,针对百G级别以上的数据的批量导入支持不是很好,功能上需要修改很多配置,而且可能无法完成导入,性能上会比较慢,并且由于没有读写分离,需要占用较多的cpu等资源。而这种大数据量导入会在用户迁移的时候遇到,所以需要实现基于spark集群的导入功能,利用spark集群的并发能力,完成导入时的ETL计算,排序、聚合等等,满足用户大数据量导入需求,降低用户导入时间和迁移成本。

方案:复用Broker Load的功能,增加Spark预处理数据的功能

  • Spark ETL:负责导入过程中数据的ETL过程,包括全局字典构建(BITMAP)、分区、排序、聚合等。
  • Broker:独立的无状态进程,封装了文件系统接口,提供读取远端存储系统中文件的能力
  • 全局字典:保存了原始值到编码值映射结果的数据结构

详细流程:

  1. 用户创建Spark Load任务
  2. FE调度并提交ETL任务到Spark集群执行
  3. Spark集群(本地/YARN)执行ETL任务,完成对导入数据的预处理
  4. ETL任务完成后,FE获取预处理后每个分片的数据路径,调度相关的BE推送任务
  5. BE通过Broker读取数据,并将数据转换为Doris的存储格式
  6. FE调度将最新导入的数据设置为有效版本,完成导入任务

【数据导出】

export导出

原理

用户提交一个 Export 作业后。 Doris 会统计这个作业涉及的所有 Tablet。然后对这些Tablet 进行分组,每组生成一个特殊的查询计划。该查询计划会读取所包含的 Tablet 上的数据,然后通过 Broker 将数据写到远端存储指定的路径中,也可以通过 S3 协议直接导出到支持 S3 协议的远端存储上。

调度方式

image.png

image.png

查询计划拆分

image.png

查询计划执行

image.png
image.png
先导出到临时目录,然后在rename移动。

基本语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
EXPORT TABLE db1.tbl1
PARTITION (p1,p2)
[WHERE [expr]]
TO "hdfs://host/path/to/export/"
PROPERTIES
(
"label" = "mylabel",
"column_separator"=",",
"columns" = "col1,col2",
"exec_mem_limit"="2147483648",
"timeout" = "3600"
)
WITH BROKER "hdfs"
(
"username" = "user",
"password" = "passwd"
);

image.png

查询作业导出状态:
**show export**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SHOW EXPORT
***********
JobId: 14008
Label: mylabel
State: FINISHED
Progress: 100%
TaskInfo: {"partitions":["*"],"exec mem
limit":2147483648,"column separator":",","line
delimiter":"\n","tablet num":1,"broker":"hdfs","coord
num":1,"db":"default_cluster:db1","tbl":"tbl3"}
Path: bos://bj-test-cmy/export/
CreateTime: 2019-06-25 17:08:24
StartTime: 2019-06-25 17:08:28
FinishTime: 2019-06-25 17:08:34
Timeout: 3600
ErrorMsg: N/A

查询结果导出

**SELECT INTO OUTFILE** 语句可以将查询结果导出到文件中。目前支持通过 Broker 进程, 通过 S3 协议, 或直接通过 HDFS 协议,导出到远端存储,如 HDFS, S3, BOS, COS(腾讯云)上。

语法

1
2
3
4
query_stmt
INTO OUTFILE "file_path"
[format_as]
[properties]

【P87】

并发导出

image.png