增量计算机制

增量计算(Incremental Computing)是云器 Lakehouse 的核心能力之一。它通过仅处理自上次更新以来变化的数据,而非每次都重新计算全部数据,大幅降低计算资源消耗,同时保持结果与全量计算完全一致。

GIC:通用增量计算模型

GIC(Generic Incremental Computation) 是云器自研的增量计算模型,是 Dynamic Table 的底层驱动引擎。

传统增量计算方案通常只能处理特定算子(如简单聚合)或特定数据模式(如 Append-Only),遇到 JOIN、嵌套子查询、UPDATE/DELETE 等场景就退化为全量重算。GIC 的设计目标是通用性:将任意标准 SQL 查询的执行过程分解为算子级别的增量计划,每个算子独立处理上游 Delta,向下游输出自身的 Delta,形成完整的增量执行链路。

GIC 的三个核心特性:

  • 通用性:支持 Filter、Project、Join、Aggregate、Window 等全部主流 SQL 算子的增量执行,不限制查询复杂度
  • 代价感知:每次刷新时基于数据统计信息动态选择增量或全量执行计划,而非静态规则绑定
  • 语义一致性:增量执行结果与全量重算结果严格一致,基于 MVCC 版本管理保证 Exactly-Once 语义

正是因为 GIC 的通用性,Dynamic Table 才能用标准 SQL 覆盖批处理和流处理两种场景,同时将 AI 处理结果(

AI_COMPLETE()
AI_COMPLETE()
AI_EMBEDDING()
AI_EMBEDDING()
的输出)实时回流到数据加工链路——AI 处理完成后,下游 Dynamic Table 自动感知并增量刷新,无需手动触发或额外管道。


为什么需要增量计算

在增量计算出现之前,数据处理主要有两种模式:

模式特点问题
批处理每次全量重算,T+1 天级交付数据量增长后,计算时间和资源线性增长,无法满足近实时需求
流处理常驻服务,逐行处理,秒级交付需要理解 Watermark/Window/Trigger 等复杂概念;状态存储随窗口扩大而膨胀;Retraction(回撤)处理困难

企业为了同时满足"低延迟"和"低成本",往往被迫采用 Lambda 架构:一套批处理链路 + 一套流处理链路,维护两套代码,数据一致性难以保证。

增量计算的目标:用一套标准 SQL,同时覆盖批处理和流处理的场景,淘汰 Lambda 架构,实现 Kappa 架构(一套引擎,全链路实时化)。


三种计算范式对比

维度批计算流计算增量计算
设计目标吞吐率优先新鲜度优先吞吐率、新鲜度、延迟三者灵活平衡
驱动方式主动计算(定时触发)被动计算(数据驱动)主动计算(支持定时 + 依赖触发 + 实时触发)
计算模型全量计算流计算模型(增量的一种特化)通用增量计算模型(GIC)
存储模型静态存储Pipe 管道数据模型(不含全量)动态数据(Dynamic Data,支持全量/增量读写)
数据建模标准维度建模(ODS/DWD/ADS)无(流水线模式,中间数据不可回溯)标准维度建模(支持分层,中间数据可查询)
语言表达标准 SQL + UDF非标准 SQL(需引入 EventTime、Watermark 等)标准 SQL + UDF
版本管理支持 MVCC不支持支持 MVCC(基于版本可重算)
Retraction 处理无此概念需状态存储保存窗口内全量数据统一为增量数据的一种形态(增量减)

何时选哪种?

  • 选批处理:一次性历史数据加载、全量修复、对延迟不敏感(T+1 可接受)
  • 选流处理:需要秒级响应、涉及复杂时间窗口(滑动窗口、会话窗口)、乱序数据处理
  • 选增量计算:数据分批到达、希望分钟/小时级新鲜度、要求资源成本显著低于流计算、希望用标准 SQL 开发

核心概念

增量计算建立在三个基本数据概念之上:

1. 全量数据快照(Snapshot)

一个数据集在某一时刻的所有数据的集合。

Snapshot at T0: [A, B, C, D, E] Snapshot at T1: [A, B, C, D, E, F, G]

2. 增量数据集合(Delta)

一个数据集在两个快照之间产生的数据变更记录。

Delta(T0 → T1): [+F, +G]

3. 变更数据捕获(CDC)

标准的数据库概念,用于捕获数据更改。每一行变更由三种操作之一产生:

操作含义示例
Insert新增一行
+F
+F
Update修改一行
-B(old) → +B(new)
-B(old) → +B(new)
Delete删除一行
-C
-C

三者关系

Snapshot(T0) ──[CDC]──> Delta(T0→T1) ──[Apply]──> Snapshot(T1)

系统通过 CDC 将输入数据抽象为 Delta 集合,作业感知并消费上游 Delta,向下游输出自身的 Delta,形成完整的增量 Pipeline。


增量计算原理

MVCC 机制

Lakehouse 为每张表维护多个历史版本。每次数据变更(INSERT/UPDATE/DELETE)都会生成新版本,旧版本仍然可访问。Dynamic Table 通过记录上次刷新的源表版本位点,精准定位 Delta 数据。

Version 1: [A, B, C] <-- Initial snapshot Version 2: [A, B, C, D] <-- Insert D, Delta = [+D] Version 3: [A, B', C, D, E] <-- Update B, Insert E, Delta = [-B(old), +B(new), +E]

增量算法

不同算子处理增量数据的方式不同:

算子增量处理方式是否需要读历史数据
Filter(过滤)仅对增量数据执行过滤条件❌ 不需要
Project(投影)仅对增量数据执行列裁剪/计算❌ 不需要
Join(连接)增量数据与右表历史数据连接✅ 需要读右表历史
Aggregate(聚合)增量数据与自身历史聚合结果合并✅ 需要读自身历史结果

示例:咖啡销量统计

假设咖啡店有两张表:

-- 产品表(Append-Only) CREATE TABLE products ( product_id STRING, product_name STRING ); -- 订单表(持续新增) CREATE TABLE orders ( order_id STRING, product_id STRING, price DOUBLE );

业务需求:统计除拿铁外,每种咖啡的销售总额。

声明式定义(用户只需写这个):

CREATE DYNAMIC TABLE dt_coffee_sales REFRESH INTERVAL 10 MINUTE VCLUSTER default AS SELECT p.product_name, SUM(o.price) AS total_sales FROM orders o JOIN products p ON o.product_id = p.product_id WHERE p.product_name != 'Latte' GROUP BY p.product_name;

系统内部增量执行过程:

1. 记录上次刷新位点:Version 5 2. 捕获 Delta(T5→T6):orders 新增 3 行 3. 执行增量计划: - Filter: 过滤掉 product_name = 'Latte' 的增量行 - Join: 新增订单行 JOIN products 历史表 - Aggregate: 新增行的聚合值 MERGE INTO dt_coffee_sales 历史结果 4. 更新位点:Version 6

整个过程只处理新增的 3 行,而非重新扫描 orders 全表。


调度模式

增量计算支持三种调度模式,同一份 SQL 可无缝切换,无需修改查询逻辑:

1. 数据到达立即消费(实时触发)

源表发生数据变更时,立即触发增量计算。

  • 延迟:秒级
  • 适用:风控、实时监控等对新鲜度要求极高的场景
  • 注意:计算开销较高,查询复杂度不宜过高

2. 按时间周期性调度

按预设间隔(分钟/小时/天)周期性触发。

CREATE DYNAMIC TABLE dt_name REFRESH INTERVAL 10 MINUTE VCLUSTER default AS SELECT ...;

  • 延迟:取决于刷新间隔
  • 适用:绝大多数近实时场景(分钟级/小时级报表)
  • 优势:可积攒数据批量处理,利用集群弹性能力,便于容量规划

3. 根据依赖触发调度(DAG 级联)

上游任务完成后自动触发下游计算,形成数据流水线。

ODS 层加载完成 → 触发 DWD 层清洗 → 触发 DWS 层聚合 → 触发 ADS 层报表

  • 适用:数仓分层场景,希望监控叶子结点新鲜度,上游自动适配
  • 优势:只需配置叶子结点调度,上游自动安排

性能与成本

与流计算对比

在典型场景下,增量计算比 Apache Flink 等流计算引擎节省资源,主要原因:

  1. 无常驻状态存储:流计算需维护窗口内全量状态 + Checkpoint,增量计算基于 MVCC,按需读取
  2. 向量化执行引擎:Native Vector Engine 比 Java 行式处理效率高数倍至数十倍
  3. 批量 Join 优化:增量 Join 是左表增量行与右表一次性 Hash Join,右表只读一次,利用存储缓存

影响性能的 4 个因素

因素影响说明
查询复杂度越高代价越大Outer Join + 大量历史数据连接时,可能产生大量回撤操作
数据变化类型Append-Only 代价最低Update/Delete 需要与历史数据交互,计算代价更高
数据变化速率越快代价越大每秒新增 100 万行 vs 每秒新增 1 万行,资源消耗差异显著
调度频率越高代价越大频率越高,系统固定开销(Plan 生成、资源分配)占比越大

资源消耗 | . (高频调度,系统开销占比大) | . | . (最佳平衡点) | . | . (低频调度,单次 Delta 较大) +------------------> 调度频率

Cost-based 优化框架

云器 Lakehouse 采用**基于代价(Cost-based)**的增量优化,而非基于规则(Rule-based):

  • Rule-based:SQL 定义时固定增量执行计划,无法适应数据动态变化
  • Cost-based:每次刷新时,综合数据统计信息、集群资源、增量数据量,动态选择最优执行计划(全量 or 增量、算子算法选择)

系统会在以下情况自动回退到全量计算:

  • 源表历史版本超出 Time Travel 保留周期
  • 增量数据量过大,全量计算反而更优
  • Dynamic Table 的 SQL 定义发生变更

适用场景

当业务同时满足以下条件时,增量计算是最优选择:

  • 计算逻辑可用标准 SQL 或 UDF 描述
  • 数据分批到达,形成持续增量数据集
  • 希望分钟/小时级新鲜度,同时要求成本显著低于流计算
  • 不涉及复杂时间窗口(滑动窗口、会话窗口)、Watermark 机制
  • 希望渐进式升级数仓架构,逐步验证近实时化效果

不适用场景

  • 需要秒级以下延迟(考虑流处理)
  • 涉及复杂乱序数据处理和 Watermark 机制
  • 一次性全量数据加载(直接用 INSERT INTO 或 COPY INTO)

限制项

非确定性函数

包含随机函数的场景会削弱增量计算优势,因为已计算结果不再稳定,存量数据被迫重新计算。

函数影响workaround
CURRENT_TIMESTAMP()
CURRENT_TIMESTAMP()
每次刷新结果都变,接近全量重算避免在 Dynamic Table 中直接使用
CURRENT_DATE()
CURRENT_DATE()
每天变化一次,当天内稳定可在 T+1 场景中使用
RAND()
RAND()
每次执行结果不同避免使用
DATE_FORMAT(NOW(), 'yyyy-MM')
DATE_FORMAT(NOW(), 'yyyy-MM')
月内稳定,跨月变化可接受,月内增量计算正常

建议:如果随机函数能在某个时间范围内保持稳定(如按月/按天),增量计算仍能有效工作。

其他限制

  • 源表变更量过大时,增量计算可能接近全量计算的负载
  • 部分复杂查询(如多层嵌套子查询 + 非等值 Join)可能无法增量执行,系统会自动回退到全量

完整示例:用户行为日志分析

业务场景

互联网公司希望实时了解用户在不同页面的操作情况,并将操作与用户基本信息(年龄、性别、地区)关联,进行精准分析。

数据源

  • 用户行为日志:Kafka 实时事件流
  • 用户信息维表:MySQL CDC → Kafka

Step 1:日志数据实时加载

CREATE PIPE PIPE_USER_BEHAVIOR_LOG VIRTUAL_CLUSTER = 'CZCODE_DI' BATCH_INTERVAL_IN_SECONDS = '60' AS COPY INTO USER_BEHAVIOR_LOG_RT FROM ( SELECT PARSE_JSON(VALUE::STRING)['EVENT_TIME'] AS EVENT_TIME, PARSE_JSON(VALUE::STRING)['USER_ID'] AS USER_ID, PARSE_JSON(VALUE::STRING)['EVENT_TYPE'] AS EVENT_TYPE, PARSE_JSON(VALUE::STRING)['PAGE_ID'] AS PAGE_ID, PARSE_JSON(VALUE::STRING)['ACTION'] AS ACTION FROM READ_KAFKA( 'host01:9092,host02:9092,host03:9092', -- bootstrap_servers 'user_behavior_topic', -- topic '', -- topic_prefix 'pipe_user_behavior_group', -- group_id '', -- starting_offsets (Pipe 自动管理) '', -- ending_offsets '', -- starting_timestamp '', -- ending_timestamp 'raw', -- key format 'raw', -- value format 0, -- max errors map() -- kafka configs ) );

Step 2:维度表增量更新

CREATE PIPE PIPE_USER_PROFILE VIRTUAL_CLUSTER = 'CZCODE_DI' BATCH_INTERVAL_IN_SECONDS = '60' AS MERGE INTO USER_PROFILE_RT A USING ( SELECT USER_ID, AGE, GENDER, REGION, EVENT_TIME, ROW_NUMBER() OVER (PARTITION BY USER_ID ORDER BY EVENT_TIME DESC) AS row_num FROM READ_KAFKA( 'host01:9092,host02:9092,host03:9092', -- bootstrap_servers 'user_profile_topic', -- topic '', -- topic_prefix 'pipe_user_profile_group', -- group_id '', -- starting_offsets (Pipe 自动管理) '', -- ending_offsets '', -- starting_timestamp '', -- ending_timestamp 'raw', -- key format 'raw', -- value format 0, -- max errors map() -- kafka configs ) ) B ON A.USER_ID = B.USER_ID WHEN MATCHED AND B.row_num = 1 AND B.EVENT_TIME > A.UPDATED_AT THEN UPDATE SET A.AGE = B.AGE, A.GENDER = B.GENDER, A.REGION = B.REGION, A.UPDATED_AT = B.EVENT_TIME WHEN NOT MATCHED AND B.row_num = 1 THEN INSERT (USER_ID, AGE, GENDER, REGION, UPDATED_AT) VALUES (B.USER_ID, B.AGE, B.GENDER, B.REGION, B.EVENT_TIME);

Step 3:日志与维表关联(DWD 层)

CREATE DYNAMIC TABLE DT_USER_BEHAVIOR_ENRICHED REFRESH INTERVAL 1 MINUTE VCLUSTER default AS SELECT A.EVENT_TIME, A.USER_ID, A.EVENT_TYPE, A.PAGE_ID, A.ACTION, B.AGE, B.GENDER, B.REGION FROM USER_BEHAVIOR_LOG_RT A LEFT JOIN USER_PROFILE_RT B ON A.USER_ID = B.USER_ID;

Step 4:聚合分析(DWS 层)

CREATE DYNAMIC TABLE DT_USER_BEHAVIOR_ANALYTICS REFRESH INTERVAL 1 MINUTE VCLUSTER default_ap AS SELECT DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd HH:00:00') AS EVENT_HOUR, PAGE_ID, EVENT_TYPE, REGION, COUNT(*) AS TOTAL_EVENTS, COUNT(DISTINCT USER_ID) AS UNIQUE_USERS, SUM(CASE WHEN EVENT_TYPE = 'CLICK' THEN 1 ELSE 0 END) AS CLICK_EVENTS, SUM(CASE WHEN EVENT_TYPE = 'VIEW' THEN 1 ELSE 0 END) AS VIEW_EVENTS FROM DT_USER_BEHAVIOR_ENRICHED GROUP BY EVENT_HOUR, PAGE_ID, EVENT_TYPE, REGION;

架构总结

Kafka (行为日志) ──[Pipe]──> USER_BEHAVIOR_LOG_RT (ODS) Kafka (用户维度) ──[Pipe]──> USER_PROFILE_RT (ODS) │ ├──[DT 1min]──> DT_USER_BEHAVIOR_ENRICHED (DWD) │ │ │ └──[DT 1min]──> DT_USER_BEHAVIOR_ANALYTICS (DWS)

  • 数据导入:Pipe 自动从 Kafka 批量加载,支持 Append 和 CDC 写入
  • ETL 处理:Dynamic Table 声明式定义,自动增量刷新
  • 数仓分层:ODS → DWD → DWS,完整体现分层思想
  • 无需人工干预:系统自动完成调度、增量优化、依赖传递

常见问题

Q: 增量计算和物化视图有什么区别?

特性Dynamic Table(增量计算)Materialized View
设计目标构建多层数据管道透明提升单表查询性能
查询重写不会自动重写查询,需显式引用优化器自动重写查询使用 MV
数据源支持多表 JOIN、UNION 等复杂查询仅支持单表
刷新方式增量刷新,仅处理 Delta全量或增量(受限)
适用场景ETL 管道、数仓分层加速特定查询

Q: 如何确认刷新是增量还是全量?

SHOW DYNAMIC TABLE REFRESH HISTORY WHERE name = 'dt_name' LIMIT 10;

查看输出中的刷新模式和耗时。如果某次刷新耗时显著高于平时,可能是触发了全量刷新。

Q: 全量刷新后,下游 Dynamic Table 会怎样?

下游 Dynamic Table 会感知上游的全量刷新,并在下次刷新时自动适配。如果上游数据全部变化,下游也可能执行接近全量的计算。

Q: 如何避免不必要的全量刷新?

  • 确保
    data_retention_days
    data_retention_days
    足够长,避免源表历史版本过期
  • 避免在 Dynamic Table 的 SQL 中使用非确定性函数
  • 修改 SQL 定义前,评估是否真的需要变更

Q: 增量计算能保证 Exactly-Once 语义吗?

能。基于 MVCC 的版本管理,每次刷新基于确定的源表版本位点,故障恢复后可从上次位点重算,保证结果与全量计算一致。


相关文档

联系我们
预约咨询
微信咨询
电话咨询