Snowflake Dynamic Tables 迁移实战:Bronze–Silver–Gold 三层管道
如果你在 Snowflake 上用 Dynamic Tables 构建了 Medallion 数据管道,迁移到 ClickZetta Lakehouse 的成本主要集中在 4 处语法差异,核心的 SQL 查询逻辑不需要改动。
本文用一个真实迁移项目演示完整过程:将基于 Snowflake Dynamic Tables 的 Bronze–Silver–Gold 三层管道迁移到 ClickZetta Lakehouse,数据源使用两个平台共有的 TPC-H 标准数据集,全部 SQL 经过 cz-cli 实际验证。
完整代码见 GitHub:snowflake2lakehouse-dynamic-tables
原始项目
snowflake2lakehouse-dynamic-tables 基于 Techy-Malay/snowflake-bsg-dynamic-tables 改造,演示如何在 Snowflake 上用 Dynamic Tables 实现 Bronze–Silver–Gold 三层架构。项目以 TPC-H ORDERS 表为数据源,经过三层 Dynamic Table 处理后输出每日销售汇总。
迁移后的代码在
03_lakehouse/
03_lakehouse/
目录,原始 Snowflake SQL 保留在
01_snowflake/
01_snowflake/
供对照。
结论先行
核心 SQL 查询逻辑不需要改动。 4 处改动全是平台配置替换:
TARGET_LAG
TARGET_LAG
→
REFRESH INTERVAL
REFRESH INTERVAL
,
WAREHOUSE
WAREHOUSE
→
VCLUSTER
VCLUSTER
,
DATA_RETENTION_TIME_IN_DAYS
DATA_RETENTION_TIME_IN_DAYS
改为建表后单独执行,
DOWNSTREAM
DOWNSTREAM
级联刷新改为各层独立配置刷新周期。
| 改动项 | 工作量 | 说明 |
|---|
| Dynamic Table 刷新参数 | 极低 | TARGET_LAG
TARGET_LAG → REFRESH INTERVAL
REFRESH INTERVAL ,WAREHOUSE
WAREHOUSE → VCLUSTER
VCLUSTER |
| DOWNSTREAM 级联刷新 | 低 | ClickZetta 无此概念,各层独立设置 REFRESH INTERVAL
REFRESH INTERVAL |
| Time Travel 保留期 | 极低 | 从 CREATE TABLE 内联选项改为建表后 ALTER TABLE SET PROPERTIES
ALTER TABLE SET PROPERTIES |
| 数据源引用 | 极低 | SNOWFLAKE_SAMPLE_DATA.TPCH_SF1
SNOWFLAKE_SAMPLE_DATA.TPCH_SF1 → clickzetta_sample_data.tpch_100g
clickzetta_sample_data.tpch_100g |
清洗、去重(QUALIFY)、聚合、日期截断——核心 SQL 逻辑语法完全一致,不需要改。
技术栈对比
| 原始项目(Snowflake) | 迁移后(Lakehouse) |
|---|
| 计算资源 | WAREHOUSE = compute_wh
WAREHOUSE = compute_wh | VCLUSTER default
VCLUSTER default |
| 刷新策略 | TARGET_LAG = '5 minutes'
TARGET_LAG = '5 minutes' | REFRESH INTERVAL '5' MINUTE
REFRESH INTERVAL '5' MINUTE |
| 依赖传播 | TARGET_LAG = 'DOWNSTREAM'
TARGET_LAG = 'DOWNSTREAM' (自动级联) | 无此概念,各层独立刷新 |
| 手动刷新 | ALTER DYNAMIC TABLE ... REFRESH
ALTER DYNAMIC TABLE ... REFRESH | REFRESH DYNAMIC TABLE ...
REFRESH DYNAMIC TABLE ... |
| Time Travel 保留期 | DATA_RETENTION_TIME_IN_DAYS = 1
DATA_RETENTION_TIME_IN_DAYS = 1 (CREATE TABLE 内联选项) | ALTER TABLE ... SET PROPERTIES ('data_retention_days' = '1')
ALTER TABLE ... SET PROPERTIES ('data_retention_days' = '1') (建表后单独执行) |
| 样本数据集 | SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS
SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS (1GB) | clickzetta_sample_data.tpch_100g.orders
clickzetta_sample_data.tpch_100g.orders (100GB) |
| Schema 引用 | USE SCHEMA
USE SCHEMA + 非限定名,或完全限定名 | 相同,两种写法均支持 |
| 去重语法 | QUALIFY ROW_NUMBER() OVER (...) = 1
QUALIFY ROW_NUMBER() OVER (...) = 1 | 相同语法,完全支持 |
| 日期截断 | DATE_TRUNC('day', ts)
DATE_TRUNC('day', ts) | 相同语法,完全支持 |
变化的主要是平台配置——从 Snowflake Virtual Warehouse 换成 Lakehouse VCluster,从
TARGET_LAG
TARGET_LAG
换成
REFRESH INTERVAL
REFRESH INTERVAL
。数据处理的核心 SQL 逻辑完全不变:清洗、去重、聚合,这些在 Lakehouse 上写法与 Snowflake 一致。
架构概览
SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS clickzetta_sample_data.tpch_100g.orders
(Snowflake 内置样本数据集,1GB) (Lakehouse 共享数据集,100GB)
│ │
▼ ▼
ORDERS_STG orders_stg
DATA_RETENTION_TIME_IN_DAYS = 1 SET PROPERTIES ('data_retention_days' = '1')
│ │
▼ TARGET_LAG = '5 minutes' ▼ REFRESH INTERVAL '5' MINUTE
bronze_orders bronze_orders
(原始数据 + 摄取元数据) (原始数据 + 摄取元数据)
│ │
▼ TARGET_LAG = '5 minutes' + QUALIFY ▼ REFRESH INTERVAL '5' MINUTE + QUALIFY
silver_orders silver_orders
(清洗、去重、类型标准化) (清洗、去重、类型标准化)
│ │
▼ TARGET_LAG = '10 minutes' ▼ REFRESH INTERVAL '10' MINUTE
gold_sales_summary gold_sales_summary
(每日销售汇总,面向分析) (每日销售汇总,面向分析)

迁移步骤
第一步:替换样本数据集引用
Snowflake 的内置样本数据通过
SNOWFLAKE_SAMPLE_DATA
SNOWFLAKE_SAMPLE_DATA
数据库访问,Lakehouse 的共享数据集通过
clickzetta_sample_data
clickzetta_sample_data
访问。
Snowflake:
CREATE OR REPLACE TABLE ARCH_BSG_DYNAMIC_TABLES.ORDERS_STG
DATA_RETENTION_TIME_IN_DAYS = 1
AS
SELECT
O_ORDERKEY AS order_id,
O_CUSTKEY AS customer_id,
O_ORDERDATE AS order_ts,
O_TOTALPRICE AS amount
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS;
Lakehouse:
CREATE SCHEMA IF NOT EXISTS bsg_dynamic_tables;
CREATE OR REPLACE TABLE bsg_dynamic_tables.orders_stg
AS
SELECT
O_ORDERKEY AS order_id,
O_CUSTKEY AS customer_id,
O_ORDERDATE AS order_ts,
O_TOTALPRICE AS amount
FROM clickzetta_sample_data.tpch_100g.orders;
ALTER TABLE bsg_dynamic_tables.orders_stg
SET PROPERTIES ('data_retention_days' = '1');
两处变化:
SNOWFLAKE_SAMPLE_DATA.TPCH_SF1
SNOWFLAKE_SAMPLE_DATA.TPCH_SF1
→ clickzetta_sample_data.tpch_100g
clickzetta_sample_data.tpch_100g
(数据集名称不同,列名相同)
DATA_RETENTION_TIME_IN_DAYS = 1
DATA_RETENTION_TIME_IN_DAYS = 1
从 DDL 内联选项改为建表后单独执行 ALTER TABLE ... SET PROPERTIES
ALTER TABLE ... SET PROPERTIES
第二步:替换 Dynamic Table 语法
这是迁移的核心部分,涉及 3 处语法变化。
TARGET_LAG
TARGET_LAG
→ REFRESH INTERVAL
REFRESH INTERVAL
Snowflake 用
TARGET_LAG
TARGET_LAG
声明可接受的数据延迟(平台自动决定刷新频率);Lakehouse 用
REFRESH INTERVAL
REFRESH INTERVAL
设置固定刷新周期。
Snowflake:
CREATE OR REPLACE DYNAMIC TABLE bronze_orders
TARGET_LAG = '5 minutes'
WAREHOUSE = compute_wh
AS
SELECT ...
Lakehouse:
CREATE OR REPLACE DYNAMIC TABLE bsg_dynamic_tables.bronze_orders
REFRESH INTERVAL '5' MINUTE
VCLUSTER default
AS
SELECT ...
WAREHOUSE
WAREHOUSE
→ VCLUSTER
VCLUSTER
Snowflake Virtual Warehouse 对应 Lakehouse VCluster,填写你的 VCluster 名称(大多数实例默认为
default
default
)。
TARGET_LAG = 'DOWNSTREAM'
TARGET_LAG = 'DOWNSTREAM'
的处理
Snowflake 支持
TARGET_LAG = 'DOWNSTREAM'
TARGET_LAG = 'DOWNSTREAM'
,让上游表的刷新自动触发下游表刷新,形成依赖级联。Lakehouse 没有这个概念——每张 Dynamic Table 按自己的
REFRESH INTERVAL
REFRESH INTERVAL
独立刷新。
实践建议:将 Tier 1(Bronze/Silver)的刷新间隔设置得比 Tier 2(Gold)短,近似模拟级联效果。例如 Bronze/Silver 设 5 分钟,Gold 设 10 分钟,Gold 刷新时 Bronze/Silver 已经是最新数据。
第三步:替换手动刷新命令
Snowflake:
ALTER DYNAMIC TABLE bronze_orders REFRESH;
Lakehouse:
REFRESH DYNAMIC TABLE bsg_dynamic_tables.bronze_orders;
注意刷新必须按依赖顺序手动执行——Lakehouse 不会自动级联:
REFRESH DYNAMIC TABLE bsg_dynamic_tables.bronze_orders;
REFRESH DYNAMIC TABLE bsg_dynamic_tables.silver_orders;
REFRESH DYNAMIC TABLE bsg_dynamic_tables.gold_sales_summary;
第四步:完全兼容的部分(无需修改)
以下语法在 Lakehouse 中与 Snowflake 完全一致:
QUALIFY ROW_NUMBER() OVER (...) = 1
QUALIFY ROW_NUMBER() OVER (...) = 1
(Silver 层去重)
-- Snowflake 和 Lakehouse 写法完全相同
FROM bronze_orders
QUALIFY ROW_NUMBER() OVER (
PARTITION BY order_id
ORDER BY ingestion_ts DESC
) = 1;
DATE_TRUNC('day', ts)
DATE_TRUNC('day', ts)
(Gold 层日期聚合)
-- Snowflake 和 Lakehouse 写法完全相同
SELECT
DATE_TRUNC('day', order_ts) AS order_date,
COUNT(*) AS total_orders,
SUM(amount) AS total_sales
FROM silver_orders
GROUP BY DATE_TRUNC('day', order_ts);
CAST(order_ts AS TIMESTAMP)
CAST(order_ts AS TIMESTAMP)
(Silver 层类型转换)
-- Snowflake 和 Lakehouse 写法完全相同
CAST(order_ts AS TIMESTAMP) AS order_ts
验证结果
所有 SQL 经过 cz-cli 在 Lakehouse 实例上实际运行验证:
| 表 | 行数 | 说明 |
|---|
orders_stg
orders_stg | 100,000 | 从 1.5 亿行 TPC-H 数据集采样 |
bronze_orders
bronze_orders | 100,000 | 加入 ingestion_ts
ingestion_ts 、source_system
source_system 两列 |
silver_orders
silver_orders | 100,000 | QUALIFY 去重后(TPC-H 源数据无重复) |
gold_sales_summary
gold_sales_summary | 103 | 103 个不同订单日期,总销售额 $150 亿 |
运行完成后,用以下命令清理所有 Lakehouse 对象:
cz-cli sql -f 03_lakehouse/06_cleanup.sql --profile <your-profile> --sync --write
迁移结论
Snowflake Dynamic Tables 与 Lakehouse Dynamic Tables 的 SQL 查询逻辑高度兼容,本项目验证了以下结论:
完全兼容(无需修改):
需要修改的 4 处:
| 差异点 | Snowflake | Lakehouse |
|---|
| 计算资源 | WAREHOUSE = wh_name
WAREHOUSE = wh_name | VCLUSTER vcluster_name
VCLUSTER vcluster_name |
| 刷新策略 | TARGET_LAG = 'N minutes'
TARGET_LAG = 'N minutes' | REFRESH INTERVAL 'N' MINUTE
REFRESH INTERVAL 'N' MINUTE |
| 依赖级联 | TARGET_LAG = 'DOWNSTREAM'
TARGET_LAG = 'DOWNSTREAM' | 无此概念,各层独立设置间隔 |
| 手动刷新 | ALTER DYNAMIC TABLE ... REFRESH
ALTER DYNAMIC TABLE ... REFRESH | REFRESH DYNAMIC TABLE ...
REFRESH DYNAMIC TABLE ... |
| Time Travel 保留期 | DATA_RETENTION_TIME_IN_DAYS = N
DATA_RETENTION_TIME_IN_DAYS = N (CREATE TABLE 内联选项) | ALTER TABLE ... SET PROPERTIES ('data_retention_days' = 'N')
ALTER TABLE ... SET PROPERTIES ('data_retention_days' = 'N') (建表后单独执行) |
参考