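Incremental computation: a Dynamic Table does not recompute everything on each refresh. Instead, it analyzes the dependencies in the SQL and processes only the changed portion of the upstream data. For example, take a Dynamic Table that runs a GROUP BY over an orders table: when 100 rows are added to the orders table, the system recomputes only the groups those 100 rows belong to rather than scanning all the data. This is why the first creation requires a full refresh (to establish a baseline), while subsequent refreshes typically take only a few seconds.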
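The idea can be illustrated with a toy model in plain Python. This is only a conceptual sketch, not how the engine is implemented: the names `apply_delta` and `new_state` are made up for illustration, and it handles inserted rows only (a real engine also has to deal with updates, deletes, and aggregates such as `max`).

```python
from collections import defaultdict


def new_state():
    """Per-customer aggregates, standing in for the materialized result."""
    return defaultdict(lambda: {"order_count": 0, "total_amount": 0.0})


def apply_delta(state, new_rows):
    """Fold newly inserted (customer_id, amount) rows into the aggregates.

    Only the groups that appear in new_rows are touched; the set of
    touched groups is returned.
    """
    touched = set()
    for customer_id, amount in new_rows:
        agg = state[customer_id]
        agg["order_count"] += 1
        agg["total_amount"] += amount
        touched.add(customer_id)
    return touched


# First refresh: a full load establishes the baseline.
state = new_state()
apply_delta(state, [("c1", 10.0), ("c2", 5.0), ("c1", 2.5)])

# Later refresh: only the newly arrived rows are processed.
touched = apply_delta(state, [("c2", 1.0)])
```

After the second call, only group `c2` has been recomputed; the aggregate for `c1` is left exactly as the baseline produced it.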
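Differences from a regular `incremental` model:

| | incremental model | Dynamic Table |
| --- | --- | --- |
| Trigger | Manual `dbt run` | Refreshed automatically by the system on a schedule |
| Refresh logic | You write a `{% if is_incremental() %}` filter | The system performs the incremental computation automatically |
| Best suited for | Precise control over when runs happen | Continuous, automatic refresh |
| Checking refresh status | None | `SHOW DYNAMIC TABLE REFRESH HISTORY` |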
Basic usage
{{ config(
materialized='dynamic_table',
refresh_interval='5 MINUTE',
refresh_vc='default'
) }}
select
customer_id,
count(order_id) as order_count,
sum(amount) as total_amount,
max(updated_at) as last_order_time
from {{ ref('stg_orders') }}
group by customer_id
{{ config(
materialized='dynamic_table',
refresh_vc='default',
refresh_interval='1 HOUR'
) }}
{% set order_statuses = ['O', 'F', 'P'] %}
select
date_trunc('day', o_orderdate) as order_date,
extract(year from o_orderdate) as order_year,
extract(quarter from o_orderdate) as order_quarter,
o_custkey as customer_key,
o_orderstatus as order_status,
count(*) as order_count,
sum(o_totalprice) as total_order_value,
avg(o_totalprice) as avg_order_value,
count(distinct o_custkey) as unique_customers,
{% for status in order_statuses %}
sum(case when o_orderstatus = '{{ status }}' then o_totalprice else 0 end)
as revenue_{{ status.lower() }}_status,
{% endfor %}
current_timestamp() as last_updated
from {{ ref('stg_orders_incremental') }}
where processing_type in ('RECENT', 'FULL_LOAD')
group by
date_trunc('day', o_orderdate),
extract(year from o_orderdate),
extract(quarter from o_orderdate),
o_custkey,
o_orderstatus
The Jinja `{% for %}` loop expands into one aggregate column per status, which avoids repeating the same code.
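To make the expansion concrete, the following plain-Python imitation builds the same text the loop would emit for the three statuses. It does not use Jinja or dbt; `render_status_columns` is a hypothetical helper written only for this illustration, and the exact whitespace of dbt's compiled SQL may differ.

```python
order_statuses = ["O", "F", "P"]


def render_status_columns(statuses):
    """Build one conditional-sum column per status, mirroring the loop body."""
    lines = []
    for status in statuses:
        lines.append(
            f"sum(case when o_orderstatus = '{status}' then o_totalprice else 0 end)"
        )
        lines.append(f"    as revenue_{status.lower()}_status,")
    return "\n".join(lines)


rendered = render_status_columns(order_statuses)
print(rendered)
```

Each generated column ends with a comma, which is why the model places `current_timestamp() as last_updated` after the loop: the final column closes the select list without a trailing comma.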
Dynamic Tables referencing Dynamic Tables
A Dynamic Table can reference another Dynamic Table, which lets you build a multi-layer pipeline:
-- Silver layer: refresh the order aggregates every hour
{{ config(materialized='dynamic_table', refresh_interval='1 HOUR', refresh_vc='default') }}
select ... from {{ ref('stg_orders_incremental') }} ...
-- Gold layer: reference the Silver layer and keep only the most recent year of data
{{ config(
materialized='dynamic_table',
refresh_vc='default',
refresh_interval='1 hour',
alias='DIM_CURRENT_YEAR_ORDERS'
) }}
select *
from {{ ref('dim_orders') }}
where order_date >= (
select dateadd(year, -1, date_trunc('day', max(order_date)))
from {{ ref('dim_orders') }}
)
order by order_key
⚠️ Note: ClickZetta does not support Snowflake's `target_lag='DOWNSTREAM'` (refresh triggered by downstream tables). Each Dynamic Table needs its own `refresh_interval`.
Checking refresh status
SHOW DYNAMIC TABLE REFRESH HISTORY WHERE name = 'order_facts_dynamic' LIMIT 5;
{%- macro get_table_stream(table, stream_name=( (this.alias or this.name) ~ "_ts" )) -%}
{%- set stream_full = this.schema ~ "." ~ stream_name -%}
{% if execute and flags.WHICH in ('run', 'build') %}
{%- set stream_create_statement -%}
create table stream if not exists {{ stream_full }}
on table {{ table }}
with properties ('TABLE_STREAM_MODE' = 'STANDARD')
{%- endset -%}
{%- do run_query(stream_create_statement) -%}
{%- endif -%}
{{ return(stream_full) }}
{%- endmacro -%}
This macro creates the Stream automatically when the model runs (if it does not already exist) and returns the Stream's name for use in the FROM clause. `TABLE_STREAM_MODE = 'STANDARD'` captures all three kinds of change: INSERT, UPDATE, and DELETE.
Step 2: Consume the Stream (`models/gold/dim_customer_changes.sql`)
{{ config(
materialized='incremental',
alias='DIM_CUSTOMER_CHANGES'
) }}
select
row_number() over (order by `__commit_timestamp`, customer_key) as log_id,
* except (`__change_type`, `__commit_timestamp`, `__commit_version`),
`__change_type` as cdc_change_type,
`__commit_timestamp` as cdc_commit_ts,
`__commit_version` as cdc_version,
case when `__change_type` = 'DELETE' then 'Y' else 'N' end as delete_flag
from {{ get_table_stream(ref('dim_customers')) }} as d
where not (`__change_type` = 'DELETE' and `__change_type` != 'UPDATE_BEFORE')
qualify 1 = row_number() over (
partition by customer_key
order by `__commit_timestamp` desc, `__change_type` desc
)
A few key points:
- `get_table_stream(ref('dim_customers'))` automatically creates a Stream on `dim_customers` and returns the Stream's name
- `* except (...)` drops the three system columns, which are then selected separately under aliases
- `where not (__change_type = 'DELETE' and __change_type != 'UPDATE_BEFORE')`
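The `qualify` clause in the model keeps a single row per `customer_key`: the one that sorts first by commit timestamp descending, then by change type descending. A rough Python equivalent of that step is sketched below. It models only the `qualify`, not the `where` filter. The dictionary keys are simplified stand-ins for the system columns, the change-type strings are illustrative placeholders, and Python string comparison is assumed to stand in for the warehouse's ordering.

```python
def latest_change_per_key(changes):
    """Keep one row per customer_key: the most recent change.

    Ties on the commit timestamp are broken by change type, descending,
    which mirrors `order by ... desc, ... desc` with row_number() = 1.
    """
    latest = {}
    for row in changes:
        key = row["customer_key"]
        rank = (row["commit_timestamp"], row["change_type"])
        current = latest.get(key)
        if current is None or rank > (
            current["commit_timestamp"],
            current["change_type"],
        ):
            latest[key] = row
    return latest


changes = [
    {"customer_key": 1, "commit_timestamp": 1, "change_type": "INSERT"},
    {"customer_key": 1, "commit_timestamp": 2, "change_type": "DELETE"},
    {"customer_key": 1, "commit_timestamp": 2, "change_type": "INSERT"},
    {"customer_key": 2, "commit_timestamp": 1, "change_type": "INSERT"},
]
latest = latest_change_per_key(changes)
```

Here `latest` holds exactly one row for each of the two keys, and for key 1 it is one of the rows with the newer timestamp.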
-- This SELECT does not advance the offset; the next read returns the same data
select * from my_schema.orders_stream;
-- Once this MERGE succeeds, the offset advances; the next read returns only new changes
merge into target_table
using my_schema.orders_stream as src
on target_table.id = src.id
when matched then update ...
when not matched then insert ...;
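The difference between reading and consuming a Stream can be pictured with a toy model in Python. This is a conceptual sketch under simplifying assumptions, not ClickZetta's implementation: `ToyStream` and its methods are hypothetical names, and the Stream is reduced to a change log plus a single offset.

```python
class ToyStream:
    """Toy model of a table stream: a change log plus a consumer offset."""

    def __init__(self):
        self._log = []
        self._offset = 0

    def append(self, change):
        """Record a change made to the underlying table."""
        self._log.append(change)

    def select(self):
        """Read pending changes without moving the offset (like a plain SELECT)."""
        return list(self._log[self._offset:])

    def consume(self, apply):
        """Apply pending changes, then advance the offset (like a successful MERGE).

        If apply() raises, the offset stays where it was.
        """
        pending = self.select()
        apply(pending)
        self._offset += len(pending)
        return pending


stream = ToyStream()
stream.append({"id": 1, "value": "a"})
stream.append({"id": 2, "value": "b"})

first_read = stream.select()
second_read = stream.select()  # same rows again: the offset did not move

target = {}


def merge(changes):
    """Upsert each change into the target by id."""
    for change in changes:
        target[change["id"]] = change["value"]


stream.consume(merge)
after_merge = stream.select()  # nothing pending: the offset advanced

stream.append({"id": 3, "value": "c"})
new_only = stream.select()  # only the change made after the merge
```

The two reads before the merge return identical rows, while the read after it is empty until a new change arrives.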