Zettapark 创建 Dynamic Table

Dynamic Table 是云器 Lakehouse 的增量计算对象——你用 DataFrame 定义转换逻辑,系统自动感知上游变化并增量刷新结果,无需手写调度任务。Zettapark 通过

create_or_replace_dynamic_table()
create_or_replace_dynamic_table()
直接从 DataFrame 创建 Dynamic Table。


前置准备

from clickzetta.zettapark.session import Session from clickzetta.zettapark import functions as F session = Session.builder.configs({ "username": "your_username", "password": "your_password", "service": "cn-shanghai-alicloud.api.clickzetta.com", "instance": "your_instance", "workspace": "your_workspace", "schema": "public", "vcluster": "default" }).create()

建源表并插入测试数据:

session.sql(""" CREATE TABLE IF NOT EXISTS orders ( id INT, user_id INT, category STRING, amount DECIMAL(10,2), status STRING, order_date STRING ) """).collect() session.sql(""" INSERT INTO orders VALUES (1, 101, 'A', 100.0, 'paid', '2024-01-15'), (2, 102, 'A', 200.0, 'paid', '2024-01-15'), (3, 101, 'B', 300.0, 'pending', '2024-01-16'), (4, 103, 'B', 150.0, 'paid', '2024-01-16') """).collect()


创建 Dynamic Table

聚合汇总

source = session.table("orders")

定义转换逻辑:

agg_df = source.group_by("category").agg( F.count(F.col("id")).alias("order_count"), F.sum(F.col("amount")).alias("total_amount"), F.avg(F.col("amount")).alias("avg_amount") )

创建 Dynamic Table,每分钟自动刷新:

agg_df.create_or_replace_dynamic_table( "orders_summary", lag="1 minute", # 刷新间隔 warehouse="default" # 使用的计算集群 ) session.table("orders_summary").show()

+--------+-----------+------------+----------+ |category|order_count|total_amount|avg_amount| +--------+-----------+------------+----------+ | A| 2| 300.00| 150.00| | B| 2| 450.00| 225.00| +--------+-----------+------------+----------+

过滤 + 计算列

filtered_df = source \ .filter(F.col("amount") > 150) \ .with_column("amount_tax", F.col("amount") * 1.13) \ .with_column("year", F.year(F.to_date(F.col("order_date")))) filtered_df.create_or_replace_dynamic_table( "orders_filtered", lag="5 minutes", warehouse="default" )

多表 Join

先建 users 表:

session.sql(""" CREATE TABLE IF NOT EXISTS users ( user_id INT, name STRING, city STRING ) """).collect() session.sql(""" INSERT INTO users VALUES (101, 'Alice', 'Beijing'), (102, 'Bob', 'Shanghai'), (103, 'Carol', 'Guangzhou') """).collect()

orders = session.table("orders") users = session.table("users") paid = orders.filter(F.col("status") == "paid") \ .select(F.col("id"), orders["user_id"].alias("o_uid"), F.col("amount"), F.col("order_date"))

join 时先消除同名列歧义:

joined = paid.join(users, paid["o_uid"] == users["user_id"]) \ .select("user_id", "name", "amount", "order_date") joined.create_or_replace_dynamic_table( "paid_orders_with_user", lag="1 minute", warehouse="default" )


管理 Dynamic Table

查看结构

session.sql("DESC DYNAMIC TABLE orders_summary").collect()

暂停与恢复

暂停自动刷新:

session.sql("ALTER DYNAMIC TABLE orders_summary SUSPEND").collect()

恢复自动刷新:

session.sql("ALTER DYNAMIC TABLE orders_summary RESUME").collect()

手动触发刷新

session.sql("ALTER DYNAMIC TABLE orders_summary REFRESH").collect()

删除

session.sql("DROP DYNAMIC TABLE IF EXISTS orders_summary").collect()


Dynamic Table vs 手工调度任务

对比项Dynamic TableStudio SQL 任务 + 调度
定义方式DataFrame API 或 SQLSQL 脚本
刷新触发系统自动感知上游变化Cron 定时或手动触发
增量计算系统自动选择增量/全量需手写增量逻辑
依赖管理自动识别上下游依赖手动配置任务依赖
适用场景持续刷新的聚合/转换结果复杂业务逻辑、需要精确控制执行时机

注意事项

  • lag
    lag
    参数支持
    "1 minute"
    "1 minute"
    "5 minutes"
    "5 minutes"
    "1 hour"
    "1 hour"
    等格式,最小 1 分钟
  • Dynamic Table 首次创建时数据为空,等待第一次刷新后才有数据
  • 修改 Dynamic Table 的定义需用
    create_or_replace_dynamic_table
    create_or_replace_dynamic_table
    (会重建)
  • 上游表需要开启
    change_tracking
    change_tracking
    才能支持增量刷新

相关文档

文档说明
Dynamic Table 概述Dynamic Table 的完整概念和 SQL 语法
Zettapark 消费 Table Stream用 Zettapark 处理 CDC 增量变更
Zettapark 数据工程实战多表 join、窗口函数等场景
联系我们
预约咨询
微信咨询
电话咨询