CREATE PIPE

创建一个 Pipe 对象,用于持续将数据从对象存储或 Kafka 导入到 Lakehouse。

从对象存储导入数据

CREATE PIPE [IF NOT EXISTS] <pipe_name> VIRTUAL_CLUSTER = 'virtual_cluster_name' INGEST_MODE = 'LIST_PURGE' | 'EVENT_NOTIFICATION' [COPY_JOB_HINT = ''] AS <copy_statement>;

参数说明:

参数必填说明
pipe_name
pipe_name
Pipe 对象的名称
VIRTUAL_CLUSTER
VIRTUAL_CLUSTER
指定执行 COPY 作业所用的计算集群名称
INGEST_MODE
INGEST_MODE
数据导入模式:
LIST_PURGE
LIST_PURGE
(轮询扫描)或
EVENT_NOTIFICATION
EVENT_NOTIFICATION
(事件通知触发)
COPY_JOB_HINT
COPY_JOB_HINT
Lakehouse 保留参数。支持
IGNORE_TMP_FILE
IGNORE_TMP_FILE
(取值
true
true
|
false
false
,默认
true
true
),用于过滤以
.
.
_temporary
_temporary
开头的文件或目录
copy_statement
copy_statement
标准
COPY INTO
COPY INTO
语句。支持
ON_ERROR=CONTINUE|ABORT
ON_ERROR=CONTINUE|ABORT
参数控制错误处理策略

使用限制:

  • Pipe 中的 COPY 语句不支持
    FILES
    FILES
    REGEXP
    REGEXP
    SUBDIRECTORY
    SUBDIRECTORY
    参数。
  • 每个 Pipe 需对应独立的 Volume,不可复用。

从 Kafka 导入数据

CREATE PIPE [IF NOT EXISTS] <pipe_name> VIRTUAL_CLUSTER = 'virtual_cluster_name' [INITIAL_DELAY_IN_SECONDS = ''] [BATCH_INTERVAL_IN_SECONDS = ''] [BATCH_SIZE_PER_KAFKA_PARTITION = ''] [MAX_SKIP_BATCH_COUNT_ON_ERROR = ''] [RESET_KAFKA_GROUP_OFFSETS = ''] [COPY_JOB_HINT = ''] AS <copy_statement>;

参数说明:

参数必填默认值说明
VIRTUAL_CLUSTER
VIRTUAL_CLUSTER
指定执行 COPY 作业所用的计算集群名称
INITIAL_DELAY_IN_SECONDS
INITIAL_DELAY_IN_SECONDS
0首个作业调度延迟(秒)
BATCH_INTERVAL_IN_SECONDS
BATCH_INTERVAL_IN_SECONDS
60批处理间隔时间(秒)
BATCH_SIZE_PER_KAFKA_PARTITION
BATCH_SIZE_PER_KAFKA_PARTITION
500000每个 Kafka 分区的批处理最大消息数
MAX_SKIP_BATCH_COUNT_ON_ERROR
MAX_SKIP_BATCH_COUNT_ON_ERROR
30出错时跳过批次的最大重试次数
RESET_KAFKA_GROUP_OFFSETS
RESET_KAFKA_GROUP_OFFSETS
none启动 Pipe 时 Kafka 的初始点位。可选值:
none
none
(无操作)、
valid
valid
(重置过期点位)、
earliest
earliest
latest
latest
${TIMESTAMP_MILLISECONDS}
${TIMESTAMP_MILLISECONDS}

相关文档

联系我们
预约咨询
微信咨询
电话咨询