在 ClickZetta Lakehouse 上从零构建 Medallion 三层数仓
如果你在 Databricks 上做过 Medallion 架构,切换到 ClickZetta Lakehouse 的成本主要在环境配置,而不在建模逻辑和代码改写。Bronze 摄取、Silver 清洗、Gold 维度建模——这套思路和代码在 ZettaPark 上可以直接复用。
本文用一个真实迁移项目演示完整的三层建模过程:将基于 Apache Spark 技术栈(Databricks + PySpark)的 Medallion 架构示例迁移到 ClickZetta Lakehouse,经过 22 项自动化验证,20/22 通过(2 项警告来自源数据质量问题,非迁移引入)。
完整代码见 GitHub:spark2lakehouse-medallion
原始项目
spark2lakehouse-medallion 基于 DataWithBaraa/databricks_bootcamp_2026 改造,演示如何在 Databricks 上用 Medallion 架构整合 CRM 和 ERP 两套系统的数据,最终输出可供 BI 分析的维度模型。项目包含 6 张源表(CRM 3 张 + ERP 3 张),经过三层处理后输出 2 张维度表(
dim_customers
dim_customers
、
dim_products
dim_products
)和 1 张事实表(
fact_sales
fact_sales
)。
迁移后的代码在
03_lakehouse/
03_lakehouse/
目录,原始 Databricks Notebook 保留在
01_spark/
01_spark/
供对照。
结论先行
业务逻辑一行不需要改。 4 处改动全是机械替换,不涉及任何建模逻辑:导入路径、Session 创建、方法命名(
withColumn
withColumn
→
with_column
with_column
)、写入方式。Bronze/Silver/Gold 三层的处理逻辑、维度建模、Window 函数、多源 JOIN——全部直接复用。
| 改动项 | 工作量 | 说明 |
|---|
| 导入路径 | 极低 | pyspark.sql
pyspark.sql → clickzetta.zettapark
clickzetta.zettapark |
| Session 创建 | 极低 | spark
spark (全局注入)→ Session.builder.configs({}).create()
Session.builder.configs({}).create() |
| 方法命名 | 极低 | withColumn
withColumn → with_column
with_column ,withColumnRenamed
withColumnRenamed → with_column_renamed
with_column_renamed |
| 写入方式 | 极低 | df.write.mode("overwrite").saveAsTable(t)
df.write.mode("overwrite").saveAsTable(t) → df.write.save_as_table(t, mode="overwrite")
df.write.save_as_table(t, mode="overwrite") |
如果原始项目不依赖 Databricks 全局注入的
spark
spark
,兼容性接近 100%。
技术栈对比
| 原始项目 | 迁移后 |
|---|
| 计算引擎 | Apache Spark(Databricks) | ClickZetta Lakehouse |
| DataFrame API | PySpark (pyspark.sql
pyspark.sql ) | ZettaPark (clickzetta.zettapark
clickzetta.zettapark ) |
| 开发环境 | Databricks Notebook | Jupyter Notebook(本地) |
| 存储格式 | Delta Lake | Lakehouse 原生表 |
| 文件存储 | DBFS / ADLS | Volume(vol://schema.vol/...
vol://schema.vol/... ) |
| Session 管理 | spark
spark (全局注入) | Session.builder.configs({}).create()
Session.builder.configs({}).create() |
| 方法命名 | withColumn
withColumn / withColumnRenamed
withColumnRenamed | with_column
with_column / with_column_renamed
with_column_renamed |
| 写入方式 | df.write.mode("overwrite").saveAsTable(t)
df.write.mode("overwrite").saveAsTable(t) | df.write.save_as_table(t, mode="overwrite")
df.write.save_as_table(t, mode="overwrite") |
| 数据层隔离 | Delta 数据库 | 独立 Schema(bronze / silver / gold) |
变化的主要是运行环境——从 Databricks Notebook 换成本地 Jupyter,从 DBFS 换成 Volume。数据处理和建模的核心逻辑完全不变:清洗、去重、多源 JOIN、Window 函数、维度建模,这些在 ZettaPark 上写法与 PySpark 一致。业务逻辑(
F.when().otherwise()
F.when().otherwise()
、
Window.partition_by().order_by()
Window.partition_by().order_by()
、
df.join()
df.join()
、
df.filter()
df.filter()
)完全一致,一行不需要改。
你不需要重新设计数据架构,也不需要重写 Bronze/Silver/Gold 的处理逻辑。4 处改动全是机械替换(导入路径、Session 创建、方法命名、写入方式),不涉及任何业务逻辑。如果原始项目不依赖 Databricks 全局注入的
spark
spark
(即用本地 PySpark 运行),兼容性可以接近 100%。使用了 Python UDF 或
df.write.partitionBy()
df.write.partitionBy()
的项目,兼容性会相应降低。

架构概览
Medallion 架构将数据处理分为三层,每层有明确的职责边界:
数据源(CSV)
↓ session.read.csv("vol://...")
Bronze 层(schema: bronze)
原始数据,不做任何转换,只做格式统一
↓ with_column / filter / drop_duplicates
Silver 层(schema: silver)
清洗、去重、标准化,多源数据整合
↓ join + row_number().over(window)
Gold 层(schema: gold)
维度建模,dim/fact 表,直接面向分析
三层对应三个独立的 Schema,物理隔离,互不干扰。每层只读上一层的数据,不跨层访问。
Medallion 数据模型
Medallion 架构(也称多跳架构)是数据湖仓一体场景下的主流数据组织模式,由 Databricks 推广,现已成为行业标准。
传统数据仓库把原始数据和加工后的数据混在一起,出了问题很难追溯,也很难重跑。Medallion 架构的核心思想是分层隔离:原始数据永远保留在 Bronze 层,每一层只做一件事,出问题可以从上一层重跑,不影响其他层。
这个模式特别适合多源数据整合的场景——不同系统的数据质量参差不齐,Bronze 层先把数据"存下来",Silver 层再做清洗和标准化,Gold 层专注于面向业务的建模,职责清晰,互不干扰。
三层职责
| 层 | 别名 | 职责 | 数据状态 |
|---|
| Bronze | 原始层 | 原样摄取,不做任何转换 | 原始数据,可能含脏数据、重复、格式不一 |
| Silver | 清洗层 | 去重、清洗、标准化、多源整合 | 干净、可信、业务语义化 |
| Gold | 服务层 | 维度建模,面向分析和 BI | 聚合宽表、星型模型 |
三层物理隔离的好处:任何一层出问题,可以从上一层重跑,不影响其他层,也不需要重新下载原始数据。
星型模型
Gold 层通常采用星型模型(Star Schema):一张事实表居中,多张维度表围绕。
dim_customers
│
dim_products ── fact_sales
事实表(Fact Table)存储业务事件(订单、交易),每行是一个事件,包含度量值(金额、数量)和指向维度表的外键。维度表(Dimension Table)存储业务实体的属性(客户信息、产品信息),通过外键与事实表关联。
本项目的 Gold 层就是这个结构:
fact_sales
fact_sales
通过
customer_key
customer_key
和
product_key
product_key
关联
dim_customers
dim_customers
和
dim_products
dim_products
。
为什么需要代理键
维度表的主键有两种选择:业务主键(Natural Key,来自源系统,如
customer_id = "C001"
customer_id = "C001"
)和代理键(Surrogate Key,数仓内部生成的整数,如
customer_key = 1
customer_key = 1
)。
使用代理键的原因:
- 性能:整数 JOIN 比字符串快
- 稳定性:源系统 ID 可能随业务变化(合并、重编号),代理键不受影响
- 跨源整合:多个源系统的同一实体可以映射到同一个代理键
本项目用
F.row_number().over(Window.order_by("customer_id"))
F.row_number().over(Window.order_by("customer_id"))
生成代理键,简单有效。
环境初始化
在跑任何 notebook 之前,先建好 Schema 和 Volume,再把 CSV 文件上传上去。这一步只需要跑一次。
init_lakehouse.ipynb:
from clickzetta.zettapark.session import Session
import os
from dotenv import load_dotenv
load_dotenv()
session = Session.builder.configs({
"username": os.environ["CLICKZETTA_USERNAME"],
"password": os.environ["CLICKZETTA_PASSWORD"],
"service": os.environ["CLICKZETTA_SERVICE"],
"instance": os.environ["CLICKZETTA_INSTANCE"],
"workspace": os.environ["CLICKZETTA_WORKSPACE"],
"schema": os.environ["CLICKZETTA_SCHEMA"],
"vcluster": os.environ["CLICKZETTA_VCLUSTER"],
}).create()
建三层 Schema:
for schema in ["bronze", "silver", "gold"]:
session.sql(f"CREATE SCHEMA IF NOT EXISTS {schema}").collect()
建 Volume(存放原始 CSV):
vol_schema = os.environ["CLICKZETTA_SCHEMA"]
vol_name = os.environ.get("CLICKZETTA_VOLUME", "medallion_vol")
session.sql(f"CREATE VOLUME IF NOT EXISTS {vol_schema}.{vol_name}").collect()
上传本地 CSV 到 Volume:
import pathlib
datasets_dir = pathlib.Path("../datasets")
vol_base = f"vol://{vol_schema}.{vol_name}"
for csv_file in datasets_dir.rglob("*.csv"):
relative = csv_file.relative_to(datasets_dir)
dest = f"{vol_base}/{relative}"
session.file.put(str(csv_file), dest, auto_compress=False, overwrite=True)
print(f" {relative} → {dest}")
.env
.env
配置:
CLICKZETTA_USERNAME=your_username
CLICKZETTA_PASSWORD=your_password
CLICKZETTA_SERVICE=cn-shanghai-alicloud.api.clickzetta.com
CLICKZETTA_INSTANCE=your_instance_id
CLICKZETTA_WORKSPACE=your_workspace
CLICKZETTA_SCHEMA=public
CLICKZETTA_VCLUSTER=DEFAULT_AP
CLICKZETTA_VOLUME=medallion_vol
Bronze 层:原始数据摄取
Bronze 层的原则是不做任何业务转换,只把 CSV 读进来写成表。这样做的好处是:如果下游出了问题,可以从 Bronze 层重跑,不需要重新下载原始数据。
03_lakehouse/01_bronze/bronze.ipynb:
VOL_SCHEMA = os.environ.get("CLICKZETTA_SCHEMA", "public")
VOLUME = os.environ.get("CLICKZETTA_VOLUME", "medallion_vol")
INGESTION_CONFIG = [
{"path": f"vol://{VOL_SCHEMA}.{VOLUME}/source_crm/cust_info.csv", "table": "bronze.crm_cust_info"},
{"path": f"vol://{VOL_SCHEMA}.{VOLUME}/source_crm/prd_info.csv", "table": "bronze.crm_prd_info"},
{"path": f"vol://{VOL_SCHEMA}.{VOLUME}/source_crm/sales_details.csv", "table": "bronze.crm_sales_details"},
{"path": f"vol://{VOL_SCHEMA}.{VOLUME}/source_erp/CUST_AZ12.csv", "table": "bronze.erp_cust_az12"},
{"path": f"vol://{VOL_SCHEMA}.{VOLUME}/source_erp/LOC_A101.csv", "table": "bronze.erp_loc_a101"},
{"path": f"vol://{VOL_SCHEMA}.{VOLUME}/source_erp/PX_CAT_G1V2.csv", "table": "bronze.erp_px_cat_g1v2"},
]
for item in INGESTION_CONFIG:
print(f"Ingesting → {item['table']}")
df = session.read.option("header", "true").csv(item["path"])
df.write.save_as_table(item["table"], mode="overwrite")
print(" OK")
6 张表,6 行配置,统一处理。
mode="overwrite"
mode="overwrite"
保证幂等性——重跑不会产生重复数据。
Silver 层:清洗与标准化
Silver 层做三件事:去空值、标准化枚举值、重命名列。每个源表对应一个 notebook,逻辑独立,互不依赖。
客户信息清洗(crm_cust_info → silver.crm_customers)
03_lakehouse/02_silver/crm/silver_crm_cust_info.ipynb:
from clickzetta.zettapark.types import StringType
from clickzetta.zettapark import functions as F
df = session.table("bronze.crm_cust_info")
- 去掉所有字符串列的首尾空格:
for field in df.schema.fields:
if isinstance(field.datatype, StringType):
df = df.with_column(field.name, F.trim(F.col(field.name)))
- 标准化枚举值:
df = (
df
.with_column(
"cst_marital_status",
F.when(F.upper(F.col("cst_marital_status")) == "S", "Single")
.when(F.upper(F.col("cst_marital_status")) == "M", "Married")
.otherwise("n/a")
)
.with_column(
"cst_gndr",
F.when(F.upper(F.col("cst_gndr")) == "F", "Female")
.when(F.upper(F.col("cst_gndr")) == "M", "Male")
.otherwise("n/a")
)
)
- 过滤掉主键为空的行:
df = df.filter(F.col("cst_id").is_not_null())
- 重命名列(业务语义化):
RENAME_MAP = {
"cst_id": "customer_id",
"cst_key": "customer_number",
"cst_firstname": "first_name",
"cst_lastname": "last_name",
"cst_marital_status": "marital_status",
"cst_gndr": "gender",
"cst_create_date": "created_date",
}
for old, new in RENAME_MAP.items():
df = df.with_column_renamed(old, new)
df.write.save_as_table("silver.crm_customers", mode="overwrite")
注意
with_column_renamed
with_column_renamed
是 ZettaPark 的命名风格(snake_case),对应 PySpark 的
withColumnRenamed
withColumnRenamed
,行为完全一致。
去重:按优先级保留最新记录
当源数据存在重复 ID 时,不能简单
drop_duplicates
drop_duplicates
——需要按业务规则决定保留哪条。这里用
row_number()
row_number()
按创建时间降序排,只保留最新的一条:
03_lakehouse/02_silver/crm/silver_crm_prd_info.ipynb:
from clickzetta.zettapark.window import Window
按 prd_id 分组,取 prd_start_dt 最新的一条:
window = Window.partition_by("prd_id").order_by(F.col("prd_start_dt").desc())
df = (
df
.with_column("_row_num", F.row_number().over(window))
.filter(F.col("_row_num") == 1)
.select([c for c in df.columns]) # 去掉辅助列
)
这个模式在多源数据整合时很常见:同一个产品在 CRM 和 ERP 里都有记录,以某个系统为主,另一个作为补充。
Silver 层编排
6 个 notebook 按依赖顺序运行:
03_lakehouse/02_silver/silver_orchestration.ipynb:
import subprocess, sys
NOTEBOOKS = [
"crm/silver_crm_cust_info.ipynb",
"crm/silver_crm_prd_info.ipynb",
"crm/silver_crm_sales_details.ipynb",
"erp/silver_erp_cust_az12.ipynb",
"erp/silver_erp_loc_a101.ipynb",
"erp/silver_erp_px_cat_g1v2.ipynb",
]
for nb in NOTEBOOKS:
print(f"Running {nb}...")
result = subprocess.run(
["jupyter", "nbconvert", "--to", "notebook", "--execute", nb,
"--output", nb, "--ExecutePreprocessor.timeout=300"],
capture_output=True, text=True
)
if result.returncode != 0:
print(f"FAILED: {nb}")
print(result.stderr)
sys.exit(1)
print(f" OK")
Gold 层:维度建模
Gold 层的核心工作是多源整合和生成 surrogate key。
客户维度表(dim_customers)
CRM 的客户信息是主表,ERP 的客户信息和地区信息作为补充,用 LEFT JOIN 整合:
03_lakehouse/03_gold/gold_dim_customers.ipynb:
from clickzetta.zettapark.window import Window
ci = session.table("silver.crm_customers") # CRM 客户(主)
ca = session.table("silver.erp_customers") # ERP 客户(补充 gender、birthdate)
la = session.table("silver.erp_customer_location") # ERP 地区(补充 country)
joined = (
ci.join(ca, ci["customer_number"] == ca["customer_number"], "left")
.join(la, ci["customer_number"] == la["customer_number"], "left")
)
多表 JOIN 后必须显式指定每列的来源,否则同名列会产生歧义:
df = joined.select(
ci["customer_id"].alias("customer_id"),
ci["customer_number"].alias("customer_number"),
ci["first_name"].alias("first_name"),
ci["last_name"].alias("last_name"),
la["country"].alias("country"),
ci["marital_status"].alias("marital_status"),
F.when(ci["gender"] != "n/a", ci["gender"]) # gender:CRM为主,n/a 时用 ERP
.otherwise(F.coalesce(ca["gender"], F.lit("n/a"))).alias("gender"),
ca["birth_date"].alias("birthdate"),
ci["created_date"].alias("create_date"),
)
生成 surrogate key(全局唯一的整数主键):
w = Window.order_by(F.col("customer_id"))
df = df.with_column("customer_key", F.row_number().over(w))
调整列顺序,surrogate key 放第一列:
df = df.select(
"customer_key", "customer_id", "customer_number",
"first_name", "last_name", "country",
"marital_status", "gender", "birthdate", "create_date",
)
df.write.save_as_table("gold.dim_customers", mode="overwrite")
为什么需要 surrogate key?
customer_id
customer_id
是业务主键,来自源系统,可能在不同系统间不一致,也可能随业务变化。
customer_key
customer_key
是数仓内部的整数主键,稳定、唯一,事实表用它做外键关联,不依赖源系统的 ID 格式。
销售事实表(fact_sales)
事实表通过 surrogate key 关联维度表,不存储冗余的业务属性:
03_lakehouse/03_gold/gold_fact_sales.ipynb:
sd = session.table("silver.crm_sales")
dp = session.table("gold.dim_products")
dc = session.table("gold.dim_customers")
用 surrogate key 关联维度表:
joined = (
sd.join(dp, sd["sls_prd_key"] == dp["product_number"], "left")
.join(dc, sd["sls_cust_id"] == dc["customer_id"], "left")
)
df = joined.select(
sd["sls_ord_num"].alias("order_number"),
dp["product_key"].alias("product_key"), # surrogate key
dc["customer_key"].alias("customer_key"), # surrogate key
sd["sls_order_dt"].alias("order_date"),
sd["sls_ship_dt"].alias("ship_date"),
sd["sls_due_dt"].alias("due_date"),
sd["sls_sales"].alias("sales_amount"),
sd["sls_quantity"].alias("quantity"),
sd["sls_price"].alias("unit_price"),
)
df.write.save_as_table("gold.fact_sales", mode="overwrite")
数据质量验证
管道跑完之后,用
04_validate.ipynb
04_validate.ipynb
做 22 项自动化检查。验证逻辑直接用 ZettaPark 查询,不依赖外部框架:
04_validate.ipynb(节选):
def check(label, condition_sql, expect_zero=True):
"""执行一条 SQL,检查结果是否符合预期"""
result = session.sql(condition_sql).collect()[0][0]
passed = (result == 0) if expect_zero else (result > 0)
status = "✓" if passed else "✗ FAIL"
print(f" {status} {label}: {result}")
return passed
Bronze → Silver 行数一致性:
check(
"silver.crm_customers 不少于 bronze.crm_cust_info(过滤空值后)",
"""
SELECT ABS(
(SELECT COUNT(*) FROM silver.crm_customers) -
(SELECT COUNT(*) FROM bronze.crm_cust_info WHERE cst_id IS NOT NULL)
)
""",
expect_zero=True
)
关键列无空值:
check(
"dim_customers.customer_key 无空值",
"SELECT COUNT(*) FROM gold.dim_customers WHERE customer_key IS NULL",
expect_zero=True
)
surrogate key 唯一性:
check(
"dim_customers.customer_key 无重复",
"""
SELECT COUNT(*) FROM (
SELECT customer_key, COUNT(*) AS cnt
FROM gold.dim_customers
GROUP BY customer_key
HAVING cnt > 1
)
""",
expect_zero=True
)
外键完整性:
check(
"fact_sales 所有 customer_key 均能关联到 dim_customers",
"""
SELECT COUNT(*) FROM gold.fact_sales f
LEFT JOIN gold.dim_customers d ON f.customer_key = d.customer_key
WHERE d.customer_key IS NULL
""",
expect_zero=True
)
实际运行结果:20/22 通过。2 项警告:
bronze.crm_cust_info
bronze.crm_cust_info
存在 5 组重复 customer_id
customer_id
(源数据质量问题)
bronze.crm_sales_details
bronze.crm_sales_details
存在 3 条负销售额(退货/冲销单,业务正常现象)
这两项不是迁移引入的问题,在原始 Databricks 项目中同样存在。
关键设计决策
为什么用 mode="overwrite"
mode="overwrite"
而不是 MERGE INTO?
Medallion 项目的数据量小(几千行),全量覆盖写入比增量 MERGE 简单得多,也更容易保证幂等性。如果数据量大(百万行以上),或者需要保留历史版本,才值得引入 MERGE INTO 的复杂度。
对比:
| 场景 | 推荐写法 | 原因 |
|---|
| 小数据量,全量刷新 | save_as_table(mode="overwrite")
save_as_table(mode="overwrite") | 简单,幂等,无需维护 merge key |
| 大数据量,增量更新 | MERGE INTO + merge_delta_data()
merge_delta_data() | 避免全表扫描,保留历史 |
| 需要保留历史快照 | Time Travel + INSERT | Lakehouse 原生支持 |
为什么 Gold 层用 surrogate key?
事实表的外键如果直接用业务主键(如
customer_id
customer_id
),有两个问题:
- 业务主键可能是字符串,JOIN 性能不如整数
- 源系统的 ID 可能变化(合并、重编号),导致历史数据关联断裂
row_number().over(Window.order_by("customer_id"))
row_number().over(Window.order_by("customer_id"))
生成的整数 key 稳定、唯一,是维度建模的标准做法。
多表 JOIN 后为什么必须显式指定列来源?
这样写会有问题:joined.select("customer_id", "country", ...):
因为 ci、ca、la 三张表都有 customer_number 列,ZettaPark 不知道取哪个:
正确写法:每列都指定来源 DataFrame:
df = joined.select(
ci["customer_id"].alias("customer_id"),
la["country"].alias("country"),
其余字段同上省略
)
这不是 ZettaPark 的限制,PySpark 多表 JOIN 后同样需要这样处理。显式指定来源是好习惯,可读性更好,也避免了列名歧义导致的运行时错误。
完整运行顺序
- 安装依赖:
pip install clickzetta_zettapark_python python-dotenv jupyter
- 配置连接:
cp .env.sample .env
编辑 .env,填写 ClickZetta 连接信息:
- 按顺序运行 notebooks:
jupyter nbconvert --to notebook --execute 03_lakehouse/init_lakehouse.ipynb --output 03_lakehouse/init_lakehouse.ipynb
jupyter nbconvert --to notebook --execute 03_lakehouse/01_bronze/bronze.ipynb --output 03_lakehouse/01_bronze/bronze.ipynb
jupyter nbconvert --to notebook --execute 03_lakehouse/02_silver/silver_orchestration.ipynb --output 03_lakehouse/02_silver/silver_orchestration.ipynb
jupyter nbconvert --to notebook --execute 03_lakehouse/03_gold/gold_orchestration.ipynb --output 03_lakehouse/03_gold/gold_orchestration.ipynb
- 验证结果:
jupyter nbconvert --to notebook --execute 04_validate.ipynb --output 04_validate.ipynb
与 PySpark 版本的对照
本项目保留了原始 Databricks Notebooks(
01_spark/
01_spark/
目录),可以逐文件对照。核心差异只有 4 处:
| 差异点 | PySpark | ZettaPark |
|---|
| 方法命名 | withColumn
withColumn / withColumnRenamed
withColumnRenamed | with_column
with_column / with_column_renamed
with_column_renamed |
| Session 创建 | spark
spark (全局注入) | Session.builder.configs({...}).create()
Session.builder.configs({...}).create() |
| 文件路径 | 本地路径 / DBFS | vol://schema.vol/path
vol://schema.vol/path |
| 写入方式 | df.write.mode("overwrite").saveAsTable(t)
df.write.mode("overwrite").saveAsTable(t) | df.write.save_as_table(t, mode="overwrite")
df.write.save_as_table(t, mode="overwrite") |
业务逻辑(
F.when().otherwise()
F.when().otherwise()
、
Window.partition_by().order_by()
Window.partition_by().order_by()
、
df.join()
df.join()
、
df.filter()
df.filter()
)
完全一致,一行不需要改。
迁移结论
ZettaPark 与 PySpark 的 DataFrame API 高度兼容,本项目验证了以下结论:
完全兼容(无需修改):
需要修改的 4 处(与 F1 项目一致):
| 差异点 | PySpark | ZettaPark |
|---|
| 方法命名 | withColumn
withColumn / withColumnRenamed
withColumnRenamed | with_column
with_column / with_column_renamed
with_column_renamed |
| Session 创建 | spark
spark (全局注入) | Session.builder.configs({...}).create()
Session.builder.configs({...}).create() |
| 文件路径 | 本地路径 / DBFS | vol://schema.vol/path
vol://schema.vol/path |
| 写入方式 | df.write.mode("overwrite").saveAsTable(t)
df.write.mode("overwrite").saveAsTable(t) | df.write.save_as_table(t, mode="overwrite")
df.write.save_as_table(t, mode="overwrite") |
本项目 20/22 验证通过(2 项警告来自源数据质量,非迁移引入),证明 ZettaPark 可以完整承接 PySpark 的 Medallion 数据工程工作负载。
参考