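Zettapark DataFrame API Guide

Zettapark is the Python DataFrame API for ClickZetta Lakehouse, with an interface similar to pandas and PySpark. The Python code you write is translated into SQL and executed in a distributed fashion inside the Lakehouse, so you do not have to write SQL by hand.

This guide covers: creating DataFrames → basic transformations → aggregation → joins → set operations → handling missing values → window functions → reading and writing tables → views and dynamic tables, plus inspecting the generated SQL.


Installation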

pip install clickzetta_zettapark_python


Creating a session

from clickzetta.zettapark.session import Session

session = Session.builder.configs({
    "username": "your_username",
    "password": "your_password",
    "service": "cn-shanghai-alicloud.api.clickzetta.com",
    "instance": "your_instance",
    "workspace": "your_workspace",
    "schema": "public",
    "vcluster": "default"
}).create()

Close the session when you are done:

session.close()
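To avoid hard-coding the password in source code, one option is to read the connection settings from environment variables. A minimal sketch under that assumption: the variable names (`CZ_USERNAME` and so on) and the helper `load_session_config` are hypothetical, not part of Zettapark. The helper only builds the dict that `Session.builder.configs()` takes, so it runs without a Lakehouse connection.

```python
import os

# Hypothetical environment variable names; choose whatever your deployment uses.
_ENV_KEYS = {
    "username": "CZ_USERNAME",
    "password": "CZ_PASSWORD",
    "service": "CZ_SERVICE",
    "instance": "CZ_INSTANCE",
    "workspace": "CZ_WORKSPACE",
    "schema": "CZ_SCHEMA",
    "vcluster": "CZ_VCLUSTER",
}

# Defaults for the non-secret keys, mirroring the example above.
_DEFAULTS = {"schema": "public", "vcluster": "default"}


def load_session_config(env=None):
    """Build the dict passed to Session.builder.configs() from environment variables."""
    env = os.environ if env is None else env
    config, missing = {}, []
    for key, var in _ENV_KEYS.items():
        value = env.get(var, _DEFAULTS.get(key))
        if value is None:
            missing.append(var)
        else:
            config[key] = value
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return config


# With a live Lakehouse (requires clickzetta_zettapark_python):
# from clickzetta.zettapark.session import Session
# session = Session.builder.configs(load_session_config()).create()
# try:
#     ...  # work with the session
# finally:
#     session.close()
```

The `try`/`finally` in the comment makes sure `session.close()` runs even if a query raises.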


Creating a DataFrame

From Python data

from clickzetta.zettapark.session import Session

data = [(1, "Alice", 1000.0), (2, "Bob", 2000.0), (3, "Carol", 500.0)]
df = session.create_dataframe(data, schema=["id", "name", "amount"])
df.show()

+---+-----+------+
| id| name|amount|
+---+-----+------+
|  1|Alice|  1000|
|  2|  Bob|  2000|
|  3|Carol|   500|
+---+-----+------+
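`create_dataframe` in the example takes a list of tuples plus a list of column names. If your data arrives as a list of dicts (for example, parsed JSON), a small helper can produce both. `records_to_rows` below is hypothetical and takes the column order from the first record.

```python
def records_to_rows(records):
    """Turn a list of dicts into (rows, schema) for session.create_dataframe(rows, schema=...)."""
    if not records:
        raise ValueError("records must not be empty")
    schema = list(records[0].keys())  # column order follows the first record
    rows = []
    for rec in records:
        if set(rec) != set(schema):
            raise ValueError(f"inconsistent keys: {sorted(rec)}")
        rows.append(tuple(rec[col] for col in schema))
    return rows, schema


rows, schema = records_to_rows([
    {"id": 1, "name": "Alice", "amount": 1000.0},
    {"id": 2, "name": "Bob", "amount": 2000.0},
])
# df = session.create_dataframe(rows, schema=schema)
```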

From an existing table

First create a table and insert some data:

session.sql("""
    CREATE TABLE IF NOT EXISTS orders (
        order_id   BIGINT,
        user_id    BIGINT,
        product    STRING,
        amount     DECIMAL(10,2),
        status     STRING,
        order_date STRING
    )
""").collect()

session.sql("""
    INSERT INTO orders VALUES
        (1001, 101, 'iPhone',  7999.00,  'paid',    '2024-01-15'),
        (1002, 102, 'MacBook', 14999.00, 'paid',    '2024-01-15'),
        (1003, 101, 'AirPods', 1799.00,  'pending', '2024-01-16')
""").collect()

Then read it with session.table():

df = session.table("orders")
df.show()

Running SQL that returns a DataFrame

df = session.sql("SELECT * FROM orders WHERE status = 'paid'")
df.show()


Basic transformations

from clickzetta.zettapark import functions as F

data = [(1, "A", 100.0), (2, "A", 200.0), (3, "B", 300.0), (4, "B", 150.0)]
df = session.create_dataframe(data, schema=["id", "category", "amount"])

filter: keep rows that match a condition

df.filter(F.col("amount") > 150).show()

select: choose columns

df.select("category", "amount").show()

sort: order rows

df.sort("amount", ascending=False).show()

with_column: add or replace a column

df.with_column("amount_tax", F.col("amount") * 1.13).show()

with_column_renamed: rename a column

df.with_column_renamed("amount", "price").show()

drop: remove columns

df.drop("id").show()

limit: return at most N rows

df.limit(2).show()
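To make the semantics of each call concrete, here is a plain-Python sketch (no Zettapark required) that applies the same operations to the same four rows. It only models what each step returns; Zettapark itself runs these as SQL in the Lakehouse. The sketch assumes, as the aggregation output below suggests, that each call returns a new DataFrame rather than modifying `df`, so it builds new lists each time.

```python
# Same four rows as the DataFrame above: (id, category, amount)
rows = [(1, "A", 100.0), (2, "A", 200.0), (3, "B", 300.0), (4, "B", 150.0)]

# filter(F.col("amount") > 150): keep rows whose amount is strictly greater than 150
filtered = [r for r in rows if r[2] > 150]

# select("category", "amount"): project two columns
selected = [(cat, amt) for _id, cat, amt in rows]

# sort("amount", ascending=False): order by amount, largest first
sorted_desc = sorted(rows, key=lambda r: r[2], reverse=True)

# with_column("amount_tax", amount * 1.13): append a derived column
with_tax = [r + (r[2] * 1.13,) for r in rows]

# drop("id"): remove the first column
dropped = [r[1:] for r in rows]

# limit(2): here simply the first two rows; in SQL, LIMIT without
# an ORDER BY does not guarantee which rows come back
limited = rows[:2]
```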


Aggregation

group_by + agg

result = df.group_by("category").agg(
    F.sum("amount").alias("total"),
    F.count("id").alias("cnt"),
    F.avg("amount").alias("avg_amount"),
    F.max("amount").alias("max_amount"),
    F.min("amount").alias("min_amount")
)
result.show()

+--------+-----+---+----------+----------+----------+
|category|total|cnt|avg_amount|max_amount|min_amount|
+--------+-----+---+----------+----------+----------+
|       A|  300|  2|       150|       200|       100|
|       B|  450|  2|       225|       300|       150|
+--------+-----+---+----------+----------+----------+
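The aggregation can be checked by hand. A plain-Python sketch over the same four rows reproduces the table above; it mirrors SQL `COUNT(id)`, which counts only non-NULL `id` values. It does not model NULL handling for the other aggregates, since this data has no NULLs.

```python
from collections import defaultdict

rows = [(1, "A", 100.0), (2, "A", 200.0), (3, "B", 300.0), (4, "B", 150.0)]  # (id, category, amount)

# group_by("category"): collect each category's (id, amount) pairs
groups = defaultdict(list)
for row_id, category, amount in rows:
    groups[category].append((row_id, amount))

# agg(...): one output row per category
result = {}
for category, members in sorted(groups.items()):
    amounts = [amt for _, amt in members]
    result[category] = {
        "total": sum(amounts),
        "cnt": sum(1 for row_id, _ in members if row_id is not None),  # COUNT(id) skips NULL ids
        "avg_amount": sum(amounts) / len(amounts),
        "max_amount": max(amounts),
        "min_amount": min(amounts),
    }
```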


Join

users = session.create_dataframe([(1, "Alice"), (2, "Bob"), (3, "Carol")], schema=["id", "name"])
orders = session.create_dataframe([(1, 500.0), (1, 300.0), (2, 800.0)], schema=["user_id", "amount"])

inner join (default)

users.join(orders, users["id"] == orders["user_id"]).show()

left join

users.join(orders, users["id"] == orders["user_id"], "left").show()

+---+-----+-------+------+
| id| name|user_id|amount|
+---+-----+-------+------+
|  1|Alice|      1|   300|
|  1|Alice|      1|   500|
|  2|  Bob|      2|   800|
|  3|Carol|   NULL|  NULL|
+---+-----+-------+------+

cross join

users.cross_join(orders).show()
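A plain-Python sketch of the three join types on the same `users` and `orders` rows, matching on `id == user_id`. In the left join, unmatched left rows are padded with `None` for the right-hand columns, corresponding to the NULLs in the output above. SQL does not guarantee row order, so the order of Alice's two rows may differ from the table.

```python
users = [(1, "Alice"), (2, "Bob"), (3, "Carol")]   # (id, name)
orders = [(1, 500.0), (1, 300.0), (2, 800.0)]      # (user_id, amount)


def inner_join(left, right):
    # keep only pairs where left id == right user_id
    return [lrow + rrow for lrow in left for rrow in right if lrow[0] == rrow[0]]


def left_join(left, right, right_width):
    # every left row appears at least once; unmatched ones get None (NULL) on the right
    out = []
    for lrow in left:
        matches = [lrow + rrow for rrow in right if lrow[0] == rrow[0]]
        out.extend(matches if matches else [lrow + (None,) * right_width])
    return out


def cross_join(left, right):
    # every left row paired with every right row, no join condition
    return [lrow + rrow for lrow in left for rrow in right]


inner = inner_join(users, orders)
left = left_join(users, orders, right_width=2)
cross = cross_join(users, orders)
```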


Set operations

df1 = session.create_dataframe([(1, "A"), (2, "B"), (3, "C")], schema=["id", "val"])
df2 = session.create_dataframe([(2, "B"), (3, "C"), (4, "D")], schema=["id", "val"])

df1.union_all(df2).show()   # union, keeping duplicates
df1.intersect(df2).show()   # intersection
df1.except_(df2).show()     # difference: rows in df1 but not in df2
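The three set operations, sketched with Python lists and sets on the same rows. This assumes `intersect` and `except_` follow SQL `INTERSECT`/`EXCEPT` semantics and return distinct rows, while `union_all` keeps duplicates like `UNION ALL`.

```python
df1 = [(1, "A"), (2, "B"), (3, "C")]
df2 = [(2, "B"), (3, "C"), (4, "D")]

union_all = df1 + df2                       # UNION ALL: all rows from both, duplicates kept
intersect = sorted(set(df1) & set(df2))     # INTERSECT: distinct rows present in both
except_ = sorted(set(df1) - set(df2))       # EXCEPT: distinct rows in df1 but not in df2
```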


Handling missing values

data = [(1, "Alice", 100.0), (2, None, 200.0), (3, "Carol", None)]
df = session.create_dataframe(data, schema=["id", "name", "amount"])

Drop rows that contain NULL

df.dropna().show()

+---+-----+------+
| id| name|amount|
+---+-----+------+
|  1|Alice|   100|
+---+-----+------+

Fill NULLs

df.fillna({"name": "Unknown", "amount": 0.0}).show()

+---+-------+------+
| id|   name|amount|
+---+-------+------+
|  1|  Alice|   100|
|  2|Unknown|   200|
|  3|  Carol|     0|
+---+-------+------+
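A plain-Python sketch of the two calls, assuming `dropna()` with no arguments drops a row if any column is NULL (which matches the single remaining row above) and that the dict passed to `fillna` maps column names to replacement values. The helper names mirror the Zettapark methods but are only illustrative.

```python
rows = [(1, "Alice", 100.0), (2, None, 200.0), (3, "Carol", None)]
columns = ["id", "name", "amount"]


def dropna(rows):
    # drop any row that contains at least one NULL (None)
    return [r for r in rows if all(v is not None for v in r)]


def fillna(rows, columns, values):
    # replace NULLs column by column, using the per-column defaults in `values`
    idx = {c: i for i, c in enumerate(columns)}
    out = []
    for r in rows:
        r = list(r)
        for col, default in values.items():
            if r[idx[col]] is None:
                r[idx[col]] = default
        out.append(tuple(r))
    return out


cleaned = dropna(rows)
filled = fillna(rows, columns, {"name": "Unknown", "amount": 0.0})
```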


Window functions

from clickzetta.zettapark import functions as F
from clickzetta.zettapark.window import Window

data = [(1, "A", 100), (2, "A", 200), (3, "B", 300), (4, "B", 150), (5, "A", 50)]
df = session.create_dataframe(data, schema=["id", "category", "amount"])

# Rank within each category, highest amount first
w_rank = Window.partition_by("category").order_by(F.col("amount").desc())

# Running total within each category, ordered by amount
w_sum = Window.partition_by("category").order_by("amount")

result = df \
    .with_column("rank", F.rank().over(w_rank)) \
    .with_column("running_total", F.sum("amount").over(w_sum))
result.show()

+---+--------+------+----+-------------+
| id|category|amount|rank|running_total|
+---+--------+------+----+-------------+
|  5|       A|    50|   3|           50|
|  1|       A|   100|   2|          150|
|  2|       A|   200|   1|          350|
|  4|       B|   150|   2|          150|
|  3|       B|   300|   1|          450|
+---+--------+------+----+-------------+
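The `rank` and `running_total` columns can be reproduced in plain Python. This sketch assumes SQL-standard behavior: `rank()` is 1 plus the number of rows in the partition that sort strictly ahead, and a windowed `sum` with `ORDER BY` but no explicit frame sums from the start of the partition through the current row and its ties.

```python
from collections import defaultdict

rows = [(1, "A", 100), (2, "A", 200), (3, "B", 300), (4, "B", 150), (5, "A", 50)]

# partition_by("category")
by_cat = defaultdict(list)
for r in rows:
    by_cat[r[1]].append(r)

result = []
for cat, part in by_cat.items():
    for rid, _, amount in part:
        # rank() over ORDER BY amount DESC: 1 + rows with a strictly larger amount
        rank = 1 + sum(1 for _, _, a in part if a > amount)
        # sum() over ORDER BY amount with the default frame: all rows with amount <= current
        running_total = sum(a for _, _, a in part if a <= amount)
        result.append((rid, cat, amount, rank, running_total))

# order like the table above: by category, then amount
result.sort(key=lambda r: (r[1], r[2]))
```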


Reading and writing tables

Writing a table

df = session.create_dataframe([(1, "Alice", 100.0), (2, "Bob", 200.0)], schema=["id", "name", "amount"])

Overwrite (creates the table if it does not exist)

df.write.save_as_table("my_table", mode="overwrite")

Append

df.write.save_as_table("my_table", mode="append")

Reading a table

df = session.table("my_table")
df.show()

Converting to a pandas DataFrame

pdf = df.to_pandas()
print(type(pdf))  # <class 'pandas.core.frame.DataFrame'>
print(pdf.head())
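Run back to back against the same table, the two `save_as_table` calls above leave it with four rows: `overwrite` replaces the contents with two rows, then `append` adds the same two again. A toy in-memory model of the two modes makes this concrete; the `tables` dict and `save_as_table` function here are hypothetical stand-ins, not the Zettapark API.

```python
# Hypothetical in-memory stand-in for the Lakehouse catalog: table name -> rows.
tables = {}


def save_as_table(rows, name, mode):
    """Toy model of the two write modes used above (not the Zettapark API)."""
    if mode == "overwrite":
        tables[name] = list(rows)                  # replace contents; create if missing
    elif mode == "append":
        tables.setdefault(name, []).extend(rows)   # add after existing rows (this toy also creates a missing table)
    else:
        raise ValueError(f"unsupported mode: {mode!r}")


rows = [(1, "Alice", 100.0), (2, "Bob", 200.0)]
save_as_table(rows, "my_table", mode="overwrite")
save_as_table(rows, "my_table", mode="append")
```

So if you run the snippets in order, the `df` read back from `my_table`, and anything built from it, contains each row twice.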


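Views and dynamic tables

Temporary view (valid only within the current session)

df.filter(F.col("amount") > 100).create_or_replace_temp_view("high_value_orders")

Query the temporary view with SQL

session.sql("SELECT * FROM high_value_orders").show()

Persistent view

df.filter(F.col("amount") > 100).create_or_replace_view("v_high_value_orders")

Dynamic table (automatic incremental refresh)

Define the transformation logic on a source table, and the system refreshes the result incrementally:

# raw_orders is assumed to be an existing source table with a status column
source_df = session.table("raw_orders").filter(F.col("status") == "paid")
source_df.create_or_replace_dynamic_table(
    "paid_orders_summary",
    lag="1 minute",       # refresh interval
    warehouse="default"   # compute cluster to use
)

See the Dynamic Table documentation for details.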
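How the Lakehouse maintains a dynamic table internally is not described here. As a purely conceptual sketch of what incremental refresh means for a filter like the one above, assuming an append-only source: each refresh processes only the rows added since the previous refresh instead of rescanning the whole table. The `IncrementalFilter` class is hypothetical and is not how the Lakehouse implements dynamic tables.

```python
class IncrementalFilter:
    """Hypothetical toy: incrementally maintains a filtered copy of an append-only source."""

    def __init__(self, predicate):
        self.predicate = predicate
        self.result = []      # materialized rows, like the dynamic table's contents
        self.processed = 0    # number of source rows already seen

    def refresh(self, source):
        """Process only rows appended since the last refresh; return how many were scanned."""
        new_rows = source[self.processed:]
        self.result.extend(row for row in new_rows if self.predicate(row))
        self.processed = len(source)
        return len(new_rows)


raw_orders = [
    {"order_id": 1, "status": "paid"},
    {"order_id": 2, "status": "pending"},
]
paid = IncrementalFilter(lambda row: row["status"] == "paid")
paid.refresh(raw_orders)                          # first refresh scans both rows
raw_orders.append({"order_id": 3, "status": "paid"})
paid.refresh(raw_orders)                          # second refresh scans only the new row
```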


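Inspecting the generated SQL

Use explain() to see the SQL that a chain of DataFrame operations translates to, which helps with debugging and performance analysis:

# df: the DataFrame with id / category / amount columns created earlier
df.filter(F.col("amount") > 150) \
    .group_by("category") \
    .agg(F.sum("amount").alias("total")) \
    .explain()

Output: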

SELECT `category`, sum(`amount`) AS `total` FROM ( SELECT ... WHERE (`amount` > CAST(150 AS bigint))) GROUP BY `category`
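The output shows each DataFrame step becoming a layer of SQL, with earlier steps nested as subqueries. A toy sketch of that idea: `ToyFrame` is hypothetical, records each operation as a string, and renders SQL shaped like the output above. Zettapark's real SQL generator adds casts, quoting rules, and other details that this sketch ignores.

```python
class ToyFrame:
    """Hypothetical toy: each method wraps the previous query as a subquery."""

    def __init__(self, sql):
        self.sql = sql

    @classmethod
    def table(cls, name):
        return cls(f"SELECT * FROM `{name}`")

    def filter(self, predicate):
        return ToyFrame(f"SELECT * FROM ({self.sql}) WHERE {predicate}")

    def group_by_sum(self, key, col, alias):
        return ToyFrame(
            f"SELECT `{key}`, sum(`{col}`) AS `{alias}` FROM ({self.sql}) GROUP BY `{key}`"
        )


query = (
    ToyFrame.table("t")
    .filter("(`amount` > 150)")
    .group_by_sum("category", "amount", "total")
    .sql
)
print(query)
```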


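Related documentation

Document | Description
Zettapark Quick Start | Installation and basic examples
Python Connector SDK | Standard SQL execution interface
Dynamic Table | Data pipelines with automatic incremental refresh
BulkLoad batch import | High-speed loading at the scale of millions of rows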