Zettapark 数据工程实战

本文通过订单分析场景,展示 Zettapark 在完整数据工程流程中的用法。


环境准备

from clickzetta.zettapark.session import Session from clickzetta.zettapark import functions as F from clickzetta.zettapark.window import Window session = Session.builder.configs({ "username": "your_username", "password": "your_password", "service": "cn-shanghai-alicloud.api.clickzetta.com", "instance": "your_instance", "workspace": "your_workspace", "schema": "public", "vcluster": "default" }).create()


前置数据准备

本文所有示例基于以下两张表,运行前先执行:

建表

session.sql(""" CREATE TABLE IF NOT EXISTS orders ( order_id BIGINT, user_id BIGINT, product STRING, amount DECIMAL(10, 2), status STRING, order_date STRING ) """).collect() session.sql(""" CREATE TABLE IF NOT EXISTS users ( user_id BIGINT, name STRING, city STRING, level STRING ) """).collect()

插入测试数据

session.sql(""" INSERT INTO orders VALUES (1001, 101, 'iPhone', 7999.00, 'paid', '2024-01-15'), (1002, 102, 'MacBook', 14999.00, 'paid', '2024-01-15'), (1003, 101, 'AirPods', 1799.00, 'pending', '2024-01-16'), (1004, 103, 'iPad', 8999.00, 'paid', '2024-01-16'), (1005, 102, 'Watch', 3299.00, 'cancelled', '2024-01-17'), (1006, 101, 'MacBook', 14999.00, 'paid', '2024-01-17') """).collect() session.sql(""" INSERT INTO users VALUES (101, 'Alice', 'Beijing', 'gold'), (102, 'Bob', 'Shanghai', 'silver'), (103, 'Carol', 'Guangzhou', 'bronze') """).collect()


场景1:多表 Join + 聚合 + 写入结果表

将订单表和用户表关联,统计每个用户的消费汇总,写入结果表。

orders = session.table("orders") # order_id, user_id, product, amount, status, order_date users = session.table("users") # user_id, name, city, level

注意:join 两表有同名列(user_id)时,先 select 重命名消除歧义

paid = orders.filter(F.col("status") == "paid") \ .select( F.col("order_id"), orders["user_id"].alias("o_user_id"), # 重命名避免 join 后歧义 F.col("amount"), F.col("order_date") ) result = paid.join(users, paid["o_user_id"] == users["user_id"]) \ .group_by("user_id", "name", "city", "level") \ .agg( F.count(F.col("order_id")).alias("order_count"), F.sum(F.col("amount")).alias("total_amount"), F.max(F.col("order_date")).alias("last_order_date") ) \ .sort(F.col("total_amount").desc()) result.show()

+-------+-----+---------+------+-----------+------------+---------------+ |user_id| name| city| level|order_count|total_amount|last_order_date| +-------+-----+---------+------+-----------+------------+---------------+ | 101|Alice| Beijing| gold| 2| 22998.00| 2024-01-17| | 102| Bob| Shanghai|silver| 1| 14999.00| 2024-01-15| | 103|Carol|Guangzhou|bronze| 1| 8999.00| 2024-01-16| +-------+-----+---------+------+-----------+------------+---------------+

写入结果表

result.write.save_as_table("user_order_summary", mode="overwrite")


场景2:窗口函数 — 排名与累计统计

summary = session.table("user_order_summary")

按消费金额降序排名

w_rank = Window.order_by(F.col("total_amount").desc())

分城市统计累计消费

w_city = Window.partition_by("city").order_by(F.col("total_amount").desc()) result = summary \ .with_column("rank", F.rank().over(w_rank)) \ .with_column("city_rank", F.rank().over(w_city)) \ .with_column("running_total", F.sum("total_amount").over(w_city)) result.show()

+-------+-----+------+-----------+------------+----+---------+-------------+ |user_id| name| level|order_count|total_amount|rank|city_rank|running_total| +-------+-----+------+-----------+------------+----+---------+-------------+ | 101|Alice| gold| 2| 22998.00| 1| 1| 22998.00| | 102| Bob|silver| 1| 14999.00| 2| 1| 14999.00| | 103|Carol|bronze| 1| 8999.00| 3| 1| 8999.00| +-------+-----+------+-----------+------------+----+---------+-------------+


场景3:创建视图供 BI 使用

创建已支付订单视图,加入年月维度便于 BI 分析

orders.filter(F.col("status") == "paid") \ .select( F.col("order_id"), F.col("user_id"), F.col("product"), F.col("amount"), F.col("order_date"), F.year(F.to_date(F.col("order_date"))).alias("year"), F.month(F.to_date(F.col("order_date"))).alias("month"), ).create_or_replace_view("v_paid_orders")

BI 工具直接查询视图

session.table("v_paid_orders").show()


场景4:增量处理

只处理某个时间点之后的新数据,适合定时增量 ETL:

只处理 2024-01-16 之后的新订单

cutoff = "2024-01-16" new_orders = orders.filter(F.col("order_date") >= cutoff) print(f"新增订单数: {new_orders.count()}") new_orders.show()

增量写入目标表(append 模式)

new_orders.filter(F.col("status") == "paid") \ .write.save_as_table("paid_orders_archive", mode="append")


场景5:数据质量检查

在写入之前检查数据质量:

检查 NULL 值

null_counts = orders.select( F.count(F.lit(1)).alias("total"), F.sum(F.iff(F.is_null(F.col("amount")), F.lit(1), F.lit(0))).alias("null_amount"), F.sum(F.iff(F.is_null(F.col("user_id")), F.lit(1), F.lit(0))).alias("null_user_id"), ) null_counts.show()

检查状态分布

orders.group_by("status").agg( F.count(F.lit(1)).alias("cnt") ).sort("cnt", ascending=False).show()

检查金额异常(负数或超大值)

anomalies = orders.filter( (F.col("amount") <= 0) | (F.col("amount") > 100000) ) print(f"异常订单数: {anomalies.count()}")


场景6:与 SQL 混合使用

复杂逻辑可以直接用

session.sql()
session.sql()
执行,结果还是 DataFrame,可以继续链式操作:

用 SQL 执行复杂查询,返回 DataFrame 继续处理

df = session.sql(""" SELECT user_id, DATE_TRUNC('month', TO_DATE(order_date)) AS month, SUM(amount) AS monthly_amount FROM orders WHERE status = 'paid' GROUP BY user_id, DATE_TRUNC('month', TO_DATE(order_date)) """)

继续用 DataFrame API 处理

w = Window.partition_by("user_id").order_by("month") df.with_column("cumulative", F.sum("monthly_amount").over(w)).show()


注意事项

DATE 列写入:通过

session.sql()
session.sql()
INSERT 时,字符串无法隐式转换为 DATE 类型。建议:

  • 建表时将日期列设为
    STRING
    STRING
    类型,查询时用
    F.to_date()
    F.to_date()
    转换
  • 或使用
    CAST
    CAST
    显式转换:
    INSERT INTO t VALUES (CAST('2024-01-15' AS DATE))
    INSERT INTO t VALUES (CAST('2024-01-15' AS DATE))

Join 同名列歧义:两表 join 后如有同名列,

group_by
group_by
时会报错。解决方法:join 前对其中一张表的同名列用
.alias()
.alias()
重命名。

链式操作的惰性求值:Zettapark DataFrame 是惰性的,只有调用

show()
show()
collect()
collect()
count()
count()
write.save_as_table()
write.save_as_table()
等 action 时才会真正执行。


相关文档

文档说明
Zettapark DataFrame API 指南DataFrame 操作完整参考
Zettapark 常用函数参考字符串、数值、日期、条件函数
Dynamic Table基于 Zettapark 创建自动增量刷新的数据管道
Python Connector SDK标准 SQL 执行接口
联系我们
预约咨询
微信咨询
电话咨询