from clickzetta.zettapark.session import Session
from clickzetta.zettapark import functions as F
# Build the Zettapark session used by every statement below. The values are
# the literal strings from the example (they look like placeholders to be
# replaced with real connection settings before running).
session = Session.builder.configs(
    dict(
        username="your_username",
        password="your_password",
        service="cn-shanghai-alicloud.api.clickzetta.com",
        instance="your_instance",
        workspace="your_workspace",
        schema="public",
        vcluster="default",
    )
).create()
建立源表和 Stream
创建源表，并开启变更追踪：
# Run the two DDL statements in order: first create the `orders` table when it
# does not exist yet, then set its 'change_tracking' property to 'true'.
# The SQL text is kept flush-left so the strings sent to the server are
# unchanged.
for _ddl in (
    """
CREATE TABLE IF NOT EXISTS orders (
id INT, name STRING, amount DECIMAL(10,2), status STRING
)
""",
    """
ALTER TABLE orders SET PROPERTIES ('change_tracking' = 'true')
""",
):
    session.sql(_ddl).collect()
# Apply one insert, one update and one delete to `orders`, strictly in this
# order; each statement is executed (collected) before the next one is built.
_dml_statements = [
    "INSERT INTO orders VALUES (4, 'Dave', 3000.00, 'active')",
    "UPDATE orders SET amount = 1500.00 WHERE id = 1",
    "DELETE FROM orders WHERE id = 3",
]
for _dml in _dml_statements:
    session.sql(_dml).collect()
# Upsert the rows of `orders_changes_tmp` into `orders_target`, matching on id:
# matched rows get name/amount/status overwritten, unmatched rows are inserted.
_merge_changes = session.sql("""
MERGE INTO orders_target AS t
USING orders_changes_tmp AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET
t.name = s.name, t.amount = s.amount, t.status = s.status
WHEN NOT MATCHED THEN INSERT (id, name, amount, status)
VALUES (s.id, s.name, s.amount, s.status)
""")
_merge_changes.collect()

# Drop `orders_changes_tmp` only after the merge above has completed; if the
# merge raises, this line is never reached and the table is left in place.
_drop_tmp = session.sql("DROP TABLE IF EXISTS orders_changes_tmp")
_drop_tmp.collect()