-- ODS: customer master data.
-- NOTE(review): no key constraint is declared, so uniqueness of customer_id is
-- not enforced by this table; the downstream joins assume it.
CREATE TABLE IF NOT EXISTS best_practice_customer_360.doc_ods_customers (
    customer_id  BIGINT,
    city         STRING,
    signup_date  DATE
);

-- ODS: order headers (one customer, store and optional promotion per order).
CREATE TABLE IF NOT EXISTS best_practice_customer_360.doc_ods_orders (
    order_id      BIGINT,
    customer_id   BIGINT,
    store_id      BIGINT,
    order_date    DATE,
    promotion_id  BIGINT
);

-- ODS: order line items.
-- NOTE(review): price is DOUBLE; DECIMAL would avoid binary rounding for money,
-- but calc_churn_score takes DOUBLE arguments, so changing types needs care.
CREATE TABLE IF NOT EXISTS best_practice_customer_360.doc_ods_order_items (
    order_item_id  BIGINT,
    order_id       BIGINT,
    product_id     BIGINT,
    qty            INT,
    price          DOUBLE
);

-- ODS: payments; keyed by its own payment_id, so an order may have several rows.
CREATE TABLE IF NOT EXISTS best_practice_customer_360.doc_ods_payments (
    payment_id  BIGINT,
    order_id    BIGINT,
    amount      DOUBLE
);

-- ODS: returns, recorded per order line item.
CREATE TABLE IF NOT EXISTS best_practice_customer_360.doc_ods_returns (
    return_id      BIGINT,
    order_item_id  BIGINT,
    refund         DOUBLE
);
-- ODS: raw Kafka behavior events; each message's JSON text lands in `value`.
-- ingest_time defaults to the load time when the writer omits it.
CREATE TABLE IF NOT EXISTS best_practice_customer_360.doc_ods_kafka_behavior_raw (
    value        STRING,
    ingest_time  TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

-- ODS: behavior events parsed into typed columns.
-- ingest_time defaults to the load time when the writer omits it.
CREATE TABLE IF NOT EXISTS best_practice_customer_360.doc_ods_behavior_events (
    event_id     STRING,
    customer_id  BIGINT,
    event_type   STRING,
    page         STRING,
    product_id   BIGINT,
    event_time   TIMESTAMP,
    ingest_time  TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);
创建 Bloomfilter Index
`doc_ods_orders` 和 `doc_ods_behavior_events` 都会按 `customer_id` 频繁过滤，该列基数与客户总数同量级（高基数），适合使用 Bloomfilter Index。
-- Bloom filter indexes on customer_id, the high-cardinality column both tables
-- are frequently filtered on.
-- Table names are unqualified: the index and its table must be in the same
-- schema context, so run these with best_practice_customer_360 as the current
-- schema (e.g. cz-cli -s best_practice_customer_360, or select it in Studio).
-- IF NOT EXISTS is not supported for this statement.
-- NOTE(review): re-running presumably errors once the index exists — confirm.
CREATE BLOOMFILTER INDEX idx_bf_customer_id
ON TABLE doc_ods_orders (customer_id);
CREATE BLOOMFILTER INDEX idx_bf_event_customer
ON TABLE doc_ods_behavior_events (customer_id);
⚠️ 注意：`CREATE BLOOMFILTER INDEX` 不支持 `IF NOT EXISTS` 语法，且要求索引与目标表位于同一 Schema 上下文。通过 cz-cli 执行时须加 `-s best_practice_customer_360` 参数指定 Schema，或在 Studio 中选中对应 Schema 后执行，否则会报 "index and table must in the same schema" 错误。
-- Continuous ingestion pipe: on the DEFAULT virtual cluster, every 60 seconds
-- (BATCH_INTERVAL_IN_SECONDS) it runs the COPY INTO below, reading messages
-- from the Kafka topic and writing each message value as a STRING into
-- doc_ods_kafka_behavior_raw. Only `value` is written; ingest_time presumably
-- takes its column default.
-- NOTE(review): the unlabeled positional READ_KAFKA arguments (empty strings,
-- 'raw','raw', 0, map()) are passed through as-is; confirm their meaning
-- against the READ_KAFKA reference before changing them.
CREATE PIPE IF NOT EXISTS best_practice_customer_360.pipe_behavior_events
VIRTUAL_CLUSTER = 'DEFAULT'
BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO best_practice_customer_360.doc_ods_kafka_behavior_raw
FROM (
SELECT CAST(value AS STRING) AS value
FROM READ_KAFKA(
'<kafka-broker>:9092', -- placeholder: replace with the actual broker address
'retail_behavior_events', -- topic name
'',
'cz_c360_consumer', -- consumer group
'','','','',
'raw','raw',
0,
map()
)
);
-- Step 1: upload the local CSV file to the User Volume with SQL PUT.
-- '/path/to/...' is a placeholder for the real local file path.
PUT '/path/to/behavior_events_data.csv' TO USER VOLUME FILE 'behavior_events_data.csv';
-- Step 2: COPY INTO the table from the User Volume.
-- The file is read with a header row and ',' as separator; 'nullValue'=''
-- maps empty fields to NULL.
-- NOTE(review): no column list is given, so the CSV columns presumably must
-- line up with the table's columns (including ingest_time) — confirm against
-- the actual file layout.
COPY INTO best_practice_customer_360.doc_ods_behavior_events
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('behavior_events_data.csv');
也可直接内联插入小批量测试数据(不需要 CSV 文件):
-- Inline sample behavior events, for testing without a CSV file.
-- ingest_time is omitted and takes its column default.
-- E018 is a home-page view with no associated product, so its product_id is
-- NULL rather than the sentinel 0. A literal 0 would look like a real product
-- key to any product-level join or count.
INSERT INTO best_practice_customer_360.doc_ods_behavior_events
    (event_id, customer_id, event_type, page, product_id, event_time)
VALUES
    ('E001',1,'view','product',472,CAST('2025-11-01 10:00:00' AS TIMESTAMP)),
    ('E002',1,'cart','product',472,CAST('2025-11-01 10:05:00' AS TIMESTAMP)),
    ('E003',1,'purchase','checkout',472,CAST('2025-11-01 10:10:00' AS TIMESTAMP)),
    ('E004',2,'view','product',1666,CAST('2025-11-02 09:00:00' AS TIMESTAMP)),
    ('E005',3,'purchase','checkout',9246,CAST('2025-11-03 14:30:00' AS TIMESTAMP)),
    ('E006',2,'view','product',1666,CAST('2025-11-03 11:00:00' AS TIMESTAMP)),
    ('E007',2,'cart','product',1666,CAST('2025-11-03 11:10:00' AS TIMESTAMP)),
    ('E008',4,'view','product',321,CAST('2025-11-04 08:30:00' AS TIMESTAMP)),
    ('E009',5,'view','product',889,CAST('2025-11-04 09:00:00' AS TIMESTAMP)),
    ('E010',5,'cart','product',889,CAST('2025-11-04 09:15:00' AS TIMESTAMP)),
    ('E011',6,'purchase','checkout',512,CAST('2025-11-05 14:00:00' AS TIMESTAMP)),
    ('E012',7,'view','product',734,CAST('2025-11-06 10:30:00' AS TIMESTAMP)),
    ('E013',8,'view','product',201,CAST('2025-11-06 13:00:00' AS TIMESTAMP)),
    ('E014',8,'cart','product',201,CAST('2025-11-06 13:20:00' AS TIMESTAMP)),
    ('E015',9,'purchase','checkout',3301,CAST('2025-11-07 16:00:00' AS TIMESTAMP)),
    ('E016',10,'view','product',4412,CAST('2025-11-08 09:45:00' AS TIMESTAMP)),
    ('E017',10,'purchase','checkout',4412,CAST('2025-11-08 10:00:00' AS TIMESTAMP)),
    ('E018',11,'view','home',CAST(NULL AS BIGINT),CAST('2025-11-09 08:00:00' AS TIMESTAMP)),
    ('E019',12,'view','product',671,CAST('2025-11-10 11:00:00' AS TIMESTAMP)),
    ('E020',13,'view','product',882,CAST('2025-11-10 14:00:00' AS TIMESTAMP)),
    ('E021',13,'cart','product',882,CAST('2025-11-10 14:30:00' AS TIMESTAMP)),
    ('E022',14,'purchase','checkout',993,CAST('2025-11-11 10:00:00' AS TIMESTAMP)),
    ('E023',15,'view','product',1104,CAST('2025-11-12 09:00:00' AS TIMESTAMP)),
    ('E024',16,'view','product',2215,CAST('2025-11-13 15:00:00' AS TIMESTAMP)),
    ('E025',17,'purchase','checkout',3326,CAST('2025-11-14 11:30:00' AS TIMESTAMP)),
    ('E026',18,'view','product',4437,CAST('2025-11-15 13:00:00' AS TIMESTAMP)),
    ('E027',19,'cart','product',5548,CAST('2025-11-16 10:00:00' AS TIMESTAMP)),
    ('E028',20,'view','product',6659,CAST('2025-11-17 09:30:00' AS TIMESTAMP)),
    ('E029',21,'purchase','checkout',7770,CAST('2025-11-18 14:00:00' AS TIMESTAMP)),
    ('E030',22,'view','product',8881,CAST('2025-11-19 11:00:00' AS TIMESTAMP)),
    ('E031',23,'view','product',9992,CAST('2025-11-20 10:30:00' AS TIMESTAMP)),
    ('E032',24,'cart','product',1113,CAST('2025-11-21 16:00:00' AS TIMESTAMP)),
    ('E033',25,'purchase','checkout',2224,CAST('2025-11-22 09:00:00' AS TIMESTAMP)),
    ('E034',26,'view','product',3335,CAST('2025-11-23 14:30:00' AS TIMESTAMP)),
    ('E035',27,'view','product',4446,CAST('2025-11-24 11:00:00' AS TIMESTAMP)),
    ('E036',28,'cart','product',5557,CAST('2025-11-25 10:00:00' AS TIMESTAMP)),
    ('E037',29,'purchase','checkout',6668,CAST('2025-11-26 15:00:00' AS TIMESTAMP)),
    ('E038',30,'view','product',7779,CAST('2025-11-27 09:00:00' AS TIMESTAMP)),
    ('E039',31,'view','product',8880,CAST('2025-11-28 13:00:00' AS TIMESTAMP)),
    ('E040',32,'cart','product',9991,CAST('2025-11-29 11:30:00' AS TIMESTAMP)),
    ('E041',33,'purchase','checkout',1102,CAST('2025-11-30 10:00:00' AS TIMESTAMP)),
    ('E042',34,'view','product',2213,CAST('2025-12-01 09:30:00' AS TIMESTAMP)),
    ('E043',35,'view','product',3324,CAST('2025-12-02 14:00:00' AS TIMESTAMP)),
    ('E044',36,'cart','product',4435,CAST('2025-12-03 11:00:00' AS TIMESTAMP)),
    ('E045',37,'purchase','checkout',5546,CAST('2025-12-04 10:30:00' AS TIMESTAMP)),
    ('E046',38,'view','product',6657,CAST('2025-12-05 15:00:00' AS TIMESTAMP)),
    ('E047',39,'view','product',7768,CAST('2025-12-06 09:00:00' AS TIMESTAMP)),
    ('E048',40,'cart','product',8879,CAST('2025-12-07 13:00:00' AS TIMESTAMP)),
    ('E049',41,'purchase','checkout',9980,CAST('2025-12-08 11:00:00' AS TIMESTAMP)),
    ('E050',43,'purchase','checkout',2986,CAST('2025-12-13 14:00:00' AS TIMESTAMP));
验证 ODS 层数据量:
-- Row count of each ODS table, one output row per table
-- (tbl = short table label, cnt = number of rows).
SELECT 'customers'   AS tbl, COUNT(*) AS cnt FROM best_practice_customer_360.doc_ods_customers
UNION ALL
SELECT 'orders'      AS tbl, COUNT(*) AS cnt FROM best_practice_customer_360.doc_ods_orders
UNION ALL
SELECT 'order_items' AS tbl, COUNT(*) AS cnt FROM best_practice_customer_360.doc_ods_order_items
UNION ALL
SELECT 'payments'    AS tbl, COUNT(*) AS cnt FROM best_practice_customer_360.doc_ods_payments
UNION ALL
SELECT 'returns'     AS tbl, COUNT(*) AS cnt FROM best_practice_customer_360.doc_ods_returns
UNION ALL
SELECT 'behavior'    AS tbl, COUNT(*) AS cnt FROM best_practice_customer_360.doc_ods_behavior_events;
-- Upsert two customers keyed on customer_id. An existing row has its city and
-- signup_date overwritten; a missing one is inserted.
MERGE INTO best_practice_customer_360.doc_ods_customers AS tgt
USING (
    SELECT
        51                          AS customer_id,
        'Chennai'                   AS city,
        CAST('2024-01-15' AS DATE)  AS signup_date
    UNION ALL
    SELECT
        1                           AS customer_id,
        'Mumbai'                    AS city,
        CAST('2021-02-16' AS DATE)  AS signup_date
) AS src
    ON tgt.customer_id = src.customer_id
WHEN MATCHED THEN
    UPDATE SET
        city        = src.city,
        signup_date = src.signup_date
WHEN NOT MATCHED THEN
    INSERT (customer_id, city, signup_date)
    VALUES (src.customer_id, src.city, src.signup_date);
-- Rule-based churn risk score for one customer, returned as DOUBLE.
-- The score is the sum of four components:
--   recency   (days_since_last_order): > 180 -> 40, > 90 -> 25, > 60 -> 15, otherwise 5
--   frequency (total_orders):          <= 1 -> 20, <= 3 -> 10, otherwise 0
--   returns   (return_rate):           > 0.3 -> 20, > 0.1 -> 10, otherwise 0
--   value     (avg_order_value):       < 1000 -> 10, otherwise 0
-- Reachable totals are therefore 5..90. The LEAST/GREATEST clamp to [0, 100]
-- is only a safety bound; these components never reach it.
-- A NULL argument makes its comparisons UNKNOWN, so that component falls into
-- its ELSE branch. For example, a NULL days_since_last_order scores 5, the
-- lowest recency risk.
-- total_orders is INT, so callers holding a BIGINT (e.g. a COUNT result) must
-- CAST it to INT explicitly.
CREATE OR REPLACE FUNCTION best_practice_customer_360.calc_churn_score(
days_since_last_order INT,
total_orders INT,
avg_order_value DOUBLE,
return_rate DOUBLE
)
RETURNS DOUBLE
AS LEAST(100.0, GREATEST(0.0,
CASE WHEN days_since_last_order > 180 THEN 40.0
WHEN days_since_last_order > 90 THEN 25.0
WHEN days_since_last_order > 60 THEN 15.0
ELSE 5.0
END
+ CASE WHEN total_orders <= 1 THEN 20.0
WHEN total_orders <= 3 THEN 10.0
ELSE 0.0
END
+ CASE WHEN return_rate > 0.3 THEN 20.0
WHEN return_rate > 0.1 THEN 10.0
ELSE 0.0
END
+ CASE WHEN avg_order_value < 1000 THEN 10.0
ELSE 0.0
END
));
验证三类典型客户的评分:
-- Sanity check of calc_churn_score on three representative profiles.
-- Expected from the scoring rules (recency + frequency + returns + value):
--   high   = 40 + 20 + 20 + 10 = 90
--   low    =  5 +  0 +  0 +  0 =  5
--   medium = 25 + 10 + 10 +  0 = 45
SELECT
    best_practice_customer_360.calc_churn_score(200, 1, 800.0, 0.35)  AS score_high_risk,
    best_practice_customer_360.calc_churn_score(45, 8, 12000.0, 0.02) AS score_low_risk,
    best_practice_customer_360.calc_churn_score(100, 3, 2000.0, 0.12) AS score_medium_risk;
-- DWD layer: order-level wide table with customer attributes and the order's
-- total paid amount. It is one row per order, provided customer_id is unique
-- in doc_ods_customers.
-- doc_ods_payments has its own payment_id, so an order can have several
-- payment rows. Payments are summed per order_id before the join. Otherwise
-- such an order would be duplicated, and downstream AVG(payment_amount) would
-- average per payment rather than per order.
-- Orders without any payment keep payment_amount = NULL (LEFT JOIN).
-- days_since_order is measured against the fixed as-of date 2026-06-06; keep
-- it in sync with the as-of date used by the other layers.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_customer_360.doc_dwd_customer_orders
AS
SELECT
    c.customer_id,
    c.city,
    c.signup_date,
    o.order_id,
    o.order_date,
    o.store_id,
    o.promotion_id,
    p.amount AS payment_amount,
    DATEDIFF(CAST('2026-06-06' AS DATE), o.order_date) AS days_since_order
FROM best_practice_customer_360.doc_ods_customers c
INNER JOIN best_practice_customer_360.doc_ods_orders o
    ON c.customer_id = o.customer_id
LEFT JOIN (
    -- Collapse payments to one row per order.
    SELECT
        order_id,
        SUM(amount) AS amount
    FROM best_practice_customer_360.doc_ods_payments
    GROUP BY order_id
) p
    ON o.order_id = p.order_id;
⚠️ 注意：Dynamic Table DDL 中不写 `REFRESH INTERVAL`。刷新调度通过 Studio Task 管理——这样可以在同一任务上附加数据质量检查和告警规则。
手动触发首次刷新:
-- Populate the DWD dynamic table for the first time, then check its row count.
REFRESH DYNAMIC TABLE best_practice_customer_360.doc_dwd_customer_orders;

SELECT
    COUNT(*) AS silver_count
FROM best_practice_customer_360.doc_dwd_customer_orders;
silver_count
------------
100
查看订单宽表样本:
-- Sample rows from the DWD order wide table.
-- A customer usually has several orders, so order_id is added as a tie-breaker;
-- without it, which 8 rows LIMIT returns is not deterministic.
SELECT
    customer_id,
    city,
    order_id,
    order_date,
    payment_amount,
    days_since_order
FROM best_practice_customer_360.doc_dwd_customer_orders
ORDER BY customer_id, order_id
LIMIT 8;
-- DWS layer: RFM metrics per customer, aggregated from the DWD order wide table.
--   frequency          number of distinct orders
--   recency_days       smallest days_since_order, i.e. days since the latest order
--   monetary           total payment_amount, rounded to 2 decimals
--   avg_order_value    average of the non-NULL payment_amount rows, 2 decimals
--   customer_age_days  days from signup_date to the fixed as-of date 2026-06-06
-- city and signup_date are already carried on every DWD row, so they are
-- grouped on directly. Re-joining doc_ods_customers re-read the table, and a
-- duplicated customer_id there would multiply every order row again and
-- inflate monetary.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_customer_360.doc_dws_customer_rfm
AS
SELECT
    o.customer_id,
    o.city,
    o.signup_date,
    COUNT(DISTINCT o.order_id) AS frequency,
    MIN(o.days_since_order) AS recency_days,
    ROUND(SUM(o.payment_amount), 2) AS monetary,
    ROUND(AVG(o.payment_amount), 2) AS avg_order_value,
    DATEDIFF(CAST('2026-06-06' AS DATE), o.signup_date) AS customer_age_days
FROM best_practice_customer_360.doc_dwd_customer_orders o
GROUP BY o.customer_id, o.city, o.signup_date;
手动触发刷新并查看高价值客户:
-- Refresh the RFM table, then list the 8 customers with the highest total spend.
-- customer_id breaks ties on monetary so the top-8 cut-off is deterministic.
REFRESH DYNAMIC TABLE best_practice_customer_360.doc_dws_customer_rfm;

SELECT
    customer_id,
    city,
    frequency,
    recency_days,
    monetary,
    avg_order_value,
    customer_age_days
FROM best_practice_customer_360.doc_dws_customer_rfm
ORDER BY monetary DESC, customer_id
LIMIT 8;
-- ADS layer: churn score and risk tier for every customer in doc_dws_customer_rfm.
-- It combines three inputs:
--   * the RFM metrics,
--   * the item return rate (ODS order items and returns),
--   * online behavior counts (ODS behavior events).
-- It then scores each customer with calc_churn_score:
--   raw score >= 60 -> 'HIGH', >= 30 -> 'MEDIUM', otherwise 'LOW'.
-- The return rate and the raw score are each computed once, in the CTEs
-- `enriched` and `scored`, instead of repeating the same UDF call three times.
-- This guarantees churn_score and churn_risk are derived from identical inputs.
-- calc_churn_score takes INT for the first two arguments, while recency/frequency
-- come from aggregates (frequency is a BIGINT COUNT DISTINCT), hence the casts.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_customer_360.doc_ads_churn_score
AS
WITH return_stats AS (
    -- Distinct items bought and distinct returns against them, per customer.
    SELECT
        o.customer_id,
        COUNT(DISTINCT r.return_id) AS return_count,
        COUNT(DISTINCT oi.order_item_id) AS item_count
    FROM best_practice_customer_360.doc_ods_order_items oi
    INNER JOIN best_practice_customer_360.doc_ods_orders o
        ON oi.order_id = o.order_id
    LEFT JOIN best_practice_customer_360.doc_ods_returns r
        ON oi.order_item_id = r.order_item_id
    GROUP BY o.customer_id
),
behavior_stats AS (
    -- All behavior events and purchase events, per customer.
    SELECT
        customer_id,
        COUNT(*) AS total_events,
        COUNT(CASE WHEN event_type = 'purchase' THEN 1 END) AS purchase_events
    FROM best_practice_customer_360.doc_ods_behavior_events
    GROUP BY customer_id
),
enriched AS (
    -- One row per RFM customer, with return/behavior stats attached (NULL when
    -- the customer has none). raw_return_rate is unrounded and is 0.0 when the
    -- customer has no items.
    SELECT
        rfm.customer_id,
        rfm.city,
        rfm.frequency,
        rfm.recency_days,
        rfm.monetary,
        rfm.avg_order_value,
        rs.return_count,
        rs.item_count,
        bs.total_events,
        bs.purchase_events,
        CASE WHEN rs.item_count > 0
             THEN CAST(rs.return_count AS DOUBLE) / rs.item_count
             ELSE 0.0 END AS raw_return_rate
    FROM best_practice_customer_360.doc_dws_customer_rfm rfm
    LEFT JOIN return_stats rs ON rfm.customer_id = rs.customer_id
    LEFT JOIN behavior_stats bs ON rfm.customer_id = bs.customer_id
),
scored AS (
    -- Evaluate the churn UDF exactly once per customer, on the unrounded rate.
    SELECT
        e.customer_id,
        e.city,
        e.frequency,
        e.recency_days,
        e.monetary,
        e.avg_order_value,
        e.return_count,
        e.item_count,
        e.total_events,
        e.purchase_events,
        e.raw_return_rate,
        best_practice_customer_360.calc_churn_score(
            CAST(e.recency_days AS INT),
            CAST(e.frequency AS INT),
            e.avg_order_value,
            e.raw_return_rate
        ) AS raw_churn_score
    FROM enriched e
)
SELECT
    customer_id,
    city,
    frequency,
    recency_days,
    monetary,
    avg_order_value,
    COALESCE(return_count, 0) AS return_count,
    COALESCE(item_count, 0) AS item_count,
    ROUND(raw_return_rate, 4) AS return_rate,
    COALESCE(total_events, 0) AS total_events,
    COALESCE(purchase_events, 0) AS purchase_events,
    ROUND(raw_churn_score, 2) AS churn_score,
    CASE
        WHEN raw_churn_score >= 60 THEN 'HIGH'
        WHEN raw_churn_score >= 30 THEN 'MEDIUM'
        ELSE 'LOW'
    END AS churn_risk
FROM scored;
⚠️ 注意：`calc_churn_score` UDF 的第二个参数 `total_orders` 类型为 `INT`，而 Dynamic Table 中的 `frequency`（来自 `COUNT DISTINCT`）是 `BIGINT`，需要显式 `CAST(rfm.frequency AS INT)`，否则会报类型不匹配错误。
手动触发刷新并查看高流失风险客户:
-- Refresh the churn table, then list the 10 customers most at risk: highest
-- churn_score first, then the longest time since the last order.
-- Churn scores are sums of a few fixed step values, so ties are common.
-- customer_id is the final tie-breaker, keeping the top-10 cut-off deterministic.
REFRESH DYNAMIC TABLE best_practice_customer_360.doc_ads_churn_score;

SELECT
    customer_id,
    city,
    frequency,
    recency_days,
    monetary,
    avg_order_value,
    return_rate,
    churn_score,
    churn_risk
FROM best_practice_customer_360.doc_ads_churn_score
ORDER BY churn_score DESC, recency_days DESC, customer_id
LIMIT 10;
-- Number of customers per churn risk tier, listed from highest to lowest risk.
-- Sorting on the label text alone would give alphabetical order
-- (HIGH, LOW, MEDIUM). Any unexpected label sorts last.
SELECT
    churn_risk,
    COUNT(*) AS customer_count
FROM best_practice_customer_360.doc_ads_churn_score
GROUP BY churn_risk
ORDER BY
    CASE churn_risk
        WHEN 'HIGH' THEN 1
        WHEN 'MEDIUM' THEN 2
        WHEN 'LOW' THEN 3
        ELSE 4
    END;
churn_risk | customer_count
-----------+---------------
HIGH | 20
MEDIUM | 30
按城市统计平均流失分:
-- Churn summary per city, highest average churn score first:
-- customer count, average churn score (1 decimal) and number of HIGH-risk customers.
SELECT
    city,
    COUNT(*)                                        AS customer_count,
    ROUND(AVG(churn_score), 1)                      AS avg_churn_score,
    COUNT(CASE WHEN churn_risk = 'HIGH' THEN 1 END) AS high_risk_count
FROM best_practice_customer_360.doc_ads_churn_score
GROUP BY city
ORDER BY avg_churn_score DESC;
-- Online funnel for customers with more than one behavior event: view, cart and
-- purchase counts, plus purchases as a percentage of all events (1 decimal).
-- Highest purchase rate first.
WITH funnel AS (
    SELECT
        b.customer_id,
        c.city,
        COUNT(CASE WHEN b.event_type = 'view' THEN 1 END)     AS views,
        COUNT(CASE WHEN b.event_type = 'cart' THEN 1 END)     AS carts,
        COUNT(CASE WHEN b.event_type = 'purchase' THEN 1 END) AS purchases,
        COUNT(*)                                              AS total_events
    FROM best_practice_customer_360.doc_ods_behavior_events AS b
    INNER JOIN best_practice_customer_360.doc_ods_customers AS c
        ON c.customer_id = b.customer_id
    GROUP BY b.customer_id, c.city
)
SELECT
    customer_id,
    city,
    views,
    carts,
    purchases,
    ROUND(purchases * 100.0 / total_events, 1) AS purchase_rate_pct
FROM funnel
WHERE total_events > 1
ORDER BY purchase_rate_pct DESC;
-- Business-facing view over doc_ads_churn_score. It passes every row through
-- unchanged and only renames columns to business terms; the churn_score
-- column keeps its name.
CREATE OR REPLACE VIEW best_practice_customer_360.sv_churn_risk_customers AS
SELECT
    customer_id,
    city,
    frequency       AS purchase_frequency,
    recency_days    AS days_since_last_purchase,
    monetary        AS total_spend,
    avg_order_value,
    return_rate,
    total_events    AS behavior_events,
    purchase_events AS online_purchases,
    churn_score,
    churn_risk      AS risk_level
FROM best_practice_customer_360.doc_ads_churn_score;
-- HIGH-risk customers in Bangalore whose total spend exceeds 10000, read
-- through the business view, most at-risk first.
SELECT
    customer_id,
    city,
    purchase_frequency,
    days_since_last_purchase,
    total_spend,
    churn_score,
    risk_level
FROM best_practice_customer_360.sv_churn_risk_customers
WHERE risk_level = 'HIGH'
  AND city = 'Bangalore'
  AND total_spend > 10000
ORDER BY churn_score DESC;