Lakehouse CDC 变更数据捕获指南(Table Stream)

概述

变更数据捕获(CDC)是实时数据管道的基础。云器 Lakehouse 提供

TABLE STREAM
TABLE STREAM
对象,自动跟踪表上的
INSERT
INSERT
UPDATE
UPDATE
DELETE
DELETE
操作,并以增量数据流的形式供下游消费。本指南按业务场景分类,帮助你快速掌握 Table Stream 的创建与消费方法。

涉及的 SQL 命令

命令用途适用场景
CREATE TABLE STREAM
CREATE TABLE STREAM
创建变更数据流绑定源表,开启 CDC 跟踪
SELECT * FROM stream_name
SELECT * FROM stream_name
查询增量数据消费变更数据到下游表
SHOW TABLE STREAMS
SHOW TABLE STREAMS
查看 Stream 列表监控 Stream 状态和延迟
DROP TABLE STREAM
DROP TABLE STREAM
删除 Stream清理不再使用的 CDC 对象

前置准备

以下示例使用模拟的用户表

users_cdc
users_cdc

-- 创建源表 CREATE TABLE IF NOT EXISTS users_cdc ( user_id INT, user_name STRING, status STRING ); -- 插入初始数据 INSERT INTO users_cdc VALUES (1, 'Alice', 'active'), (2, 'Bob', 'active');


创建 Table Stream

使用

CREATE TABLE STREAM
CREATE TABLE STREAM
绑定源表。Stream 会记录自创建以来或自上次消费以来的所有变更。

-- 创建 Table Stream CREATE TABLE STREAM users_cdc_stream ON TABLE users_cdc WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');


消费变更数据

查询 Stream 即可获取增量变更记录。Stream 返回的列包含原始表的所有列,以及元数据列

__change_type
__change_type
(操作类型)。

-- 模拟源表变更 INSERT INTO users_cdc VALUES (3, 'Carol', 'active'); UPDATE users_cdc SET status = 'inactive' WHERE user_id = 2; -- 查询 Stream 获取变更 SELECT *, __change_type FROM users_cdc_stream ORDER BY user_id;

结果说明

user_iduser_namestatus__change_type
2BobactiveUPDATE_BEFORE
2BobinactiveUPDATE_AFTER
3CarolactiveINSERT

消费后偏移量推进

Table Stream 的偏移量会在下游 DML 操作消费 Stream 后自动推进。这意味着一旦你将 Stream 数据插入到目标表,Stream 的游标会前进,下次查询将只返回新的变更。

-- 创建目标表 CREATE TABLE IF NOT EXISTS users_sync ( user_id INT, user_name STRING, status STRING, sync_time TIMESTAMP ); -- 消费 Stream 并写入目标表(自动推进偏移量) INSERT INTO users_sync SELECT user_id, user_name, status, CURRENT_TIMESTAMP() FROM users_cdc_stream; -- 再次查询 Stream(应为空,因为偏移量已推进) SELECT COUNT(*) FROM users_cdc_stream;

结果说明

COUNT(*)
0

查看 Stream 信息

使用

SHOW TABLE STREAMS
SHOW TABLE STREAMS
查看 Stream 的状态、绑定表和消费延迟。

-- 查看 Stream 列表 SHOW TABLE STREAMS LIKE 'users_cdc_stream';

关键字段说明

  • table_name
    table_name
    :绑定的源表
  • mode
    mode
    :消费模式(STANDARD / APPEND_ONLY)
  • stale_after
    stale_after
    :Stream 过期时间(超过此时间未消费将失效)

删除 Table Stream

使用

DROP TABLE STREAM
DROP TABLE STREAM
删除不再需要的 Stream 对象。

-- 删除 Stream DROP TABLE STREAM users_cdc_stream;


清理测试数据

完成 CDC 验证后,建议清理测试表:

-- 删除测试表 DROP TABLE IF EXISTS users_cdc; DROP TABLE IF EXISTS users_cdc_stream; DROP TABLE IF EXISTS users_sync;


注意事项

  1. 偏移量推进机制:Stream 偏移量仅在 DML 语句(如
    INSERT INTO ... SELECT FROM stream
    INSERT INTO ... SELECT FROM stream
    )消费时推进。纯
    SELECT
    SELECT
    查询不会推进偏移量。
  2. 过期时间:Stream 依赖 Time Travel 保留周期。如果超过
    data_retention_days
    data_retention_days
    未消费,Stream 将变为
    STALE
    STALE
    状态,无法继续读取。
  3. APPEND_ONLY 模式:如果源表仅追加(无 UPDATE/DELETE),可创建
    APPEND_ONLY
    APPEND_ONLY
    模式的 Stream,性能更优:
    CREATE TABLE STREAM ... WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY')
    CREATE TABLE STREAM ... WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY')
  4. 动态表消费:Dynamic Table 可以基于 Table Stream 构建增量管道,实现端到端实时数仓。

相关文档

联系我们
预约咨询
微信咨询
电话咨询