DBT 实时数据管道实战


两种实时管道模式

dbt + ClickZetta 支持两种实时数据管道模式,适用于不同场景:

模式机制适用场景
Dynamic Table声明式 SQL,系统自动按周期增量刷新聚合、转换、多层级数仓管道
Table Stream + CDC捕获表的行级变更(INSERT/UPDATE/DELETE),消费变更流审计日志、维度变更追踪、下游实时同步

两者可以组合使用:用 Table Stream 捕获源表变更,用 Dynamic Table 对变更数据做聚合。


Dynamic Table

什么是 Dynamic Table

Dynamic Table 是 ClickZetta 的声明式增量计算对象。你只需定义一条 SQL,系统会按照设定的刷新周期自动计算并维护结果——不需要写调度脚本,不需要管理状态,刷新失败会自动重试。

增量计算机制:Dynamic Table 不是每次全量重算,而是分析 SQL 的依赖关系,只处理上游数据的变化部分。例如一个对订单表做 GROUP BY 的 Dynamic Table,当订单表新增 100 行时,系统只重算这 100 行涉及的分组,而不是扫描全部数据。这就是为什么首次创建时需要全量刷新(建立基准),而后续刷新通常只需几秒。

与普通

incremental
incremental
模型的区别:

incremental 模型Dynamic Table
触发方式手动运行
dbt run
dbt run
系统自动按周期刷新
刷新逻辑你写
{% if is_incremental() %}
{% if is_incremental() %}
过滤
系统自动做增量计算
适合场景需要精确控制运行时机需要持续自动刷新
查看刷新状态
SHOW DYNAMIC TABLE REFRESH HISTORY
SHOW DYNAMIC TABLE REFRESH HISTORY

基本用法

{{ config( materialized='dynamic_table', refresh_interval='5 MINUTE', refresh_vc='default' ) }} select customer_id, count(order_id) as order_count, sum(amount) as total_amount, max(updated_at) as last_order_time from {{ ref('stg_orders') }} group by customer_id

两个关键参数:

  • refresh_interval
    refresh_interval
    :刷新周期,支持
    '5 MINUTE'
    '5 MINUTE'
    '1 HOUR'
    '1 HOUR'
    '1 DAY'
    '1 DAY'
    等(单位用单数大写)
  • refresh_vc
    refresh_vc
    :执行刷新的计算集群名称。Dynamic Table 做的是聚合查询,推荐使用分析型集群(
    default_ap
    default_ap

dbt build
dbt build
创建 Dynamic Table 后,系统立即开始按周期自动刷新,无需任何额外操作。

手动触发刷新

需要立即获取最新数据时,可以手动触发:

dbt run-operation refresh_dynamic_table --args '{model_name: customer_stats_dynamic}'

实战示例:订单实时聚合

以下是 snowflake-dbt2lakehouse-dbt 里实际运行的 Dynamic Table,每小时自动刷新订单聚合指标:

{{ config( materialized='dynamic_table', refresh_vc='default', refresh_interval='1 HOUR' ) }} {% set order_statuses = ['O', 'F', 'P'] %} select date_trunc('day', o_orderdate) as order_date, extract(year from o_orderdate) as order_year, extract(quarter from o_orderdate) as order_quarter, o_custkey as customer_key, o_orderstatus as order_status, count(*) as order_count, sum(o_totalprice) as total_order_value, avg(o_totalprice) as avg_order_value, count(distinct o_custkey) as unique_customers, {% for status in order_statuses %} sum(case when o_orderstatus = '{{ status }}' then o_totalprice else 0 end) as revenue_{{ status.lower() }}_status, {% endfor %} current_timestamp() as last_updated from {{ ref('stg_orders_incremental') }} where processing_type in ('RECENT', 'FULL_LOAD') group by date_trunc('day', o_orderdate), extract(year from o_orderdate), extract(quarter from o_orderdate), o_custkey, o_orderstatus

Jinja 的

{% for %}
{% for %}
循环自动展开多个状态的聚合列,避免重复代码。

Dynamic Table 引用 Dynamic Table

Dynamic Table 可以引用另一个 Dynamic Table,构建多层级管道:

-- Silver 层:每小时刷新订单聚合 {{ config(materialized='dynamic_table', refresh_interval='1 HOUR', refresh_vc='default') }} select ... from {{ ref('stg_orders_incremental') }} ... -- Gold 层:引用 Silver 层,过滤最近一年数据 {{ config( materialized='dynamic_table', refresh_vc='default', refresh_interval='1 hour', alias='DIM_CURRENT_YEAR_ORDERS' ) }} select * from {{ ref('dim_orders') }} where order_date >= ( select dateadd(year, -1, date_trunc('day', max(order_date))) from {{ ref('dim_orders') }} ) order by order_key

查看刷新状态

SHOW DYNAMIC TABLE REFRESH HISTORY WHERE name = 'order_facts_dynamic' LIMIT 5;

返回每次刷新的开始时间、结束时间、状态(SUCCEED/FAILED)、刷新模式(INCREMENTAL/FULL)等信息。


Table Stream(CDC)

什么是 Table Stream

Table Stream 是 ClickZetta 的变更数据捕获(CDC)机制。在一张表上创建 Stream 后,对该表的每次 INSERT、UPDATE、DELETE 操作都会被记录下来,消费者可以读取这些变更记录。

Stream 的核心特性:

  • 不复制数据:Stream 只记录变更,不存储完整数据副本
  • offset 自动推进:只有 DML 操作(INSERT/MERGE 等)才会推进 offset,SELECT 不会
  • 系统列:每行变更附带三个系统列:
    __change_type
    __change_type
    __commit_timestamp
    __commit_timestamp
    __commit_version
    __commit_version

在 DBT 中使用 Table Stream

第一步:在

sources.yml
sources.yml
里声明 Stream

sources: - name: my_streams schema: my_schema tables: - name: orders_stream description: "orders 表的变更流"

第二步:在模型里用

source()
source()
引用

{{ config(materialized='view') }} select `__change_type`, `__commit_timestamp`, order_id, customer_id, amount, status from {{ source('my_streams', 'orders_stream') }}

SELECT * EXCEPT 过滤系统列

消费 Stream 时,通常只需要业务列,不需要系统列。用

SELECT * EXCEPT(...)
SELECT * EXCEPT(...)
可以过滤掉系统列,不需要硬编码所有业务列名:

select * except (`__change_type`, `__commit_timestamp`, `__commit_version`), `__change_type` as cdc_change_type, `__commit_timestamp` as cdc_commit_ts from {{ source('my_streams', 'orders_stream') }}

这样当源表加列时,不需要修改消费模型。

实战示例:客户维度变更追踪

以下是 snowflake-dbt2lakehouse-dbt 里实际运行的 CDC 管道,追踪

dim_customers
dim_customers
表的每次变更:

第一步:创建 Stream 的 macro

macros/get_table_stream.sql
macros/get_table_stream.sql

{%- macro get_table_stream(table, stream_name=( (this.alias or this.name) ~ "_ts" )) -%} {%- set stream_full = this.schema ~ "." ~ stream_name -%} {% if execute and flags.WHICH in ('run', 'build') %} {%- set stream_create_statement -%} create table stream if not exists {{ stream_full }} on table {{ table }} with properties ('TABLE_STREAM_MODE' = 'STANDARD') {%- endset -%} {%- do run_query(stream_create_statement) -%} {%- endif -%} {{ return(stream_full) }} {%- endmacro -%}

这个 macro 在模型运行时自动创建 Stream(如果不存在),并返回 Stream 的名称供 FROM 子句使用。

TABLE_STREAM_MODE = 'STANDARD'
TABLE_STREAM_MODE = 'STANDARD'
捕获 INSERT/UPDATE/DELETE 三种变更。

第二步:消费 Stream

models/gold/dim_customer_changes.sql
models/gold/dim_customer_changes.sql

{{ config( materialized='incremental', alias='DIM_CUSTOMER_CHANGES' ) }} select row_number() over (order by `__commit_timestamp`, customer_key) as log_id, * except (`__change_type`, `__commit_timestamp`, `__commit_version`), `__change_type` as cdc_change_type, `__commit_timestamp` as cdc_commit_ts, `__commit_version` as cdc_version, case when `__change_type` = 'DELETE' then 'Y' else 'N' end as delete_flag from {{ get_table_stream(ref('dim_customers')) }} as d where not (`__change_type` = 'DELETE' and `__change_type` != 'UPDATE_BEFORE') qualify 1 = row_number() over ( partition by customer_key order by `__commit_timestamp` desc, `__change_type` desc )

几个关键点:

  • get_table_stream(ref('dim_customers'))
    get_table_stream(ref('dim_customers'))
    自动在
    dim_customers
    dim_customers
    上创建 Stream,并返回 Stream 名称
  • * except (...)
    * except (...)
    过滤掉三个系统列,再单独 alias 出来
  • where not (__change_type = 'DELETE' and __change_type != 'UPDATE_BEFORE')
    where not (__change_type = 'DELETE' and __change_type != 'UPDATE_BEFORE')
    — 这个条件过滤掉纯 DELETE 行(
    __change_type = 'DELETE'
    __change_type = 'DELETE'
    且不是 UPDATE_BEFORE),保留 INSERT、UPDATE_BEFORE、UPDATE_AFTER
  • qualify row_number() = 1
    qualify row_number() = 1
    对同一
    customer_key
    customer_key
    只保留最新的变更记录

Stream 的 offset 管理

Stream 的 offset 记录了"消费者上次读到哪里"。只有 DML 操作(INSERT/MERGE 等)成功后才会推进 offset,SELECT 不会推进。这个设计保证了幂等性:如果消费失败,下次还能重新读到同样的数据,不会丢失变更。

-- 这个 SELECT 不会推进 offset,下次还能读到同样的数据 select * from my_schema.orders_stream; -- 这个 MERGE 成功后会推进 offset,下次只能读到新的变更 merge into target_table using my_schema.orders_stream as src on target_table.id = src.id when matched then update ... when not matched then insert ...;

dbt 的

incremental
incremental
模型在每次运行时会执行 MERGE INTO,所以每次
dbt run
dbt run
成功后 Stream 的 offset 会自动推进。如果
dbt run
dbt run
中途失败,offset 不会推进,下次运行会重新处理这批变更——这正是"至少一次"语义的保证。


组合使用:Stream + Dynamic Table

一个典型的实时管道架构:

源表(持续写入) ↓ Table Stream(捕获变更) 增量消费模型(incremental,消费 Stream,推进 offset) ↓ Dynamic Table(自动聚合,每小时刷新) 分析层(BI 查询)

这个架构的优点:

  • Stream 保证不遗漏任何变更
  • Dynamic Table 自动维护聚合结果,无需手动调度
  • 两者解耦,可以独立调整刷新频率

相关文档

联系我们
预约咨询
微信咨询
电话咨询