Volume + Pipe + Dynamic Table 端到端实践

"数据湖加速"是指在不搬迁数据的前提下,通过对象存储挂载(Volume)、持续数据导入(Pipe)和增量计算(Dynamic Table)三大能力,用 Serverless 算力直接查询、加工和消费对象存储中的文件数据,替代传统 Spark/Hive ETL 和 Presto/Trino Ad hoc 查询。

适用场景:

  • 文件自动入库:定期上传到 OSS/COS/S3 的 CSV/Parquet 文件,Pipe 自动检测并入库,无需人工触发
  • 增量 ETL:文件入库后,Dynamic Table 自动增量计算聚合指标,T+1 报表生成无延迟
  • 存量数据激活:已有大量历史文件在对象存储中,通过 Volume 挂载即可直接查询,无需数据迁移

核心数据流:

OSS/COS/S3 文件 → External Volume (挂载) → Pipe (持续导入) → 目标表 → Dynamic Table (增量聚合) ↕ ↕ COPY INTO/SELECT FROM COPY INTO/SELECT FROM


核心概念

对象说明类比
外部 Volume挂载 OSS/COS/S3 路径,零复制访问Lakehouse 的"文件系统"
Pipe持续运行的数据导入管道,自动检测新文件传送带——文件上传即入库
Dynamic Table自动增量刷新的物化聚合表替代定时 ETL 任务

三者配合形成一个 自驱动数据管道:文件上传 → 自动入库 → 自动聚合,全程无需人工调度。


涉及的 SQL 命令

命令 / 函数用途适用场景
CREATE STORAGE CONNECTION
CREATE STORAGE CONNECTION
建立对象存储认证通道一次性配置,所有 Volume 共享
CREATE EXTERNAL VOLUME
CREATE EXTERNAL VOLUME
挂载对象存储路径到 Schema每个 Bucket 子目录配一次
COPY INTO VOLUME
COPY INTO VOLUME
导出数据到 Volume生成文件供下游消费
SELECT FROM VOLUME
SELECT FROM VOLUME
直接查询 Volume 中的文件临时查询、数据探查
DIRECTORY()
DIRECTORY()
列出 Volume 中的文件查看文件列表、验证导出
ALTER VOLUME REFRESH
ALTER VOLUME REFRESH
手动刷新 Volume 目录缓存
AUTO_REFRESH=FALSE
AUTO_REFRESH=FALSE
时使用
CREATE PIPE
CREATE PIPE
创建持续数据导入管道文件自动入库
ALTER PIPE
ALTER PIPE
暂停/恢复 Pipe运维操作
DESC PIPE EXTENDED
DESC PIPE EXTENDED
查看 Pipe 状态和配置监控、排障
load_history()
load_history()
查询表的历史加载记录验证 Pipe 加载、排查去重
CREATE DYNAMIC TABLE
CREATE DYNAMIC TABLE
创建自动增量刷新的聚合表替代定时 ETL 任务
REFRESH DYNAMIC TABLE
REFRESH DYNAMIC TABLE
手动触发 Dynamic Table 刷新首次创建后立即刷新
SHOW DYNAMIC TABLE REFRESH HISTORY
SHOW DYNAMIC TABLE REFRESH HISTORY
查看刷新历史监控增量刷新状态

前置准备

以下以阿里云 OSS 为例,使用

semantic_model_test
semantic_model_test
Schema 和
DEFAULT
DEFAULT
VCluster 完成完整链路。


端到端实践

步骤一:创建 Storage Connection

建立 Lakehouse 与 OSS 之间的认证通道。

-- 创建 OSS 存储连接 CREATE STORAGE CONNECTION IF NOT EXISTS my_oss_conn TYPE OSS access_id = '<your_access_key_id>' access_key = '<your_access_key_secret>' ENDPOINT = 'oss-cn-shanghai.aliyuncs.com';

步骤二:创建外部 Volume

将 OSS Bucket 子目录挂载为 Lakehouse Volume。

CREATE EXTERNAL VOLUME IF NOT EXISTS my_data_vol LOCATION 'oss://my-bucket/data/' USING CONNECTION my_oss_conn DIRECTORY = (ENABLE = TRUE, AUTO_REFRESH = FALSE) RECURSIVE = TRUE COMMENT '数据湖加速专用 Volume';

关键参数说明:

参数说明
LOCATION
LOCATION
OSS 路径,必须指向具体子目录,不能是 bucket 根路径
USING CONNECTION
USING CONNECTION
引用步骤一创建的 Storage Connection
DIRECTORY.ENABLE
DIRECTORY.ENABLE
启用目录元数据索引,后续可用
DIRECTORY()
DIRECTORY()
函数查询文件列表
AUTO_REFRESH
AUTO_REFRESH
设为
TRUE
TRUE
可自动刷新目录;设为
FALSE
FALSE
时需手动
ALTER VOLUME REFRESH
ALTER VOLUME REFRESH
RECURSIVE
RECURSIVE
递归扫描子目录

步骤三:创建源表并导出到 Volume

验证 Volume 的双向读写能力。

-- 1. 创建源表并插入测试数据 CREATE TABLE IF NOT EXISTS sales_source ( id BIGINT COMMENT '订单ID', product STRING COMMENT '商品名', category STRING COMMENT '品类', amount DECIMAL(10,2) COMMENT '金额', dt STRING COMMENT '日期' ) COMMENT '数据湖加速测试源表'; INSERT INTO sales_source VALUES (1, 'iPhone 15', 'Electronics', 8999.00, '2026-06-01'), (2, 'MacBook Pro', 'Electronics', 14999.00, '2026-06-01'), (3, 'AirPods', 'Electronics', 1299.00, '2026-06-01'), (4, 'Nike Air Max', 'Sports', 899.00, '2026-06-01'), (5, 'Yoga Mat', 'Sports', 199.00, '2026-06-01'); -- 2. 导出为 CSV 到 Volume COPY INTO VOLUME my_data_vol SUBDIRECTORY 'export/' FROM TABLE sales_source FILE_FORMAT = (TYPE = CSV); -- 3. 导出为 Parquet 到 Volume COPY INTO VOLUME my_data_vol SUBDIRECTORY 'export/' FROM TABLE sales_source FILE_FORMAT = (TYPE = PARQUET);

步骤四:验证 Volume 文件读写

-- 刷新目录缓存(AUTO_REFRESH=FALSE 时需手动刷新) ALTER VOLUME my_data_vol REFRESH; -- 查看导出的文件 SELECT relative_path, size, last_modified_time FROM DIRECTORY(VOLUME my_data_vol) WHERE relative_path LIKE 'export/%'; -- 直接查询 CSV 文件 SELECT * FROM VOLUME my_data_vol USING CSV FILES('export/part00001.csv'); -- 直接查询 Parquet 文件(保留列名) SELECT id, product, category, amount, dt FROM VOLUME my_data_vol USING PARQUET FILES('export/part00001.parquet');

步骤五:创建 Pipe 持续导入

Pipe 持续监控 Volume 中的新文件,自动导入到目标表。

-- 1. 为 Pipe 创建专用 Volume(必须指向独立子目录) CREATE EXTERNAL VOLUME IF NOT EXISTS pipe_vol LOCATION 'oss://my-bucket/data/incoming/' USING CONNECTION my_oss_conn DIRECTORY = (ENABLE = TRUE, AUTO_REFRESH = TRUE) RECURSIVE = TRUE COMMENT 'Pipe 持续导入专用 Volume'; -- 2. 创建目标表 CREATE TABLE IF NOT EXISTS sales_ods ( id BIGINT COMMENT '订单ID', product STRING COMMENT '商品名', category STRING COMMENT '品类', amount DECIMAL(10,2) COMMENT '金额', dt STRING COMMENT '日期' ) COMMENT 'ODS 层——Pipe 导入目标表'; -- 3. 创建 Pipe(LIST_PURGE 模式) CREATE PIPE IF NOT EXISTS sales_pipe INGEST_MODE = 'LIST_PURGE' VIRTUAL_CLUSTER = 'DEFAULT' COMMENT '销售数据持续导入管道' AS COPY INTO sales_ods FROM VOLUME pipe_vol USING CSV PURGE = TRUE;

Pipe 管理

-- 查看 Pipe 状态 DESC PIPE EXTENDED sales_pipe; -- 关键字段:pipe_status (RUNNING/PAUSED)、ingest_mode、input_name、output_name -- 暂停 Pipe(停止扫描新文件) ALTER PIPE sales_pipe SET PIPE_EXECUTION_PAUSED = TRUE; -- 恢复 Pipe(重新开始扫描) ALTER PIPE sales_pipe SET PIPE_EXECUTION_PAUSED = FALSE; -- 查看已导入的文件记录(7 天保留期) SELECT * FROM load_history('sales_ods'); -- 返回:file_path、last_copy_time、file_size、status、first_error_message

触发 Pipe 加载

Pipe 创建后立即开始运行(约 30 秒轮询一次)。向 Volume 路径写入新文件即可触发加载:

-- 通过 COPY INTO VOLUME 模拟"新文件到达" COPY INTO VOLUME pipe_vol SUBDIRECTORY '/' FROM (SELECT * FROM sales_source WHERE dt = '2026-06-01') FILE_FORMAT = (TYPE = CSV); -- 稍等片刻后验证数据已加载 SELECT COUNT(*) FROM sales_ods; -- 应返回 5

步骤六:创建 Dynamic Table 增量消费

基于 Pipe 导入后的表,创建 Dynamic Table 实现自动增量聚合。

-- 开启源表变更跟踪(增量刷新前提) ALTER TABLE sales_ods SET PROPERTIES ('change_tracking' = 'true'); -- 创建 Dynamic Table,按品类汇总 CREATE OR REPLACE DYNAMIC TABLE sales_summary REFRESH INTERVAL 1 HOUR vcluster DEFAULT COMMENT '按品类汇总——增量刷新' AS SELECT category, COUNT(*) AS order_cnt, SUM(amount) AS total_amount, AVG(amount) AS avg_amount, MIN(dt) AS min_date, MAX(dt) AS max_date FROM sales_ods GROUP BY category; -- 立即触发首次刷新(重置刷新基准时间) REFRESH DYNAMIC TABLE sales_summary; -- 查询结果 SELECT * FROM sales_summary ORDER BY category;

查看 DT 刷新历史

SHOW DYNAMIC TABLE REFRESH HISTORY WHERE name = 'sales_summary'; -- 关注字段:state (SUCCEED)、refresh_mode (INCREMENTAL/FULL)、duration、source_tables


完整数据流验证

-- 验证各环节数据一致性 SELECT 'Source' AS stage, COUNT(*) AS rows FROM sales_source UNION ALL SELECT 'ODS' AS stage, COUNT(*) AS rows FROM sales_ods UNION ALL SELECT 'Summary' AS stage, COUNT(*) AS rows FROM sales_summary;

Stage数据量说明
Source5 行原始数据(INSERT)
ODS5 行Pipe 导入(CSV → 表)
Summary3 行Dynamic Table 聚合(按品类 3 组)

最佳实践

文件大小建议

格式建议大小说明
gzip 压缩~50 MB单文件过大影响并行度
CSV 未压缩128-256 MB平衡扫描速度和文件数量
Parquet 未压缩128-256 MB列式存储,查询更高效

Volume 与 Pipe 设计原则

  1. 每个 Pipe 独立 Volume:不同 Pipe 不能共用同一 Volume,避免互相干扰
  2. Volume 指向子目录:不要指向 bucket 根路径,否则 Pipe 创建报错
  3. LIST_PURGE vs EVENT_NOTIFICATION
    • LIST_PURGE
      LIST_PURGE
      :配置简单,适合大多数场景,加载后删除源文件
    • EVENT_NOTIFICATION
      EVENT_NOTIFICATION
      :低延迟,保留源文件,但仅支持 OSS+S3,且需要额外配置 MNS/SQS

Dynamic Table 设计原则

  1. 使用 GP 型 VCluster(如
    DEFAULT
    DEFAULT
    ):GP 型支持小文件合并,AP 型不支持
  2. 开启 change_tracking:源表未开启时 DT 每次全量刷新,无法增量
  3. 创建后立即 REFRESH:确保首次数据可用,同时重置刷新基准时间

数据生命周期

文件上传 → Pipe 扫描 → COPY INTO 入库 → PURGE 删除 → Dynamic Table 增量刷新 ↓ ↓ ↓ ↓ ↓ OSS 写入 30 秒轮询 load_history 记录 源文件删除 聚合更新


测试验证结果

以下结果来自阿里云上海实例(

f8866243
f8866243
)实际测试:

测试项结果细节
Storage Connection 创建OSS 连接正常
External Volume 挂载目录访问正常,
AUTO_REFRESH=FALSE
AUTO_REFRESH=FALSE
需手动刷新
SELECT FROM VOLUME (CSV)无 header 时列名 f0-f4,Parquet 保留列名
SELECT FROM VOLUME (Parquet)列名和类型均保留
COPY INTO TABLE (CSV)5 行数据正确导入
COPY INTO TABLE (Parquet)5 行数据正确导入
COPY INTO VOLUME 导出⚠️必须带
SUBDIRECTORY
SUBDIRECTORY
,否则报 syntax error
Pipe LIST_PURGE 创建状态立即变 RUNNING
Pipe 加载触发约 30 秒后自动加载,load_history 记录完整
Pipe PURGE 删除加载成功后源文件自动删除
Pipe 暂停/恢复暂停期间文件不加载;恢复后重新扫描
Pipe 去重同名文件被 load_history 正确拦截(7 天保留)
Dynamic Table 增量刷新INCREMENTAL 模式,346ms 完成聚合

注意事项

注意点影响建议
COPY INTO VOLUME
COPY INTO VOLUME
必须带
SUBDIRECTORY
SUBDIRECTORY
不加会报语法错误根路径用
SUBDIRECTORY '/'
SUBDIRECTORY '/'
CSV 列名泛型化无 header 时列名为 f0-f4
OPTIONS('header'='true')
OPTIONS('header'='true')
或改用 Parquet
AUTO_REFRESH=FALSE
AUTO_REFRESH=FALSE
时需手动刷新
Directory 不更新执行
ALTER VOLUME name REFRESH
ALTER VOLUME name REFRESH
Pipe 同名文件去重暂停恢复后同名文件不加载改名上传,或等 7 天过期
load_history
load_history
列名
last_copy_time
last_copy_time
而非
last_load_time
last_load_time
查询时注意列名
VCluster 自动休眠无查询时 60 秒挂起Serverless 模式下按需付费,无需担心
Pipe COPY 语句不可修改需调整逻辑时DROP PIPE 后重新 CREATE
AP 型 VCluster 不支持小文件合并长期查询性能下降始终使用 GP 型(
DEFAULT
DEFAULT

相关文档

联系我们
预约咨询
微信咨询
电话咨询
邮件咨询