Lakehouse 持续数据导入指南(Pipe)

概述

Pipe 是云器 Lakehouse 提供的持续数据摄取管道,支持从 Kafka 或对象存储(OSS/S3/COS)自动、持续地将数据导入到 Lakehouse 表中。无需手动配置调度任务,Pipe 会保持消费位点并持续运行。本指南按业务场景分类,帮助你快速掌握 Pipe 的创建与监控方法。

涉及的 SQL 命令

命令用途适用场景
CREATE PIPE ... AS COPY INTO ... FROM READ_KAFKA(...)
CREATE PIPE ... AS COPY INTO ... FROM READ_KAFKA(...)
创建 Kafka 管道实时日志、消息队列接入
CREATE PIPE ... AS COPY INTO ... FROM VOLUME ...
CREATE PIPE ... AS COPY INTO ... FROM VOLUME ...
创建 Volume 管道对象存储文件持续导入
SHOW PIPES
SHOW PIPES
查看管道列表监控 Pipe 状态
DESC PIPE
DESC PIPE
查看管道详情检查配置和运行信息
DROP PIPE
DROP PIPE
删除管道清理不再使用的管道

前置准备

以下示例使用模拟的 Kafka 主题和 Volume 路径:

-- 创建目标表(Kafka 导入) CREATE TABLE IF NOT EXISTS kafka_logs ( log_time TIMESTAMP, level STRING, message STRING ); -- 创建目标表(Volume 导入) CREATE TABLE IF NOT EXISTS oss_data ( id INT, value STRING );


创建 Kafka Pipe

使用

CREATE PIPE
CREATE PIPE
配合
READ_KAFKA
READ_KAFKA
函数,定义从 Kafka 持续导入的逻辑。

-- 创建 Kafka Pipe CREATE PIPE pipe_kafka_logs VIRTUAL_CLUSTER = 'default' BATCH_INTERVAL_IN_SECONDS = '30' AS COPY INTO kafka_logs FROM ( SELECT CAST(PARSE_JSON(VALUE::STRING)['time'] AS TIMESTAMP) as log_time, PARSE_JSON(VALUE::STRING)['level']::STRING as level, PARSE_JSON(VALUE::STRING)['message']::STRING as message FROM READ_KAFKA( 'kafka-broker:9092', -- bootstrap_servers 'app_logs', -- topic '', -- topic_prefix 'lakehouse_consumer', -- group_id '', '', '', '', -- 偏移量参数(Pipe 自动管理) 'raw', 'raw', -- key/value format 0, -- max_errors map() -- kafka_configs ) );

参数说明

  • BATCH_INTERVAL_IN_SECONDS
    BATCH_INTERVAL_IN_SECONDS
    :每 30 秒执行一次批量导入。
  • READ_KAFKA
    READ_KAFKA
    中的偏移量参数留空,由 Pipe 自动管理消费位点。

创建 OSS Pipe

使用

CREATE PIPE
CREATE PIPE
配合 Volume,定义从对象存储持续导入文件的逻辑。

-- 创建 OSS Pipe CREATE PIPE pipe_oss_data VIRTUAL_CLUSTER = 'default' BATCH_INTERVAL_IN_SECONDS = '60' AS COPY INTO oss_data FROM VOLUME my_oss_volume USING CSV OPTIONS ('header' = 'true');

运行模式

  • LIST_PURGE(默认):扫描 Volume 中的文件,导入后自动删除。
  • LIST:扫描并导入,但保留源文件。

查看 Pipe 状态

使用

SHOW PIPES
SHOW PIPES
查看所有 Pipe 的运行状态。

-- 查看 Pipe 列表 SHOW PIPES LIKE 'pipe_kafka_logs';

关键字段说明

  • status
    status
    :运行状态(RUNNING / SUSPENDED / FAILED)
  • pipe_kind
    pipe_kind
    :管道类型(READ_KAFKA / VOLUME)
  • copy_statement
    copy_statement
    :底层的 COPY INTO 语句

手动触发执行

Pipe 默认按

BATCH_INTERVAL_IN_SECONDS
BATCH_INTERVAL_IN_SECONDS
自动运行,也可通过
ALTER PIPE
ALTER PIPE
手动触发。

-- 立即执行一次数据导入 ALTER PIPE pipe_kafka_logs EXECUTE;


删除 Pipe

使用

DROP PIPE
DROP PIPE
删除不再需要的管道。

-- 删除 Pipe DROP PIPE pipe_kafka_logs;


清理测试数据

完成 Pipe 验证后,建议清理测试表:

-- 删除测试表 DROP TABLE IF EXISTS kafka_logs; DROP TABLE IF EXISTS oss_data; DROP PIPE IF EXISTS pipe_kafka_logs; DROP PIPE IF EXISTS pipe_oss_data;


注意事项

  1. 消费位点管理:Kafka Pipe 自动管理消费者组位点,重启或故障恢复后从上次位点继续消费。
  2. 错误处理:通过
    max_errors
    max_errors
    参数控制容忍的错误记录数,超过阈值 Pipe 会暂停并报错。
  3. VCluster 选择:建议使用
    INTEGRATION
    INTEGRATION
    GENERAL
    GENERAL
    类型的 VCluster 运行 Pipe。
  4. 权限要求:创建 Pipe 需要对目标表具有
    INSERT
    INSERT
    权限,对 Kafka/Volume 具有读取权限。
  5. 幂等性:Pipe 默认保证至少一次(At-Least-Once)语义,目标表建议设计主键或使用
    MERGE INTO
    MERGE INTO
    去重。

相关文档

联系我们
预约咨询
微信咨询
电话咨询