实时数据管道选型指南

云器 Lakehouse 提供了三类实时数据处理对象:Pipe(持续导入)、Table Stream(变化数据捕获)、Dynamic Table(增量计算)。它们解决的问题不同,经常组合使用,但选错了会走弯路。

本文回答两个问题:每种对象适合什么场景,以及常见业务需求该怎么选。


三类对象的定位

对象解决什么问题数据流方向典型用法
Pipe外部数据源持续流入 Lakehouse外部 → 表Kafka 消息入库、OSS 文件落库
Table Stream感知表内数据变化(CDC)表 → 下游处理变更同步、增量 ETL 驱动
Dynamic Table自动维护加工结果的实时性表 → 加工结果表ODS→DWD→DWS 增量加工链路

三者不是竞争关系,而是流水线的不同环节:

外部数据源 ↓ Pipe(持续入库) ODS 表 ↓ Dynamic Table(增量加工) DWD / DWS 表 ↓ Table Stream(变更感知,驱动下游同步) 下游系统 / 目标表


选型决策树

第一个问题:数据从哪里来?

  • 来自 Kafka 或对象存储(OSS/S3/COS)→ 用 Pipe
  • 已经在 Lakehouse 的表里 → 看下一个问题

第二个问题:你要做什么?

  • 对表数据做加工转换,结果要保持实时更新 → 用 Dynamic Table
  • 需要感知某张表的行级变更(INSERT/UPDATE/DELETE),驱动下游处理 → 用 Table Stream
  • 只是一次性或低频导入 → 用 COPY INTO,不需要 Pipe

第三个问题(Pipe 场景下):触发方式?

  • 文件上传时实时触发(分钟级延迟)→ EVENT_NOTIFICATION 模式(需配置消息队列)
  • 不需要实时,可以接受定期扫描并删除原文件 → LIST_PURGE 模式(配置简单)

Pipe:适合持续入库,不适合批量导入

适合用 Pipe 的场景

  • Kafka Topic 数据持续写入 Lakehouse 表
  • OSS/S3 有文件持续上传,需要自动落库
  • 数据源不间断产生数据,需要系统自动维护读取位置

不适合用 Pipe 的场景

  • 一次性导入历史数据 → 直接用
    COPY INTO
    COPY INTO
  • 低频(每天一次)的批量同步 → 调度
    COPY INTO
    COPY INTO
    任务更简单
  • 需要严格按顺序处理数据 → Pipe 不保证加载顺序

Pipe vs COPY INTO 对比

PipeCOPY INTO(定时调度)
适合场景数据持续流入,自动处理新文件低频批量导入,一次性迁移
触发方式文件事件驱动或定时扫描手动或外部调度系统触发
读取位置管理系统自动维护(load_history)需要调用方自行管理
文件去重自动去重(同路径同文件名只导入一次)无内置去重,重复执行会重复导入
运维复杂度创建后自动运行,无需干预需要配置调度,监控执行
计算资源需要 VCluster 在线需要 VCluster 在线

文件去重行为:Pipe 通过

load_history
load_history
记录已导入的文件(保留 7 天),相同路径和文件名的文件只会导入一次。如果你需要重新导入某个文件,需要手动执行
COPY INTO
COPY INTO

文件大小建议

  • gzip 压缩文件:建议 50MB 以内
  • CSV / Parquet 未压缩:建议 128MB 到 256MB

文件过小会增加调度开销,文件过大会影响单批次加载时间。

Pipe 的两种模式选择

EVENT_NOTIFICATIONLIST_PURGE
延迟分钟级(事件触发)取决于扫描间隔
配置复杂度需要配置消息队列(阿里云 MNS / AWS SQS)简单,无需额外配置
原文件处理保留原文件导入后删除原文件
适合场景对延迟敏感,文件需要保留延迟要求不高,可接受文件被删除
云厂商支持阿里云 OSS、AWS S3所有对象存储

Table Stream:适合感知变更,不适合直接查询

适合用 Table Stream 的场景

  • 需要捕获源表的 INSERT / UPDATE / DELETE 变更,同步到目标系统
  • 增量 ETL:只处理"上次处理之后新增/变更"的数据,避免全量扫描
  • 驱动 SCD(缓慢变化维)维护
  • 监听 OSS 文件新增事件(Volume Stream)

不适合用 Table Stream 的场景

  • 只需要查询当前数据 → 直接查表,不需要 Stream
  • 只需要捕获新增数据,不关心更新和删除 → 用 APPEND_ONLY 模式,性能更好
  • 需要实时数据加工并保持结果更新 → 用 Dynamic Table

STANDARD 还是 APPEND_ONLY?

STANDARDAPPEND_ONLY
捕获的操作INSERT、UPDATE、DELETE仅 INSERT
UPDATE 的表示UPDATE_BEFORE + UPDATE_AFTER 两行不记录(只记录原始插入值)
适合场景需要处理更新和删除(如 MERGE 同步)日志类仅追加数据,追求性能
性能较高开销(需要跟踪所有变更)更轻量

偏移量推进规则(重要)

Table Stream 的偏移量只在包含该 Stream 的 DML 事务成功提交后才推进。

  • SELECT * FROM stream
    SELECT * FROM stream
    不推进偏移量,数据下次查还在
  • INSERT INTO target SELECT ... FROM stream
    INSERT INTO target SELECT ... FROM stream
    → 事务提交后推进,数据被消费
  • 事务回滚 → 不推进,数据还在

这意味着:如果你只是 SELECT 查看 Stream 数据,偏移量不会变化;必须通过包含 Stream 的 DML 操作才能消费数据并推进偏移量。

多消费者模式

一个 Stream 只能被一个消费者完整消费。当 A 任务通过 DML 消费了 Stream,偏移量就推进了,B 任务再查同一个 Stream 就看不到那批变更了。

如果多个下游任务都需要消费同一张表的变更,为每个消费者单独创建一个 Stream

-- 表上开启变更跟踪 ALTER TABLE orders SET PROPERTIES ('change_tracking' = 'true'); -- 为不同消费者各建一个 Stream CREATE TABLE STREAM orders_stream_for_dw ON TABLE orders WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD'); CREATE TABLE STREAM orders_stream_for_notify ON TABLE orders WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');

每个 Stream 独立维护自己的偏移量,互不影响。Stream 本身只存储偏移量,不复制数据,创建多个 Stream 的额外成本很低。

数据保留期与 Stream 失效

Table Stream 依赖源表的历史版本数据(Time Travel)来返回变更记录。如果 Stream 长时间未被消费,且源表的历史版本被清理(超过

DATA_RETENTION_DAYS
DATA_RETENTION_DAYS
,默认 7 天),Stream 会变为不可用。

日常建议:Stream 的消费频率应该远短于源表的

DATA_RETENTION_DAYS
DATA_RETENTION_DAYS
,避免 Stream 因历史版本丢失而失效。


Dynamic Table:适合声明式增量加工,不适合高实时要求

适合用 Dynamic Table 的场景

  • ODS → DWD → DWS 的多层 ETL 加工链路
  • 查询性能优化:将复杂计算结果物化,避免每次查询重新计算
  • 固定维度的近实时分析,可以接受分钟级延迟

不适合用 Dynamic Table 的场景

  • 需要秒级实时性 → 当前最小刷新间隔 1 分钟
  • SQL 中有大量 ORDER BY → 增量刷新受限,会退化为全量刷新
  • 涉及大量 Outer Join 且右表频繁变动 → 增量计算效率低
  • 需要直接修改数据(UPDATE/DELETE/TRUNCATE) → Dynamic Table 不支持

刷新模式选择

云器 Dynamic Table 支持三种刷新调度方式:

调度方式适合场景特点
DDL 定义刷新间隔(
REFRESH INTERVAL
REFRESH INTERVAL
简单场景,快速上线不支持上下游依赖,最小间隔 1 分钟
Lakehouse Studio 调度多层 DT 链路,需要依赖控制支持任务依赖(A 完成后触发 B),有监控告警
第三方调度引擎已有调度体系,需要灵活控制时间间隔不受限,但引入外部依赖

多层链路的关键约束:上游 DT 的刷新频率决定了下游 DT 能达到的最小延迟。如果上游每 5 分钟刷新一次,下游即使设置为 1 分钟,实际数据延迟也至少是 5 分钟。

新鲜度与成本的权衡

刷新越频繁,数据越新鲜,但计算成本越高。判断合理刷新间隔的方法:

  1. 评估业务对数据延迟的容忍度("5 分钟前的数据"是否满足需求)
  2. 查看单次刷新耗时(
    SHOW DYNAMIC TABLE REFRESH HISTORY
    SHOW DYNAMIC TABLE REFRESH HISTORY
  3. 刷新间隔 > 单次刷新耗时,避免任务积压
  4. 如果刷新耗时接近间隔时间,说明增量计算退化为了全量,需要优化 SQL 或扩大间隔

增量刷新 vs 全量刷新

Dynamic Table 会自动选择增量或全量模式:

  • 增量刷新(INCREMENTAL):只计算自上次刷新以来变化的数据,效率高
  • 全量刷新(FULL):重新计算所有数据,适合首次刷新或增量条件不满足时

以下情况会触发全量刷新或无法使用增量计算:

  • SQL 中包含无法增量处理的算子(如带 ORDER BY 的窗口函数)
  • 首次
    REFRESH DYNAMIC TABLE
    REFRESH DYNAMIC TABLE
  • 源表变化量过大(接近全量数据)

通过

SHOW DYNAMIC TABLE REFRESH HISTORY
SHOW DYNAMIC TABLE REFRESH HISTORY
可以查看每次刷新的
refresh_mode
refresh_mode
字段,确认是否成功使用增量模式。


典型架构模式

模式一:Kafka 实时入库 + 增量加工

适合:日志分析、实时大屏、用户行为分析

Kafka Topic ↓ Pipe(持续入库) ods.events 表 ↓ Dynamic Table(每 1 分钟增量聚合) dwd.event_stats 表 ↓ Dynamic Table(每 5 分钟汇总) dws.daily_summary 表

端到端 SQL 示例(表名和连接名需替换为实际值):

-- 模式一完整示例:Kafka 日志实时入库 + 动态表增量聚合 -- 假设已有 Kafka Connection: kafka_conn,Topic: app_events -- Step 1:创建目标表(ODS 层) CREATE TABLE IF NOT EXISTS ods.app_events ( event_id STRING, user_id BIGINT, event_type STRING, page STRING, ts TIMESTAMP ); -- Step 2:创建 Pipe 持续消费 Kafka CREATE PIPE ods.kafka_events_pipe AS COPY INTO ods.app_events FROM READ_KAFKA( CONNECTION => 'kafka_conn', TOPIC => 'app_events' ) USING JSON; -- Step 3:创建 Dynamic Table 做分钟级聚合(DWD 层) CREATE DYNAMIC TABLE dwd.event_stats REFRESH INTERVAL 1 MINUTE VCLUSTER default AS SELECT DATE_TRUNC('minute', ts) AS minute, event_type, COUNT(*) AS event_cnt, COUNT(DISTINCT user_id) AS uv FROM ods.app_events GROUP BY 1, 2; -- 查询结果(刷新后) SELECT * FROM dwd.event_stats ORDER BY minute DESC LIMIT 5;

Pipe 创建后自动运行,无需手动触发。Dynamic Table 每分钟自动增量刷新,只处理上次刷新后新增的 events。

模式二:OSS 文件自动落库 + 增量处理

适合:IoT 设备数据、日志归档、批量文件处理

OSS Bucket(新文件持续上传) ↓ Pipe EVENT_NOTIFICATION(分钟级触发) ods.raw_files 表 ↓ Dynamic Table(增量解析/清洗) dwd.cleaned_data 表

模式三:Table Stream 驱动跨系统同步

适合:Lakehouse 内部表变更同步到外部系统、SCD 维护

source_table(业务表) ↓ Table Stream(STANDARD 模式) MERGE INTO target(Studio 定时任务,每分钟消费) target_table(同步目标)

端到端 SQL 示例(表名需替换为实际值):

-- 模式三完整示例:Table Stream 驱动增量同步 -- 场景:将 orders 表的变更实时同步到 orders_replica(跨 Schema 同步) -- Step 1:在源表上创建 STANDARD 模式 Stream CREATE TABLE STREAM orders_sync_stream ON TABLE ods.orders WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD'); -- Step 2:创建目标副本表(结构与源表一致) CREATE TABLE IF NOT EXISTS dwd.orders_replica AS SELECT * FROM ods.orders WHERE 1=0; -- 只复制结构,不复制数据 -- Step 3:Studio 定时任务(每分钟执行一次) MERGE INTO dwd.orders_replica t USING orders_sync_stream s ON t.order_id = s.order_id WHEN MATCHED AND s.__change_type = 'UPDATE_AFTER' THEN UPDATE SET t.status = s.status, t.amount = s.amount WHEN MATCHED AND s.__change_type = 'DELETE' THEN DELETE WHEN NOT MATCHED AND s.__change_type = 'INSERT' THEN INSERT VALUES (s.order_id, s.customer_id, s.product, s.amount, s.status, s.order_date); -- UPDATE_BEFORE 行自动忽略

Stream 的 offset 在 MERGE 成功提交后自动推进。如果 MERGE 失败(事务回滚),offset 不推进,下次执行会重新消费同一批变更,保证不丢数据。

模式四:Volume Stream 监听文件变更

适合:图片/文档 AI 处理、增量文件分析

OSS Bucket(新图片上传) ↓ Volume Stream(基于 Directory Table) INSERT INTO(消费 Stream,调用 AI 函数处理) results 表


相关文档

联系我们
预约咨询
微信咨询
电话咨询