Pipe(数据管道)

Pipe 是 Lakehouse 的持续数据导入对象,通过 SQL DDL 创建后自动运行,持续从 Kafka 或对象存储(OSS/COS/S3)读取数据并写入目标表,无需人工触发。

类比:Pipe 像一条自动传送带——数据文件上传到 OSS 子目录,或消息写入 Kafka 后,Pipe 自动检测并入库。与定时任务不同,Pipe 是常驻运行的,文件上传后约 30 秒内即可完成导入。

与 Studio 同步任务的区别

维度PipeStudio 同步任务
创建方式SQL DDLStudio 可视化界面
适用数据源Kafka、OSS/COS/S3关系型数据库、Kafka、对象存储
管理方式SQL 命令Studio 界面
适合人群习惯 SQL、代码化管理偏好可视化配置

两者功能等价,根据使用习惯选择。

Pipe 类型

对象存储 Pipe(OSS/COS/S3)

持续扫描对象存储中的新文件并导入,支持两种模式:

模式触发方式源文件处理额外配置
LIST_PURGE
LIST_PURGE
定期轮询扫描(约 30 秒)导入后永久删除源文件无需额外配置
EVENT_NOTIFICATION
EVENT_NOTIFICATION
对象存储事件通知(近实时)保留源文件需配置 MNS 消息队列;仅支持 OSS 和 S3

-- 第一步:创建存储连接和 Volume(必须指向子目录) CREATE STORAGE CONNECTION my_oss_conn TYPE OSS ENDPOINT = 'oss-cn-hangzhou.aliyuncs.com' ACCESS_ID = 'your-access-id' ACCESS_KEY = 'your-access-key'; CREATE EXTERNAL VOLUME orders_vol LOCATION 'oss://my-bucket/orders_incoming/' USING CONNECTION my_oss_conn DIRECTORY = (ENABLE = TRUE, AUTO_REFRESH = TRUE) RECURSIVE = TRUE; -- 第二步:创建目标表 CREATE TABLE IF NOT EXISTS orders ( order_id INT, amount DECIMAL(10,2), status STRING ); -- 第三步:创建 Pipe CREATE PIPE orders_oss_pipe VIRTUAL_CLUSTER = 'default' INGEST_MODE = 'LIST_PURGE' AS COPY INTO orders FROM VOLUME orders_vol (order_id INT, amount DECIMAL(10,2), status STRING) USING CSV OPTIONS('header' = 'true') PURGE = TRUE;

去重机制:Pipe 通过

load_history
load_history
记录已导入的文件路径,相同路径的文件只会导入一次,即使重新上传也不会触发重复导入。记录保留 7 天。

-- 查看已导入文件记录 SELECT * FROM load_history('orders'); -- 返回:file_path、last_copy_time、file_size、status、first_error_message

Kafka Pipe

创建持久消费者组,按批次间隔从 Kafka Topic 持续拉取数据写入表。

CREATE PIPE kafka_orders_pipe VIRTUAL_CLUSTER = 'default' BATCH_INTERVAL_IN_SECONDS = '60' AS COPY INTO orders_raw FROM ( SELECT CAST(value AS STRING) AS raw_msg FROM TABLE(READ_KAFKA( 'kafka-host:9092', -- bootstrap.servers,填 2~3 个 broker 地址即可 'orders_topic', -- topic 名称 '', -- topic pattern(暂不支持,留空) 'pipe_orders_group', -- group_id,同 Topic 的不同 Pipe 必须不同 '', '', '', '', -- 起止位点和时间戳,Pipe 自动管理,留空 'raw', 'raw', -- key/value 格式 0, map() -- 最大错误数,额外 Kafka 配置(如 SSL/SASL) )) );

Kafka Pipe 完整参数

参数必填默认值说明
VIRTUAL_CLUSTER
VIRTUAL_CLUSTER
执行 COPY 作业的计算集群
INITIAL_DELAY_IN_SECONDS
INITIAL_DELAY_IN_SECONDS
0首个作业调度延迟(秒)
BATCH_INTERVAL_IN_SECONDS
BATCH_INTERVAL_IN_SECONDS
60批处理间隔时间(秒)
BATCH_SIZE_PER_KAFKA_PARTITION
BATCH_SIZE_PER_KAFKA_PARTITION
500000每个 Kafka 分区的批处理最大消息数
MAX_SKIP_BATCH_COUNT_ON_ERROR
MAX_SKIP_BATCH_COUNT_ON_ERROR
30出错时跳过批次的最大重试次数
RESET_KAFKA_GROUP_OFFSETS
RESET_KAFKA_GROUP_OFFSETS
none启动时 Kafka 初始点位。可选值:
none
none
(无操作)、
valid
valid
(重置过期点位)、
earliest
earliest
latest
latest
${TIMESTAMP_MILLISECONDS}
${TIMESTAMP_MILLISECONDS}
COPY_JOB_HINT
COPY_JOB_HINT
保留参数。支持
IGNORE_TMP_FILE
IGNORE_TMP_FILE
(默认
true
true
),过滤
.
.
_temporary
_temporary
开头的文件

load_history 函数

查看表的 COPY 作业导入文件历史,记录保留 7 天。Pipe 通过

load_history
load_history
避免重复导入相同文件。

-- 查看已导入文件记录 SELECT * FROM load_history('schema_name.table_name'); -- 返回字段:file_path、last_copy_time、file_size、status、first_error_message

ALTER PIPE 完整语法

每次只能修改一个属性,如需修改多个属性需多次执行:

ALTER PIPE pipe_name SET [VIRTUAL_CLUSTER = 'vc_name'] | [BATCH_INTERVAL_IN_SECONDS = '60'] | [BATCH_SIZE_PER_KAFKA_PARTITION = '500000'] | [MAX_SKIP_BATCH_COUNT_ON_ERROR = '30'] | [COPY_JOB_HINT = '{"cz.mapper.kafka.message.size": "2000000"}'];

DESC PIPE 字段说明

DESC PIPE
DESC PIPE
返回 key-value 格式,关键字段:

字段说明
pipe_status
pipe_status
RUNNING
RUNNING
/
PAUSED
PAUSED
/
INVALID
INVALID
pipe_kind
pipe_kind
VOLUME
VOLUME
(对象存储)或
KAFKA
KAFKA
properties
properties
显示
ingest_mode
ingest_mode
virtual_cluster
virtual_cluster
配置
input_name
input_name
数据来源,格式为
volume:catalog.schema.volume_name
volume:catalog.schema.volume_name
kafka_table_stream:workspace.schema.stream
kafka_table_stream:workspace.schema.stream
output_name
output_name
目标表全路径
catalog.schema.table
catalog.schema.table
invalid_reason
invalid_reason
Pipe 异常时的错误原因,正常时为空
pipe_latency
pipe_latency
Kafka Pipe 的消费延迟(
offsetLag
offsetLag
为 0 表示无积压)

DESC PIPE EXTENDED kafka_pipe_stream; +--------------------+-----------------------------------------------------+ | info_name | info_value | +--------------------+-----------------------------------------------------+ | name | kafka_pipe_stream | | pipe_status | RUNNING | | pipe_kind | KAFKA | | input_name | kafka_table_stream:workspace.pipe_schema.stream1 | | output_name | workspace.pipe_schema.sink_table | | invalid_reason | | | pipe_latency | {"kafka":{"lags":{"0":0},"offsetLag":0}} | +--------------------+-----------------------------------------------------+

约束与限制

  • 对象存储 Pipe:COPY 语句不支持
    FILES
    FILES
    REGEXP
    REGEXP
    SUBDIRECTORY
    SUBDIRECTORY
    参数
  • 对象存储 Pipe:每个 Pipe 对应独立的 Volume,不同 Pipe 不能共用同一个 Volume
  • Kafka Pipe:一个 Pipe 中只能有一个
    READ_KAFKA
    READ_KAFKA
    函数
  • COPY 语句
    COPY 语句
    创建后不可修改,如需调整导入逻辑,删除 Pipe 后重新创建

-- 查看所有 Pipe(含状态、类型、VCluster) SHOW PIPES; -- 查看 Pipe 详情(key-value 格式,含 pipe_status、input_name、output_name 等) DESC PIPE orders_oss_pipe; -- 立即触发一次扫描(不等待下一轮检测周期) ALTER PIPE orders_oss_pipe REFRESH; -- 暂停 Pipe ALTER PIPE orders_oss_pipe SET PIPE_EXECUTION_PAUSED = TRUE; -- 恢复 Pipe ALTER PIPE orders_oss_pipe SET PIPE_EXECUTION_PAUSED = FALSE; -- 删除 Pipe(不影响目标表数据) DROP PIPE orders_oss_pipe;

DESC PIPE
DESC PIPE
关键字段说明:

字段说明
pipe_status
pipe_status
RUNNING
RUNNING
/
PAUSED
PAUSED
pipe_kind
pipe_kind
VOLUME
VOLUME
(对象存储)或
KAFKA
KAFKA
properties
properties
显示
ingest_mode
ingest_mode
virtual_cluster
virtual_cluster
配置
input_name
input_name
数据来源,格式为
volume:catalog.schema.volume_name
volume:catalog.schema.volume_name
output_name
output_name
目标表全路径
catalog.schema.table
catalog.schema.table
invalid_reason
invalid_reason
Pipe 异常时的错误原因,正常时为空

查看 Pipe 执行历史:在作业历史中按

query_tag
query_tag
过滤,格式为
pipe.workspace_name.schema_name.pipe_name
pipe.workspace_name.schema_name.pipe_name

注意事项

  • Volume 不能指向 bucket 根路径
    LOCATION
    LOCATION
    必须是子目录,否则 Pipe 创建时报错
  • 每个 Pipe 对应独立的 Volume:不同 Pipe 不能共用同一个 Volume
  • COPY 语句不可修改:如需调整导入逻辑,删除 Pipe 后重新创建
  • 数据加载无法保证严格有序
  • 文件大小建议:gzip 压缩文件建议 50MB 以内;CSV/Parquet 未压缩文件建议 128MB~256MB

相关文档

联系我们
预约咨询
微信咨询
电话咨询