PySpark → ZettaPark 迁移实战:F1 赛车数据工程项目

如果你熟悉 PySpark,迁移到 ZettaPark 的学习成本比你想象的低得多。ZettaPark 的 DataFrame API 与 PySpark 高度兼容——

select
select
filter
filter
join
join
、Window 函数、聚合函数,写法完全一致。你不需要重新学一套 API,也不需要重写业务逻辑,改动集中在 4 处环境配置。

本文用一个真实项目验证这一点:将基于 Apache Spark 技术栈(Azure Databricks + PySpark)的 F1 赛车数据工程项目完整迁移到 ClickZetta Lakehouse,经过 71 项自动化验证,全部通过。

完整代码见 GitHub:spark2lakehouse-formula1


原始项目

spark2lakehouse-formula1 fork 自 FerhattSimsekk/formula1-data-engineering,原始技术栈是 Azure Databricks + PySpark + Delta Lake。项目实现了从 API 下载、数据摄取、多表关联转换到分析层输出的完整数据工程链路,覆盖 2018–2023 赛季 125 场比赛的全量数据,包含赛道、车手、车队、比赛结果、进站记录、单圈时间、排位赛成绩共 8 张核心表,总数据量约 14 万行。

迁移后的代码在

03_lakehouse/
03_lakehouse/
目录,可与
01_spark/
01_spark/
逐文件对照。

结论先行

你不需要重写任何业务逻辑,也不需要重新培训团队。现有 PySpark 工程师可以直接上手——4 处改动全是机械替换,不涉及任何分析逻辑。

改动项工作量说明
导入路径替换极低机械替换,无逻辑改动
Session 显式创建极低Databricks 中
spark
spark
是全局注入的,ZettaPark 需要显式创建
DDL/DML 加
.collect()
.collect()
惰性执行机制,不加则不执行
文件路径格式
/mnt/...
/mnt/...
vol://schema.vol/...
vol://schema.vol/...

Window 函数、聚合、JOIN、过滤、排序——这些数据工程的核心操作,语法完全一致,不需要改。


技术栈对比

原始项目迁移后
计算引擎Apache Spark(Azure Databricks)ClickZetta Lakehouse
DataFrame APIPySpark (
pyspark.sql
pyspark.sql
)
ZettaPark (
clickzetta.zettapark
clickzetta.zettapark
)
存储格式Delta LakeLakehouse 原生表
文件存储ADLS(
/mnt/...
/mnt/...
挂载)
Volume(
vol://schema.vol/...
vol://schema.vol/...
Session 管理
spark
spark
(Databricks 全局注入)
Session.builder.configs({}).create()
Session.builder.configs({}).create()
SQL 执行
spark.sql(q)
spark.sql(q)
立即执行
session.sql(q).collect()
session.sql(q).collect()
触发执行
分区写入
df.write.partitionBy("col")
df.write.partitionBy("col")
PARTITIONED BY (col TYPE)
PARTITIONED BY (col TYPE)
DDL
临时视图
CREATE TEMP VIEW
CREATE TEMP VIEW
df.create_or_replace_temp_view()
df.create_or_replace_temp_view()
运行环境Databricks Notebook本地 Python 脚本 / Jupyter

表中的差异看起来不少,但对迁移工作量的影响很小。变化的是运行环境和 API 包名,不变的是数据处理逻辑本身。90% 的代码可以直接复用,需要修改的 4 处全部是机械性替换,不涉及业务逻辑重写。DataFrame 操作(

select
select
filter
filter
join
join
groupBy
groupBy
、Window 函数、聚合函数)语法完全一致,不需要改动。



项目背景

数据来源是 F1 赛车 API(Jolpica),覆盖 2018–2023 赛季 125 场比赛。数据架构分两层:

f1_processed
f1_processed
(摄取层)和
f1_presentation
f1_presentation
(分析层)。

行数说明
circuits7834 个国家赛道
races1252018–2023 全赛季
drivers37参赛车手
constructors15参赛车队
results2,500比赛结果
pit_stops4,294进站记录
lap_times134,957单圈时间
qualifying2,497排位赛成绩

迁移步骤

第一步:替换导入路径

这是唯一的机械替换,用编辑器全局搜索替换即可:

替换前(PySpark):

from pyspark.sql import functions as F from pyspark.sql.window import Window from pyspark.sql.types import StructType, StructField, StringType, IntegerType

替换后(ZettaPark):

from clickzetta.zettapark import functions as F from clickzetta.zettapark.window import Window from clickzetta.zettapark.types import StructType, StructField, StringType, IntegerType

第二步:替换 Session 创建方式

Databricks 中

spark
spark
是全局注入的,不需要创建。ZettaPark 需要显式创建 Session:

替换前(Databricks,spark 全局可用):

df = spark.read.csv("/mnt/formula1dltr/raw/circuits.csv", header=True)

替换后(ZettaPark):

from clickzetta.zettapark.session import Session from dotenv import load_dotenv import os load_dotenv() session = Session.builder.configs({ "username": os.environ["CLICKZETTA_USERNAME"], "password": os.environ["CLICKZETTA_PASSWORD"], "service": os.environ["CLICKZETTA_SERVICE"], "instance": os.environ["CLICKZETTA_INSTANCE"], "workspace": os.environ["CLICKZETTA_WORKSPACE"], "schema": "f1_processed", "vcluster": os.environ.get("CLICKZETTA_VCLUSTER", "DEFAULT_AP"), }).create() df = session.read.option("header", "true").csv("vol://f1_raw.formula1_raw_vol/raw/circuits.csv")

第三步:DDL/DML 加
.collect()
.collect()

ZettaPark 的

session.sql()
session.sql()
是惰性的——返回 DataFrame 对象,不加 action 就不会真正执行。这是最容易漏掉的一处。

替换前(PySpark,立即执行):

spark.sql("MERGE INTO f1_processed.circuits tgt USING src ON ...") spark.sql("CREATE TABLE f1_processed.circuits AS SELECT ...")

替换后(ZettaPark,必须加

.collect()
.collect()
):

session.sql("MERGE INTO f1_processed.circuits tgt USING src ON ...").collect() session.sql("CREATE TABLE f1_processed.circuits AS SELECT ...").collect()

查询语句(SELECT)不强制要求,但如果只是触发执行而不需要结果,也加

.collect()
.collect()
是好习惯:

rows = session.sql("SELECT COUNT(*) FROM f1_processed.circuits").collect() print(rows[0][0]) # 78

第四步:替换文件路径

原始项目使用 ADLS 挂载路径,ZettaPark 使用 Volume 路径:

替换前:

"/mnt/formula1dltr/raw/circuits.csv" "/mnt/formula1dltr/processed/circuits"

替换后(写入直接用

save_as_table
save_as_table
,不需要路径):

"vol://f1_raw.formula1_raw_vol/raw/circuits.csv"

上传本地文件到 Volume:

session.file.put( "/tmp/circuits.csv", "vol://f1_raw.formula1_raw_vol/raw/circuits.csv", auto_compress=False, overwrite=True )


完全兼容的部分

以下是本项目实际用到的、不需要任何修改的 API:

df.filter(F.col("race_year") == 2023) df.select("race_id", "race_year", "circuit_ref")

JOIN(多表):

results_df.join(drivers_df, results_df["driver_id"] == drivers_df["driver_ref"]) .join(races_df, results_df["race_id"] == races_df["race_id"])

聚合:

df.groupBy("race_year", "driver_name") \ .agg(F.sum("points").alias("total_points"), F.count("*").alias("total_races"), F.sum(F.when(F.col("position") == 1, 1).otherwise(0)).alias("wins"))

Window 函数:

window_spec = Window.partitionBy("race_year").orderBy( F.desc("total_points"), F.desc("wins") ) df.select( "race_year", "driver_name", "total_points", "wins", F.rank().over(window_spec).alias("rank") )

MERGE INTO(标准 SQL,完全一致):

session.sql(""" MERGE INTO f1_processed.circuits tgt USING _merge_src src ON tgt.circuit_id = src.circuit_id WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * """).collect()


注意事项

1.
withColumn
withColumn
在旧版本触发服务端 schema 解析

现象:调用

df.withColumn("rank", F.rank().over(window_spec))
df.withColumn("rank", F.rank().over(window_spec))
后,列名变成
r_f6fw_race_id
r_f6fw_race_id
这样的内部前缀格式,或者直接报错。

根因:ZettaPark 旧版本的

withColumn
withColumn
会调用
self._output
self._output
,触发对服务端的 schema 预解析请求,在某些情况下返回带内部前缀的列名。

解决方案:改用

select()
select()
+
.alias()
.alias()
,一次性完成所有列操作:

有问题的写法(旧版本):

df = df.withColumn("rank", F.rank().over(window_spec))

兼容所有版本的写法:

df = df.select( "race_year", "driver_name", "total_points", "wins", F.rank().over(window_spec).alias("rank"), )

2. 多表 JOIN 后
select()
select()
必须显式指定列来源

现象:三表 JOIN 后直接

select("race_id", "driver_name", ...)
select("race_id", "driver_name", ...)
报错,或者写入的表列名带内部前缀。

根因:多表 JOIN 后,ZettaPark 旧版本为了区分来源不同的同名列,会给列名加内部哈希前缀(如

r_f6fw_race_id
r_f6fw_race_id
)。

解决方案:JOIN 后的

select()
select()
对每一列显式指定来源 DataFrame 和
.alias()
.alias()

final_df = ( results_df .join(race_circuits_df, results_df["race_id"] == race_circuits_df["race_id"]) .join(drivers_df, results_df["driver_id"] == drivers_df["driver_ref"]) .join(constructors_df, results_df["constructor_id"] == constructors_df["constructor_ref"]) .select( race_circuits_df["race_id"].alias("race_id"), race_circuits_df["race_year"].alias("race_year"), race_circuits_df["circuit_location"].alias("location"), drivers_df["name"].alias("driver_name"), drivers_df["nationality"].alias("driver_nationality"), constructors_df["name"].alias("team"), results_df["grid"].alias("grid"), results_df["fastest_lap"].alias("fastest_lap"), results_df["time"].alias("race_time"), results_df["points"].alias("points"), results_df["position"].alias("position"), results_df["file_date"].alias("file_date"), ) )

3.
CREATE TEMP VIEW
CREATE TEMP VIEW
不支持

现象

session.sql("CREATE TEMP VIEW v AS SELECT ...").collect()
session.sql("CREATE TEMP VIEW v AS SELECT ...").collect()
报错:
CZLH-42000 Syntax error - missing KW_ENDPOINT at 'VIEW'
CZLH-42000 Syntax error - missing KW_ENDPOINT at 'VIEW'

根因:ClickZetta Lakehouse SQL 不支持

CREATE TEMP VIEW
CREATE TEMP VIEW
语法。

解决方案:改用 DataFrame 的

create_or_replace_temp_view()
create_or_replace_temp_view()
,行为与 PySpark 的
createOrReplaceTempView
createOrReplaceTempView
完全一致:

不支持的写法:

session.sql("CREATE TEMP VIEW race_result_updated AS SELECT ...").collect()

正确写法:

df.create_or_replace_temp_view("race_result_updated") session.sql("SELECT * FROM race_result_updated WHERE ...").collect()

4. MERGE INTO VALUES 子句列名歧义

现象:MERGE INTO 的

WHEN NOT MATCHED THEN INSERT ... VALUES (...)
WHEN NOT MATCHED THEN INSERT ... VALUES (...)
中,VALUES 里的裸列名被解析为目标表(
tgt
tgt
)的列,导致插入空值。

根因:Lakehouse SQL 中 VALUES 子句的裸列名解析为

tgt
tgt
,而不是
src
src

解决方案:VALUES 里的列名必须加

src.
src.
前缀:

session.sql(""" MERGE INTO f1_presentation.calculated_race_results tgt USING race_result_updated src ON (tgt.driver_id = src.driver_id AND tgt.race_id = src.race_id) WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT (race_year, team_name, driver_id, driver_name, race_id, position, points, calculated_points, created_date) VALUES (src.race_year, src.team_name, src.driver_id, src.driver_name, src.race_id, src.position, src.points, src.calculated_points, src.created_date) """).collect()


增量写入封装:
merge_delta_data
merge_delta_data

原始 PySpark 项目用

overwrite_partition()
overwrite_partition()
实现增量写入。ZettaPark 对应的封装是
merge_delta_data()
merge_delta_data()
,基于 MERGE INTO 实现 upsert 语义:

def merge_delta_data(input_df, db_name, table_name, merge_condition, partition_column): session = input_df.session full_name = f"{db_name}.{table_name}" 检查表是否存在 table_exists = False try: session.sql(f"DESCRIBE TABLE {full_name}").collect() table_exists = True except Exception: pass # 表不存在,走建表逻辑 if table_exists: input_df.create_or_replace_temp_view("_merge_src") session.sql(f""" MERGE INTO {full_name} tgt USING _merge_src src ON {merge_condition} WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * """).collect() else: _create_partitioned_table(session, full_name, input_df, partition_column)

这个封装的好处:同一段代码第一次运行建表,后续运行做增量更新,不需要在调用方判断表是否存在。

分区表的建表逻辑需要手写 DDL,因为 ZettaPark 的

save_as_table
save_as_table
不支持
partition_by
partition_by
参数:

def _create_partitioned_table(session, full_name, input_df, partition_column): fields = input_df.schema.fields def sql_type(dt): name = type(dt).__name__ mapping = { "LongType": "BIGINT", "IntegerType": "INT", "ShortType": "SMALLINT", "DoubleType": "DOUBLE", "FloatType": "FLOAT", "StringType": "STRING", "BooleanType": "BOOLEAN", "DateType": "DATE", "TimestampType": "TIMESTAMP", } if name == "DecimalType": return f"DECIMAL({dt.precision},{dt.scale})" return mapping.get(name, "STRING") col_defs = ", ".join( f"{f.name.strip('`')} {sql_type(f.datatype)}" for f in fields if f.name.strip('`') != partition_column ) part_field = next(f for f in fields if f.name.strip('`') == partition_column) part_def = f"{part_field.name.strip('`')} {sql_type(part_field.datatype)}" session.sql( f"CREATE TABLE {full_name} ({col_defs}) PARTITIONED BY ({part_def})" ).collect() 必须显式列名,不能用 SELECT * 原因:create_or_replace_temp_view 会将列名规范化(race_id → raceid), 显式列名按位置匹配,绕过这个问题 all_cols = ", ".join(f.name.strip('`') for f in fields) input_df.create_or_replace_temp_view("_insert_src") session.sql( f"INSERT INTO {full_name} ({all_cols}) SELECT {all_cols} FROM _insert_src" ).collect()


数据源兼容性问题

原始项目使用 Ergast API(已停服),本项目改用 Jolpica API。以下几个问题与 ZettaPark 无关,但迁移时必须处理。

主键类型变了

原始数据用整数 ID,Jolpica API 用字符串 ref(如

"hamilton"
"hamilton"
"mercedes"
"mercedes"
)。JOIN 条件需要对应修改:

原始写法(整数 ID):

results_df.join(drivers_df, results_df["driverId"] == drivers_df["driverId"])

Jolpica 数据写法(字符串 ref):

results_df.join(drivers_df, results_df["driver_id"] == drivers_df["driver_ref"]) results_df.join(constructors_df, results_df["constructor_id"] == constructors_df["constructor_ref"])

全局端点只返回 100 条

Jolpica 的

/drivers.json?limit=1000
/drivers.json?limit=1000
实际只返回 100 条(按字母序),导致 Hamilton、Vettel、Mercedes、Red Bull 等 2018–2023 的主要参赛者全部缺失。

解决方案:改用按赛季端点,逐赛季下载后合并去重:

SEASONS = list(range(2018, 2024)) seen_drivers = {} for season in SEASONS: url = f"https://api.jolpi.ca/ergast/f1/{season}/drivers.json?limit=100" data = fetch_json(url) for d in data["MRData"]["DriverTable"]["Drivers"]: ref = d["driverId"] if ref not in seen_drivers: seen_drivers[ref] = { "driverId": len(seen_drivers) + 1, "driverRef": ref, "forename": d["givenName"], "surname": d["familyName"], "nationality": d.get("nationality", ""), ... }

这个问题的症状是:

f1_presentation.race_results
f1_presentation.race_results
只有 66 行(应该有 2500 行),因为 JOIN 时大部分 driver_id 找不到对应的 driver_ref。


端到端验证

迁移完成后,用 71 项自动化检查验证数据正确性:

cd 03_lakehouse python ../04_validate.py

验证覆盖 5 个维度:

  1. 行数检查:12 张表全部有数据,行数符合预期
  2. NULL 检查:关键列(主键、外键、业务字段)无空值
  3. 重复记录检测:同一场比赛同一车手不重复,driver_ref 不重复
  4. 业务规则:position 在 1–20 之间,points 非负,calculated_points = 11 - position,rank 最小值为 1
  5. 跨层一致性
    driver_standings.total_points
    driver_standings.total_points
    等于
    race_results
    race_results
    按年聚合的结果

实际运行结果:71/71 全部通过


完整兼容性对照

APIPySparkZettaPark状态
df.select()
df.select()
完全兼容
df.filter()
df.filter()
/
df.where()
df.where()
完全兼容
df.join(other, cond, how)
df.join(other, cond, how)
完全兼容
df.groupBy().agg()
df.groupBy().agg()
完全兼容
df.sort()
df.sort()
/
df.orderBy()
df.orderBy()
完全兼容
df.limit(n)
df.limit(n)
/
df.count()
df.count()
完全兼容
df.show()
df.show()
/
df.collect()
df.collect()
完全兼容
df.dropDuplicates(keys)
df.dropDuplicates(keys)
完全兼容
df.union(other)
df.union(other)
/
df.distinct()
df.distinct()
完全兼容
F.col()
F.col()
/
F.lit()
F.lit()
/
F.when().otherwise()
F.when().otherwise()
完全兼容
F.sum()
F.sum()
/
F.count()
F.count()
/
F.avg()
F.avg()
/
F.max()
F.max()
/
F.min()
F.min()
完全兼容
F.rank()
F.rank()
/
F.dense_rank()
F.dense_rank()
/
F.row_number()
F.row_number()
完全兼容
F.current_timestamp()
F.current_timestamp()
/
F.current_date()
F.current_date()
完全兼容
F.concat()
F.concat()
/
F.trim()
F.trim()
/
F.upper()
F.upper()
/
F.lower()
F.lower()
完全兼容
F.isNull()
F.isNull()
/
F.isNotNull()
F.isNotNull()
完全兼容
Window.partitionBy().orderBy()
Window.partitionBy().orderBy()
完全兼容
MERGE INTO(SQL)完全兼容
df.withColumn()
df.withColumn()
✅ 0.1.5 已修复旧版本改用
select().alias()
select().alias()
session.sql()
session.sql()
DDL/DML
立即执行惰性,需加
.collect()
.collect()
⚠️ 必须修改
df.write.partitionBy()
df.write.partitionBy()
❌ 不支持改用
PARTITIONED BY
PARTITIONED BY
DDL
CREATE TEMP VIEW
CREATE TEMP VIEW
(SQL)
❌ 不支持改用
create_or_replace_temp_view()
create_or_replace_temp_view()
文件路径
/mnt/...
/mnt/...
vol://schema.vol/...
vol://schema.vol/...
⚠️ 格式不同
导入路径
pyspark.sql
pyspark.sql
clickzetta.zettapark
clickzetta.zettapark
⚠️ 机械替换

迁移结论

ZettaPark 与 PySpark 的 DataFrame API 高度兼容,迁移的核心工作量集中在环境适配,而非业务逻辑重写。

完全兼容(无需修改):

  • DataFrame 操作:
    select
    select
    filter
    filter
    join
    join
    groupBy
    groupBy
    agg
    agg
    sort
    sort
    limit
    limit
    union
    union
    distinct
    distinct
  • 函数库:
    F.col()
    F.col()
    F.when().otherwise()
    F.when().otherwise()
    F.sum/count/avg/max/min
    F.sum/count/avg/max/min
    F.rank/dense_rank/row_number
    F.rank/dense_rank/row_number
    F.concat/trim/upper/lower
    F.concat/trim/upper/lower
    F.current_timestamp()
    F.current_timestamp()
  • Window 函数:
    Window.partitionBy().orderBy()
    Window.partitionBy().orderBy()
    语法完全一致
  • SQL 语法:MERGE INTO、UPDATE、DELETE、INSERT、CTE 完全兼容

必须修改的 4 处:

改动原因工作量
导入路径
pyspark.sql
pyspark.sql
clickzetta.zettapark
clickzetta.zettapark
包名不同极低,全局替换
显式创建 SessionDatabricks 全局注入,ZettaPark 需显式创建极低,一次性
DDL/DML 加
.collect()
.collect()
ZettaPark 惰性执行,不加不触发低,逐一检查
文件路径
/mnt/...
/mnt/...
vol://...
vol://...
存储系统不同低,格式替换

本项目 71/71 验证通过,证明迁移后数据完整性与原 Databricks 版本完全一致。


参考

联系我们
预约咨询
微信咨询
电话咨询
邮件咨询