Table Stream
Table Stream 是 Lakehouse 的变更数据捕获(CDC)机制,用于捕获表上发生的 INSERT、UPDATE、DELETE 变更,供下游任务消费。
类比:Table Stream 像表的"变更日志订阅"——你订阅一张表的 Stream,每次通过 DML 消费后,位点自动推进,下次只拿到新产生的变更记录。

核心概念
Offset(消费位点):Stream 记录上次消费到的位置。每次通过 DML 消费 Stream 后,offset 自动推进;仅执行
SELECT
SELECT
不会推进 offset。
变更类型列:Stream 查询结果包含
__change_type
__change_type
元数据列,值为
INSERT
INSERT
、
UPDATE_BEFORE
UPDATE_BEFORE
、
UPDATE_AFTER
UPDATE_AFTER
、
DELETE
DELETE
。
STANDARD 模式:捕获所有 DML 变更(INSERT + UPDATE + DELETE),UPDATE 会产生
UPDATE_BEFORE
UPDATE_BEFORE
(旧值)和
UPDATE_AFTER
UPDATE_AFTER
(新值)两行记录。
APPEND_ONLY 模式:仅捕获 INSERT 操作,不记录 UPDATE 和 DELETE,性能更轻量。
⚠️ 重要:即使 WHERE 条件过滤了部分行,Stream 的 offset 仍会在 DML 消费后推进。被过滤掉的变更数据会永久丢失,无法重新消费。
适用场景与选型
适用场景
| 场景 | 原因 |
|---|
| 增量 ETL 加工 | 仅处理源表新增/变更数据,避免全表扫描 |
| 实时数据同步 | 将 Lakehouse 表变更同步到下游系统(如 ES、ClickHouse) |
| 审计与合规 | 记录数据变更历史,满足审计要求 |
不适用场景
| 场景 | 推荐替代方案 | 原因 |
|---|
| 持续从 Kafka/OSS 导入数据 | Pipe | Pipe 专为外部数据源持续导入设计 |
| 自动维护聚合结果 | 动态表 | 动态表自动增量计算并存储结果 |
| 外部数据库 CDC 同步到 Lakehouse | Studio 实时同步任务 | 直接对接 MySQL/PostgreSQL Binlog,无需中间层 |
快速示例
创建 Stream 并消费变更
-- 创建源表
CREATE TABLE IF NOT EXISTS orders_cdc (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10,2),
status STRING
);
-- 创建 Stream(STANDARD 模式)
CREATE TABLE STREAM orders_stream
ON TABLE orders_cdc
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');
-- 插入初始数据
INSERT INTO orders_cdc VALUES (1, 101, 99.00, 'created');
-- 查询 Stream,看到 INSERT 变更
SELECT order_id, user_id, amount, status, __change_type
FROM orders_stream;
-- 结果:
-- +----------+---------+--------+---------+---------------+
-- | order_id | user_id | amount | status | __change_type |
-- +----------+---------+--------+---------+---------------+
-- | 1 | 101 | 99.00 | created | INSERT |
-- +----------+---------+--------+---------+---------------+
-- 创建目标表并消费 Stream(offset 自动推进)
CREATE TABLE IF NOT EXISTS dwd_orders (
order_id BIGINT, user_id BIGINT, amount DECIMAL(10,2), status STRING
);
INSERT INTO dwd_orders
SELECT order_id, user_id, amount, status
FROM orders_stream;
-- 再次查询 Stream,应为空(offset 已推进)
SELECT COUNT(*) AS cnt FROM orders_stream;
-- 结果:
-- +-----+
-- | cnt |
-- +-----+
-- | 0 |
-- +-----+
捕获 UPDATE 和 DELETE
-- 更新源表
UPDATE orders_cdc SET status = 'completed' WHERE order_id = 1;
-- 删除源表记录
DELETE FROM orders_cdc WHERE order_id = 1;
-- 查询 Stream(SELECT 不推进 offset,累积显示所有未消费变更)
SELECT order_id, status, __change_type FROM orders_stream ORDER BY __change_type;
-- 结果:
-- +----------+-----------+---------------+
-- | order_id | status | __change_type |
-- +----------+-----------+---------------+
-- | 1 | completed | DELETE |
-- | 1 | completed | UPDATE_AFTER |
-- | 1 | created | UPDATE_BEFORE |
-- +----------+-----------+---------------+
消费位点推进规则
| 操作 | Offset 是否推进 | 说明 |
|---|
SELECT * FROM stream
SELECT * FROM stream | ❌ 不推进 | 仅查看,数据下次查询仍在 |
INSERT INTO ... SELECT ... FROM stream
INSERT INTO ... SELECT ... FROM stream | ✅ 推进 | DML 消费后推进 |
MERGE INTO ... USING stream ...
MERGE INTO ... USING stream ... | ✅ 推进 | DML 消费后推进 |
| 事务回滚 | ❌ 不推进 | 数据仍在,下次可重新消费 |
⚠️ 注意:一个 Stream 只能被一个消费者完整消费。当 A 任务通过 DML 消费了 Stream,offset 推进后,B 任务再查同一个 Stream 就看不到那批变更了。如需多个下游消费同一张表的变更,为每个消费者单独创建一个 Stream。
常见问题
常见问题 1:WHERE 过滤导致数据丢失
问题:
INSERT INTO target SELECT ... FROM stream WHERE __change_type = 'INSERT'
INSERT INTO target SELECT ... FROM stream WHERE __change_type = 'INSERT'
过滤了 UPDATE/DELETE。
症状:被过滤的 UPDATE/DELETE 变更永久丢失,无法重新消费。
解决:
- Stream 的 offset 在 DML 消费后推进,与 WHERE 条件无关
- 如需选择性消费,先将全部变更写入中间表,再从中间表过滤处理
常见问题 2:Stream 过期(STALE)
问题:Stream 长时间未消费,超过源表的 Time Travel 保留期。
症状:Stream 变为 STALE 状态,无法继续读取。
解决:
- Stream 消费频率应远短于源表的
data_retention_days
data_retention_days
(默认 1 天)
- 重要表的
data_retention_days
data_retention_days
建议设置为 7 天或更长
常见问题 3:APPEND_ONLY 模式误用
问题:源表有 UPDATE/DELETE,但创建了 APPEND_ONLY 模式的 Stream。
症状:UPDATE/DELETE 变更不会被捕获,下游数据不一致。
解决:
- 仅追加场景(如日志表)用 APPEND_ONLY,性能更优
- 需要捕获完整变更用 STANDARD 模式
成本影响
存储成本
- Stream 本身不存储数据,只存储 offset(元数据),成本极低
- 多个 Stream 共享同一份源表历史版本,额外成本可忽略
计算成本
- Stream 查询消耗 VCluster CRU,与查询的数据量相关
- STANDARD 模式比 APPEND_ONLY 模式开销更大(需跟踪所有变更)
生命周期管理
创建 Stream → 源表产生变更 → DML 消费(offset 推进)→ Stream 过期/删除
↓ ↓ ↓ ↓
记录 offset INSERT/UPDATE 位点自动前进 超保留期或手动删除
/DELETE 被捕获
创建和删除
-- 创建 Stream
CREATE TABLE STREAM my_stream
ON TABLE my_source
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');
-- 查看 Stream 列表
SHOW TABLE STREAMS;
-- 删除 Stream(不影响源表数据)
DROP TABLE STREAM my_stream;
相关文档