Pipe(数据管道)
Pipe 是 Lakehouse 的持续数据导入对象,通过 SQL DDL 创建后自动运行,持续从 Kafka 或对象存储(OSS/COS/S3)读取数据并写入目标表,无需人工触发。
类比:Pipe 像一条自动传送带——数据文件上传到 OSS 子目录,或消息写入 Kafka 后,Pipe 自动检测并入库。与定时任务不同,Pipe 是常驻运行的,文件上传后约 30 秒内即可完成导入。
与 Studio 同步任务的区别
| 维度 | Pipe | Studio 同步任务 |
|---|
| 创建方式 | SQL DDL | Studio 可视化界面 |
| 适用数据源 | Kafka、OSS/COS/S3 | 关系型数据库、Kafka、对象存储 |
| 管理方式 | SQL 命令 | Studio 界面 |
| 适合人群 | 习惯 SQL、代码化管理 | 偏好可视化配置 |
两者功能等价,根据使用习惯选择。
Pipe 类型
对象存储 Pipe(OSS/COS/S3)
持续扫描对象存储中的新文件并导入,支持两种模式:
| 模式 | 触发方式 | 源文件处理 | 额外配置 |
|---|
LIST_PURGE
LIST_PURGE | 定期轮询扫描(约 30 秒) | 导入后永久删除源文件 | 无需额外配置 |
EVENT_NOTIFICATION
EVENT_NOTIFICATION | 对象存储事件通知(近实时) | 保留源文件 | 需配置 MNS 消息队列;仅支持 OSS 和 S3 |
⚠️
LIST_PURGE
LIST_PURGE
模式导入成功后会
永久删除 OSS 中的源文件,不可恢复。如需保留文件,使用
EVENT_NOTIFICATION
EVENT_NOTIFICATION
模式。
⚠️ Volume 的
LOCATION
LOCATION
必须指向具体子目录(如
oss://bucket/data/
oss://bucket/data/
),不能是 bucket 根路径(
oss://bucket/
oss://bucket/
),否则 Pipe 创建时会报错。
-- 第一步:创建存储连接和 Volume(必须指向子目录)
CREATE STORAGE CONNECTION my_oss_conn
TYPE OSS
ENDPOINT = 'oss-cn-hangzhou.aliyuncs.com'
ACCESS_ID = 'your-access-id'
ACCESS_KEY = 'your-access-key';
CREATE EXTERNAL VOLUME orders_vol
LOCATION 'oss://my-bucket/orders_incoming/'
USING CONNECTION my_oss_conn
DIRECTORY = (ENABLE = TRUE, AUTO_REFRESH = TRUE)
RECURSIVE = TRUE;
-- 第二步:创建目标表
CREATE TABLE IF NOT EXISTS orders (
order_id INT,
amount DECIMAL(10,2),
status STRING
);
-- 第三步:创建 Pipe
CREATE PIPE orders_oss_pipe
VIRTUAL_CLUSTER = 'default'
INGEST_MODE = 'LIST_PURGE'
AS
COPY INTO orders
FROM VOLUME orders_vol (order_id INT, amount DECIMAL(10,2), status STRING)
USING CSV OPTIONS('header' = 'true')
PURGE = TRUE;
去重机制:Pipe 通过
load_history
load_history
记录已导入的文件路径,相同路径的文件只会导入一次,即使重新上传也不会触发重复导入。记录保留 7 天。
-- 查看已导入文件记录
SELECT * FROM load_history('orders');
-- 返回:file_path、last_copy_time、file_size、status、first_error_message
Kafka Pipe
创建持久消费者组,按批次间隔从 Kafka Topic 持续拉取数据写入表。
⚠️ 一个 Pipe 只能消费一个 Kafka Topic。消费同一 Topic 的多个 Pipe 必须使用不同的
group_id
group_id
。
CREATE PIPE kafka_orders_pipe
VIRTUAL_CLUSTER = 'default'
BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO orders_raw
FROM (
SELECT CAST(value AS STRING) AS raw_msg
FROM TABLE(READ_KAFKA(
'kafka-host:9092', -- bootstrap.servers,填 2~3 个 broker 地址即可
'orders_topic', -- topic 名称
'', -- topic pattern(暂不支持,留空)
'pipe_orders_group', -- group_id,同 Topic 的不同 Pipe 必须不同
'', '', '', '', -- 起止位点和时间戳,Pipe 自动管理,留空
'raw', 'raw', -- key/value 格式
0, map() -- 最大错误数,额外 Kafka 配置(如 SSL/SASL)
))
);
Kafka Pipe 完整参数
| 参数 | 必填 | 默认值 | 说明 |
|---|
VIRTUAL_CLUSTER
VIRTUAL_CLUSTER | 是 | — | 执行 COPY 作业的计算集群 |
INITIAL_DELAY_IN_SECONDS
INITIAL_DELAY_IN_SECONDS | 否 | 0 | 首个作业调度延迟(秒) |
BATCH_INTERVAL_IN_SECONDS
BATCH_INTERVAL_IN_SECONDS | 否 | 60 | 批处理间隔时间(秒) |
BATCH_SIZE_PER_KAFKA_PARTITION
BATCH_SIZE_PER_KAFKA_PARTITION | 否 | 500000 | 每个 Kafka 分区的批处理最大消息数 |
MAX_SKIP_BATCH_COUNT_ON_ERROR
MAX_SKIP_BATCH_COUNT_ON_ERROR | 否 | 30 | 出错时跳过批次的最大重试次数 |
RESET_KAFKA_GROUP_OFFSETS
RESET_KAFKA_GROUP_OFFSETS | 否 | none | 启动时 Kafka 初始点位。可选值:none
none (无操作)、valid
valid (重置过期点位)、earliest
earliest 、latest
latest 、${TIMESTAMP_MILLISECONDS}
${TIMESTAMP_MILLISECONDS} |
COPY_JOB_HINT
COPY_JOB_HINT | 否 | — | 保留参数。支持 IGNORE_TMP_FILE
IGNORE_TMP_FILE (默认 true
true ),过滤 .
. 或 _temporary
_temporary 开头的文件 |
load_history 函数
查看表的 COPY 作业导入文件历史,记录保留 7 天。Pipe 通过
load_history
load_history
避免重复导入相同文件。
-- 查看已导入文件记录
SELECT * FROM load_history('schema_name.table_name');
-- 返回字段:file_path、last_copy_time、file_size、status、first_error_message
ALTER PIPE 完整语法
每次只能修改一个属性,如需修改多个属性需多次执行:
ALTER PIPE pipe_name SET
[VIRTUAL_CLUSTER = 'vc_name']
| [BATCH_INTERVAL_IN_SECONDS = '60']
| [BATCH_SIZE_PER_KAFKA_PARTITION = '500000']
| [MAX_SKIP_BATCH_COUNT_ON_ERROR = '30']
| [COPY_JOB_HINT = '{"cz.mapper.kafka.message.size": "2000000"}'];
DESC PIPE 字段说明
DESC PIPE
DESC PIPE
返回 key-value 格式,关键字段:
| 字段 | 说明 |
|---|
pipe_status
pipe_status | RUNNING
RUNNING / PAUSED
PAUSED / INVALID
INVALID |
pipe_kind
pipe_kind | VOLUME
VOLUME (对象存储)或 KAFKA
KAFKA |
properties
properties | 显示 ingest_mode
ingest_mode 和 virtual_cluster
virtual_cluster 配置 |
input_name
input_name | 数据来源,格式为 volume:catalog.schema.volume_name
volume:catalog.schema.volume_name 或 kafka_table_stream:workspace.schema.stream
kafka_table_stream:workspace.schema.stream |
output_name
output_name | 目标表全路径 catalog.schema.table
catalog.schema.table |
invalid_reason
invalid_reason | Pipe 异常时的错误原因,正常时为空 |
pipe_latency
pipe_latency | Kafka Pipe 的消费延迟(offsetLag
offsetLag 为 0 表示无积压) |
DESC PIPE EXTENDED kafka_pipe_stream;
+--------------------+-----------------------------------------------------+
| info_name | info_value |
+--------------------+-----------------------------------------------------+
| name | kafka_pipe_stream |
| pipe_status | RUNNING |
| pipe_kind | KAFKA |
| input_name | kafka_table_stream:workspace.pipe_schema.stream1 |
| output_name | workspace.pipe_schema.sink_table |
| invalid_reason | |
| pipe_latency | {"kafka":{"lags":{"0":0},"offsetLag":0}} |
+--------------------+-----------------------------------------------------+
约束与限制
- 对象存储 Pipe:COPY 语句不支持
FILES
FILES
、REGEXP
REGEXP
、SUBDIRECTORY
SUBDIRECTORY
参数
- 对象存储 Pipe:每个 Pipe 对应独立的 Volume,不同 Pipe 不能共用同一个 Volume
- Kafka Pipe:一个 Pipe 中只能有一个
READ_KAFKA
READ_KAFKA
函数
COPY 语句
COPY 语句
创建后不可修改,如需调整导入逻辑,删除 Pipe 后重新创建
-- 查看所有 Pipe(含状态、类型、VCluster)
SHOW PIPES;
-- 查看 Pipe 详情(key-value 格式,含 pipe_status、input_name、output_name 等)
DESC PIPE orders_oss_pipe;
-- 立即触发一次扫描(不等待下一轮检测周期)
ALTER PIPE orders_oss_pipe REFRESH;
-- 暂停 Pipe
ALTER PIPE orders_oss_pipe SET PIPE_EXECUTION_PAUSED = TRUE;
-- 恢复 Pipe
ALTER PIPE orders_oss_pipe SET PIPE_EXECUTION_PAUSED = FALSE;
-- 删除 Pipe(不影响目标表数据)
DROP PIPE orders_oss_pipe;
DESC PIPE
DESC PIPE
关键字段说明:
| 字段 | 说明 |
|---|
pipe_status
pipe_status | RUNNING
RUNNING / PAUSED
PAUSED |
pipe_kind
pipe_kind | VOLUME
VOLUME (对象存储)或 KAFKA
KAFKA |
properties
properties | 显示 ingest_mode
ingest_mode 和 virtual_cluster
virtual_cluster 配置 |
input_name
input_name | 数据来源,格式为 volume:catalog.schema.volume_name
volume:catalog.schema.volume_name |
output_name
output_name | 目标表全路径 catalog.schema.table
catalog.schema.table |
invalid_reason
invalid_reason | Pipe 异常时的错误原因,正常时为空 |
查看 Pipe 执行历史:在作业历史中按
query_tag
query_tag
过滤,格式为
pipe.workspace_name.schema_name.pipe_name
pipe.workspace_name.schema_name.pipe_name
。
注意事项
- Volume 不能指向 bucket 根路径:
LOCATION
LOCATION
必须是子目录,否则 Pipe 创建时报错
- 每个 Pipe 对应独立的 Volume:不同 Pipe 不能共用同一个 Volume
- COPY 语句不可修改:如需调整导入逻辑,删除 Pipe 后重新创建
- 数据加载无法保证严格有序
- 文件大小建议:gzip 压缩文件建议 50MB 以内;CSV/Parquet 未压缩文件建议 128MB~256MB
相关文档