# Create the demo schema; IF NOT EXISTS makes the command safe to re-run.
# NOTE(review): -p presumably selects the cz-cli profile and --write presumably
# permits write statements — confirm against the cz-cli help output.
cz-cli sql "CREATE SCHEMA IF NOT EXISTS best_practice_supply_chain" -p skill_test --write
执行结果:
{"data":{},"time_ms":101}
ODS 层:三路异构数据接入
ODS 层对应三个源系统,各用不同摄取方式接入。
建表
OMS 订单表(PostgreSQL CDC 目标表)
-- ODS landing table for order headers (one row per order_id in the source feed).
-- The partition column dt is declared only in PARTITIONED BY, not in the column list.
CREATE TABLE IF NOT EXISTS best_practice_supply_chain.doc_ods_orders (
    order_id      BIGINT,
    order_date    DATE,
    customer_id   BIGINT,
    store_id      INT,
    status        STRING,
    total_amount  DECIMAL(12,2),
    currency      STRING,
    created_at    TIMESTAMP,
    updated_at    TIMESTAMP
)
COMMENT 'ODS: raw orders from OMS (synced via PostgreSQL CDC)'
PARTITIONED BY (dt STRING);
OMS 订单明细表(PostgreSQL CDC 目标表)
-- ODS landing table for order line items; order_id links each line to its order header.
-- The partition column dt is declared only in PARTITIONED BY, not in the column list.
CREATE TABLE IF NOT EXISTS best_practice_supply_chain.doc_ods_order_items (
    item_id       BIGINT,
    order_id      BIGINT,
    product_id    BIGINT,
    sku_code      STRING,
    quantity      INT,
    unit_price    DECIMAL(10,2),
    discount      DECIMAL(10,2),
    warehouse_id  INT,
    created_at    TIMESTAMP
)
COMMENT 'ODS: raw order line items from OMS'
PARTITIONED BY (dt STRING);
TMS 运单表(OSS PIPE 目标表)
-- ODS landing table for shipment records; order_id links a shipment to its order.
-- actual_delivery stays NULL until the shipment is received, which the downstream
-- delivery_flag logic relies on.
CREATE TABLE IF NOT EXISTS best_practice_supply_chain.doc_ods_shipments (
    shipment_id        BIGINT,
    order_id           BIGINT,
    carrier_code       STRING,
    tracking_number    STRING,
    origin_warehouse   INT,
    dest_city          STRING,
    dest_province      STRING,
    shipped_at         TIMESTAMP,
    expected_delivery  DATE,
    actual_delivery    DATE,
    status             STRING,
    created_at         TIMESTAMP
)
COMMENT 'ODS: logistics shipment events from TMS / EDI files (via OSS PIPE)'
PARTITIONED BY (dt STRING);
WMS 供应商主数据表
-- ODS supplier master data. Unlike the other ODS tables this one is not partitioned.
-- sla_days and tier feed the supplier SLA report in the ADS layer.
CREATE TABLE IF NOT EXISTS best_practice_supply_chain.doc_ods_suppliers (
    supplier_id    INT,
    supplier_name  STRING,
    contact_name   STRING,
    country        STRING,
    city           STRING,
    sla_days       INT,
    tier           STRING,
    created_at     TIMESTAMP
)
COMMENT 'ODS: supplier master data from WMS';
WMS 库存快照表(MySQL 批量同步目标表,按日期 dt 分区;仓库维度通过 WHERE 条件过滤)
-- ODS inventory snapshots: one row per snapshot_id with stock counters per warehouse and SKU.
-- Only dt is a partition column; warehouse_id is an ordinary column.
CREATE TABLE IF NOT EXISTS best_practice_supply_chain.doc_ods_inventory (
    snapshot_id          BIGINT,
    snapshot_date        DATE,
    warehouse_id         INT,
    sku_code             STRING,
    product_id           BIGINT,
    quantity_on_hand     INT,
    quantity_reserved    INT,
    quantity_in_transit  INT,
    reorder_point        INT,
    created_at           TIMESTAMP
)
COMMENT 'ODS: WMS inventory snapshots (synced via MySQL batch offline sync)'
PARTITIONED BY (dt STRING);
⚠️ 注意:`PARTITIONED BY` 中的列不能与 `columns` 定义中的列同名,否则报 `key.found` 错误。库存表虽按仓库和日期双维度查询,但分区列只定义一个 `dt STRING`,仓库维度通过 WHERE 子句下推过滤。
OSS PIPE 接入 TMS EDI 文件
物流商每天凌晨将运单 EDI 文件上传到 OSS 存储桶,通过 PIPE 自动导入运单表:
-- Prerequisite: an OSS Storage Connection and an External Volume already exist.
-- Defines a pipe whose COPY INTO statement loads CSV files from the volume
-- oss_logistics_vol into doc_ods_shipments. The files are read with a header row
-- and a comma separator.
-- NOTE(review): no virtual cluster or ingest mode is set here — confirm the
-- workspace defaults are acceptable.
-- NOTE(review): the target table is partitioned by dt — confirm how dt is
-- populated from the incoming files.
CREATE PIPE IF NOT EXISTS best_practice_supply_chain.pipe_ods_shipments
AS
COPY INTO best_practice_supply_chain.doc_ods_shipments
FROM VOLUME oss_logistics_vol
USING csv
OPTIONS('header'='true', 'sep'=',');
💡 提示:PIPE 默认使用 `LIST_PURGE` 扫描模式(定期轮询 Volume 获取新文件);如果 OSS 已开启事件通知,可改为 `INGEST_MODE = EVENT_NOTIFICATION`,实现秒级文件触发。
DWD 层:订单生命周期事件标准化
DWD 层将 ODS 三张核心表 JOIN 打宽,派生 `delivery_flag`(准时 on_time / 延迟 delayed / 逾期 overdue / 待定 pending)和 `transit_days`(在途天数),提供统一的订单事件视图。
建表
订单事件宽表(Dynamic Table)
-- DWD order events: one row per order line, widened with the order header and,
-- when present, the shipment for the same order (LEFT JOIN keeps unshipped orders).
-- delivery_flag and transit_days both read CURRENT_DATE(), so their values for
-- undelivered shipments depend on the day the table is refreshed.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_dwd_order_events
COMMENT 'DWD: standardized order lifecycle events with shipment join'
AS
SELECT
    -- order header
    ord.order_id,
    ord.order_date,
    ord.customer_id,
    ord.store_id,
    ord.status AS order_status,
    ord.total_amount,
    ord.currency,
    ord.created_at AS order_created_at,
    ord.updated_at AS order_updated_at,
    -- order line
    item.item_id,
    item.product_id,
    item.sku_code,
    item.quantity,
    item.unit_price,
    item.discount,
    item.warehouse_id,
    item.unit_price * item.quantity - item.discount AS line_amount,
    -- shipment (all NULL when the order has no shipment row)
    shp.shipment_id,
    shp.carrier_code,
    shp.tracking_number,
    shp.shipped_at,
    shp.expected_delivery,
    shp.actual_delivery,
    shp.status AS shipment_status,
    shp.dest_city,
    shp.dest_province,
    -- Delivered shipments are graded against expected_delivery; undelivered ones
    -- become 'overdue' once today is past expected_delivery. Everything else,
    -- including rows with no shipment or no expected_delivery, is 'pending'.
    CASE
        WHEN shp.actual_delivery IS NOT NULL THEN
            CASE
                WHEN shp.actual_delivery <= shp.expected_delivery THEN 'on_time'
                WHEN shp.actual_delivery > shp.expected_delivery THEN 'delayed'
                ELSE 'pending'
            END
        WHEN CURRENT_DATE() > shp.expected_delivery THEN 'overdue'
        ELSE 'pending'
    END AS delivery_flag,
    -- Days from shipping to delivery, or to today while still undelivered;
    -- NULL when shipped_at is NULL.
    DATEDIFF(COALESCE(shp.actual_delivery, CURRENT_DATE()), shp.shipped_at) AS transit_days
FROM best_practice_supply_chain.doc_ods_orders ord
INNER JOIN best_practice_supply_chain.doc_ods_order_items item
    ON ord.order_id = item.order_id
LEFT JOIN best_practice_supply_chain.doc_ods_shipments shp
    ON ord.order_id = shp.order_id;
库存事件宽表(Dynamic Table)
-- DWD inventory events: each ODS inventory snapshot row plus two derived columns.
--   available_quantity = quantity_on_hand - quantity_reserved
--   stock_status       = 'out_of_stock' when nothing is available,
--                        'low_stock' when available is below reorder_point,
--                        otherwise 'normal'
-- The subtraction is computed once in the derived table and reused by the CASE.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_dwd_inventory_events
COMMENT 'DWD: enriched inventory snapshots with availability calculation'
AS
SELECT
    snap.snapshot_date,
    snap.warehouse_id,
    snap.sku_code,
    snap.product_id,
    snap.quantity_on_hand,
    snap.quantity_reserved,
    snap.quantity_in_transit,
    snap.reorder_point,
    snap.available_quantity,
    CASE
        WHEN snap.available_quantity <= 0 THEN 'out_of_stock'
        WHEN snap.available_quantity < snap.reorder_point THEN 'low_stock'
        ELSE 'normal'
    END AS stock_status
FROM (
    SELECT
        src.snapshot_date,
        src.warehouse_id,
        src.sku_code,
        src.product_id,
        src.quantity_on_hand,
        src.quantity_reserved,
        src.quantity_in_transit,
        src.reorder_point,
        (src.quantity_on_hand - src.quantity_reserved) AS available_quantity
    FROM best_practice_supply_chain.doc_ods_inventory src
) snap;
-- Preview the first ten rows of the order events wide table.
-- The table holds one row per order line, so order_id alone does not give a
-- stable order; item_id is added as a tie-breaker so LIMIT returns the same
-- rows on every run.
SELECT
    order_id,
    sku_code,
    order_status,
    delivery_flag,
    transit_days
FROM best_practice_supply_chain.doc_dwd_order_events
ORDER BY order_id, item_id
LIMIT 10;
| order_id | sku_code | order_status | delivery_flag | transit_days |
|---|---|---|---|---|
| 100001 | SKU-A001 | delivered | delayed | 4 |
| 100001 | SKU-B012 | delivered | delayed | 4 |
| 100002 | SKU-C005 | shipped | overdue | 795 |
| 100003 | SKU-A001 | processing | pending | null |
| 100004 | SKU-E007 | delivered | delayed | 4 |
| 100005 | SKU-B012 | cancelled | pending | null |
| 100006 | SKU-C005 | delivered | delayed | 4 |
| 100007 | SKU-F001 | shipped | overdue | 793 |
| 100008 | SKU-A001 | delivered | delayed | 4 |
`delivery_flag` 取值解读:`on_time` 表示实际签收不晚于预期到货日;`delayed` 表示实际签收晚于预期到货日;`overdue` 表示已发货但至今仍未签收(超过承诺时效);`pending` 表示尚无运单(尚未发货或已取消),或运单在途且尚未超过预期到货日。`transit_days` 对未签收运单使用 `COALESCE(actual_delivery, CURRENT_DATE())` 计算,未发货的行为 null。
查询库存事件宽表:
-- List every inventory event with its derived availability and stock status,
-- ordered by warehouse and then SKU.
SELECT
    ie.warehouse_id,
    ie.sku_code,
    ie.quantity_on_hand,
    ie.available_quantity,
    ie.stock_status
FROM best_practice_supply_chain.doc_dwd_inventory_events ie
ORDER BY
    ie.warehouse_id,
    ie.sku_code;
-- DWS daily SKU sales: aggregates non-cancelled order lines by order date, SKU
-- and warehouse. The three *_count columns count order-line rows per delivery
-- flag, while order_count counts distinct orders.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_dws_sku_daily_sales
COMMENT 'DWS: daily SKU-level sales and inventory turnover aggregation'
AS
SELECT
    ev.order_date,
    ev.sku_code,
    ev.warehouse_id,
    COUNT(DISTINCT ev.order_id) AS order_count,
    SUM(ev.quantity) AS total_quantity_sold,
    SUM(ev.line_amount) AS total_revenue,
    AVG(ev.unit_price) AS avg_unit_price,
    SUM(CASE WHEN ev.delivery_flag = 'on_time' THEN 1 ELSE 0 END) AS on_time_count,
    SUM(CASE WHEN ev.delivery_flag = 'delayed' THEN 1 ELSE 0 END) AS delayed_count,
    SUM(CASE WHEN ev.delivery_flag = 'overdue' THEN 1 ELSE 0 END) AS overdue_count
FROM best_practice_supply_chain.doc_dwd_order_events ev
-- Rows whose order_status is NULL are filtered out as well, because the
-- comparison evaluates to unknown.
WHERE ev.order_status <> 'cancelled'
GROUP BY
    ev.order_date,
    ev.sku_code,
    ev.warehouse_id;
承运商路线时效聚合表(Dynamic Table)
-- DWS carrier timeliness: weekly on-time performance per carrier and
-- destination province.
--
-- doc_dwd_order_events holds one row per order line, so a shipment appears once
-- for every line item of its order. Aggregating those rows directly would count
-- a shipment several times for multi-item orders and skew every metric here.
-- The derived table first collapses the rows to one per shipment. All shipment
-- attributes needed are already carried by the DWD table, so no join back to
-- the ODS shipments table is required.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_dws_carrier_timeliness
COMMENT 'DWS: carrier on-time delivery rate and route performance aggregation'
AS
SELECT
    shp.carrier_code,
    shp.dest_province,
    DATE_TRUNC('week', shp.shipped_at) AS ship_week,
    COUNT(*) AS total_shipments,
    SUM(CASE WHEN shp.delivery_flag = 'on_time' THEN 1 ELSE 0 END) AS on_time_shipments,
    SUM(CASE WHEN shp.delivery_flag = 'delayed' THEN 1 ELSE 0 END) AS delayed_shipments,
    -- Every group has at least one row, so COUNT(*) is never zero here.
    ROUND(
        SUM(CASE WHEN shp.delivery_flag = 'on_time' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2
    ) AS on_time_rate_pct,
    AVG(shp.transit_days) AS avg_transit_days
FROM (
    SELECT DISTINCT
        e.shipment_id,
        e.carrier_code,
        e.dest_province,
        e.shipped_at,
        e.delivery_flag,
        e.transit_days
    FROM best_practice_supply_chain.doc_dwd_order_events e
    WHERE e.shipment_id IS NOT NULL
      AND e.shipment_status IN ('delivered', 'in_transit')
) shp
GROUP BY
    shp.carrier_code,
    shp.dest_province,
    DATE_TRUNC('week', shp.shipped_at);
查询 DWS 聚合结果
SKU 销售汇总(按 SKU 合并所有日期):
-- Total quantity and revenue per SKU across all dates and warehouses,
-- highest revenue first.
SELECT
    ds.sku_code,
    SUM(ds.total_quantity_sold) AS qty,
    ROUND(SUM(ds.total_revenue), 2) AS revenue
FROM best_practice_supply_chain.doc_dws_sku_daily_sales ds
GROUP BY ds.sku_code
ORDER BY revenue DESC;
-- Carrier scorecard across all weeks and provinces.
-- The rate and the transit average are recomputed from the per-group counts
-- instead of averaging the per-group figures. A plain AVG would give a group
-- with one shipment the same weight as a group with hundreds.
-- NOTE(review): the transit average weights each group by total_shipments,
-- which assumes every shipment in a group has a non-NULL transit_days — confirm.
SELECT
    carrier_code,
    SUM(total_shipments) AS shipments,
    ROUND(
        SUM(on_time_shipments) * 100.0 / NULLIF(SUM(total_shipments), 0), 2
    ) AS avg_ontime_pct,
    ROUND(
        SUM(avg_transit_days * total_shipments)
            / NULLIF(SUM(CASE WHEN avg_transit_days IS NOT NULL THEN total_shipments ELSE 0 END), 0),
        1
    ) AS avg_transit
FROM best_practice_supply_chain.doc_dws_carrier_timeliness
GROUP BY carrier_code
ORDER BY avg_ontime_pct DESC;
-- ADS supplier SLA report: monthly delivery performance per supplier.
--
-- doc_dwd_order_events holds one row per order line. total_orders counts
-- distinct orders, so on_time_orders and delayed_orders must count distinct
-- orders too; counting rows would let on_time_rate_pct exceed 100 for
-- multi-item orders.
--
-- The rate is computed once in the derived table and reused for sla_status:
--   >= 95 -> SLA_MET, >= 80 -> SLA_AT_RISK, otherwise SLA_BREACH.
--
-- order_date and warehouse_id are read straight from the DWD table, which
-- already carries them, instead of joining back to the ODS order tables.
--
-- NOTE(review): suppliers are matched on warehouse_id = supplier_id. None of the
-- visible ODS tables carries a supplier key on orders or order items — confirm
-- this mapping is intended.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_ads_supplier_sla_report
COMMENT 'ADS: supplier SLA compliance report — monthly delivery performance vs contracted SLA days'
AS
SELECT
    m.supplier_id,
    m.supplier_name,
    m.supplier_tier,
    m.contracted_sla_days,
    m.stat_month,
    m.total_orders,
    m.on_time_orders,
    m.delayed_orders,
    m.on_time_rate_pct,
    m.avg_transit_days,
    CASE
        WHEN m.on_time_rate_pct >= 95 THEN 'SLA_MET'
        WHEN m.on_time_rate_pct >= 80 THEN 'SLA_AT_RISK'
        ELSE 'SLA_BREACH'
    END AS sla_status
FROM (
    SELECT
        sup.supplier_id,
        sup.supplier_name,
        sup.tier AS supplier_tier,
        sup.sla_days AS contracted_sla_days,
        DATE_FORMAT(dwd.order_date, 'yyyy-MM') AS stat_month,
        COUNT(DISTINCT dwd.order_id) AS total_orders,
        COUNT(DISTINCT CASE WHEN dwd.delivery_flag = 'on_time' THEN dwd.order_id END) AS on_time_orders,
        COUNT(DISTINCT CASE WHEN dwd.delivery_flag = 'delayed' THEN dwd.order_id END) AS delayed_orders,
        ROUND(
            COUNT(DISTINCT CASE WHEN dwd.delivery_flag = 'on_time' THEN dwd.order_id END) * 100.0
                / NULLIF(COUNT(DISTINCT dwd.order_id), 0), 2
        ) AS on_time_rate_pct,
        AVG(dwd.transit_days) AS avg_transit_days
    FROM best_practice_supply_chain.doc_dwd_order_events dwd
    INNER JOIN best_practice_supply_chain.doc_ods_suppliers sup
        ON dwd.warehouse_id = sup.supplier_id
    WHERE dwd.order_status != 'cancelled'
      AND dwd.shipment_status IS NOT NULL
    GROUP BY
        sup.supplier_id,
        sup.supplier_name,
        sup.tier,
        sup.sla_days,
        DATE_FORMAT(dwd.order_date, 'yyyy-MM')
) m;
库存告警表(Dynamic Table,REFRESH 5min 高频刷新)
-- ADS inventory alert: keeps only the inventory rows that need action
-- (out_of_stock -> URGENT, low_stock -> WARNING).
--
-- suggested_reorder_qty tops stock up to twice the reorder point. A row can be
-- low_stock while quantity_on_hand is still above that target, when most of the
-- stock is reserved. The raw subtraction is then negative, so it is clamped to 0.
-- A NULL input still yields NULL.
--
-- NOTE(review): this statement sets no refresh schedule; any 5-minute refresh
-- has to be configured separately — confirm where that is done.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_ads_inventory_alert
COMMENT 'ADS: real-time inventory alert — low stock and out-of-stock SKUs requiring reorder'
AS
SELECT
    inv.snapshot_date,
    inv.warehouse_id,
    inv.sku_code,
    inv.product_id,
    inv.quantity_on_hand,
    inv.available_quantity,
    inv.reorder_point,
    inv.stock_status,
    CASE
        WHEN inv.stock_status = 'out_of_stock' THEN 'URGENT'
        WHEN inv.stock_status = 'low_stock' THEN 'WARNING'
        ELSE NULL
    END AS alert_level,
    CASE
        WHEN (inv.reorder_point * 2 - inv.quantity_on_hand) < 0 THEN 0
        ELSE (inv.reorder_point * 2 - inv.quantity_on_hand)
    END AS suggested_reorder_qty
FROM best_practice_supply_chain.doc_dwd_inventory_events inv
WHERE inv.stock_status IN ('out_of_stock', 'low_stock');
💡 提示:库存告警表只保留需要处置的 SKU(`WHERE stock_status IN ('out_of_stock', 'low_stock')`),行数远小于全量 DWD 表,5 分钟刷新的计算开销很低。
查询 ADS 告警数据
供应商 SLA 月度合规状态:
-- Monthly SLA compliance per supplier, ordered by supplier name.
-- avg_transit is the report's avg_transit_days rounded to one decimal place.
SELECT
    rpt.supplier_name,
    rpt.supplier_tier,
    rpt.contracted_sla_days,
    rpt.stat_month,
    rpt.total_orders,
    rpt.on_time_orders,
    rpt.on_time_rate_pct,
    ROUND(rpt.avg_transit_days, 1) AS avg_transit,
    rpt.sla_status
FROM best_practice_supply_chain.doc_ads_supplier_sla_report rpt
ORDER BY rpt.supplier_name;
| supplier_name | supplier_tier | contracted_sla_days | stat_month | total_orders | on_time_orders | on_time_rate_pct | avg_transit | sla_status |
|---|---|---|---|---|---|---|---|---|
| IndiaMakers Inc. | B | 7 | 2024-04 | 2 | 0 | 0.00 | 398.5 | SLA_BREACH |
| ShenzhenTech Co. | A | 3 | 2024-04 | 2 | 0 | 0.00 | 4.0 | SLA_BREACH |
| VietnamFactory Ltd. | B | 5 | 2024-04 | 3 | 0 | 0.00 | 267.7 | SLA_BREACH |
所有供应商当月均为 `SLA_BREACH`,因为测试数据的日期已是过去时间:已签收运单的实际到货日均晚于预期到货日(`delivery_flag` 输出为 `delayed`),未签收运单则按当前日期判定为 `overdue`,没有任何 `on_time` 记录。`contracted_sla_days` 字段来自 `doc_ods_suppliers.sla_days`,记录了合同承诺的最大在途天数,可结合 `avg_transit_days` 直接评估供应商实际表现与合同约定的差距。
库存告警清单:
-- Inventory alert list. alert_level sorts as text, so 'URGENT' rows come
-- before 'WARNING' rows; within a level rows are ordered by warehouse.
SELECT
    al.snapshot_date,
    al.warehouse_id,
    al.sku_code,
    al.available_quantity,
    al.reorder_point,
    al.stock_status,
    al.alert_level,
    al.suggested_reorder_qty
FROM best_practice_supply_chain.doc_ads_inventory_alert al
ORDER BY
    al.alert_level ASC,
    al.warehouse_id ASC;