数据管道与变更捕获
本章包含两类对象:Pipe(持续数据导入)和 Table Stream(变更数据捕获)。Pipe 负责将外部文件或消息流自动写入表;Table Stream 记录表上的增量变更,供下游增量计算消费。
Pipe 是 Lakehouse 的持续数据导入对象,创建后自动运行,持续从对象存储(OSS/COS/S3)或 Kafka 读取数据写入目标表,无需人工触发。
类比:Pipe 像一条持续运转的传送带——文件上传到 OSS 子目录后,Pipe 约 30 秒内自动检测并入库;Kafka 消息写入后,Pipe 按批次间隔持续消费写入。与定时任务不同,Pipe 是常驻运行的,新数据到达即处理。

Pipe 类型
| 类型 | 数据源 | 检测延迟 | 典型场景 |
|---|
| 对象存储 Pipe | OSS / COS / S3 | ~30 秒 | 定期上传的 CSV/Parquet/JSON 文件自动入库 |
| Kafka Pipe | Kafka Topic | 按批次间隔(默认 60 秒) | 日志、业务事件实时写入 |
Pipe 与 Studio 同步任务功能等价,区别在于:Pipe 通过 SQL DDL 创建和管理,适合代码化管理数据管道;Studio 同步任务通过可视化界面配置,支持更多数据源(含关系型数据库)。
我要从对象存储持续导入
前置步骤:创建 Storage Connection → 创建 External Volume(必须指向具体子目录,不能是 bucket 根路径) → 创建目标表 → 创建 Pipe。
⚠️ Volume 的
LOCATION
LOCATION
必须指向 OSS 子目录(如
oss://bucket/pipe_data/
oss://bucket/pipe_data/
),不能是 bucket 根路径(
oss://bucket/
oss://bucket/
),否则 Pipe 创建时会报错。
-- 第一步:创建存储连接
CREATE STORAGE CONNECTION my_oss_conn
TYPE OSS
ENDPOINT = 'oss-cn-hangzhou.aliyuncs.com'
ACCESS_ID = '...'
ACCESS_KEY = '...';
-- 第二步:创建 External Volume(指向具体子目录)
CREATE EXTERNAL VOLUME orders_pipe_vol
LOCATION 'oss://my-bucket/orders_incoming/'
USING CONNECTION my_oss_conn
DIRECTORY = (ENABLE = TRUE, AUTO_REFRESH = TRUE)
RECURSIVE = TRUE;
-- 第三步:创建目标表
CREATE TABLE IF NOT EXISTS orders (
order_id INT,
amount DECIMAL(10,2),
status STRING
);
-- 第四步:创建 Pipe
CREATE PIPE orders_oss_pipe
VIRTUAL_CLUSTER = 'default'
INGEST_MODE = 'LIST_PURGE'
AS
COPY INTO orders
FROM VOLUME orders_pipe_vol (order_id INT, amount DECIMAL(10,2), status STRING)
USING CSV OPTIONS('header' = 'true')
PURGE = TRUE;
Pipe 创建后立即开始运行,约 30 秒检测一次 Volume 中的新文件。
两种导入模式
| 模式 | 触发方式 | 源文件处理 | 适用场景 |
|---|
LIST_PURGE
LIST_PURGE | 定期轮询扫描(约 30 秒) | 导入后删除源文件 | 配置简单,适合大多数场景 |
EVENT_NOTIFICATION
EVENT_NOTIFICATION | 对象存储事件通知(近实时) | 保留源文件 | 需要保留原始文件;仅支持 OSS 和 S3 |
⚠️
LIST_PURGE
LIST_PURGE
模式导入成功后会
永久删除 OSS 中的源文件,不可恢复。如需保留文件,使用
EVENT_NOTIFICATION
EVENT_NOTIFICATION
模式。
去重机制
Pipe 通过
load_history
load_history
记录已导入的文件路径。相同文件路径只导入一次,即使重新上传同一文件也不会重复导入。
load_history
load_history
记录保留 7 天。
-- 查看已导入文件记录
SELECT * FROM load_history('orders');
-- 结果包含:file_path、last_copy_time、file_size、status、first_error_message
我要从 Kafka 持续消费
Pipe 创建持久消费者组,按批次间隔从 Kafka Topic 拉取数据写入表。
CREATE PIPE kafka_orders_pipe
VIRTUAL_CLUSTER = 'default'
BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO orders_raw
FROM (
SELECT CAST(value AS STRING) AS raw_msg
FROM TABLE(READ_KAFKA(
'kafka-host:9092', -- bootstrap.servers
'orders_topic', -- topic
'', -- topic pattern(暂不支持,留空)
'pipe_orders_group', -- group_id(同 Topic 不同 Pipe 必须不同)
'', '', '', '', -- 起止位点和时间戳,Pipe 自动管理
'raw', 'raw', -- key/value 格式
0, map() -- 最大错误数、额外 Kafka 配置
))
);
⚠️ 一个 Pipe 只能消费一个 Kafka Topic,多个 Topic 需创建多个 Pipe,且每个 Pipe 的
group_id
group_id
必须唯一。
监控和管理
-- 查看所有 Pipe(含状态、类型、VCluster)
SHOW PIPES;
-- 查看 Pipe 详情
DESC PIPE orders_oss_pipe;
-- 立即触发扫描(不等待下一轮检测周期)
ALTER PIPE orders_oss_pipe REFRESH;
-- 暂停 Pipe
ALTER PIPE orders_oss_pipe SET PIPE_EXECUTION_PAUSED = TRUE;
-- 恢复 Pipe
ALTER PIPE orders_oss_pipe SET PIPE_EXECUTION_PAUSED = FALSE;
-- 删除 Pipe(不影响目标表数据)
DROP PIPE orders_oss_pipe;
DESC PIPE
DESC PIPE
输出的关键字段:
| 字段 | 说明 |
|---|
pipe_status
pipe_status | RUNNING
RUNNING / PAUSED
PAUSED |
pipe_kind
pipe_kind | VOLUME
VOLUME (对象存储)或 KAFKA
KAFKA |
properties
properties | ingest_mode、vcluster 等配置 |
input_name
input_name | 数据来源(Volume 或 Kafka Topic) |
output_name
output_name | 目标表全路径 |
invalid_reason
invalid_reason | Pipe 异常时的错误原因 |
查看 Pipe 执行历史:在作业历史中按
query_tag
query_tag
过滤,格式为
pipe.workspace_name.schema_name.pipe_name
pipe.workspace_name.schema_name.pipe_name
。
注意事项
- Volume 必须指向子目录:
LOCATION
LOCATION
不能是 bucket 根路径,否则 Pipe 创建报错
- 每个 Pipe 对应独立的 Volume:不同 Pipe 不能共用同一个 Volume
- COPY 语句不可修改:如需调整导入逻辑,删除 Pipe 后重新创建
- 数据加载无法保证严格有序
- 文件大小建议:gzip 压缩文件建议 50MB 以内;CSV/Parquet 未压缩文件建议 128MB~256MB
- EVENT_NOTIFICATION 模式需要额外配置 MNS 消息队列,且只支持阿里云 OSS 和 AWS S3,并且必须使用 RoleARN 方式授权
相关文档