Snowflake Dynamic Tables 迁移实战:Bronze–Silver–Gold 三层管道

如果你在 Snowflake 上用 Dynamic Tables 构建了 Medallion 数据管道,迁移到 ClickZetta Lakehouse 的成本主要集中在 4 处语法差异,核心的 SQL 查询逻辑不需要改动。

本文用一个真实迁移项目演示完整过程:将基于 Snowflake Dynamic Tables 的 Bronze–Silver–Gold 三层管道迁移到 ClickZetta Lakehouse,数据源使用两个平台共有的 TPC-H 标准数据集,全部 SQL 经过 cz-cli 实际验证。

完整代码见 GitHub:snowflake2lakehouse-dynamic-tables


原始项目

snowflake2lakehouse-dynamic-tables 基于 Techy-Malay/snowflake-bsg-dynamic-tables 改造,演示如何在 Snowflake 上用 Dynamic Tables 实现 Bronze–Silver–Gold 三层架构。项目以 TPC-H ORDERS 表为数据源,经过三层 Dynamic Table 处理后输出每日销售汇总。

迁移后的代码在

03_lakehouse/
03_lakehouse/
目录,原始 Snowflake SQL 保留在
01_snowflake/
01_snowflake/
供对照。

结论先行

核心 SQL 查询逻辑不需要改动。 4 处改动全是平台配置替换:

TARGET_LAG
TARGET_LAG
REFRESH INTERVAL
REFRESH INTERVAL
WAREHOUSE
WAREHOUSE
VCLUSTER
VCLUSTER
DATA_RETENTION_TIME_IN_DAYS
DATA_RETENTION_TIME_IN_DAYS
改为建表后单独执行,
DOWNSTREAM
DOWNSTREAM
级联刷新改为各层独立配置刷新周期。

改动项工作量说明
Dynamic Table 刷新参数极低
TARGET_LAG
TARGET_LAG
REFRESH INTERVAL
REFRESH INTERVAL
WAREHOUSE
WAREHOUSE
VCLUSTER
VCLUSTER
DOWNSTREAM 级联刷新ClickZetta 无此概念,各层独立设置
REFRESH INTERVAL
REFRESH INTERVAL
Time Travel 保留期极低从 CREATE TABLE 内联选项改为建表后
ALTER TABLE SET PROPERTIES
ALTER TABLE SET PROPERTIES
数据源引用极低
SNOWFLAKE_SAMPLE_DATA.TPCH_SF1
SNOWFLAKE_SAMPLE_DATA.TPCH_SF1
clickzetta_sample_data.tpch_100g
clickzetta_sample_data.tpch_100g

清洗、去重(QUALIFY)、聚合、日期截断——核心 SQL 逻辑语法完全一致,不需要改。


技术栈对比

原始项目(Snowflake)迁移后(Lakehouse)
计算资源
WAREHOUSE = compute_wh
WAREHOUSE = compute_wh
VCLUSTER default
VCLUSTER default
刷新策略
TARGET_LAG = '5 minutes'
TARGET_LAG = '5 minutes'
REFRESH INTERVAL '5' MINUTE
REFRESH INTERVAL '5' MINUTE
依赖传播
TARGET_LAG = 'DOWNSTREAM'
TARGET_LAG = 'DOWNSTREAM'
(自动级联)
无此概念,各层独立刷新
手动刷新
ALTER DYNAMIC TABLE ... REFRESH
ALTER DYNAMIC TABLE ... REFRESH
REFRESH DYNAMIC TABLE ...
REFRESH DYNAMIC TABLE ...
Time Travel 保留期
DATA_RETENTION_TIME_IN_DAYS = 1
DATA_RETENTION_TIME_IN_DAYS = 1
(CREATE TABLE 内联选项)
ALTER TABLE ... SET PROPERTIES ('data_retention_days' = '1')
ALTER TABLE ... SET PROPERTIES ('data_retention_days' = '1')
(建表后单独执行)
样本数据集
SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS
SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS
(1GB)
clickzetta_sample_data.tpch_100g.orders
clickzetta_sample_data.tpch_100g.orders
(100GB)
Schema 引用
USE SCHEMA
USE SCHEMA
+ 非限定名,或完全限定名
相同,两种写法均支持
去重语法
QUALIFY ROW_NUMBER() OVER (...) = 1
QUALIFY ROW_NUMBER() OVER (...) = 1
相同语法,完全支持
日期截断
DATE_TRUNC('day', ts)
DATE_TRUNC('day', ts)
相同语法,完全支持

变化的主要是平台配置——从 Snowflake Virtual Warehouse 换成 Lakehouse VCluster,从

TARGET_LAG
TARGET_LAG
换成
REFRESH INTERVAL
REFRESH INTERVAL
。数据处理的核心 SQL 逻辑完全不变:清洗、去重、聚合,这些在 Lakehouse 上写法与 Snowflake 一致。


架构概览

SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS clickzetta_sample_data.tpch_100g.orders (Snowflake 内置样本数据集,1GB) (Lakehouse 共享数据集,100GB) │ │ ▼ ▼ ORDERS_STG orders_stg DATA_RETENTION_TIME_IN_DAYS = 1 SET PROPERTIES ('data_retention_days' = '1') │ │ ▼ TARGET_LAG = '5 minutes' ▼ REFRESH INTERVAL '5' MINUTE bronze_orders bronze_orders (原始数据 + 摄取元数据) (原始数据 + 摄取元数据) │ │ ▼ TARGET_LAG = '5 minutes' + QUALIFY ▼ REFRESH INTERVAL '5' MINUTE + QUALIFY silver_orders silver_orders (清洗、去重、类型标准化) (清洗、去重、类型标准化) │ │ ▼ TARGET_LAG = '10 minutes' ▼ REFRESH INTERVAL '10' MINUTE gold_sales_summary gold_sales_summary (每日销售汇总,面向分析) (每日销售汇总,面向分析)



迁移步骤

第一步:替换样本数据集引用

Snowflake 的内置样本数据通过

SNOWFLAKE_SAMPLE_DATA
SNOWFLAKE_SAMPLE_DATA
数据库访问,Lakehouse 的共享数据集通过
clickzetta_sample_data
clickzetta_sample_data
访问。

Snowflake:

CREATE OR REPLACE TABLE ARCH_BSG_DYNAMIC_TABLES.ORDERS_STG DATA_RETENTION_TIME_IN_DAYS = 1 AS SELECT O_ORDERKEY AS order_id, O_CUSTKEY AS customer_id, O_ORDERDATE AS order_ts, O_TOTALPRICE AS amount FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS;

Lakehouse:

CREATE SCHEMA IF NOT EXISTS bsg_dynamic_tables; CREATE OR REPLACE TABLE bsg_dynamic_tables.orders_stg AS SELECT O_ORDERKEY AS order_id, O_CUSTKEY AS customer_id, O_ORDERDATE AS order_ts, O_TOTALPRICE AS amount FROM clickzetta_sample_data.tpch_100g.orders; ALTER TABLE bsg_dynamic_tables.orders_stg SET PROPERTIES ('data_retention_days' = '1');

两处变化:

  1. SNOWFLAKE_SAMPLE_DATA.TPCH_SF1
    SNOWFLAKE_SAMPLE_DATA.TPCH_SF1
    clickzetta_sample_data.tpch_100g
    clickzetta_sample_data.tpch_100g
    (数据集名称不同,列名相同)
  2. DATA_RETENTION_TIME_IN_DAYS = 1
    DATA_RETENTION_TIME_IN_DAYS = 1
    从 DDL 内联选项改为建表后单独执行
    ALTER TABLE ... SET PROPERTIES
    ALTER TABLE ... SET PROPERTIES

第二步:替换 Dynamic Table 语法

这是迁移的核心部分,涉及 3 处语法变化。

TARGET_LAG
TARGET_LAG
REFRESH INTERVAL
REFRESH INTERVAL

Snowflake 用

TARGET_LAG
TARGET_LAG
声明可接受的数据延迟(平台自动决定刷新频率);Lakehouse 用
REFRESH INTERVAL
REFRESH INTERVAL
设置固定刷新周期。

Snowflake:

CREATE OR REPLACE DYNAMIC TABLE bronze_orders TARGET_LAG = '5 minutes' WAREHOUSE = compute_wh AS SELECT ...

Lakehouse:

CREATE OR REPLACE DYNAMIC TABLE bsg_dynamic_tables.bronze_orders REFRESH INTERVAL '5' MINUTE VCLUSTER default AS SELECT ...

WAREHOUSE
WAREHOUSE
VCLUSTER
VCLUSTER

Snowflake Virtual Warehouse 对应 Lakehouse VCluster,填写你的 VCluster 名称(大多数实例默认为

default
default
)。

TARGET_LAG = 'DOWNSTREAM'
TARGET_LAG = 'DOWNSTREAM'
的处理

Snowflake 支持

TARGET_LAG = 'DOWNSTREAM'
TARGET_LAG = 'DOWNSTREAM'
,让上游表的刷新自动触发下游表刷新,形成依赖级联。Lakehouse 没有这个概念——每张 Dynamic Table 按自己的
REFRESH INTERVAL
REFRESH INTERVAL
独立刷新。

实践建议:将 Tier 1(Bronze/Silver)的刷新间隔设置得比 Tier 2(Gold)短,近似模拟级联效果。例如 Bronze/Silver 设 5 分钟,Gold 设 10 分钟,Gold 刷新时 Bronze/Silver 已经是最新数据。

第三步:替换手动刷新命令

Snowflake:

ALTER DYNAMIC TABLE bronze_orders REFRESH;

Lakehouse:

REFRESH DYNAMIC TABLE bsg_dynamic_tables.bronze_orders;

注意刷新必须按依赖顺序手动执行——Lakehouse 不会自动级联:

REFRESH DYNAMIC TABLE bsg_dynamic_tables.bronze_orders; REFRESH DYNAMIC TABLE bsg_dynamic_tables.silver_orders; REFRESH DYNAMIC TABLE bsg_dynamic_tables.gold_sales_summary;

第四步:完全兼容的部分(无需修改)

以下语法在 Lakehouse 中与 Snowflake 完全一致:

QUALIFY ROW_NUMBER() OVER (...) = 1
QUALIFY ROW_NUMBER() OVER (...) = 1
(Silver 层去重)

-- Snowflake 和 Lakehouse 写法完全相同 FROM bronze_orders QUALIFY ROW_NUMBER() OVER ( PARTITION BY order_id ORDER BY ingestion_ts DESC ) = 1;

DATE_TRUNC('day', ts)
DATE_TRUNC('day', ts)
(Gold 层日期聚合)

-- Snowflake 和 Lakehouse 写法完全相同 SELECT DATE_TRUNC('day', order_ts) AS order_date, COUNT(*) AS total_orders, SUM(amount) AS total_sales FROM silver_orders GROUP BY DATE_TRUNC('day', order_ts);

CAST(order_ts AS TIMESTAMP)
CAST(order_ts AS TIMESTAMP)
(Silver 层类型转换)

-- Snowflake 和 Lakehouse 写法完全相同 CAST(order_ts AS TIMESTAMP) AS order_ts


验证结果

所有 SQL 经过 cz-cli 在 Lakehouse 实例上实际运行验证:

行数说明
orders_stg
orders_stg
100,000从 1.5 亿行 TPC-H 数据集采样
bronze_orders
bronze_orders
100,000加入
ingestion_ts
ingestion_ts
source_system
source_system
两列
silver_orders
silver_orders
100,000QUALIFY 去重后(TPC-H 源数据无重复)
gold_sales_summary
gold_sales_summary
103103 个不同订单日期,总销售额 $150 亿

运行完成后,用以下命令清理所有 Lakehouse 对象:

cz-cli sql -f 03_lakehouse/06_cleanup.sql --profile <your-profile> --sync --write


迁移结论

Snowflake Dynamic Tables 与 Lakehouse Dynamic Tables 的 SQL 查询逻辑高度兼容,本项目验证了以下结论:

完全兼容(无需修改):

  • QUALIFY ROW_NUMBER() OVER (...) = 1
    QUALIFY ROW_NUMBER() OVER (...) = 1
    去重
  • DATE_TRUNC('day', ts)
    DATE_TRUNC('day', ts)
    日期截断
  • CAST(col AS TYPE)
    CAST(col AS TYPE)
    类型转换
  • 标准聚合函数:
    COUNT
    COUNT
    SUM
    SUM
    AVG
    AVG
  • CURRENT_TIMESTAMP()
    CURRENT_TIMESTAMP()
    系统函数

需要修改的 4 处:

差异点SnowflakeLakehouse
计算资源
WAREHOUSE = wh_name
WAREHOUSE = wh_name
VCLUSTER vcluster_name
VCLUSTER vcluster_name
刷新策略
TARGET_LAG = 'N minutes'
TARGET_LAG = 'N minutes'
REFRESH INTERVAL 'N' MINUTE
REFRESH INTERVAL 'N' MINUTE
依赖级联
TARGET_LAG = 'DOWNSTREAM'
TARGET_LAG = 'DOWNSTREAM'
无此概念,各层独立设置间隔
手动刷新
ALTER DYNAMIC TABLE ... REFRESH
ALTER DYNAMIC TABLE ... REFRESH
REFRESH DYNAMIC TABLE ...
REFRESH DYNAMIC TABLE ...
Time Travel 保留期
DATA_RETENTION_TIME_IN_DAYS = N
DATA_RETENTION_TIME_IN_DAYS = N
(CREATE TABLE 内联选项)
ALTER TABLE ... SET PROPERTIES ('data_retention_days' = 'N')
ALTER TABLE ... SET PROPERTIES ('data_retention_days' = 'N')
(建表后单独执行)

参考

联系我们
预约咨询
微信咨询
电话咨询