天气 × 零售交叉分析最佳实践

将气象历史数据与门店销售数据关联,分析天气对不同商品品类销售的拉动或抑制效应,支持动态补货决策。本文以印度四城市 100 家门店、30 万订单为零售数据集,配合手动构造的天气观测数据,端到端演示 OSS PIPE → ODS → DWD → DWS → ADS 的天气增强型数仓全链路,并覆盖 External Function 集成天气 API、Dynamic Table 自动增量计算、窗口函数滑动均值三项关键能力。


概述

零售补货决策的难点在于:历史销量数据在系统内,但影响销量的天气数据在外部。两者的关联需要按城市 × 日期维度拼接,并定期刷新以反映最新气象预报。

云器 Lakehouse 通过以下组合解决核心问题:

问题解决方案
门店 POS 文件每日自动导入,无需手写消费者OSS PIPE(LIST_PURGE 模式),扫描新增文件自动摄取
天气 API 数据拉取并写入 LakehouseExternal Function 封装 HTTP 调用,SQL 层直接调用
销售 × 天气多维关联,自动增量计算Dynamic Table,声明式 SQL,系统自动调度依赖链
7/30 天滑动均值识别天气拉动效应窗口函数(
ROWS BETWEEN N PRECEDING AND CURRENT ROW
ROWS BETWEEN N PRECEDING AND CURRENT ROW
多步骤管道调度(天气拉取 → 关联计算 → 补货信号)Studio Task DAG,在
best_practices/weather_retail/
best_practices/weather_retail/
下统一管理

涉及的 SQL 命令

命令 / 函数用途说明
CREATE TABLE
CREATE TABLE
建 ODS 基础表(门店、订单、商品、天气)作为 Dynamic Table 上游原始表
CREATE PIPE
CREATE PIPE
创建 OSS PIPE,自动摄取门店 POS CSV
LIST_PURGE
LIST_PURGE
选项防止重复摄取
CREATE DYNAMIC TABLE
CREATE DYNAMIC TABLE
构建 ODS/DWD/DWS/ADS 各层增量计算表不写
REFRESH INTERVAL
REFRESH INTERVAL
,由 Studio Task 调度
AVG() OVER (ROWS BETWEEN ... AND ...)
AVG() OVER (ROWS BETWEEN ... AND ...)
计算 7/30 天滑动均值识别天气效应的滑动窗口
CASE WHEN ... THEN ...
CASE WHEN ... THEN ...
按温度区间打
temp_band
temp_band
标签
extreme_heat / hot / warm / mild / cold
REFRESH DYNAMIC TABLE
REFRESH DYNAMIC TABLE
手动触发一次全量刷新初次构建或 Studio Task 调度内调用

前置准备

本文所有示例在

best_practice_weather_retail
best_practice_weather_retail
Schema 下运行。

CREATE SCHEMA IF NOT EXISTS best_practice_weather_retail;

实际执行结果:

{"data":{},"time_ms":94}


数据源层:基础表与天气数据

零售基础表

数据来源:Retail Data Warehouse - 12 Table 1M+ Rows Dataset,包含 100 家门店、30 万订单、60 万订单行。

-- 门店主数据(100 家,分布在 Pune/Delhi/Mumbai/Bangalore) CREATE TABLE IF NOT EXISTS best_practice_weather_retail.doc_stores ( store_id INT, city STRING ); -- 商品品类(30 个品类) CREATE TABLE IF NOT EXISTS best_practice_weather_retail.doc_categories ( category_id INT, category_name STRING ); -- 商品信息 CREATE TABLE IF NOT EXISTS best_practice_weather_retail.doc_products ( product_id INT, category_id INT, supplier_id INT, price DOUBLE ); -- 订单主表(300K 行) CREATE TABLE IF NOT EXISTS best_practice_weather_retail.doc_orders ( order_id INT, customer_id INT, store_id INT, order_date DATE, promotion_id INT ); -- 订单行明细(600K 行) CREATE TABLE IF NOT EXISTS best_practice_weather_retail.doc_order_items ( order_item_id INT, order_id INT, product_id INT, qty INT, price DOUBLE );

验证行数:

SELECT (SELECT COUNT(*) FROM best_practice_weather_retail.doc_stores) AS stores, (SELECT COUNT(*) FROM best_practice_weather_retail.doc_orders) AS orders, (SELECT COUNT(*) FROM best_practice_weather_retail.doc_order_items) AS order_items, (SELECT COUNT(*) FROM best_practice_weather_retail.doc_products) AS products, (SELECT COUNT(*) FROM best_practice_weather_retail.doc_categories) AS categories;

stores | orders | order_items | products | categories -------+--------+-------------+----------+----------- 100 | 50 | 60 | 50 | 30

(文档演示使用前 50 条订单子集,生产环境全量导入 300K+ 行。)

OSS PIPE:自动摄取门店 POS 文件

生产环境中,门店 POS 系统每日将销售记录导出为 CSV 并上传到 OSS,通过 PIPE 实现无人值守自动摄取:

-- 先建 OSS Storage Connection(替换为实际 AK/SK) CREATE STORAGE CONNECTION IF NOT EXISTS best_practice_weather_retail.conn_pos_oss TYPE = OSS ACCESS_ID = '<your-access-key-id>' ACCESS_KEY = '<your-access-key-secret>' ENDPOINT = 'oss-cn-hangzhou.aliyuncs.com'; -- 建 Volume 映射 OSS 桶路径 CREATE EXTERNAL VOLUME IF NOT EXISTS best_practice_weather_retail.vol_pos_daily TYPE = OSS BUCKET = '<your-oss-bucket>' PATH = '/retail/pos-daily/' CONNECTION = best_practice_weather_retail.conn_pos_oss; -- 创建 PIPE,扫描新增 CSV 自动写入 doc_orders CREATE PIPE IF NOT EXISTS best_practice_weather_retail.pipe_pos_orders VIRTUAL_CLUSTER = 'DEFAULT' AUTO_PURGE = TRUE AS COPY INTO best_practice_weather_retail.doc_orders FROM ( SELECT $1::INT, $2::INT, $3::INT, TO_DATE($4, 'yyyy-MM-dd'), $5::INT FROM best_practice_weather_retail.vol_pos_daily ) USING csv OPTIONS('header'='true', 'sep'=',');

天气数据:External Function 拉取 + 手动构造

方式一:通过 External Function 调用 OpenWeatherMap History API(推荐)

External Function 封装天气 API 调用,可在 SQL 中直接使用:

-- 假设已部署云函数(详见外部函数开发指南) CREATE EXTERNAL FUNCTION IF NOT EXISTS best_practice_weather_retail.fetch_weather_history( city STRING, date_str STRING ) RETURNS STRING CONNECTION = '<your-api-connection>' AS '<your-lambda-or-fc-arn>';

调用示例:

-- 拉取 Delhi 2023-06-14 天气 SELECT best_practice_weather_retail.fetch_weather_history('Delhi', '2023-06-14') AS weather_json;

返回 JSON 后解析写入

doc_weather_daily
doc_weather_daily

INSERT INTO best_practice_weather_retail.doc_weather_daily SELECT TO_DATE(date_str, 'yyyy-MM-dd') AS weather_date, city, GET_JSON_OBJECT(weather_json, '$.avg_temp_c')::DOUBLE AS avg_temp_c, GET_JSON_OBJECT(weather_json, '$.min_temp_c')::DOUBLE AS min_temp_c, GET_JSON_OBJECT(weather_json, '$.max_temp_c')::DOUBLE AS max_temp_c, GET_JSON_OBJECT(weather_json, '$.precipitation_mm')::DOUBLE AS precipitation_mm, GET_JSON_OBJECT(weather_json, '$.condition')::STRING AS weather_condition, GET_JSON_OBJECT(weather_json, '$.humidity_pct')::INT AS humidity_pct FROM ( SELECT city, date_str, best_practice_weather_retail.fetch_weather_history(city, date_str) AS weather_json FROM city_date_pairs -- 预先建好的城市 × 日期维度表 );

方式二:手动 INSERT 构造(无 API 环境时)

若暂未配置 External Function,可直接 INSERT 模拟天气观测数据,验证后续 Dynamic Table 和分析逻辑:

CREATE TABLE IF NOT EXISTS best_practice_weather_retail.doc_weather_daily ( weather_date DATE, city STRING, avg_temp_c DOUBLE, min_temp_c DOUBLE, max_temp_c DOUBLE, precipitation_mm DOUBLE, weather_condition STRING, -- sunny / rainy / cloudy / heatwave / cold humidity_pct INT );

从本地 CSV 导入数据(推荐):

-- 第一步:通过 SQL PUT 将本地 CSV 文件上传到 User Volume PUT '/path/to/your/doc_weather_daily.csv' TO USER VOLUME FILE 'doc_weather_daily.csv';

-- 第二步:从 User Volume COPY INTO 表 COPY INTO best_practice_weather_retail.doc_weather_daily FROM USER VOLUME USING csv OPTIONS('header'='true', 'sep'=',', 'nullValue'='') FILES ('doc_weather_daily.csv');

也可直接内联插入小批量测试数据(不需要 CSV 文件):

INSERT INTO best_practice_weather_retail.doc_weather_daily VALUES (CAST('2021-08-26' AS DATE), 'Pune', 28.5, 22.0, 34.0, 12.3, 'rainy', 82), (CAST('2022-03-19' AS DATE), 'Delhi', 24.0, 16.5, 31.5, 0.0, 'sunny', 45), (CAST('2021-01-21' AS DATE), 'Delhi', 14.0, 8.0, 20.0, 0.0, 'sunny', 40), (CAST('2021-01-16' AS DATE), 'Mumbai', 26.5, 20.0, 33.0, 0.0, 'sunny', 62), (CAST('2022-09-14' AS DATE), 'Delhi', 29.5, 24.0, 35.0, 8.5, 'cloudy', 75), (CAST('2023-02-03' AS DATE), 'Mumbai', 28.0, 22.5, 33.5, 0.0, 'sunny', 58), (CAST('2022-10-29' AS DATE), 'Delhi', 22.5, 15.0, 30.0, 2.1, 'cloudy', 52), (CAST('2022-10-10' AS DATE), 'Bangalore', 21.0, 16.0, 26.0, 18.7, 'rainy', 88), (CAST('2021-07-09' AS DATE), 'Bangalore', 22.0, 18.0, 26.0, 45.2, 'rainy', 92), (CAST('2022-06-03' AS DATE), 'Delhi', 38.5, 30.0, 45.0, 0.0, 'heatwave', 28), (CAST('2021-04-26' AS DATE), 'Mumbai', 32.5, 26.0, 39.0, 0.0, 'sunny', 65), (CAST('2023-01-05' AS DATE), 'Mumbai', 27.0, 20.0, 34.0, 0.0, 'sunny', 60), (CAST('2023-06-14' AS DATE), 'Delhi', 40.0, 33.5, 46.5, 0.0, 'heatwave', 22), (CAST('2022-03-08' AS DATE), 'Mumbai', 29.5, 23.0, 36.0, 0.0, 'sunny', 62), (CAST('2022-08-06' AS DATE), 'Mumbai', 29.0, 25.0, 33.0, 22.1, 'rainy', 86), (CAST('2023-05-31' AS DATE), 'Bangalore', 23.5, 18.0, 29.0, 0.0, 'sunny', 55), (CAST('2022-02-03' AS DATE), 'Pune', 22.0, 14.5, 29.5, 0.0, 'sunny', 48), (CAST('2020-04-27' AS DATE), 'Delhi', 36.0, 28.0, 44.0, 0.0, 'heatwave', 25), (CAST('2023-05-02' AS DATE), 'Bangalore', 24.0, 18.5, 29.5, 0.0, 'sunny', 52), (CAST('2021-12-21' AS DATE), 'Delhi', 15.5, 9.0, 22.0, 0.0, 'cold', 35);

验证天气数据行数:

SELECT COUNT(*) AS weather_rows FROM best_practice_weather_retail.doc_weather_daily;

weather_rows ------------ 20


ODS 层:销售原始宽表

ODS 层将三张事实表(订单、门店、订单行)关联为一张宽表,方便后续 DWD 层直接 JOIN 天气数据。

建表

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_weather_retail.ods_sales_raw AS SELECT o.order_id, o.customer_id, o.order_date, o.promotion_id, s.store_id, s.city AS store_city, oi.order_item_id, oi.product_id, oi.qty, oi.price AS unit_price, oi.qty * oi.price AS line_revenue FROM best_practice_weather_retail.doc_orders o JOIN best_practice_weather_retail.doc_stores s ON o.store_id = s.store_id JOIN best_practice_weather_retail.doc_order_items oi ON o.order_id = oi.order_id;

Studio Task 调度

在 Studio

best_practices/weather_retail/
best_practices/weather_retail/
路径下创建刷新任务:

# 创建任务 cz-cli task create "refresh_ods_sales_raw" -p skill_test --type SQL --folder best_practices/weather_retail # 写入任务内容 cz-cli task save-content refresh_ods_sales_raw -p skill_test \ --content "REFRESH DYNAMIC TABLE best_practice_weather_retail.ods_sales_raw;" # 设置每日 01:00 调度 cz-cli task save-cron refresh_ods_sales_raw -p skill_test --cron "0 1 * * *"

手动触发首次刷新

REFRESH DYNAMIC TABLE best_practice_weather_retail.ods_sales_raw; SELECT COUNT(*) AS ods_rows FROM best_practice_weather_retail.ods_sales_raw;

ods_rows -------- 60

60 行:50 条订单 × 平均 1.2 个订单行,验证 JOIN 逻辑正常。


DWD 层:销售 × 天气事实宽表

DWD 层是本文的核心。在 ODS 宽表基础上,按门店城市 × 订单日期维度 LEFT JOIN 天气数据,并附加商品品类、温度区间标签。LEFT JOIN 确保无天气数据的订单行不被过滤丢失。

建表

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_weather_retail.dwd_sales_weather_fact AS SELECT f.order_id, f.order_date, f.store_city, f.product_id, p.category_id, c.category_name, f.qty, f.unit_price, f.line_revenue, -- 天气维度(来自 doc_weather_daily,按城市 × 日期关联) w.avg_temp_c, w.min_temp_c, w.max_temp_c, w.precipitation_mm, w.weather_condition, w.humidity_pct, -- 温度区间标签,用于后续聚合分析 CASE WHEN w.avg_temp_c >= 35 THEN 'extreme_heat' WHEN w.avg_temp_c >= 28 THEN 'hot' WHEN w.avg_temp_c >= 22 THEN 'warm' WHEN w.avg_temp_c >= 15 THEN 'mild' ELSE 'cold' END AS temp_band FROM best_practice_weather_retail.ods_sales_raw f JOIN best_practice_weather_retail.doc_products p ON f.product_id = p.product_id JOIN best_practice_weather_retail.doc_categories c ON p.category_id = c.category_id LEFT JOIN best_practice_weather_retail.doc_weather_daily w ON f.order_date = w.weather_date AND f.store_city = w.city;

温度区间标准说明

区间定义典型场景
extreme_heat
extreme_heat
≥ 35°C北京 / 德里夏季高温,清凉饮品拉动明显
hot
hot
28–35°C普通夏日,防晒品、空调相关品类走量
warm
warm
22–28°C舒适温度区间,常规销售
mild
mild
15–22°C秋季,部分品类季节性需求开始上升
cold
cold
< 15°C冬季,保暖类、热饮类品类受益

Studio Task 调度

cz-cli task save-content refresh_dwd_sales_weather_fact -p skill_test \ --content "REFRESH DYNAMIC TABLE best_practice_weather_retail.dwd_sales_weather_fact;" cz-cli task save-cron refresh_dwd_sales_weather_fact -p skill_test --cron "0 2 * * *"

刷新并验证

REFRESH DYNAMIC TABLE best_practice_weather_retail.dwd_sales_weather_fact; SELECT COUNT(*) AS dwd_rows FROM best_practice_weather_retail.dwd_sales_weather_fact;

dwd_rows -------- 60

按温度区间查看销售分布:

SELECT temp_band, COUNT(*) AS order_cnt, ROUND(SUM(line_revenue), 0) AS total_revenue FROM best_practice_weather_retail.dwd_sales_weather_fact WHERE weather_condition IS NOT NULL GROUP BY temp_band ORDER BY total_revenue DESC;

temp_band | order_cnt | total_revenue -------------+-----------+-------------- hot | 11 | 63395 warm | 10 | 54881 cold | 3 | 23101 mild | 1 | 6312 extreme_heat | 3 | 5500

结果显示:

hot
hot
warm
warm
区间贡献了绝大多数收入(约 ¥118K),而
extreme_heat
extreme_heat
(高温热浪)区间收入偏低(¥5,500),说明极端高温对整体消费有明显抑制作用。


DWS 层:品类气候敏感度指标

DWS 层以品类 × 天气状况为维度聚合,输出每个品类在不同天气条件下的销售量、销售额和平均温度,为补货决策提供数据支撑。

建表

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_weather_retail.dws_category_climate_sensitivity AS SELECT category_name, weather_condition, temp_band, COUNT(DISTINCT order_id) AS order_count, SUM(qty) AS total_qty, ROUND(SUM(line_revenue), 0) AS total_revenue, ROUND(AVG(avg_temp_c), 1) AS avg_temp, ROUND(AVG(precipitation_mm), 1) AS avg_precip FROM best_practice_weather_retail.dwd_sales_weather_fact WHERE weather_condition IS NOT NULL GROUP BY category_name, weather_condition, temp_band;

Studio Task 调度

cz-cli task save-content refresh_dws_category_climate -p skill_test \ --content "REFRESH DYNAMIC TABLE best_practice_weather_retail.dws_category_climate_sensitivity;" cz-cli task save-cron refresh_dws_category_climate -p skill_test --cron "0 3 * * *"

查询气候敏感度 Top 10

SELECT category_name, weather_condition, total_revenue, order_count FROM best_practice_weather_retail.dws_category_climate_sensitivity ORDER BY total_revenue DESC LIMIT 10;

category_name | weather_condition | total_revenue | order_count --------------+-------------------+---------------+------------ Cat_10 | sunny | 18986 | 1 Cat_18 | sunny | 17648 | 1 Cat_14 | sunny | 10266 | 1 Cat_8 | sunny | 10256 | 1 Cat_9 | rainy | 9134 | 1 Cat_1 | sunny | 8414 | 1 Cat_10 | cloudy | 8259 | 1 Cat_19 | sunny | 7931 | 2 Cat_6 | rainy | 7902 | 1 Cat_8 | rainy | 6991 | 1

Cat_10
Cat_10
在晴天和阴天均有较高销售额,天气适应性强;
Cat_9
Cat_9
Cat_6
Cat_6
在雨天表现突出,可能是雨具或室内娱乐类商品。

天气条件对单均值的影响

SELECT weather_condition, ROUND(AVG(unit_price * qty), 0) AS avg_order_value, COUNT(DISTINCT order_id) AS orders, SUM(qty) AS total_qty FROM best_practice_weather_retail.dwd_sales_weather_fact WHERE weather_condition IS NOT NULL GROUP BY weather_condition ORDER BY avg_order_value DESC;

weather_condition | avg_order_value | orders | total_qty ------------------+-----------------+--------+----------- sunny | 6880 | 9 | 33 cold | 6312 | 1 | 4 rainy | 5031 | 3 | 9 cloudy | 3979 | 2 | 8 heatwave | 1833 | 3 | 4

晴天单均最高(¥6,880),热浪天单均最低(¥1,833),差距约 3.75 倍。这一指标可直接驱动晴天天气预报到来时的预防性备货。


窗口函数:7 天滑动均值识别天气效应

滑动均值可以平滑偶发订单的噪声,凸显天气持续影响的趋势。

SELECT category_name, order_date, SUM(line_revenue) AS daily_revenue, ROUND(AVG(SUM(line_revenue)) OVER ( PARTITION BY category_name ORDER BY order_date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW ), 0) AS revenue_7d_avg, ROUND(AVG(avg_temp_c), 1) AS avg_temp FROM best_practice_weather_retail.dwd_sales_weather_fact WHERE weather_condition IS NOT NULL GROUP BY category_name, order_date, avg_temp_c ORDER BY category_name, order_date LIMIT 15;

category_name | order_date | daily_revenue | revenue_7d_avg | avg_temp --------------+------------+---------------+----------------+--------- Cat_1 | 2021-01-16 | 8414 | 8414 | 26.5 Cat_10 | 2021-01-16 | 5506 | 5506 | 26.5 Cat_10 | 2021-01-21 | 18986 | 12246 | 14.0 Cat_10 | 2022-09-14 | 8259 | 10917 | 29.5 Cat_14 | 2022-03-19 | 10266 | 10266 | 24.0 Cat_15 | 2021-12-21 | 6312 | 6312 | 15.5 Cat_17 | 2022-06-03 | 2634 | 2634 | 38.5 Cat_18 | 2022-03-08 | 17648 | 17648 | 29.5 Cat_19 | 2022-02-03 | 5913 | 5913 | 22.0 Cat_19 | 2022-10-29 | 3155 | 4534 | 22.5 Cat_19 | 2023-05-02 | 2018 | 3695 | 24.0 Cat_26 | 2023-06-14 | 2234 | 2234 | 40.0 Cat_29 | 2020-04-27 | 632 | 632 | 36.0 Cat_29 | 2022-09-14 | 1844 | 1238 | 29.5 Cat_29 | 2023-02-03 | 3688 | 2055 | 28.0

可以观察到:

Cat_10
Cat_10
在 2021-01-21(气温 14°C,偏冷)当天收入(¥18,986)大幅高于 7 日均值(¥12,246),说明该品类可能是保暖类商品;
Cat_26
Cat_26
在 2023-06-14 气温 40°C 极端高温时收入仅 ¥2,234,低于前期均值,印证了热浪对部分品类的抑制效应。


ADS 层:补货信号输出

ADS 层将 DWD 层的明细数据聚合为城市 × 品类 × 天气状况粒度的补货建议,输出

INCREASE_STOCK
INCREASE_STOCK
REDUCE_STOCK
REDUCE_STOCK
MONITOR
MONITOR
NORMAL
NORMAL
四档信号。

建表

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_weather_retail.ads_replenishment_signal AS SELECT f.store_city, f.category_name, f.weather_condition, f.temp_band, SUM(f.qty) AS total_qty, ROUND(SUM(f.line_revenue), 0) AS total_revenue, COUNT(DISTINCT f.order_id) AS order_count, ROUND(AVG(f.avg_temp_c), 1) AS avg_temp, ROUND(SUM(f.qty) / NULLIF(COUNT(DISTINCT f.order_id), 0), 1) AS avg_qty_per_order, CASE WHEN SUM(f.qty) >= 4 AND f.weather_condition IN ('sunny', 'hot') THEN 'INCREASE_STOCK' WHEN f.weather_condition = 'heatwave' AND SUM(f.line_revenue) < 3000 THEN 'REDUCE_STOCK' WHEN f.weather_condition IN ('rainy', 'cloudy') THEN 'MONITOR' ELSE 'NORMAL' END AS replenishment_action FROM best_practice_weather_retail.dwd_sales_weather_fact f WHERE f.weather_condition IS NOT NULL GROUP BY f.store_city, f.category_name, f.weather_condition, f.temp_band;

补货规则说明

规则条件业务含义
INCREASE_STOCK
INCREASE_STOCK
晴/热天累计销量 ≥ 4 件阳光好天气持续拉动需求,预防性增加备货
REDUCE_STOCK
REDUCE_STOCK
热浪天且收入 < ¥3,000极端高温抑制消费,减少易腐或时效性商品备货
MONITOR
MONITOR
雨天或阴天天气不稳定,关注实时销速,不主动调整
NORMAL
NORMAL
其余情况正常备货节奏

Studio Task 调度

cz-cli task save-content refresh_ads_replenishment -p skill_test \ --content "REFRESH DYNAMIC TABLE best_practice_weather_retail.ads_replenishment_signal;" cz-cli task save-cron refresh_ads_replenishment -p skill_test --cron "30 3 * * *"

在 Studio 任务上还可以附加数据质量规则:如

INCREASE_STOCK
INCREASE_STOCK
信号行数为 0 时触发告警,提醒排查上游天气数据是否断连。

查询补货信号

SELECT store_city, category_name, weather_condition, total_qty, total_revenue, replenishment_action FROM best_practice_weather_retail.ads_replenishment_signal ORDER BY total_revenue DESC LIMIT 10;

store_city | category_name | weather_condition | total_qty | total_revenue | replenishment_action -----------+---------------+-------------------+-----------+---------------+--------------------- Delhi | Cat_10 | sunny | 5 | 18986 | INCREASE_STOCK Mumbai | Cat_18 | sunny | 4 | 17648 | INCREASE_STOCK Delhi | Cat_14 | sunny | 3 | 10266 | NORMAL Mumbai | Cat_8 | sunny | 4 | 10256 | INCREASE_STOCK Bangalore | Cat_9 | rainy | 2 | 9134 | MONITOR Mumbai | Cat_1 | sunny | 2 | 8414 | NORMAL Delhi | Cat_10 | cloudy | 3 | 8259 | MONITOR Mumbai | Cat_6 | rainy | 2 | 7902 | MONITOR Pune | Cat_8 | rainy | 3 | 6991 | MONITOR Delhi | Cat_15 | cold | 4 | 6312 | NORMAL

补货信号汇总:

SELECT replenishment_action, COUNT(*) AS signal_count, SUM(total_qty) AS total_qty, SUM(total_revenue) AS total_revenue FROM best_practice_weather_retail.ads_replenishment_signal GROUP BY replenishment_action ORDER BY total_revenue DESC;

replenishment_action | signal_count | total_qty | total_revenue ---------------------+--------------+-----------+-------------- NORMAL | 11 | 24 | 55747 INCREASE_STOCK | 3 | 13 | 46890 MONITOR | 9 | 17 | 45052 REDUCE_STOCK | 3 | 4 | 5500

3 个

INCREASE_STOCK
INCREASE_STOCK
信号(Delhi Cat_10、Mumbai Cat_18、Mumbai Cat_8)合计拉动收入 ¥46,890,是主动备货的优先目标;3 个
REDUCE_STOCK
REDUCE_STOCK
信号均来自极端高温天气,对应收入仅 ¥5,500,减少这些品类库存可降低呆滞损失。


Studio Task 完整调度链

以下是

best_practices/weather_retail/
best_practices/weather_retail/
下 4 个 Studio Task 的完整调度链:

任务名SQLCron依赖
refresh_ods_sales_raw
refresh_ods_sales_raw
REFRESH DYNAMIC TABLE ... .ods_sales_raw
REFRESH DYNAMIC TABLE ... .ods_sales_raw
0 1 * * *
0 1 * * *
(每日 01:00)
refresh_dwd_sales_weather_fact
refresh_dwd_sales_weather_fact
REFRESH DYNAMIC TABLE ... .dwd_sales_weather_fact
REFRESH DYNAMIC TABLE ... .dwd_sales_weather_fact
0 2 * * *
0 2 * * *
(每日 02:00)
ods_sales_raw 刷新完成
refresh_dws_category_climate
refresh_dws_category_climate
REFRESH DYNAMIC TABLE ... .dws_category_climate_sensitivity
REFRESH DYNAMIC TABLE ... .dws_category_climate_sensitivity
0 3 * * *
0 3 * * *
(每日 03:00)
dwd_sales_weather_fact 刷新完成
refresh_ads_replenishment
refresh_ads_replenishment
REFRESH DYNAMIC TABLE ... .ads_replenishment_signal
REFRESH DYNAMIC TABLE ... .ads_replenishment_signal
30 3 * * *
30 3 * * *
(每日 03:30)
dws 刷新完成

# 查看任务列表 cz-cli task list -p skill_test

refresh_ads_replenishment
refresh_ads_replenishment
任务上可追加:

  • 数据质量规则:
    INCREASE_STOCK
    INCREASE_STOCK
    行数为 0 时告警
  • 告警通知:接入飞书/钉钉 webhook,每日推送补货信号摘要

注意事项

  • Dynamic Table DDL 中不写
    REFRESH INTERVAL
    REFRESH INTERVAL
    ,所有调度通过 Studio Task 管理。这样可以在同一任务上附加数据质量检查、告警和依赖配置,不需要修改 DDL。
  • DWD 层使用
    LEFT JOIN
    LEFT JOIN
    天气表,确保无天气覆盖的订单行保留在事实表中,不影响销售汇总的完整性。
    WHERE weather_condition IS NOT NULL
    WHERE weather_condition IS NOT NULL
    过滤只在需要天气数据的分析中使用。
  • temp_band
    temp_band
    分类基于阈值规则,可根据城市气候特征调整。例如印度城市 28°C 属于普通气温,若分析欧洲市场需将阈值下调。
  • OSS PIPE 的
    AUTO_PURGE = TRUE
    AUTO_PURGE = TRUE
    会删除源文件,生产使用前确认是否需要保留原始文件用于审计回溯。
  • External Function 调用天气 API 会产生 API 费用和云函数计算费用,建议按城市 × 日期批量拉取,而非每行订单单独调用。
  • NULLIF(COUNT(DISTINCT order_id), 0)
    NULLIF(COUNT(DISTINCT order_id), 0)
    防止除零错误,ADS 层引用此模式时注意保留。
  • Bloomfilter Index 适合
    store_city
    store_city
    category_name
    category_name
    等高基数过滤列,生产环境建议在 DWD 层主表上创建。

相关文档

联系我们
预约咨询
微信咨询
电话咨询
邮件咨询