DBT Snowflake 迁移实战:TPC-H 数仓管道

如果你在 Snowflake 上用 dbt 构建了数仓管道,迁移到 ClickZetta Lakehouse 的核心工作量集中在 6 处平台专有特性上,标准 SQL 模型一行不用改。

本文用一个真实迁移项目演示完整过程:将 sfc-gh-dflippo/snowflake-dbt-demo(Snowflake 官方工程师维护的 dbt 演示项目)迁移到 ClickZetta Lakehouse,数据源使用两个平台共有的 TPC-H 标准数据集,全部模型经过实际运行验证,24/24 通过。

完整代码见 GitHub:clickzetta/snowflake-dbt2lakehouse-dbt


原始项目

sfc-gh-dflippo/snowflake-dbt-demo 是 Snowflake 官方工程师维护的 dbt 功能演示项目。业务场景是 TPC-H 客户订单分析:从原始订单数据出发,经过 Bronze(清洗)→ Silver(聚合、分群)→ Gold(维度表、指标)三层处理,最终输出客户价值分层、订单事实表、日历维度等分析模型。

项目覆盖了 dbt 在 Snowflake 上的大多数高级特性:Dynamic Tables、Streams(CDC)、Sequences、Python models、Incremental 策略等,是验证迁移完整性的理想对象。

迁移后的代码在

03_lakehouse/
03_lakehouse/
目录,原始 Snowflake 代码保留在
01_snowflake/
01_snowflake/
供对照,迁移说明在
02_migration/MIGRATION_NOTES.md
02_migration/MIGRATION_NOTES.md


结论先行

你的 dbt 项目可以迁移,业务逻辑不需要重写。 这次迁移改动了 6 处,全部是平台配置和函数名替换,没有一处涉及数据处理逻辑本身。如果你的项目只用了标准 SQL(没有 Dynamic Tables、Streams、Sequences),换一个

profiles.yml
profiles.yml
就能跑通。

改动项工作量说明
profiles.yml
profiles.yml
连接配置
极低字段名对照替换,5 分钟完成
Dynamic Table 参数名极低
target_lag
target_lag
refresh_interval
refresh_interval
snowflake_warehouse
snowflake_warehouse
refresh_vc
refresh_vc
CDC Stream 列名
METADATA$ACTION
METADATA$ACTION
`__change_type`
`__change_type`
,消费时用
SELECT * EXCEPT(...)
SELECT * EXCEPT(...)
Surrogate key
SEQUENCE .nextval
SEQUENCE .nextval
IDENTITY
IDENTITY
列或
row_number() over (...)
row_number() over (...)
,注意语义差异(见第四步)
多列哈希
hash()
hash()
hash_combine(crc32(col), ...)
hash_combine(crc32(col), ...)
,需要了解 ClickZetta 函数体系
日历表行生成
table(generator(...))
table(generator(...))
explode(sequence(...))
explode(sequence(...))
,一行替换

Snowflake 专有 config(

transient
transient
merge_exclude_columns
merge_exclude_columns
copy_grants
copy_grants
)直接删除即可,ClickZetta 不支持但也不需要。


技术栈对比

原始项目(Snowflake)迁移后(Lakehouse)
dbt adapter
dbt-snowflake
dbt-snowflake
dbt-clickzetta >= 1.7.8
dbt-clickzetta >= 1.7.8
计算资源配置
snowflake_warehouse: target.warehouse
snowflake_warehouse: target.warehouse
refresh_vc: default
refresh_vc: default
Dynamic Table 刷新周期
target_lag: '1 hour'
target_lag: '1 hour'
refresh_interval: '1 HOUR'
refresh_interval: '1 HOUR'
配置变更策略
on_configuration_change: apply
on_configuration_change: apply
不支持,用
ALTER DYNAMIC TABLE
ALTER DYNAMIC TABLE
Stream 变更类型列
METADATA$ACTION
METADATA$ACTION
`__change_type`
`__change_type`
Stream 消费模式
SELECT *
SELECT *
SELECT * EXCEPT(__change_type, __commit_timestamp, __commit_version)
SELECT * EXCEPT(__change_type, __commit_timestamp, __commit_version)
Surrogate key
SEQUENCE .nextval
SEQUENCE .nextval
IDENTITY
IDENTITY
列(建表时)或
row_number() over (...)
row_number() over (...)
多列哈希
hash(col1, col2, ...)
hash(col1, col2, ...)
hash_combine(crc32(col1), crc32(col2), ...)
hash_combine(crc32(col1), crc32(col2), ...)
行生成
table(generator(rowcount => N))
table(generator(rowcount => N))
explode(sequence(0, N-1))
explode(sequence(0, N-1))
采样
sample (10)
sample (10)
TABLESAMPLE SYSTEM(10)
TABLESAMPLE SYSTEM(10)
数据源
SNOWFLAKE_SAMPLE_DATA.TPCH_SF1
SNOWFLAKE_SAMPLE_DATA.TPCH_SF1
clickzetta_sample_data.tpch_100g
clickzetta_sample_data.tpch_100g
FX 汇率数据Cybersyn Marketplace(Snowflake 专属)mock seed CSV(2020–2024,5 种货币)

标准 SQL 操作——SELECT、JOIN、GROUP BY、窗口函数、CTE、QUALIFY——语法完全一致,不需要改动。


准备工作

需要 Python 3.10+(推荐 3.12)和 dbt-clickzetta >= 1.7.8。

git clone https://github.com/clickzetta/snowflake-dbt2lakehouse-dbt.git cd snowflake-dbt2lakehouse-dbt python3 -m venv .venv source .venv/bin/activate pip install "dbt-clickzetta>=1.7.8"

复制连接配置模板,填入你的连接信息:

cp 03_lakehouse/profiles.yml.example ~/.dbt/profiles.yml

profiles.yml
profiles.yml
关键字段差异:
account
account
service
service
(API 地址),
warehouse
warehouse
vcluster
vcluster
(计算集群名)。完整字段说明见 dbt ClickZetta adapter 使用指南

验证连接:

cd 03_lakehouse dbt debug

快速验证模式:TPC-H SF100 数据量大(customers 3000万行、orders 1.5亿行),首次全量运行约 10 分钟。如需快速验证迁移正确性,可用

sample_limit
sample_limit
变量限制 staging 层数据量:

# 采样 1 万行,约 1 分钟 bash run.sh --limit 10000 # 全量运行 bash run.sh --full

或者直接用 dbt:

dbt build --vars '{"sample_limit": 10000}' # 采样模式,~1 分钟 dbt build # 全量模式,~10 分钟


迁移步骤

第一步:数据源替换

原始项目依赖两个 Snowflake 专属数据源:

TPC-H 数据

SNOWFLAKE_SAMPLE_DATA.TPCH_SF1
SNOWFLAKE_SAMPLE_DATA.TPCH_SF1
clickzetta_sample_data.tpch_100g
clickzetta_sample_data.tpch_100g

ClickZetta 内置共享数据集,无需导入,直接在

_sources.yml
_sources.yml
里修改
schema
schema

- name: TPC_H schema: TPCH_SF1 database: SNOWFLAKE_SAMPLE_DATA

改为(需要 dbt-clickzetta >= 1.7.8):

- name: TPC_H database: clickzetta_sample_data schema: tpch_100g

Cybersyn FX 汇率数据:Snowflake Marketplace 专属,ClickZetta 没有对应数据源。项目里已提供 mock seed 替代(2020–2024,USD 基准,5 种货币,9135 行):

dbt seed


第二步:Dynamic Table 参数

只需替换两个参数名:

{{ config( materialized='dynamic_table', snowflake_warehouse=target.warehouse, target_lag='1 hour', on_configuration_change='apply' ) }}

改为:

{{ config( materialized='dynamic_table', refresh_vc='default', refresh_interval='1 HOUR' ) }}

on_configuration_change='apply'
on_configuration_change='apply'
直接删除——ClickZetta 不支持,需要修改 Dynamic Table 配置时用
ALTER DYNAMIC TABLE
ALTER DYNAMIC TABLE
手动执行。

原始项目还有一个

target_lag='DOWNSTREAM'
target_lag='DOWNSTREAM'
(下游触发刷新),ClickZetta 没有这个概念,改成显式的
refresh_interval
refresh_interval

{{ config(materialized='dynamic_table', target_lag='DOWNSTREAM') }}

改为:

{{ config(materialized='dynamic_table', refresh_vc='default', refresh_interval='1 HOUR') }}


第三步:CDC Stream

Snowflake Streams 和 ClickZetta Table Stream 概念相同,但创建语法和系统列名不同:

Snowflake StreamClickZetta Table Stream
创建语法
CREATE STREAM ... ON TABLE t SHOW_INITIAL_ROWS = TRUE
CREATE STREAM ... ON TABLE t SHOW_INITIAL_ROWS = TRUE
CREATE TABLE STREAM ... ON TABLE t WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD')
CREATE TABLE STREAM ... ON TABLE t WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD')
变更类型列
METADATA$ACTION
METADATA$ACTION
(INSERT / DELETE)
`__change_type`
`__change_type`
(INSERT / UPDATE_BEFORE / UPDATE_AFTER / DELETE)
是否 UPDATE
METADATA$ISUPDATE
METADATA$ISUPDATE
(布尔值)
通过
__change_type = 'UPDATE_BEFORE'
__change_type = 'UPDATE_BEFORE'
判断
行标识
METADATA$ROW_ID
METADATA$ROW_ID
__commit_version
__commit_version
提交时间无直接对应
__commit_timestamp
__commit_timestamp

消费 Stream 的推荐写法:用

SELECT * EXCEPT(...)
SELECT * EXCEPT(...)
过滤系统列,不需要硬编码业务列名:

select row_number() over (order by `__commit_timestamp`, customer_key) as log_id, * except (`__change_type`, `__commit_timestamp`, `__commit_version`), `__change_type` as cdc_change_type, `__commit_timestamp` as cdc_commit_ts, `__commit_version` as cdc_version, case when `__change_type` = 'DELETE' then 'Y' else 'N' end as delete_flag from {{ get_table_stream(ref('dim_customers')) }} as d

创建 Table Stream 的 macro(替换 Snowflake 的

get_stream()
get_stream()
macro,完整代码见
03_lakehouse/macros/get_table_stream.sql
03_lakehouse/macros/get_table_stream.sql
):

{%- macro get_table_stream(table, stream_name=( (this.alias or this.name) ~ "_ts" )) -%} {%- set stream_full = this.schema ~ "." ~ stream_name -%} {% if execute and flags.WHICH in ('run', 'build') %} {%- set stream_create_statement -%} create table stream if not exists {{ stream_full }} on table {{ table }} with properties ('TABLE_STREAM_MODE' = 'STANDARD') {%- endset -%} {%- do run_query(stream_create_statement) -%} {%- endif -%} {{ return(stream_full) }} {%- endmacro -%}


第四步:Surrogate Key

Snowflake 用

SEQUENCE
SEQUENCE
对象生成自增 surrogate key,ClickZetta 支持
IDENTITY
IDENTITY
自增列,语义更接近:

{{ sequence_get_nextval() }} as customer_surrogate_key

ClickZetta 建表时直接在列定义里加

identity
identity

id bigint identity

不过 dbt incremental 模型无法直接在

config
config
里声明
identity
identity
列,需要通过
pre_hook
pre_hook
建表或用
row_number()
row_number()
作为替代。本项目采用
row_number()
row_number()
方案:

row_number() over (order by s.customer_key) as customer_surrogate_key

{{ config( materialized='incremental', unique_key='integration_id' ) }}

其他需要同步删除的 Snowflake 专有 config(ClickZetta 不支持也不需要):

transient=false merge_exclude_columns=[...]


第五步:多列哈希

Snowflake 的

hash()
hash()
接受任意类型的多列,ClickZetta 用
hash_combine(crc32(col), ...)
hash_combine(crc32(col), ...)
替代:

hash(c.customer_name, c.customer_address, c.nation_key, ...) as cdc_hash_key

改为:

hash_combine( crc32(coalesce(c.customer_name, '')), crc32(coalesce(c.customer_address, '')), crc32(coalesce(cast(c.nation_key as varchar), '')), ... ) as cdc_hash_key

hash_combine_commutative()
hash_combine_commutative()
只接受
bigint
bigint
参数,varchar 列需要先用
crc32()
crc32()
转换为整数再组合。


第六步:日历表行生成

Snowflake 的

table(generator(rowcount => N))
table(generator(rowcount => N))
用于生成指定行数的序列,ClickZetta 用
explode(sequence(0, N-1))
explode(sequence(0, N-1))
替代:

select row_number() over (order by seq4()) as day_seq from table(generator(rowcount => 50 * 365))

改为:

select date_add(date('1992-01-01'), pos) as day_dt, pos + 1 as day_seq from (select explode(sequence(0, 50 * 365 - 1)) as pos) t

同时两处日期函数需要调整:

to_char(date, 'YYYYMMDD')::number(8,0)
to_char(date, 'YYYYMMDD')::number(8,0)
改为
cast(date_format(date, 'yyyyMMdd') as int)
cast(date_format(date, 'yyyyMMdd') as int)
last_day(date, 'YEAR')
last_day(date, 'YEAR')
改为
date(concat(extract(year from date), '-12-31'))
date(concat(extract(year from date), '-12-31'))


第七步:Python model

原始项目的

async_bulk_operations.py
async_bulk_operations.py
深度依赖 Snowpark stored procedures(
session.sproc.register
session.sproc.register
),ClickZetta 不支持,这个模型需要跳过或重新设计。

customer_clustering.py
customer_clustering.py
的 K-means 聚类逻辑无法直接迁移——dbt-clickzetta 目前只支持 SQL 模型,不支持 Python 模型。

替代方案:用 SQL 近似实现聚类逻辑(如

NTILE
NTILE
分桶),详见项目中的
customer_clustering.sql
customer_clustering.sql


端到端验证

e2e.py
e2e.py
对迁移结果执行 12 项自动化检查,覆盖从 seed 到 gold 层的全链路:

python3 e2e.py

实际运行结果:

Done. PASS=27 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=27 ✓ PASS fx_rates row count: 9135 行 ✓ PASS stg_customers: 1500 万行(SF100 dedup 后) ✓ PASS stg_orders: 1.5 亿行 ✓ PASS stg_orders_incremental: 有数据 ✓ PASS customer_segments: 5 个分群(PREMIUM/HIGH_VALUE/STANDARD/BASIC/LOW_VALUE) ✓ PASS fx_rates intermediate: 9135 行 ✓ PASS dim_customers: 3000 万行 ✓ PASS dim_orders: 有数据 ✓ PASS dim_calendar_day: 18250 行(50 年) ✓ PASS customer_insights: 无 null 分类 ✓ PASS customer_cdc_stream: 有数据 ✓ PASS DIM_CUSTOMER_CHANGES: 有数据 === Result: 12/12 checks passed ===

12/12 验证通过

预期运行时间

首次运行约 9-10 分钟(默认最小规格 VCluster)。Dynamic Table 首次创建需全量刷新(约 4 分钟),这是正常行为,不是卡住。后续增量刷新只需几秒。


相关文档

联系我们
预约咨询
微信咨询
电话咨询