-- Store master data (100 stores spread across Pune / Delhi / Mumbai / Bangalore).
CREATE TABLE IF NOT EXISTS best_practice_weather_retail.doc_stores (
    store_id INT,
    city     STRING  -- joined against doc_weather_daily.city downstream
);
-- Product categories (30 categories).
CREATE TABLE IF NOT EXISTS best_practice_weather_retail.doc_categories (
    category_id   INT,
    category_name STRING
);
-- Product information; category_id links each product to doc_categories.
CREATE TABLE IF NOT EXISTS best_practice_weather_retail.doc_products (
    product_id  INT,
    category_id INT,
    supplier_id INT,
    price       DOUBLE
);
-- Order header table (300K rows).
CREATE TABLE IF NOT EXISTS best_practice_weather_retail.doc_orders (
    order_id     INT,
    customer_id  INT,
    store_id     INT,
    order_date   DATE,
    promotion_id INT
);
-- Order line items (600K rows); order_id links each line to doc_orders.
CREATE TABLE IF NOT EXISTS best_practice_weather_retail.doc_order_items (
    order_item_id INT,
    order_id      INT,
    product_id    INT,
    qty           INT,
    price         DOUBLE
);
验证行数:
-- Row-count sanity check: one scalar subquery per base table, returned as a single row.
SELECT
    (SELECT COUNT(*) FROM best_practice_weather_retail.doc_stores)      AS stores,
    (SELECT COUNT(*) FROM best_practice_weather_retail.doc_orders)      AS orders,
    (SELECT COUNT(*) FROM best_practice_weather_retail.doc_order_items) AS order_items,
    (SELECT COUNT(*) FROM best_practice_weather_retail.doc_products)    AS products,
    (SELECT COUNT(*) FROM best_practice_weather_retail.doc_categories)  AS categories;
-- Step 1: create the OSS storage connection.
-- Replace the placeholders with your real AccessKey ID / secret before running.
CREATE STORAGE CONNECTION IF NOT EXISTS best_practice_weather_retail.conn_pos_oss
    TYPE = OSS
    ACCESS_ID = '<your-access-key-id>'
    ACCESS_KEY = '<your-access-key-secret>'
    ENDPOINT = 'oss-cn-hangzhou.aliyuncs.com';
-- Step 2: create a volume that maps the OSS bucket path, using the connection
-- best_practice_weather_retail.conn_pos_oss.
CREATE EXTERNAL VOLUME IF NOT EXISTS best_practice_weather_retail.vol_pos_daily
    TYPE = OSS
    BUCKET = '<your-oss-bucket>'
    PATH = '/retail/pos-daily/'
    CONNECTION = best_practice_weather_retail.conn_pos_oss;
-- Step 3: create a PIPE that scans the volume for newly arrived CSV files and
-- loads them into doc_orders.
-- The five projected fields are presumably mapped by position onto
-- doc_orders (order_id, customer_id, store_id, order_date, promotion_id) -- TODO confirm.
CREATE PIPE IF NOT EXISTS best_practice_weather_retail.pipe_pos_orders
    VIRTUAL_CLUSTER = 'DEFAULT'
    AUTO_PURGE = TRUE
AS
COPY INTO best_practice_weather_retail.doc_orders
FROM (
    SELECT
        $1::INT,
        $2::INT,
        $3::INT,
        TO_DATE($4, 'yyyy-MM-dd'),
        $5::INT
    FROM best_practice_weather_retail.vol_pos_daily
)
USING csv
OPTIONS('header'='true', 'sep'=',');
⚠️ 注意:`AUTO_PURGE = TRUE` 会在文件成功摄取后自动从 Volume 删除源文件,防止重复摄取。若需保留原始文件用于审计,请改为 `AUTO_PURGE = FALSE` 并定期归档。
💡 提示:本文演示环境直接使用 `INSERT INTO` 写入测试数据,生产环境替换为 PIPE 即可,下游 Dynamic Table 逻辑完全相同。
天气数据:External Function 拉取 + 手动构造
方式一:通过 External Function 调用 OpenWeatherMap History API(推荐)
External Function 封装天气 API 调用,可在 SQL 中直接使用:
-- Assumes the cloud function is already deployed (see the external function
-- development guide). Takes a city and a date string and returns a STRING,
-- which the statements below treat as a JSON document.
CREATE EXTERNAL FUNCTION IF NOT EXISTS best_practice_weather_retail.fetch_weather_history(
    city     STRING,
    date_str STRING
)
RETURNS STRING
CONNECTION = '<your-api-connection>'
AS '<your-lambda-or-fc-arn>';
调用示例:
-- Example call: fetch the weather for Delhi on 2023-06-14.
SELECT
    best_practice_weather_retail.fetch_weather_history('Delhi', '2023-06-14') AS weather_json;
返回 JSON 后解析并写入 `doc_weather_daily`:
-- Parse the JSON returned by fetch_weather_history and load it into
-- doc_weather_daily (the target table must already exist).
-- The target columns are listed explicitly so the load does not depend on the
-- table's physical column order.
INSERT INTO best_practice_weather_retail.doc_weather_daily (
    weather_date,
    city,
    avg_temp_c,
    min_temp_c,
    max_temp_c,
    precipitation_mm,
    weather_condition,
    humidity_pct
)
SELECT
    TO_DATE(date_str, 'yyyy-MM-dd')                                   AS weather_date,
    city,
    GET_JSON_OBJECT(weather_json, '$.avg_temp_c')::DOUBLE             AS avg_temp_c,
    GET_JSON_OBJECT(weather_json, '$.min_temp_c')::DOUBLE             AS min_temp_c,
    GET_JSON_OBJECT(weather_json, '$.max_temp_c')::DOUBLE             AS max_temp_c,
    GET_JSON_OBJECT(weather_json, '$.precipitation_mm')::DOUBLE       AS precipitation_mm,
    GET_JSON_OBJECT(weather_json, '$.condition')::STRING              AS weather_condition,
    GET_JSON_OBJECT(weather_json, '$.humidity_pct')::INT              AS humidity_pct
FROM (
    -- One external-function call per (city, date) pair.
    SELECT
        city,
        date_str,
        best_practice_weather_retail.fetch_weather_history(city, date_str) AS weather_json
    FROM city_date_pairs -- pre-built city x date dimension table
);
-- Daily weather observations, one row per (weather_date, city) in the sample data.
CREATE TABLE IF NOT EXISTS best_practice_weather_retail.doc_weather_daily (
    weather_date      DATE,
    city              STRING,
    avg_temp_c        DOUBLE,
    min_temp_c        DOUBLE,
    max_temp_c        DOUBLE,
    precipitation_mm  DOUBLE,
    weather_condition STRING,  -- sunny / rainy / cloudy / heatwave / cold
    humidity_pct      INT
);
从本地 CSV 导入数据(推荐):
-- Step 1: upload the local CSV file to the User Volume with SQL PUT.
PUT '/path/to/your/doc_weather_daily.csv'
    TO USER VOLUME FILE 'doc_weather_daily.csv';

-- Step 2: COPY the uploaded file from the User Volume into the table.
-- The file is read with a header row, comma separator, and '' as the null value.
COPY INTO best_practice_weather_retail.doc_weather_daily
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_weather_daily.csv');
也可直接内联插入小批量测试数据(不需要 CSV 文件):
-- Inline sample weather rows (20 rows) for testing without a CSV file.
-- The target columns are listed explicitly so the VALUES tuples do not depend
-- on the table's physical column order.
-- NOTE: re-running this statement appends the same rows again.
INSERT INTO best_practice_weather_retail.doc_weather_daily (
    weather_date,
    city,
    avg_temp_c,
    min_temp_c,
    max_temp_c,
    precipitation_mm,
    weather_condition,
    humidity_pct
) VALUES
    (CAST('2021-08-26' AS DATE), 'Pune', 28.5, 22.0, 34.0, 12.3, 'rainy', 82),
    (CAST('2022-03-19' AS DATE), 'Delhi', 24.0, 16.5, 31.5, 0.0, 'sunny', 45),
    (CAST('2021-01-21' AS DATE), 'Delhi', 14.0, 8.0, 20.0, 0.0, 'sunny', 40),
    (CAST('2021-01-16' AS DATE), 'Mumbai', 26.5, 20.0, 33.0, 0.0, 'sunny', 62),
    (CAST('2022-09-14' AS DATE), 'Delhi', 29.5, 24.0, 35.0, 8.5, 'cloudy', 75),
    (CAST('2023-02-03' AS DATE), 'Mumbai', 28.0, 22.5, 33.5, 0.0, 'sunny', 58),
    (CAST('2022-10-29' AS DATE), 'Delhi', 22.5, 15.0, 30.0, 2.1, 'cloudy', 52),
    (CAST('2022-10-10' AS DATE), 'Bangalore', 21.0, 16.0, 26.0, 18.7, 'rainy', 88),
    (CAST('2021-07-09' AS DATE), 'Bangalore', 22.0, 18.0, 26.0, 45.2, 'rainy', 92),
    (CAST('2022-06-03' AS DATE), 'Delhi', 38.5, 30.0, 45.0, 0.0, 'heatwave', 28),
    (CAST('2021-04-26' AS DATE), 'Mumbai', 32.5, 26.0, 39.0, 0.0, 'sunny', 65),
    (CAST('2023-01-05' AS DATE), 'Mumbai', 27.0, 20.0, 34.0, 0.0, 'sunny', 60),
    (CAST('2023-06-14' AS DATE), 'Delhi', 40.0, 33.5, 46.5, 0.0, 'heatwave', 22),
    (CAST('2022-03-08' AS DATE), 'Mumbai', 29.5, 23.0, 36.0, 0.0, 'sunny', 62),
    (CAST('2022-08-06' AS DATE), 'Mumbai', 29.0, 25.0, 33.0, 22.1, 'rainy', 86),
    (CAST('2023-05-31' AS DATE), 'Bangalore', 23.5, 18.0, 29.0, 0.0, 'sunny', 55),
    (CAST('2022-02-03' AS DATE), 'Pune', 22.0, 14.5, 29.5, 0.0, 'sunny', 48),
    (CAST('2020-04-27' AS DATE), 'Delhi', 36.0, 28.0, 44.0, 0.0, 'heatwave', 25),
    (CAST('2023-05-02' AS DATE), 'Bangalore', 24.0, 18.5, 29.5, 0.0, 'sunny', 52),
    (CAST('2021-12-21' AS DATE), 'Delhi', 15.5, 9.0, 22.0, 0.0, 'cold', 35);
验证天气数据行数:
-- Row-count check for the weather table.
SELECT
    COUNT(*) AS weather_rows
FROM best_practice_weather_retail.doc_weather_daily;
-- ODS layer: one row per order line, enriched with the order header and the
-- store's city. Orders without a matching store or without any line items are
-- dropped by the inner joins.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_weather_retail.ods_sales_raw
AS
SELECT
    o.order_id,
    o.customer_id,
    o.order_date,
    o.promotion_id,
    s.store_id,
    s.city            AS store_city,
    oi.order_item_id,
    oi.product_id,
    oi.qty,
    oi.price          AS unit_price,
    oi.qty * oi.price AS line_revenue
FROM best_practice_weather_retail.doc_orders AS o
INNER JOIN best_practice_weather_retail.doc_stores AS s
    ON o.store_id = s.store_id
INNER JOIN best_practice_weather_retail.doc_order_items AS oi
    ON o.order_id = oi.order_id;
-- DWD layer: sales lines from ods_sales_raw joined to product / category
-- dimensions and to the daily weather for the store's city on the order date.
-- Weather is LEFT JOINed, so sales lines without a weather record are kept
-- with NULL weather columns.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_weather_retail.dwd_sales_weather_fact
AS
SELECT
    f.order_id,
    f.order_date,
    f.store_city,
    f.product_id,
    p.category_id,
    c.category_name,
    f.qty,
    f.unit_price,
    f.line_revenue,
    -- Weather dimension (from doc_weather_daily, matched on city x date)
    w.avg_temp_c,
    w.min_temp_c,
    w.max_temp_c,
    w.precipitation_mm,
    w.weather_condition,
    w.humidity_pct,
    -- Temperature band label used by downstream aggregations.
    -- The explicit NULL guard keeps rows without a temperature reading from
    -- falling through to ELSE and being mislabelled as 'cold'.
    CASE
        WHEN w.avg_temp_c IS NULL THEN NULL
        WHEN w.avg_temp_c >= 35 THEN 'extreme_heat'
        WHEN w.avg_temp_c >= 28 THEN 'hot'
        WHEN w.avg_temp_c >= 22 THEN 'warm'
        WHEN w.avg_temp_c >= 15 THEN 'mild'
        ELSE 'cold'
    END AS temp_band
FROM best_practice_weather_retail.ods_sales_raw f
JOIN best_practice_weather_retail.doc_products p
    ON f.product_id = p.product_id
JOIN best_practice_weather_retail.doc_categories c
    ON p.category_id = c.category_id
LEFT JOIN best_practice_weather_retail.doc_weather_daily w
    ON f.order_date = w.weather_date
    AND f.store_city = w.city;
-- Trigger a refresh of the DWD dynamic table, then check how many rows it holds.
REFRESH DYNAMIC TABLE best_practice_weather_retail.dwd_sales_weather_fact;

SELECT
    COUNT(*) AS dwd_rows
FROM best_practice_weather_retail.dwd_sales_weather_fact;
dwd_rows
--------
60
按温度区间查看销售分布:
-- Sales distribution per temperature band, highest revenue first.
-- COUNT(*) counts fact rows (order lines); rows without weather data are excluded.
SELECT
    temp_band,
    COUNT(*)                    AS order_cnt,
    ROUND(SUM(line_revenue), 0) AS total_revenue
FROM best_practice_weather_retail.dwd_sales_weather_fact
WHERE weather_condition IS NOT NULL
GROUP BY temp_band
ORDER BY total_revenue DESC;
-- DWS layer: sales aggregated per (category, weather condition, temperature band).
-- Fact rows without weather data are excluded.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_weather_retail.dws_category_climate_sensitivity
AS
SELECT
    category_name,
    weather_condition,
    temp_band,
    COUNT(DISTINCT order_id)        AS order_count,
    SUM(qty)                        AS total_qty,
    ROUND(SUM(line_revenue), 0)     AS total_revenue,
    ROUND(AVG(avg_temp_c), 1)       AS avg_temp,
    ROUND(AVG(precipitation_mm), 1) AS avg_precip
FROM best_practice_weather_retail.dwd_sales_weather_fact
WHERE weather_condition IS NOT NULL
GROUP BY
    category_name,
    weather_condition,
    temp_band;
-- Top 10 (category, weather condition, temperature band) groups by revenue.
-- The source table has one row per (category_name, weather_condition,
-- temp_band); those columns are added as tie-breakers so that LIMIT returns a
-- deterministic set when revenues are equal.
SELECT
    category_name,
    weather_condition,
    total_revenue,
    order_count
FROM best_practice_weather_retail.dws_category_climate_sensitivity
ORDER BY
    total_revenue DESC,
    category_name,
    weather_condition,
    temp_band
LIMIT 10;
-- Average order value, order count and units sold per weather condition.
-- The fact table holds one row per order line, so the average is computed as
-- total revenue divided by the number of distinct orders; a plain AVG over
-- rows would yield the average line value instead.
SELECT
    weather_condition,
    ROUND(
        SUM(unit_price * qty) / NULLIF(COUNT(DISTINCT order_id), 0),
        0
    )                        AS avg_order_value,
    COUNT(DISTINCT order_id) AS orders,
    SUM(qty)                 AS total_qty
FROM best_practice_weather_retail.dwd_sales_weather_fact
WHERE weather_condition IS NOT NULL
GROUP BY weather_condition
ORDER BY avg_order_value DESC;
-- Daily revenue per category with a rolling average over the current and the
-- six preceding rows of the same category.
-- Grouping is by (category_name, order_date) only: grouping by avg_temp_c as
-- well would split a single day into one row per distinct temperature and
-- distort both daily_revenue and the rolling window.
-- NOTE: the frame counts rows (days that have sales), not calendar days, so it
-- spans exactly 7 days only when a category has sales on every day.
SELECT
    category_name,
    order_date,
    SUM(line_revenue) AS daily_revenue,
    ROUND(AVG(SUM(line_revenue)) OVER (
        PARTITION BY category_name
        ORDER BY order_date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ), 0) AS revenue_7d_avg,
    ROUND(AVG(avg_temp_c), 1) AS avg_temp
FROM best_practice_weather_retail.dwd_sales_weather_fact
WHERE weather_condition IS NOT NULL
GROUP BY category_name, order_date
ORDER BY category_name, order_date
LIMIT 15;
-- ADS layer: replenishment signal per (store city, category, weather condition,
-- temperature band). Fact rows without weather data are excluded.
-- replenishment_action is decided by the first matching rule:
--   1. total qty >= 4 and weather_condition in ('sunny', 'hot') -> INCREASE_STOCK
--   2. weather_condition = 'heatwave' and total revenue < 3000  -> REDUCE_STOCK
--   3. weather_condition in ('rainy', 'cloudy')                 -> MONITOR
--   4. anything else                                            -> NORMAL
-- NOTE(review): 'hot' is a temp_band label in dwd_sales_weather_fact, not one of
-- the weather_condition values listed on doc_weather_daily, so it probably never
-- matches here -- confirm whether temp_band = 'hot' was intended.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_weather_retail.ads_replenishment_signal
AS
SELECT
    f.store_city,
    f.category_name,
    f.weather_condition,
    f.temp_band,
    SUM(f.qty)                    AS total_qty,
    ROUND(SUM(f.line_revenue), 0) AS total_revenue,
    COUNT(DISTINCT f.order_id)    AS order_count,
    ROUND(AVG(f.avg_temp_c), 1)   AS avg_temp,
    ROUND(
        SUM(f.qty) / NULLIF(COUNT(DISTINCT f.order_id), 0),
        1
    )                             AS avg_qty_per_order,
    CASE
        WHEN SUM(f.qty) >= 4
            AND f.weather_condition IN ('sunny', 'hot')
            THEN 'INCREASE_STOCK'
        WHEN f.weather_condition = 'heatwave'
            AND SUM(f.line_revenue) < 3000
            THEN 'REDUCE_STOCK'
        WHEN f.weather_condition IN ('rainy', 'cloudy')
            THEN 'MONITOR'
        ELSE 'NORMAL'
    END                           AS replenishment_action
FROM best_practice_weather_retail.dwd_sales_weather_fact f
WHERE f.weather_condition IS NOT NULL
GROUP BY
    f.store_city,
    f.category_name,
    f.weather_condition,
    f.temp_band;
-- Top 10 replenishment signals by revenue.
-- The source table has one row per (store_city, category_name,
-- weather_condition, temp_band); those columns are added as tie-breakers so
-- that LIMIT returns a deterministic set when revenues are equal.
SELECT
    store_city,
    category_name,
    weather_condition,
    total_qty,
    total_revenue,
    replenishment_action
FROM best_practice_weather_retail.ads_replenishment_signal
ORDER BY
    total_revenue DESC,
    store_city,
    category_name,
    weather_condition,
    temp_band
LIMIT 10;
-- Summary per replenishment action: number of signal rows, units and revenue,
-- highest revenue first.
SELECT
    replenishment_action,
    COUNT(*)           AS signal_count,
    SUM(total_qty)     AS total_qty,
    SUM(total_revenue) AS total_revenue
FROM best_practice_weather_retail.ads_replenishment_signal
GROUP BY replenishment_action
ORDER BY total_revenue DESC;