在 ClickZetta Lakehouse 上从零构建 Medallion 三层数仓

如果你在 Databricks 上做过 Medallion 架构,切换到 ClickZetta Lakehouse 的成本主要在环境配置,而不在建模逻辑和代码改写。Bronze 摄取、Silver 清洗、Gold 维度建模——这套思路和代码在 ZettaPark 上可以直接复用。

本文用一个真实迁移项目演示完整的三层建模过程:将基于 Apache Spark 技术栈(Databricks + PySpark)的 Medallion 架构示例迁移到 ClickZetta Lakehouse,经过 22 项自动化验证,20/22 通过(2 项警告来自源数据质量问题,非迁移引入)。

完整代码见 GitHub:spark2lakehouse-medallion


原始项目

spark2lakehouse-medallion 基于 DataWithBaraa/databricks_bootcamp_2026 改造,演示如何在 Databricks 上用 Medallion 架构整合 CRM 和 ERP 两套系统的数据,最终输出可供 BI 分析的维度模型。项目包含 6 张源表(CRM 3 张 + ERP 3 张),经过三层处理后输出 2 张维度表(

dim_customers
dim_customers
dim_products
dim_products
)和 1 张事实表(
fact_sales
fact_sales
)。

迁移后的代码在

03_lakehouse/
03_lakehouse/
目录,原始 Databricks Notebook 保留在
01_spark/
01_spark/
供对照。

结论先行

业务逻辑一行不需要改。 4 处改动全是机械替换,不涉及任何建模逻辑:导入路径、Session 创建、方法命名(

withColumn
withColumn
with_column
with_column
)、写入方式。Bronze/Silver/Gold 三层的处理逻辑、维度建模、Window 函数、多源 JOIN——全部直接复用。

改动项工作量说明
导入路径极低
pyspark.sql
pyspark.sql
clickzetta.zettapark
clickzetta.zettapark
Session 创建极低
spark
spark
(全局注入)→
Session.builder.configs({}).create()
Session.builder.configs({}).create()
方法命名极低
withColumn
withColumn
with_column
with_column
withColumnRenamed
withColumnRenamed
with_column_renamed
with_column_renamed
写入方式极低
df.write.mode("overwrite").saveAsTable(t)
df.write.mode("overwrite").saveAsTable(t)
df.write.save_as_table(t, mode="overwrite")
df.write.save_as_table(t, mode="overwrite")

如果原始项目不依赖 Databricks 全局注入的

spark
spark
,兼容性接近 100%。


技术栈对比

原始项目迁移后
计算引擎Apache Spark(Databricks)ClickZetta Lakehouse
DataFrame APIPySpark (
pyspark.sql
pyspark.sql
)
ZettaPark (
clickzetta.zettapark
clickzetta.zettapark
)
开发环境Databricks NotebookJupyter Notebook(本地)
存储格式Delta LakeLakehouse 原生表
文件存储DBFS / ADLSVolume(
vol://schema.vol/...
vol://schema.vol/...
Session 管理
spark
spark
(全局注入)
Session.builder.configs({}).create()
Session.builder.configs({}).create()
方法命名
withColumn
withColumn
/
withColumnRenamed
withColumnRenamed
with_column
with_column
/
with_column_renamed
with_column_renamed
写入方式
df.write.mode("overwrite").saveAsTable(t)
df.write.mode("overwrite").saveAsTable(t)
df.write.save_as_table(t, mode="overwrite")
df.write.save_as_table(t, mode="overwrite")
数据层隔离Delta 数据库独立 Schema(bronze / silver / gold)

变化的主要是运行环境——从 Databricks Notebook 换成本地 Jupyter,从 DBFS 换成 Volume。数据处理和建模的核心逻辑完全不变:清洗、去重、多源 JOIN、Window 函数、维度建模,这些在 ZettaPark 上写法与 PySpark 一致。业务逻辑(

F.when().otherwise()
F.when().otherwise()
Window.partition_by().order_by()
Window.partition_by().order_by()
df.join()
df.join()
df.filter()
df.filter()
)完全一致,一行不需要改。

你不需要重新设计数据架构,也不需要重写 Bronze/Silver/Gold 的处理逻辑。4 处改动全是机械替换(导入路径、Session 创建、方法命名、写入方式),不涉及任何业务逻辑。如果原始项目不依赖 Databricks 全局注入的

spark
spark
(即用本地 PySpark 运行),兼容性可以接近 100%。使用了 Python UDF 或
df.write.partitionBy()
df.write.partitionBy()
的项目,兼容性会相应降低。



架构概览

Medallion 架构将数据处理分为三层,每层有明确的职责边界:

数据源(CSV) ↓ session.read.csv("vol://...") Bronze 层(schema: bronze) 原始数据,不做任何转换,只做格式统一 ↓ with_column / filter / drop_duplicates Silver 层(schema: silver) 清洗、去重、标准化,多源数据整合 ↓ join + row_number().over(window) Gold 层(schema: gold) 维度建模,dim/fact 表,直接面向分析

三层对应三个独立的 Schema,物理隔离,互不干扰。每层只读上一层的数据,不跨层访问。


Medallion 数据模型

Medallion 架构(也称多跳架构)是数据湖仓一体场景下的主流数据组织模式,由 Databricks 推广,现已成为行业标准。

传统数据仓库把原始数据和加工后的数据混在一起,出了问题很难追溯,也很难重跑。Medallion 架构的核心思想是分层隔离:原始数据永远保留在 Bronze 层,每一层只做一件事,出问题可以从上一层重跑,不影响其他层。

这个模式特别适合多源数据整合的场景——不同系统的数据质量参差不齐,Bronze 层先把数据"存下来",Silver 层再做清洗和标准化,Gold 层专注于面向业务的建模,职责清晰,互不干扰。

三层职责

别名职责数据状态
Bronze原始层原样摄取,不做任何转换原始数据,可能含脏数据、重复、格式不一
Silver清洗层去重、清洗、标准化、多源整合干净、可信、业务语义化
Gold服务层维度建模,面向分析和 BI聚合宽表、星型模型

三层物理隔离的好处:任何一层出问题,可以从上一层重跑,不影响其他层,也不需要重新下载原始数据。

星型模型

Gold 层通常采用星型模型(Star Schema):一张事实表居中,多张维度表围绕。

dim_customers │ dim_products ── fact_sales

事实表(Fact Table)存储业务事件(订单、交易),每行是一个事件,包含度量值(金额、数量)和指向维度表的外键。维度表(Dimension Table)存储业务实体的属性(客户信息、产品信息),通过外键与事实表关联。

本项目的 Gold 层就是这个结构:

fact_sales
fact_sales
通过
customer_key
customer_key
product_key
product_key
关联
dim_customers
dim_customers
dim_products
dim_products

为什么需要代理键

维度表的主键有两种选择:业务主键(Natural Key,来自源系统,如

customer_id = "C001"
customer_id = "C001"
)和代理键(Surrogate Key,数仓内部生成的整数,如
customer_key = 1
customer_key = 1
)。

使用代理键的原因:

  • 性能:整数 JOIN 比字符串快
  • 稳定性:源系统 ID 可能随业务变化(合并、重编号),代理键不受影响
  • 跨源整合:多个源系统的同一实体可以映射到同一个代理键

本项目用

F.row_number().over(Window.order_by("customer_id"))
F.row_number().over(Window.order_by("customer_id"))
生成代理键,简单有效。


环境初始化

在跑任何 notebook 之前,先建好 Schema 和 Volume,再把 CSV 文件上传上去。这一步只需要跑一次。

init_lakehouse.ipynb:

from clickzetta.zettapark.session import Session import os from dotenv import load_dotenv load_dotenv() session = Session.builder.configs({ "username": os.environ["CLICKZETTA_USERNAME"], "password": os.environ["CLICKZETTA_PASSWORD"], "service": os.environ["CLICKZETTA_SERVICE"], "instance": os.environ["CLICKZETTA_INSTANCE"], "workspace": os.environ["CLICKZETTA_WORKSPACE"], "schema": os.environ["CLICKZETTA_SCHEMA"], "vcluster": os.environ["CLICKZETTA_VCLUSTER"], }).create()

建三层 Schema:

for schema in ["bronze", "silver", "gold"]: session.sql(f"CREATE SCHEMA IF NOT EXISTS {schema}").collect()

建 Volume(存放原始 CSV):

vol_schema = os.environ["CLICKZETTA_SCHEMA"] vol_name = os.environ.get("CLICKZETTA_VOLUME", "medallion_vol") session.sql(f"CREATE VOLUME IF NOT EXISTS {vol_schema}.{vol_name}").collect()

上传本地 CSV 到 Volume:

import pathlib datasets_dir = pathlib.Path("../datasets") vol_base = f"vol://{vol_schema}.{vol_name}" for csv_file in datasets_dir.rglob("*.csv"): relative = csv_file.relative_to(datasets_dir) dest = f"{vol_base}/{relative}" session.file.put(str(csv_file), dest, auto_compress=False, overwrite=True) print(f" {relative} → {dest}")

.env
.env
配置:

CLICKZETTA_USERNAME=your_username CLICKZETTA_PASSWORD=your_password CLICKZETTA_SERVICE=cn-shanghai-alicloud.api.clickzetta.com CLICKZETTA_INSTANCE=your_instance_id CLICKZETTA_WORKSPACE=your_workspace CLICKZETTA_SCHEMA=public CLICKZETTA_VCLUSTER=DEFAULT_AP CLICKZETTA_VOLUME=medallion_vol


Bronze 层:原始数据摄取

Bronze 层的原则是不做任何业务转换,只把 CSV 读进来写成表。这样做的好处是:如果下游出了问题,可以从 Bronze 层重跑,不需要重新下载原始数据。

03_lakehouse/01_bronze/bronze.ipynb:

VOL_SCHEMA = os.environ.get("CLICKZETTA_SCHEMA", "public") VOLUME = os.environ.get("CLICKZETTA_VOLUME", "medallion_vol") INGESTION_CONFIG = [ {"path": f"vol://{VOL_SCHEMA}.{VOLUME}/source_crm/cust_info.csv", "table": "bronze.crm_cust_info"}, {"path": f"vol://{VOL_SCHEMA}.{VOLUME}/source_crm/prd_info.csv", "table": "bronze.crm_prd_info"}, {"path": f"vol://{VOL_SCHEMA}.{VOLUME}/source_crm/sales_details.csv", "table": "bronze.crm_sales_details"}, {"path": f"vol://{VOL_SCHEMA}.{VOLUME}/source_erp/CUST_AZ12.csv", "table": "bronze.erp_cust_az12"}, {"path": f"vol://{VOL_SCHEMA}.{VOLUME}/source_erp/LOC_A101.csv", "table": "bronze.erp_loc_a101"}, {"path": f"vol://{VOL_SCHEMA}.{VOLUME}/source_erp/PX_CAT_G1V2.csv", "table": "bronze.erp_px_cat_g1v2"}, ] for item in INGESTION_CONFIG: print(f"Ingesting → {item['table']}") df = session.read.option("header", "true").csv(item["path"]) df.write.save_as_table(item["table"], mode="overwrite") print(" OK")

6 张表,6 行配置,统一处理。

mode="overwrite"
mode="overwrite"
保证幂等性——重跑不会产生重复数据。


Silver 层:清洗与标准化

Silver 层做三件事:去空值、标准化枚举值、重命名列。每个源表对应一个 notebook,逻辑独立,互不依赖。

客户信息清洗(crm_cust_info → silver.crm_customers)

03_lakehouse/02_silver/crm/silver_crm_cust_info.ipynb:

from clickzetta.zettapark.types import StringType from clickzetta.zettapark import functions as F df = session.table("bronze.crm_cust_info")

  1. 去掉所有字符串列的首尾空格:

for field in df.schema.fields: if isinstance(field.datatype, StringType): df = df.with_column(field.name, F.trim(F.col(field.name)))

  1. 标准化枚举值:

df = ( df .with_column( "cst_marital_status", F.when(F.upper(F.col("cst_marital_status")) == "S", "Single") .when(F.upper(F.col("cst_marital_status")) == "M", "Married") .otherwise("n/a") ) .with_column( "cst_gndr", F.when(F.upper(F.col("cst_gndr")) == "F", "Female") .when(F.upper(F.col("cst_gndr")) == "M", "Male") .otherwise("n/a") ) )

  1. 过滤掉主键为空的行:

df = df.filter(F.col("cst_id").is_not_null())

  1. 重命名列(业务语义化):

RENAME_MAP = { "cst_id": "customer_id", "cst_key": "customer_number", "cst_firstname": "first_name", "cst_lastname": "last_name", "cst_marital_status": "marital_status", "cst_gndr": "gender", "cst_create_date": "created_date", } for old, new in RENAME_MAP.items(): df = df.with_column_renamed(old, new) df.write.save_as_table("silver.crm_customers", mode="overwrite")

注意

with_column_renamed
with_column_renamed
是 ZettaPark 的命名风格(snake_case),对应 PySpark 的
withColumnRenamed
withColumnRenamed
,行为完全一致。

去重:按优先级保留最新记录

当源数据存在重复 ID 时,不能简单

drop_duplicates
drop_duplicates
——需要按业务规则决定保留哪条。这里用
row_number()
row_number()
按创建时间降序排,只保留最新的一条:

03_lakehouse/02_silver/crm/silver_crm_prd_info.ipynb:

from clickzetta.zettapark.window import Window

按 prd_id 分组,取 prd_start_dt 最新的一条:

window = Window.partition_by("prd_id").order_by(F.col("prd_start_dt").desc()) df = ( df .with_column("_row_num", F.row_number().over(window)) .filter(F.col("_row_num") == 1) .select([c for c in df.columns]) # 去掉辅助列 )

这个模式在多源数据整合时很常见:同一个产品在 CRM 和 ERP 里都有记录,以某个系统为主,另一个作为补充。

Silver 层编排

6 个 notebook 按依赖顺序运行:

03_lakehouse/02_silver/silver_orchestration.ipynb:

import subprocess, sys NOTEBOOKS = [ "crm/silver_crm_cust_info.ipynb", "crm/silver_crm_prd_info.ipynb", "crm/silver_crm_sales_details.ipynb", "erp/silver_erp_cust_az12.ipynb", "erp/silver_erp_loc_a101.ipynb", "erp/silver_erp_px_cat_g1v2.ipynb", ] for nb in NOTEBOOKS: print(f"Running {nb}...") result = subprocess.run( ["jupyter", "nbconvert", "--to", "notebook", "--execute", nb, "--output", nb, "--ExecutePreprocessor.timeout=300"], capture_output=True, text=True ) if result.returncode != 0: print(f"FAILED: {nb}") print(result.stderr) sys.exit(1) print(f" OK")


Gold 层:维度建模

Gold 层的核心工作是多源整合生成 surrogate key

客户维度表(dim_customers)

CRM 的客户信息是主表,ERP 的客户信息和地区信息作为补充,用 LEFT JOIN 整合:

03_lakehouse/03_gold/gold_dim_customers.ipynb:

from clickzetta.zettapark.window import Window ci = session.table("silver.crm_customers") # CRM 客户(主) ca = session.table("silver.erp_customers") # ERP 客户(补充 gender、birthdate) la = session.table("silver.erp_customer_location") # ERP 地区(补充 country) joined = ( ci.join(ca, ci["customer_number"] == ca["customer_number"], "left") .join(la, ci["customer_number"] == la["customer_number"], "left") )

多表 JOIN 后必须显式指定每列的来源,否则同名列会产生歧义:

df = joined.select( ci["customer_id"].alias("customer_id"), ci["customer_number"].alias("customer_number"), ci["first_name"].alias("first_name"), ci["last_name"].alias("last_name"), la["country"].alias("country"), ci["marital_status"].alias("marital_status"), F.when(ci["gender"] != "n/a", ci["gender"]) # gender:CRM为主,n/a 时用 ERP .otherwise(F.coalesce(ca["gender"], F.lit("n/a"))).alias("gender"), ca["birth_date"].alias("birthdate"), ci["created_date"].alias("create_date"), )

生成 surrogate key(全局唯一的整数主键):

w = Window.order_by(F.col("customer_id")) df = df.with_column("customer_key", F.row_number().over(w))

调整列顺序,surrogate key 放第一列:

df = df.select( "customer_key", "customer_id", "customer_number", "first_name", "last_name", "country", "marital_status", "gender", "birthdate", "create_date", ) df.write.save_as_table("gold.dim_customers", mode="overwrite")

为什么需要 surrogate key?

customer_id
customer_id
是业务主键,来自源系统,可能在不同系统间不一致,也可能随业务变化。
customer_key
customer_key
是数仓内部的整数主键,稳定、唯一,事实表用它做外键关联,不依赖源系统的 ID 格式。

销售事实表(fact_sales)

事实表通过 surrogate key 关联维度表,不存储冗余的业务属性:

03_lakehouse/03_gold/gold_fact_sales.ipynb:

sd = session.table("silver.crm_sales") dp = session.table("gold.dim_products") dc = session.table("gold.dim_customers")

用 surrogate key 关联维度表:

joined = ( sd.join(dp, sd["sls_prd_key"] == dp["product_number"], "left") .join(dc, sd["sls_cust_id"] == dc["customer_id"], "left") ) df = joined.select( sd["sls_ord_num"].alias("order_number"), dp["product_key"].alias("product_key"), # surrogate key dc["customer_key"].alias("customer_key"), # surrogate key sd["sls_order_dt"].alias("order_date"), sd["sls_ship_dt"].alias("ship_date"), sd["sls_due_dt"].alias("due_date"), sd["sls_sales"].alias("sales_amount"), sd["sls_quantity"].alias("quantity"), sd["sls_price"].alias("unit_price"), ) df.write.save_as_table("gold.fact_sales", mode="overwrite")


数据质量验证

管道跑完之后,用

04_validate.ipynb
04_validate.ipynb
做 22 项自动化检查。验证逻辑直接用 ZettaPark 查询,不依赖外部框架:

04_validate.ipynb(节选):

def check(label, condition_sql, expect_zero=True): """执行一条 SQL,检查结果是否符合预期""" result = session.sql(condition_sql).collect()[0][0] passed = (result == 0) if expect_zero else (result > 0) status = "✓" if passed else "✗ FAIL" print(f" {status} {label}: {result}") return passed

Bronze → Silver 行数一致性:

check( "silver.crm_customers 不少于 bronze.crm_cust_info(过滤空值后)", """ SELECT ABS( (SELECT COUNT(*) FROM silver.crm_customers) - (SELECT COUNT(*) FROM bronze.crm_cust_info WHERE cst_id IS NOT NULL) ) """, expect_zero=True )

关键列无空值:

check( "dim_customers.customer_key 无空值", "SELECT COUNT(*) FROM gold.dim_customers WHERE customer_key IS NULL", expect_zero=True )

surrogate key 唯一性:

check( "dim_customers.customer_key 无重复", """ SELECT COUNT(*) FROM ( SELECT customer_key, COUNT(*) AS cnt FROM gold.dim_customers GROUP BY customer_key HAVING cnt > 1 ) """, expect_zero=True )

外键完整性:

check( "fact_sales 所有 customer_key 均能关联到 dim_customers", """ SELECT COUNT(*) FROM gold.fact_sales f LEFT JOIN gold.dim_customers d ON f.customer_key = d.customer_key WHERE d.customer_key IS NULL """, expect_zero=True )

实际运行结果:20/22 通过。2 项警告:

  • bronze.crm_cust_info
    bronze.crm_cust_info
    存在 5 组重复
    customer_id
    customer_id
    (源数据质量问题)
  • bronze.crm_sales_details
    bronze.crm_sales_details
    存在 3 条负销售额(退货/冲销单,业务正常现象)

这两项不是迁移引入的问题,在原始 Databricks 项目中同样存在。


关键设计决策

为什么用
mode="overwrite"
mode="overwrite"
而不是 MERGE INTO?

Medallion 项目的数据量小(几千行),全量覆盖写入比增量 MERGE 简单得多,也更容易保证幂等性。如果数据量大(百万行以上),或者需要保留历史版本,才值得引入 MERGE INTO 的复杂度。

对比:

场景推荐写法原因
小数据量,全量刷新
save_as_table(mode="overwrite")
save_as_table(mode="overwrite")
简单,幂等,无需维护 merge key
大数据量,增量更新MERGE INTO +
merge_delta_data()
merge_delta_data()
避免全表扫描,保留历史
需要保留历史快照Time Travel + INSERTLakehouse 原生支持

为什么 Gold 层用 surrogate key?

事实表的外键如果直接用业务主键(如

customer_id
customer_id
),有两个问题:

  1. 业务主键可能是字符串,JOIN 性能不如整数
  2. 源系统的 ID 可能变化(合并、重编号),导致历史数据关联断裂

row_number().over(Window.order_by("customer_id"))
row_number().over(Window.order_by("customer_id"))
生成的整数 key 稳定、唯一,是维度建模的标准做法。

多表 JOIN 后为什么必须显式指定列来源?

这样写会有问题:joined.select("customer_id", "country", ...):

因为 ci、ca、la 三张表都有 customer_number 列,ZettaPark 不知道取哪个:

正确写法:每列都指定来源 DataFrame:

df = joined.select( ci["customer_id"].alias("customer_id"), la["country"].alias("country"), 其余字段同上省略 )

这不是 ZettaPark 的限制,PySpark 多表 JOIN 后同样需要这样处理。显式指定来源是好习惯,可读性更好,也避免了列名歧义导致的运行时错误。


完整运行顺序

  1. 安装依赖:

pip install clickzetta_zettapark_python python-dotenv jupyter

  1. 配置连接:

cp .env.sample .env

编辑 .env,填写 ClickZetta 连接信息:

  1. 按顺序运行 notebooks:

jupyter nbconvert --to notebook --execute 03_lakehouse/init_lakehouse.ipynb --output 03_lakehouse/init_lakehouse.ipynb jupyter nbconvert --to notebook --execute 03_lakehouse/01_bronze/bronze.ipynb --output 03_lakehouse/01_bronze/bronze.ipynb jupyter nbconvert --to notebook --execute 03_lakehouse/02_silver/silver_orchestration.ipynb --output 03_lakehouse/02_silver/silver_orchestration.ipynb jupyter nbconvert --to notebook --execute 03_lakehouse/03_gold/gold_orchestration.ipynb --output 03_lakehouse/03_gold/gold_orchestration.ipynb

  1. 验证结果:

jupyter nbconvert --to notebook --execute 04_validate.ipynb --output 04_validate.ipynb


与 PySpark 版本的对照

本项目保留了原始 Databricks Notebooks(

01_spark/
01_spark/
目录),可以逐文件对照。核心差异只有 4 处:

差异点PySparkZettaPark
方法命名
withColumn
withColumn
/
withColumnRenamed
withColumnRenamed
with_column
with_column
/
with_column_renamed
with_column_renamed
Session 创建
spark
spark
(全局注入)
Session.builder.configs({...}).create()
Session.builder.configs({...}).create()
文件路径本地路径 / DBFS
vol://schema.vol/path
vol://schema.vol/path
写入方式
df.write.mode("overwrite").saveAsTable(t)
df.write.mode("overwrite").saveAsTable(t)
df.write.save_as_table(t, mode="overwrite")
df.write.save_as_table(t, mode="overwrite")

业务逻辑(

F.when().otherwise()
F.when().otherwise()
Window.partition_by().order_by()
Window.partition_by().order_by()
df.join()
df.join()
df.filter()
df.filter()
完全一致,一行不需要改


迁移结论

ZettaPark 与 PySpark 的 DataFrame API 高度兼容,本项目验证了以下结论:

完全兼容(无需修改):

  • DataFrame 链式操作:
    filter
    filter
    select
    select
    join
    join
    group_by
    group_by
    agg
    agg
    drop_duplicates
    drop_duplicates
  • 函数库:
    F.when().otherwise()
    F.when().otherwise()
    F.trim()
    F.trim()
    F.upper()
    F.upper()
    F.coalesce()
    F.coalesce()
    F.row_number()
    F.row_number()
    F.isnotnull()
    F.isnotnull()
  • Window 函数:
    Window.partition_by().order_by()
    Window.partition_by().order_by()
    ,窗口聚合行为完全一致
  • SQL DML:MERGE INTO、INSERT、UPDATE、DELETE

需要修改的 4 处(与 F1 项目一致):

差异点PySparkZettaPark
方法命名
withColumn
withColumn
/
withColumnRenamed
withColumnRenamed
with_column
with_column
/
with_column_renamed
with_column_renamed
Session 创建
spark
spark
(全局注入)
Session.builder.configs({...}).create()
Session.builder.configs({...}).create()
文件路径本地路径 / DBFS
vol://schema.vol/path
vol://schema.vol/path
写入方式
df.write.mode("overwrite").saveAsTable(t)
df.write.mode("overwrite").saveAsTable(t)
df.write.save_as_table(t, mode="overwrite")
df.write.save_as_table(t, mode="overwrite")

本项目 20/22 验证通过(2 项警告来自源数据质量,非迁移引入),证明 ZettaPark 可以完整承接 PySpark 的 Medallion 数据工程工作负载。


参考

联系我们
预约咨询
微信咨询
电话咨询
邮件咨询