Snowpark → ZettaPark 迁移实战:Frostbyte 数据工程管道
如果你在 Snowflake 上用 Snowpark Python 构建了数据工程管道,迁移到 ClickZetta Lakehouse 的核心工作量集中在 3 处:数据加载方式(Stage → Volume)、存储过程(改为 Python 脚本)、任务调度(TASK → cz-cli task)。DataFrame API 本身高度兼容,业务逻辑不需要改写。
本文用一个真实迁移项目演示完整过程:将 Snowflake 官方 Frostbyte 数据工程 Lab(
sfguide-data-engineering-with-snowpark-pythonsfguide-data-engineering-with-snowpark-python
,150 stars)迁移到 ClickZetta Lakehouse,数据源使用原始 Frostbyte S3 公开数据集,全部代码经过实际运行验证。
完整代码见 GitHub:snowflake2lakehouse-data-engineering
原始项目
snowflake2lakehouse-data-engineering 基于 Snowflake-Labs/sfguide-data-engineering-with-snowpark-python 改造,演示如何在 Snowflake 上用 Snowpark Python 构建完整的数据工程管道:从 S3 加载 Frostbyte 餐车订单数据,经过清洗、JOIN 打平、增量合并,最终输出每日城市销售指标。
迁移后的代码在
03_lakehouse/03_lakehouse/
目录,原始 Snowflake 代码保留在
01_snowflake/01_snowflake/
供对照。
结论先行
DataFrame 业务逻辑一行不需要改。 3 处改动全是平台基础设施替换:数据加载从 S3 Stage 换成 Volume,存储过程改为普通 Python 脚本,任务调度从 TASK 换成 cz-cli。
with_columnwith_column
、
joinjoin
、
group_bygroup_by
、
mergemerge
、Window 函数——这些核心操作在 ZettaPark 中与 Snowpark 完全一致。
改动项 工作量 说明 数据加载方式 低 S3 External Stage → Volume,session.file.put()session.file.put()
+ save_as_table()save_as_table()
存储过程 低 CREATE PROCEDURECREATE PROCEDURE
→ 普通 Python 脚本,逻辑不变任务调度 低 CREATE TASKCREATE TASK
→ cz-cli task create/deploy/executecz-cli task create/deploy/execute
Python UDF 和 Stream on View 有额外限制(见迁移步骤),但不影响核心数据处理逻辑。
技术栈对比
原始项目(Snowflake) 迁移后(Lakehouse) 数据加载 S3 External Stage + COPY INTOCOPY INTO
Volume + session.file.put()session.file.put()
+ save_as_table()save_as_table()
DataFrame API snowflake.snowparksnowflake.snowpark
clickzetta.zettaparkclickzetta.zettapark
(import 路径,API 相同)存储过程 Python Stored Procedure(CREATE PROCEDURECREATE PROCEDURE
) 普通 Python 脚本(逻辑不变) Python UDF Snowpark Python UDF(依赖 scipy) SQL UDF(RETURN exprRETURN expr
语法) Stream on View CREATE STREAM ON VIEWCREATE STREAM ON VIEW
不支持;需先物化 View 为 Table,再建 Stream 任务调度 CREATE TASK ... WHEN SYSTEM$STREAM_HAS_DATA(...)CREATE TASK ... WHEN SYSTEM$STREAM_HAS_DATA(...)
cz-cli task create/deploy/executecz-cli task create/deploy/execute
计算资源 WAREHOUSE = HOL_WHWAREHOUSE = HOL_WH
VCLUSTER defaultVCLUSTER default
DataFrame 的核心操作(
with_columnwith_column
、
joinjoin
、
group_bygroup_by
、
aggagg
、
mergemerge
、Window 函数)在 ZettaPark 中与 Snowpark 完全一致,业务逻辑一行不需要改。
架构概览
S3 (sfquickstarts bucket) Volume (frostbyte_raw_pos.frostbyte_vol)
pos/order_header/year=2021/ pos/order_header/
pos/order_detail/year=2021/ pos/order_detail/
pos/menu/ · pos/truck/ · ... pos/menu/ · pos/truck/ · ...
customer/customer_loyalty/ customer/customer_loyalty/
│ │
│ COPY INTO (Snowflake) │ session.file.put() + save_as_table()
▼ ▼
RAW_POS / RAW_CUSTOMER schemas frostbyte_raw_pos / frostbyte_raw_customer
│ │
│ Snowpark DataFrame join │ ZettaPark DataFrame join (identical)
▼ ▼
POS_FLATTENED_V (View) pos_flattened_v (View, SQL CREATE VIEW)
+ Stream ON VIEW + pos_flattened_v_table (materialized)
+ Stream ON TABLE (WITH PROPERTIES)
│ │
│ Python Stored Procedure │ Python Script (06_orders_update.py)
▼ ▼
HARMONIZED.ORDERS frostbyte_harmonized.orders
│ │
│ Python Stored Procedure │ Python Script (07_daily_city_metrics.py)
▼ ▼
ANALYTICS.DAILY_CITY_METRICS frostbyte_analytics.daily_city_metrics
│ │
│ CREATE TASK ... AFTER ... │ cz-cli task create/deploy/execute
▼ ▼
Task Orchestration (Snowsight) Studio Task (cz-cli)
快速开始
git clone https://github.com/clickzetta/snowflake2lakehouse-data-engineering.git
cd snowflake2lakehouse-data-engineering/03_lakehouse
cp .env.example .env
# 填写 .env 里的连接信息
pip install clickzetta_zettapark_python python-dotenv
# 下载 Frostbyte 数据(公开 S3,无需 AWS 账号)
aws s3 sync s3://sfquickstarts/data-engineering-with-snowpark-python/ ./datasets/ \
--no-sign-request \
--exclude "pos/order_header/*" --exclude "pos/order_detail/*"
# order_header 和 order_detail 各取一个 year=2021 文件(约 90MB)
aws s3 cp s3://sfquickstarts/data-engineering-with-snowpark-python/pos/order_header/year=2021/data_01a91b48-0605-6a9c-0000-711101079122_005_4_0.snappy.parquet \
./datasets/pos/order_header/ --no-sign-request
aws s3 cp s3://sfquickstarts/data-engineering-with-snowpark-python/pos/order_detail/year=2021/data_01a91b50-0605-721e-0000-71110107a166_008_0_0.snappy.parquet \
./datasets/pos/order_detail/ --no-sign-request
# 一键运行全流程并验证(24 项数据校验)
python e2e.py --teardown
# 或者分步运行:
python setup.py # 建 schema/volume,上传数据,注册 cz-cli profile
python steps/02_load_raw.py # 加载原始数据到 Lakehouse
python steps/04_create_pos_view.py # 建 View + Table Stream
cz-cli sql -f steps/05_udf.sql --profile frostbyte --sync --write # 建 SQL UDF
python steps/06_orders_update.py # 合并订单数据
python steps/07_daily_city_metrics.py # 计算每日城市指标
export $(grep -v '#' .env | xargs) && bash steps/08_orchestrate_tasks.sh # 部署调度任务
# 清理所有对象(Studio task + SQL 对象 + Volume)
bash steps/11_teardown.sh
迁移步骤
第一步:数据加载(Stage → Volume)
这是迁移的第一个差异点,也是最直观的。
Snowflake 用 External Stage 指向 S3,通过
COPY INTOCOPY INTO
加载:
CREATE STAGE FROSTBYTE_RAW_STAGE
URL = 's3://sfquickstarts/data-engineering-with-snowpark-python/';
COPY INTO RAW_POS.ORDER_HEADER
FROM @FROSTBYTE_RAW_STAGE/pos/order_header/
FILE_FORMAT = (FORMAT_NAME = PARQUET_FORMAT)
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE;
Lakehouse 用 Volume 作为托管存储,通过 ZettaPark 上传文件后用
save_as_tablesave_as_table
加载:
# 上传到 Volume(替代 S3 Stage)
session.file.put(str(local_file), "vol://frostbyte_raw_pos.frostbyte_vol/pos/order_header/",
auto_compress=False, overwrite=True)
# 从 Volume 读取并写入表(替代 COPY INTO)
df = session.read.option("compression", "snappy").parquet(
"vol://frostbyte_raw_pos.frostbyte_vol/pos/order_header/"
)
df.write.save_as_table("frostbyte_raw_pos.order_header", mode="overwrite")
⚠️ 注意 :Volume 必须建在已存在的 schema 下。
setup.pysetup.py
先通过
cz-clicz-cli
建好
frostbyte_raw_posfrostbyte_raw_pos
schema,再建 Volume,所以 Volume 可以放在
frostbyte_raw_posfrostbyte_raw_pos
而不是
publicpublic
。ZettaPark session 连接时用
publicpublic
schema(始终存在),但
session.file.put()session.file.put()
可以写入任意已存在 schema 下的 Volume。
第二步:DataFrame API(几乎不需要改)
Snowpark 和 ZettaPark 的 DataFrame API 高度兼容,只需替换 import 路径:
# Snowflake
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.window import Window
# Lakehouse(只改 import,其余代码不变)
from clickzetta.zettapark.session import Session
from clickzetta.zettapark import functions as F
from clickzetta.zettapark.window import Window
以下操作在两个平台上写法完全一致:
# JOIN 打平多张表
final = order_detail.join(order_header, "order_id") \
.join(menu, "menu_item_id") \
.join(truck, "truck_id")
# 增量合并(MERGE INTO)
target.merge(
source,
target["order_detail_id"] == source["order_detail_id"],
[F.when_matched().update(updates),
F.when_not_matched().insert(updates)]
)
# Window 函数
window = Window.partition_by("city").order_by(F.col("date").desc())
df.with_column("rank", F.row_number().over(window))
第三步:Stream on View 的处理
Snowflake 支持直接在 View 上建 Stream:
CREATE STREAM HARMONIZED.POS_FLATTENED_V_STREAM
ON VIEW HARMONIZED.POS_FLATTENED_V;
Lakehouse 的 Table Stream 只支持
ON TABLEON TABLE
,不支持
ON VIEWON VIEW
。解决方案是先把 View 物化为 Table,再在 Table 上建 Stream:
# 1. 建 View(SQL 方式,确保持久化)
session.sql("""
CREATE OR REPLACE VIEW frostbyte_harmonized.pos_flattened_v AS
SELECT od.order_detail_id, oh.order_ts, m.truck_brand_name, ...
FROM frostbyte_raw_pos.order_detail od
JOIN frostbyte_raw_pos.order_header oh ON od.order_id = oh.order_id
...
""").collect()
# 2. 物化为 Table(Stream 只能建在 Table 上)
session.sql("""
CREATE TABLE IF NOT EXISTS frostbyte_harmonized.pos_flattened_v_table
AS SELECT * FROM frostbyte_harmonized.pos_flattened_v
""").collect()
# 3. 在 Table 上建 Stream(必须指定 TABLE_STREAM_MODE)
session.sql("""
CREATE TABLE STREAM IF NOT EXISTS frostbyte_harmonized.pos_flattened_v_stream
ON TABLE frostbyte_harmonized.pos_flattened_v_table
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD')
""").collect()
⚠️ 注意 :Lakehouse
CREATE TABLE STREAMCREATE TABLE STREAM
必须加
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD')WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD')
,否则报语法错误。
Stream 消费时,Lakehouse Table Stream 会在数据列之外附加元数据列(
__change_type__change_type
、
__commit_version__commit_version
等)。INSERT 时需要显式指定列名,排除这些元数据列:
# 从 stream 消费数据时,用 SQL INSERT 显式指定列名
session.sql("""
INSERT INTO frostbyte_harmonized.orders
SELECT
order_detail_id, order_id, truck_id, ...,
CURRENT_TIMESTAMP() AS meta_updated_at
FROM frostbyte_harmonized.pos_flattened_v_stream
""").collect()
第四步:存储过程 → Python 脚本
Snowflake 的 Python Stored Procedure 把业务逻辑封装在数据库内部,通过
CALLCALL
触发:
CREATE OR REPLACE PROCEDURE HARMONIZED.ORDERS_UPDATE_SP()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'main'
AS $$
def main(session):
# 业务逻辑
...
$$;
CALL HARMONIZED.ORDERS_UPDATE_SP();
Lakehouse 不支持存储过程,但逻辑完全可以改为普通 Python 脚本直接运行:
# 06_orders_update.py — 与存储过程逻辑完全相同,只是运行方式不同
from clickzetta.zettapark.session import Session
def merge_order_updates(session):
# 原存储过程里的业务逻辑,一行不需要改
...
if __name__ == "__main__":
session = create_session()
merge_order_updates(session)
第五步:Python UDF → SQL UDF
Snowflake 的 Python UDF 可以使用第三方包(如 scipy):
# Snowflake Python UDF
from scipy.constants import convert_temperature
def main(temp_f: float) -> float:
return convert_temperature(float(temp_f), 'F', 'C')
Lakehouse 的 Python UDF 需要配置 External Function 服务(云函数部署),对于简单的数学计算,直接用 SQL UDF 更简单:
-- Lakehouse SQL UDF
-- 注意:用 RETURN expr 语法(不是 AS $$ ... $$),用 DOUBLE 类型(避免 DECIMAL 精度溢出)
CREATE OR REPLACE FUNCTION frostbyte_analytics.fahrenheit_to_celsius_udf(temp_f DOUBLE)
RETURNS DOUBLE
RETURN (temp_f - 32.0) * 5.0 / 9.0;
第六步:Task 调度(TASK → cz-cli task)
Snowflake 的 Task 可以监听 Stream 数据变化自动触发:
CREATE OR REPLACE TASK ORDERS_UPDATE_TASK
WAREHOUSE = HOL_WH
WHEN SYSTEM$STREAM_HAS_DATA('POS_FLATTENED_V_STREAM')
AS CALL HARMONIZED.ORDERS_UPDATE_SP();
ALTER TASK DAILY_CITY_METRICS_UPDATE_TASK RESUME;
EXECUTE TASK ORDERS_UPDATE_TASK;
Lakehouse 用
cz-cli taskcz-cli task
创建定时调度任务,脚本内容直接内嵌在 task 里:
# 创建 task
cz-cli task create orders_update_task --type PYTHON --profile frostbyte
# 保存脚本内容(连接信息内嵌,Studio task 运行在隔离环境)
cz-cli task save-content orders_update_task \
--content "$(cat orders_script.py)" --profile frostbyte
# 设置 cron 调度(替代 SYSTEM$STREAM_HAS_DATA 触发)
cz-cli task save-cron orders_update_task \
--cron "*/5 * * * *" --profile frostbyte
# 部署并立即执行一次
cz-cli task deploy orders_update_task -y --profile frostbyte
cz-cli task execute orders_update_task --profile frostbyte
⚠️ 注意 :Studio task 运行在隔离环境,无法读取本地
.env.env
文件。连接信息需要内嵌在脚本里,或通过
export $(grep -v '#' .env | xargs)export $(grep -v '#' .env | xargs)
在 shell 中展开后传入 heredoc。
验证结果
所有代码经过
python e2e.pypython e2e.py
端到端验证,24/24 项数据校验通过:
层 表 行数 校验项 Raw order_headerorder_header
7,336,341 行数匹配 Raw order_detailorder_detail
6,230,167 行数匹配 Raw customer_loyaltycustomer_loyalty
222,540 行数匹配 Raw menu / truck / location / ...menu / truck / location / ...
100 / 450 / 13,093 / ... 行数匹配 Harmonized pos_flattened_vpos_flattened_v
378,941 行数、15 个品牌、58 种菜品 Harmonized ordersorders
378,941 行数、无空主键、top 品牌 Freezing Point Harmonized ordersorders
— 总收入 $5,547,817.75、日期范围 2021-01-01 ~ 2022-01-01 Analytics daily_city_metricsdaily_city_metrics
247 行数、top 城市 New York City UDF fahrenheit_to_celsius_udffahrenheit_to_celsius_udf
— 212°F = 100°C ✓ UDF inch_to_millimeter_udfinch_to_millimeter_udf
— 1 inch = 25.4mm ✓
迁移结论
ZettaPark 与 Snowpark 的 DataFrame API 高度兼容,本项目验证了以下结论:
完全兼容(无需修改):
需要处理的 4 处差异:
差异点 Snowflake Lakehouse 数据加载 S3 Stage + COPY INTOCOPY INTO
Volume + session.file.put()session.file.put()
+ save_as_table()save_as_table()
Stream on View CREATE STREAM ON VIEWCREATE STREAM ON VIEW
不支持;物化 View 为 Table 后建 Stream Python UDF Snowpark Python UDF(第三方包) SQL UDF(RETURN exprRETURN expr
,DOUBLEDOUBLE
类型) 存储过程 + Task CREATE PROCEDURECREATE PROCEDURE
+ CREATE TASKCREATE TASK
Python 脚本 + cz-cli taskcz-cli task
参考