Build a Supply Chain and Logistics Tracking Data Warehouse

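A supply chain data warehouse has to absorb three data pipelines that run in parallel with very different latencies. OMS order status must be visible within seconds, WMS inventory snapshots are batch-synced once a day, and TMS logistics EDI files arrive periodically. This article uses a Kaggle retail dataset (orders, inventory, logistics and suppliers, covering the full ODS→DWD→DWS→ADS pipeline) to show how ClickZetta Lakehouse (云器 Lakehouse) combines the three heterogeneous feeds into one supply chain visibility warehouse. The warehouse monitors SKU inventory turnover and shipment on-time rates.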


Overview

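Problem | Lakehouse solution
--- | ---
OMS order status changes must reach the warehouse in real time; hour-level latency is not acceptable | PostgreSQL CDC real-time sync writes order status changes to ODS within seconds
WMS inventory data is large, and historical snapshots are queried by warehouse and date | MySQL multi-table offline sync into a table with PARTITIONED BY (dt STRING), so date-range queries benefit from partition pruning; warehouse_id is filtered with WHERE
Carriers deliver EDI files that must be imported in batches on a regular basis | An OSS PIPE continuously watches the bucket, and each new file automatically triggers COPY INTO
The multi-layer aggregation chain (DWD→DWS→ADS) has complex dependencies that need automated orchestration | Dynamic Table cascade refresh plus Studio Task dependency scheduling: once DWD finishes refreshing, downstream tasks are triggered automatically
Inventory alerts need minute-level visibility, while the supplier SLA report is updated daily | Studio Task schedules are configured per layer, and the alert table is refreshed before the SLA report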

SQL commands used

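Command / function | Purpose | Notes
--- | --- | ---
CREATE TABLE ... PARTITIONED BY | Create partitioned ODS (raw layer) tables | Partitioned by date (dt) to speed up historical range scans
CREATE DYNAMIC TABLE | Create DWD/DWS/ADS dynamic tables | REFRESH INTERVAL is not written in the DDL; refresh scheduling is managed by Studio Tasks
REFRESH DYNAMIC TABLE | Manually trigger a single dynamic table refresh | Used right after a table is first created, or while debugging
CASE WHEN | Derived fields (delivery_flag, stock_status, sla_status) | Normalizes status codes from multiple sources into unified business semantics
DATEDIFF | Compute days in transit (transit_days) | Combined with COALESCE to handle shipments that have not been delivered yet
DATE_FORMAT | Group statistics by month (yyyy-MM) | Used by the ADS monthly SLA report
NULLIF | Avoid division-by-zero errors | When computing the SLA on-time rate, divide by NULLIF(count, 0)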

Prerequisites

Create a dedicated schema to isolate all test tables used in this article:

CREATE SCHEMA IF NOT EXISTS best_practice_supply_chain;

cz-cli sql "CREATE SCHEMA IF NOT EXISTS best_practice_supply_chain" -p skill_test --write

Result:

{"data":{},"time_ms":101}


ODS layer: ingesting three heterogeneous sources

The ODS layer maps to three source systems, and each one is ingested in a different way.

Create tables

OMS orders table (PostgreSQL CDC target table)

CREATE TABLE IF NOT EXISTS best_practice_supply_chain.doc_ods_orders (
    order_id BIGINT,
    order_date DATE,
    customer_id BIGINT,
    store_id INT,
    status STRING,
    total_amount DECIMAL(12,2),
    currency STRING,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
)
COMMENT 'ODS: raw orders from OMS (synced via PostgreSQL CDC)'
PARTITIONED BY (dt STRING);

OMS order line items table (PostgreSQL CDC target table)

CREATE TABLE IF NOT EXISTS best_practice_supply_chain.doc_ods_order_items (
    item_id BIGINT,
    order_id BIGINT,
    product_id BIGINT,
    sku_code STRING,
    quantity INT,
    unit_price DECIMAL(10,2),
    discount DECIMAL(10,2),
    warehouse_id INT,
    created_at TIMESTAMP
)
COMMENT 'ODS: raw order line items from OMS'
PARTITIONED BY (dt STRING);

TMS shipments table (OSS PIPE target table)

CREATE TABLE IF NOT EXISTS best_practice_supply_chain.doc_ods_shipments (
    shipment_id BIGINT,
    order_id BIGINT,
    carrier_code STRING,
    tracking_number STRING,
    origin_warehouse INT,
    dest_city STRING,
    dest_province STRING,
    shipped_at TIMESTAMP,
    expected_delivery DATE,
    actual_delivery DATE,
    status STRING,
    created_at TIMESTAMP
)
COMMENT 'ODS: logistics shipment events from TMS / EDI files (via OSS PIPE)'
PARTITIONED BY (dt STRING);

WMS supplier master data table

CREATE TABLE IF NOT EXISTS best_practice_supply_chain.doc_ods_suppliers (
    supplier_id INT,
    supplier_name STRING,
    contact_name STRING,
    country STRING,
    city STRING,
    sla_days INT,
    tier STRING,
    created_at TIMESTAMP
)
COMMENT 'ODS: supplier master data from WMS';

WMS inventory snapshot table (MySQL batch sync target table, partitioned by date; warehouse is filtered with WHERE)

CREATE TABLE IF NOT EXISTS best_practice_supply_chain.doc_ods_inventory (
    snapshot_id BIGINT,
    snapshot_date DATE,
    warehouse_id INT,
    sku_code STRING,
    product_id BIGINT,
    quantity_on_hand INT,
    quantity_reserved INT,
    quantity_in_transit INT,
    reorder_point INT,
    created_at TIMESTAMP
)
COMMENT 'ODS: WMS inventory snapshots (synced via MySQL batch offline sync)'
PARTITIONED BY (dt STRING);

Ingesting TMS EDI files with an OSS PIPE

Carriers upload shipment EDI files to an OSS bucket early every morning. A PIPE then imports them into the shipments table automatically:

-- Prerequisite: an OSS Storage Connection and an External Volume have already been created
CREATE PIPE IF NOT EXISTS best_practice_supply_chain.pipe_ods_shipments
AS
COPY INTO best_practice_supply_chain.doc_ods_shipments
FROM VOLUME oss_logistics_vol
USING csv
OPTIONS('header'='true', 'sep'=',');
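The PIPE above reads files with header='true' and sep=','. The article does not show the file layout. The sketch below therefore assumes, hypothetically, that each carrier's EDI message has already been converted to a CSV file. It further assumes that the header lists the doc_ods_shipments columns in DDL order, without the dt partition column. Under those assumptions, a small pre-upload check can reject malformed files before the PIPE picks them up. check_edi_csv and SHIPMENT_COLUMNS are illustrative names, not part of any Lakehouse tooling.

```python
import csv
import io

# Hypothetical file layout: doc_ods_shipments columns in DDL order,
# without the dt partition column.
SHIPMENT_COLUMNS = [
    "shipment_id", "order_id", "carrier_code", "tracking_number",
    "origin_warehouse", "dest_city", "dest_province", "shipped_at",
    "expected_delivery", "actual_delivery", "status", "created_at",
]


def check_edi_csv(text: str) -> list:
    """Return a list of problems in a CSV file body; an empty list means it looks loadable."""
    reader = csv.reader(io.StringIO(text), delimiter=",")  # mirrors 'sep'=','
    header = next(reader, None)  # mirrors 'header'='true': first row holds column names
    if header is None:
        return ["file is empty"]
    problems = []
    if header != SHIPMENT_COLUMNS:
        problems.append(f"unexpected header: {header}")
    for line_no, row in enumerate(reader, start=2):
        if len(row) != len(SHIPMENT_COLUMNS):
            problems.append(
                f"line {line_no}: expected {len(SHIPMENT_COLUMNS)} fields, got {len(row)}"
            )
    return problems


if __name__ == "__main__":
    sample = ",".join(SHIPMENT_COLUMNS) + "\n" + ",".join(["x"] * 11) + "\n"
    print(check_edi_csv(sample))  # one row is short by a field
```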


DWD layer: standardizing order lifecycle events

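The DWD layer joins the three core ODS tables (orders, order items, shipments) into a wide table. It derives delivery_flag (on_time / delayed / overdue, otherwise pending) and transit_days (days in transit), giving a unified view of order events.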

Create tables

Order events wide table (Dynamic Table)

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_dwd_order_events
COMMENT 'DWD: standardized order lifecycle events with shipment join'
AS
SELECT
    o.order_id, o.order_date, o.customer_id, o.store_id,
    o.status AS order_status,
    o.total_amount, o.currency,
    o.created_at AS order_created_at,
    o.updated_at AS order_updated_at,
    oi.item_id, oi.product_id, oi.sku_code, oi.quantity,
    oi.unit_price, oi.discount, oi.warehouse_id,
    (oi.unit_price * oi.quantity - oi.discount) AS line_amount,
    s.shipment_id, s.carrier_code, s.tracking_number,
    s.shipped_at, s.expected_delivery, s.actual_delivery,
    s.status AS shipment_status,
    s.dest_city, s.dest_province,
    CASE
        WHEN s.actual_delivery IS NOT NULL AND s.actual_delivery <= s.expected_delivery THEN 'on_time'
        WHEN s.actual_delivery IS NOT NULL AND s.actual_delivery > s.expected_delivery THEN 'delayed'
        WHEN s.actual_delivery IS NULL AND CURRENT_DATE() > s.expected_delivery THEN 'overdue'
        ELSE 'pending'
    END AS delivery_flag,
    DATEDIFF(COALESCE(s.actual_delivery, CURRENT_DATE()), s.shipped_at) AS transit_days
FROM best_practice_supply_chain.doc_ods_orders o
JOIN best_practice_supply_chain.doc_ods_order_items oi
    ON o.order_id = oi.order_id
LEFT JOIN best_practice_supply_chain.doc_ods_shipments s
    ON o.order_id = s.order_id;

Inventory events wide table (Dynamic Table)

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_dwd_inventory_events
COMMENT 'DWD: enriched inventory snapshots with availability calculation'
AS
SELECT
    inv.snapshot_date, inv.warehouse_id, inv.sku_code, inv.product_id,
    inv.quantity_on_hand, inv.quantity_reserved, inv.quantity_in_transit, inv.reorder_point,
    (inv.quantity_on_hand - inv.quantity_reserved) AS available_quantity,
    CASE
        WHEN (inv.quantity_on_hand - inv.quantity_reserved) <= 0 THEN 'out_of_stock'
        WHEN (inv.quantity_on_hand - inv.quantity_reserved) < inv.reorder_point THEN 'low_stock'
        ELSE 'normal'
    END AS stock_status
FROM best_practice_supply_chain.doc_ods_inventory inv;

Refresh the dynamic tables and verify the data

cz-cli sql "REFRESH DYNAMIC TABLE best_practice_supply_chain.doc_dwd_order_events" -p skill_test --write
cz-cli sql "REFRESH DYNAMIC TABLE best_practice_supply_chain.doc_dwd_inventory_events" -p skill_test --write

Query the order events wide table to verify the derived fields delivery_flag and transit_days:

SELECT order_id, sku_code, order_status, delivery_flag, transit_days FROM best_practice_supply_chain.doc_dwd_order_events ORDER BY order_id LIMIT 10;

order_id | sku_code | order_status | delivery_flag | transit_days
--- | --- | --- | --- | ---
100001 | SKU-A001 | delivered | delayed | 4
100001 | SKU-B012 | delivered | delayed | 4
100002 | SKU-C005 | shipped | overdue | 795
100003 | SKU-A001 | processing | pending | null
100004 | SKU-E007 | delivered | delayed | 4
100005 | SKU-B012 | cancelled | pending | null
100006 | SKU-C005 | delivered | delayed | 4
100007 | SKU-F001 | shipped | overdue | 793
100008 | SKU-A001 | delivered | delayed | 4

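The delivery_flag values mean the following:

on_time: the shipment arrived on or before expected_delivery.
delayed: it arrived after the expected delivery date.
overdue: it has shipped but is still undelivered, and CURRENT_DATE() is already past the expected date.
pending: everything else. This covers orders that have not shipped, including cancelled ones, and shipments that are not yet due.

transit_days is computed with COALESCE(actual_delivery, CURRENT_DATE()), so for undelivered shipments it keeps counting up to today. It is null for rows that have not shipped.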
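The two derived fields can be restated in Python to check edge cases by hand. This sketch mirrors the CASE WHEN and DATEDIFF(COALESCE(...)) expressions in doc_dwd_order_events, using None for SQL NULL. It assumes that DATEDIFF returns the whole-day difference (end minus start), and it simplifies shipped_at from a TIMESTAMP to a date. The today parameter stands in for CURRENT_DATE(). Because a comparison involving NULL is never true in SQL, a shipment with an actual_delivery but no expected_delivery falls through to pending.

```python
from datetime import date
from typing import Optional


def delivery_flag(expected: Optional[date], actual: Optional[date], today: date) -> str:
    """Mirror of the delivery_flag CASE in doc_dwd_order_events (None plays SQL NULL)."""
    # A comparison with NULL is never true in SQL, so it falls through to the next branch.
    if actual is not None and expected is not None and actual <= expected:
        return "on_time"
    if actual is not None and expected is not None and actual > expected:
        return "delayed"
    if actual is None and expected is not None and today > expected:
        return "overdue"
    return "pending"  # no shipment, not yet due, or a NULL expected_delivery


def transit_days(shipped: Optional[date], actual: Optional[date], today: date) -> Optional[int]:
    """Mirror of DATEDIFF(COALESCE(actual_delivery, CURRENT_DATE()), shipped_at)."""
    if shipped is None:
        return None  # NULL shipped_at propagates: rows without a shipment get NULL
    end = actual if actual is not None else today  # COALESCE(actual, today)
    return (end - shipped).days


if __name__ == "__main__":
    today = date(2025, 1, 1)  # illustrative stand-in for CURRENT_DATE()
    print(delivery_flag(date(2024, 4, 3), date(2024, 4, 5), today))  # delivered late
    print(delivery_flag(date(2024, 4, 3), None, today))              # never delivered
    print(transit_days(date(2024, 4, 1), date(2024, 4, 5), today))
```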

Query the inventory events wide table:

SELECT warehouse_id, sku_code, quantity_on_hand, available_quantity, stock_status FROM best_practice_supply_chain.doc_dwd_inventory_events ORDER BY warehouse_id, sku_code;

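warehouse_id | sku_code | quantity_on_hand | available_quantity | stock_status
--- | --- | --- | --- | ---
1 | SKU-A001 | 380 | 335 | normal
1 | SKU-B012 | 210 | 180 | normal
1 | SKU-F001 | 180 | 155 | normal
2 | SKU-A001 | 150 | 130 | normal
2 | SKU-C005 | 560 | 480 | normal
2 | SKU-G009 | 320 | 280 | normal
3 | SKU-D020 | 95 | 85 | normal
3 | SKU-E007 | 42 | 37 | normal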

available_quantity = quantity_on_hand - quantity_reserved is the quantity that can actually ship once reserved stock is subtracted.
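Here is a minimal Python restatement of available_quantity and the stock_status CASE. It assumes integer quantities with no NULLs. In SQL, a row with positive available stock and a NULL reorder_point would be classified as normal, because the comparison with NULL is not true. The thresholds follow the DDL: at or below 0 is out_of_stock, and below reorder_point is low_stock. The sample values are illustrative, not rows from the article's test data.

```python
def stock_status(on_hand: int, reserved: int, reorder_point: int) -> tuple:
    """Return (available_quantity, stock_status) as derived in doc_dwd_inventory_events."""
    available = on_hand - reserved  # stock that can actually ship
    if available <= 0:
        status = "out_of_stock"
    elif available < reorder_point:
        status = "low_stock"
    else:
        status = "normal"  # includes available == reorder_point
    return available, status


if __name__ == "__main__":
    # Illustrative (on_hand, reserved, reorder_point) values.
    for args in [(380, 45, 50), (25, 20, 30), (10, 10, 20)]:
        print(args, stock_status(*args))
```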


DWS layer: SKU inventory and route timeliness aggregation

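On top of DWD, the DWS layer aggregates along two dimensions: a daily SKU sales summary (for inventory turnover analysis) and a carrier route timeliness summary (for SLA assessment).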

Create tables

Daily SKU sales aggregate table (Dynamic Table)

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_dws_sku_daily_sales
COMMENT 'DWS: daily SKU-level sales and inventory turnover aggregation'
AS
SELECT
    e.order_date,
    e.sku_code,
    e.warehouse_id,
    COUNT(DISTINCT e.order_id) AS order_count,
    SUM(e.quantity) AS total_quantity_sold,
    SUM(e.line_amount) AS total_revenue,
    AVG(e.unit_price) AS avg_unit_price,
    SUM(CASE WHEN e.delivery_flag = 'on_time' THEN 1 ELSE 0 END) AS on_time_count,
    SUM(CASE WHEN e.delivery_flag = 'delayed' THEN 1 ELSE 0 END) AS delayed_count,
    SUM(CASE WHEN e.delivery_flag = 'overdue' THEN 1 ELSE 0 END) AS overdue_count
FROM best_practice_supply_chain.doc_dwd_order_events e
WHERE e.order_status NOT IN ('cancelled')
GROUP BY e.order_date, e.sku_code, e.warehouse_id;

Carrier route timeliness aggregate table (Dynamic Table)

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_dws_carrier_timeliness
COMMENT 'DWS: carrier on-time delivery rate and route performance aggregation'
AS
SELECT
    s.carrier_code,
    s.dest_province,
    DATE_TRUNC('week', s.shipped_at) AS ship_week,
    COUNT(*) AS total_shipments,
    SUM(CASE WHEN e.delivery_flag = 'on_time' THEN 1 ELSE 0 END) AS on_time_shipments,
    SUM(CASE WHEN e.delivery_flag = 'delayed' THEN 1 ELSE 0 END) AS delayed_shipments,
    ROUND(
        SUM(CASE WHEN e.delivery_flag = 'on_time' THEN 1 ELSE 0 END) * 100.0 / COUNT(*),
        2
    ) AS on_time_rate_pct,
    AVG(e.transit_days) AS avg_transit_days
FROM best_practice_supply_chain.doc_dwd_order_events e
JOIN best_practice_supply_chain.doc_ods_shipments s
    ON e.shipment_id = s.shipment_id
WHERE e.shipment_status IN ('delivered', 'in_transit')
GROUP BY s.carrier_code, s.dest_province, DATE_TRUNC('week', s.shipped_at);

Query the DWS aggregates

SKU sales summary (all dates combined per SKU):

SELECT sku_code, SUM(total_quantity_sold) AS qty, ROUND(SUM(total_revenue), 2) AS revenue FROM best_practice_supply_chain.doc_dws_sku_daily_sales GROUP BY sku_code ORDER BY revenue DESC;

sku_code | qty | revenue
--- | --- | ---
SKU-A001 | 8 | 694.00
SKU-F001 | 2 | 680.00
SKU-C005 | 11 | 554.50
SKU-D020 | 4 | 486.00
SKU-E007 | 1 | 215.30
SKU-B012 | 1 | 180.90
SKU-G009 | 1 | 63.00

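SKU-A001 ranks first by revenue (8 units, 694.00). SKU-C005 sells the most units (11), but its low unit price leaves it third by revenue. Gaps like this between unit-volume and revenue rankings are a key input when setting replenishment priorities.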
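To see why unit and revenue rankings can diverge, here is a Python sketch of the doc_dws_sku_daily_sales logic collapsed to one row per SKU, which matches the summary query above. Cancelled lines are skipped, line_amount = unit_price * quantity - discount, and results are sorted by revenue in descending order. The input rows are hypothetical, not the article's test data.

```python
from collections import defaultdict
from decimal import Decimal


def sku_summary(rows):
    """Collapse DWD-style rows into (sku_code, qty, revenue), ordered by revenue DESC.

    Each row is (sku_code, order_status, quantity, unit_price, discount).
    """
    qty = defaultdict(int)
    revenue = defaultdict(Decimal)  # Decimal() == Decimal('0')
    for sku, status, quantity, unit_price, discount in rows:
        if status == "cancelled":  # WHERE order_status NOT IN ('cancelled')
            continue
        qty[sku] += quantity
        revenue[sku] += unit_price * quantity - discount  # line_amount
    return sorted(((s, qty[s], revenue[s]) for s in qty), key=lambda t: t[2], reverse=True)


if __name__ == "__main__":
    # Hypothetical rows: SKU-Y sells more units, SKU-X brings in more revenue.
    rows = [
        ("SKU-X", "delivered", 2, Decimal("100.00"), Decimal("10.00")),
        ("SKU-X", "shipped", 1, Decimal("50.00"), Decimal("0.00")),
        ("SKU-Y", "delivered", 10, Decimal("20.00"), Decimal("5.00")),
        ("SKU-Y", "cancelled", 5, Decimal("20.00"), Decimal("0.00")),
    ]
    for line in sku_summary(rows):
        print(line)
```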

Carrier timeliness summary (aggregated across weeks):

SELECT carrier_code, SUM(total_shipments) AS shipments, ROUND(AVG(on_time_rate_pct), 2) AS avg_ontime_pct, ROUND(AVG(avg_transit_days), 1) AS avg_transit FROM best_practice_supply_chain.doc_dws_carrier_timeliness GROUP BY carrier_code ORDER BY avg_ontime_pct DESC;

carrier_code | shipments | avg_ontime_pct | avg_transit
--- | --- | --- | ---
YTO | 2 | 0.00 | 4
SF | 3 | 0.00 | 4
ZTO | 1 | 0.00 | 4
BEST | 1 | 0.00 | 793
JD | 1 | 0.00 | 795
JD10.00795

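BEST and JD show avg_transit_days of 793 and 795 days because their shipments are in_transit (not yet delivered). For those rows, transit_days uses CURRENT_DATE() as the end date, so the large values are expected for historical test data. With real production data, the DWS layer can feed an operations monitoring dashboard that flags routes with abnormally high average transit times.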
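The sketch below restates on_time_rate_pct for one carrier/province/week group and shows how a single undelivered shipment inflates an average. It assumes ROUND rounds half up. The delivered_only switch is a hypothetical variant added for illustration and is not part of the article's DDL. Separating delivered rows from in-transit rows is one option when the average feeds route monitoring.

```python
from decimal import Decimal, ROUND_HALF_UP
from statistics import mean


def on_time_rate_pct(flags):
    """ROUND(SUM(on_time) * 100.0 / COUNT(*), 2) for one non-empty group of rows."""
    on_time = sum(1 for f in flags if f == "on_time")
    rate = Decimal(on_time) * 100 / Decimal(len(flags))
    return rate.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


def avg_transit(rows, delivered_only=False):
    """Average transit_days over (shipment_status, transit_days) rows, rounded to 1 decimal.

    delivered_only is a hypothetical variant that drops in-transit rows.
    """
    values = [d for s, d in rows if d is not None and (not delivered_only or s == "delivered")]
    return round(mean(values), 1) if values else None  # AVG ignores NULLs


if __name__ == "__main__":
    group = [("delivered", 4), ("delivered", 4), ("in_transit", 795)]
    print(on_time_rate_pct(["delayed", "delayed", "delayed"]))
    print(avg_transit(group), avg_transit(group, delivered_only=True))
```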


ADS layer: supplier SLA report and inventory alerts

The ADS layer serves business decisions directly: supplier compliance management and inventory replenishment alerts.

Create tables

Monthly supplier SLA report (Dynamic Table)

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_ads_supplier_sla_report
COMMENT 'ADS: supplier SLA compliance report — monthly delivery performance vs contracted SLA days'
AS
SELECT
    sup.supplier_id,
    sup.supplier_name,
    sup.tier AS supplier_tier,
    sup.sla_days AS contracted_sla_days,
    DATE_FORMAT(o.order_date, 'yyyy-MM') AS stat_month,
    COUNT(DISTINCT o.order_id) AS total_orders,
    -- Count distinct orders rather than line items, so numerator and denominator share one grain
    COUNT(DISTINCT CASE WHEN dwd.delivery_flag = 'on_time' THEN o.order_id END) AS on_time_orders,
    COUNT(DISTINCT CASE WHEN dwd.delivery_flag = 'delayed' THEN o.order_id END) AS delayed_orders,
    ROUND(
        COUNT(DISTINCT CASE WHEN dwd.delivery_flag = 'on_time' THEN o.order_id END) * 100.0
            / NULLIF(COUNT(DISTINCT o.order_id), 0),
        2
    ) AS on_time_rate_pct,
    AVG(dwd.transit_days) AS avg_transit_days,
    CASE
        WHEN ROUND(
            COUNT(DISTINCT CASE WHEN dwd.delivery_flag = 'on_time' THEN o.order_id END) * 100.0
                / NULLIF(COUNT(DISTINCT o.order_id), 0),
            2
        ) >= 95 THEN 'SLA_MET'
        WHEN ROUND(
            COUNT(DISTINCT CASE WHEN dwd.delivery_flag = 'on_time' THEN o.order_id END) * 100.0
                / NULLIF(COUNT(DISTINCT o.order_id), 0),
            2
        ) >= 80 THEN 'SLA_AT_RISK'
        ELSE 'SLA_BREACH'
    END AS sla_status
FROM best_practice_supply_chain.doc_dwd_order_events dwd
JOIN best_practice_supply_chain.doc_ods_orders o
    ON dwd.order_id = o.order_id
JOIN best_practice_supply_chain.doc_ods_order_items oi
    ON dwd.item_id = oi.item_id
JOIN best_practice_supply_chain.doc_ods_suppliers sup
    ON oi.warehouse_id = sup.supplier_id
WHERE dwd.order_status != 'cancelled'
  AND dwd.shipment_status IS NOT NULL
GROUP BY sup.supplier_id, sup.supplier_name, sup.tier, sup.sla_days,
         DATE_FORMAT(o.order_date, 'yyyy-MM');

Inventory alert table (Dynamic Table, a candidate for high-frequency refresh such as every 5 minutes)

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_ads_inventory_alert
COMMENT 'ADS: real-time inventory alert — low stock and out-of-stock SKUs requiring reorder'
AS
SELECT
    inv.snapshot_date,
    inv.warehouse_id,
    inv.sku_code,
    inv.product_id,
    inv.quantity_on_hand,
    inv.available_quantity,
    inv.reorder_point,
    inv.stock_status,
    CASE
        WHEN inv.stock_status = 'out_of_stock' THEN 'URGENT'
        WHEN inv.stock_status = 'low_stock' THEN 'WARNING'
        ELSE NULL
    END AS alert_level,
    (inv.reorder_point * 2 - inv.quantity_on_hand) AS suggested_reorder_qty
FROM best_practice_supply_chain.doc_dwd_inventory_events inv
WHERE inv.stock_status IN ('out_of_stock', 'low_stock');

Query the ADS tables

Monthly supplier SLA compliance status:

SELECT supplier_name, supplier_tier, contracted_sla_days, stat_month, total_orders, on_time_orders, on_time_rate_pct, ROUND(avg_transit_days, 1) AS avg_transit, sla_status FROM best_practice_supply_chain.doc_ads_supplier_sla_report ORDER BY supplier_name;

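supplier_name | supplier_tier | contracted_sla_days | stat_month | total_orders | on_time_orders | on_time_rate_pct | avg_transit | sla_status
--- | --- | --- | --- | --- | --- | --- | --- | ---
IndiaMakers Inc. | B | 7 | 2024-04 | 2 | 0 | 0.00 | 398.5 | SLA_BREACH
ShenzhenTech Co. | A | 3 | 2024-04 | 2 | 0 | 0.00 | 4.0 | SLA_BREACH
VietnamFactory Ltd. | B | 5 | 2024-04 | 3 | 0 | 0.00 | 267.7 | SLA_BREACH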

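All three suppliers are SLA_BREACH for the month because no shipment in the test data arrived on time. The delivered shipments arrived after their expected date (delivery_flag = delayed), and the undelivered ones are long past it (overdue). The overdue rows also explain why avg_transit reaches hundreds of days. contracted_sla_days comes from doc_ods_suppliers.sla_days and records the contracted maximum number of transit days. Comparing it with avg_transit_days shows directly how far each supplier's actual performance is from its contract.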
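The sla_status CASE can be checked in Python, using None for SQL NULL. One consequence is worth knowing. When the NULLIF guard turns the rate into NULL, neither the >= 95 test nor the >= 80 test is true, so the row falls through to ELSE and is labeled SLA_BREACH. This sketch assumes half-up rounding for ROUND(..., 2).

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional, Tuple


def sla_status(on_time_orders: int, total_orders: int) -> Tuple[Optional[Decimal], str]:
    """Mirror of on_time_rate_pct and the sla_status CASE (None plays SQL NULL)."""
    if total_orders == 0:
        rate = None  # x / NULLIF(0, 0) evaluates to NULL instead of raising
    else:
        rate = (Decimal(on_time_orders) * 100 / total_orders).quantize(
            Decimal("0.01"), rounding=ROUND_HALF_UP
        )
    if rate is not None and rate >= 95:
        return rate, "SLA_MET"
    if rate is not None and rate >= 80:
        return rate, "SLA_AT_RISK"
    return rate, "SLA_BREACH"  # includes the NULL-rate case: NULL >= n is never true


if __name__ == "__main__":
    for on_time, total in [(0, 2), (19, 20), (17, 20), (0, 0)]:
        print(on_time, total, sla_status(on_time, total))
```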

Inventory alert list:

SELECT snapshot_date, warehouse_id, sku_code, available_quantity, reorder_point, stock_status, alert_level, suggested_reorder_qty FROM best_practice_supply_chain.doc_ads_inventory_alert ORDER BY alert_level, warehouse_id;

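snapshot_date | warehouse_id | sku_code | available_quantity | reorder_point | stock_status | alert_level | suggested_reorder_qty
--- | --- | --- | --- | --- | --- | --- | ---
2024-04-02 | 3 | SKU-E007 | 0 | 20 | out_of_stock | URGENT | 40
2024-04-02 | 3 | SKU-D020 | 5 | 30 | low_stock | WARNING | 35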

alert_level = URGENT means stock is exhausted and must be replenished immediately. alert_level = WARNING means available stock has dropped below the reorder point and should be replenished soon. suggested_reorder_qty = reorder_point * 2 - quantity_on_hand is a simple estimate that tops on-hand stock up to twice the reorder point. Adjust the multiplier to match actual turnover.
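Here is a small Python restatement of the alert mapping and the reorder formula. The on-hand and reorder-point values in the usage lines are illustrative. A low_stock row can have quantity_on_hand above twice the reorder point when most of its stock is reserved. In that case the formula yields a negative suggestion, so clamping the result at zero may be worth considering.

```python
from typing import Optional, Tuple

# alert_level mapping from the CASE in doc_ads_inventory_alert
ALERT_LEVEL = {"out_of_stock": "URGENT", "low_stock": "WARNING"}


def alert_row(stock_status: str, on_hand: int, reorder_point: int) -> Optional[Tuple[str, int]]:
    """Return (alert_level, suggested_reorder_qty), or None if the row is filtered out."""
    level = ALERT_LEVEL.get(stock_status)
    if level is None:
        return None  # WHERE stock_status IN ('out_of_stock', 'low_stock') drops it
    # Top on-hand stock up to twice the reorder point; the result can be negative.
    return level, reorder_point * 2 - on_hand


if __name__ == "__main__":
    print(alert_row("out_of_stock", 0, 20))
    print(alert_row("low_stock", 100, 30))  # e.g. 95 of 100 units reserved, so available is 5
```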


Verifying the Dynamic Table cascade

Run the query below to confirm that all 6 Dynamic Tables have been created:

SHOW DYNAMIC TABLES IN best_practice_supply_chain;

schema_name | table_name | is_dynamic
--- | --- | ---
best_practice_supply_chain | doc_ads_inventory_alert | true
best_practice_supply_chain | doc_ads_supplier_sla_report | true
best_practice_supply_chain | doc_dwd_inventory_events | true
best_practice_supply_chain | doc_dwd_order_events | true
best_practice_supply_chain | doc_dws_carrier_timeliness | true
best_practice_supply_chain | doc_dws_sku_daily_sales | true

Cascade dependency chain:

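ODS source tables (regular, non-dynamic)
  ↓
doc_dwd_order_events         ← JOIN orders + order_items + shipments
doc_dwd_inventory_events     ← inventory snapshots + derived available quantity
  ↓ (task dependency)
doc_dws_sku_daily_sales      ← aggregated by date × SKU × warehouse
doc_dws_carrier_timeliness   ← aggregated by carrier × province × week
  ↓ (task dependency)
doc_ads_inventory_alert      ← filters low-stock SKUs
doc_ads_supplier_sla_report  ← monthly supplier compliance rating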

None of the 6 dynamic tables sets REFRESH INTERVAL in its DDL. The refresh order is enforced by scheduling dependencies between Studio Tasks (see the next section).


Configuring Studio scheduled tasks

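In production, periodic refreshes of the Dynamic Tables are managed by Studio Tasks rather than by REFRESH INTERVAL in the DDL. This has two benefits. Schedules and dependencies can be changed without recreating the tables. Alert rules can also be attached to the tasks, so the on-call engineer is notified promptly when a refresh fails.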

Create the refresh tasks

DWD layer:

# Create the DWD order events refresh task
cz-cli task create refresh_dwd_order_events_sc --type SQL -p skill_test
# Example response: {"data":{"id":10353826,...}}
cz-cli task save-content 10353826 \
  --content "REFRESH DYNAMIC TABLE best_practice_supply_chain.doc_dwd_order_events;" \
  -p skill_test

# Create the DWD inventory events refresh task
cz-cli task create refresh_dwd_inventory_events_sc --type SQL -p skill_test
# Example response: {"data":{"id":10354789,...}}
cz-cli task save-content 10354789 \
  --content "REFRESH DYNAMIC TABLE best_practice_supply_chain.doc_dwd_inventory_events;" \
  -p skill_test

DWS layer:

cz-cli task create refresh_dws_sku_daily_sc --type SQL -p skill_test
# Example response: {"data":{"id":10353827,...}}
cz-cli task save-content 10353827 \
  --content "REFRESH DYNAMIC TABLE best_practice_supply_chain.doc_dws_sku_daily_sales;" \
  -p skill_test

cz-cli task create refresh_dws_carrier_sc --type SQL -p skill_test
# Example response: {"data":{"id":10354790,...}}
cz-cli task save-content 10354790 \
  --content "REFRESH DYNAMIC TABLE best_practice_supply_chain.doc_dws_carrier_timeliness;" \
  -p skill_test

ADS layer:

cz-cli task create refresh_ads_inventory_alert_sc --type SQL -p skill_test
# Example response: {"data":{"id":10353828,...}}
cz-cli task save-content 10353828 \
  --content "REFRESH DYNAMIC TABLE best_practice_supply_chain.doc_ads_inventory_alert;" \
  -p skill_test

cz-cli task create refresh_ads_supplier_sla_sc --type SQL -p skill_test
# Example response: {"data":{"id":10354791,...}}
cz-cli task save-content 10354791 \
  --content "REFRESH DYNAMIC TABLE best_practice_supply_chain.doc_ads_supplier_sla_report;" \
  -p skill_test

Configure the schedules

# DWD layer: refresh daily at 01:00
cz-cli task save-cron 10353826 --cron "0 0 1 * * ?" -p skill_test
cz-cli task save-cron 10354789 --cron "0 0 1 * * ?" -p skill_test

# DWS layer: refresh daily at 01:30 (leaves time for DWD to finish)
cz-cli task save-cron 10353827 --cron "0 30 1 * * ?" -p skill_test
cz-cli task save-cron 10354790 --cron "0 30 1 * * ?" -p skill_test

# ADS layer: refresh daily at 02:00 (leaves time for DWS to finish)
cz-cli task save-cron 10353828 --cron "0 0 2 * * ?" -p skill_test
cz-cli task save-cron 10354791 --cron "0 0 2 * * ?" -p skill_test
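The six-field expressions appear to follow the Quartz cron layout: seconds, minutes, hours, day-of-month, month, day-of-week, with ? meaning "no specific value". That reading is an assumption based on the ? in the last field. The sketch parses only fixed daily expressions of that shape. It then checks that each layer starts no earlier than the layers it reads from. It checks start times only and says nothing about whether upstream runs have actually finished.

```python
from datetime import time


def quartz_daily_time(expr: str) -> time:
    """Parse a fixed daily expression 'sec min hour * * ?' into a time of day."""
    sec, minute, hour, day, month, dow = expr.split()
    if (day, month, dow) != ("*", "*", "?"):
        raise ValueError(f"not a fixed daily expression: {expr!r}")
    return time(int(hour), int(minute), int(sec))


# Task name -> (layer rank, cron expression), copied from the save-cron commands above.
SCHEDULE = {
    "refresh_dwd_order_events_sc": (0, "0 0 1 * * ?"),
    "refresh_dwd_inventory_events_sc": (0, "0 0 1 * * ?"),
    "refresh_dws_sku_daily_sc": (1, "0 30 1 * * ?"),
    "refresh_dws_carrier_sc": (1, "0 30 1 * * ?"),
    "refresh_ads_inventory_alert_sc": (2, "0 0 2 * * ?"),
    "refresh_ads_supplier_sla_sc": (2, "0 0 2 * * ?"),
}


def layers_in_order(schedule) -> bool:
    """True if every task in a higher layer starts no earlier than every task in a lower one."""
    items = [(rank, quartz_daily_time(expr)) for rank, expr in schedule.values()]
    return all(t_lo <= t_hi for r_lo, t_lo in items for r_hi, t_hi in items if r_lo < r_hi)


if __name__ == "__main__":
    for name, (_, expr) in SCHEDULE.items():
        print(name, quartz_daily_time(expr))
    print("layers in order:", layers_in_order(SCHEDULE))
```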

Configure task dependencies

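Schedule times alone cannot guarantee that a downstream task starts only after its upstream task finishes. If an upstream run overruns, DWS may start aggregating before DWD has finished refreshing. Configure task dependencies with save-config --deps so that downstream tasks are triggered by upstream completion: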

# DWS SKU aggregation depends on DWD order events
cz-cli task save-config refresh_dws_sku_daily_sc \
  --deps replace \
  --dep-tasks '[{"taskId":10353826,"taskName":"refresh_dwd_order_events_sc"}]' \
  -p skill_test

# DWS carrier timeliness depends on DWD order events
cz-cli task save-config refresh_dws_carrier_sc \
  --deps replace \
  --dep-tasks '[{"taskId":10353826,"taskName":"refresh_dwd_order_events_sc"}]' \
  -p skill_test

# ADS inventory alert depends on DWD inventory events
cz-cli task save-config refresh_ads_inventory_alert_sc \
  --deps replace \
  --dep-tasks '[{"taskId":10354789,"taskName":"refresh_dwd_inventory_events_sc"}]' \
  -p skill_test

# ADS supplier SLA report depends on both DWS SKU aggregation and DWS carrier timeliness
cz-cli task save-config refresh_ads_supplier_sla_sc \
  --deps replace \
  --dep-tasks '[{"taskId":10353827,"taskName":"refresh_dws_sku_daily_sc"},{"taskId":10354790,"taskName":"refresh_dws_carrier_sc"}]' \
  -p skill_test
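The dependencies configured above form a DAG. Python's standard graphlib module can compute which tasks become runnable in each wave when triggering is driven purely by completion. The task names are copied from the commands above. The wave computation only illustrates the ordering; it is not how Studio schedules tasks internally.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# task -> set of upstream tasks, as configured with save-config --deps above
DEPS = {
    "refresh_dwd_order_events_sc": set(),
    "refresh_dwd_inventory_events_sc": set(),
    "refresh_dws_sku_daily_sc": {"refresh_dwd_order_events_sc"},
    "refresh_dws_carrier_sc": {"refresh_dwd_order_events_sc"},
    "refresh_ads_inventory_alert_sc": {"refresh_dwd_inventory_events_sc"},
    "refresh_ads_supplier_sla_sc": {"refresh_dws_sku_daily_sc", "refresh_dws_carrier_sc"},
}


def refresh_waves(deps):
    """Group tasks into waves; every task's upstreams all sit in earlier waves."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the dependencies form a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave as finished
    return waves


if __name__ == "__main__":
    for i, wave in enumerate(refresh_waves(DEPS), start=1):
        print(i, wave)
```

With dependencies alone, refresh_ads_inventory_alert_sc becomes runnable in the second wave, alongside the DWS tasks, because it depends only on the DWD inventory task.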

Full scheduling chain:

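01:00  refresh_dwd_order_events_sc      (DWD order events wide table)
01:00  refresh_dwd_inventory_events_sc  (DWD inventory events wide table)
         ↓ triggered once dependencies complete
01:30  refresh_dws_sku_daily_sc         (DWS daily SKU sales aggregation)
01:30  refresh_dws_carrier_sc           (DWS carrier route timeliness aggregation)
         ↓ triggered once dependencies complete
02:00  refresh_ads_inventory_alert_sc   (ADS inventory alerts)
02:00  refresh_ads_supplier_sla_sc      (ADS monthly supplier SLA report)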


Notes

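  • Partition column naming: a column listed in PARTITIONED BY must not share a name with any column in the regular column list, otherwise the DDL fails with a key.found error. The ODS inventory table is queried by both warehouse and date, yet it declares only dt STRING as its partition column. The warehouse dimension is handled with a WHERE filter rather than by partition pruning.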
  • PostgreSQL CDC table structure: the column definitions of a CDC target table must match the source table's fields. On a data type mismatch, the CDC task fails with an implicit cast not allowed error. INSERT statements need an explicit CAST('...' AS TIMESTAMP).
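  • OSS PIPE FILES() limitation: a PIPE definition does not support FILES('filename') or SUBDIRECTORY 'dirname' for selecting specific files, so it can only scan the entire Volume path. If EDI files come from several carriers in different formats, create a separate Volume and PIPE for each carrier.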
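  • No REFRESH INTERVAL in Dynamic Table DDL: periodic refreshes of all Dynamic Tables are managed by Studio Tasks. Studio Tasks support scheduling dependencies (downstream runs only after upstream completes), which is more reliable than fixed time intervals. They also let you attach data quality rules and alerts to the same task. Writing REFRESH INTERVAL in the DDL bypasses this management mechanism.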
  • NULLIF guards against division by zero: when computing the SLA on-time rate, the denominator is NULLIF(COUNT(DISTINCT order_id), 0). If a supplier has no shipped orders in a month, on_time_rate_pct returns NULL instead of raising an error.
  • CURRENT_DATE() in a Dynamic Table: CURRENT_DATE() is re-evaluated on every refresh, so transit_days for undelivered shipments grows over time. This makes the field useful for monitoring overdue shipments.
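The partition-naming rule in the first note can be checked before a DDL is sent. The helper below is hypothetical and not part of cz-cli. It assumes identifiers compare case-insensitively and reports any partition column that also appears in the regular column list. It shows why PARTITIONED BY (warehouse_id, dt) would clash with the inventory table's warehouse_id column while (dt) does not.

```python
def partition_conflicts(columns, partition_columns):
    """Return partition columns whose names also appear in the regular column list."""
    regular = {c.lower() for c in columns}  # assumes case-insensitive identifiers
    return [p for p in partition_columns if p.lower() in regular]


# Regular columns of doc_ods_inventory, copied from its DDL.
INVENTORY_COLUMNS = [
    "snapshot_id", "snapshot_date", "warehouse_id", "sku_code", "product_id",
    "quantity_on_hand", "quantity_reserved", "quantity_in_transit",
    "reorder_point", "created_at",
]

if __name__ == "__main__":
    print(partition_conflicts(INVENTORY_COLUMNS, ["dt"]))
    print(partition_conflicts(INVENTORY_COLUMNS, ["warehouse_id", "dt"]))
```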
