Zettapark 数据工程实战
本文通过订单分析场景,展示 Zettapark 在完整数据工程流程中的用法。
环境准备
from clickzetta.zettapark.session import Session
from clickzetta.zettapark import functions as F
from clickzetta.zettapark.window import Window
session = Session.builder.configs({
"username": "your_username",
"password": "your_password",
"service": "cn-shanghai-alicloud.api.clickzetta.com",
"instance": "your_instance",
"workspace": "your_workspace",
"schema": "public",
"vcluster": "default"
}).create()
前置数据准备
本文所有示例基于以下两张表,运行前先执行:
建表
session.sql("""
CREATE TABLE IF NOT EXISTS orders (
order_id BIGINT,
user_id BIGINT,
product STRING,
amount DECIMAL(10, 2),
status STRING,
order_date STRING
)
""").collect()
session.sql("""
CREATE TABLE IF NOT EXISTS users (
user_id BIGINT,
name STRING,
city STRING,
level STRING
)
""").collect()
插入测试数据
session.sql("""
INSERT INTO orders VALUES
(1001, 101, 'iPhone', 7999.00, 'paid', '2024-01-15'),
(1002, 102, 'MacBook', 14999.00, 'paid', '2024-01-15'),
(1003, 101, 'AirPods', 1799.00, 'pending', '2024-01-16'),
(1004, 103, 'iPad', 8999.00, 'paid', '2024-01-16'),
(1005, 102, 'Watch', 3299.00, 'cancelled', '2024-01-17'),
(1006, 101, 'MacBook', 14999.00, 'paid', '2024-01-17')
""").collect()
session.sql("""
INSERT INTO users VALUES
(101, 'Alice', 'Beijing', 'gold'),
(102, 'Bob', 'Shanghai', 'silver'),
(103, 'Carol', 'Guangzhou', 'bronze')
""").collect()
💡
order_dateorder_date
使用 STRING 类型存储日期字符串,查询时用
F.to_date()F.to_date()
转换。这是因为 Lakehouse 对 DATE 列的隐式类型转换有限制,STRING 更灵活。
场景1:多表 Join + 聚合 + 写入结果表
将订单表和用户表关联,统计每个用户的消费汇总,写入结果表。
orders = session.table("orders") # order_id, user_id, product, amount, status, order_date
users = session.table("users") # user_id, name, city, level
注意:join 两表有同名列(user_id)时,先 select 重命名消除歧义
paid = orders.filter(F.col("status") == "paid") \
.select(
F.col("order_id"),
orders["user_id"].alias("o_user_id"), # 重命名避免 join 后歧义
F.col("amount"),
F.col("order_date")
)
result = paid.join(users, paid["o_user_id"] == users["user_id"]) \
.group_by("user_id", "name", "city", "level") \
.agg(
F.count(F.col("order_id")).alias("order_count"),
F.sum(F.col("amount")).alias("total_amount"),
F.max(F.col("order_date")).alias("last_order_date")
) \
.sort(F.col("total_amount").desc())
result.show()
+-------+-----+---------+------+-----------+------------+---------------+
|user_id| name| city| level|order_count|total_amount|last_order_date|
+-------+-----+---------+------+-----------+------------+---------------+
| 101|Alice| Beijing| gold| 2| 22998.00| 2024-01-17|
| 102| Bob| Shanghai|silver| 1| 14999.00| 2024-01-15|
| 103|Carol|Guangzhou|bronze| 1| 8999.00| 2024-01-16|
+-------+-----+---------+------+-----------+------------+---------------+
写入结果表
result.write.save_as_table("user_order_summary", mode="overwrite")
⚠️ Join 同名列歧义 :两张表有相同列名时,join 后
group_bygroup_by
或
selectselect
会报
cannot resolve columncannot resolve column
。解决方法:join 前用
.alias().alias()
重命名其中一张表的同名列。
场景2:窗口函数 — 排名与累计统计
summary = session.table("user_order_summary")
按消费金额降序排名
w_rank = Window.order_by(F.col("total_amount").desc())
分城市统计累计消费
w_city = Window.partition_by("city").order_by(F.col("total_amount").desc())
result = summary \
.with_column("rank", F.rank().over(w_rank)) \
.with_column("city_rank", F.rank().over(w_city)) \
.with_column("running_total", F.sum("total_amount").over(w_city))
result.show()
+-------+-----+------+-----------+------------+----+---------+-------------+
|user_id| name| level|order_count|total_amount|rank|city_rank|running_total|
+-------+-----+------+-----------+------------+----+---------+-------------+
| 101|Alice| gold| 2| 22998.00| 1| 1| 22998.00|
| 102| Bob|silver| 1| 14999.00| 2| 1| 14999.00|
| 103|Carol|bronze| 1| 8999.00| 3| 1| 8999.00|
+-------+-----+------+-----------+------------+----+---------+-------------+
场景3:创建视图供 BI 使用
创建已支付订单视图,加入年月维度便于 BI 分析
orders.filter(F.col("status") == "paid") \
.select(
F.col("order_id"),
F.col("user_id"),
F.col("product"),
F.col("amount"),
F.col("order_date"),
F.year(F.to_date(F.col("order_date"))).alias("year"),
F.month(F.to_date(F.col("order_date"))).alias("month"),
).create_or_replace_view("v_paid_orders")
BI 工具直接查询视图
session.table("v_paid_orders").show()
场景4:增量处理
只处理某个时间点之后的新数据,适合定时增量 ETL:
只处理 2024-01-16 之后的新订单
cutoff = "2024-01-16"
new_orders = orders.filter(F.col("order_date") >= cutoff)
print(f"新增订单数: {new_orders.count()}")
new_orders.show()
增量写入目标表(append 模式)
new_orders.filter(F.col("status") == "paid") \
.write.save_as_table("paid_orders_archive", mode="append")
场景5:数据质量检查
在写入之前检查数据质量:
检查 NULL 值
null_counts = orders.select(
F.count(F.lit(1)).alias("total"),
F.sum(F.iff(F.is_null(F.col("amount")), F.lit(1), F.lit(0))).alias("null_amount"),
F.sum(F.iff(F.is_null(F.col("user_id")), F.lit(1), F.lit(0))).alias("null_user_id"),
)
null_counts.show()
检查状态分布
orders.group_by("status").agg(
F.count(F.lit(1)).alias("cnt")
).sort("cnt", ascending=False).show()
检查金额异常(负数或超大值)
anomalies = orders.filter(
(F.col("amount") <= 0) | (F.col("amount") > 100000)
)
print(f"异常订单数: {anomalies.count()}")
场景6:与 SQL 混合使用
复杂逻辑可以直接用
session.sql()session.sql()
执行,结果还是 DataFrame,可以继续链式操作:
用 SQL 执行复杂查询,返回 DataFrame 继续处理
df = session.sql("""
SELECT
user_id,
DATE_TRUNC('month', TO_DATE(order_date)) AS month,
SUM(amount) AS monthly_amount
FROM orders
WHERE status = 'paid'
GROUP BY user_id, DATE_TRUNC('month', TO_DATE(order_date))
""")
继续用 DataFrame API 处理
w = Window.partition_by("user_id").order_by("month")
df.with_column("cumulative", F.sum("monthly_amount").over(w)).show()
注意事项
DATE 列写入 :通过
session.sql()session.sql()
INSERT 时,字符串无法隐式转换为 DATE 类型。建议:
Join 同名列歧义 :两表 join 后如有同名列,
group_bygroup_by
时会报错。解决方法:join 前对其中一张表的同名列用
.alias().alias()
重命名。
链式操作的惰性求值 :Zettapark DataFrame 是惰性的,只有调用
show()show()
、
collect()collect()
、
count()count()
、
write.save_as_table()write.save_as_table()
等 action 时才会真正执行。
相关文档