-- Customer master table; it is the source table of doc_customers_stream.
CREATE TABLE IF NOT EXISTS doc_customers (
    customer_id  STRING,     -- customer identifier, e.g. 'C001'
    name         STRING,
    phone        STRING,
    email        STRING,
    credit_level STRING,     -- values used in this walkthrough: gold / silver / bronze / platinum
    updated_at   TIMESTAMP   -- supplied explicitly by the INSERT/UPDATE statements in this file
);
⚠️ 注意:STANDARD 模式的 Table Stream 要求源表开启 change_tracking,必须在创建 Stream 之前执行:
-- Turn on change tracking for the source table; this must run before the
-- STANDARD-mode stream on doc_customers is created.
ALTER TABLE doc_customers
    SET PROPERTIES ('change_tracking' = 'true');
插入初始客户数据
-- Seed the five initial customers. These rows are written before the stream
-- exists. The explicit column list keeps the statement correct even if the
-- table's column order changes later.
INSERT INTO doc_customers
    (customer_id, name, phone, email, credit_level, updated_at)
VALUES
    ('C001', '张伟', '13800001111', 'zhangwei@example.com', 'gold',   CAST('2026-01-10 09:00:00' AS TIMESTAMP)),
    ('C002', '李娜', '13800002222', 'lina@example.com',     'silver', CAST('2026-01-12 10:30:00' AS TIMESTAMP)),
    ('C003', '王芳', '13800003333', 'wangfang@example.com', 'gold',   CAST('2026-01-15 14:00:00' AS TIMESTAMP)),
    ('C004', '赵磊', '13800004444', 'zhaolei@example.com',  'bronze', CAST('2026-02-01 08:00:00' AS TIMESTAMP)),
    ('C005', '陈静', '13800005555', 'chenjing@example.com', 'silver', CAST('2026-02-05 11:00:00' AS TIMESTAMP));
创建 Table Stream 和审计日志表
-- Create a STANDARD-mode stream (captures only changes made after the stream is created).
CREATE TABLE STREAM IF NOT EXISTS doc_customers_stream
    ON TABLE doc_customers
    WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');

-- Audit log table that receives the rows consumed from the stream.
-- Column names and order follow the INSERT ... SELECT into
-- doc_customer_audit_log later in this file.
-- NOTE(review): commit_version / commit_time types are assumed to match the
-- stream's __commit_version / __commit_timestamp metadata columns; confirm
-- against the engine documentation.
CREATE TABLE IF NOT EXISTS doc_customer_audit_log (
    audit_id       STRING,
    customer_id    STRING,
    change_type    STRING,
    field_name     STRING,
    old_value      STRING,
    new_value      STRING,
    commit_version BIGINT,
    commit_time    TIMESTAMP,
    recorded_at    TIMESTAMP
);
-- Credit level upgrade: customer C001 moves to platinum.
UPDATE doc_customers
SET
    credit_level = 'platinum',
    updated_at   = CAST('2026-05-28 19:00:00' AS TIMESTAMP)
WHERE
    customer_id = 'C001';
-- Phone number change for customer C002.
UPDATE doc_customers
SET
    phone      = '13900009999',
    updated_at = CAST('2026-05-28 19:05:00' AS TIMESTAMP)
WHERE
    customer_id = 'C002';
-- New customer C006. The explicit column list avoids relying on the table's
-- physical column order.
INSERT INTO doc_customers
    (customer_id, name, phone, email, credit_level, updated_at)
VALUES
    ('C006', '刘洋', '13800006666', 'liuyang@example.com', 'silver',
     CAST('2026-05-28 19:10:00' AS TIMESTAMP));
-- Account closure: remove customer C004.
DELETE FROM doc_customers
WHERE
    customer_id = 'C004';
查看 Stream 捕获的原始变更
-- Inspect the raw change rows currently pending in the stream, including the
-- stream metadata columns (__change_type, __commit_version, __commit_timestamp).
SELECT
    __change_type,
    __commit_version,
    __commit_timestamp,
    customer_id,
    name,
    phone,
    credit_level
FROM doc_customers_stream
ORDER BY
    __commit_version,
    customer_id,
    __change_type;
-- Consume the stream into the audit log: one audit row per captured change row.
-- Per the note that follows this statement, executing it advances the stream
-- offset, so the same changes are not returned again.
-- NOTE(review): the target column list mirrors the SELECT aliases; confirm the
-- names against the doc_customer_audit_log table definition.
INSERT INTO doc_customer_audit_log (
    audit_id,
    customer_id,
    change_type,
    field_name,
    old_value,
    new_value,
    commit_version,
    commit_time,
    recorded_at
)
SELECT
    -- audit_id = <customer_id>_<commit_version>_<change_type>
    CONCAT(customer_id, '_', CAST(__commit_version AS STRING), '_', __change_type) AS audit_id,
    customer_id,
    __change_type        AS change_type,
    'full_row'           AS field_name,   -- whole-row audit; no per-field diff is computed here
    NULL                 AS old_value,
    NULL                 AS new_value,
    __commit_version     AS commit_version,
    __commit_timestamp   AS commit_time,
    CURRENT_TIMESTAMP()  AS recorded_at
FROM doc_customers_stream;
执行后 Stream offset 自动推进,再次查询 Stream 返回空结果:
-- Count the change rows still pending in the stream after it was consumed.
SELECT
    COUNT(*) AS remaining
FROM doc_customers_stream;
-- Expected: remaining = 0
查询审计日志:
-- List the recorded audit entries, ordered by commit version, then customer, then change type.
SELECT
    audit_id,
    customer_id,
    change_type,
    commit_version,
    commit_time
FROM doc_customer_audit_log
ORDER BY
    commit_version,
    customer_id,
    change_type;
-- Second round of changes: set customer C003's credit level to bronze.
UPDATE doc_customers
SET
    credit_level = 'bronze',
    updated_at   = CAST('2026-05-28 19:20:00' AS TIMESTAMP)
WHERE
    customer_id = 'C003';
-- Email change for customer C006.
UPDATE doc_customers
SET
    email      = 'liuyang_new@example.com',
    updated_at = CAST('2026-05-28 19:25:00' AS TIMESTAMP)
WHERE
    customer_id = 'C006';
查询 Stream,只返回本轮的 4 条变更记录(2 次 UPDATE,每次各产生 UPDATE_BEFORE 和 UPDATE_AFTER 两行):
-- Inspect the change rows currently pending in the stream, showing the
-- credit_level and email columns.
SELECT
    __change_type,
    __commit_version,
    customer_id,
    credit_level,
    email
FROM doc_customers_stream
ORDER BY
    __commit_version,
    customer_id,
    __change_type;