Snowpark → ZettaPark 迁移实战:Frostbyte 数据工程管道

如果你在 Snowflake 上用 Snowpark Python 构建了数据工程管道,迁移到 ClickZetta Lakehouse 的核心工作量集中在 3 处:数据加载方式(Stage → Volume)、存储过程(改为 Python 脚本)、任务调度(TASK → cz-cli task)。DataFrame API 本身高度兼容,业务逻辑不需要改写。

本文用一个真实迁移项目演示完整过程:将 Snowflake 官方 Frostbyte 数据工程 Lab(

sfguide-data-engineering-with-snowpark-python
sfguide-data-engineering-with-snowpark-python
,150 stars)迁移到 ClickZetta Lakehouse,数据源使用原始 Frostbyte S3 公开数据集,全部代码经过实际运行验证。

完整代码见 GitHub:snowflake2lakehouse-data-engineering


原始项目

snowflake2lakehouse-data-engineering 基于 Snowflake-Labs/sfguide-data-engineering-with-snowpark-python 改造,演示如何在 Snowflake 上用 Snowpark Python 构建完整的数据工程管道:从 S3 加载 Frostbyte 餐车订单数据,经过清洗、JOIN 打平、增量合并,最终输出每日城市销售指标。

迁移后的代码在

03_lakehouse/
03_lakehouse/
目录,原始 Snowflake 代码保留在
01_snowflake/
01_snowflake/
供对照。

结论先行

DataFrame 业务逻辑一行不需要改。 3 处改动全是平台基础设施替换:数据加载从 S3 Stage 换成 Volume,存储过程改为普通 Python 脚本,任务调度从 TASK 换成 cz-cli。

with_column
with_column
join
join
group_by
group_by
merge
merge
、Window 函数——这些核心操作在 ZettaPark 中与 Snowpark 完全一致。

改动项工作量说明
数据加载方式S3 External Stage → Volume,
session.file.put()
session.file.put()
+
save_as_table()
save_as_table()
存储过程
CREATE PROCEDURE
CREATE PROCEDURE
→ 普通 Python 脚本,逻辑不变
任务调度
CREATE TASK
CREATE TASK
cz-cli task create/deploy/execute
cz-cli task create/deploy/execute

Python UDF 和 Stream on View 有额外限制(见迁移步骤),但不影响核心数据处理逻辑。


技术栈对比

原始项目(Snowflake)迁移后(Lakehouse)
数据加载S3 External Stage +
COPY INTO
COPY INTO
Volume +
session.file.put()
session.file.put()
+
save_as_table()
save_as_table()
DataFrame API
snowflake.snowpark
snowflake.snowpark
clickzetta.zettapark
clickzetta.zettapark
(import 路径,API 相同)
存储过程Python Stored Procedure(
CREATE PROCEDURE
CREATE PROCEDURE
普通 Python 脚本(逻辑不变)
Python UDFSnowpark Python UDF(依赖 scipy)SQL UDF(
RETURN expr
RETURN expr
语法)
Stream on View
CREATE STREAM ON VIEW
CREATE STREAM ON VIEW
不支持;需先物化 View 为 Table,再建 Stream
任务调度
CREATE TASK ... WHEN SYSTEM$STREAM_HAS_DATA(...)
CREATE TASK ... WHEN SYSTEM$STREAM_HAS_DATA(...)
cz-cli task create/deploy/execute
cz-cli task create/deploy/execute
计算资源
WAREHOUSE = HOL_WH
WAREHOUSE = HOL_WH
VCLUSTER default
VCLUSTER default

DataFrame 的核心操作(

with_column
with_column
join
join
group_by
group_by
agg
agg
merge
merge
、Window 函数)在 ZettaPark 中与 Snowpark 完全一致,业务逻辑一行不需要改。



架构概览

S3 (sfquickstarts bucket) Volume (frostbyte_raw_pos.frostbyte_vol) pos/order_header/year=2021/ pos/order_header/ pos/order_detail/year=2021/ pos/order_detail/ pos/menu/ · pos/truck/ · ... pos/menu/ · pos/truck/ · ... customer/customer_loyalty/ customer/customer_loyalty/ │ │ │ COPY INTO (Snowflake) │ session.file.put() + save_as_table() ▼ ▼ RAW_POS / RAW_CUSTOMER schemas frostbyte_raw_pos / frostbyte_raw_customer │ │ │ Snowpark DataFrame join │ ZettaPark DataFrame join (identical) ▼ ▼ POS_FLATTENED_V (View) pos_flattened_v (View, SQL CREATE VIEW) + Stream ON VIEW + pos_flattened_v_table (materialized) + Stream ON TABLE (WITH PROPERTIES) │ │ │ Python Stored Procedure │ Python Script (06_orders_update.py) ▼ ▼ HARMONIZED.ORDERS frostbyte_harmonized.orders │ │ │ Python Stored Procedure │ Python Script (07_daily_city_metrics.py) ▼ ▼ ANALYTICS.DAILY_CITY_METRICS frostbyte_analytics.daily_city_metrics │ │ │ CREATE TASK ... AFTER ... │ cz-cli task create/deploy/execute ▼ ▼ Task Orchestration (Snowsight) Studio Task (cz-cli)


快速开始

git clone https://github.com/clickzetta/snowflake2lakehouse-data-engineering.git cd snowflake2lakehouse-data-engineering/03_lakehouse cp .env.example .env # 填写 .env 里的连接信息 pip install clickzetta_zettapark_python python-dotenv # 下载 Frostbyte 数据(公开 S3,无需 AWS 账号) aws s3 sync s3://sfquickstarts/data-engineering-with-snowpark-python/ ./datasets/ \ --no-sign-request \ --exclude "pos/order_header/*" --exclude "pos/order_detail/*" # order_header 和 order_detail 各取一个 year=2021 文件(约 90MB) aws s3 cp s3://sfquickstarts/data-engineering-with-snowpark-python/pos/order_header/year=2021/data_01a91b48-0605-6a9c-0000-711101079122_005_4_0.snappy.parquet \ ./datasets/pos/order_header/ --no-sign-request aws s3 cp s3://sfquickstarts/data-engineering-with-snowpark-python/pos/order_detail/year=2021/data_01a91b50-0605-721e-0000-71110107a166_008_0_0.snappy.parquet \ ./datasets/pos/order_detail/ --no-sign-request # 一键运行全流程并验证(24 项数据校验) python e2e.py --teardown # 或者分步运行: python setup.py # 建 schema/volume,上传数据,注册 cz-cli profile python steps/02_load_raw.py # 加载原始数据到 Lakehouse python steps/04_create_pos_view.py # 建 View + Table Stream cz-cli sql -f steps/05_udf.sql --profile frostbyte --sync --write # 建 SQL UDF python steps/06_orders_update.py # 合并订单数据 python steps/07_daily_city_metrics.py # 计算每日城市指标 export $(grep -v '#' .env | xargs) && bash steps/08_orchestrate_tasks.sh # 部署调度任务 # 清理所有对象(Studio task + SQL 对象 + Volume) bash steps/11_teardown.sh


迁移步骤

第一步:数据加载(Stage → Volume)

这是迁移的第一个差异点,也是最直观的。

Snowflake 用 External Stage 指向 S3,通过

COPY INTO
COPY INTO
加载:

CREATE STAGE FROSTBYTE_RAW_STAGE URL = 's3://sfquickstarts/data-engineering-with-snowpark-python/'; COPY INTO RAW_POS.ORDER_HEADER FROM @FROSTBYTE_RAW_STAGE/pos/order_header/ FILE_FORMAT = (FORMAT_NAME = PARQUET_FORMAT) MATCH_BY_COLUMN_NAME = CASE_SENSITIVE;

Lakehouse 用 Volume 作为托管存储,通过 ZettaPark 上传文件后用

save_as_table
save_as_table
加载:

# 上传到 Volume(替代 S3 Stage) session.file.put(str(local_file), "vol://frostbyte_raw_pos.frostbyte_vol/pos/order_header/", auto_compress=False, overwrite=True) # 从 Volume 读取并写入表(替代 COPY INTO) df = session.read.option("compression", "snappy").parquet( "vol://frostbyte_raw_pos.frostbyte_vol/pos/order_header/" ) df.write.save_as_table("frostbyte_raw_pos.order_header", mode="overwrite")

第二步:DataFrame API(几乎不需要改)

Snowpark 和 ZettaPark 的 DataFrame API 高度兼容,只需替换 import 路径:

# Snowflake from snowflake.snowpark import Session from snowflake.snowpark import functions as F from snowflake.snowpark.window import Window # Lakehouse(只改 import,其余代码不变) from clickzetta.zettapark.session import Session from clickzetta.zettapark import functions as F from clickzetta.zettapark.window import Window

以下操作在两个平台上写法完全一致:

# JOIN 打平多张表 final = order_detail.join(order_header, "order_id") \ .join(menu, "menu_item_id") \ .join(truck, "truck_id") # 增量合并(MERGE INTO) target.merge( source, target["order_detail_id"] == source["order_detail_id"], [F.when_matched().update(updates), F.when_not_matched().insert(updates)] ) # Window 函数 window = Window.partition_by("city").order_by(F.col("date").desc()) df.with_column("rank", F.row_number().over(window))

第三步:Stream on View 的处理

Snowflake 支持直接在 View 上建 Stream:

CREATE STREAM HARMONIZED.POS_FLATTENED_V_STREAM ON VIEW HARMONIZED.POS_FLATTENED_V;

Lakehouse 的 Table Stream 只支持

ON TABLE
ON TABLE
,不支持
ON VIEW
ON VIEW
。解决方案是先把 View 物化为 Table,再在 Table 上建 Stream:

# 1. 建 View(SQL 方式,确保持久化) session.sql(""" CREATE OR REPLACE VIEW frostbyte_harmonized.pos_flattened_v AS SELECT od.order_detail_id, oh.order_ts, m.truck_brand_name, ... FROM frostbyte_raw_pos.order_detail od JOIN frostbyte_raw_pos.order_header oh ON od.order_id = oh.order_id ... """).collect() # 2. 物化为 Table(Stream 只能建在 Table 上) session.sql(""" CREATE TABLE IF NOT EXISTS frostbyte_harmonized.pos_flattened_v_table AS SELECT * FROM frostbyte_harmonized.pos_flattened_v """).collect() # 3. 在 Table 上建 Stream(必须指定 TABLE_STREAM_MODE) session.sql(""" CREATE TABLE STREAM IF NOT EXISTS frostbyte_harmonized.pos_flattened_v_stream ON TABLE frostbyte_harmonized.pos_flattened_v_table WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD') """).collect()

Stream 消费时,Lakehouse Table Stream 会在数据列之外附加元数据列(

__change_type
__change_type
__commit_version
__commit_version
等)。INSERT 时需要显式指定列名,排除这些元数据列:

# 从 stream 消费数据时,用 SQL INSERT 显式指定列名 session.sql(""" INSERT INTO frostbyte_harmonized.orders SELECT order_detail_id, order_id, truck_id, ..., CURRENT_TIMESTAMP() AS meta_updated_at FROM frostbyte_harmonized.pos_flattened_v_stream """).collect()

第四步:存储过程 → Python 脚本

Snowflake 的 Python Stored Procedure 把业务逻辑封装在数据库内部,通过

CALL
CALL
触发:

CREATE OR REPLACE PROCEDURE HARMONIZED.ORDERS_UPDATE_SP() RETURNS STRING LANGUAGE PYTHON RUNTIME_VERSION = '3.8' PACKAGES = ('snowflake-snowpark-python') HANDLER = 'main' AS $$ def main(session): # 业务逻辑 ... $$; CALL HARMONIZED.ORDERS_UPDATE_SP();

Lakehouse 不支持存储过程,但逻辑完全可以改为普通 Python 脚本直接运行:

# 06_orders_update.py — 与存储过程逻辑完全相同,只是运行方式不同 from clickzetta.zettapark.session import Session def merge_order_updates(session): # 原存储过程里的业务逻辑,一行不需要改 ... if __name__ == "__main__": session = create_session() merge_order_updates(session)

第五步:Python UDF → SQL UDF

Snowflake 的 Python UDF 可以使用第三方包(如 scipy):

# Snowflake Python UDF from scipy.constants import convert_temperature def main(temp_f: float) -> float: return convert_temperature(float(temp_f), 'F', 'C')

Lakehouse 的 Python UDF 需要配置 External Function 服务(云函数部署),对于简单的数学计算,直接用 SQL UDF 更简单:

-- Lakehouse SQL UDF -- 注意:用 RETURN expr 语法(不是 AS $$ ... $$),用 DOUBLE 类型(避免 DECIMAL 精度溢出) CREATE OR REPLACE FUNCTION frostbyte_analytics.fahrenheit_to_celsius_udf(temp_f DOUBLE) RETURNS DOUBLE RETURN (temp_f - 32.0) * 5.0 / 9.0;

第六步:Task 调度(TASK → cz-cli task)

Snowflake 的 Task 可以监听 Stream 数据变化自动触发:

CREATE OR REPLACE TASK ORDERS_UPDATE_TASK WAREHOUSE = HOL_WH WHEN SYSTEM$STREAM_HAS_DATA('POS_FLATTENED_V_STREAM') AS CALL HARMONIZED.ORDERS_UPDATE_SP(); ALTER TASK DAILY_CITY_METRICS_UPDATE_TASK RESUME; EXECUTE TASK ORDERS_UPDATE_TASK;

Lakehouse 用

cz-cli task
cz-cli task
创建定时调度任务,脚本内容直接内嵌在 task 里:

# 创建 task cz-cli task create orders_update_task --type PYTHON --profile frostbyte # 保存脚本内容(连接信息内嵌,Studio task 运行在隔离环境) cz-cli task save-content orders_update_task \ --content "$(cat orders_script.py)" --profile frostbyte # 设置 cron 调度(替代 SYSTEM$STREAM_HAS_DATA 触发) cz-cli task save-cron orders_update_task \ --cron "*/5 * * * *" --profile frostbyte # 部署并立即执行一次 cz-cli task deploy orders_update_task -y --profile frostbyte cz-cli task execute orders_update_task --profile frostbyte


验证结果

所有代码经过

python e2e.py
python e2e.py
端到端验证,24/24 项数据校验通过:

行数校验项
Raw
order_header
order_header
7,336,341行数匹配
Raw
order_detail
order_detail
6,230,167行数匹配
Raw
customer_loyalty
customer_loyalty
222,540行数匹配
Raw
menu / truck / location / ...
menu / truck / location / ...
100 / 450 / 13,093 / ...行数匹配
Harmonized
pos_flattened_v
pos_flattened_v
378,941行数、15 个品牌、58 种菜品
Harmonized
orders
orders
378,941行数、无空主键、top 品牌 Freezing Point
Harmonized
orders
orders
总收入 $5,547,817.75、日期范围 2021-01-01 ~ 2022-01-01
Analytics
daily_city_metrics
daily_city_metrics
247行数、top 城市 New York City
UDF
fahrenheit_to_celsius_udf
fahrenheit_to_celsius_udf
212°F = 100°C ✓
UDF
inch_to_millimeter_udf
inch_to_millimeter_udf
1 inch = 25.4mm ✓

迁移结论

ZettaPark 与 Snowpark 的 DataFrame API 高度兼容,本项目验证了以下结论:

完全兼容(无需修改):

  • DataFrame 链式操作:
    with_column
    with_column
    join
    join
    group_by
    group_by
    agg
    agg
    filter
    filter
    select
    select
  • 函数库:
    F.when().otherwise()
    F.when().otherwise()
    F.current_timestamp()
    F.current_timestamp()
    F.row_number()
    F.row_number()
    F.sum()
    F.sum()
    F.coalesce()
    F.coalesce()
  • Window 函数:
    Window.partition_by().order_by()
    Window.partition_by().order_by()
    ,行为完全一致
  • target.merge()
    target.merge()
    MERGE INTO 语义
  • SQL UDF 语法(
    RETURN expr
    RETURN expr
  • Table Stream(
    ON TABLE
    ON TABLE

需要处理的 4 处差异:

差异点SnowflakeLakehouse
数据加载S3 Stage +
COPY INTO
COPY INTO
Volume +
session.file.put()
session.file.put()
+
save_as_table()
save_as_table()
Stream on View
CREATE STREAM ON VIEW
CREATE STREAM ON VIEW
不支持;物化 View 为 Table 后建 Stream
Python UDFSnowpark Python UDF(第三方包)SQL UDF(
RETURN expr
RETURN expr
DOUBLE
DOUBLE
类型)
存储过程 + Task
CREATE PROCEDURE
CREATE PROCEDURE
+
CREATE TASK
CREATE TASK
Python 脚本 +
cz-cli task
cz-cli task

参考

联系我们
预约咨询
微信咨询
电话咨询