DBT Snowflake 迁移实战:TPC-H 数仓管道
如果你在 Snowflake 上用 dbt 构建了数仓管道,迁移到 ClickZetta Lakehouse 的核心工作量集中在 6 处平台专有特性上,标准 SQL 模型一行不用改。
本文用一个真实迁移项目演示完整过程:将 sfc-gh-dflippo/snowflake-dbt-demo (Snowflake 官方工程师维护的 dbt 演示项目)迁移到 ClickZetta Lakehouse,数据源使用两个平台共有的 TPC-H 标准数据集,全部模型经过实际运行验证,24/24 通过。
完整代码见 GitHub:clickzetta/snowflake-dbt2lakehouse-dbt
原始项目
sfc-gh-dflippo/snowflake-dbt-demo 是 Snowflake 官方工程师维护的 dbt 功能演示项目。业务场景是 TPC-H 客户订单分析:从原始订单数据出发,经过 Bronze(清洗)→ Silver(聚合、分群)→ Gold(维度表、指标)三层处理,最终输出客户价值分层、订单事实表、日历维度等分析模型。
项目覆盖了 dbt 在 Snowflake 上的大多数高级特性:Dynamic Tables、Streams(CDC)、Sequences、Python models、Incremental 策略等,是验证迁移完整性的理想对象。
迁移后的代码在
03_lakehouse/03_lakehouse/
目录,原始 Snowflake 代码保留在
01_snowflake/01_snowflake/
供对照,迁移说明在
02_migration/MIGRATION_NOTES.md02_migration/MIGRATION_NOTES.md
。
结论先行
你的 dbt 项目可以迁移,业务逻辑不需要重写。 这次迁移改动了 6 处,全部是平台配置和函数名替换,没有一处涉及数据处理逻辑本身。如果你的项目只用了标准 SQL(没有 Dynamic Tables、Streams、Sequences),换一个
profiles.ymlprofiles.yml
就能跑通。
改动项 工作量 说明 profiles.ymlprofiles.yml
连接配置极低 字段名对照替换,5 分钟完成 Dynamic Table 参数名 极低 target_lagtarget_lag
→ refresh_intervalrefresh_interval
,snowflake_warehousesnowflake_warehouse
→ refresh_vcrefresh_vc
CDC Stream 列名 低 METADATA$ACTIONMETADATA$ACTION
→ `__change_type``__change_type`
,消费时用 SELECT * EXCEPT(...)SELECT * EXCEPT(...)
Surrogate key 低 SEQUENCE .nextvalSEQUENCE .nextval
→ IDENTITYIDENTITY
列或 row_number() over (...)row_number() over (...)
,注意语义差异(见第四步)多列哈希 低 hash()hash()
→ hash_combine(crc32(col), ...)hash_combine(crc32(col), ...)
,需要了解 ClickZetta 函数体系日历表行生成 低 table(generator(...))table(generator(...))
→ explode(sequence(...))explode(sequence(...))
,一行替换
Snowflake 专有 config(
transienttransient
、
merge_exclude_columnsmerge_exclude_columns
、
copy_grantscopy_grants
)直接删除即可,ClickZetta 不支持但也不需要。
技术栈对比
原始项目(Snowflake) 迁移后(Lakehouse) dbt adapter dbt-snowflakedbt-snowflake
dbt-clickzetta >= 1.7.8dbt-clickzetta >= 1.7.8
计算资源配置 snowflake_warehouse: target.warehousesnowflake_warehouse: target.warehouse
refresh_vc: defaultrefresh_vc: default
Dynamic Table 刷新周期 target_lag: '1 hour'target_lag: '1 hour'
refresh_interval: '1 HOUR'refresh_interval: '1 HOUR'
配置变更策略 on_configuration_change: applyon_configuration_change: apply
不支持,用 ALTER DYNAMIC TABLEALTER DYNAMIC TABLE
Stream 变更类型列 METADATA$ACTIONMETADATA$ACTION
`__change_type``__change_type`
Stream 消费模式 SELECT *SELECT *
SELECT * EXCEPT(__change_type, __commit_timestamp, __commit_version)SELECT * EXCEPT(__change_type, __commit_timestamp, __commit_version)
Surrogate key SEQUENCE .nextvalSEQUENCE .nextval
IDENTITYIDENTITY
列(建表时)或 row_number() over (...)row_number() over (...)
多列哈希 hash(col1, col2, ...)hash(col1, col2, ...)
hash_combine(crc32(col1), crc32(col2), ...)hash_combine(crc32(col1), crc32(col2), ...)
行生成 table(generator(rowcount => N))table(generator(rowcount => N))
explode(sequence(0, N-1))explode(sequence(0, N-1))
采样 sample (10)sample (10)
TABLESAMPLE SYSTEM(10)TABLESAMPLE SYSTEM(10)
数据源 SNOWFLAKE_SAMPLE_DATA.TPCH_SF1SNOWFLAKE_SAMPLE_DATA.TPCH_SF1
clickzetta_sample_data.tpch_100gclickzetta_sample_data.tpch_100g
FX 汇率数据 Cybersyn Marketplace(Snowflake 专属) mock seed CSV(2020–2024,5 种货币)
标准 SQL 操作——SELECT、JOIN、GROUP BY、窗口函数、CTE、QUALIFY——语法完全一致,不需要改动。
准备工作
需要 Python 3.10+(推荐 3.12)和 dbt-clickzetta >= 1.7.8。
git clone https://github.com/clickzetta/snowflake-dbt2lakehouse-dbt.git
cd snowflake-dbt2lakehouse-dbt
python3 -m venv .venv
source .venv/bin/activate
pip install "dbt-clickzetta>=1.7.8"
复制连接配置模板,填入你的连接信息:
cp 03_lakehouse/profiles.yml.example ~/.dbt/profiles.yml
profiles.ymlprofiles.yml
关键字段差异:
accountaccount
→
serviceservice
(API 地址),
warehousewarehouse
→
vclustervcluster
(计算集群名)。完整字段说明见
dbt ClickZetta adapter 使用指南 。
验证连接:
cd 03_lakehouse
dbt debug
快速验证模式 :TPC-H SF100 数据量大(customers 3000万行、orders 1.5亿行),首次全量运行约 10 分钟。如需快速验证迁移正确性,可用
sample_limitsample_limit
变量限制 staging 层数据量:
# 采样 1 万行,约 1 分钟
bash run.sh --limit 10000
# 全量运行
bash run.sh --full
或者直接用 dbt:
dbt build --vars '{"sample_limit": 10000}' # 采样模式,~1 分钟
dbt build # 全量模式,~10 分钟
迁移步骤
第一步:数据源替换
原始项目依赖两个 Snowflake 专属数据源:
TPC-H 数据 :
SNOWFLAKE_SAMPLE_DATA.TPCH_SF1SNOWFLAKE_SAMPLE_DATA.TPCH_SF1
→
clickzetta_sample_data.tpch_100gclickzetta_sample_data.tpch_100g
ClickZetta 内置共享数据集,无需导入,直接在
_sources.yml_sources.yml
里修改
schemaschema
:
- name: TPC_H
schema: TPCH_SF1
database: SNOWFLAKE_SAMPLE_DATA
改为(需要 dbt-clickzetta >= 1.7.8):
- name: TPC_H
database: clickzetta_sample_data
schema: tpch_100g
Cybersyn FX 汇率数据 :Snowflake Marketplace 专属,ClickZetta 没有对应数据源。项目里已提供 mock seed 替代(2020–2024,USD 基准,5 种货币,9135 行):
dbt seed
第二步:Dynamic Table 参数
只需替换两个参数名:
{{ config(
materialized='dynamic_table',
snowflake_warehouse=target.warehouse,
target_lag='1 hour',
on_configuration_change='apply'
) }}
改为:
{{ config(
materialized='dynamic_table',
refresh_vc='default',
refresh_interval='1 HOUR'
) }}
on_configuration_change='apply'on_configuration_change='apply'
直接删除——ClickZetta 不支持,需要修改 Dynamic Table 配置时用
ALTER DYNAMIC TABLEALTER DYNAMIC TABLE
手动执行。
原始项目还有一个
target_lag='DOWNSTREAM'target_lag='DOWNSTREAM'
(下游触发刷新),ClickZetta 没有这个概念,改成显式的
refresh_intervalrefresh_interval
:
{{ config(materialized='dynamic_table', target_lag='DOWNSTREAM') }}
改为:
{{ config(materialized='dynamic_table', refresh_vc='default', refresh_interval='1 HOUR') }}
第三步:CDC Stream
Snowflake Streams 和 ClickZetta Table Stream 概念相同,但创建语法和系统列名不同:
Snowflake Stream ClickZetta Table Stream 创建语法 CREATE STREAM ... ON TABLE t SHOW_INITIAL_ROWS = TRUECREATE STREAM ... ON TABLE t SHOW_INITIAL_ROWS = TRUE
CREATE TABLE STREAM ... ON TABLE t WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD')CREATE TABLE STREAM ... ON TABLE t WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD')
变更类型列 METADATA$ACTIONMETADATA$ACTION
(INSERT / DELETE)`__change_type``__change_type`
(INSERT / UPDATE_BEFORE / UPDATE_AFTER / DELETE)是否 UPDATE METADATA$ISUPDATEMETADATA$ISUPDATE
(布尔值)通过 __change_type = 'UPDATE_BEFORE'__change_type = 'UPDATE_BEFORE'
判断 行标识 METADATA$ROW_IDMETADATA$ROW_ID
__commit_version__commit_version
提交时间 无直接对应 __commit_timestamp__commit_timestamp
消费 Stream 的推荐写法 :用
SELECT * EXCEPT(...)SELECT * EXCEPT(...)
过滤系统列,不需要硬编码业务列名:
select
row_number() over (order by `__commit_timestamp`, customer_key) as log_id,
* except (`__change_type`, `__commit_timestamp`, `__commit_version`),
`__change_type` as cdc_change_type,
`__commit_timestamp` as cdc_commit_ts,
`__commit_version` as cdc_version,
case when `__change_type` = 'DELETE' then 'Y' else 'N' end as delete_flag
from {{ get_table_stream(ref('dim_customers')) }} as d
⚠️ 注意 :
__change_type__change_type
等系统列名需要用 backtick 引用。
SELECT *SELECT *
不加
EXCEPTEXCEPT
会报保留名冲突错误。
创建 Table Stream 的 macro (替换 Snowflake 的
get_stream()get_stream()
macro,完整代码见
03_lakehouse/macros/get_table_stream.sql03_lakehouse/macros/get_table_stream.sql
):
{%- macro get_table_stream(table, stream_name=( (this.alias or this.name) ~ "_ts" )) -%}
{%- set stream_full = this.schema ~ "." ~ stream_name -%}
{% if execute and flags.WHICH in ('run', 'build') %}
{%- set stream_create_statement -%}
create table stream if not exists {{ stream_full }}
on table {{ table }}
with properties ('TABLE_STREAM_MODE' = 'STANDARD')
{%- endset -%}
{%- do run_query(stream_create_statement) -%}
{%- endif -%}
{{ return(stream_full) }}
{%- endmacro -%}
第四步:Surrogate Key
Snowflake 用
SEQUENCESEQUENCE
对象生成自增 surrogate key,ClickZetta 支持
IDENTITYIDENTITY
自增列,语义更接近:
{{ sequence_get_nextval() }} as customer_surrogate_key
ClickZetta 建表时直接在列定义里加
identityidentity
:
id bigint identity
不过 dbt incremental 模型无法直接在
configconfig
里声明
identityidentity
列,需要通过
pre_hookpre_hook
建表或用
row_number()row_number()
作为替代。本项目采用
row_number()row_number()
方案:
row_number() over (order by s.customer_key) as customer_surrogate_key
⚠️ 注意语义差异 :
SEQUENCESEQUENCE
和
IDENTITYIDENTITY
都是全局自增的,同一条记录在每次 incremental run 里始终保持同一个 surrogate key。
row_number()row_number()
是基于当前查询结果集计算的,全量重跑时行号会重新计算,同一条记录可能得到不同的值。因此使用
row_number()row_number()
时,
unique_keyunique_key
不能用 surrogate key,必须改为业务主键:
{{ config(
materialized='incremental',
unique_key='integration_id'
) }}
其他需要同步删除的 Snowflake 专有 config(ClickZetta 不支持也不需要):
transient=false
merge_exclude_columns=[...]
第五步:多列哈希
Snowflake 的
hash()hash()
接受任意类型的多列,ClickZetta 用
hash_combine(crc32(col), ...)hash_combine(crc32(col), ...)
替代:
hash(c.customer_name, c.customer_address, c.nation_key, ...) as cdc_hash_key
改为:
hash_combine(
crc32(coalesce(c.customer_name, '')),
crc32(coalesce(c.customer_address, '')),
crc32(coalesce(cast(c.nation_key as varchar), '')),
...
) as cdc_hash_key
hash_combine_commutative()hash_combine_commutative()
只接受
bigintbigint
参数,varchar 列需要先用
crc32()crc32()
转换为整数再组合。
第六步:日历表行生成
Snowflake 的
table(generator(rowcount => N))table(generator(rowcount => N))
用于生成指定行数的序列,ClickZetta 用
explode(sequence(0, N-1))explode(sequence(0, N-1))
替代:
select row_number() over (order by seq4()) as day_seq
from table(generator(rowcount => 50 * 365))
改为:
select date_add(date('1992-01-01'), pos) as day_dt, pos + 1 as day_seq
from (select explode(sequence(0, 50 * 365 - 1)) as pos) t
同时两处日期函数需要调整:
to_char(date, 'YYYYMMDD')::number(8,0)to_char(date, 'YYYYMMDD')::number(8,0)
改为
cast(date_format(date, 'yyyyMMdd') as int)cast(date_format(date, 'yyyyMMdd') as int)
;
last_day(date, 'YEAR')last_day(date, 'YEAR')
改为
date(concat(extract(year from date), '-12-31'))date(concat(extract(year from date), '-12-31'))
。
第七步:Python model
原始项目的
async_bulk_operations.pyasync_bulk_operations.py
深度依赖 Snowpark stored procedures(
session.sproc.registersession.sproc.register
),ClickZetta 不支持,这个模型需要跳过或重新设计。
customer_clustering.pycustomer_clustering.py
的 K-means 聚类逻辑
无法直接迁移 ——dbt-clickzetta 目前只支持 SQL 模型,不支持 Python 模型。
替代方案:用 SQL 近似实现聚类逻辑(如
NTILENTILE
分桶),详见项目中的
customer_clustering.sqlcustomer_clustering.sql
。
端到端验证
e2e.pye2e.py
对迁移结果执行 12 项自动化检查,覆盖从 seed 到 gold 层的全链路:
python3 e2e.py
实际运行结果:
Done. PASS=27 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=27
✓ PASS fx_rates row count: 9135 行
✓ PASS stg_customers: 1500 万行(SF100 dedup 后)
✓ PASS stg_orders: 1.5 亿行
✓ PASS stg_orders_incremental: 有数据
✓ PASS customer_segments: 5 个分群(PREMIUM/HIGH_VALUE/STANDARD/BASIC/LOW_VALUE)
✓ PASS fx_rates intermediate: 9135 行
✓ PASS dim_customers: 3000 万行
✓ PASS dim_orders: 有数据
✓ PASS dim_calendar_day: 18250 行(50 年)
✓ PASS customer_insights: 无 null 分类
✓ PASS customer_cdc_stream: 有数据
✓ PASS DIM_CUSTOMER_CHANGES: 有数据
=== Result: 12/12 checks passed ===
12/12 验证通过 。
预期运行时间
首次运行约 9-10 分钟 (默认最小规格 VCluster)。Dynamic Table 首次创建需全量刷新(约 4 分钟),这是正常行为,不是卡住。后续增量刷新只需几秒。
相关文档