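The table lists quite a few differences, but they have little effect on the migration effort. What changes is the runtime environment and the API package names; the data processing logic itself stays the same. 90% of the code can be reused as-is, and all 4 places that need changes are mechanical substitutions that involve no rewriting of business logic. DataFrame operations (`select`, `filter`, `join`, `groupBy`, Window functions, aggregate functions) have identical syntax and need no changes.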
Project background
The data comes from an F1 racing API (Jolpica) and covers 125 races across the 2018–2023 seasons. The data architecture has two layers: `f1_processed` (ingestion layer) and `f1_presentation` (analytics layer).
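| Table | Rows | Description |
| --- | --- | --- |
| circuits | 78 | Circuits in 34 countries |
| races | 125 | All seasons, 2018–2023 |
| drivers | 37 | Participating drivers |
| constructors | 15 | Participating constructors |
| results | 2,500 | Race results |
| pit_stops | 4,294 | Pit stop records |
| lap_times | 134,957 | Lap times |
| qualifying | 2,497 | Qualifying results |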
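Migration steps
Step 1: Replace import paths
This is a purely mechanical replacement; a global search-and-replace in your editor is enough:
Before (PySpark):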
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
After (ZettaPark):
from clickzetta.zettapark import functions as F
from clickzetta.zettapark.window import Window
from clickzetta.zettapark.types import StructType, StructField, StringType, IntegerType
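Because only the package prefix changes, the search-and-replace can also be scripted. The following is a minimal sketch, assuming every PySpark import in the project goes through `pyspark.sql`; the helper name `rewrite_imports` is hypothetical and belongs to neither library.

```python
import re

# Match the `pyspark.sql` prefix only at the start of an import statement,
# so string literals or comments mentioning pyspark.sql are left alone.
_IMPORT_LINE = re.compile(r"^([ \t]*(?:from|import)[ \t]+)pyspark\.sql\b", re.MULTILINE)


def rewrite_imports(source: str) -> str:
    """Swap the pyspark.sql prefix for clickzetta.zettapark on import lines."""
    return _IMPORT_LINE.sub(r"\g<1>clickzetta.zettapark", source)


before = (
    "from pyspark.sql import functions as F\n"
    "from pyspark.sql.window import Window\n"
)
print(rewrite_imports(before))
```

Restricting the pattern to import lines keeps the rewrite conservative; anything else that mentions `pyspark` still needs a manual look.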
Before (PySpark):
spark.sql("MERGE INTO f1_processed.circuits tgt USING src ON ...")
spark.sql("CREATE TABLE f1_processed.circuits AS SELECT ...")
After (ZettaPark, `.collect()` is required):
session.sql("MERGE INTO f1_processed.circuits tgt USING src ON ...").collect()
session.sql("CREATE TABLE f1_processed.circuits AS SELECT ...").collect()
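Query statements (SELECT) do not strictly require it, but when you only want to trigger execution and do not need the result, adding `.collect()` is still a good habit: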
rows = session.sql("SELECT COUNT(*) FROM f1_processed.circuits").collect()
print(rows[0][0]) # 78
session.sql("""
MERGE INTO f1_processed.circuits tgt
USING _merge_src src
ON tgt.circuit_id = src.circuit_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""").collect()
`session.sql("CREATE TEMP VIEW v AS SELECT ...").collect()` fails with the error `CZLH-42000 Syntax error - missing KW_ENDPOINT at 'VIEW'`.
Root cause: ClickZetta Lakehouse SQL does not support the `CREATE TEMP VIEW` syntax.
Solution: use the DataFrame method `create_or_replace_temp_view()` instead, which behaves exactly like PySpark's `createOrReplaceTempView`:
Unsupported:
session.sql("CREATE TEMP VIEW race_result_updated AS SELECT ...").collect()
Correct:
df.create_or_replace_temp_view("race_result_updated")
session.sql("SELECT * FROM race_result_updated WHERE ...").collect()
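Before running a migrated project, it can help to locate every SQL string that still uses the unsupported statement. The sketch below is one possible check, not part of ZettaPark: `find_temp_view_sql` is a hypothetical helper that scans source text line by line, so a statement split across several lines would be missed.

```python
import re

# CREATE [OR REPLACE] [GLOBAL] TEMP|TEMPORARY VIEW, case-insensitive.
# The required whitespace after CREATE keeps create_or_replace_temp_view()
# calls from matching.
_TEMP_VIEW = re.compile(
    r"\bCREATE\s+(?:OR\s+REPLACE\s+)?(?:GLOBAL\s+)?TEMP(?:ORARY)?\s+VIEW\b",
    re.IGNORECASE,
)


def find_temp_view_sql(source: str) -> list:
    """Return 1-based line numbers that contain a CREATE TEMP VIEW statement."""
    return [
        lineno
        for lineno, line in enumerate(source.splitlines(), start=1)
        if _TEMP_VIEW.search(line)
    ]


sample = (
    'session.sql("CREATE TEMP VIEW v AS SELECT 1").collect()\n'
    'df.create_or_replace_temp_view("v")\n'
)
print(find_temp_view_sql(sample))
```

Each reported line is a candidate for the `create_or_replace_temp_view()` rewrite described above.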
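4. Column-name ambiguity in the MERGE INTO VALUES clause
Symptom: in `WHEN NOT MATCHED THEN INSERT ... VALUES (...)` of a MERGE INTO statement, bare column names inside VALUES are resolved as columns of the target table (`tgt`), so NULL values are inserted.
Root cause: in Lakehouse SQL, bare column names in the VALUES clause resolve to `tgt` rather than `src`.
Solution: every column name in VALUES must carry the `src.` prefix: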
session.sql("""
MERGE INTO f1_presentation.calculated_race_results tgt
USING race_result_updated src
ON (tgt.driver_id = src.driver_id AND tgt.race_id = src.race_id)
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT
(race_year, team_name, driver_id, driver_name, race_id, position, points, calculated_points, created_date)
VALUES
(src.race_year, src.team_name, src.driver_id, src.driver_name,
src.race_id, src.position, src.points, src.calculated_points, src.created_date)
""").collect()
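Writing the prefixed list by hand is easy to get wrong on wide tables, so the clause can be generated from the column names. A minimal sketch, where `build_insert_clause` is a hypothetical helper and the alias `src` matches the statement above:

```python
def build_insert_clause(columns, source_alias="src"):
    """Render the NOT MATCHED branch with every VALUES entry alias-qualified."""
    col_list = ", ".join(columns)
    # Qualify each value so it cannot be resolved against the target table.
    val_list = ", ".join(f"{source_alias}.{c}" for c in columns)
    return f"WHEN NOT MATCHED THEN INSERT ({col_list}) VALUES ({val_list})"


print(build_insert_clause(["race_id", "driver_id", "points"]))
# WHEN NOT MATCHED THEN INSERT (race_id, driver_id, points) VALUES (src.race_id, src.driver_id, src.points)
```

The returned string can be interpolated into the MERGE INTO statement in place of the hand-written INSERT branch.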
Incremental write wrapper: `merge_delta_data`
The original PySpark project implements incremental writes with `overwrite_partition()`. The ZettaPark counterpart is the `merge_delta_data()` wrapper, which provides upsert semantics on top of MERGE INTO:
def merge_delta_data(input_df, db_name, table_name, merge_condition, partition_column):
    session = input_df.session
    full_name = f"{db_name}.{table_name}"

    # Check whether the target table already exists
    table_exists = False
    try:
        session.sql(f"DESCRIBE TABLE {full_name}").collect()
        table_exists = True
    except Exception:
        pass  # Table does not exist; fall through to the create-table path

    if table_exists:
        # Later runs: upsert the incoming rows into the existing table
        input_df.create_or_replace_temp_view("_merge_src")
        session.sql(f"""
            MERGE INTO {full_name} tgt
            USING _merge_src src
            ON {merge_condition}
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
        """).collect()
    else:
        # First run: create the partitioned table and load the data
        _create_partitioned_table(session, full_name, input_df, partition_column)
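The benefit of this wrapper: the same code creates the table on the first run and performs incremental updates on later runs, so callers never need to check whether the table exists.
Creating the partitioned table requires hand-written DDL, because ZettaPark's `save_as_table` does not support the `partition_by` parameter: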
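The `merge_condition` argument is a plain SQL string that must use the `tgt` and `src` aliases from the MERGE statement. One way to build it from a list of key columns is sketched below; `build_merge_condition` is a hypothetical helper, not part of the original project.

```python
def build_merge_condition(key_columns, target_alias="tgt", source_alias="src"):
    """Join key columns into an ON condition using the MERGE aliases."""
    if not key_columns:
        raise ValueError("at least one key column is required")
    return " AND ".join(
        f"{target_alias}.{col} = {source_alias}.{col}" for col in key_columns
    )


condition = build_merge_condition(["driver_id", "race_id"])
print(condition)
# tgt.driver_id = src.driver_id AND tgt.race_id = src.race_id
```

The result can be passed as the fourth argument of `merge_delta_data()`.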
def _create_partitioned_table(session, full_name, input_df, partition_column):
    fields = input_df.schema.fields

    def sql_type(dt):
        # Map a DataFrame type object to its SQL type name; unknown types fall back to STRING
        name = type(dt).__name__
        mapping = {
            "LongType": "BIGINT", "IntegerType": "INT", "ShortType": "SMALLINT",
            "DoubleType": "DOUBLE", "FloatType": "FLOAT",
            "StringType": "STRING", "BooleanType": "BOOLEAN",
            "DateType": "DATE", "TimestampType": "TIMESTAMP",
        }
        if name == "DecimalType":
            return f"DECIMAL({dt.precision},{dt.scale})"
        return mapping.get(name, "STRING")

    # Regular columns: everything except the partition column
    col_defs = ", ".join(
        f"{f.name.strip('`')} {sql_type(f.datatype)}"
        for f in fields if f.name.strip('`') != partition_column
    )
    part_field = next(f for f in fields if f.name.strip('`') == partition_column)
    part_def = f"{part_field.name.strip('`')} {sql_type(part_field.datatype)}"
    session.sql(
        f"CREATE TABLE {full_name} ({col_defs}) PARTITIONED BY ({part_def})"
    ).collect()

    # Column names must be listed explicitly; SELECT * cannot be used.
    # Reason: create_or_replace_temp_view normalizes column names (race_id → raceid);
    # explicit column names are matched by position, which works around the problem.
    all_cols = ", ".join(f.name.strip('`') for f in fields)
    input_df.create_or_replace_temp_view("_insert_src")
    session.sql(
        f"INSERT INTO {full_name} ({all_cols}) SELECT {all_cols} FROM _insert_src"
    ).collect()
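To see what DDL the function would emit without connecting to a Lakehouse, the string-building part can be reproduced on plain data. The sketch below is an illustration only: `preview_ddl` is a hypothetical helper, it takes `(column_name, type_class_name)` pairs instead of real schema fields, it covers just a subset of the type mapping, and the `result_id` and `points` columns are assumed for the example.

```python
# Subset of the type mapping used above, keyed by type class name.
TYPE_MAP = {
    "LongType": "BIGINT",
    "IntegerType": "INT",
    "DoubleType": "DOUBLE",
    "StringType": "STRING",
}


def preview_ddl(full_name, columns, partition_column):
    """Build the CREATE TABLE statement from (name, type_class_name) pairs."""

    def render(name, type_name):
        # Unknown types fall back to STRING, as in the original mapping.
        return f"{name} {TYPE_MAP.get(type_name, 'STRING')}"

    regular = ", ".join(render(n, t) for n, t in columns if n != partition_column)
    # Raises StopIteration if the partition column is not in the schema.
    part = next(render(n, t) for n, t in columns if n == partition_column)
    return f"CREATE TABLE {full_name} ({regular}) PARTITIONED BY ({part})"


ddl = preview_ddl(
    "f1_processed.results",
    [("result_id", "LongType"), ("race_id", "IntegerType"), ("points", "DoubleType")],
    "race_id",
)
print(ddl)
# CREATE TABLE f1_processed.results (result_id BIGINT, points DOUBLE) PARTITIONED BY (race_id INT)
```

The partition column is removed from the regular column list and declared only inside `PARTITIONED BY`, which mirrors how `_create_partitioned_table` splits the schema.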
SEASONS = list(range(2018, 2024))
seen_drivers = {}
for season in SEASONS:
    url = f"https://api.jolpi.ca/ergast/f1/{season}/drivers.json?limit=100"
    data = fetch_json(url)
    for d in data["MRData"]["DriverTable"]["Drivers"]:
        ref = d["driverId"]
        if ref not in seen_drivers:
            seen_drivers[ref] = {
                "driverId": len(seen_drivers) + 1,
                "driverRef": ref,
                "forename": d["givenName"],
                "surname": d["familyName"],
                "nationality": d.get("nationality", ""),
                ...
            }