-- Create the schema that holds every table, pipe and function of this demand-forecast example.
-- IF NOT EXISTS makes the statement safe to re-run.
CREATE SCHEMA IF NOT EXISTS best_practice_demand_forecast;
-- ODS 层:原始销售数据接入
-- ODS 层存放从业务系统同步来的原始表,不做任何业务逻辑变换。
-- 建表
-- Store master data.
CREATE TABLE IF NOT EXISTS best_practice_demand_forecast.doc_stores (
    store_id INT,    -- store identifier; doc_orders.store_id joins to it downstream
    city     STRING  -- city of the store; exposed as store_city in the DWD layer
);
-- Product master data (includes category and base price).
CREATE TABLE IF NOT EXISTS best_practice_demand_forecast.doc_products (
    product_id  INT,    -- product identifier; doc_order_items.product_id joins to it downstream
    category_id INT,    -- product category
    supplier_id INT,
    price       DOUBLE  -- base price
);
-- Promotions.
CREATE TABLE IF NOT EXISTS best_practice_demand_forecast.doc_promotions (
    promotion_id INT,
    discount     DOUBLE  -- discount percentage, e.g. 24 means 24% off
);
-- Order header table.
CREATE TABLE IF NOT EXISTS best_practice_demand_forecast.doc_orders (
    order_id     INT,
    customer_id  INT,
    store_id     INT,   -- store where the order was placed
    order_date   DATE,  -- becomes sales_date in the DWD layer
    promotion_id INT    -- LEFT JOINed to doc_promotions downstream, so a match is optional
);
-- Order line items (one order may contain multiple SKUs).
CREATE TABLE IF NOT EXISTS best_practice_demand_forecast.doc_order_items (
    order_item_id INT,
    order_id      INT,    -- parent order in doc_orders
    product_id    INT,    -- SKU sold on this line
    qty           INT,    -- quantity sold
    price         DOUBLE  -- multiplied by qty to compute revenue in the DWD layer
);
-- 配置 MySQL CDC 或 OSS PIPE 持续摄取
-- 方式一:MySQL CDC(推荐,适合生产环境)
-- 通过数据集成实时同步 MySQL 的 orders 和 order_items 表到 ODS。
-- 方式二:OSS PIPE 批量导入(见下方 CREATE PIPE 语句)
-- Create an OSS PIPE for batch import (suited to daily offline batch scenarios).
-- The pipe wraps a COPY INTO that loads CSV files from a volume into doc_orders, selecting
-- five input fields and casting them to the column types of doc_orders.
-- The CSV options declare a header row and ',' as the field separator.
-- NOTE(review): $1..$5 presumably address CSV fields by position — confirm against the COPY INTO docs.
-- NOTE(review): BATCH_INTERVAL_IN_SECONDS = '300' presumably means a load every 5 minutes — confirm.
-- NOTE(review): the volume best_practice_demand_forecast_vol is not created anywhere in the
-- visible script and is referenced without a schema qualifier — verify it exists.
CREATE PIPE IF NOT EXISTS best_practice_demand_forecast.pipe_orders_daily
VIRTUAL_CLUSTER = 'DEFAULT'
BATCH_INTERVAL_IN_SECONDS = '300'
AS
COPY INTO best_practice_demand_forecast.doc_orders
FROM (
SELECT
$1::INT AS order_id,
$2::INT AS customer_id,
$3::INT AS store_id,
$4::DATE AS order_date,
$5::INT AS promotion_id
FROM VOLUME best_practice_demand_forecast_vol
)
USING csv
OPTIONS('header'='true', 'sep'=',');
-- Step 1: upload a local CSV file to the User Volume with SQL PUT.
-- '/path/to/your/data.csv' is a placeholder for the local file path.
PUT '/path/to/your/data.csv' TO USER VOLUME FILE 'data.csv';
-- Step 2: COPY INTO the table from the User Volume.
-- Only the uploaded file 'data.csv' is loaded; it has a header row and comma-separated fields.
-- NOTE(review): 'nullValue'='' presumably loads empty fields as NULL — confirm against the COPY INTO docs.
COPY INTO best_practice_demand_forecast.doc_stores
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('data.csv');
-- 也可直接内联插入小批量测试数据(不需要 CSV 文件):
-- Inline sample rows for doc_stores.
-- The column list is explicit so the statement does not depend on the table's physical
-- column order and keeps working if columns are added later.
INSERT INTO best_practice_demand_forecast.doc_stores
    (store_id, city)
VALUES
    (1, 'Pune'), (2, 'Pune'), (3, 'Delhi'), (4, 'Mumbai'), (5, 'Mumbai'),
    (8, 'Bangalore'), (9, 'Delhi'), (10, 'Bangalore'), (11, 'Delhi'), (12, 'Pune');
-- The actual run loads all 100 stores.
-- Inline sample rows for doc_orders.
-- The column list is explicit so the statement does not depend on the table's physical
-- column order and keeps working if columns are added later.
INSERT INTO best_practice_demand_forecast.doc_orders
    (order_id, customer_id, store_id, order_date, promotion_id)
VALUES
    (1,  45308, 33, CAST('2021-08-26' AS DATE), 24),
    (2,  10070, 81, CAST('2022-03-19' AS DATE), 3),
    (5,  36546, 81, CAST('2022-09-14' AS DATE), 33),
    (10, 28094, 21, CAST('2022-06-03' AS DATE), 21);
-- The actual run loads 30 orders.
-- Inline sample rows for doc_order_items.
-- The column list is explicit so the statement does not depend on the table's physical
-- column order and keeps working if columns are added later.
INSERT INTO best_practice_demand_forecast.doc_order_items
    (order_item_id, order_id, product_id, qty, price)
VALUES
    (1001, 1, 5, 2, 4495), (1002, 1, 12, 1, 3422), (1003, 2, 8, 3, 3686),
    (1009, 5, 10, 3, 316), (1010, 5, 18, 2, 4115), (1019, 10, 13, 1, 1910);
-- The actual run loads 60 order line items.
-- 验证 ODS 层行数:
-- Row-count check for the ODS layer: returns one (tbl, cnt) row per source table.
SELECT 'orders' AS tbl, COUNT(*) AS cnt
FROM best_practice_demand_forecast.doc_orders
UNION ALL
SELECT 'order_items' AS tbl, COUNT(*) AS cnt
FROM best_practice_demand_forecast.doc_order_items
UNION ALL
SELECT 'products' AS tbl, COUNT(*) AS cnt
FROM best_practice_demand_forecast.doc_products
UNION ALL
SELECT 'stores' AS tbl, COUNT(*) AS cnt
FROM best_practice_demand_forecast.doc_stores
UNION ALL
SELECT 'promotions' AS tbl, COUNT(*) AS cnt
FROM best_practice_demand_forecast.doc_promotions;
-- DWD layer: daily sales facts grouped by (sales_date, sku_id, store_id, store_city, category_id).
--   order_count      : distinct orders that contain the SKU
--   total_qty        : sum of order-item quantities
--   total_revenue    : sum of qty * order-item price
--   max_discount_pct : highest promotion discount among the contributing orders
--                      (0 when no promotion matches)
--   has_promotion    : 1 when max_discount_pct > 0, otherwise 0
-- Stores and products are inner-joined, so orders without a matching master row are dropped.
-- Promotions are LEFT JOINed, so orders without a matching promotion are kept.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_demand_forecast.doc_dwd_sku_store_daily
AS
SELECT
    o.order_date                     AS sales_date,
    oi.product_id                    AS sku_id,
    o.store_id,
    s.city                           AS store_city,
    p.category_id,
    COUNT(DISTINCT o.order_id)       AS order_count,
    SUM(oi.qty)                      AS total_qty,
    SUM(oi.qty * oi.price)           AS total_revenue,
    MAX(COALESCE(promo.discount, 0)) AS max_discount_pct,
    CASE
        WHEN MAX(COALESCE(promo.discount, 0)) > 0 THEN 1
        ELSE 0
    END                              AS has_promotion
FROM best_practice_demand_forecast.doc_orders AS o
INNER JOIN best_practice_demand_forecast.doc_order_items AS oi
    ON o.order_id = oi.order_id
INNER JOIN best_practice_demand_forecast.doc_stores AS s
    ON o.store_id = s.store_id
INNER JOIN best_practice_demand_forecast.doc_products AS p
    ON oi.product_id = p.product_id
LEFT JOIN best_practice_demand_forecast.doc_promotions AS promo
    ON o.promotion_id = promo.promotion_id
GROUP BY
    o.order_date,
    oi.product_id,
    o.store_id,
    s.city,
    p.category_id;
-- Top 10 (sku_id, store_city, category_id) combinations by historical quantity,
-- read from the daily DWD table.
-- The ORDER BY carries the full grouping key as tie-breakers, so the LIMIT 10 cut is
-- deterministic when several groups share the same hist_total_qty.
-- NOTE(review): the DWD table is grouped per (sales_date, sku_id, store_id). When a city has
-- several stores, hist_avg_daily_qty is an average per store-day and promo_days counts
-- store-days, while data_days counts distinct calendar dates — confirm this is intended.
SELECT
    d.sku_id,
    d.store_city,
    d.category_id,
    SUM(d.total_qty)             AS hist_total_qty,
    ROUND(AVG(d.total_qty), 2)   AS hist_avg_daily_qty,
    SUM(d.has_promotion)         AS promo_days,
    COUNT(DISTINCT d.sales_date) AS data_days
FROM best_practice_demand_forecast.doc_dwd_sku_store_daily AS d
GROUP BY
    d.sku_id,
    d.store_city,
    d.category_id
ORDER BY
    hist_total_qty DESC,
    d.sku_id,
    d.store_city,
    d.category_id
LIMIT 10;
-- Gold layer: feature table built from the daily DWD rows, aggregated per
-- (sku_id, store_id, store_city, category_id) and calendar bucket
-- (year, month, day-of-week, week-of-year of sales_date).
--   active_days      : distinct sales dates in the bucket
--   total_qty        : summed daily quantity
--   avg_daily_qty    : mean daily quantity, rounded to 2 decimals
--   peak_daily_qty   : largest daily quantity
--   total_revenue    : summed daily revenue
--   avg_discount_pct : mean of the daily max discount, rounded to 2 decimals
--   promo_days       : sum of the has_promotion flag
--   promo_lift_ratio : quantity on promotion days divided by quantity on non-promotion days;
--                      NULLIF turns a zero denominator into NULL instead of dividing by zero
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_demand_forecast.doc_gold_sku_store_features
AS
SELECT
    sku_id,
    store_id,
    store_city,
    category_id,
    EXTRACT(YEAR FROM sales_date)      AS yr,
    EXTRACT(MONTH FROM sales_date)     AS mon,
    EXTRACT(DAYOFWEEK FROM sales_date) AS dow,
    EXTRACT(WEEK FROM sales_date)      AS week_of_year,
    COUNT(DISTINCT sales_date)         AS active_days,
    SUM(total_qty)                     AS total_qty,
    ROUND(AVG(total_qty), 2)           AS avg_daily_qty,
    MAX(total_qty)                     AS peak_daily_qty,
    SUM(total_revenue)                 AS total_revenue,
    ROUND(AVG(max_discount_pct), 2)    AS avg_discount_pct,
    SUM(has_promotion)                 AS promo_days,
    ROUND(
        SUM(CASE WHEN has_promotion = 1 THEN total_qty ELSE 0 END)
        / NULLIF(SUM(CASE WHEN has_promotion = 0 THEN total_qty ELSE 0 END), 0),
        2
    )                                  AS promo_lift_ratio
FROM best_practice_demand_forecast.doc_dwd_sku_store_daily
GROUP BY
    sku_id,
    store_id,
    store_city,
    category_id,
    EXTRACT(YEAR FROM sales_date),
    EXTRACT(MONTH FROM sales_date),
    EXTRACT(DAYOFWEEK FROM sales_date),
    EXTRACT(WEEK FROM sales_date);
-- Forecast rows for SKU 10 in date order, with the width of the forecast interval
-- (forecast_upper - forecast_lower) expressed as a percentage of the point forecast.
-- NULLIF guards the division: a forecast_qty of 0 yields a NULL uncertainty_pct instead of a
-- division by zero, the same guard the promo_lift_ratio feature uses.
-- NOTE(review): doc_gold_forecast_results is not created in the visible part of this script —
-- presumably written by the forecasting step; confirm.
SELECT
    f.sku_id,
    f.store_city,
    f.forecast_date,
    f.forecast_qty,
    f.forecast_lower,
    f.forecast_upper,
    ROUND(
        (f.forecast_upper - f.forecast_lower) / NULLIF(f.forecast_qty, 0) * 100,
        1
    ) AS uncertainty_pct
FROM best_practice_demand_forecast.doc_gold_forecast_results AS f
WHERE f.sku_id = 10
ORDER BY f.forecast_date;
-- City-level roll-up of the forecast results: summed forecast quantity and number of
-- distinct SKUs per city, largest forecast first.
-- NOTE(review): the alias city_4w_forecast_qty suggests a 4-week horizon, but the query applies
-- no date filter — presumably the results table only holds 4 weeks of forecasts; confirm.
SELECT
    fr.store_city,
    SUM(fr.forecast_qty)      AS city_4w_forecast_qty,
    COUNT(DISTINCT fr.sku_id) AS sku_count
FROM best_practice_demand_forecast.doc_gold_forecast_results AS fr
GROUP BY
    fr.store_city
ORDER BY
    city_4w_forecast_qty DESC;
-- Per (sku_id, store_city): average forecast quantity, average relative width of the forecast
-- interval, and the sum of the upper bounds (safe_stock_ceiling), most uncertain first.
-- NULLIF guards the division: rows whose forecast_qty is 0 contribute NULL, which AVG ignores,
-- instead of dividing by zero.
SELECT
    sku_id,
    store_city,
    ROUND(AVG(forecast_qty), 2) AS avg_4w_qty,
    ROUND(
        AVG((forecast_upper - forecast_lower) / NULLIF(forecast_qty, 0) * 100),
        1
    ) AS avg_uncertainty_pct,
    ROUND(SUM(forecast_upper), 2) AS safe_stock_ceiling
FROM best_practice_demand_forecast.doc_gold_forecast_results
GROUP BY
    sku_id,
    store_city
ORDER BY
    avg_uncertainty_pct DESC;
-- First create the API Connection (Alibaba Cloud FC in this example; for SageMaker, use
-- AWS Lambda + API Gateway instead).
-- NOTE(review): ROLE_ARN 'acs:ram::xxx:role/xxx' is a placeholder, and REGION, NAMESPACE and
-- CODE_BUCKET look like sample values — replace them with the values of your own account.
CREATE API CONNECTION IF NOT EXISTS demand_forecast_fc_conn
PROVIDER = 'aliyun'
REGION = 'cn-hangzhou'
ROLE_ARN = 'acs:ram::xxx:role/xxx'
NAMESPACE = 'demand-forecast'
CODE_BUCKET = 'my-code-bucket';
-- Create the External Function that wraps the inference call.
-- It takes a SKU id, a store id and the historical quantities as a JSON array string, and
-- returns the forecast as a JSON string. It is bound to the API connection demand_forecast_fc_conn.
CREATE OR REPLACE EXTERNAL FUNCTION best_practice_demand_forecast.call_forecast_api(
sku_id INT,
store_id INT,
hist_qty STRING -- JSON array, e.g. '[3,4,2,5,3]'
)
RETURNS STRING -- JSON: {"forecast":[4.2,4.5,3.8,5.1],"lower":[...], "upper":[...]}
CONNECTION demand_forecast_fc_conn;
-- 在 SQL 中批量调用:
-- Batch-call the forecast function once per (sku_id, store_id), passing the daily quantity
-- history as a JSON array.
-- COLLECT_LIST alone gives no ordering guarantee, so the history could reach the model out of
-- date order. Each quantity is therefore paired with its sales_date, the pairs are sorted
-- (struct comparison orders by the first field, sales_date), and only the quantities are kept.
-- NOTE(review): relies on Spark-compatible NAMED_STRUCT / SORT_ARRAY / TRANSFORM — confirm they
-- are available on the target engine.
SELECT
    sku_id,
    store_id,
    best_practice_demand_forecast.call_forecast_api(
        sku_id,
        store_id,
        TO_JSON(
            TRANSFORM(
                SORT_ARRAY(
                    COLLECT_LIST(
                        NAMED_STRUCT('sales_date', sales_date, 'total_qty', total_qty)
                    )
                ),
                x -> x.total_qty
            )
        )
    ) AS forecast_json
FROM best_practice_demand_forecast.doc_dwd_sku_store_daily
GROUP BY
    sku_id,
    store_id;
-- 💡 提示:External Function 适合将已上线的生产级模型(XGBoost、LSTM、自研算法)无缝接入 SQL 层,无需迁移训练框架。ZettaPark