Consuming Table Streams with Zettapark (Incremental CDC Processing)

A Table Stream records the incremental changes (INSERT / UPDATE / DELETE) made to a table. Zettapark can read a Stream directly with session.table() and process the change data as a DataFrame, which lets you build incremental ETL pipelines.


Prerequisites

from clickzetta.zettapark.session import Session
from clickzetta.zettapark import functions as F

# Replace the placeholder values with your own connection details.
session = Session.builder.configs({
    "username": "your_username",
    "password": "your_password",
    "service": "cn-shanghai-alicloud.api.clickzetta.com",
    "instance": "your_instance",
    "workspace": "your_workspace",
    "schema": "public",
    "vcluster": "default"
}).create()
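Hard-coded credentials are fine for a quick test, but a pipeline script usually reads them from the environment. The sketch below builds the same configs dictionary from environment variables. The variable names (CLICKZETTA_USERNAME and so on) and the helper name load_connection_config are hypothetical, chosen only for this illustration; Zettapark does not define them.

```python
import os

# Hypothetical environment variable names; adjust them to your deployment.
_REQUIRED = {
    "username": "CLICKZETTA_USERNAME",
    "password": "CLICKZETTA_PASSWORD",
    "service": "CLICKZETTA_SERVICE",
    "instance": "CLICKZETTA_INSTANCE",
    "workspace": "CLICKZETTA_WORKSPACE",
}
# Optional keys fall back to the values used in the example above.
_OPTIONAL = {
    "schema": ("CLICKZETTA_SCHEMA", "public"),
    "vcluster": ("CLICKZETTA_VCLUSTER", "default"),
}


def load_connection_config(env=None):
    """Build the dict passed to Session.builder.configs() from env vars."""
    env = os.environ if env is None else env
    missing = [var for var in _REQUIRED.values() if not env.get(var)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    config = {key: env[var] for key, var in _REQUIRED.items()}
    for key, (var, default) in _OPTIONAL.items():
        config[key] = env.get(var, default)
    return config
```

The result can then be passed to the builder in place of the literal dictionary, for example Session.builder.configs(load_connection_config()).create().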


Create the Source Table and Stream

Create the source table and enable change tracking:

# Create the source table.
session.sql("""
    CREATE TABLE IF NOT EXISTS orders (
        id INT,
        name STRING,
        amount DECIMAL(10,2),
        status STRING
    )
""").collect()

# Enable change tracking once the table exists.
session.sql("""
    ALTER TABLE orders SET PROPERTIES ('change_tracking' = 'true')
""").collect()

Insert the initial data:

session.sql("""
    INSERT INTO orders VALUES
        (1, 'Alice', 1000.00, 'active'),
        (2, 'Bob', 2000.00, 'active'),
        (3, 'Carol', 500.00, 'active')
""").collect()

Create the Table Stream (STANDARD mode, which captures INSERT/UPDATE/DELETE):

session.sql("""
    CREATE TABLE STREAM orders_stream
    ON TABLE orders
    WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD')
""").collect()


Reading Stream Changes

session.table() reads the Stream directly and returns a DataFrame:

stream_df = session.table("orders_stream")
stream_df.printSchema()

A Stream DataFrame contains the following metadata columns:

Column name               Type       Description
__change_type             STRING     Change type: INSERT / UPDATE_BEFORE / UPDATE_AFTER / DELETE
__commit_version          BIGINT     Commit version number
__commit_timestamp        TIMESTAMP  Commit time
All source table columns  -          Data of the changed row
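To make the row layout concrete, the sketch below separates the three metadata columns from the source-table columns. It uses a plain dict as a stand-in for one collected Stream row; the helper name split_change_row is hypothetical, and how you convert a collected row into a dict depends on the Row API available in your Zettapark version.

```python
# The three metadata columns listed in the table above.
METADATA_COLUMNS = ("__change_type", "__commit_version", "__commit_timestamp")


def split_change_row(row):
    """Split one change row (a dict) into (metadata, payload) dicts."""
    metadata = {name: row[name] for name in METADATA_COLUMNS}
    payload = {name: value for name, value in row.items()
               if name not in METADATA_COLUMNS}
    return metadata, payload


# A stand-in for one Stream row of the orders table used in this guide.
example_row = {
    "__change_type": "INSERT",
    "__commit_version": 4,
    "__commit_timestamp": "2024-01-15 10:00:00",
    "id": 4,
    "name": "Dave",
    "amount": "3000.00",
    "status": "active",
}
metadata, payload = split_change_row(example_row)
```

Here metadata holds the change type, version, and timestamp, while payload holds only id, name, amount, and status.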

Generate and Consume Changes

Generate changes:

session.sql("INSERT INTO orders VALUES (4, 'Dave', 3000.00, 'active')").collect()
session.sql("UPDATE orders SET amount = 1500.00 WHERE id = 1").collect()
session.sql("DELETE FROM orders WHERE id = 3").collect()

Read all changes:

stream_df = session.table("orders_stream")
stream_df.show()

+-------------+----------------+--------------------+---+-----+-------+------+
|__change_type|__commit_version|  __commit_timestamp| id| name| amount|status|
+-------------+----------------+--------------------+---+-----+-------+------+
| UPDATE_AFTER|               5|2024-01-15 10:00:...|  1|Alice|1500.00|active|
|       INSERT|               4|2024-01-15 10:00:...|  4| Dave|3000.00|active|
|UPDATE_BEFORE|               3|2024-01-15 09:59:...|  1|Alice|1000.00|active|
|       DELETE|               3|2024-01-15 09:59:...|  3|Carol| 500.00|active|
+-------------+----------------+--------------------+---+-----+-------+------+
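To see how these four change rows relate to the table contents, the following plain-Python sketch replays them onto the three initial rows. It runs without a Zettapark session: the dicts stand in for collected rows, and apply_changes is a hypothetical helper written for this illustration, not part of the Zettapark API.

```python
from decimal import Decimal


def apply_changes(state, changes):
    """Replay change rows onto a dict keyed by id and return the new state."""
    new_state = dict(state)
    for row in changes:
        kind = row["__change_type"]
        if kind in ("INSERT", "UPDATE_AFTER"):
            new_state[row["id"]] = (row["name"], row["amount"], row["status"])
        elif kind == "DELETE":
            new_state.pop(row["id"], None)
        # UPDATE_BEFORE carries only the old row image, so it is skipped.
    return new_state


# The three rows inserted before the Stream was created.
initial = {
    1: ("Alice", Decimal("1000.00"), "active"),
    2: ("Bob", Decimal("2000.00"), "active"),
    3: ("Carol", Decimal("500.00"), "active"),
}

# The four change rows, in the order shown in the output above.
changes = [
    {"__change_type": "UPDATE_AFTER", "id": 1, "name": "Alice",
     "amount": Decimal("1500.00"), "status": "active"},
    {"__change_type": "INSERT", "id": 4, "name": "Dave",
     "amount": Decimal("3000.00"), "status": "active"},
    {"__change_type": "UPDATE_BEFORE", "id": 1, "name": "Alice",
     "amount": Decimal("1000.00"), "status": "active"},
    {"__change_type": "DELETE", "id": 3, "name": "Carol",
     "amount": Decimal("500.00"), "status": "active"},
]

# Rows 1 (updated), 2 (untouched) and 4 (inserted) remain; row 3 is gone.
result = apply_changes(initial, changes)
```

In this sample each key has at most one effective change, so replay order does not matter. If a key could change several times within one batch, the rows would need to be ordered by __commit_version first.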


Filtering by Change Type

Keep only inserted rows and post-update rows (ignoring UPDATE_BEFORE and DELETE):

inserts_and_updates = stream_df.filter(
    F.col("`__change_type`").isin(["INSERT", "UPDATE_AFTER"])
).select("id", "name", "amount", "status")

Keep only deleted rows:

deletes = stream_df.filter(
    F.col("`__change_type`") == "DELETE"
).select("id")


Complete Incremental CDC Pipeline

Write the Stream changes to a target table to achieve incremental synchronization:

Create the target table:

session.sql("""
    CREATE TABLE IF NOT EXISTS orders_target (
        id INT,
        name STRING,
        amount DECIMAL(10,2),
        status STRING
    )
""").collect()

Read the Stream and filter for the effective changes:

stream_df = session.table("orders_stream")
changes = stream_df.filter(
    F.col("`__change_type`").isin(["INSERT", "UPDATE_AFTER"])
).select("id", "name", "amount", "status")

Write to the target table:

changes.write.save_as_table("orders_target", mode="append")

Once consumption completes, the Stream offset advances automatically, and the next read of the Stream returns only new changes.

In production, MERGE INTO is recommended for upsert semantics:

Write the Stream changes to a temporary table, then merge them with SQL MERGE INTO:

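# Stage the filtered changes in a temporary table. This step is described in
# the text but was missing from the snippet; mode="overwrite" is an assumption,
# and "changes" is the filtered DataFrame built above. Use this path instead
# of the append shown earlier, not in addition to it.
changes.write.save_as_table("orders_changes_tmp", mode="overwrite")

# Upsert the staged changes into the target table.
session.sql("""
    MERGE INTO orders_target AS t
    USING orders_changes_tmp AS s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET
        t.name = s.name,
        t.amount = s.amount,
        t.status = s.status
    WHEN NOT MATCHED THEN INSERT (id, name, amount, status)
        VALUES (s.id, s.name, s.amount, s.status)
""").collect()

# Clean up the temporary table.
session.sql("DROP TABLE IF EXISTS orders_changes_tmp").collect()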
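If the same key changes more than once between two runs, the staged table holds several rows for one id, and many MERGE implementations reject or handle such input unpredictably. Whether that applies here is not stated in this guide, so treat it as an assumption to verify. The sketch below shows the usual remedy in plain Python for clarity: keep only the row with the highest __commit_version per key. The helper name latest_change_per_key is hypothetical; in a real pipeline the same logic would normally be expressed on the DataFrame or in SQL before staging.

```python
def latest_change_per_key(rows, key="id"):
    """Keep, for each key, the row with the highest __commit_version.

    On a version tie, the row seen first is kept.
    """
    latest = {}
    for row in rows:
        current = latest.get(row[key])
        if current is None or row["__commit_version"] > current["__commit_version"]:
            latest[row[key]] = row
    return list(latest.values())


# Illustrative rows: id 1 was updated twice, id 4 was inserted once.
staged = [
    {"id": 1, "amount": "1500.00", "__commit_version": 5},
    {"id": 4, "amount": "3000.00", "__commit_version": 4},
    {"id": 1, "amount": "1800.00", "__commit_version": 7},
]
deduplicated = latest_change_per_key(staged)
```

After deduplication, id 1 appears once with the version 7 values, so each target row matches at most one source row in the MERGE.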


Notes

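  • Stream offset advancement: each time you read the Stream and run an action (collect(), save_as_table(), and so on), the offset advances automatically, and the next read returns only new changes.
  • An UPDATE produces two records: UPDATE_BEFORE (the row before the update) and UPDATE_AFTER (the row after the update). Usually only UPDATE_AFTER is needed.
  • A Stream does not store data: a Stream is a cursor object. The data stays in the source table and takes up no additional storage.
  • change_tracking cannot be enabled at table creation: it must be enabled after the table is created, using ALTER TABLE SET PROPERTIES.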
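Although UPDATE_AFTER alone is usually enough, the UPDATE_BEFORE record becomes useful when you need to know which columns actually changed, for example for auditing. The sketch below pairs the two records by key in plain Python. The helper name diff_updates is hypothetical, the dicts stand in for collected Stream rows, and the pairing assumes at most one update per key in the batch.

```python
from decimal import Decimal


def diff_updates(rows, key="id"):
    """Map each updated key to {column: (old_value, new_value)}.

    Assumes at most one UPDATE_BEFORE/UPDATE_AFTER pair per key.
    """
    before = {r[key]: r for r in rows if r["__change_type"] == "UPDATE_BEFORE"}
    diffs = {}
    for r in rows:
        if r["__change_type"] != "UPDATE_AFTER" or r[key] not in before:
            continue
        old = before[r[key]]
        # Compare only source-table columns; metadata columns start with "__".
        diffs[r[key]] = {
            col: (old.get(col), val)
            for col, val in r.items()
            if not col.startswith("__") and old.get(col) != val
        }
    return diffs


# The update pair for id 1 from the example in this guide.
rows = [
    {"__change_type": "UPDATE_BEFORE", "__commit_version": 3, "id": 1,
     "name": "Alice", "amount": Decimal("1000.00"), "status": "active"},
    {"__change_type": "UPDATE_AFTER", "__commit_version": 5, "id": 1,
     "name": "Alice", "amount": Decimal("1500.00"), "status": "active"},
]
update_diffs = diff_updates(rows)
```

For the sample pair, only the amount column is reported, with its old and new values.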

Related Documentation

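Document                                Description
Table Stream                            Table Stream concepts and SQL syntax
Zettapark Data Engineering in Practice  Multi-table joins, window functions, and other scenarios
Dynamic Table                           Automatic incremental refresh, replacing manual scheduling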