Volume + Pipe + Dynamic Table 端到端实践
"数据湖加速"是指在不搬迁数据的前提下,通过对象存储挂载(Volume)、持续数据导入(Pipe)和增量计算(Dynamic Table)三大能力,用 Serverless 算力直接查询、加工和消费对象存储中的文件数据,替代传统 Spark/Hive ETL 和 Presto/Trino Ad hoc 查询。
适用场景:
文件自动入库 :定期上传到 OSS/COS/S3 的 CSV/Parquet 文件,Pipe 自动检测并入库,无需人工触发
增量 ETL :文件入库后,Dynamic Table 自动增量计算聚合指标,T+1 报表生成无延迟
存量数据激活 :已有大量历史文件在对象存储中,通过 Volume 挂载即可直接查询,无需数据迁移
核心数据流:
OSS/COS/S3 文件 → External Volume (挂载) → Pipe (持续导入) → 目标表 → Dynamic Table (增量聚合)
↕ ↕
COPY INTO/SELECT FROM COPY INTO/SELECT FROM
核心概念
对象 说明 类比 外部 Volume 挂载 OSS/COS/S3 路径,零复制访问 Lakehouse 的"文件系统" Pipe 持续运行的数据导入管道,自动检测新文件 传送带——文件上传即入库 Dynamic Table 自动增量刷新的物化聚合表 替代定时 ETL 任务
三者配合形成一个 自驱动数据管道 :文件上传 → 自动入库 → 自动聚合,全程无需人工调度。
涉及的 SQL 命令
命令 / 函数 用途 适用场景 CREATE STORAGE CONNECTIONCREATE STORAGE CONNECTION
建立对象存储认证通道 一次性配置,所有 Volume 共享 CREATE EXTERNAL VOLUMECREATE EXTERNAL VOLUME
挂载对象存储路径到 Schema 每个 Bucket 子目录配一次 COPY INTO VOLUMECOPY INTO VOLUME
导出数据到 Volume 生成文件供下游消费 SELECT FROM VOLUMESELECT FROM VOLUME
直接查询 Volume 中的文件 临时查询、数据探查 DIRECTORY()DIRECTORY()
列出 Volume 中的文件 查看文件列表、验证导出 ALTER VOLUME REFRESHALTER VOLUME REFRESH
手动刷新 Volume 目录缓存 AUTO_REFRESH=FALSEAUTO_REFRESH=FALSE
时使用CREATE PIPECREATE PIPE
创建持续数据导入管道 文件自动入库 ALTER PIPEALTER PIPE
暂停/恢复 Pipe 运维操作 DESC PIPE EXTENDEDDESC PIPE EXTENDED
查看 Pipe 状态和配置 监控、排障 load_history()load_history()
查询表的历史加载记录 验证 Pipe 加载、排查去重 CREATE DYNAMIC TABLECREATE DYNAMIC TABLE
创建自动增量刷新的聚合表 替代定时 ETL 任务 REFRESH DYNAMIC TABLEREFRESH DYNAMIC TABLE
手动触发 Dynamic Table 刷新 首次创建后立即刷新 SHOW DYNAMIC TABLE REFRESH HISTORYSHOW DYNAMIC TABLE REFRESH HISTORY
查看刷新历史 监控增量刷新状态
前置准备
以下以阿里云 OSS 为例,使用
semantic_model_testsemantic_model_test
Schema 和
DEFAULTDEFAULT
VCluster 完成完整链路。
⚠️ 前置条件 :OSS Bucket 已创建,并持有 AccessKey ID / AccessKey Secret。VCluster 需处于 RUNNING 状态(Serverless 模式下查询会自动唤醒)。
端到端实践
步骤一:创建 Storage Connection
建立 Lakehouse 与 OSS 之间的认证通道。
-- 创建 OSS 存储连接
CREATE STORAGE CONNECTION IF NOT EXISTS my_oss_conn
TYPE OSS
access_id = '<your_access_key_id>'
access_key = '<your_access_key_secret>'
ENDPOINT = 'oss-cn-shanghai.aliyuncs.com';
参数注意 :阿里云 OSS 使用小写
access_idaccess_id
/
access_keyaccess_key
。大写形式
ACCESS_KEY_IDACCESS_KEY_ID
/
ACCESS_KEY_SECRETACCESS_KEY_SECRET
也可用。不要使用
ACCESS_KEYACCESS_KEY
/
SECRET_KEYSECRET_KEY
(缺少后缀会导致报错)。
步骤二:创建外部 Volume
将 OSS Bucket 子目录挂载为 Lakehouse Volume。
CREATE EXTERNAL VOLUME IF NOT EXISTS my_data_vol
LOCATION 'oss://my-bucket/data/'
USING CONNECTION my_oss_conn
DIRECTORY = (ENABLE = TRUE, AUTO_REFRESH = FALSE)
RECURSIVE = TRUE
COMMENT '数据湖加速专用 Volume';
关键参数说明:
参数 说明 LOCATIONLOCATION
OSS 路径,必须指向具体子目录,不能是 bucket 根路径 USING CONNECTIONUSING CONNECTION
引用步骤一创建的 Storage Connection DIRECTORY.ENABLEDIRECTORY.ENABLE
启用目录元数据索引,后续可用 DIRECTORY()DIRECTORY()
函数查询文件列表 AUTO_REFRESHAUTO_REFRESH
设为 TRUETRUE
可自动刷新目录;设为 FALSEFALSE
时需手动 ALTER VOLUME REFRESHALTER VOLUME REFRESH
RECURSIVERECURSIVE
递归扫描子目录
步骤三:创建源表并导出到 Volume
验证 Volume 的双向读写能力。
-- 1. 创建源表并插入测试数据
CREATE TABLE IF NOT EXISTS sales_source (
id BIGINT COMMENT '订单ID',
product STRING COMMENT '商品名',
category STRING COMMENT '品类',
amount DECIMAL(10,2) COMMENT '金额',
dt STRING COMMENT '日期'
) COMMENT '数据湖加速测试源表';
INSERT INTO sales_source VALUES
(1, 'iPhone 15', 'Electronics', 8999.00, '2026-06-01'),
(2, 'MacBook Pro', 'Electronics', 14999.00, '2026-06-01'),
(3, 'AirPods', 'Electronics', 1299.00, '2026-06-01'),
(4, 'Nike Air Max', 'Sports', 899.00, '2026-06-01'),
(5, 'Yoga Mat', 'Sports', 199.00, '2026-06-01');
-- 2. 导出为 CSV 到 Volume
COPY INTO VOLUME my_data_vol
SUBDIRECTORY 'export/'
FROM TABLE sales_source
FILE_FORMAT = (TYPE = CSV);
-- 3. 导出为 Parquet 到 Volume
COPY INTO VOLUME my_data_vol
SUBDIRECTORY 'export/'
FROM TABLE sales_source
FILE_FORMAT = (TYPE = PARQUET);
⚠️ COPY INTO VOLUMECOPY INTO VOLUME
必须带 SUBDIRECTORYSUBDIRECTORY
:不带此子句会报
Syntax error at or near 'FROM'Syntax error at or near 'FROM'
。导出到 Volume 根路径时使用
SUBDIRECTORY '/'SUBDIRECTORY '/'
。
⚠️ 导出语法 :
COPY INTO VOLUMECOPY INTO VOLUME
使用
FILE_FORMAT = (TYPE = CSV/PARQUET)FILE_FORMAT = (TYPE = CSV/PARQUET)
,不是
USING CSVUSING CSV
。
USINGUSING
仅用于
SELECT FROM VOLUMESELECT FROM VOLUME
查询文件。
步骤四:验证 Volume 文件读写
-- 刷新目录缓存(AUTO_REFRESH=FALSE 时需手动刷新)
ALTER VOLUME my_data_vol REFRESH;
-- 查看导出的文件
SELECT relative_path, size, last_modified_time
FROM DIRECTORY(VOLUME my_data_vol)
WHERE relative_path LIKE 'export/%';
-- 直接查询 CSV 文件
SELECT * FROM VOLUME my_data_vol
USING CSV
FILES('export/part00001.csv');
-- 直接查询 Parquet 文件(保留列名)
SELECT id, product, category, amount, dt
FROM VOLUME my_data_vol
USING PARQUET
FILES('export/part00001.parquet');
CSV 与 Parquet 列名差异 :无表头的 CSV 文件列名自动生成为
f0, f1, f2...f0, f1, f2...
;Parquet 文件保留原始列名。如需 CSV 使用原始列名,导入时加
OPTIONS('header'='true')OPTIONS('header'='true')
。
步骤五:创建 Pipe 持续导入
Pipe 持续监控 Volume 中的新文件,自动导入到目标表。
-- 1. 为 Pipe 创建专用 Volume(必须指向独立子目录)
CREATE EXTERNAL VOLUME IF NOT EXISTS pipe_vol
LOCATION 'oss://my-bucket/data/incoming/'
USING CONNECTION my_oss_conn
DIRECTORY = (ENABLE = TRUE, AUTO_REFRESH = TRUE)
RECURSIVE = TRUE
COMMENT 'Pipe 持续导入专用 Volume';
-- 2. 创建目标表
CREATE TABLE IF NOT EXISTS sales_ods (
id BIGINT COMMENT '订单ID',
product STRING COMMENT '商品名',
category STRING COMMENT '品类',
amount DECIMAL(10,2) COMMENT '金额',
dt STRING COMMENT '日期'
) COMMENT 'ODS 层——Pipe 导入目标表';
-- 3. 创建 Pipe(LIST_PURGE 模式)
CREATE PIPE IF NOT EXISTS sales_pipe
INGEST_MODE = 'LIST_PURGE'
VIRTUAL_CLUSTER = 'DEFAULT'
COMMENT '销售数据持续导入管道'
AS
COPY INTO sales_ods
FROM VOLUME pipe_vol
USING CSV PURGE = TRUE;
⚠️ Pipe 关键约束 :
每个 Pipe 需要独立的 Volume,不能多 Pipe 共用
LOCATIONLOCATION
必须指向具体子目录,不能是 bucket 根路径
LIST_PURGELIST_PURGE
模式导入成功后删除源文件 (不可恢复),如需保留文件使用 EVENT_NOTIFICATIONEVENT_NOTIFICATION
模式
PURGE = TRUEPURGE = TRUE
必须放在 USING <format>USING <format>
之后,不能写在 OPTIONS 里面
Pipe 管理
-- 查看 Pipe 状态
DESC PIPE EXTENDED sales_pipe;
-- 关键字段:pipe_status (RUNNING/PAUSED)、ingest_mode、input_name、output_name
-- 暂停 Pipe(停止扫描新文件)
ALTER PIPE sales_pipe SET PIPE_EXECUTION_PAUSED = TRUE;
-- 恢复 Pipe(重新开始扫描)
ALTER PIPE sales_pipe SET PIPE_EXECUTION_PAUSED = FALSE;
-- 查看已导入的文件记录(7 天保留期)
SELECT * FROM load_history('sales_ods');
-- 返回:file_path、last_copy_time、file_size、status、first_error_message
去重机制 :Pipe 通过
load_historyload_history
按文件路径去重(7 天内),同名文件不会重复导入。如需重新加载同一文件,等待 7 天或改名后上传。
触发 Pipe 加载
Pipe 创建后立即开始运行(约 30 秒轮询一次)。向 Volume 路径写入新文件即可触发加载:
-- 通过 COPY INTO VOLUME 模拟"新文件到达"
COPY INTO VOLUME pipe_vol
SUBDIRECTORY '/'
FROM (SELECT * FROM sales_source WHERE dt = '2026-06-01')
FILE_FORMAT = (TYPE = CSV);
-- 稍等片刻后验证数据已加载
SELECT COUNT(*) FROM sales_ods; -- 应返回 5
⚠️ 暂停期间写入的文件 :Pipe 暂停时写入的文件不会被加载,恢复后会在下一轮扫描中检测到。如果文件名与已加载文件相同,会被去重机制跳过。
步骤六:创建 Dynamic Table 增量消费
基于 Pipe 导入后的表,创建 Dynamic Table 实现自动增量聚合。
-- 开启源表变更跟踪(增量刷新前提)
ALTER TABLE sales_ods SET PROPERTIES ('change_tracking' = 'true');
-- 创建 Dynamic Table,按品类汇总
CREATE OR REPLACE DYNAMIC TABLE sales_summary
REFRESH INTERVAL 1 HOUR vcluster DEFAULT
COMMENT '按品类汇总——增量刷新'
AS
SELECT
category,
COUNT(*) AS order_cnt,
SUM(amount) AS total_amount,
AVG(amount) AS avg_amount,
MIN(dt) AS min_date,
MAX(dt) AS max_date
FROM sales_ods
GROUP BY category;
-- 立即触发首次刷新(重置刷新基准时间)
REFRESH DYNAMIC TABLE sales_summary;
-- 查询结果
SELECT * FROM sales_summary ORDER BY category;
刷新频率说明 :
REFRESH INTERVAL 1 HOURREFRESH INTERVAL 1 HOUR
以创建时间为基准计算下次触发,不对齐整点。如需特定时间触发,在目标时间点附近创建,或创建后立即执行
REFRESHREFRESH
重置基准。
查看 DT 刷新历史
SHOW DYNAMIC TABLE REFRESH HISTORY WHERE name = 'sales_summary';
-- 关注字段:state (SUCCEED)、refresh_mode (INCREMENTAL/FULL)、duration、source_tables
完整数据流验证
-- 验证各环节数据一致性
SELECT 'Source' AS stage, COUNT(*) AS rows FROM sales_source
UNION ALL
SELECT 'ODS' AS stage, COUNT(*) AS rows FROM sales_ods
UNION ALL
SELECT 'Summary' AS stage, COUNT(*) AS rows FROM sales_summary;
Stage 数据量 说明 Source 5 行 原始数据(INSERT) ODS 5 行 Pipe 导入(CSV → 表) Summary 3 行 Dynamic Table 聚合(按品类 3 组)
最佳实践
文件大小建议
格式 建议大小 说明 gzip 压缩 ~50 MB 单文件过大影响并行度 CSV 未压缩 128-256 MB 平衡扫描速度和文件数量 Parquet 未压缩 128-256 MB 列式存储,查询更高效
Volume 与 Pipe 设计原则
每个 Pipe 独立 Volume :不同 Pipe 不能共用同一 Volume,避免互相干扰
Volume 指向子目录 :不要指向 bucket 根路径,否则 Pipe 创建报错
LIST_PURGE vs EVENT_NOTIFICATION :
LIST_PURGELIST_PURGE
:配置简单,适合大多数场景,加载后删除源文件
EVENT_NOTIFICATIONEVENT_NOTIFICATION
:低延迟,保留源文件,但仅支持 OSS+S3,且需要额外配置 MNS/SQS
Dynamic Table 设计原则
使用 GP 型 VCluster (如 DEFAULTDEFAULT
):GP 型支持小文件合并,AP 型不支持
开启 change_tracking :源表未开启时 DT 每次全量刷新,无法增量
创建后立即 REFRESH :确保首次数据可用,同时重置刷新基准时间
数据生命周期
文件上传 → Pipe 扫描 → COPY INTO 入库 → PURGE 删除 → Dynamic Table 增量刷新
↓ ↓ ↓ ↓ ↓
OSS 写入 30 秒轮询 load_history 记录 源文件删除 聚合更新
测试验证结果
以下结果来自阿里云上海实例(
f8866243f8866243
)实际测试:
测试项 结果 细节 Storage Connection 创建 ✅ OSS 连接正常 External Volume 挂载 ✅ 目录访问正常,AUTO_REFRESH=FALSEAUTO_REFRESH=FALSE
需手动刷新 SELECT FROM VOLUME (CSV) ✅ 无 header 时列名 f0-f4,Parquet 保留列名 SELECT FROM VOLUME (Parquet) ✅ 列名和类型均保留 COPY INTO TABLE (CSV) ✅ 5 行数据正确导入 COPY INTO TABLE (Parquet) ✅ 5 行数据正确导入 COPY INTO VOLUME 导出 ⚠️ 必须带 SUBDIRECTORYSUBDIRECTORY
,否则报 syntax errorPipe LIST_PURGE 创建 ✅ 状态立即变 RUNNING Pipe 加载触发 ✅ 约 30 秒后自动加载,load_history 记录完整 Pipe PURGE 删除 ✅ 加载成功后源文件自动删除 Pipe 暂停/恢复 ✅ 暂停期间文件不加载;恢复后重新扫描 Pipe 去重 ✅ 同名文件被 load_history 正确拦截(7 天保留) Dynamic Table 增量刷新 ✅ INCREMENTAL 模式,346ms 完成聚合
注意事项
注意点 影响 建议 COPY INTO VOLUMECOPY INTO VOLUME
必须带 SUBDIRECTORYSUBDIRECTORY
不加会报语法错误 根路径用 SUBDIRECTORY '/'SUBDIRECTORY '/'
CSV 列名泛型化 无 header 时列名为 f0-f4 用 OPTIONS('header'='true')OPTIONS('header'='true')
或改用 Parquet AUTO_REFRESH=FALSEAUTO_REFRESH=FALSE
时需手动刷新Directory 不更新 执行 ALTER VOLUME name REFRESHALTER VOLUME name REFRESH
Pipe 同名文件去重 暂停恢复后同名文件不加载 改名上传,或等 7 天过期 load_historyload_history
列名last_copy_timelast_copy_time
而非 last_load_timelast_load_time
查询时注意列名 VCluster 自动休眠 无查询时 60 秒挂起 Serverless 模式下按需付费,无需担心 Pipe COPY 语句不可修改 需调整逻辑时 DROP PIPE 后重新 CREATE AP 型 VCluster 不支持小文件合并 长期查询性能下降 始终使用 GP 型(DEFAULTDEFAULT
)
相关文档