Creating Dynamic Tables with Zettapark
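A Dynamic Table is the incremental computation object of 云器 Lakehouse. You define the transformation logic with a DataFrame, and the system automatically detects upstream changes and refreshes the result incrementally, so you do not have to write scheduled jobs by hand. In Zettapark, create_or_replace_dynamic_table() creates a Dynamic Table directly from a DataFrame.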
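To make "incremental refresh" concrete, here is a minimal pure-Python sketch of the general idea, assuming an insert-only source and using made-up (category, amount) pairs. The summary keeps per-category running state (count and sum) and folds in only the rows that arrived since the last refresh instead of rescanning everything. IncrementalSummary and apply_delta are hypothetical names, and this is a conceptual illustration, not how the Lakehouse implements refreshes internally.

```python
from decimal import Decimal


class IncrementalSummary:
    """Hypothetical per-category summary maintained from insert-only deltas."""

    def __init__(self):
        # category -> [order_count, total_amount]; the average is derived, not stored
        self.state = {}

    def apply_delta(self, new_rows):
        """Fold only the newly inserted (category, amount) rows into the state."""
        for category, amount in new_rows:
            entry = self.state.setdefault(category, [0, Decimal("0")])
            entry[0] += 1
            entry[1] += amount

    def result(self):
        """Return {category: (order_count, total_amount, avg_amount)}."""
        return {
            cat: (count, total, total / count)
            for cat, (count, total) in self.state.items()
        }


summary = IncrementalSummary()
# Initial refresh: every row currently in the source
summary.apply_delta([("A", Decimal("100.00")), ("A", Decimal("200.00")),
                     ("B", Decimal("300.00")), ("B", Decimal("150.00"))])
# Next refresh: only the single row that arrived since the last refresh
summary.apply_delta([("A", Decimal("60.00"))])
result = summary.result()
```

Storing sum and count rather than the average is the usual trick: an average cannot be updated from new rows alone, but sum and count can. Handling updates and deletes would need more state than this insert-only sketch keeps.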
Prerequisites
from clickzetta.zettapark.session import Session
from clickzetta.zettapark import functions as F

# Connection settings: replace the placeholders with your own account details
session = Session.builder.configs({
    "username": "your_username",
    "password": "your_password",
    "service": "cn-shanghai-alicloud.api.clickzetta.com",
    "instance": "your_instance",
    "workspace": "your_workspace",
    "schema": "public",
    "vcluster": "default"
}).create()
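Hard-coding a password in a script makes it easy to leak. A minimal sketch, assuming you export the connection settings as environment variables: the variable names (CZ_USERNAME, CZ_PASSWORD, and so on) and the helper load_clickzetta_config are hypothetical choices for this example, not names Zettapark reads on its own. The returned dict has the same keys as the configs() call above.

```python
import os

# Hypothetical environment variable names, mapped to the config keys used above
REQUIRED_ENV = {
    "username": "CZ_USERNAME",
    "password": "CZ_PASSWORD",
    "service": "CZ_SERVICE",
    "instance": "CZ_INSTANCE",
    "workspace": "CZ_WORKSPACE",
}

# Optional settings fall back to the values used in the example above
OPTIONAL_ENV = {
    "schema": ("CZ_SCHEMA", "public"),
    "vcluster": ("CZ_VCLUSTER", "default"),
}


def load_clickzetta_config(environ=os.environ):
    """Build the Session config dict from environment variables."""
    missing = [var for var in REQUIRED_ENV.values() if var not in environ]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    config = {key: environ[var] for key, var in REQUIRED_ENV.items()}
    for key, (var, default) in OPTIONAL_ENV.items():
        config[key] = environ.get(var, default)
    return config
```

With the variables exported, the session is created as before: Session.builder.configs(load_clickzetta_config()).create().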
Create the source table and insert test data:
session.sql("""
CREATE TABLE IF NOT EXISTS orders (
id INT, user_id INT, category STRING,
amount DECIMAL(10,2), status STRING, order_date STRING
)
""").collect()
session.sql("""
INSERT INTO orders VALUES
(1, 101, 'A', 100.0, 'paid', '2024-01-15'),
(2, 102, 'A', 200.0, 'paid', '2024-01-15'),
(3, 101, 'B', 300.0, 'pending', '2024-01-16'),
(4, 103, 'B', 150.0, 'paid', '2024-01-16')
""").collect()
Creating a Dynamic Table
Aggregation summary
source = session.table("orders")
Define the transformation logic:
agg_df = source.group_by("category").agg(
    F.count(F.col("id")).alias("order_count"),
    F.sum(F.col("amount")).alias("total_amount"),
    F.avg(F.col("amount")).alias("avg_amount")
)
Create the Dynamic Table so that it refreshes automatically every minute:
agg_df.create_or_replace_dynamic_table(
    "orders_summary",
    lag="1 minute",       # refresh interval
    warehouse="default"   # compute cluster that runs the refresh
)
session.table("orders_summary").show()
+--------+-----------+------------+----------+
|category|order_count|total_amount|avg_amount|
+--------+-----------+------------+----------+
| A| 2| 300.00| 150.00|
| B| 2| 450.00| 225.00|
+--------+-----------+------------+----------+
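To see where these numbers come from, the same GROUP BY can be recomputed in plain Python over the four test rows inserted into orders. This is only a local cross-check of the aggregation semantics (count, sum, and average per category, with no status filter, so the pending order 3 is counted); summarize is a hypothetical helper, not part of the Zettapark workflow.

```python
from collections import defaultdict
from decimal import Decimal

# The four test rows inserted into orders: (id, user_id, category, amount, status, order_date)
ORDERS = [
    (1, 101, "A", Decimal("100.00"), "paid", "2024-01-15"),
    (2, 102, "A", Decimal("200.00"), "paid", "2024-01-15"),
    (3, 101, "B", Decimal("300.00"), "pending", "2024-01-16"),
    (4, 103, "B", Decimal("150.00"), "paid", "2024-01-16"),
]


def summarize(rows):
    """Per category: (order_count, total_amount, avg_amount), like the agg() above."""
    groups = defaultdict(list)
    for _id, _uid, category, amount, _status, _date in rows:
        groups[category].append(amount)
    return {
        cat: (len(amounts), sum(amounts), sum(amounts) / len(amounts))
        for cat, amounts in sorted(groups.items())
    }


summary = summarize(ORDERS)
```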
Filter + computed columns
# Keep orders with amount > 150, then add amount_tax (amount * 1.13) and the order year
filtered_df = source \
    .filter(F.col("amount") > 150) \
    .with_column("amount_tax", F.col("amount") * 1.13) \
    .with_column("year", F.year(F.to_date(F.col("order_date"))))
filtered_df.create_or_replace_dynamic_table(
    "orders_filtered",
    lag="5 minutes",
    warehouse="default"
)
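A plain-Python sketch of which rows orders_filtered should hold for the same four test rows, as a quick sanity check. The filter is strict (amount > 150), so order 4 with exactly 150.00 is dropped. The sketch uses Decimal for exact arithmetic; the Lakehouse may give amount * 1.13 a different numeric type, so treat the tax values as illustrative. filtered_rows is a hypothetical name.

```python
from datetime import date
from decimal import Decimal

# Same test rows as in orders: (id, user_id, category, amount, status, order_date)
ORDERS = [
    (1, 101, "A", Decimal("100.00"), "paid", "2024-01-15"),
    (2, 102, "A", Decimal("200.00"), "paid", "2024-01-15"),
    (3, 101, "B", Decimal("300.00"), "pending", "2024-01-16"),
    (4, 103, "B", Decimal("150.00"), "paid", "2024-01-16"),
]


def filtered_rows(rows):
    """Mirror filter(amount > 150) plus the amount_tax and year columns."""
    out = []
    for row in rows:
        amount, order_date = row[3], row[5]
        if amount > 150:  # strict comparison: 150.00 itself is excluded
            amount_tax = amount * Decimal("1.13")
            year = date.fromisoformat(order_date).year  # assumes YYYY-MM-DD strings
            out.append(row + (amount_tax, year))
    return out


result = filtered_rows(ORDERS)
```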
Multi-table join
First create the users table:
session.sql("""
CREATE TABLE IF NOT EXISTS users (
user_id INT, name STRING, city STRING
)
""").collect()
session.sql("""
INSERT INTO users VALUES
(101, 'Alice', 'Beijing'),
(102, 'Bob', 'Shanghai'),
(103, 'Carol', 'Guangzhou')
""").collect()
orders = session.table("orders")
users = session.table("users")
# Keep paid orders and rename user_id to o_uid so it does not clash with users.user_id
paid = orders.filter(F.col("status") == "paid") \
    .select(F.col("id"), orders["user_id"].alias("o_uid"),
            F.col("amount"), F.col("order_date"))
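With the duplicate user_id column already renamed to o_uid, the join condition and the selected columns are unambiguous: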
joined = paid.join(users, paid["o_uid"] == users["user_id"]) \
    .select("user_id", "name", "amount", "order_date")
joined.create_or_replace_dynamic_table(
    "paid_orders_with_user",
    lag="1 minute",
    warehouse="default"
)
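As a local cross-check, the join can be reproduced with plain Python: keep the paid orders, look up each order's user by user_id (an inner join), and project the four output columns. With the test data this gives three rows, because order 3 is still pending. paid_orders_with_user_rows is a hypothetical name used only for this sketch.

```python
from decimal import Decimal

# Test rows from orders: (id, user_id, category, amount, status, order_date)
ORDERS = [
    (1, 101, "A", Decimal("100.00"), "paid", "2024-01-15"),
    (2, 102, "A", Decimal("200.00"), "paid", "2024-01-15"),
    (3, 101, "B", Decimal("300.00"), "pending", "2024-01-16"),
    (4, 103, "B", Decimal("150.00"), "paid", "2024-01-16"),
]
# Test rows from users: (user_id, name, city)
USERS = [
    (101, "Alice", "Beijing"),
    (102, "Bob", "Shanghai"),
    (103, "Carol", "Guangzhou"),
]


def paid_orders_with_user_rows(orders, users):
    """Inner join of paid orders with users on user_id, projected like the select() above."""
    name_by_id = {user_id: name for user_id, name, _city in users}
    rows = []
    for _id, o_uid, _category, amount, status, order_date in orders:
        # Inner join semantics: orders without a matching user are dropped
        if status == "paid" and o_uid in name_by_id:
            rows.append((o_uid, name_by_id[o_uid], amount, order_date))
    return rows


joined_rows = paid_orders_with_user_rows(ORDERS, USERS)
```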
Managing Dynamic Tables
Inspect the structure
session.sql("DESC DYNAMIC TABLE orders_summary").collect()
Suspend and resume
Suspend automatic refresh:
session.sql("ALTER DYNAMIC TABLE orders_summary SUSPEND").collect()
Resume automatic refresh:
session.sql("ALTER DYNAMIC TABLE orders_summary RESUME").collect()
Trigger a refresh manually
session.sql("ALTER DYNAMIC TABLE orders_summary REFRESH").collect()
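SUSPEND, RESUME, and REFRESH combine naturally when you bulk-load a source table and would rather run one refresh afterward than several partial ones. A minimal sketch, assuming only that session.sql(...).collect() executes a statement as in the examples above. suspended_refresh is a hypothetical helper, and FakeSession exists only so the sketch runs without a Lakehouse connection; table_name is interpolated into SQL, so pass trusted names only.

```python
from contextlib import contextmanager


@contextmanager
def suspended_refresh(session, table_name, refresh_after=True):
    """Suspend automatic refresh of a Dynamic Table while the with-block runs."""
    session.sql(f"ALTER DYNAMIC TABLE {table_name} SUSPEND").collect()
    try:
        yield
    finally:
        # Always resume, even if the block raised, so the table keeps refreshing
        session.sql(f"ALTER DYNAMIC TABLE {table_name} RESUME").collect()
    if refresh_after:
        # Reached only when the block succeeded: pick up the new data right away
        session.sql(f"ALTER DYNAMIC TABLE {table_name} REFRESH").collect()


class _FakeResult:
    def collect(self):
        return []


class FakeSession:
    """Stand-in that records statements instead of executing them."""

    def __init__(self):
        self.statements = []

    def sql(self, query):
        self.statements.append(query)
        return _FakeResult()


fake = FakeSession()
with suspended_refresh(fake, "orders_summary"):
    # Hypothetical bulk load into the source table
    fake.sql("INSERT INTO orders VALUES (5, 104, 'A', 80.0, 'paid', '2024-01-17')")
```

With a real connection you would pass session instead of fake.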
Drop
session.sql("DROP DYNAMIC TABLE IF EXISTS orders_summary").collect()
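Dynamic Table vs. manually scheduled jobs
| Aspect | Dynamic Table | Studio SQL task + scheduling |
|---|---|---|
| Definition | DataFrame API or SQL | SQL script |
| Refresh trigger | System detects upstream changes automatically | Cron schedule or manual trigger |
| Incremental computation | System chooses incremental or full refresh automatically | Incremental logic must be written by hand |
| Dependency management | Upstream and downstream dependencies detected automatically | Task dependencies configured manually |
| Typical use cases | Continuously refreshed aggregation or transformation results | Complex business logic, or when execution timing must be controlled precisely |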
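The "Dependency management" row is where manual scheduling costs the most effort: with hand-written jobs you must work out a valid run order yourself. A small sketch of that manual step, using Python's standard graphlib module (Python 3.9+) on the tables from this page. The dependency map is written by hand, which is exactly the bookkeeping Dynamic Tables spare you; the sketch says nothing about how the Lakehouse tracks dependencies internally.

```python
from graphlib import TopologicalSorter

# Hand-maintained map: table -> set of upstream tables it reads from
DEPENDENCIES = {
    "orders_summary": {"orders"},
    "orders_filtered": {"orders"},
    "paid_orders_with_user": {"orders", "users"},
}

# static_order() yields every table only after all of its upstream tables
run_order = list(TopologicalSorter(DEPENDENCIES).static_order())
```

Every time a new derived table is added, this map and the job schedule built from it must be updated by hand.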
Notes
Related documentation