-- Step 1: upload the local CSV file to the User Volume with SQL PUT.
-- '/path/to/stores.csv' is a placeholder for the real local path.
PUT '/path/to/stores.csv' TO USER VOLUME FILE 'stores.csv';
-- Step 2: COPY INTO the target table from the User Volume.
-- The options declare a header row and a comma separator; 'nullValue'=''
-- presumably loads empty fields as NULL (confirm against the COPY INTO docs).
COPY INTO best_practice_retail_pos.doc_ods_stores
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('stores.csv');
对其余各表(`doc_ods_orders`、`doc_ods_order_items`、`doc_ods_products`、`doc_ods_customers`、`doc_ods_payments`、`doc_ods_returns` 等)重复上述步骤,分别上传对应 CSV 文件并执行 COPY INTO。
也可直接内联插入小批量测试数据(不需要 CSV 文件):
-- Example: load store master data (the full data set has 100 stores).
-- The explicit column list keeps the statement correct even if the table's
-- column order changes or new columns are added later.
INSERT INTO best_practice_retail_pos.doc_ods_stores (store_id, city) VALUES
    (1, 'Pune'), (2, 'Pune'), (3, 'Delhi'), (4, 'Mumbai'), (5, 'Mumbai'),
    -- ... remaining rows elided; 100 rows in total
    (100, 'Delhi');
-- Load orders (100 rows in the full data set). The original note also mentions
-- 200 order-item rows, but this statement only writes doc_ods_orders.
-- NOTE(review): values are positional and the table's column order is not
-- visible here — confirm it against the table DDL before adding a column list.
INSERT INTO best_practice_retail_pos.doc_ods_orders VALUES
(1,45,33,CAST('2021-08-26' AS DATE),24),
(2,10,81,CAST('2022-03-19' AS DATE),3),
-- ... remaining rows elided
(100,31,63,CAST('2022-03-30' AS DATE),33);
验证 ODS 各表行数:
-- Row-count check for the ODS tables: one scalar subquery per table, all
-- returned in a single result row.
-- customers, categories and promotions are included because the DWD detail
-- table joins against them; an empty customers or categories table would
-- silently empty the DWD through its inner joins.
SELECT
    (SELECT COUNT(*) FROM best_practice_retail_pos.doc_ods_stores) AS stores,
    (SELECT COUNT(*) FROM best_practice_retail_pos.doc_ods_orders) AS orders,
    (SELECT COUNT(*) FROM best_practice_retail_pos.doc_ods_order_items) AS items,
    (SELECT COUNT(*) FROM best_practice_retail_pos.doc_ods_products) AS products,
    (SELECT COUNT(*) FROM best_practice_retail_pos.doc_ods_payments) AS payments,
    (SELECT COUNT(*) FROM best_practice_retail_pos.doc_ods_returns) AS returns,
    (SELECT COUNT(*) FROM best_practice_retail_pos.doc_ods_customers) AS customers,
    (SELECT COUNT(*) FROM best_practice_retail_pos.doc_ods_categories) AS categories,
    (SELECT COUNT(*) FROM best_practice_retail_pos.doc_ods_promotions) AS promotions;
-- DWD layer: order-item detail widened with order, store, customer, product,
-- category, promotion and return attributes.
-- Rows are driven by doc_ods_order_items. The inner joins drop items that lack
-- a matching order, store, customer, product or category; the left joins keep
-- items that have no promotion or no return.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_retail_pos.doc_dwd_sales_detail
AS
SELECT
    itm.order_item_id,
    ord.order_id,
    ord.store_id,
    st.city AS store_city,
    ord.order_date,
    ord.customer_id,
    cust.city AS customer_city,
    itm.product_id,
    prod.category_id,
    cat.category_name,
    prod.price AS list_price,
    itm.qty,
    itm.price AS unit_price,
    CAST(itm.qty * itm.price AS DOUBLE) AS gross_amount,
    -- A missing promotion is treated as a 0 discount.
    COALESCE(promo.discount, 0) AS discount_pct,
    -- The discount is divided by 100.0, i.e. it is handled as a percentage.
    ROUND(itm.qty * itm.price * (1.0 - COALESCE(promo.discount, 0) / 100.0), 2) AS net_amount,
    -- 1 when a matching return row exists, otherwise 0.
    CASE WHEN ret.return_id IS NOT NULL THEN 1 ELSE 0 END AS is_returned,
    COALESCE(ret.refund, 0) AS refund_amount
FROM best_practice_retail_pos.doc_ods_order_items itm
INNER JOIN best_practice_retail_pos.doc_ods_orders ord
    ON itm.order_id = ord.order_id
INNER JOIN best_practice_retail_pos.doc_ods_stores st
    ON ord.store_id = st.store_id
INNER JOIN best_practice_retail_pos.doc_ods_customers cust
    ON ord.customer_id = cust.customer_id
INNER JOIN best_practice_retail_pos.doc_ods_products prod
    ON itm.product_id = prod.product_id
INNER JOIN best_practice_retail_pos.doc_ods_categories cat
    ON prod.category_id = cat.category_id
LEFT JOIN best_practice_retail_pos.doc_ods_promotions promo
    ON ord.promotion_id = promo.promotion_id
LEFT JOIN best_practice_retail_pos.doc_ods_returns ret
    ON itm.order_item_id = ret.order_item_id;
⚠️ 注意:`CREATE DYNAMIC TABLE` 的 DDL 中不写 `REFRESH INTERVAL`。刷新调度通过 Studio Task 管理(见下文),这样可以在同一任务上附加数据质量检查和告警规则。
-- DWS layer: daily sales summary per store, aggregated from the DWD detail
-- table. One output row per (store_id, store_city, order_date).
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_retail_pos.doc_dws_store_daily_sales
AS
SELECT
    store_id,
    store_city,
    order_date,
    COUNT(DISTINCT order_id)        AS order_count,
    COUNT(order_item_id)            AS item_count,
    SUM(qty)                        AS total_qty,
    ROUND(SUM(gross_amount), 2)     AS gross_revenue,
    ROUND(SUM(net_amount), 2)       AS net_revenue,
    ROUND(AVG(discount_pct), 2)     AS avg_discount_pct,
    -- is_returned is a 0/1 flag, so its sum is the number of returned items.
    SUM(is_returned)                AS return_count,
    ROUND(SUM(refund_amount), 2)    AS total_refund,
    COUNT(DISTINCT product_id)      AS sku_count
FROM best_practice_retail_pos.doc_dwd_sales_detail
GROUP BY
    store_id,
    store_city,
    order_date;
-- Refresh the DWS table, then preview the top 8 store-days by net revenue.
REFRESH DYNAMIC TABLE best_practice_retail_pos.doc_dws_store_daily_sales;
SELECT
    store_id,
    store_city,
    order_date,
    order_count,
    total_qty,
    gross_revenue,
    net_revenue,
    avg_discount_pct,
    return_count,
    sku_count
FROM best_practice_retail_pos.doc_dws_store_daily_sales
-- store_id and order_date break ties on net_revenue so that the LIMIT returns
-- the same rows on every run.
ORDER BY net_revenue DESC, store_id, order_date
LIMIT 8;
-- Partitioned, parameterised variant of the daily store summary.
-- The WHERE clause reads the store id from the session config key
-- 'dt.args.store_id' and casts it to INT, so the query only aggregates the
-- rows of that one store.
-- NOTE(review): presumably the key is set before each partition refresh —
-- confirm against the dynamic table documentation.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_retail_pos.doc_dws_store_date_partition (
    store_id, order_date, order_count, item_count, total_qty, net_revenue
)
PARTITIONED BY (store_id)
AS
SELECT
    store_id,
    order_date,
    COUNT(DISTINCT order_id)    AS order_count,
    COUNT(order_item_id)        AS item_count,
    SUM(qty)                    AS total_qty,
    ROUND(SUM(net_amount), 2)   AS net_revenue
FROM best_practice_retail_pos.doc_dwd_sales_detail
WHERE store_id = CAST(SESSION_CONFIGS()['dt.args.store_id'] AS INT)
GROUP BY
    store_id,
    order_date;
-- ADS layer: per-SKU sales velocity, aggregated from the DWD detail table.
-- One output row per (product_id, category_id, category_name).
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_retail_pos.doc_ads_sku_velocity
AS
SELECT
    product_id,
    category_id,
    category_name,
    SUM(qty)                        AS total_sold_qty,
    COUNT(DISTINCT order_id)        AS order_count,
    ROUND(SUM(net_amount), 2)       AS total_net_revenue,
    COUNT(DISTINCT store_id)        AS store_coverage,
    SUM(is_returned)                AS return_count,
    -- Returned items as a percentage of all detail rows for the SKU;
    -- NULLIF guards the division against a zero denominator.
    ROUND(SUM(is_returned) * 100.0 / NULLIF(COUNT(*), 0), 2) AS return_rate_pct,
    -- Velocity buckets by total quantity sold: >= 10 fast, >= 5 normal, else slow.
    CASE
        WHEN SUM(qty) >= 10 THEN 'fast_moving'
        WHEN SUM(qty) >= 5 THEN 'normal'
        ELSE 'slow_moving'
    END AS velocity_label,
    ROUND(SUM(net_amount) / NULLIF(COUNT(DISTINCT store_id), 0), 2) AS revenue_per_store
FROM best_practice_retail_pos.doc_dwd_sales_detail
GROUP BY
    product_id,
    category_id,
    category_name;
-- Refresh the SKU velocity table before querying it.
REFRESH DYNAMIC TABLE best_practice_retail_pos.doc_ads_sku_velocity;
-- SKU count and net revenue per velocity bucket, highest revenue first.
SELECT
    velocity_label,
    COUNT(*) AS sku_count,
    ROUND(SUM(total_net_revenue), 2) AS label_revenue
FROM best_practice_retail_pos.doc_ads_sku_velocity
GROUP BY velocity_label
ORDER BY label_revenue DESC;
-- ADS layer: per-store totals and revenue ranking, rolled up from the daily
-- DWS summary. One output row per (store_id, store_city).
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_retail_pos.doc_ads_store_ranking
AS
SELECT
    store_id,
    store_city,
    SUM(order_count)                AS total_orders,
    SUM(item_count)                 AS total_items,
    SUM(total_qty)                  AS total_qty,
    ROUND(SUM(gross_revenue), 2)    AS gross_revenue,
    ROUND(SUM(net_revenue), 2)      AS net_revenue,
    -- Weight each day's average discount by that day's item count. A plain
    -- AVG over the daily averages would give a 1-item day the same weight as
    -- a 100-item day. The daily value is already rounded to 2 decimals, so
    -- this matches the item-level average up to that rounding.
    ROUND(SUM(avg_discount_pct * item_count) / NULLIF(SUM(item_count), 0), 2) AS avg_discount_pct,
    SUM(return_count)               AS total_returns,
    -- Returned items as a percentage of all items; NULLIF avoids division by zero.
    ROUND(SUM(return_count) * 100.0 / NULLIF(SUM(item_count), 0), 2) AS return_rate_pct,
    -- RANK() gives tied stores the same rank and leaves gaps after ties.
    RANK() OVER (ORDER BY SUM(net_revenue) DESC) AS revenue_rank
FROM best_practice_retail_pos.doc_dws_store_daily_sales
GROUP BY
    store_id,
    store_city;
-- Refresh the store ranking table, then list the top 10 stores by revenue rank.
REFRESH DYNAMIC TABLE best_practice_retail_pos.doc_ads_store_ranking;
SELECT
    store_id,
    store_city,
    total_orders,
    total_qty,
    gross_revenue,
    net_revenue,
    avg_discount_pct,
    total_returns,
    return_rate_pct,
    revenue_rank
FROM best_practice_retail_pos.doc_ads_store_ranking
-- revenue_rank comes from RANK(), so tied stores share a value; store_id
-- breaks those ties so that the LIMIT returns the same rows on every run.
ORDER BY revenue_rank, store_id
LIMIT 10;
场景:月末财务核对——假设月末截止时刻为 version 3(60 笔订单),现在发现系统中有 100 笔,需要识别截止后补录的 40 笔:
-- Time travel: store revenue as of the month-end cutoff, read from the orders
-- table at the given timestamp. Returns the top 5 (order_date, store_id)
-- groups by revenue.
-- NOTE(review): doc_ods_payments is read at its current version, not at the
-- cutoff — confirm whether payments should be time-travelled as well.
SELECT
    snap.order_date,
    snap.store_id,
    -- Count distinct orders: the join yields one row per payment, so COUNT(*)
    -- would over-count any order that has more than one payment row.
    COUNT(DISTINCT snap.order_id) AS orders_in_snapshot,
    SUM(p.amount) AS snapshot_revenue
FROM best_practice_retail_pos.doc_ods_orders TIMESTAMP AS OF '2026-06-06 14:41:15.488' snap
INNER JOIN best_practice_retail_pos.doc_ods_payments p
    ON snap.order_id = p.order_id
GROUP BY
    snap.order_date,
    snap.store_id
ORDER BY snapshot_revenue DESC
LIMIT 5;
-- Orders entered after the month-end cutoff: present in the current table but
-- absent from the historical snapshot. Written as an anti-join (LEFT JOIN plus
-- IS NULL on the snapshot side); shows the first 10 by order_id.
SELECT
    cur.order_id,
    cur.store_id,
    cur.order_date
FROM best_practice_retail_pos.doc_ods_orders cur
LEFT JOIN (
    SELECT order_id
    FROM best_practice_retail_pos.doc_ods_orders
        TIMESTAMP AS OF '2026-06-06 14:41:15.488'
) cutoff
    ON cur.order_id = cutoff.order_id
WHERE cutoff.order_id IS NULL
ORDER BY cur.order_id
LIMIT 10;