销售实时看板:用 Dynamic Table 构建多层增量数仓

业务背景

电商运营团队每天面对一个共同问题:促销活动进行中,各品类的 GMV 和排名在实时变化,但数据看板要等到次日才能刷新。错过实时数据意味着错过调整投放策略的窗口——某个品类已经卖爆了,补货和加推的决策却要等到明天。

传统解法是写一个定时 SQL 任务,每分钟全量重算一次。这个方案有两个问题:

  • 计算浪费:每次全量扫描所有历史订单,数据量越大越慢,成本随数据增长线性上升
  • 运维负担:需要自己管理调度、处理任务失败重试、保证幂等性

Dynamic Table 的价值在于:你只写全量语义的 SQL,系统自动识别新增数据,只计算变化部分,不重算历史。数据量翻 10 倍,增量计算的成本不变。

适用场景

场景说明
实时销售排行榜品类/商品/门店 GMV 排名,分钟级更新
促销活动监控活动期间实时跟踪各 SKU 销量和库存消耗
运营日报自动化每日 GMV、订单量、客单价自动汇总,无需人工跑数
多层数仓维护ODS → DWD → ADS 全链路增量刷新,替代定时全量任务

涉及的 SQL 命令

命令用途
CREATE DYNAMIC TABLE
CREATE DYNAMIC TABLE
定义自动刷新的计算表
REFRESH INTERVAL
REFRESH INTERVAL
设置数据更新频率
SHOW DYNAMIC TABLE REFRESH HISTORY
SHOW DYNAMIC TABLE REFRESH HISTORY
查看每层刷新状态和耗时
DROP DYNAMIC TABLE
DROP DYNAMIC TABLE
清理资源

数据架构

三层 Dynamic Table 链路,每层 1 分钟自动刷新:

外部数据源(MySQL / Kafka / 对象存储 / ...) │ 实时写入 ▼ doc_orders doc_products └──────────────────┘ │ JOIN ▼ doc_dwd_order_detail ← DWD:订单打宽(补充品类、品牌) │ ▼ doc_ads_category_gmv ← ADS:品类 GMV 汇总 + 均价 │ ▼ doc_ads_category_rank ← ADS:品类实时排行榜

底表数据如何实时写入

本文用

INSERT INTO
INSERT INTO
模拟数据写入,方便你在测试环境快速复现。生产环境中,订单数据通常来自业务数据库或消息队列,Lakehouse 提供多种方式持续写入底表:

数据来源推荐方式参考文档
MySQL / PostgreSQL / Oracle 等关系型数据库Studio 实时同步任务(CDC)实时同步任务
多张业务表一次性迁移Studio 多表实时同步多表实时同步任务
Kafka 消息队列Pipe 持续导入借助 read_kafka 函数持续导入
对象存储(OSS / COS / S3)文件Pipe 持续导入使用 Pipe 持续导入对象存储数据
离线批量同步(T+1 或按小时)Studio 离线同步任务离线同步任务

底表一旦有新数据写入,Dynamic Table 链路在下一个刷新周期自动感知并增量计算,无需任何额外配置。

前置准备

建源表

CREATE TABLE IF NOT EXISTS doc_orders ( order_id STRING, user_id STRING, product_id STRING, quantity INT, unit_price DECIMAL(10,2), amount DECIMAL(10,2), order_time TIMESTAMP, status STRING ); CREATE TABLE IF NOT EXISTS doc_products ( product_id STRING, product_name STRING, category STRING, brand STRING );

插入初始数据

INSERT INTO doc_products VALUES ('P001', '无线蓝牙耳机', '数码配件', 'SoundMax'), ('P002', '机械键盘', '电脑外设', 'KeyPro'), ('P003', '运动跑鞋', '运动户外', 'SpeedRun'), ('P004', '瑜伽垫', '运动户外', 'FlexFit'), ('P005', '保温杯', '家居生活', 'ThermoKeep'), ('P006', 'USB-C 充电器', '数码配件', 'ChargeFast'), ('P007', '笔记本支架', '电脑外设', 'DeskPro'), ('P008', '跳绳', '运动户外', 'JumpFit'); INSERT INTO doc_orders VALUES ('O001','U101','P001',2,299.00,598.00, CAST('2026-05-28 16:00:00' AS TIMESTAMP),'completed'), ('O002','U102','P002',1,459.00,459.00, CAST('2026-05-28 16:05:00' AS TIMESTAMP),'completed'), ('O003','U103','P003',1,389.00,389.00, CAST('2026-05-28 16:10:00' AS TIMESTAMP),'completed'), ('O004','U104','P001',1,299.00,299.00, CAST('2026-05-28 16:15:00' AS TIMESTAMP),'completed'), ('O005','U105','P004',2,128.00,256.00, CAST('2026-05-28 16:20:00' AS TIMESTAMP),'completed'), ('O006','U106','P005',3, 89.00,267.00, CAST('2026-05-28 16:25:00' AS TIMESTAMP),'completed'), ('O007','U107','P006',2,129.00,258.00, CAST('2026-05-28 16:30:00' AS TIMESTAMP),'completed'), ('O008','U108','P003',2,389.00,778.00, CAST('2026-05-28 16:35:00' AS TIMESTAMP),'completed'), ('O009','U109','P007',1,199.00,199.00, CAST('2026-05-28 16:40:00' AS TIMESTAMP),'completed'), ('O010','U110','P008',3, 49.00,147.00, CAST('2026-05-28 16:45:00' AS TIMESTAMP),'completed');

场景一:创建三层 Dynamic Table

第一层(DWD):订单打宽

将订单表与商品表 JOIN,补充品类和品牌信息,只保留已完成的订单。

CREATE OR REPLACE DYNAMIC TABLE doc_dwd_order_detail REFRESH INTERVAL '1' MINUTE VCLUSTER default AS SELECT o.order_id, o.user_id, o.product_id, p.product_name, p.category, p.brand, o.quantity, o.unit_price, o.amount, o.order_time, o.status FROM doc_orders o JOIN doc_products p ON o.product_id = p.product_id WHERE o.status = 'completed';

第二层(ADS):品类 GMV 汇总

按品类聚合,计算订单数、销量、GMV 和均价。

CREATE OR REPLACE DYNAMIC TABLE doc_ads_category_gmv REFRESH INTERVAL '1' MINUTE VCLUSTER default AS SELECT category, COUNT(DISTINCT order_id) AS order_cnt, SUM(quantity) AS total_qty, SUM(amount) AS gmv, CAST(SUM(amount) / SUM(quantity) AS DECIMAL(10,2)) AS avg_price FROM doc_dwd_order_detail GROUP BY category;

第三层(ADS):品类排行榜

基于 GMV 汇总表计算实时排名。

CREATE OR REPLACE DYNAMIC TABLE doc_ads_category_rank REFRESH INTERVAL '1' MINUTE VCLUSTER default AS SELECT RANK() OVER (ORDER BY gmv DESC) AS gmv_rank, category, order_cnt, total_qty, gmv, avg_price FROM doc_ads_category_gmv;

验证初始结果

等待约 2 分钟后查询排行榜:

SELECT * FROM doc_ads_category_rank ORDER BY gmv_rank;

+--------+---------+-----------+-----------+---------+-----------+ |gmv_rank|category |order_cnt |total_qty |gmv |avg_price | +--------+---------+-----------+-----------+---------+-----------+ | 1|运动户外 | 4| 8| 1570.00| 196.25| | 2|数码配件 | 3| 5| 1155.00| 231.00| | 3|电脑外设 | 2| 2| 658.00| 329.00| | 4|家居生活 | 1| 3| 267.00| 89.00| +--------+---------+-----------+-----------+---------+-----------+

运动户外以 1570 元 GMV 排名第一,来自跑鞋(2 笔)、瑜伽垫(1 笔)、跳绳(1 笔)共 4 笔订单,均价 196.25 元。

场景二:新订单写入,排名自动更新

模拟下一批订单进来——数码配件和电脑外设各有新单:

INSERT INTO doc_orders VALUES ('O011','U111','P005',2, 89.00,178.00, CAST('2026-05-28 17:00:00' AS TIMESTAMP),'completed'), ('O012','U112','P006',3,129.00,387.00, CAST('2026-05-28 17:02:00' AS TIMESTAMP),'completed'), ('O013','U113','P001',1,299.00,299.00, CAST('2026-05-28 17:05:00' AS TIMESTAMP),'completed'), ('O014','U114','P004',1,128.00,128.00, CAST('2026-05-28 17:08:00' AS TIMESTAMP),'completed'), ('O015','U115','P002',2,459.00,918.00, CAST('2026-05-28 17:10:00' AS TIMESTAMP),'completed');

等待约 2 分钟,再次查询排行榜:

SELECT * FROM doc_ads_category_rank ORDER BY gmv_rank;

+--------+---------+-----------+-----------+---------+-----------+ |gmv_rank|category |order_cnt |total_qty |gmv |avg_price | +--------+---------+-----------+-----------+---------+-----------+ | 1|数码配件 | 5| 9| 1841.00| 204.56| | 2|运动户外 | 5| 9| 1698.00| 188.67| | 3|电脑外设 | 3| 4| 1576.00| 394.00| | 4|家居生活 | 2| 5| 445.00| 89.00| +--------+---------+-----------+-----------+---------+-----------+

数码配件新增了保温杯(178 元)、USB-C 充电器(387 元)、无线蓝牙耳机(299 元)共 686 元,GMV 从 1155 升至 1841,超过运动户外跃升第一。排名变化由系统自动计算,无需任何手动干预。

场景三:修改 DT 定义

业务需求变化时,可以用

CREATE OR REPLACE DYNAMIC TABLE
CREATE OR REPLACE DYNAMIC TABLE
更新 SQL 逻辑,已有数据不会丢失。

适合用 OR REPLACE 的改动:修改 WHERE 过滤条件、新增透传列(直接来自上游的列,不参与计算)。

需要 DROP + 重建的改动:修改列类型、修改聚合逻辑(如新增

avg_price
avg_price
这类计算列)。

OR REPLACE 后,若上游没有新数据写入,系统不会重算已有行——新列的值会保持 null,直到下次有新数据触发刷新时才一并计算。

查看刷新状态

SHOW DYNAMIC TABLE REFRESH HISTORY WHERE name = 'doc_ads_category_rank' LIMIT 5;

实测返回示例:

staterefresh_modestart_timerows_inserted含义
SUCCEEDINCREMENTAL18:11:484正常增量刷新,写入 4 行
SUCCEEDNO_DATA18:12:48上游无新数据,跳过计算
FAILEDFULL18:09:41全量刷新失败,见 error_message

关键字段说明:

字段含义
state
state
SUCCEED
SUCCEED
成功 /
FAILED
FAILED
失败
refresh_mode
refresh_mode
INCREMENTAL
INCREMENTAL
增量 /
FULL
FULL
全量 /
NO_DATA
NO_DATA
无新数据跳过
refresh_trigger
refresh_trigger
SYSTEM_SCHEDULED
SYSTEM_SCHEDULED
自动调度 /
MANUAL
MANUAL
手动触发
start_time
start_time
/
end_time
end_time
刷新开始和结束时间
stats
stats
rows_inserted
rows_inserted
/
rows_deleted
rows_deleted
,本次写入和删除的行数
error_message
error_message
FAILED 时的错误详情

上游删除后的行为

删除上游 DT 后,下游 DT 不会报错、不会级联删除,刷新状态变为

NO_DATA
NO_DATA
,对象继续存在。

-- 删除 DWD 层 DROP DYNAMIC TABLE IF EXISTS doc_dwd_order_detail; -- gmv 层仍然存在,但刷新变为 NO_DATA SHOW DYNAMIC TABLE REFRESH HISTORY WHERE name = 'doc_ads_category_gmv' LIMIT 3; -- state: SUCCEED, refresh_mode: NO_DATA

这意味着:如果误删了上游 DT,下游数据不会消失,重建上游后下游会在下一个刷新周期自动恢复增量计算。

清理资源

DROP DYNAMIC TABLE IF EXISTS doc_ads_category_rank; DROP DYNAMIC TABLE IF EXISTS doc_ads_category_gmv; DROP DYNAMIC TABLE IF EXISTS doc_dwd_order_detail; DROP TABLE IF EXISTS doc_orders; DROP TABLE IF EXISTS doc_products;

关键点总结

  • 只写全量 SQL,系统做增量:三层 DT 的 SQL 都是普通的 SELECT,不需要写任何增量逻辑,系统自动识别新增数据并只计算变化部分
  • 链路自动传播:源表有新数据 → DWD 层刷新 → ADS 汇总层刷新 → 排行榜更新,整个链路无需人工触发
  • 端到端延迟约 1-2 分钟:实测 DWD 和 gmv 层在同一批次刷新,rank 层在下一个周期感知变化,端到端约 1-2 分钟(刷新间隔设为 1 分钟时)
  • OR REPLACE 有限制:不能改列类型,改聚合逻辑需 DROP + 重建;OR REPLACE 后若上游无新数据,新列值为 null 直到下次刷新

相关文档

联系我们
预约咨询
微信咨询
电话咨询