DBT 增量处理实战


什么是增量处理

数据仓库里的表通常有两种更新方式:

  • 全量刷新:每次把整张表重新计算一遍。数据量小时简单可靠,但数据量大时代价高昂——1 亿行的订单表,每次刷新都要扫描全部数据。
  • 增量处理:只处理"新增或变化"的数据,把结果合并到已有表里。首次运行全量,后续只处理增量,速度快得多。

dbt 的

incremental
incremental
materialization 就是增量处理的实现。它的核心逻辑是:

首次运行(is_incremental() = false)→ CREATE TABLE AS SELECT(全量) 后续运行(is_incremental() = true) → 只查询新数据,合并到已有表

is_incremental()
is_incremental()
怎么判断"首次"还是"后续"?

dbt 在运行前检查目标表是否已经存在:不存在则是首次运行,存在则是后续运行。这意味着:

  • 手动
    DROP TABLE
    DROP TABLE
    后再运行,会触发全量重建
  • dbt run --full-refresh
    dbt run --full-refresh
    会强制全量重建,忽略已有表

增量处理的前提条件

增量处理要求 source 数据有办法识别"哪些是新的或变化的"。常见的标识方式:

  • 时间戳
    updated_at
    updated_at
    created_at
    created_at
    等字段,每次更新时自动更新
  • 自增主键:新行的 ID 总是比已有行大
  • 两者结合:既有时间戳又有主键,互为补充

如果 source 数据有删除操作但没有

updated_at
updated_at
字段,增量模型无法感知删除,这种情况下应该用全量刷新,或者改用 Table Stream 捕获 DELETE 事件。

合并方式由

incremental_strategy
incremental_strategy
控制,ClickZetta Lakehouse 支持 4 种策略。


4 种增量策略

merge(默认)

MERGE INTO
MERGE INTO
语句,按
unique_key
unique_key
匹配:匹配到的行更新,没匹配到的行插入。适合有主键、需要处理数据更新的场景。

{{ config( materialized='incremental', incremental_strategy='merge', unique_key='order_id' ) }} select order_id, customer_id, amount, status, updated_at from {{ ref('stg_orders') }} {% if is_incremental() %} where updated_at >= (select max(updated_at) from {{ this }}) {% endif %}

{{ this }}
{{ this }}
指向当前模型对应的表,
is_incremental()
is_incremental()
在首次运行时返回 false,后续运行返回 true。

实测数据(来自 jaffle-shop-clickzetta):61,948 行订单,merge 策略首次运行约 20 秒。


append

只做

INSERT INTO
INSERT INTO
,不去重。适合日志类数据——每条记录天然唯一,不需要更新,只需追加。

{{ config( materialized='incremental', incremental_strategy='append' ) }} select customer_key, customer_name, account_balance, 'INSERT' as cdc_change_type, current_timestamp() as cdc_commit_ts from {{ source('TPC_H', 'CUSTOMER') }} {% if is_incremental() %} TABLESAMPLE SYSTEM(10) {% endif %}

这是 snowflake-dbt2lakehouse-dbt

customer_cdc_stream
customer_cdc_stream
模型的实际写法——模拟 CDC 数据流,每次增量运行采样 10% 的数据追加进去,不需要去重。


delete+insert

先按

unique_key
unique_key
删除目标表中匹配的行,再整体插入。适合需要按 key 替换一批数据的场景——例如回填历史分区、修正已有数据,或者 source 数据没有可靠的
updated_at
updated_at
字段但有主键。

{{ config( materialized='incremental', incremental_strategy='delete+insert', unique_key='order_id' ) }} select order_id, customer_id, amount, status, region, dt, updated_at from {{ ref('stg_orders') }} {% if is_incremental() %} where updated_at >= (select max(updated_at) from {{ this }}) {% endif %}

这是 dbt-clickzetta examples

fct_orders_delete_insert
fct_orders_delete_insert
的实际写法。


insert_overwrite

INSERT OVERWRITE
INSERT OVERWRITE
,动态分区模式。适合按分区组织的数据,每次只重算目标分区。

{{ config( materialized='incremental', incremental_strategy='insert_overwrite', partition_by='dt' ) }} select dt, region, count(order_id) as order_count, sum(amount) as revenue from {{ ref('stg_orders') }} where status = 'completed' {% if is_incremental() %} and dt >= (select date_sub(max(dt), 3) from {{ this }}) {% endif %} group by dt, region

增量运行时只重算最近 3 天的分区,历史分区不动。这是 dbt-clickzetta examples

daily_revenue
daily_revenue
的实际写法。


策略选型

策略适用场景注意事项
merge
merge
有主键,需要处理更新需要
unique_key
unique_key
;source 中同一 key 不能有多行
append
append
日志、事件流,只追加不更新不去重,重复运行会产生重复数据
delete+insert
delete+insert
数据量大,merge 性能差;按分区替换需要
unique_key
unique_key
;先删后插,中间有短暂数据缺失
insert_overwrite
insert_overwrite
按分区组织,每次重算若干分区需要
partition_by
partition_by
;整个分区被替换

增量过滤的写法

增量处理的关键是:如何只查出"新增或变化"的数据。常见的三种写法:

按时间戳过滤(最常用):

{% if is_incremental() %} where updated_at >= (select max(updated_at) from {{ this }}) {% endif %}

按主键水位过滤(适合自增 ID):

{% if is_incremental() %} where order_key > (select coalesce(max(order_key), 0) from {{ this }}) {% endif %}

两者结合(来自 snowflake-dbt2lakehouse-dbt 的实际代码):

{% if is_incremental() %} where o_orderdate >= dateadd(day, -{{ var('prune_days') }}, current_date()) or o_orderkey > (select coalesce(max(o_orderkey), 0) from {{ this }}) {% endif %}

prune_days
prune_days
dbt_project.yml
dbt_project.yml
里定义的变量(默认 2),表示重处理最近 N 天的数据,同时也处理主键比当前最大值更大的新行。两个条件用
OR
OR
连接,确保不遗漏任何新数据。


schema 变更处理

当 source 表加了新列,增量模型如何处理?通过

on_schema_change
on_schema_change
控制:

{{ config( materialized='incremental', unique_key='o_orderkey', incremental_strategy='merge', on_schema_change='append_new_columns' ) }}

行为
ignore
ignore
(默认)
忽略新列,不报错
append_new_columns
append_new_columns
自动给目标表加新列,历史数据该列为 NULL
sync_all_columns
sync_all_columns
同步所有列变化(加列、删列)
fail
fail
有 schema 变化时直接报错

生产环境推荐

append_new_columns
append_new_columns
——既不会因为加列而报错,也不会静默丢失新列数据。


完整示例:TPC-H 订单增量管道

以下是 snowflake-dbt2lakehouse-dbt 里实际运行的增量管道,Bronze 层摄取 → Gold 层维度表:

Bronze 层

stg_orders_incremental.sql
stg_orders_incremental.sql
):

{{ config( materialized='incremental', unique_key='o_orderkey', incremental_strategy='merge', on_schema_change='append_new_columns' ) }} select o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, case when o_orderstatus = 'O' then 'OPEN' when o_orderstatus = 'F' then 'FULFILLED' when o_orderstatus = 'P' then 'PARTIAL' else 'UNKNOWN' end as order_status_desc, current_timestamp() as processed_at from {{ source('TPC_H', 'ORDERS') }} {% if is_incremental() %} where o_orderdate >= dateadd(day, -{{ var('prune_days') }}, current_date()) or o_orderkey > (select coalesce(max(o_orderkey), 0) from {{ this }}) {% endif %}

Gold 层

dim_orders.sql
dim_orders.sql
):

{{ config( materialized='incremental', incremental_strategy='merge', unique_key='order_key', alias='DIM_ORDERS' ) }} with orders_base as ( select * from {{ ref('stg_orders_incremental') }} ), customers as ( select customer_key, customer_name, nation_key from {{ ref('dim_customers') }} qualify row_number() over (partition by customer_key order by dbt_updated_ts desc) = 1 ), enriched_orders as ( select o.o_orderkey as order_key, o.o_custkey as customer_key, c.customer_name, o.order_status_desc, o.o_totalprice as total_price, o.o_orderdate as order_date, o.processed_at as _loaded_at, current_timestamp() as _updated_at from orders_base o left join customers c on o.o_custkey = c.customer_key ) select * from enriched_orders {% if is_incremental() %} where _loaded_at >= (select max(_loaded_at) from {{ this }}) {% endif %}


三种增量处理方式对比

本文介绍的

incremental
incremental
materialization 是 dbt 标准的增量处理方式,需要手动运行
dbt run
dbt run
触发。ClickZetta 还支持另外两种增量处理方式,适合不同场景:

方式触发机制适用场景
incremental
incremental
materialization(本文)
手动运行
dbt run
dbt run
需要精确控制运行时机;与调度系统集成
Dynamic Table系统按
refresh_interval
refresh_interval
自动刷新
需要持续自动刷新;不想管调度
Table Stream + incremental消费 CDC 变更流,每次
dbt run
dbt run
处理新变更
追踪源表的行级变更(INSERT/UPDATE/DELETE)

三种方式可以组合:用 Table Stream 捕获源表变更 → 用

incremental
incremental
消费变更 → 用 Dynamic Table 对结果做自动聚合。

详见 DBT 实时数据管道实战


相关文档

联系我们
预约咨询
微信咨询
电话咨询