Lakehouse CDC 变更数据捕获指南(Table Stream)
概述
变更数据捕获(CDC)是实时数据管道的基础。云器 Lakehouse 提供
TABLE STREAMTABLE STREAM
对象,自动跟踪表上的
INSERTINSERT
、
UPDATEUPDATE
和
DELETEDELETE
操作,并以增量数据流的形式供下游消费。本指南按业务场景分类,帮助你快速掌握 Table Stream 的创建与消费方法。
涉及的 SQL 命令
命令 用途 适用场景 CREATE TABLE STREAMCREATE TABLE STREAM
创建变更数据流 绑定源表,开启 CDC 跟踪 SELECT * FROM stream_nameSELECT * FROM stream_name
查询增量数据 消费变更数据到下游表 SHOW TABLE STREAMSSHOW TABLE STREAMS
查看 Stream 列表 监控 Stream 状态和延迟 DROP TABLE STREAMDROP TABLE STREAM
删除 Stream 清理不再使用的 CDC 对象
前置准备
以下示例使用模拟的用户表
users_cdcusers_cdc
:
-- 创建源表
CREATE TABLE IF NOT EXISTS users_cdc (
user_id INT,
user_name STRING,
status STRING
);
-- 插入初始数据
INSERT INTO users_cdc VALUES
(1, 'Alice', 'active'),
(2, 'Bob', 'active');
创建 Table Stream
使用
CREATE TABLE STREAMCREATE TABLE STREAM
绑定源表。Stream 会记录自创建以来或自上次消费以来的所有变更。
-- 创建 Table Stream
CREATE TABLE STREAM users_cdc_stream ON TABLE users_cdc
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');
💡 提示 :创建 Stream 后,对源表的任何
INSERTINSERT
、
UPDATEUPDATE
、
DELETEDELETE
操作都会被记录。
消费变更数据
查询 Stream 即可获取增量变更记录。Stream 返回的列包含原始表的所有列,以及元数据列
__change_type__change_type
(操作类型)。
-- 模拟源表变更
INSERT INTO users_cdc VALUES (3, 'Carol', 'active');
UPDATE users_cdc SET status = 'inactive' WHERE user_id = 2;
-- 查询 Stream 获取变更
SELECT *, __change_type
FROM users_cdc_stream
ORDER BY user_id;
结果说明 :
user_id user_name status __change_type 2 Bob active UPDATE_BEFORE 2 Bob inactive UPDATE_AFTER 3 Carol active INSERT
⚠️ 注意 :
__change_type__change_type
的值包括
INSERTINSERT
、
UPDATE_AFTERUPDATE_AFTER
、
UPDATE_BEFOREUPDATE_BEFORE
、
DELETEDELETE
。Stream 还会返回
__commit_version__commit_version
和
__commit_timestamp__commit_timestamp
元数据列。
消费后偏移量推进
Table Stream 的偏移量会在下游 DML 操作消费 Stream 后自动推进 。这意味着一旦你将 Stream 数据插入到目标表,Stream 的游标会前进,下次查询将只返回新的变更。
-- 创建目标表
CREATE TABLE IF NOT EXISTS users_sync (
user_id INT,
user_name STRING,
status STRING,
sync_time TIMESTAMP
);
-- 消费 Stream 并写入目标表(自动推进偏移量)
INSERT INTO users_sync
SELECT user_id, user_name, status, CURRENT_TIMESTAMP()
FROM users_cdc_stream;
-- 再次查询 Stream(应为空,因为偏移量已推进)
SELECT COUNT(*) FROM users_cdc_stream;
结果说明 :
💡 提示 :如果仅执行
SELECTSELECT
而不进行 DML 消费,偏移量不会推进,下次查询仍会返回相同数据。
查看 Stream 信息
使用
SHOW TABLE STREAMSSHOW TABLE STREAMS
查看 Stream 的状态、绑定表和消费延迟。
-- 查看 Stream 列表
SHOW TABLE STREAMS LIKE 'users_cdc_stream';
关键字段说明 :
table_nametable_name
:绑定的源表
modemode
:消费模式(STANDARD / APPEND_ONLY)
stale_afterstale_after
:Stream 过期时间(超过此时间未消费将失效)
删除 Table Stream
使用
DROP TABLE STREAMDROP TABLE STREAM
删除不再需要的 Stream 对象。
-- 删除 Stream
DROP TABLE STREAM users_cdc_stream;
💡 提示 :删除 Stream 不会影响源表数据,但会丢失未消费的变更记录。
清理测试数据
完成 CDC 验证后,建议清理测试表:
-- 删除测试表
DROP TABLE IF EXISTS users_cdc;
DROP TABLE IF EXISTS users_cdc_stream;
DROP TABLE IF EXISTS users_sync;
💡 提示 :Lakehouse 支持
UNDROP TABLEUNDROP TABLE
,误删后可在保留期内恢复。
注意事项
偏移量推进机制 :Stream 偏移量仅在 DML 语句(如 INSERT INTO ... SELECT FROM streamINSERT INTO ... SELECT FROM stream
)消费时推进。纯 SELECTSELECT
查询不会推进偏移量。
过期时间 :Stream 依赖 Time Travel 保留周期。如果超过 data_retention_daysdata_retention_days
未消费,Stream 将变为 STALESTALE
状态,无法继续读取。
APPEND_ONLY 模式 :如果源表仅追加(无 UPDATE/DELETE),可创建 APPEND_ONLYAPPEND_ONLY
模式的 Stream,性能更优:CREATE TABLE STREAM ... WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY')CREATE TABLE STREAM ... WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY')
。
动态表消费 :Dynamic Table 可以基于 Table Stream 构建增量管道,实现端到端实时数仓。
相关文档