{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by='dt'
) }}

-- Daily order count and revenue per region, computed from completed orders only.
-- The output is partitioned by dt; with insert_overwrite, an incremental run
-- replaces the dt partitions it produces rather than appending to them.
--
-- Incremental runs rebuild a trailing window of `lookback_days` days
-- (project var, default 3 = the previously hard-coded value) ending at the
-- newest dt already in the target table, so late-arriving completed orders
-- for recent days are re-aggregated.

select
    dt,
    region,
    count(order_id) as order_count,
    sum(amount) as revenue
from {{ ref('stg_orders') }}
where status = 'completed'
{% if is_incremental() %}
    -- max(dt) is NULL when the target table exists but is empty (e.g. the first
    -- run matched no completed orders, or the table was truncated). Then
    -- `dt >= NULL` is never true and the model would stay empty forever; the
    -- coalesce to a sentinel date turns that case into a full reload instead.
    -- The coalesce wraps date_sub (which always returns DATE) so both
    -- arguments share a type regardless of how dt itself is stored.
    and dt >= (
        select coalesce(
            date_sub(max(dt), {{ var('lookback_days', 3) }}),
            cast('1900-01-01' as date)
        )
        from {{ this }}
    )
{% endif %}
group by dt, region
{% if is_incremental() %}
-- Incremental filter: select rows dated within the last prune_days days
-- (project var; falls back to 2 when the project does not define it)
-- OR rows whose key is above the largest key already loaded.
-- coalesce(..., 0) keeps the key comparison usable when the table is empty.
where o_orderdate >= dateadd(day, -{{ var('prune_days', 2) }}, current_date())
   or o_orderkey > (select coalesce(max(o_orderkey), 0) from {{ this }})
{% endif %}
`prune_days` 是 `dbt_project.yml` 里定义的变量(默认 2),表示重新处理最近 N 天(N = `prune_days`)的数据,同时也处理主键比当前最大值更大的新行。两个条件用 `or` 连接,满足其中任意一个的行都会被选中。
{{ config(
    materialized='incremental',
    unique_key='o_orderkey',
    incremental_strategy='merge',
    on_schema_change='append_new_columns'
) }}

-- Incremental staging of the TPC_H ORDERS source, keyed by o_orderkey.
-- With the merge strategy on unique_key, an order selected again on a later
-- run updates its existing row instead of being inserted a second time.

select
    o_orderkey,
    o_custkey,
    o_orderstatus,
    o_totalprice,
    o_orderdate,
    -- Expand the single-letter status code; any other value maps to 'UNKNOWN'.
    case
        when o_orderstatus = 'O' then 'OPEN'
        when o_orderstatus = 'F' then 'FULFILLED'
        when o_orderstatus = 'P' then 'PARTIAL'
        else 'UNKNOWN'
    end as order_status_desc,
    -- When this run (re)processed the row.
    current_timestamp() as processed_at
from {{ source('TPC_H', 'ORDERS') }}
{% if is_incremental() %}
-- Re-read the trailing prune_days days (project var; falls back to 2 when the
-- project does not define it) OR any order whose key is above the largest key
-- already loaded. coalesce(..., 0) keeps the key test usable on an empty table.
where o_orderdate >= dateadd(day, -{{ var('prune_days', 2) }}, current_date())
   or o_orderkey > (select coalesce(max(o_orderkey), 0) from {{ this }})
{% endif %}
Gold 层(`dim_orders.sql`):
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key='order_key',
    alias='DIM_ORDERS'
) }}

-- Order dimension: staged orders enriched with the customer's name and
-- merged into DIM_ORDERS on order_key.

with orders_base as (
    -- Staged orders. On incremental runs only rows whose processed_at is at or
    -- after the newest _loaded_at already in this table are read. Filtering
    -- here, before the join, means only the new batch gets joined. The `>=`
    -- deliberately re-reads the last batch; the merge on order_key absorbs it.
    select
        o_orderkey,
        o_custkey,
        order_status_desc,
        o_totalprice,
        o_orderdate,
        processed_at
    from {{ ref('stg_orders_incremental') }}
    {% if is_incremental() %}
    -- max(_loaded_at) is NULL when this table exists but is empty, and
    -- `processed_at >= NULL` is never true, so without the coalesce nothing
    -- would ever load again. The sentinel turns that case into a full reload.
    where processed_at >= (
        select coalesce(max(_loaded_at), '1900-01-01') from {{ this }}
    )
    {% endif %}
),

customers as (
    -- Keep one row per customer_key (the one with the highest dbt_updated_ts)
    -- so the left join cannot fan a single order out into several rows that
    -- share an order_key, which the merge on order_key cannot handle.
    -- NOTE(review): ties on dbt_updated_ts make the kept row arbitrary.
    select
        customer_key,
        customer_name
    from {{ ref('dim_customers') }}
    qualify row_number() over (
        partition by customer_key
        order by dbt_updated_ts desc
    ) = 1
),

enriched_orders as (
    select
        o.o_orderkey as order_key,
        o.o_custkey as customer_key,
        c.customer_name,  -- NULL when the customer has no row in dim_customers
        o.order_status_desc,
        o.o_totalprice as total_price,
        o.o_orderdate as order_date,
        o.processed_at as _loaded_at,  -- high-water mark for the next run
        current_timestamp() as _updated_at
    from orders_base as o
    left join customers as c
        on o.o_custkey = c.customer_key
)

select * from enriched_orders
⚠️ 注意:Gold 层 join `dim_customers` 时加了 `qualify row_number() over (partition by customer_key order by dbt_updated_ts desc) = 1` 去重。这是因为 `dim_customers` 是 SCD 模型,同一个 `customer_key` 可能有多个历史版本;不去重的话,left join 之后同一个 `order_key` 会出现多行,按 `order_key` 做 MERGE 时就会导致 `EXISTS_NONDETERMINISTIC_ROWS` 错误。
⚠️ NULL 陷阱:增量过滤 `where _loaded_at >= (select max(_loaded_at) from {{ this }})`