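Zettapark DataFrame API Guide
Zettapark is the Python DataFrame API for 云器 Lakehouse, with an interface similar to pandas and PySpark. The Python code you write is translated into SQL and executed in a distributed fashion inside the Lakehouse, so you do not have to write SQL by hand.
This guide covers: creating DataFrames → basic transformations → aggregation → joins → set operations → handling missing values → window functions → reading and writing tables → views and dynamic tables.
Installation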
pip install clickzetta_zettapark_python
Create a session
from clickzetta.zettapark.session import Session
session = Session.builder.configs({
    "username": "your_username",
    "password": "your_password",
    "service": "cn-shanghai-alicloud.api.clickzetta.com",
    "instance": "your_instance",
    "workspace": "your_workspace",
    "schema": "public",
    "vcluster": "default"
}).create()
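Hard-coded credentials are easy to leak through version control. As a minimal sketch, the same configuration dictionary can be assembled from environment variables. The `CZ_*` variable names and the `build_config` and `open_session` helpers are hypothetical and not part of Zettapark; only `Session.builder.configs(...).create()` is taken from the example above.

```python
import os

# Keys passed to Session.builder.configs() in the example above.
REQUIRED_KEYS = ("username", "password", "service", "instance", "workspace")
DEFAULTS = {"schema": "public", "vcluster": "default"}


def build_config(env=None, prefix="CZ_"):
    """Build a connection config dict from environment variables.

    The CZ_* variable names are an assumption made for this sketch.
    """
    env = os.environ if env is None else env
    config = dict(DEFAULTS)
    for key in REQUIRED_KEYS + tuple(DEFAULTS):
        value = env.get(prefix + key.upper())
        if value:
            config[key] = value
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise KeyError("missing connection settings: " + ", ".join(missing))
    return config


def open_session(env=None):
    """Create a session from the environment (requires the package installed)."""
    from clickzetta.zettapark.session import Session  # imported lazily
    return Session.builder.configs(build_config(env)).create()
```

With `CZ_USERNAME`, `CZ_PASSWORD`, `CZ_SERVICE`, `CZ_INSTANCE`, and `CZ_WORKSPACE` exported in the shell, `session = open_session()` would then replace the literal dictionary.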
Close the session when you are done:
session.close()
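To make sure `close()` still runs when a query raises, the session can be wrapped with `contextlib.closing` from the standard library. This is a sketch only: `run_with_session` is a hypothetical helper, and it assumes nothing about the session beyond the `close()` method shown above.

```python
from contextlib import closing


def run_with_session(session, work):
    """Call work(session) and close the session even if work raises."""
    with closing(session) as active:
        return work(active)
```

For example, `run_with_session(session, lambda s: s.sql("SELECT 1").collect())` would return the query result and close the session afterwards.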
Creating DataFrames
From Python data
from clickzetta.zettapark.session import Session
data = [(1, "Alice", 1000.0), (2, "Bob", 2000.0), (3, "Carol", 500.0)]
df = session.create_dataframe(data, schema=["id", "name", "amount"])
df.show()
+---+-----+------+
| id| name|amount|
+---+-----+------+
|  1|Alice|  1000|
|  2|  Bob|  2000|
|  3|Carol|   500|
+---+-----+------+
From an existing table
First create a table and insert some data:
session.sql("""
CREATE TABLE IF NOT EXISTS orders (
order_id BIGINT, user_id BIGINT, product STRING,
amount DECIMAL(10,2), status STRING, order_date STRING
)
""").collect()
session.sql("""
INSERT INTO orders VALUES
(1001,101,'iPhone',7999.00,'paid','2024-01-15'),
(1002,102,'MacBook',14999.00,'paid','2024-01-15'),
(1003,101,'AirPods',1799.00,'pending','2024-01-16')
""").collect()
Then read it with session.table():
df = session.table("orders")
df.show()
Run SQL and get a DataFrame back
df = session.sql("SELECT * FROM orders WHERE status = 'paid'")
df.show()
Basic transformations
from clickzetta.zettapark import functions as F
data = [(1,"A",100.0),(2,"A",200.0),(3,"B",300.0),(4,"B",150.0)]
df = session.create_dataframe(data, schema=["id","category","amount"])
filter: keep rows that match a condition
df.filter(F.col("amount") > 150).show()
select: choose columns
df.select("category", "amount").show()
sort: order rows
df.sort("amount", ascending=False).show()
with_column: add or replace a column
df.with_column("amount_tax", F.col("amount") * 1.13).show()
with_column_renamed: rename a column
df.with_column_renamed("amount", "price").show()
drop: remove columns
df.drop("id").show()
limit: keep only the first N rows
df.limit(2).show()
Aggregation
group_by + agg
result = df.group_by("category").agg(
    F.sum("amount").alias("total"),
    F.count("id").alias("cnt"),
    F.avg("amount").alias("avg_amount"),
    F.max("amount").alias("max_amount"),
    F.min("amount").alias("min_amount")
)
result.show()
+--------+-----+---+----------+----------+----------+
|category|total|cnt|avg_amount|max_amount|min_amount|
+--------+-----+---+----------+----------+----------+
|       A|  300|  2|       150|       200|       100|
|       B|  450|  2|       225|       300|       150|
+--------+-----+---+----------+----------+----------+
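The numbers in this result can be checked by hand. The following plain-Python sketch recomputes the same five aggregates from the four input rows. It does not use Zettapark and only models what `group_by("category").agg(...)` computes for this data; since no `id` is NULL here, counting rows gives the same value as `F.count("id")`.

```python
from collections import defaultdict

# Same rows as the DataFrame used in this section: (id, category, amount).
rows = [(1, "A", 100.0), (2, "A", 200.0), (3, "B", 300.0), (4, "B", 150.0)]

# Collect the amounts of each category.
groups = defaultdict(list)
for _id, category, amount in rows:
    groups[category].append(amount)

# One summary record per category, mirroring the aliases used above.
summary = {
    category: {
        "total": sum(amounts),
        "cnt": len(amounts),
        "avg_amount": sum(amounts) / len(amounts),
        "max_amount": max(amounts),
        "min_amount": min(amounts),
    }
    for category, amounts in groups.items()
}

for category in sorted(summary):
    print(category, summary[category])
```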
Join
users = session.create_dataframe([(1,"Alice"),(2,"Bob"),(3,"Carol")], schema=["id","name"])
orders = session.create_dataframe([(1,500.0),(1,300.0),(2,800.0)], schema=["user_id","amount"])
inner join (default)
users.join(orders, users["id"] == orders["user_id"]).show()
left join
users.join(orders, users["id"] == orders["user_id"], "left").show()
+---+-----+-------+------+
| id| name|user_id|amount|
+---+-----+-------+------+
|  1|Alice|      1|   300|
|  1|Alice|      1|   500|
|  2|  Bob|      2|   800|
|  3|Carol|   NULL|  NULL|
+---+-----+-------+------+
cross join
users.cross_join(orders).show()
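To see how the three join types differ in row count, the sketch below models them in plain Python on the same `users` and `orders` data. It illustrates standard SQL join semantics and is not Zettapark code; `None` stands in for NULL.

```python
from itertools import product

users = [(1, "Alice"), (2, "Bob"), (3, "Carol")]      # (id, name)
orders = [(1, 500.0), (1, 300.0), (2, 800.0)]         # (user_id, amount)

# Inner join: keep only the pairs whose keys match.
inner = [u + o for u, o in product(users, orders) if u[0] == o[0]]

# Left join: the inner rows plus every unmatched user padded with NULLs.
matched_ids = {o[0] for o in orders}
left = inner + [u + (None, None) for u in users if u[0] not in matched_ids]

# Cross join: every user paired with every order, no condition.
cross = [u + o for u, o in product(users, orders)]

print(len(inner), len(left), len(cross))  # 3 4 9
```

Carol has no orders, so she appears only in the left and cross joins. The cross join grows as the product of the two row counts, which is why it is usually reserved for small inputs.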
Set operations
df1 = session.create_dataframe([(1,"A"),(2,"B"),(3,"C")], schema=["id","val"])
df2 = session.create_dataframe([(2,"B"),(3,"C"),(4,"D")], schema=["id","val"])
df1.union_all(df2).show() # union, keeping duplicates
df1.intersect(df2).show() # intersection
df1.except_(df2).show() # difference (rows in df1 that are not in df2)
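The guide does not show the output of these three calls. The plain-Python sketch below works out what to expect for this data, under the assumption that `intersect` and `except_` follow SQL `INTERSECT` and `EXCEPT` semantics (distinct rows). It does not call Zettapark.

```python
df1 = [(1, "A"), (2, "B"), (3, "C")]
df2 = [(2, "B"), (3, "C"), (4, "D")]

# union_all: concatenate, duplicates kept.
union_all = df1 + df2

# intersect: rows present in both inputs.
intersect = sorted(set(df1) & set(df2))

# except_: rows in df1 that do not appear in df2.
except_ = sorted(set(df1) - set(df2))

print(len(union_all))  # 6
print(intersect)       # [(2, 'B'), (3, 'C')]
print(except_)         # [(1, 'A')]
```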
Handling missing values
data = [(1,"Alice",100.0),(2,None,200.0),(3,"Carol",None)]
df = session.create_dataframe(data, schema=["id","name","amount"])
Drop rows that contain NULL
df.dropna().show()
+---+-----+------+
| id| name|amount|
+---+-----+------+
|  1|Alice|   100|
+---+-----+------+
Fill NULL values
df.fillna({"name": "Unknown", "amount": 0.0}).show()
+---+-------+------+
| id|   name|amount|
+---+-------+------+
|  1|  Alice|   100|
|  2|Unknown|   200|
|  3|  Carol|     0|
+---+-------+------+
Window functions
from clickzetta.zettapark.window import Window
data = [(1,"A",100),(2,"A",200),(3,"B",300),(4,"B",150),(5,"A",50)]
df = session.create_dataframe(data, schema=["id","category","amount"])
# Rank within each category, highest amount first
w_rank = Window.partition_by("category").order_by(F.col("amount").desc())
# Running total within each category, in ascending order of amount
w_sum = Window.partition_by("category").order_by("amount")
result = df \
    .with_column("rank", F.rank().over(w_rank)) \
    .with_column("running_total", F.sum("amount").over(w_sum))
result.show()
+---+--------+------+----+-------------+
| id|category|amount|rank|running_total|
+---+--------+------+----+-------------+
|  5|       A|    50|   3|           50|
|  1|       A|   100|   2|          150|
|  2|       A|   200|   1|          350|
|  4|       B|   150|   2|          150|
|  3|       B|   300|   1|          450|
+---+--------+------+----+-------------+
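The `rank` and `running_total` columns can be reproduced by hand, which helps when reasoning about what each window does. The sketch below is plain Python, not Zettapark. It relies on the amounts being distinct within each category; with ties, a SQL window ordered by `amount` would typically assign tied rows the same running total, which this simple row-by-row loop does not model.

```python
from itertools import groupby

# Same rows as the DataFrame above: (id, category, amount).
rows = [(1, "A", 100), (2, "A", 200), (3, "B", 300), (4, "B", 150), (5, "A", 50)]

result = []
# Sort by category, then by ascending amount (the order used by w_sum).
by_category = sorted(rows, key=lambda r: (r[1], r[2]))
for category, group in groupby(by_category, key=lambda r: r[1]):
    group = list(group)
    running_total = 0
    for row_id, _, amount in group:
        running_total += amount
        # rank() over a descending order: 1 + rows in the partition with a larger amount.
        rank = 1 + sum(1 for r in group if r[2] > amount)
        result.append((row_id, category, amount, rank, running_total))

for row in result:
    print(row)
```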
Reading and writing tables
Write to a table
df = session.create_dataframe([(1,"Alice",100.0),(2,"Bob",200.0)], schema=["id","name","amount"])
# Overwrite (the table is created if it does not exist)
df.write.save_as_table("my_table", mode="overwrite")
# Append
df.write.save_as_table("my_table", mode="append")
Read a table
df = session.table("my_table")
df.show()
Convert to a pandas DataFrame
pdf = df.to_pandas()
print(type(pdf)) # <class 'pandas.core.frame.DataFrame'>
print(pdf.head())
Views and dynamic tables
Temporary view (valid only within the session)
df.filter(F.col("amount") > 100).create_or_replace_temp_view("high_value_orders")
# Query the temporary view with SQL
session.sql("SELECT * FROM high_value_orders").show()
Persistent view
df.filter(F.col("amount") > 100).create_or_replace_view("v_high_value_orders")
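Dynamic table (automatic incremental refresh)
# Define the transformation on a source table; the system refreshes the result incrementally
source_df = session.table("raw_orders").filter(F.col("status") == "paid")
source_df.create_or_replace_dynamic_table(
    "paid_orders_summary",
    lag="1 minute",       # refresh interval
    warehouse="default"   # compute cluster to use
)
See the Dynamic Table documentation for details.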
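Viewing the generated SQL
Use explain() to see the SQL that a chain of DataFrame operations compiles to, which helps with debugging and performance analysis. Here df is the DataFrame with id, category, and amount columns from the basic transformations section:
df.filter(F.col("amount") > 150) \
    .group_by("category") \
    .agg(F.sum("amount").alias("total")) \
    .explain()
Output: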
SELECT `category`, sum(`amount`) AS `total`
FROM ( SELECT ... WHERE (`amount` > CAST(150 AS bigint)))
GROUP BY `category`