Lakehouse 持续数据导入指南(Pipe)
概述
Pipe 是云器 Lakehouse 提供的持续数据摄取管道,支持从 Kafka 或对象存储(OSS/S3/COS)自动、持续地将数据导入到 Lakehouse 表中。无需手动配置调度任务,Pipe 会保持消费位点并持续运行。本指南按业务场景分类,帮助你快速掌握 Pipe 的创建与监控方法。
涉及的 SQL 命令
| 命令 | 用途 | 适用场景 |
|---|
CREATE PIPE ... AS COPY INTO ... FROM READ_KAFKA(...)
CREATE PIPE ... AS COPY INTO ... FROM READ_KAFKA(...) | 创建 Kafka 管道 | 实时日志、消息队列接入 |
CREATE PIPE ... AS COPY INTO ... FROM VOLUME ...
CREATE PIPE ... AS COPY INTO ... FROM VOLUME ... | 创建 Volume 管道 | 对象存储文件持续导入 |
SHOW PIPES
SHOW PIPES | 查看管道列表 | 监控 Pipe 状态 |
DESC PIPE
DESC PIPE | 查看管道详情 | 检查配置和运行信息 |
DROP PIPE
DROP PIPE | 删除管道 | 清理不再使用的管道 |
前置准备
以下示例使用模拟的 Kafka 主题和 Volume 路径:
-- 创建目标表(Kafka 导入)
CREATE TABLE IF NOT EXISTS kafka_logs (
log_time TIMESTAMP,
level STRING,
message STRING
);
-- 创建目标表(Volume 导入)
CREATE TABLE IF NOT EXISTS oss_data (
id INT,
value STRING
);
创建 Kafka Pipe
使用
CREATE PIPE
CREATE PIPE
配合
READ_KAFKA
READ_KAFKA
函数,定义从 Kafka 持续导入的逻辑。
-- 创建 Kafka Pipe
CREATE PIPE pipe_kafka_logs
VIRTUAL_CLUSTER = 'default'
BATCH_INTERVAL_IN_SECONDS = '30'
AS
COPY INTO kafka_logs
FROM (
SELECT
CAST(PARSE_JSON(VALUE::STRING)['time'] AS TIMESTAMP) as log_time,
PARSE_JSON(VALUE::STRING)['level']::STRING as level,
PARSE_JSON(VALUE::STRING)['message']::STRING as message
FROM READ_KAFKA(
'kafka-broker:9092', -- bootstrap_servers
'app_logs', -- topic
'', -- topic_prefix
'lakehouse_consumer', -- group_id
'', '', '', '', -- 偏移量参数(Pipe 自动管理)
'raw', 'raw', -- key/value format
0, -- max_errors
map() -- kafka_configs
)
);
参数说明:
创建 OSS Pipe
使用
CREATE PIPE
CREATE PIPE
配合 Volume,定义从对象存储持续导入文件的逻辑。
-- 创建 OSS Pipe
CREATE PIPE pipe_oss_data
VIRTUAL_CLUSTER = 'default'
BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO oss_data
FROM VOLUME my_oss_volume
USING CSV OPTIONS ('header' = 'true');
运行模式:
- LIST_PURGE(默认):扫描 Volume 中的文件,导入后自动删除。
- LIST:扫描并导入,但保留源文件。
查看 Pipe 状态
使用
SHOW PIPES
SHOW PIPES
查看所有 Pipe 的运行状态。
-- 查看 Pipe 列表
SHOW PIPES LIKE 'pipe_kafka_logs';
关键字段说明:
status
status
:运行状态(RUNNING / SUSPENDED / FAILED)
pipe_kind
pipe_kind
:管道类型(READ_KAFKA / VOLUME)
copy_statement
copy_statement
:底层的 COPY INTO 语句
手动触发执行
Pipe 默认按
BATCH_INTERVAL_IN_SECONDS
BATCH_INTERVAL_IN_SECONDS
自动运行,也可通过
ALTER PIPE
ALTER PIPE
手动触发。
-- 立即执行一次数据导入
ALTER PIPE pipe_kafka_logs EXECUTE;
💡 提示:手动触发适用于调试或补数据场景,不影响自动调度周期。
删除 Pipe
使用
DROP PIPE
DROP PIPE
删除不再需要的管道。
-- 删除 Pipe
DROP PIPE pipe_kafka_logs;
💡 提示:删除 Pipe 不会删除已导入到目标表的数据。
清理测试数据
完成 Pipe 验证后,建议清理测试表:
-- 删除测试表
DROP TABLE IF EXISTS kafka_logs;
DROP TABLE IF EXISTS oss_data;
DROP PIPE IF EXISTS pipe_kafka_logs;
DROP PIPE IF EXISTS pipe_oss_data;
💡 提示:Lakehouse 支持
UNDROP TABLE
UNDROP TABLE
,误删后可在保留期内恢复。
注意事项
- 消费位点管理:Kafka Pipe 自动管理消费者组位点,重启或故障恢复后从上次位点继续消费。
- 错误处理:通过
max_errors
max_errors
参数控制容忍的错误记录数,超过阈值 Pipe 会暂停并报错。
- VCluster 选择:建议使用
INTEGRATION
INTEGRATION
或 GENERAL
GENERAL
类型的 VCluster 运行 Pipe。
- 权限要求:创建 Pipe 需要对目标表具有
INSERT
INSERT
权限,对 Kafka/Volume 具有读取权限。
- 幂等性:Pipe 默认保证至少一次(At-Least-Once)语义,目标表建议设计主键或使用
MERGE INTO
MERGE INTO
去重。
相关文档