Tracking Customer Data Changes: Retaining a Complete Change History with Table Stream

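Business Background

Industries such as finance, healthcare, and e-commerce all face compliance requirements: every change to critical business data must leave a traceable record of who changed what, what it was changed to, and when.

Typical scenarios:

  • Financial compliance: customer credit-level adjustments must retain the before and after values for regulatory review
  • E-commerce after-sales: order status change history, used for dispute arbitration and refund verification
  • HR systems: salary and job-grade adjustment records, used for audits and labor arbitration
  • Healthcare systems: patient record modification logs, to satisfy data protection regulations such as HIPAA

The traditional approach is dual-write logic in the application layer: every UPDATE also writes an audit record. This has three problems: application code can easily miss a write path, the logic is easily lost during refactoring, and it cannot capture changes made by SQL executed directly against the database.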

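Table Stream in STANDARD mode captures all DML changes at the database level, including UPDATE_BEFORE (the old values before a change) and UPDATE_AFTER (the new values after a change). It does not depend on application code, so changes are not missed.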

SQL Commands Involved

Command | Purpose
ALTER TABLE ... SET PROPERTIES ('change_tracking' = 'true') | Enable change tracking (required for STANDARD mode)
CREATE TABLE STREAM ... TABLE_STREAM_MODE = 'STANDARD' | Create a Stream that captures all changes
SELECT ... FROM stream | View changes (does not advance the offset)
INSERT INTO ... SELECT FROM stream | Consume changes and advance the offset
DROP TABLE STREAM | Remove the Stream

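Data Architecture

External data sources (business databases / application systems / ...)
        │
        │ real-time writes
        ▼
doc_customers (customer information table)
        │
        │ changes captured automatically
        ▼
doc_customers_stream (Table Stream, STANDARD mode)
        │
        │ consumption (INSERT INTO ... SELECT)
        ▼
doc_customer_audit_log (audit log table)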

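How Data Reaches the Base Table in Real Time

This article uses manually executed DML statements (INSERT INTO, together with UPDATE and DELETE) to simulate creating, updating, and deleting customer records, so that you can reproduce the walkthrough quickly in a test environment. In production, customer data usually comes from business systems such as CRM and ERP, and Lakehouse offers several ways to sync it continuously into the base table:

Data source | Recommended method | Description | Reference documentation
MySQL / PostgreSQL / Oracle, etc. | Studio real-time sync task (CDC) | Captures the source database's binlog and syncs it to Lakehouse with millisecond-level latency | Real-time sync tasks
Multiple business tables (e.g. orders, customers, products) | Studio multi-table real-time sync | Syncs a whole database with one configuration and handles schema changes automatically | Multi-table real-time sync tasks
Kafka event streams | Pipe continuous ingestion | Suited to cases where the application layer already publishes change events to Kafka | Continuous ingestion with the read_kafka function
Offline batch sync | Studio offline sync task | Suited to T+1 or hourly sync scenarios | Offline sync tasks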

Prerequisites

Create the customer table and enable change tracking

CREATE TABLE IF NOT EXISTS doc_customers (
  customer_id  STRING,
  name         STRING,
  phone        STRING,
  email        STRING,
  credit_level STRING,
  updated_at   TIMESTAMP
);

ALTER TABLE doc_customers SET PROPERTIES ('change_tracking' = 'true');

Insert initial customer data

INSERT INTO doc_customers VALUES
  ('C001', '张伟', '13800001111', 'zhangwei@example.com', 'gold',   CAST('2026-01-10 09:00:00' AS TIMESTAMP)),
  ('C002', '李娜', '13800002222', 'lina@example.com',     'silver', CAST('2026-01-12 10:30:00' AS TIMESTAMP)),
  ('C003', '王芳', '13800003333', 'wangfang@example.com', 'gold',   CAST('2026-01-15 14:00:00' AS TIMESTAMP)),
  ('C004', '赵磊', '13800004444', 'zhaolei@example.com',  'bronze', CAST('2026-02-01 08:00:00' AS TIMESTAMP)),
  ('C005', '陈静', '13800005555', 'chenjing@example.com', 'silver', CAST('2026-02-05 11:00:00' AS TIMESTAMP));

Create the Table Stream and the audit log table

-- Create a STANDARD-mode Stream (it captures only changes made after the Stream is created)
CREATE TABLE STREAM IF NOT EXISTS doc_customers_stream
  ON TABLE doc_customers
  WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');

CREATE TABLE IF NOT EXISTS doc_customer_audit_log (
  audit_id       STRING,
  customer_id    STRING,
  change_type    STRING,
  field_name     STRING,
  old_value      STRING,
  new_value      STRING,
  commit_version BIGINT,
  commit_time    TIMESTAMP,
  recorded_at    TIMESTAMP
);

Scenario 1: Capturing Customer Data Changes

Simulate four typical business operations: a credit-level adjustment, a phone number update, a new customer, and a customer account closure.

-- Credit-level upgrade
UPDATE doc_customers
SET credit_level = 'platinum', updated_at = CAST('2026-05-28 19:00:00' AS TIMESTAMP)
WHERE customer_id = 'C001';

-- Phone number change
UPDATE doc_customers
SET phone = '13900009999', updated_at = CAST('2026-05-28 19:05:00' AS TIMESTAMP)
WHERE customer_id = 'C002';

-- New customer
INSERT INTO doc_customers VALUES
  ('C006', '刘洋', '13800006666', 'liuyang@example.com', 'silver', CAST('2026-05-28 19:10:00' AS TIMESTAMP));

-- Account closure
DELETE FROM doc_customers WHERE customer_id = 'C004';

View the raw changes captured by the Stream

SELECT __change_type, __commit_version, __commit_timestamp,
       customer_id, name, phone, credit_level
FROM doc_customers_stream
ORDER BY __commit_version, customer_id, __change_type;

+---------------+------------------+-------------------------+------------+------+-------------+-------------+
|__change_type  |__commit_version  |__commit_timestamp       |customer_id |name  |phone        |credit_level |
+---------------+------------------+-------------------------+------------+------+-------------+-------------+
|UPDATE_BEFORE  |3                 |2026-05-28T18:51:47.209  |C001        |张伟  |138****1111  |gold         |
|UPDATE_BEFORE  |3                 |2026-05-28T18:51:47.209  |C002        |李娜  |138****2222  |silver       |
|DELETE         |3                 |2026-05-28T18:51:47.209  |C004        |赵磊  |138****4444  |bronze       |
|UPDATE_AFTER   |4                 |2026-05-28T18:52:12.318  |C001        |张伟  |138****1111  |platinum     |
|UPDATE_AFTER   |5                 |2026-05-28T18:52:14.061  |C002        |李娜  |139****9999  |silver       |
|INSERT         |6                 |2026-05-28T18:52:16.225  |C006        |刘洋  |138****6666  |silver       |
+---------------+------------------+-------------------------+------------+------+-------------+-------------+

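Reading the results:

  • C001: UPDATE_BEFORE (old value gold) + UPDATE_AFTER (new value platinum), a complete record of the state before and after the change
  • C002: the phone number changed from 138****2222 to 139****9999, again recorded as a before row and an after row
  • C004: a single DELETE row, holding a snapshot of the data at the time of deletion
  • C006: a single INSERT row, holding the initial values at creation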
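
The before/after pairs can also be reduced to field-level differences, which is the shape the field_name, old_value, and new_value columns of doc_customer_audit_log suggest. The following is a minimal Python sketch, not part of Lakehouse: the rows are copied by hand from the sample output above (how they are fetched is not shown), field_level_diffs is a hypothetical helper, and it assumes each customer is updated at most once per batch. Because the UPDATE_BEFORE and UPDATE_AFTER rows of one update carry different __commit_version values in the sample output, the sketch pairs rows by customer_id rather than by version.

```python
from typing import Dict, List, Sequence

# Stream rows copied from the sample output above (masked phone values kept as shown).
STREAM_ROWS: List[Dict[str, object]] = [
    {"__change_type": "UPDATE_BEFORE", "__commit_version": 3, "customer_id": "C001",
     "phone": "138****1111", "credit_level": "gold"},
    {"__change_type": "UPDATE_BEFORE", "__commit_version": 3, "customer_id": "C002",
     "phone": "138****2222", "credit_level": "silver"},
    {"__change_type": "DELETE", "__commit_version": 3, "customer_id": "C004",
     "phone": "138****4444", "credit_level": "bronze"},
    {"__change_type": "UPDATE_AFTER", "__commit_version": 4, "customer_id": "C001",
     "phone": "138****1111", "credit_level": "platinum"},
    {"__change_type": "UPDATE_AFTER", "__commit_version": 5, "customer_id": "C002",
     "phone": "139****9999", "credit_level": "silver"},
    {"__change_type": "INSERT", "__commit_version": 6, "customer_id": "C006",
     "phone": "138****6666", "credit_level": "silver"},
]


def field_level_diffs(rows: Sequence[Dict[str, object]],
                      fields: Sequence[str]) -> List[Dict[str, object]]:
    """Pair UPDATE_BEFORE/UPDATE_AFTER rows per customer and list changed fields.

    Assumption: at most one update per customer in the batch.
    """
    before = {r["customer_id"]: r for r in rows if r["__change_type"] == "UPDATE_BEFORE"}
    after = {r["customer_id"]: r for r in rows if r["__change_type"] == "UPDATE_AFTER"}
    diffs: List[Dict[str, object]] = []
    for customer_id in sorted(before.keys() & after.keys()):
        for field in fields:
            old, new = before[customer_id][field], after[customer_id][field]
            if old != new:
                diffs.append({
                    "customer_id": customer_id,
                    "field_name": field,
                    "old_value": old,
                    "new_value": new,
                    # The version of the UPDATE_AFTER row identifies the update itself.
                    "commit_version": after[customer_id]["__commit_version"],
                })
    return diffs


DIFFS = field_level_diffs(STREAM_ROWS, ("phone", "credit_level"))
for diff in DIFFS:
    print(diff)
```

INSERT and DELETE rows have no counterpart to pair with, so this sketch skips them; they could be logged as whole-row snapshots instead.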

Scenario 2: Consuming Changes into the Audit Log

INSERT INTO doc_customer_audit_log
SELECT
  CONCAT(customer_id, '_', CAST(__commit_version AS STRING), '_', __change_type) AS audit_id,
  customer_id,
  __change_type        AS change_type,
  'full_row'           AS field_name,
  NULL                 AS old_value,
  NULL                 AS new_value,
  __commit_version     AS commit_version,
  __commit_timestamp   AS commit_time,
  current_timestamp()  AS recorded_at
FROM doc_customers_stream;

After this statement runs, the Stream offset advances automatically, and querying the Stream again shows no pending changes:

SELECT COUNT(*) AS remaining FROM doc_customers_stream;
-- remaining = 0

Query the audit log:

SELECT audit_id, customer_id, change_type, commit_version, commit_time
FROM doc_customer_audit_log
ORDER BY commit_version, customer_id, change_type;

+----------------------+------------+---------------+----------------+-------------------------+
|audit_id              |customer_id |change_type    |commit_version  |commit_time              |
+----------------------+------------+---------------+----------------+-------------------------+
|C001_3_UPDATE_BEFORE  |C001        |UPDATE_BEFORE  |3               |2026-05-28T18:51:47.209  |
|C002_3_UPDATE_BEFORE  |C002        |UPDATE_BEFORE  |3               |2026-05-28T18:51:47.209  |
|C004_3_DELETE         |C004        |DELETE         |3               |2026-05-28T18:51:47.209  |
|C001_4_UPDATE_AFTER   |C001        |UPDATE_AFTER   |4               |2026-05-28T18:52:12.318  |
|C002_5_UPDATE_AFTER   |C002        |UPDATE_AFTER   |5               |2026-05-28T18:52:14.061  |
|C006_6_INSERT         |C006        |INSERT         |6               |2026-05-28T18:52:16.225  |
+----------------------+------------+---------------+----------------+-------------------------+

Scenario 3: Continuous Consumption, Processing Only New Changes

When further changes occur after consumption, the Stream contains only the new batch; records that were already consumed do not reappear.

-- Second round of changes
UPDATE doc_customers
SET credit_level = 'bronze', updated_at = CAST('2026-05-28 19:20:00' AS TIMESTAMP)
WHERE customer_id = 'C003';

UPDATE doc_customers
SET email = 'liuyang_new@example.com', updated_at = CAST('2026-05-28 19:25:00' AS TIMESTAMP)
WHERE customer_id = 'C006';

Query the Stream; it contains only the 4 change rows from this round:

SELECT __change_type, __commit_version, customer_id, credit_level, email
FROM doc_customers_stream
ORDER BY __commit_version, customer_id, __change_type;

+---------------+------------------+------------+-------------+------------------+
|__change_type  |__commit_version  |customer_id |credit_level |email             |
+---------------+------------------+------------+-------------+------------------+
|UPDATE_BEFORE  |3                 |C003        |gold         |w***@example.com  |
|UPDATE_BEFORE  |6                 |C006        |silver       |l***@example.com  |
|UPDATE_AFTER   |8                 |C003        |bronze       |w***@example.com  |
|UPDATE_AFTER   |9                 |C006        |silver       |l***@example.com  |
+---------------+------------------+------------+-------------+------------------+

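The first-round changes to C001, C002, C004, and C006, which were already consumed, no longer appear. After this batch is also consumed into the audit log, the complete history totals 10 records (6 from the first round plus 4 from the second).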
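
The peek-versus-consume behaviour across the three scenarios can be summarised with a toy model. The Python sketch below is only a conceptual illustration and says nothing about how Lakehouse implements Streams internally: ToyStream, record, peek, and consume are hypothetical names, with peek standing in for SELECT ... FROM stream and consume standing in for INSERT INTO ... SELECT FROM stream.

```python
from typing import List


class ToyStream:
    """Conceptual model: an append-only change log plus a consumer offset."""

    def __init__(self) -> None:
        self._log: List[str] = []
        self._offset = 0  # index of the first change not yet consumed

    def record(self, change: str) -> None:
        """A DML change on the base table appends one row to the log."""
        self._log.append(change)

    def peek(self) -> List[str]:
        """Like SELECT on the Stream: read pending changes, offset unchanged."""
        return list(self._log[self._offset:])

    def consume(self) -> List[str]:
        """Like INSERT INTO ... SELECT FROM stream: read and advance the offset."""
        batch = self.peek()
        self._offset = len(self._log)
        return batch


stream = ToyStream()
audit_log: List[str] = []

# Round 1: the six change rows from Scenario 1.
for change in ["C001 UPDATE_BEFORE", "C002 UPDATE_BEFORE", "C004 DELETE",
               "C001 UPDATE_AFTER", "C002 UPDATE_AFTER", "C006 INSERT"]:
    stream.record(change)

first_peek = stream.peek()
second_peek = stream.peek()          # peeking twice returns the same rows
audit_log.extend(stream.consume())   # consuming advances the offset
after_consume = stream.peek()        # nothing pending any more

# Round 2: the four change rows from Scenario 3.
for change in ["C003 UPDATE_BEFORE", "C006 UPDATE_BEFORE",
               "C003 UPDATE_AFTER", "C006 UPDATE_AFTER"]:
    stream.record(change)

round_two = stream.peek()
audit_log.extend(stream.consume())

print(len(first_peek), len(after_consume), len(round_two), len(audit_log))
```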

Cleaning Up

DROP TABLE STREAM IF EXISTS doc_customers_stream;
DROP TABLE IF EXISTS doc_customer_audit_log;
DROP TABLE IF EXISTS doc_customers;

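Key Takeaways

  • STANDARD mode prerequisite: the source table must first run ALTER TABLE ... SET PROPERTIES ('change_tracking' = 'true'); otherwise the Stream cannot capture changes after it is created
  • UPDATE produces two rows: each UPDATE yields an UPDATE_BEFORE (old values) record and an UPDATE_AFTER (new values) record in the Stream, so the state before and after the change can be compared exactly
  • SELECT does not consume: querying a Stream does not advance the offset, so it can be inspected repeatedly; only DML that reads from the Stream (INSERT INTO, MERGE INTO) advances the offset
  • No repeats after the offset advances: once consumed, the Stream is emptied and the next read sees only new changes, which naturally prevents duplicate consumption
  • __commit_version increases globally: it can be used for sorting, deduplication, and tracing the order of changes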
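
As an illustration of the last point, the sketch below uses the audit_id scheme from Scenario 2 (customer_id, __commit_version, and __change_type joined with underscores) as a deduplication key and orders the history by __commit_version. It is a minimal Python sketch under stated assumptions: make_audit_id and merge_into_log are hypothetical helpers, the in-memory dict stands in for the audit table, and the rows are a subset of the sample output.

```python
from typing import Dict, List

Row = Dict[str, object]


def make_audit_id(row: Row) -> str:
    """Same composition as the CONCAT expression in Scenario 2."""
    return f"{row['customer_id']}_{row['__commit_version']}_{row['__change_type']}"


def merge_into_log(log: Dict[str, Row], rows: List[Row]) -> List[Row]:
    """Insert rows keyed by audit_id (duplicates ignored), return ordered history."""
    for row in rows:
        log.setdefault(make_audit_id(row), row)
    return sorted(
        log.values(),
        key=lambda r: (r["__commit_version"], r["customer_id"], r["__change_type"]),
    )


batch: List[Row] = [
    {"customer_id": "C001", "__commit_version": 4, "__change_type": "UPDATE_AFTER"},
    {"customer_id": "C001", "__commit_version": 3, "__change_type": "UPDATE_BEFORE"},
    {"customer_id": "C006", "__commit_version": 6, "__change_type": "INSERT"},
]

log: Dict[str, Row] = {}
history = merge_into_log(log, batch)
history_again = merge_into_log(log, batch)  # replaying the batch adds nothing

print([make_audit_id(r) for r in history_again])
```

Keying on audit_id makes a replayed batch harmless, which can serve as a second line of defence alongside the offset mechanism.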
