Pipe 持续导入

Pipe 是云器 Lakehouse 的持续数据导入管道对象,用于从 Kafka 或对象存储(OSS/S3/COS)持续导入数据到 Lakehouse 表中。

可以把 Pipe 理解为一条自动传送带:数据文件上传到 OSS/S3/COS,或消息写入 Kafka 后,Pipe 自动检测并入库,无需人工触发,也不需要配置定时任务。

选型参考:如果你习惯用 SQL 管理数据管道,选 Pipe;如果需要接入关系型数据库(MySQL/PostgreSQL 等),或者偏好可视化配置,选 Studio 同步任务。

什么是 Pipe

Pipe 是一个 SQL 对象,通过 DDL 语句创建。创建后 Pipe 会持续运行,自动从数据源读取数据并写入目标表。

与 Studio 同步任务的区别

维度PipeStudio 同步任务
创建方式SQL DDLStudio 可视化界面
管理方式SQL 命令Studio 界面 + cz-cli
适用场景熟悉 SQL、需要代码化管理数据管道偏好可视化配置,或需要接入关系型数据库
数据源Kafka、对象存储关系型数据库、Kafka、对象存储

两者功能等价,根据使用习惯选择。

Pipe 的类型

Kafka Pipe

从 Kafka Topic 持续消费数据并写入 Lakehouse 表。

CREATE PIPE kafka_pipe AS COPY INTO orders FROM READ_KAFKA(...) USING JSON;

两种接入路径

  1. READ_KAFKA Pipe(推荐):直接在 Pipe 中使用
    READ_KAFKA()
    READ_KAFKA()
    函数
  2. Kafka 外部表 + Table Stream:先创建 Kafka 外部表,再通过 Table Stream 消费

对象存储 Pipe

从 OSS/S3/COS 持续扫描新文件并导入。

CREATE PIPE oss_pipe VIRTUAL_CLUSTER = 'default' INGEST_MODE = 'LIST_PURGE' AS COPY INTO orders FROM VOLUME my_volume USING CSV PURGE = TRUE;

两种扫描模式对比

维度LIST_PURGEEVENT_NOTIFICATION
触发方式定期轮询扫描目录对象存储事件通知(近实时触发)
支持云OSS、S3、COS仅 OSS、S3
授权方式密钥或 Role ARN仅 Role ARN
源文件处理导入成功后自动删除源文件(需
PURGE = TRUE
PURGE = TRUE
保留源文件
配置复杂度简单,无需额外配置需要配置 MNS 队列

Pipe 生命周期

Create Pipe --> Auto Run --> Continuous Ingestion | | v v Suspend Pipe Monitor Status | v Resume Pipe

监控 Pipe

-- 查看 Pipe 状态 SHOW PIPES; -- 查看 Pipe 详情 DESC PIPE my_pipe;

适用场景

  • Kafka 实时接入:日志数据、业务事件实时写入
  • 对象存储批量导入:定期上传的 CSV/JSON/Parquet 文件自动入库
  • 替代定时任务:无需配置 Cron,Pipe 持续运行

相关文档

联系我们
预约咨询
微信咨询
电话咨询