数据管道与变更捕获

本章包含两类对象:Pipe(持续数据导入)和 Table Stream(变更数据捕获)。Pipe 负责将外部文件或消息流自动写入表;Table Stream 记录表上的增量变更,供下游增量计算消费。

Pipe 是 Lakehouse 的持续数据导入对象,创建后自动运行,持续从对象存储(OSS/COS/S3)或 Kafka 读取数据写入目标表,无需人工触发。

类比:Pipe 像一条持续运转的传送带——文件上传到 OSS 子目录后,Pipe 约 30 秒内自动检测并入库;Kafka 消息写入后,Pipe 按批次间隔持续消费写入。与定时任务不同,Pipe 是常驻运行的,新数据到达即处理。


Pipe 类型

类型数据源检测延迟典型场景
对象存储 PipeOSS / COS / S3~30 秒定期上传的 CSV/Parquet/JSON 文件自动入库
Kafka PipeKafka Topic按批次间隔(默认 60 秒)日志、业务事件实时写入

Pipe 与 Studio 同步任务功能等价,区别在于:Pipe 通过 SQL DDL 创建和管理,适合代码化管理数据管道;Studio 同步任务通过可视化界面配置,支持更多数据源(含关系型数据库)。


我要从对象存储持续导入

前置步骤:创建 Storage Connection → 创建 External Volume(必须指向具体子目录,不能是 bucket 根路径) → 创建目标表 → 创建 Pipe。

-- 第一步:创建存储连接 CREATE STORAGE CONNECTION my_oss_conn TYPE OSS ENDPOINT = 'oss-cn-hangzhou.aliyuncs.com' ACCESS_ID = '...' ACCESS_KEY = '...'; -- 第二步:创建 External Volume(指向具体子目录) CREATE EXTERNAL VOLUME orders_pipe_vol LOCATION 'oss://my-bucket/orders_incoming/' USING CONNECTION my_oss_conn DIRECTORY = (ENABLE = TRUE, AUTO_REFRESH = TRUE) RECURSIVE = TRUE; -- 第三步:创建目标表 CREATE TABLE IF NOT EXISTS orders ( order_id INT, amount DECIMAL(10,2), status STRING ); -- 第四步:创建 Pipe CREATE PIPE orders_oss_pipe VIRTUAL_CLUSTER = 'default' INGEST_MODE = 'LIST_PURGE' AS COPY INTO orders FROM VOLUME orders_pipe_vol (order_id INT, amount DECIMAL(10,2), status STRING) USING CSV OPTIONS('header' = 'true') PURGE = TRUE;

Pipe 创建后立即开始运行,约 30 秒检测一次 Volume 中的新文件。

两种导入模式

模式触发方式源文件处理适用场景
LIST_PURGE
LIST_PURGE
定期轮询扫描(约 30 秒)导入后删除源文件配置简单,适合大多数场景
EVENT_NOTIFICATION
EVENT_NOTIFICATION
对象存储事件通知(近实时)保留源文件需要保留原始文件;仅支持 OSS 和 S3

去重机制

Pipe 通过

load_history
load_history
记录已导入的文件路径。相同文件路径只导入一次,即使重新上传同一文件也不会重复导入。
load_history
load_history
记录保留 7 天。

-- 查看已导入文件记录 SELECT * FROM load_history('orders'); -- 结果包含:file_path、last_copy_time、file_size、status、first_error_message


我要从 Kafka 持续消费

Pipe 创建持久消费者组,按批次间隔从 Kafka Topic 拉取数据写入表。

CREATE PIPE kafka_orders_pipe VIRTUAL_CLUSTER = 'default' BATCH_INTERVAL_IN_SECONDS = '60' AS COPY INTO orders_raw FROM ( SELECT CAST(value AS STRING) AS raw_msg FROM TABLE(READ_KAFKA( 'kafka-host:9092', -- bootstrap.servers 'orders_topic', -- topic '', -- topic pattern(暂不支持,留空) 'pipe_orders_group', -- group_id(同 Topic 不同 Pipe 必须不同) '', '', '', '', -- 起止位点和时间戳,Pipe 自动管理 'raw', 'raw', -- key/value 格式 0, map() -- 最大错误数、额外 Kafka 配置 )) );


监控和管理

-- 查看所有 Pipe(含状态、类型、VCluster) SHOW PIPES; -- 查看 Pipe 详情 DESC PIPE orders_oss_pipe; -- 立即触发扫描(不等待下一轮检测周期) ALTER PIPE orders_oss_pipe REFRESH; -- 暂停 Pipe ALTER PIPE orders_oss_pipe SET PIPE_EXECUTION_PAUSED = TRUE; -- 恢复 Pipe ALTER PIPE orders_oss_pipe SET PIPE_EXECUTION_PAUSED = FALSE; -- 删除 Pipe(不影响目标表数据) DROP PIPE orders_oss_pipe;

DESC PIPE
DESC PIPE
输出的关键字段:

字段说明
pipe_status
pipe_status
RUNNING
RUNNING
/
PAUSED
PAUSED
pipe_kind
pipe_kind
VOLUME
VOLUME
(对象存储)或
KAFKA
KAFKA
properties
properties
ingest_mode、vcluster 等配置
input_name
input_name
数据来源(Volume 或 Kafka Topic)
output_name
output_name
目标表全路径
invalid_reason
invalid_reason
Pipe 异常时的错误原因

查看 Pipe 执行历史:在作业历史中按

query_tag
query_tag
过滤,格式为
pipe.workspace_name.schema_name.pipe_name
pipe.workspace_name.schema_name.pipe_name


注意事项

  • Volume 必须指向子目录
    LOCATION
    LOCATION
    不能是 bucket 根路径,否则 Pipe 创建报错
  • 每个 Pipe 对应独立的 Volume:不同 Pipe 不能共用同一个 Volume
  • COPY 语句不可修改:如需调整导入逻辑,删除 Pipe 后重新创建
  • 数据加载无法保证严格有序
  • 文件大小建议:gzip 压缩文件建议 50MB 以内;CSV/Parquet 未压缩文件建议 128MB~256MB
  • EVENT_NOTIFICATION 模式需要额外配置 MNS 消息队列,且只支持阿里云 OSS 和 AWS S3,并且必须使用 RoleARN 方式授权

相关文档

联系我们
预约咨询
微信咨询
电话咨询