SKU-Level Distributed Demand Forecasting: A Data Warehouse Practice

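Train a separate time-series forecasting model for each of a retailer's thousands of SKU × store combinations, generate replenishment demand forecasts for the next 4 weeks, and use them to drive automated replenishment and promotion planning. Based on the Kaggle Retail Data Warehouse 12-Table dataset (orders, order items, product master data, stores, promotions), this article walks end to end through the full build: MySQL CDC / OSS PIPE → ODS → DWD Dynamic Table → parallel Prophet training on ZettaPark → Gold forecast results table. Along the way it covers key platform capabilities such as partitioned tables and External Functions that call SageMaker batch inference.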


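Overview

The core challenge of SKU-level demand forecasting is that scale and data-quality problems arrive together: a retailer typically has thousands to hundreds of thousands of SKU × store combinations, and each combination's sales history differs in sparsity and promotion pattern.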

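Problem | Yunqi (云器) solution
--------|----------------------
Daily sales data must be synced incrementally from MySQL into the lake | MySQL CDC for real-time ingestion, or OSS PIPE for batch CSV import
Raw ODS data must be cleaned into SKU × store × date sales facts | Dynamic Table: automatic incremental computation in declarative SQL, with no hand-written scheduling
Each SKU × store combination trains its own Prophet model | ZettaPark Python Task, using groupBy + applyInPandas for parallel per-group training and inference
Forecast results are partitioned by SKU to support fast point lookups | PARTITIONED BY (sku_id) partitioned table, optimizing the replenishment system's SKU-level queries
External inference services such as SageMaker must be integrated | External Function wraps the HTTP API call so it can be used directly in SQL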

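SQL Commands Involved

Command / function | Purpose | Notes
-------------------|---------|------
CREATE TABLE | Create the ODS raw tables and the Gold forecast results table | Regular tables; the ODS tables are the upstream sources for the Dynamic Tables
CREATE PIPE | Continuous ingestion from MySQL CDC or OSS object storage | Bound to an ODS target table; consumes new data automatically in batches
CREATE DYNAMIC TABLE | DWD-layer SKU × store × date sales facts, plus seasonal features | Declarative SQL; the system detects upstream changes and refreshes incrementally
REFRESH DYNAMIC TABLE | Manually trigger one refresh | Used for the initial build or for debugging
PARTITIONED BY | Partition the forecast results table by sku_id | Improves query performance when reading forecasts in bulk by SKU
ZettaPark applyInPandas | Run a Python function on each group in parallel | Each SKU × store combination runs Prophet training + inference independently
CREATE EXTERNAL FUNCTION | Wrap the SageMaker batch inference API | Optional path: replace Prophet with a production-grade model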

Prerequisites

All examples in this article run under the best_practice_demand_forecast schema.

CREATE SCHEMA IF NOT EXISTS best_practice_demand_forecast;


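ODS Layer: Raw Sales Data Ingestion

The ODS layer stores raw tables synced from business systems, with no business-logic transformation applied.

Create the tables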

-- Store master data
CREATE TABLE IF NOT EXISTS best_practice_demand_forecast.doc_stores (
  store_id INT,
  city     STRING
);

-- Product master data (with category and base price)
CREATE TABLE IF NOT EXISTS best_practice_demand_forecast.doc_products (
  product_id  INT,
  category_id INT,
  supplier_id INT,
  price       DOUBLE
);

-- Promotions (discount is a percentage, e.g. 24 means 24% off)
CREATE TABLE IF NOT EXISTS best_practice_demand_forecast.doc_promotions (
  promotion_id INT,
  discount     DOUBLE
);

-- Order header
CREATE TABLE IF NOT EXISTS best_practice_demand_forecast.doc_orders (
  order_id     INT,
  customer_id  INT,
  store_id     INT,
  order_date   DATE,
  promotion_id INT
);

-- Order line items (one order can contain multiple SKUs)
CREATE TABLE IF NOT EXISTS best_practice_demand_forecast.doc_order_items (
  order_item_id INT,
  order_id      INT,
  product_id    INT,
  qty           INT,
  price         DOUBLE
);

Configure Continuous Ingestion with MySQL CDC or OSS PIPE

Option 1: MySQL CDC (recommended, suited to production)

Use data integration to sync the MySQL orders and order_items tables into ODS in real time. Where a daily offline batch load is sufficient, an OSS PIPE can be used instead, for example:

-- Create an OSS PIPE for batch import (suited to daily offline batch loads)
CREATE PIPE IF NOT EXISTS best_practice_demand_forecast.pipe_orders_daily
  VIRTUAL_CLUSTER = 'DEFAULT'
  BATCH_INTERVAL_IN_SECONDS = '300'
AS
COPY INTO best_practice_demand_forecast.doc_orders
FROM (
  SELECT
    $1::INT  AS order_id,
    $2::INT  AS customer_id,
    $3::INT  AS store_id,
    $4::DATE AS order_date,
    $5::INT  AS promotion_id
  FROM VOLUME best_practice_demand_forecast_vol
)
USING csv
OPTIONS('header'='true', 'sep'=',');

Option 2: Simulate with INSERT (when no CDC / OSS environment is available)

Import data from a local CSV file (recommended):

-- Step 1: upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/your/data.csv' TO USER VOLUME FILE 'data.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_demand_forecast.doc_stores
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('data.csv');

Alternatively, insert a small batch of test data inline (no CSV file needed):

INSERT INTO best_practice_demand_forecast.doc_stores VALUES
  (1,'Pune'),(2,'Pune'),(3,'Delhi'),(4,'Mumbai'),(5,'Mumbai'),
  (8,'Bangalore'),(9,'Delhi'),(10,'Bangalore'),(11,'Delhi'),(12,'Pune');
-- The actual run includes all 100 stores

INSERT INTO best_practice_demand_forecast.doc_orders VALUES
  (1,45308,33,CAST('2021-08-26' AS DATE),24),
  (2,10070,81,CAST('2022-03-19' AS DATE),3),
  (5,36546,81,CAST('2022-09-14' AS DATE),33),
  (10,28094,21,CAST('2022-06-03' AS DATE),21);
-- The actual run includes 30 orders

INSERT INTO best_practice_demand_forecast.doc_order_items VALUES
  (1001,1,5,2,4495),(1002,1,12,1,3422),(1003,2,8,3,3686),
  (1009,5,10,3,316),(1010,5,18,2,4115),(1019,10,13,1,1910);
-- The actual run includes 60 order line items

Verify the ODS row counts:

SELECT 'orders' AS tbl, COUNT(*) AS cnt FROM best_practice_demand_forecast.doc_orders
UNION ALL
SELECT 'order_items', COUNT(*) FROM best_practice_demand_forecast.doc_order_items
UNION ALL
SELECT 'products', COUNT(*) FROM best_practice_demand_forecast.doc_products
UNION ALL
SELECT 'stores', COUNT(*) FROM best_practice_demand_forecast.doc_stores
UNION ALL
SELECT 'promotions', COUNT(*) FROM best_practice_demand_forecast.doc_promotions;

tbl         | cnt
------------|----
orders      | 30
order_items | 60
products    | 30
stores      | 100
promotions  | 50


DWD Layer: SKU × Store × Date Sales Facts Dynamic Table

The DWD layer aggregates the raw ODS order data into sales facts at SKU × store × date grain. It is the input for every forecasting model.

Create the Dynamic Table

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_demand_forecast.doc_dwd_sku_store_daily AS
SELECT
  o.order_date                      AS sales_date,
  oi.product_id                     AS sku_id,
  o.store_id,
  s.city                            AS store_city,
  p.category_id,
  COUNT(DISTINCT o.order_id)        AS order_count,
  SUM(oi.qty)                       AS total_qty,
  SUM(oi.qty * oi.price)            AS total_revenue,
  MAX(COALESCE(promo.discount, 0))  AS max_discount_pct,
  CASE WHEN MAX(COALESCE(promo.discount, 0)) > 0 THEN 1 ELSE 0 END AS has_promotion
FROM best_practice_demand_forecast.doc_orders o
JOIN best_practice_demand_forecast.doc_order_items oi ON o.order_id = oi.order_id
JOIN best_practice_demand_forecast.doc_stores s ON o.store_id = s.store_id
JOIN best_practice_demand_forecast.doc_products p ON oi.product_id = p.product_id
LEFT JOIN best_practice_demand_forecast.doc_promotions promo ON o.promotion_id = promo.promotion_id
GROUP BY o.order_date, oi.product_id, o.store_id, s.city, p.category_id;

Trigger the first refresh manually:

REFRESH DYNAMIC TABLE best_practice_demand_forecast.doc_dwd_sku_store_daily;

Verify the DWD result:

SELECT sku_id, store_id, store_city, sales_date, total_qty, total_revenue, has_promotion
FROM best_practice_demand_forecast.doc_dwd_sku_store_daily
ORDER BY sales_date, sku_id
LIMIT 10;

sku_id | store_id | store_city | sales_date | total_qty | total_revenue | has_promotion
-------|----------|------------|------------|-----------|---------------|--------------
15     | 69       | Delhi      | 2020-04-27 | 2         | 6310          | 1
18     | 69       | Delhi      | 2020-04-27 | 3         | 12345         | 1
21     | 85       | Mumbai     | 2020-08-20 | 1         | 3951          | 1
27     | 85       | Mumbai     | 2020-08-20 | 3         | 3435          | 1
7      | 57       | Mumbai     | 2020-11-14 | 2         | 6056          | 1
22     | 57       | Mumbai     | 2020-11-14 | 1         | 2753          | 1
1      | 85       | Mumbai     | 2021-01-16 | 2         | 7974          | 1
7      | 85       | Mumbai     | 2021-01-16 | 1         | 3028          | 1
3      | 17       | Delhi      | 2021-01-21 | 1         | 3548          | 1
20     | 17       | Delhi      | 2021-01-21 | 4         | 2464          | 1
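To make the grain of this table concrete, here is a minimal pure-Python sketch of the same join-and-aggregate logic, applied to two of the sample orders inserted earlier. It only illustrates what the Dynamic Table computes, not how the engine executes it, and it leaves out the store and product lookups. The function name aggregate_daily is hypothetical, and the discount values for promotions 3 and 33 are assumptions for illustration (they match the sample feature output shown later in this article).

```python
from collections import defaultdict
from datetime import date

# Sample rows copied from the INSERT statements above
orders = [  # (order_id, customer_id, store_id, order_date, promotion_id)
    (2, 10070, 81, date(2022, 3, 19), 3),
    (5, 36546, 81, date(2022, 9, 14), 33),
]
order_items = [  # (order_item_id, order_id, product_id, qty, price)
    (1003, 2, 8, 3, 3686),
    (1009, 5, 10, 3, 316),
    (1010, 5, 18, 2, 4115),
]
# Assumed discount percentage per promotion_id (illustrative values)
promotions = {3: 27.0, 33: 29.0}


def aggregate_daily(orders, order_items, promotions):
    """Aggregate order lines to one record per (sales_date, sku_id, store_id)."""
    orders_by_id = {o[0]: o for o in orders}
    facts = defaultdict(lambda: {"order_ids": set(), "total_qty": 0,
                                 "total_revenue": 0, "max_discount_pct": 0.0})
    for _, order_id, product_id, qty, price in order_items:
        order = orders_by_id.get(order_id)
        if order is None:  # inner join: drop items without a matching order
            continue
        _, _, store_id, order_date, promotion_id = order
        fact = facts[(order_date, product_id, store_id)]
        fact["order_ids"].add(order_id)
        fact["total_qty"] += qty
        fact["total_revenue"] += qty * price
        # LEFT JOIN + COALESCE: a missing promotion counts as a 0% discount
        discount = promotions.get(promotion_id, 0.0)
        fact["max_discount_pct"] = max(fact["max_discount_pct"], discount)
    return {
        key: {
            "order_count": len(fact["order_ids"]),
            "total_qty": fact["total_qty"],
            "total_revenue": fact["total_revenue"],
            "max_discount_pct": fact["max_discount_pct"],
            "has_promotion": 1 if fact["max_discount_pct"] > 0 else 0,
        }
        for key, fact in facts.items()
    }


daily = aggregate_daily(orders, order_items, promotions)
```

Each key of daily corresponds to one row of doc_dwd_sku_store_daily, which is also the unit that the later per-group forecasting step receives.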

Historical Sales Summary (Feature Validation Before Prophet Training)

Before training the forecasting models, check how the historical data is distributed. The query below summarizes it per SKU × city:

SELECT
  d.sku_id,
  d.store_city,
  d.category_id,
  SUM(d.total_qty)               AS hist_total_qty,
  ROUND(AVG(d.total_qty), 2)     AS hist_avg_daily_qty,
  SUM(d.has_promotion)           AS promo_days,
  COUNT(DISTINCT d.sales_date)   AS data_days
FROM best_practice_demand_forecast.doc_dwd_sku_store_daily d
GROUP BY d.sku_id, d.store_city, d.category_id
ORDER BY hist_total_qty DESC
LIMIT 10;

sku_id | store_city | category_id | hist_total_qty | hist_avg_daily_qty | promo_days | data_days
-------|------------|-------------|----------------|--------------------|------------|----------
2      | Delhi      | 18          | 8              | 4.0                | 2          | 2
27     | Mumbai     | 16          | 6              | 3.0                | 2          | 2
19     | Mumbai     | 8           | 6              | 3.0                | 2          | 2
20     | Delhi      | 27          | 5              | 2.5                | 2          | 2
18     | Delhi      | 7           | 5              | 2.5                | 2          | 2
4      | Mumbai     | 19          | 4              | 2.0                | 2          | 2
10     | Pune       | 29          | 4              | 4.0                | 1          | 1
3      | Delhi      | 23          | 4              | 2.0                | 2          | 2
15     | Delhi      | 19          | 4              | 2.0                | 2          | 2
8      | Delhi      | 30          | 3              | 3.0                | 1          | 1

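Interpreting the results: SKU 2 (category 18) has the highest sales volume in Delhi, averaging 4 units per day, and both of its 2 days of data are promotion days. It is a highly promotion-dependent SKU, so the forecasting model should take the promotion flag as a regressor. promo_days / data_days = 1.0 means 100% promotion coverage, so baseline (non-promotional) demand may be overestimated.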
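The coverage check described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the function names are hypothetical, the first three rows are copied from the summary output, and the last row is a made-up contrast case that is not in the dataset.

```python
# (sku_id, store_city, promo_days, data_days)
summary_rows = [
    (2, "Delhi", 2, 2),     # from the summary output above
    (27, "Mumbai", 2, 2),   # from the summary output above
    (10, "Pune", 1, 1),     # from the summary output above
    (99, "Delhi", 1, 4),    # hypothetical row, not from the dataset
]


def promo_coverage(promo_days: int, data_days: int) -> float:
    """Share of observed days that had a promotion; 0.0 when there is no history."""
    return promo_days / data_days if data_days else 0.0


def fully_promoted(rows, threshold: float = 1.0):
    """Return (sku_id, store_city) pairs whose promotion coverage reaches the threshold."""
    return [(sku, city) for sku, city, promo, days in rows
            if promo_coverage(promo, days) >= threshold]


flagged = fully_promoted(summary_rows)
```

Combinations in flagged have no non-promotional days at all, so their history alone cannot separate baseline demand from promotional uplift.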


DWD Seasonal Features Dynamic Table

The seasonal features table extracts calendar-based sales statistics and a promotion lift ratio for each SKU × store combination, for use as external regressors in Prophet.

Create the Dynamic Table

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_demand_forecast.doc_gold_sku_store_features AS
SELECT
  sku_id,
  store_id,
  store_city,
  category_id,
  EXTRACT(YEAR FROM sales_date)       AS yr,
  EXTRACT(MONTH FROM sales_date)      AS mon,
  EXTRACT(DAYOFWEEK FROM sales_date)  AS dow,
  EXTRACT(WEEK FROM sales_date)       AS week_of_year,
  COUNT(DISTINCT sales_date)          AS active_days,
  SUM(total_qty)                      AS total_qty,
  ROUND(AVG(total_qty), 2)            AS avg_daily_qty,
  MAX(total_qty)                      AS peak_daily_qty,
  SUM(total_revenue)                  AS total_revenue,
  ROUND(AVG(max_discount_pct), 2)     AS avg_discount_pct,
  SUM(has_promotion)                  AS promo_days,
  ROUND(
    SUM(CASE WHEN has_promotion = 1 THEN total_qty ELSE 0 END)
      / NULLIF(SUM(CASE WHEN has_promotion = 0 THEN total_qty ELSE 0 END), 0),
    2
  ) AS promo_lift_ratio
FROM best_practice_demand_forecast.doc_dwd_sku_store_daily
GROUP BY
  sku_id, store_id, store_city, category_id,
  EXTRACT(YEAR FROM sales_date),
  EXTRACT(MONTH FROM sales_date),
  EXTRACT(DAYOFWEEK FROM sales_date),
  EXTRACT(WEEK FROM sales_date);

Trigger the first refresh manually:

REFRESH DYNAMIC TABLE best_practice_demand_forecast.doc_gold_sku_store_features;

Verify the features table:

SELECT sku_id, store_id, store_city, yr, mon, total_qty, avg_daily_qty, avg_discount_pct, promo_lift_ratio
FROM best_practice_demand_forecast.doc_gold_sku_store_features
ORDER BY total_qty DESC
LIMIT 10;

sku_id | store_id | store_city | yr   | mon | total_qty | avg_daily_qty | avg_discount_pct | promo_lift_ratio
-------|----------|------------|------|-----|-----------|---------------|------------------|-----------------
19     | 85       | Mumbai     | 2023 | 1   | 4         | 4.0           | 24.0             | null
10     | 30       | Pune       | 2022 | 2   | 4         | 4.0           | 39.0             | null
2      | 77       | Delhi      | 2022 | 10  | 4         | 4.0           | 15.0             | null
2      | 100      | Delhi      | 2023 | 11  | 4         | 4.0           | 28.0             | null
20     | 17       | Delhi      | 2021 | 1   | 4         | 4.0           | 34.0             | null
23     | 63       | Delhi      | 2021 | 11  | 3         | 3.0           | 5.0              | null
16     | 29       | Bangalore  | 2021 | 7   | 3         | 3.0           | 35.0             | null
10     | 81       | Delhi      | 2022 | 9   | 3         | 3.0           | 29.0             | null
8      | 81       | Delhi      | 2022 | 3   | 3         | 3.0           | 27.0             | null
17     | 1        | Pune       | 2023 | 11  | 3         | 3.0           | 17.0             | null

promo_lift_ratio is null in every row shown because each of these groups contains only promotion days: the non-promotion denominator is 0, and NULLIF turns the division into null. Note also that grouping by yr, mon, week_of_year, and dow together effectively narrows each group to a single calendar date, so a group can never contain both promotion and non-promotion days. A meaningful lift ratio needs a coarser grouping, for example one that drops dow.
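The promo_lift_ratio expression can be read as the following small Python function. It is a sketch of the arithmetic only, with a hypothetical function name and made-up input pairs, to show when the result is null (None here) and when it is a number.

```python
from typing import Iterable, Optional, Tuple


def promo_lift_ratio(days: Iterable[Tuple[int, int]]) -> Optional[float]:
    """days: (total_qty, has_promotion) pairs belonging to one group.

    Mirrors SUM(promo qty) / NULLIF(SUM(non-promo qty), 0), rounded to 2 places.
    """
    promo_qty = 0
    base_qty = 0
    for qty, has_promotion in days:
        if has_promotion == 1:
            promo_qty += qty
        else:
            base_qty += qty
    if base_qty == 0:
        # NULLIF(..., 0) makes the denominator null, so the ratio is null
        return None
    return round(promo_qty / base_qty, 2)


# A group with only a promotion day, like every row in the output above
only_promo = promo_lift_ratio([(4, 1)])
# Hypothetical group mixing one promotion day and two regular days
mixed = promo_lift_ratio([(6, 1), (4, 0), (2, 0)])
```

Because the ratio compares totals rather than per-day averages, a group with unequal numbers of promotion and regular days will skew it; dividing each total by its day count first is one possible refinement.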


Parallel Prophet Training and Inference with ZettaPark

A ZettaPark Python Task uses applyInPandas to run a Prophet time-series model independently on each SKU × store combination, so that thousands of combinations can be processed in parallel on distributed compute.

ZettaPark Task code example

from clickzetta_zettapark.session import Session
from prophet import Prophet
import pandas as pd

# Output columns, in the same order as result_schema below
RESULT_COLUMNS = [
    'sku_id', 'store_id', 'store_city', 'forecast_date',
    'forecast_qty', 'forecast_lower', 'forecast_upper', 'model_version',
]


def forecast_sku_store(pdf: pd.DataFrame) -> pd.DataFrame:
    """
    Train Prophet for a single SKU × store combination and produce a 4-week forecast.
    Input DataFrame columns used: sales_date, sku_id, store_id, store_city, total_qty
    """
    sku_id = int(pdf['sku_id'].iloc[0])
    store_id = int(pdf['store_id'].iloc[0])
    store_city = str(pdf['store_city'].iloc[0])

    # Build the DataFrame in Prophet's expected format (ds = date, y = value)
    df_prophet = pdf[['sales_date', 'total_qty']].copy()
    df_prophet.columns = ['ds', 'y']
    df_prophet['ds'] = pd.to_datetime(df_prophet['ds'])
    df_prophet = df_prophet.dropna().sort_values('ds')

    # Skip groups with too little data (at least 2 rows are needed to fit).
    # Return an empty frame that still carries the expected columns.
    if len(df_prophet) < 2:
        return pd.DataFrame(columns=RESULT_COLUMNS)

    # Train Prophet
    model = Prophet(weekly_seasonality=True, yearly_seasonality=True)
    model.fit(df_prophet)

    # Forecast the next 4 weeks (one forecast point per week)
    future = model.make_future_dataframe(periods=4, freq='W')
    forecast = model.predict(future).tail(4)

    return pd.DataFrame({
        'sku_id': sku_id,
        'store_id': store_id,
        'store_city': store_city,
        'forecast_date': forecast['ds'].dt.date,
        'forecast_qty': forecast['yhat'].round(2),
        'forecast_lower': forecast['yhat_lower'].round(2),
        'forecast_upper': forecast['yhat_upper'].round(2),
        'model_version': 'prophet-v1',
    })


# Run inside the ZettaPark Task
session = Session.builder.profile('skill_test').create()

# Note: to_pandas() pulls the whole DWD table to the client before re-uploading it
df_dwd = session.table('best_practice_demand_forecast.doc_dwd_sku_store_daily').to_pandas()

# Group by SKU × store and run Prophet training on each group in parallel
result_schema = 'sku_id INT, store_id INT, store_city STRING, forecast_date DATE, ' \
                'forecast_qty DOUBLE, forecast_lower DOUBLE, forecast_upper DOUBLE, model_version STRING'

df_result = (
    session.createDataFrame(df_dwd)
    .groupBy('sku_id', 'store_id')
    .applyInPandas(forecast_sku_store, schema=result_schema)
)

# Write back to the Gold-layer forecast results table
df_result.write.mode('overwrite').saveAsTable(
    'best_practice_demand_forecast.doc_gold_forecast_results'
)
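For readers unfamiliar with the groupBy + applyInPandas pattern, the sketch below shows its semantics in plain Python: split the rows by key, call one function per group, and concatenate the outputs. It is not the ZettaPark implementation. Groups run sequentially here, whereas the platform distributes them, and naive_forecast is a deliberately simple stand-in for the Prophet call (it repeats the historical mean). All names and the sample rows are hypothetical.

```python
from collections import defaultdict
from statistics import mean


def naive_forecast(rows, horizon=4):
    """Stand-in for the Prophet call: repeat the historical mean for each future week."""
    if len(rows) < 2:  # same minimum-history guard as the Prophet function
        return []
    sku_id, store_id = rows[0]["sku_id"], rows[0]["store_id"]
    avg = round(mean(r["total_qty"] for r in rows), 2)
    return [{"sku_id": sku_id, "store_id": store_id, "week": w, "forecast_qty": avg}
            for w in range(1, horizon + 1)]


def group_apply(rows, keys, func):
    """Split rows by the key columns, apply func to each group, concatenate the results."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[k] for k in keys)].append(row)
    out = []
    for _, group_rows in sorted(groups.items()):
        out.extend(func(group_rows))
    return out


# Hypothetical history rows
history = [
    {"sku_id": 2, "store_id": 77, "total_qty": 4},
    {"sku_id": 2, "store_id": 77, "total_qty": 2},
    {"sku_id": 8, "store_id": 81, "total_qty": 3},  # single row: group is skipped
]
forecasts = group_apply(history, ["sku_id", "store_id"], naive_forecast)
```

The key property is that each group's function call sees only that group's rows, which is what lets the platform train thousands of independent models without any shared state.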


Gold Layer: Forecast Results Table and Partition Design

Create the table (partitioned by SKU)

CREATE TABLE IF NOT EXISTS best_practice_demand_forecast.doc_gold_forecast_results (
  sku_id          INT,
  store_id        INT,
  store_city      STRING,
  forecast_date   DATE,
  forecast_qty    DOUBLE,
  forecast_lower  DOUBLE,
  forecast_upper  DOUBLE,
  model_version   STRING,
  generated_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
)
PARTITIONED BY (sku_id);

PARTITIONED BY (sku_id) stores the forecast results physically separated by SKU. When the replenishment system reads forecasts in bulk by SKU, it scans only the partitions for the requested SKUs instead of the whole table.
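The effect of partition pruning can be illustrated with a toy in-memory model. This sketch says nothing about the platform's actual storage layout; the class and method names are hypothetical, and it only contrasts how many rows are examined with and without pruning.

```python
from collections import defaultdict


class PartitionedForecasts:
    """Toy model of a table partitioned by sku_id: one row list per partition."""

    def __init__(self):
        self.partitions = defaultdict(list)

    def insert(self, row):
        self.partitions[row["sku_id"]].append(row)

    def read_sku(self, sku_id):
        """Return (rows, rows_scanned); only the matching partition is touched."""
        rows = self.partitions.get(sku_id, [])
        return rows, len(rows)

    def full_scan(self, sku_id):
        """Baseline without pruning: every row in every partition is examined."""
        scanned = 0
        hits = []
        for rows in self.partitions.values():
            for row in rows:
                scanned += 1
                if row["sku_id"] == sku_id:
                    hits.append(row)
        return hits, scanned


table = PartitionedForecasts()
for sku in (5, 10, 18):          # the three SKUs used in the sample data
    for week in range(4):        # four weekly forecast rows each
        table.insert({"sku_id": sku, "week": week})

pruned_rows, pruned_scanned = table.read_sku(10)
full_rows, full_scanned = table.full_scan(10)
```

With three SKUs the saving is small, but the number of rows examined by the pruned read stays constant as the SKU count grows, while the full scan grows linearly.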

Simulate writing forecast results

INSERT INTO best_practice_demand_forecast.doc_gold_forecast_results
  (sku_id, store_id, store_city, forecast_date, forecast_qty, forecast_lower, forecast_upper, model_version)
VALUES
  (5,  81, 'Delhi', CAST('2026-06-07' AS DATE), 3.2, 2.1, 4.3, 'prophet-v1'),
  (5,  81, 'Delhi', CAST('2026-06-14' AS DATE), 3.5, 2.4, 4.6, 'prophet-v1'),
  (5,  81, 'Delhi', CAST('2026-06-21' AS DATE), 3.8, 2.7, 4.9, 'prophet-v1'),
  (5,  81, 'Delhi', CAST('2026-06-28' AS DATE), 4.1, 3.0, 5.2, 'prophet-v1'),
  (10, 21, 'Delhi', CAST('2026-06-07' AS DATE), 4.5, 3.3, 5.7, 'prophet-v1'),
  (10, 21, 'Delhi', CAST('2026-06-14' AS DATE), 4.8, 3.6, 6.0, 'prophet-v1'),
  (10, 21, 'Delhi', CAST('2026-06-21' AS DATE), 4.3, 3.1, 5.5, 'prophet-v1'),
  (10, 21, 'Delhi', CAST('2026-06-28' AS DATE), 5.0, 3.8, 6.2, 'prophet-v1'),
  (18, 69, 'Delhi', CAST('2026-06-07' AS DATE), 3.9, 2.8, 5.0, 'prophet-v1'),
  (18, 69, 'Delhi', CAST('2026-06-14' AS DATE), 4.2, 3.1, 5.3, 'prophet-v1'),
  (18, 69, 'Delhi', CAST('2026-06-21' AS DATE), 4.0, 2.9, 5.1, 'prophet-v1'),
  (18, 69, 'Delhi', CAST('2026-06-28' AS DATE), 4.4, 3.3, 5.5, 'prophet-v1');

Scenario 1: 4-Week Forecast Detail for a Single SKU

Query the 4-week forecast for one specific SKU, including the confidence interval and the uncertainty percentage:

SELECT
  f.sku_id,
  f.store_city,
  f.forecast_date,
  f.forecast_qty,
  f.forecast_lower,
  f.forecast_upper,
  ROUND((f.forecast_upper - f.forecast_lower) / f.forecast_qty * 100, 1) AS uncertainty_pct
FROM best_practice_demand_forecast.doc_gold_forecast_results f
WHERE f.sku_id = 10
ORDER BY f.forecast_date;

sku_id | store_city | forecast_date | forecast_qty | forecast_lower | forecast_upper | uncertainty_pct
-------|------------|---------------|--------------|----------------|----------------|----------------
10     | Delhi      | 2026-06-07    | 4.5          | 3.3            | 5.7            | 53.3
10     | Delhi      | 2026-06-14    | 4.8          | 3.6            | 6.0            | 50.0
10     | Delhi      | 2026-06-21    | 4.3          | 3.1            | 5.5            | 55.8
10     | Delhi      | 2026-06-28    | 5.0          | 3.8            | 6.2            | 48.0

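Interpreting the results: for SKU 10 in the Delhi store, the weekly point forecasts range from 4.3 to 5.0 units, rising from 4.5 to 5.0 over the 4 weeks despite a dip in week 3. uncertainty_pct is between 48% and 56%, which indicates that this SKU's history is fairly volatile. For replenishment, use forecast_upper (the upper bound) as the reference quantity to keep a safety-stock buffer.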
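The replenishment rule suggested above can be sketched as follows, using the four SKU 10 rows from the query output. The function names are hypothetical, and the 50% threshold is an assumption borrowed from the rule of thumb in the notes at the end of this article.

```python
# SKU 10 rows from the query output above:
# (forecast_date, forecast_qty, forecast_lower, forecast_upper)
sku10 = [
    ("2026-06-07", 4.5, 3.3, 5.7),
    ("2026-06-14", 4.8, 3.6, 6.0),
    ("2026-06-21", 4.3, 3.1, 5.5),
    ("2026-06-28", 5.0, 3.8, 6.2),
]


def uncertainty_pct(qty, lower, upper):
    """Interval width relative to the point forecast, in percent."""
    return round((upper - lower) / qty * 100, 1)


def replenishment_qty(qty, lower, upper, threshold_pct=50.0):
    """Order to the upper bound when the interval is wide, else to the point forecast."""
    return upper if uncertainty_pct(qty, lower, upper) > threshold_pct else qty


plan = [(d, uncertainty_pct(q, lo, hi), replenishment_qty(q, lo, hi))
        for d, q, lo, hi in sku10]
```

Like the SQL expression, this divides by forecast_qty, so a forecast of exactly 0 would need separate handling before the rule is applied.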

Scenario 2: 4-Week Forecast Summarized by City (Replenishment Planning View)

The replenishment system usually needs forecasts rolled up by city so that purchase quantities can be planned centrally:

SELECT
  store_city,
  SUM(forecast_qty)       AS city_4w_forecast_qty,
  COUNT(DISTINCT sku_id)  AS sku_count
FROM best_practice_demand_forecast.doc_gold_forecast_results
GROUP BY store_city
ORDER BY city_4w_forecast_qty DESC;

store_city | city_4w_forecast_qty | sku_count
-----------|----------------------|----------
Delhi      | 49.7                 | 3
Pune       | 11.8                 | 1
Mumbai     | 9.7                  | 1

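Interpreting the results: Delhi's expected 4-week demand (49.7 units) spans 3 SKUs, which makes it the highest-priority city for replenishment. The replenishment system can read this result table directly to generate purchase orders, replacing the traditional manual reporting process. The sample INSERT shown earlier contains only Delhi rows; the Pune and Mumbai rows presumably come from forecast rows not listed there.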

Scenario 3: Identifying High-Uncertainty SKUs (Promotion Planning View)

Promotion planning should prioritize inventory buffers for high-uncertainty SKUs:

SELECT
  sku_id,
  store_city,
  ROUND(AVG(forecast_qty), 2) AS avg_4w_qty,
  ROUND(AVG((forecast_upper - forecast_lower) / forecast_qty * 100), 1) AS avg_uncertainty_pct,
  ROUND(SUM(forecast_upper), 2) AS safe_stock_ceiling
FROM best_practice_demand_forecast.doc_gold_forecast_results
GROUP BY sku_id, store_city
ORDER BY avg_uncertainty_pct DESC;

sku_id | store_city | avg_4w_qty | avg_uncertainty_pct | safe_stock_ceiling
-------|------------|------------|---------------------|-------------------
5      | Delhi      | 3.65       | 60.8                | 19.0
18     | Delhi      | 4.13       | 53.4                | 20.9
10     | Delhi      | 4.65       | 51.8                | 23.4

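Interpreting the results: SKU 5 (Delhi store) has the highest average uncertainty (60.8%), which indicates pronounced volatility in its sales history, possibly driven by holidays or in-store promotions. The safe_stock_ceiling column is the sum of forecast_upper over the 4 weeks. It serves as the most conservative safety-stock ceiling and can be fed directly into the replenishment system.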
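As a cross-check of the aggregation, the following sketch recomputes avg_uncertainty_pct and safe_stock_ceiling in Python for SKU 5 and SKU 10, using the rows from the sample INSERT. The function name is hypothetical. Python's rounding of exact halves can differ from SQL ROUND, so the sketch is meant for sanity checks rather than as a drop-in replacement for the query.

```python
from collections import defaultdict

# (sku_id, forecast_qty, forecast_lower, forecast_upper) from the sample INSERT above
rows = [
    (5, 3.2, 2.1, 4.3), (5, 3.5, 2.4, 4.6), (5, 3.8, 2.7, 4.9), (5, 4.1, 3.0, 5.2),
    (10, 4.5, 3.3, 5.7), (10, 4.8, 3.6, 6.0), (10, 4.3, 3.1, 5.5), (10, 5.0, 3.8, 6.2),
]


def rank_by_uncertainty(rows):
    """Return (sku_id, avg_uncertainty_pct, safe_stock_ceiling), most uncertain first."""
    by_sku = defaultdict(list)
    for sku_id, qty, lower, upper in rows:
        by_sku[sku_id].append((qty, lower, upper))
    ranked = []
    for sku_id, weeks in by_sku.items():
        # Average of the weekly relative interval widths, in percent
        avg_unc = sum((hi - lo) / q * 100 for q, lo, hi in weeks) / len(weeks)
        # Sum of the weekly upper bounds over the horizon
        ceiling = sum(hi for _, _, hi in weeks)
        ranked.append((sku_id, round(avg_unc, 1), round(ceiling, 2)))
    return sorted(ranked, key=lambda r: r[1], reverse=True)


ranking = rank_by_uncertainty(rows)
```

The first element of ranking is the SKU that most needs a buffer; its ceiling is the quantity that covers the upper bound of every week in the horizon.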


Connecting to SageMaker Batch Inference via External Function (Optional)

When the forecasting workload exceeds what a single ZettaPark node can process, or when an existing SageMaker Endpoint needs to be reused, an External Function can call the batch inference API from SQL.

Create the External Function

-- Create the API Connection first (Alibaba Cloud FC shown here; for SageMaker, use AWS Lambda + API Gateway instead)
CREATE API CONNECTION IF NOT EXISTS demand_forecast_fc_conn
  PROVIDER = 'aliyun'
  REGION = 'cn-hangzhou'
  ROLE_ARN = 'acs:ram::xxx:role/xxx'
  NAMESPACE = 'demand-forecast'
  CODE_BUCKET = 'my-code-bucket';

-- Create the External Function that wraps the inference call
CREATE OR REPLACE EXTERNAL FUNCTION best_practice_demand_forecast.call_forecast_api(
  sku_id   INT,
  store_id INT,
  hist_qty STRING  -- JSON array, e.g. '[3,4,2,5,3]'
)
RETURNS STRING  -- JSON: {"forecast":[4.2,4.5,3.8,5.1],"lower":[...], "upper":[...]}
CONNECTION demand_forecast_fc_conn;

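Call it in bulk from SQL:

-- Note: COLLECT_LIST may not return values in sales_date order;
-- the inference service should not assume the array is time-ordered unless ordering is enforced upstream
SELECT
  sku_id,
  store_id,
  best_practice_demand_forecast.call_forecast_api(
    sku_id,
    store_id,
    TO_JSON(COLLECT_LIST(total_qty))
  ) AS forecast_json
FROM best_practice_demand_forecast.doc_dwd_sku_store_daily
GROUP BY sku_id, store_id;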
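The string-in, string-out contract of call_forecast_api can be sketched on the client side as below. This is an assumption-laden illustration: the helper names and the sample history are hypothetical, and only the forecast key from the documented response shape is parsed, since the lower and upper arrays are elided in the source.

```python
import json
from datetime import date


def build_hist_qty(daily_rows):
    """Serialize total_qty values as a compact JSON array, ordered by sales_date."""
    ordered = sorted(daily_rows, key=lambda r: r[0])
    return json.dumps([qty for _, qty in ordered], separators=(",", ":"))


def parse_forecast(forecast_json):
    """Extract the point forecasts; lower/upper are passed through only if present."""
    payload = json.loads(forecast_json)
    return {
        "forecast": [float(v) for v in payload["forecast"]],
        "lower": payload.get("lower"),
        "upper": payload.get("upper"),
    }


# Hypothetical history for one SKU × store, deliberately out of date order
history = [(date(2022, 3, 3), 2), (date(2022, 3, 1), 3), (date(2022, 3, 2), 4)]
hist_qty = build_hist_qty(history)

# Response containing only the documented "forecast" key
parsed = parse_forecast('{"forecast":[4.2,4.5,3.8,5.1]}')
```

Sorting by date before serializing matters because a time-series model interprets the array positionally; the same ordering should be guaranteed on whichever side builds the hist_qty argument.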


Studio Task Scheduling Configuration

Periodic refresh of the Dynamic Tables is managed centrally through Studio Tasks, under the path best_practices/demand_forecast/.

Configuration steps

  1. On the Studio task management page, choose New Task → Refresh Dynamic Table
  2. Enter a task name, for example refresh_dwd_sku_store_daily
  3. Select the target dynamic table: best_practice_demand_forecast.doc_dwd_sku_store_daily
  4. Configure the schedule: daily at 2:00 AM (cron expression 0 2 * * *)
  5. Save and enable the task

Repeat the steps above for doc_gold_sku_store_features, and set it to trigger after the DWD table refresh completes (a dependency), so that the features table is always computed from the latest DWD data.

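Attach monitoring and alerts to the Studio Task. After the task is created, the following can be configured on the same task:

  • Data quality check: after each refresh, automatically run SELECT COUNT(*) > 0 FROM doc_dwd_sku_store_daily WHERE sales_date = CURRENT_DATE() - 1 to confirm that yesterday's data has landed
  • Latency alert: send an alert when a refresh has not finished within 30 minutes
  • Row-count drop alert: compare with the previous day's row count and alert when it drops by more than 20%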
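The three checks above reduce to simple predicates. The sketch below expresses them in Python to pin down the boundary behavior. It is not Studio's alerting API; the function names are hypothetical, and the thresholds are the ones stated in the list.

```python
def yesterday_loaded(rows_for_yesterday: int) -> bool:
    """Data quality check: at least one row exists for yesterday's sales_date."""
    return rows_for_yesterday > 0


def refresh_overdue(elapsed_minutes: float, limit_minutes: float = 30.0) -> bool:
    """Latency alert: the refresh has been running longer than the limit."""
    return elapsed_minutes > limit_minutes


def row_count_dropped(previous_rows: int, current_rows: int, max_drop: float = 0.20) -> bool:
    """Row-count alert: today's count fell by more than max_drop versus yesterday."""
    if previous_rows <= 0:
        return False  # no baseline to compare against
    drop = (previous_rows - current_rows) / previous_rows
    return drop > max_drop


alerts = {
    "missing_yesterday": not yesterday_loaded(0),
    "refresh_overdue": refresh_overdue(31),
    "row_count_dropped": row_count_dropped(1000, 790),
}
```

A drop of exactly 20% does not fire under this reading of "more than 20%"; whether the boundary should be inclusive is a policy choice to settle when configuring the real alert.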

Warehouse Object Overview

Once everything is built, the objects under the best_practice_demand_forecast schema are:

SHOW TABLES IN best_practice_demand_forecast;

schema_name                   | table_name                  | is_dynamic
------------------------------|-----------------------------|-----------
best_practice_demand_forecast | doc_dwd_sku_store_daily     | true
best_practice_demand_forecast | doc_gold_forecast_results   | false
best_practice_demand_forecast | doc_gold_sku_store_features | true
best_practice_demand_forecast | doc_order_items             | false
best_practice_demand_forecast | doc_orders                  | false
best_practice_demand_forecast | doc_products                | false
best_practice_demand_forecast | doc_promotions              | false
best_practice_demand_forecast | doc_stores                  | false

Data flow:

MySQL CDC / OSS PIPE
    │
    ▼  batch / incremental writes
doc_orders + doc_order_items + doc_products + doc_stores + doc_promotions
(ODS layer, regular tables)
    │
    ▼  REFRESH triggered periodically by Studio Task
doc_dwd_sku_store_daily (Dynamic Table)
SKU × Store × Date: total_qty / total_revenue / has_promotion
    │
    ├──▶ doc_gold_sku_store_features (Dynamic Table)
    │      seasonal features / promotion lift ratio / weekly sales statistics
    │
    ▼  ZettaPark Python Task (groupBy + applyInPandas)
    │  or External Function (SageMaker / custom API)
    │
doc_gold_forecast_results (partitioned table, PARTITIONED BY sku_id)
SKU × Store × forecast_date: forecast_qty / lower / upper / model_version
    │
    ├──▶ Replenishment system (purchase quantities rolled up by city / store)
    └──▶ Promotion planning (inventory buffers prioritized for high-uncertainty SKUs)


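Notes

  • Do not set REFRESH INTERVAL on the Dynamic Tables: a refresh interval hard-coded in the DDL cannot have monitoring alerts or data quality rules attached. Manage refresh scheduling centrally in Studio Tasks, where completion of the DWD refresh can serve as the trigger for the features table refresh task, so the dependency chain runs in the correct order.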

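  • INSERT performance on partitioned tables: doc_gold_forecast_results is partitioned by sku_id, and each full overwrite from the ZettaPark Task (mode='overwrite') triggers a partition rebuild. At very large SKU counts (tens of thousands and up), write in batches by date and use MERGE INTO for incremental updates, so that no single write runs too long.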

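  • Prophet and sparse data: SKU × store combinations with fewer than 2 historical records cannot train a Prophet model (at least 2 data points are needed to fit the parameters). In production, filter the ZettaPark Task input to combinations with data_days >= 4, and fall back to the category mean or the forecast of a similar SKU for cold-start SKUs.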

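  • Modeling promotion effects: the has_promotion flag in this article can be used as an external regressor in Prophet (add_regressor('has_promotion')), letting the model learn the uplift that promotions have on sales. Without this variable, historical peaks during promotions are misread as seasonality, which inflates forecasts for non-promotion periods.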

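  • Confidence interval width (uncertainty_pct): a value of (upper - lower) / forecast_qty * 100 above 50% usually indicates highly unstable sales history (affected by holidays, stockouts, or occasional large orders). For such high-uncertainty SKUs, base purchase quantities on forecast_upper rather than on the forecast_qty point estimate to reduce the risk of stockouts.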

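  • Dynamic Table incremental refresh degradation: if the ODS layer is written with INSERT OVERWRITE (full overwrite), the DWD Dynamic Table automatically degrades to full-refresh mode and refresh time increases significantly. Write to the ODS layer with INSERT INTO (append), or use MERGE INTO for incremental upserts, to preserve the Dynamic Table's incremental refresh capability.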
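The cold-start handling recommended in the notes above (filter on data_days >= 4, then fall back to the category mean) could look like the following sketch. The function names, the dictionary layout, and the sample combinations are all hypothetical; a similar-SKU fallback would need a similarity measure that this article does not define.

```python
from statistics import mean

MIN_DATA_DAYS = 4  # threshold suggested in the notes above


def split_by_history(combos):
    """Separate combinations with enough history from cold-start ones."""
    trainable = [c for c in combos if c["data_days"] >= MIN_DATA_DAYS]
    cold = [c for c in combos if c["data_days"] < MIN_DATA_DAYS]
    return trainable, cold


def category_mean_fallback(trainable, cold):
    """Fill each cold-start combination with the mean avg_daily_qty of its category."""
    by_category = {}
    for c in trainable:
        by_category.setdefault(c["category_id"], []).append(c["avg_daily_qty"])
    fallback = {}
    for c in cold:
        values = by_category.get(c["category_id"])
        # None when the category has no trainable combination to borrow from
        fallback[(c["sku_id"], c["store_id"])] = round(mean(values), 2) if values else None
    return fallback


# Hypothetical SKU × store combinations
combos = [
    {"sku_id": 1, "store_id": 10, "category_id": 18, "data_days": 30, "avg_daily_qty": 4.0},
    {"sku_id": 2, "store_id": 10, "category_id": 18, "data_days": 12, "avg_daily_qty": 2.0},
    {"sku_id": 3, "store_id": 10, "category_id": 18, "data_days": 1, "avg_daily_qty": 5.0},
    {"sku_id": 4, "store_id": 11, "category_id": 7, "data_days": 2, "avg_daily_qty": 1.0},
]
trainable, cold = split_by_history(combos)
fallback = category_mean_fallback(trainable, cold)
```

Combinations in trainable would go to the Prophet task, while entries in fallback that are None still need a manual or rule-based estimate because their category has no usable history.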

