Doris基础知识篇(3):数据导入导出
总结
导入方式 | 方式 | 协议 | 导入执行 | 流程 | |
---|---|---|---|---|---|
Insert INTO | Doris中表数据的转换 | 同步导入 | - | ||
Broker load | 外部数据源(HDFS/S3) +Broker进程 |
异步导入 | Mysql协议 | BE通过Broker拉取数据, 会对数据预处理 |
![]() |
Stream load | 本地文件/数据流 | 同步导入 | HTTP协议 | Coordinator BE将数据分发给Excutor BE | ![]() |
Multi load | 多个导入作业 | 原子性 | HTTP协议 | ||
Routine load | 外部数据源+常驻线程 | 异步导入 | Mysql协议 | 不断生成task,在BE上获取kafka数据,基于一批数据进行Stream Load; | ![]() |
Binlog load | Mysql Binlog数据 | Mysql CDC (Change Data Capture) |
通过Canal进行 | ![]() |
|
DataX | 离线数据源 Reader插件 Writer插件 |
![]() |
|||
Spark Load | 外部数据源(HDFS/S3) +Broker进程 +Spark ETL |
异步导入 | Mysql协议 | 复用Broker Load功能, 增加Spark预处理功能 |
![]() |
【数据导入】
Insert Into
INSERT - 百度数据仓库DORIS | 百度智能云文档
Insert Into 语句的使用方式和 MySQL 等数据库中 Insert Into 语句的使用方式类似。但在 Doris 中,所有的数据写入都是一个独立的导入作业。所以这里将 Insert Into 也作为一种导入方式介绍
语法
1 | INSERT INTO table_name [partition_info] [WITH LABEL label] |
Broker Load
适用场景
- 源数据在Broker可以访问到的系统中(比如HDFS)
- 数据量不是特别大,几十-百GB
原理
语法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16LOAD LABEL db_name.label_name
(data_desc, ...)
WITH BROKER broker_name broker_properties
[PROPERTIES (key1=value1, ... )]
* data_desc:
DATA INFILE ('file_path', ...)
[NEGATIVE]
INTO TABLE tbl_name
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY separator ]
[(col1, ...)]
[PRECEDING FILTER predicate]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]
* broker_properties:
(key1=value1, ...)
Label:导入任务标识,一批数据最好用一个避免任务失败无法查询。数据描述类参数 data_desc
negative 取反导入
partition 指定待导入表的partition信息
set column mapping 设置列函数变化
preceding filter predicate 过滤原始数据
where predicate 在 data_desc 中的 WHERE 语句中负责过滤已经完成 transform 的数据
导入作业参数
timeout
1 | 推荐超时时间: |
max_filter_ratio 导入任务的最大容忍率
1 | max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) |
exec_mem_limit 内存限制
strict_mode 对于导入过程中的列类型转换进行严格过滤
merge_type
示例
创建表
1 | create table student_result |
导入
1 | LOAD LABEL test_db.student_result |
查看导入
1 | show load order by createtime desc limit 1\G |
【具体参数含义见PDF】
取消导入
1 | CANCEL LOAD |
Stream Load
Stream load 是一个同步的导入方式,用户通过**发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。 **Stream load 同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。
适用场景
Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。
目前 Stream Load 支持两个数据格式: CSV(文本)和 JSON。
原理
Stream load 中, Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。
用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。
导入的最终结果由 Coordinator BE 返回给用户。
语法
1 | curl --location-trusted -u user:passwd [-H ""...] -T data.file - |
具体参数:PDF P63
Routine Load
例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能
适用场景
原理
语法
1 | CREATE ROUTINE LOAD [db.]job_name ON tbl_name |
【具体参数见PDF P67】
示例
1 | CREATE ROUTINE LOAD test_db.kafka_test ON student_kafka |
查看HELP SHOW ROUTINE LOAD
修改HELP ALTER ROUTINE LOAD
作业控制 HELP STOP ROUTINE LOAD
; HELP PAUSE ROUTINE LOAD
; 以及 HELP RESUME ROUTINE LOAD
Binlog Load
Binlog Load 提供了一种使 Doris 增量同步用户在 Mysql 数据库的对数据更新操作的CDC(Change Data Capture) 功能
适用场景
INSERT/UPDATE/DELETE 支持。
过滤 Query。
暂不兼容 DDL 语句。
原理
在第一期的设计中, Binlog Load 需要依赖 canal 作为中间媒介,让 canal 伪造成一个从节点去获取 Mysql 主节点上的 Binlog 并解析,再由 Doris 去获取 Canal 上解析好的数据,主要涉及 Mysql 端、 Canal 端以及 Doris 端,总体数据流向如下:
【具体内容 PDF P75】
语法
1 | CREATE SYNC [db.]job_name |
示例
Spark Load
背景:Doris中现有的导入方式中,针对百G级别以上的数据的批量导入支持不是很好,功能上需要修改很多配置,而且可能无法完成导入,性能上会比较慢,并且由于没有读写分离,需要占用较多的cpu等资源。而这种大数据量导入会在用户迁移的时候遇到,所以需要实现基于spark集群的导入功能,利用spark集群的并发能力,完成导入时的ETL计算,排序、聚合等等,满足用户大数据量导入需求,降低用户导入时间和迁移成本。
方案:复用Broker Load的功能,增加Spark预处理数据的功能
- Spark ETL:负责导入过程中数据的ETL过程,包括全局字典构建(BITMAP)、分区、排序、聚合等。
- Broker:独立的无状态进程,封装了文件系统接口,提供读取远端存储系统中文件的能力
- 全局字典:保存了原始值到编码值映射结果的数据结构
详细流程:
- 用户创建Spark Load任务
- FE调度并提交ETL任务到Spark集群执行
- Spark集群(本地/YARN)执行ETL任务,完成对导入数据的预处理
- ETL任务完成后,FE获取预处理后每个分片的数据路径,调度相关的BE推送任务
- BE通过Broker读取数据,并将数据转换为Doris的存储格式
- FE调度将最新导入的数据设置为有效版本,完成导入任务
【数据导出】
export导出
原理
用户提交一个 Export 作业后。 Doris 会统计这个作业涉及的所有 Tablet。然后对这些Tablet 进行分组,每组生成一个特殊的查询计划。该查询计划会读取所包含的 Tablet 上的数据,然后通过 Broker 将数据写到远端存储指定的路径中,也可以通过 S3 协议直接导出到支持 S3 协议的远端存储上。
调度方式
查询计划拆分
查询计划执行
先导出到临时目录,然后在rename移动。
基本语法
1 | EXPORT TABLE db1.tbl1 |
查询作业导出状态:**show export**
1 | SHOW EXPORT |
查询结果导出
**SELECT INTO OUTFILE**
语句可以将查询结果导出到文件中。目前支持通过 Broker 进程, 通过 S3 协议, 或直接通过 HDFS 协议,导出到远端存储,如 HDFS, S3, BOS, COS(腾讯云)上。
语法
1 | query_stmt |
【P87】