CREATE PIPE
创建一个 Pipe 对象,用于持续将数据从对象存储或 Kafka 导入到 Lakehouse。
从对象存储导入数据
CREATE PIPE [IF NOT EXISTS] <pipe_name>
VIRTUAL_CLUSTER = 'virtual_cluster_name'
INGEST_MODE = 'LIST_PURGE' | 'EVENT_NOTIFICATION'
[COPY_JOB_HINT = '']
AS <copy_statement>;
参数说明:
| 参数 | 必填 | 说明 |
|---|
pipe_name
pipe_name | 是 | Pipe 对象的名称 |
VIRTUAL_CLUSTER
VIRTUAL_CLUSTER | 是 | 指定执行 COPY 作业所用的计算集群名称 |
INGEST_MODE
INGEST_MODE | 是 | 数据导入模式:LIST_PURGE
LIST_PURGE (轮询扫描)或 EVENT_NOTIFICATION
EVENT_NOTIFICATION (事件通知触发) |
COPY_JOB_HINT
COPY_JOB_HINT | 否 | Lakehouse 保留参数。支持 IGNORE_TMP_FILE
IGNORE_TMP_FILE (取值 true
true |false
false ,默认 true
true ),用于过滤以 .
. 或 _temporary
_temporary 开头的文件或目录 |
copy_statement
copy_statement | 是 | 标准 COPY INTO
COPY INTO 语句。支持 ON_ERROR=CONTINUE|ABORT
ON_ERROR=CONTINUE|ABORT 参数控制错误处理策略 |
使用限制:
- Pipe 中的 COPY 语句不支持
FILES
FILES
、REGEXP
REGEXP
、SUBDIRECTORY
SUBDIRECTORY
参数。
- 每个 Pipe 需对应独立的 Volume,不可复用。
从 Kafka 导入数据
CREATE PIPE [IF NOT EXISTS] <pipe_name>
VIRTUAL_CLUSTER = 'virtual_cluster_name'
[INITIAL_DELAY_IN_SECONDS = '']
[BATCH_INTERVAL_IN_SECONDS = '']
[BATCH_SIZE_PER_KAFKA_PARTITION = '']
[MAX_SKIP_BATCH_COUNT_ON_ERROR = '']
[RESET_KAFKA_GROUP_OFFSETS = '']
[COPY_JOB_HINT = '']
AS <copy_statement>;
参数说明:
| 参数 | 必填 | 默认值 | 说明 |
|---|
VIRTUAL_CLUSTER
VIRTUAL_CLUSTER | 是 | — | 指定执行 COPY 作业所用的计算集群名称 |
INITIAL_DELAY_IN_SECONDS
INITIAL_DELAY_IN_SECONDS | 否 | 0 | 首个作业调度延迟(秒) |
BATCH_INTERVAL_IN_SECONDS
BATCH_INTERVAL_IN_SECONDS | 否 | 60 | 批处理间隔时间(秒) |
BATCH_SIZE_PER_KAFKA_PARTITION
BATCH_SIZE_PER_KAFKA_PARTITION | 否 | 500000 | 每个 Kafka 分区的批处理最大消息数 |
MAX_SKIP_BATCH_COUNT_ON_ERROR
MAX_SKIP_BATCH_COUNT_ON_ERROR | 否 | 30 | 出错时跳过批次的最大重试次数 |
RESET_KAFKA_GROUP_OFFSETS
RESET_KAFKA_GROUP_OFFSETS | 否 | none | 启动 Pipe 时 Kafka 的初始点位。可选值:none
none (无操作)、valid
valid (重置过期点位)、earliest
earliest 、latest
latest 、${TIMESTAMP_MILLISECONDS}
${TIMESTAMP_MILLISECONDS} |
相关文档