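Retail Customer 360 Churn Prediction: A Data Warehouse Walkthrough

This guide integrates online e-commerce behavior, in-store transactions, and return records into a unified Customer 360 view, and uses RFM metrics and a churn score to support targeted retention campaigns. Based on a real retail dataset of 500,000 records, it demonstrates the full Kafka PIPE → Bronze → Silver → Gold build end to end, and covers practical use of key platform capabilities: incremental merging with MERGE INTO, scoring with a SQL UDF, and natural-language querying through a Semantic View.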


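Overview

Typical challenges in retail churn prediction, and how 云器 Lakehouse addresses each:

Problem                                                                                                            | Solution
-------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------
Multi-channel data silos (e-commerce behavior, POS transactions, support tickets) are isolated and cannot be joined | Build a unified ODS in the Bronze layer; use MERGE INTO for incremental multi-source merges
Customer purchase records change in real time, so RFM metrics need continuous updates                               | Dynamic Tables refresh declaratively in cascade; the system tracks upstream changes automatically
Churn scoring logic is complex and needed by several downstream tables                                              | Wrap the scoring formula in a SQL UDF and reuse it in the Silver and Gold layers
Operations staff need to query high-risk customers in natural language                                              | Semantic View connected to the Analytics Agent
Point lookups on the high-cardinality customer_id column are slow                                                   | Bloomfilter Index to speed up filtering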

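SQL Commands Used

Command / function       | Purpose                                               | Notes
-------------------------+-------------------------------------------------------+--------------------------------------------------------------------------------
CREATE TABLE             | Create the Bronze-layer raw ODS tables                | Regular tables that serve as the upstream of the Dynamic Tables
CREATE BLOOMFILTER INDEX | Build a Bloomfilter index on the customer_id column   | Suited to accelerating point lookups on high-cardinality columns
CREATE PIPE              | Create the Kafka behavior-event ingestion pipe        | Continuously ingests view, add-to-cart, and purchase events
MERGE INTO               | Incrementally merge multi-source customer data        | UPSERT semantics: update existing customers or insert new ones
CREATE FUNCTION          | Create the calc_churn_score scoring UDF               | Wraps the recency + frequency + return_rate + avg_order_value scoring formula
CREATE DYNAMIC TABLE     | Create the incrementally computed Silver / Gold tables | The system detects upstream changes and refreshes incrementally
CREATE VIEW              | Create the Semantic View                              | Provides business-friendly column names for the Analytics Agent to parse
REFRESH DYNAMIC TABLE    | Manually trigger the first refresh                    | Used for the initial build or when debugging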

Prerequisites

All examples run in the best_practice_customer_360 schema.

CREATE SCHEMA IF NOT EXISTS best_practice_customer_360;

Download the dataset and unzip it locally:

kaggle datasets download -d datarspectrum/retail-data-warehouse-12-table-1m-rows-dataset \
  --unzip -p /tmp/customer_360/

The archive contains 12 CSV files, including customers.csv (50,000 rows), orders.csv (300,000 rows), order_items.csv (600,000 rows), payments.csv (300,000 rows), and returns.csv (30,000 rows). This guide uses 50 customers, 100 orders, and 120 order items as the demo dataset.


ODS Layer: Multi-Channel Raw Data Ingestion

The ODS layer takes in three kinds of sources: CRM/POS transaction data (batch or CDC sync), behavior tracking events (streamed through Kafka), and return records (scheduled batch loads).

Create the Tables

-- Customer master data
CREATE TABLE IF NOT EXISTS best_practice_customer_360.doc_ods_customers (
    customer_id BIGINT,
    city        STRING,
    signup_date DATE
);

-- Orders
CREATE TABLE IF NOT EXISTS best_practice_customer_360.doc_ods_orders (
    order_id     BIGINT,
    customer_id  BIGINT,
    store_id     BIGINT,
    order_date   DATE,
    promotion_id BIGINT
);

-- Order line items
CREATE TABLE IF NOT EXISTS best_practice_customer_360.doc_ods_order_items (
    order_item_id BIGINT,
    order_id      BIGINT,
    product_id    BIGINT,
    qty           INT,
    price         DOUBLE
);

-- Payments
CREATE TABLE IF NOT EXISTS best_practice_customer_360.doc_ods_payments (
    payment_id BIGINT,
    order_id   BIGINT,
    amount     DOUBLE
);

-- Returns
CREATE TABLE IF NOT EXISTS best_practice_customer_360.doc_ods_returns (
    return_id     BIGINT,
    order_item_id BIGINT,
    refund        DOUBLE
);

-- Raw Kafka behavior events (receives JSON strings)
CREATE TABLE IF NOT EXISTS best_practice_customer_360.doc_ods_kafka_behavior_raw (
    value       STRING,
    ingest_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

-- Parsed behavior events
CREATE TABLE IF NOT EXISTS best_practice_customer_360.doc_ods_behavior_events (
    event_id    STRING,
    customer_id BIGINT,
    event_type  STRING,
    page        STRING,
    product_id  BIGINT,
    event_time  TIMESTAMP,
    ingest_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

Create Bloomfilter Indexes

Both doc_ods_orders and doc_ods_behavior_events are filtered frequently by customer_id. The cardinality of that column is on the order of the total number of customers (high cardinality), which makes it a good fit for a Bloomfilter Index.

CREATE BLOOMFILTER INDEX idx_bf_customer_id ON TABLE doc_ods_orders (customer_id);
CREATE BLOOMFILTER INDEX idx_bf_event_customer ON TABLE doc_ods_behavior_events (customer_id);
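To illustrate why a Bloom filter suits point lookups on a high-cardinality column, here is a minimal Python sketch of the general data structure. It is not the platform's implementation: the class, the bit-array size, the hash scheme, and the file names are all assumptions made for illustration. The key property is that a "no" answer is always correct, so a file can be skipped safely, while a "yes" answer may occasionally be a false positive.

```python
import hashlib


class BloomFilter:
    """Tiny Bloom filter: each key sets num_hashes positions in a bit array."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for readability

    def _positions(self, key: int):
        # Derive the positions by salting the key with the hash index.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: int) -> None:
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key: int) -> bool:
        # False means "definitely absent"; True means "possibly present".
        return all(self.bits[pos] for pos in self._positions(key))


# Hypothetical layout: one filter per data file, keyed on customer_id.
file_contents = {"file_a": [1, 2, 3], "file_b": [40, 41, 43]}
file_filters = {}
for file_name, customer_ids in file_contents.items():
    bloom = BloomFilter()
    for customer_id in customer_ids:
        bloom.add(customer_id)
    file_filters[file_name] = bloom

# A lookup for customer_id = 41 only needs to scan files whose filter says "maybe".
files_to_scan = [name for name, bloom in file_filters.items() if bloom.might_contain(41)]
```

Because the filter never produces false negatives, file_b is always in files_to_scan; file_a is skipped unless a rare false positive occurs.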

Kafka Behavior Event Ingestion

Option 1: Real-time ingestion through a Kafka PIPE (recommended for production)

In production, the front end reports view, add-to-cart, and purchase events to Kafka through tracking code. A Kafka PIPE continuously ingests them into the ODS layer:

CREATE PIPE IF NOT EXISTS best_practice_customer_360.pipe_behavior_events
    VIRTUAL_CLUSTER = 'DEFAULT'
    BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO best_practice_customer_360.doc_ods_kafka_behavior_raw
FROM (
    SELECT CAST(value AS STRING) AS value
    FROM READ_KAFKA(
        '<kafka-broker>:9092',        -- replace with the actual broker address
        'retail_behavior_events',     -- topic name
        '',
        'cz_c360_consumer',           -- consumer group
        '', '', '', '',
        'raw', 'raw',
        0,
        map()
    )
);

The following Python producer sends behavior events to the Kafka topic:

from kafka import KafkaProducer
import json
import time
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers=['<kafka-broker>:9092'],
    # Serialize each event dict as UTF-8 encoded JSON
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Build one behavior event message and send it
def send_behavior_event(customer_id, event_type, product_id=None):
    event = {
        "event_id": f"E{int(time.time()*1000)}",
        "customer_id": customer_id,
        "event_type": event_type,  # view / cart / purchase
        "page": "product" if product_id else "home",
        "product_id": product_id,
        "event_time": datetime.utcnow().isoformat()
    }
    producer.send('retail_behavior_events', value=event)
    producer.flush()
    return event

# Example: send one purchase event
send_behavior_event(customer_id=1001, event_type='purchase', product_id=472)

Option 2: Simulate with COPY INTO or INSERT (no Kafka environment)

If Kafka is not configured yet, save the data as a local CSV file, upload it to the User Volume with PUT, and load it with COPY INTO (recommended):

Import from a local CSV (recommended)

-- Step 1: upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/behavior_events_data.csv' TO USER VOLUME FILE 'behavior_events_data.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_customer_360.doc_ods_behavior_events
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('behavior_events_data.csv');

You can also insert a small batch of test data inline (no CSV file needed):

INSERT INTO best_practice_customer_360.doc_ods_behavior_events
    (event_id, customer_id, event_type, page, product_id, event_time)
VALUES
    ('E001',1,'view','product',472,CAST('2025-11-01 10:00:00' AS TIMESTAMP)),
    ('E002',1,'cart','product',472,CAST('2025-11-01 10:05:00' AS TIMESTAMP)),
    ('E003',1,'purchase','checkout',472,CAST('2025-11-01 10:10:00' AS TIMESTAMP)),
    ('E004',2,'view','product',1666,CAST('2025-11-02 09:00:00' AS TIMESTAMP)),
    ('E005',3,'purchase','checkout',9246,CAST('2025-11-03 14:30:00' AS TIMESTAMP)),
    ('E006',2,'view','product',1666,CAST('2025-11-03 11:00:00' AS TIMESTAMP)),
    ('E007',2,'cart','product',1666,CAST('2025-11-03 11:10:00' AS TIMESTAMP)),
    ('E008',4,'view','product',321,CAST('2025-11-04 08:30:00' AS TIMESTAMP)),
    ('E009',5,'view','product',889,CAST('2025-11-04 09:00:00' AS TIMESTAMP)),
    ('E010',5,'cart','product',889,CAST('2025-11-04 09:15:00' AS TIMESTAMP)),
    ('E011',6,'purchase','checkout',512,CAST('2025-11-05 14:00:00' AS TIMESTAMP)),
    ('E012',7,'view','product',734,CAST('2025-11-06 10:30:00' AS TIMESTAMP)),
    ('E013',8,'view','product',201,CAST('2025-11-06 13:00:00' AS TIMESTAMP)),
    ('E014',8,'cart','product',201,CAST('2025-11-06 13:20:00' AS TIMESTAMP)),
    ('E015',9,'purchase','checkout',3301,CAST('2025-11-07 16:00:00' AS TIMESTAMP)),
    ('E016',10,'view','product',4412,CAST('2025-11-08 09:45:00' AS TIMESTAMP)),
    ('E017',10,'purchase','checkout',4412,CAST('2025-11-08 10:00:00' AS TIMESTAMP)),
    ('E018',11,'view','home',0,CAST('2025-11-09 08:00:00' AS TIMESTAMP)),
    ('E019',12,'view','product',671,CAST('2025-11-10 11:00:00' AS TIMESTAMP)),
    ('E020',13,'view','product',882,CAST('2025-11-10 14:00:00' AS TIMESTAMP)),
    ('E021',13,'cart','product',882,CAST('2025-11-10 14:30:00' AS TIMESTAMP)),
    ('E022',14,'purchase','checkout',993,CAST('2025-11-11 10:00:00' AS TIMESTAMP)),
    ('E023',15,'view','product',1104,CAST('2025-11-12 09:00:00' AS TIMESTAMP)),
    ('E024',16,'view','product',2215,CAST('2025-11-13 15:00:00' AS TIMESTAMP)),
    ('E025',17,'purchase','checkout',3326,CAST('2025-11-14 11:30:00' AS TIMESTAMP)),
    ('E026',18,'view','product',4437,CAST('2025-11-15 13:00:00' AS TIMESTAMP)),
    ('E027',19,'cart','product',5548,CAST('2025-11-16 10:00:00' AS TIMESTAMP)),
    ('E028',20,'view','product',6659,CAST('2025-11-17 09:30:00' AS TIMESTAMP)),
    ('E029',21,'purchase','checkout',7770,CAST('2025-11-18 14:00:00' AS TIMESTAMP)),
    ('E030',22,'view','product',8881,CAST('2025-11-19 11:00:00' AS TIMESTAMP)),
    ('E031',23,'view','product',9992,CAST('2025-11-20 10:30:00' AS TIMESTAMP)),
    ('E032',24,'cart','product',1113,CAST('2025-11-21 16:00:00' AS TIMESTAMP)),
    ('E033',25,'purchase','checkout',2224,CAST('2025-11-22 09:00:00' AS TIMESTAMP)),
    ('E034',26,'view','product',3335,CAST('2025-11-23 14:30:00' AS TIMESTAMP)),
    ('E035',27,'view','product',4446,CAST('2025-11-24 11:00:00' AS TIMESTAMP)),
    ('E036',28,'cart','product',5557,CAST('2025-11-25 10:00:00' AS TIMESTAMP)),
    ('E037',29,'purchase','checkout',6668,CAST('2025-11-26 15:00:00' AS TIMESTAMP)),
    ('E038',30,'view','product',7779,CAST('2025-11-27 09:00:00' AS TIMESTAMP)),
    ('E039',31,'view','product',8880,CAST('2025-11-28 13:00:00' AS TIMESTAMP)),
    ('E040',32,'cart','product',9991,CAST('2025-11-29 11:30:00' AS TIMESTAMP)),
    ('E041',33,'purchase','checkout',1102,CAST('2025-11-30 10:00:00' AS TIMESTAMP)),
    ('E042',34,'view','product',2213,CAST('2025-12-01 09:30:00' AS TIMESTAMP)),
    ('E043',35,'view','product',3324,CAST('2025-12-02 14:00:00' AS TIMESTAMP)),
    ('E044',36,'cart','product',4435,CAST('2025-12-03 11:00:00' AS TIMESTAMP)),
    ('E045',37,'purchase','checkout',5546,CAST('2025-12-04 10:30:00' AS TIMESTAMP)),
    ('E046',38,'view','product',6657,CAST('2025-12-05 15:00:00' AS TIMESTAMP)),
    ('E047',39,'view','product',7768,CAST('2025-12-06 09:00:00' AS TIMESTAMP)),
    ('E048',40,'cart','product',8879,CAST('2025-12-07 13:00:00' AS TIMESTAMP)),
    ('E049',41,'purchase','checkout',9980,CAST('2025-12-08 11:00:00' AS TIMESTAMP)),
    ('E050',43,'purchase','checkout',2986,CAST('2025-12-13 14:00:00' AS TIMESTAMP));

Verify the row counts in the ODS layer (the customers count of 51 reflects the state after the MERGE INTO step in the next section):

SELECT 'customers' AS tbl, COUNT(*) AS cnt FROM best_practice_customer_360.doc_ods_customers
UNION ALL SELECT 'orders',      COUNT(*) FROM best_practice_customer_360.doc_ods_orders
UNION ALL SELECT 'order_items', COUNT(*) FROM best_practice_customer_360.doc_ods_order_items
UNION ALL SELECT 'payments',    COUNT(*) FROM best_practice_customer_360.doc_ods_payments
UNION ALL SELECT 'returns',     COUNT(*) FROM best_practice_customer_360.doc_ods_returns
UNION ALL SELECT 'behavior',    COUNT(*) FROM best_practice_customer_360.doc_ods_behavior_events;

tbl         | cnt
------------+----
customers   | 51
orders      | 100
order_items | 120
payments    | 100
returns     | 20
behavior    | 50

MERGE INTO: Incrementally Merging Multi-Source Customer Data

When the CRM system pushes customer updates, use MERGE INTO to implement an UPSERT: existing customers get their city and signup date updated, and new customers are inserted directly:

MERGE INTO best_practice_customer_360.doc_ods_customers AS tgt
USING (
    SELECT 51 AS customer_id, 'Chennai' AS city, CAST('2024-01-15' AS DATE) AS signup_date
    UNION ALL
    SELECT 1, 'Mumbai', CAST('2021-02-16' AS DATE)
) AS src
ON tgt.customer_id = src.customer_id
WHEN MATCHED THEN
    UPDATE SET city = src.city, signup_date = src.signup_date
WHEN NOT MATCHED THEN
    INSERT (customer_id, city, signup_date)
    VALUES (src.customer_id, src.city, src.signup_date);

After execution, the doc_ods_customers table grows from 50 to 51 rows (customer_id=51 is a new customer), and the city and signup date of customer_id=1 are updated to the latest values.
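The UPSERT semantics can be checked against a small in-memory model. The Python sketch below only mirrors the matching logic of the MERGE statement (match on customer_id, update when matched, insert otherwise); the helper name merge_customers and the placeholder values in the 50 starting rows are assumptions, not data from the dataset.

```python
from datetime import date


def merge_customers(target: dict, source: list) -> tuple:
    """Apply MERGE INTO semantics keyed on customer_id.

    Returns (updated_count, inserted_count).
    """
    updated = inserted = 0
    for row in source:
        key = row["customer_id"]
        if key in target:
            # WHEN MATCHED THEN UPDATE SET city, signup_date
            target[key].update(city=row["city"], signup_date=row["signup_date"])
            updated += 1
        else:
            # WHEN NOT MATCHED THEN INSERT
            target[key] = dict(row)
            inserted += 1
    return updated, inserted


# 50 existing customers with placeholder (hypothetical) attribute values.
target = {
    cid: {"customer_id": cid, "city": "Unknown", "signup_date": date(2020, 1, 1)}
    for cid in range(1, 51)
}
source = [
    {"customer_id": 51, "city": "Chennai", "signup_date": date(2024, 1, 15)},
    {"customer_id": 1, "city": "Mumbai", "signup_date": date(2021, 2, 16)},
]
updated, inserted = merge_customers(target, source)
```

One row is updated and one is inserted, so the model ends with 51 customers, matching the row counts described above.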


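Churn Score UDF

The churn scoring logic is wrapped in a SQL UDF so that both the Silver and Gold layers can reuse it.

Scoring rules:

  • Recency contribution: more than 180 days since the last purchase +40; more than 90 days +25; more than 60 days +15; otherwise +5
  • Frequency contribution: only 1 order +20; up to 3 orders +10; more than 3 orders +0
  • Return rate contribution: return rate > 30% +20; > 10% +10; otherwise +0
  • Order value contribution: average order value < 1000 yuan +10
  • The final value is clamped to the range 0 to 100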

CREATE OR REPLACE FUNCTION best_practice_customer_360.calc_churn_score(
    days_since_last_order INT,
    total_orders          INT,
    avg_order_value       DOUBLE,
    return_rate           DOUBLE
)
RETURNS DOUBLE
AS
LEAST(100.0, GREATEST(0.0,
    CASE
        WHEN days_since_last_order > 180 THEN 40.0
        WHEN days_since_last_order > 90  THEN 25.0
        WHEN days_since_last_order > 60  THEN 15.0
        ELSE 5.0
    END
    + CASE
        WHEN total_orders <= 1 THEN 20.0
        WHEN total_orders <= 3 THEN 10.0
        ELSE 0.0
    END
    + CASE
        WHEN return_rate > 0.3 THEN 20.0
        WHEN return_rate > 0.1 THEN 10.0
        ELSE 0.0
    END
    + CASE
        WHEN avg_order_value < 1000 THEN 10.0
        ELSE 0.0
    END
));

Verify the scores for three typical customer profiles:

SELECT
    best_practice_customer_360.calc_churn_score(200, 1, 800.0, 0.35)  AS score_high_risk,
    best_practice_customer_360.calc_churn_score(45, 8, 12000.0, 0.02) AS score_low_risk,
    best_practice_customer_360.calc_churn_score(100, 3, 2000.0, 0.12) AS score_medium_risk;

score_high_risk | score_low_risk | score_medium_risk
----------------+----------------+------------------
90              | 5              | 45

  • High risk (90): 200 days without a purchase (+40) + only 1 order (+20) + 35% return rate (+20) + low order value (+10)
  • Low risk (5): purchased within the last 45 days (+5), 8 orders and a high order value, so almost no churn signal
  • Medium risk (45): 100 days without a purchase (+25) + up to 3 orders (+10) + 12% return rate (+10)
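For unit-testing the scoring rules outside the warehouse, the same formula can be mirrored in plain Python. This is a sketch that follows the CASE branches of the SQL UDF one to one; it is an illustration, not something the platform provides, and it reuses the three verification inputs from the query above.

```python
def calc_churn_score(days_since_last_order: int, total_orders: int,
                     avg_order_value: float, return_rate: float) -> float:
    """Python mirror of the calc_churn_score SQL UDF."""
    # Recency contribution
    if days_since_last_order > 180:
        recency = 40.0
    elif days_since_last_order > 90:
        recency = 25.0
    elif days_since_last_order > 60:
        recency = 15.0
    else:
        recency = 5.0

    # Frequency contribution
    if total_orders <= 1:
        frequency = 20.0
    elif total_orders <= 3:
        frequency = 10.0
    else:
        frequency = 0.0

    # Return rate contribution
    if return_rate > 0.3:
        returns = 20.0
    elif return_rate > 0.1:
        returns = 10.0
    else:
        returns = 0.0

    # Order value contribution
    order_value = 10.0 if avg_order_value < 1000 else 0.0

    # Clamp to the 0 to 100 range, as LEAST/GREATEST do in the UDF
    return min(100.0, max(0.0, recency + frequency + returns + order_value))


score_high_risk = calc_churn_score(200, 1, 800.0, 0.35)
score_low_risk = calc_churn_score(45, 8, 12000.0, 0.02)
score_medium_risk = calc_churn_score(100, 3, 2000.0, 0.12)
```

The three results are 90.0, 5.0, and 45.0, matching the SQL output. With these weights the maximum possible sum is 90, so the upper clamp never triggers unless the weights are raised.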

Silver Layer Dynamic Tables: Order Wide Table and RFM Metrics

DWD Layer: Customer Order Wide Table

The DWD layer joins and flattens the customer dimension, the order facts, and the payment amounts, and computes the number of days from each order to the reference date. The example hard-codes the reference date as 2026-06-06 rather than using the current date:

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_customer_360.doc_dwd_customer_orders AS
SELECT
    c.customer_id,
    c.city,
    c.signup_date,
    o.order_id,
    o.order_date,
    o.store_id,
    o.promotion_id,
    p.amount AS payment_amount,
    DATEDIFF(CAST('2026-06-06' AS DATE), o.order_date) AS days_since_order
FROM best_practice_customer_360.doc_ods_customers c
JOIN best_practice_customer_360.doc_ods_orders o ON c.customer_id = o.customer_id
LEFT JOIN best_practice_customer_360.doc_ods_payments p ON o.order_id = p.order_id;

Manually trigger the first refresh:

REFRESH DYNAMIC TABLE best_practice_customer_360.doc_dwd_customer_orders;

SELECT COUNT(*) AS silver_count
FROM best_practice_customer_360.doc_dwd_customer_orders;

silver_count
------------
100

Look at a sample of the order wide table:

SELECT customer_id, city, order_id, order_date, payment_amount, days_since_order
FROM best_practice_customer_360.doc_dwd_customer_orders
ORDER BY customer_id
LIMIT 8;

customer_id | city      | order_id | order_date | payment_amount | days_since_order
------------+-----------+----------+------------+----------------+-----------------
1           | Mumbai    | 100      | 2021-01-21 | 19338          | 1962
1           | Mumbai    | 50       | 2021-01-01 | 15966          | 1982
2           | Bangalore | 1        | 2021-08-26 | 1462           | 1745
2           | Bangalore | 51       | 2022-01-08 | 19953          | 1610
3           | Pune      | 52       | 2021-11-09 | 12236          | 1670
3           | Pune      | 2        | 2022-03-19 | 2272           | 1540
4           | Mumbai    | 3        | 2021-01-21 | 1342           | 1962
4           | Mumbai    | 53       | 2020-11-09 | 10052          | 2035

DWS Layer: RFM Metric Aggregation

The DWS layer aggregates at customer_id granularity and computes the three RFM metrics plus the account age (days since signup):

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_customer_360.doc_dws_customer_rfm AS
SELECT
    o.customer_id,
    c.city,
    c.signup_date,
    COUNT(DISTINCT o.order_id)          AS frequency,
    MIN(o.days_since_order)             AS recency_days,
    ROUND(SUM(o.payment_amount), 2)     AS monetary,
    ROUND(AVG(o.payment_amount), 2)     AS avg_order_value,
    DATEDIFF(CAST('2026-06-06' AS DATE), c.signup_date) AS customer_age_days
FROM best_practice_customer_360.doc_dwd_customer_orders o
JOIN best_practice_customer_360.doc_ods_customers c ON o.customer_id = c.customer_id
GROUP BY o.customer_id, c.city, c.signup_date;

Manually trigger a refresh and look at the highest-value customers:

REFRESH DYNAMIC TABLE best_practice_customer_360.doc_dws_customer_rfm;

SELECT customer_id, city, frequency, recency_days, monetary, avg_order_value, customer_age_days
FROM best_practice_customer_360.doc_dws_customer_rfm
ORDER BY monetary DESC
LIMIT 8;

customer_id | city      | frequency | recency_days | monetary | avg_order_value | customer_age_days
------------+-----------+-----------+--------------+----------+-----------------+------------------
1           | Mumbai    | 2         | 1962         | 35304    | 17652           | 1936
16          | Bangalore | 2         | 1147         | 31290    | 15645           | 2613
24          | Bangalore | 2         | 1030         | 30731    | 15365.5         | 1335
46          | Pune      | 2         | 1203         | 29931    | 14965.5         | 1876
25          | Bangalore | 2         | 934          | 29743    | 14871.5         | 2056
33          | Delhi     | 2         | 1625         | 29435    | 14717.5         | 1541
27          | Mumbai    | 2         | 1310         | 28516    | 14258           | 1155
37          | Bangalore | 2         | 1672         | 28433    | 14216.5         | 2673

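Reading the Results

  • customer_id=1 (Mumbai) has spent 35304 yuan in total, but recency_days=1962 (more than 5 years without a purchase). High historical spend combined with long silence marks a typical "historically high value, currently high churn risk" customer who deserves priority win-back efforts.
  • customer_id=24 (Bangalore) has recency_days=1030, which is relatively recent among these customers, and a cumulative spend of 30731 yuan, placing it in the high-potential group among Bangalore customers.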
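The RFM figures above can be reproduced by hand from the order wide table sample. The Python sketch below recomputes frequency, recency, monetary, and average order value for customers 1 and 2 from the four sample rows shown earlier, using the same fixed reference date of 2026-06-06. The function name rfm_by_customer is hypothetical, and the sketch assumes every order has a payment amount (the SQL AVG would skip NULLs from the LEFT JOIN).

```python
from datetime import date

REFERENCE_DATE = date(2026, 6, 6)  # same fixed date as in the Dynamic Table DDL

# Sample rows for customers 1 and 2, copied from the order wide table output.
orders = [
    {"customer_id": 1, "order_id": 100, "order_date": date(2021, 1, 21), "payment_amount": 19338.0},
    {"customer_id": 1, "order_id": 50, "order_date": date(2021, 1, 1), "payment_amount": 15966.0},
    {"customer_id": 2, "order_id": 1, "order_date": date(2021, 8, 26), "payment_amount": 1462.0},
    {"customer_id": 2, "order_id": 51, "order_date": date(2022, 1, 8), "payment_amount": 19953.0},
]


def rfm_by_customer(rows):
    """Aggregate order rows to one RFM record per customer_id."""
    grouped = {}
    for row in rows:
        grouped.setdefault(row["customer_id"], []).append(row)

    result = {}
    for customer_id, items in grouped.items():
        amounts = [item["payment_amount"] for item in items]
        result[customer_id] = {
            "frequency": len({item["order_id"] for item in items}),
            # Recency is the smallest gap, i.e. days since the most recent order.
            "recency_days": min((REFERENCE_DATE - item["order_date"]).days for item in items),
            "monetary": round(sum(amounts), 2),
            "avg_order_value": round(sum(amounts) / len(amounts), 2),
        }
    return result


rfm = rfm_by_customer(orders)
```

For customer 1 this yields frequency 2, recency_days 1962, monetary 35304.0, and avg_order_value 17652.0, which agrees with the first row of the DWS output.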

Gold Layer Dynamic Table: Churn Score and Risk Tiers

ADS Layer: Churn Score Wide Table

The ADS layer brings together the RFM metrics, return statistics, and behavior events, scores each customer with the calc_churn_score UDF, and outputs a three-level risk tier:

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_customer_360.doc_ads_churn_score AS
WITH return_stats AS (
    SELECT
        o.customer_id,
        COUNT(DISTINCT r.return_id)      AS return_count,
        COUNT(DISTINCT oi.order_item_id) AS item_count
    FROM best_practice_customer_360.doc_ods_order_items oi
    JOIN best_practice_customer_360.doc_ods_orders o ON oi.order_id = o.order_id
    LEFT JOIN best_practice_customer_360.doc_ods_returns r ON oi.order_item_id = r.order_item_id
    GROUP BY o.customer_id
),
behavior_stats AS (
    SELECT
        customer_id,
        COUNT(*) AS total_events,
        COUNT(CASE WHEN event_type = 'purchase' THEN 1 END) AS purchase_events
    FROM best_practice_customer_360.doc_ods_behavior_events
    GROUP BY customer_id
)
SELECT
    rfm.customer_id,
    rfm.city,
    rfm.frequency,
    rfm.recency_days,
    rfm.monetary,
    rfm.avg_order_value,
    COALESCE(rs.return_count, 0) AS return_count,
    COALESCE(rs.item_count, 0)   AS item_count,
    CASE
        WHEN rs.item_count > 0 THEN ROUND(CAST(rs.return_count AS DOUBLE) / rs.item_count, 4)
        ELSE 0.0
    END AS return_rate,
    COALESCE(bs.total_events, 0)    AS total_events,
    COALESCE(bs.purchase_events, 0) AS purchase_events,
    ROUND(best_practice_customer_360.calc_churn_score(
        CAST(rfm.recency_days AS INT),
        CAST(rfm.frequency AS INT),
        rfm.avg_order_value,
        CASE WHEN rs.item_count > 0 THEN CAST(rs.return_count AS DOUBLE) / rs.item_count ELSE 0.0 END
    ), 2) AS churn_score,
    CASE
        WHEN best_practice_customer_360.calc_churn_score(
            CAST(rfm.recency_days AS INT),
            CAST(rfm.frequency AS INT),
            rfm.avg_order_value,
            CASE WHEN rs.item_count > 0 THEN CAST(rs.return_count AS DOUBLE) / rs.item_count ELSE 0.0 END
        ) >= 60 THEN 'HIGH'
        WHEN best_practice_customer_360.calc_churn_score(
            CAST(rfm.recency_days AS INT),
            CAST(rfm.frequency AS INT),
            rfm.avg_order_value,
            CASE WHEN rs.item_count > 0 THEN CAST(rs.return_count AS DOUBLE) / rs.item_count ELSE 0.0 END
        ) >= 30 THEN 'MEDIUM'
        ELSE 'LOW'
    END AS churn_risk
FROM best_practice_customer_360.doc_dws_customer_rfm rfm
LEFT JOIN return_stats rs ON rfm.customer_id = rs.customer_id
LEFT JOIN behavior_stats bs ON rfm.customer_id = bs.customer_id;

Manually trigger a refresh and look at the customers with the highest churn risk:

REFRESH DYNAMIC TABLE best_practice_customer_360.doc_ads_churn_score;

SELECT customer_id, city, frequency, recency_days, monetary, avg_order_value,
       return_rate, churn_score, churn_risk
FROM best_practice_customer_360.doc_ads_churn_score
ORDER BY churn_score DESC, recency_days DESC
LIMIT 10;

customer_id | city      | frequency | recency_days | monetary | avg_order_value | return_rate | churn_score | churn_risk
------------+-----------+-----------+--------------+----------+-----------------+-------------+-------------+-----------
4           | Mumbai    | 2         | 1962         | 11394    | 5697.0          | 0.3333      | 70          | HIGH
22          | Mumbai    | 2         | 1908         | 22159    | 11079.5         | 0.5         | 70          | HIGH
38          | Bangalore | 2         | 1837         | 13749    | 6874.5          | 0.5         | 70          | HIGH
10          | Bangalore | 2         | 1793         | 11238    | 5619.0          | 0.3333      | 70          | HIGH
21          | Mumbai    | 2         | 1628         | 12813    | 6406.5          | 0.3333      | 70          | HIGH
2           | Bangalore | 2         | 1610         | 21415    | 10707.5         | 0.3333      | 70          | HIGH
18          | Bangalore | 2         | 1584         | 12997    | 6498.5          | 0.3333      | 70          | HIGH
32          | Delhi     | 2         | 1492         | 14664    | 7332.0          | 0.5         | 70          | HIGH
13          | Bangalore | 2         | 1248         | 12936    | 6468.0          | 0.3333      | 70          | HIGH
7           | Delhi     | 2         | 1219         | 13684    | 6842.0          | 0.3333      | 70          | HIGH

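Reading the Results

  • All ten HIGH-risk customers shown have churn_score=70. They share the same profile: more than 180 days without a purchase (+40), three or fewer orders (+10), a return rate above 30% (+20), and no low-order-value penalty (avg_order_value is at least 1000), for a total of 70.
  • customer_id=22 has spent 22159 yuan but has not purchased for 1908 days (about 5 years). This is a high-value dormant customer and a good candidate for reactivation marketing.
  • customer_id=4 (Mumbai) has return_rate=0.3333, meaning about one in three purchased order items was returned. Combined with the long purchase gap, the likelihood of churn is high.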
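The return rate and the tier thresholds used in the ADS table are simple enough to check in isolation. The Python sketch below mirrors the two CASE expressions from the Dynamic Table definition (division guarded against zero items; thresholds at 60 and 30). The function names are hypothetical, and the 1-of-3 example is only one combination that produces the 0.3333 shown for customer_id=4.

```python
def return_rate(return_count: int, item_count: int) -> float:
    """Returned items divided by purchased items; 0.0 when there are no items."""
    return return_count / item_count if item_count > 0 else 0.0


def churn_risk(score: float) -> str:
    """Map a churn score to the three tiers used in doc_ads_churn_score."""
    if score >= 60:
        return "HIGH"
    if score >= 30:
        return "MEDIUM"
    return "LOW"


# Example: one returned item out of three purchased items.
rate = round(return_rate(1, 3), 4)
# 0.3333 is above the 0.3 threshold, so it earns the +20 return-rate contribution.
tier = churn_risk(40 + 10 + 20)
```

Here rate is 0.3333 and tier is "HIGH". Computing the score once and deriving the tier from it, as in this sketch, is an alternative to calling the UDF three times; whether that restructuring suits the Dynamic Table is left as a design choice.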

Check the distribution across risk tiers:

SELECT churn_risk, COUNT(*) AS customer_count
FROM best_practice_customer_360.doc_ads_churn_score
GROUP BY churn_risk
ORDER BY churn_risk;

churn_risk | customer_count
-----------+---------------
HIGH       | 20
MEDIUM     | 30

Average churn score by city:

SELECT
    city,
    COUNT(*) AS customer_count,
    ROUND(AVG(churn_score), 1) AS avg_churn_score,
    COUNT(CASE WHEN churn_risk='HIGH' THEN 1 END) AS high_risk_count
FROM best_practice_customer_360.doc_ads_churn_score
GROUP BY city
ORDER BY avg_churn_score DESC;

city      | customer_count | avg_churn_score | high_risk_count
----------+----------------+-----------------+----------------
Bangalore | 19             | 59.5            | 8
Mumbai    | 12             | 59.2            | 5
Delhi     | 12             | 56.7            | 5
Pune      | 7              | 52.9            | 2


Behavior Event Analysis

Beyond order data, behavior tracking data can reveal "browsing but not buying" as a potential churn signal:

SELECT
    b.customer_id,
    c.city,
    COUNT(CASE WHEN b.event_type = 'view' THEN 1 END)     AS views,
    COUNT(CASE WHEN b.event_type = 'cart' THEN 1 END)     AS carts,
    COUNT(CASE WHEN b.event_type = 'purchase' THEN 1 END) AS purchases,
    ROUND(COUNT(CASE WHEN b.event_type = 'purchase' THEN 1 END) * 100.0 / COUNT(*), 1) AS purchase_rate_pct
FROM best_practice_customer_360.doc_ods_behavior_events b
JOIN best_practice_customer_360.doc_ods_customers c ON b.customer_id = c.customer_id
GROUP BY b.customer_id, c.city
HAVING COUNT(*) > 1
ORDER BY purchase_rate_pct DESC;

customer_id | city      | views | carts | purchases | purchase_rate_pct
------------+-----------+-------+-------+-----------+------------------
3           | Pune      | 1     | 0     | 1         | 50.0
10          | Bangalore | 1     | 0     | 1         | 50.0
1           | Mumbai    | 1     | 1     | 1         | 33.3
5           | Delhi     | 1     | 1     | 0         | 0.0
8           | Bangalore | 1     | 1     | 0         | 0.0
2           | Bangalore | 2     | 0     | 0         | 0.0

customer_id=5 and customer_id=8 added items to the cart without paying. Combined with the order-based churn score, this abandonment can trigger a cart-recovery push.
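The funnel logic and the cart-abandonment rule can be sketched in Python as follows. The events are the inline sample rows for customers 1, 5, 8, and 10; the function name funnel_by_customer and the abandonment rule (at least one cart event and no purchase) are assumptions used to make the idea concrete.

```python
from collections import Counter, defaultdict

# (customer_id, event_type) pairs taken from the inline INSERT sample.
events = [
    (1, "view"), (1, "cart"), (1, "purchase"),
    (5, "view"), (5, "cart"),
    (8, "view"), (8, "cart"),
    (10, "view"), (10, "purchase"),
]


def funnel_by_customer(rows):
    """Count views, carts, and purchases per customer, keeping customers with more than one event."""
    counts = defaultdict(Counter)
    for customer_id, event_type in rows:
        counts[customer_id][event_type] += 1

    summary = {}
    for customer_id, counter in counts.items():
        total = sum(counter.values())
        if total <= 1:  # mirrors HAVING COUNT(*) > 1
            continue
        summary[customer_id] = {
            "views": counter["view"],
            "carts": counter["cart"],
            "purchases": counter["purchase"],
            "purchase_rate_pct": round(counter["purchase"] * 100.0 / total, 1),
        }
    return summary


summary = funnel_by_customer(events)

# Cart abandonment: added to cart at least once but never purchased.
abandoned = sorted(
    customer_id for customer_id, row in summary.items()
    if row["carts"] > 0 and row["purchases"] == 0
)
```

With this sample, abandoned contains customers 5 and 8, the same two customers flagged in the text.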


Semantic View: Natural-Language Queries through the Analytics Agent

Create a Semantic View on the Gold-layer churn score table to expose business-friendly column names that the Analytics Agent can map natural-language questions onto:

CREATE OR REPLACE VIEW best_practice_customer_360.sv_churn_risk_customers AS
SELECT
    customer_id,
    city,
    frequency       AS purchase_frequency,
    recency_days    AS days_since_last_purchase,
    monetary        AS total_spend,
    avg_order_value,
    return_rate,
    total_events    AS behavior_events,
    purchase_events AS online_purchases,
    churn_score,
    churn_risk      AS risk_level
FROM best_practice_customer_360.doc_ads_churn_score;

Example: for the natural-language question "find Bangalore customers with high churn risk who have spent more than 10,000 yuan", the Analytics Agent generates:

SELECT customer_id, city, purchase_frequency, days_since_last_purchase,
       total_spend, churn_score, risk_level
FROM best_practice_customer_360.sv_churn_risk_customers
WHERE city = 'Bangalore'
  AND risk_level = 'HIGH'
  AND total_spend > 10000
ORDER BY churn_score DESC;

Result (excerpt; customers 10 and 13 in the top-10 list above also satisfy these filters):

customer_id | city      | purchase_frequency | days_since_last_purchase | total_spend | churn_score | risk_level
------------+-----------+--------------------+--------------------------+-------------+-------------+-----------
2           | Bangalore | 2                  | 1610                     | 21415       | 70          | HIGH
18          | Bangalore | 2                  | 1584                     | 12997       | 70          | HIGH
38          | Bangalore | 2                  | 1837                     | 13749       | 70          | HIGH


Scheduling Dynamic Table Refreshes with Studio Tasks

Periodic Dynamic Table refreshes are managed through Studio tasks. The three Dynamic Tables form a DAG with dependencies:

# Create the refresh task folder under best_practices/customer_360/
cz-cli task create-folder "customer_360" --parent <best_practices_folder_id> -p skill_test

# Create the three refresh tasks
cz-cli task create "refresh_dwd_customer_orders" --type SQL --folder <folder_id> -p skill_test
cz-cli task create "refresh_dws_customer_rfm" --type SQL --folder <folder_id> -p skill_test
cz-cli task create "refresh_ads_churn_score" --type SQL --folder <folder_id> -p skill_test

# Set the SQL content of each task
cz-cli task save-content refresh_dwd_customer_orders \
  --content "REFRESH DYNAMIC TABLE best_practice_customer_360.doc_dwd_customer_orders;" \
  -p skill_test
cz-cli task save-content refresh_dws_customer_rfm \
  --content "REFRESH DYNAMIC TABLE best_practice_customer_360.doc_dws_customer_rfm;" \
  -p skill_test
cz-cli task save-content refresh_ads_churn_score \
  --content "REFRESH DYNAMIC TABLE best_practice_customer_360.doc_ads_churn_score;" \
  -p skill_test

# Configure the schedule (refresh in sequence in the early morning each day)
cz-cli task save-cron refresh_dwd_customer_orders --cron "0 2 * * *" -p skill_test
cz-cli task save-cron refresh_dws_customer_rfm --cron "0 3 * * *" -p skill_test
cz-cli task save-cron refresh_ads_churn_score --cron "0 4 * * *" -p skill_test

# Configure the task dependencies (DAG)
cz-cli task save-config refresh_dws_customer_rfm \
  --deps replace \
  --dep-tasks '[{"taskId":<dwd_task_id>,"taskName":"refresh_dwd_customer_orders"}]' \
  -p skill_test
cz-cli task save-config refresh_ads_churn_score \
  --deps replace \
  --dep-tasks '[{"taskId":<rfm_task_id>,"taskName":"refresh_dws_customer_rfm"}]' \
  -p skill_test

DAG execution order:

02:00  refresh_dwd_customer_orders   (DWD layer · customer order wide table)
         ↓ after the dependency completes
03:00  refresh_dws_customer_rfm      (DWS layer · RFM metrics)
         ↓ after the dependency completes
04:00  refresh_ads_churn_score       (ADS layer · churn score)
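The execution order follows directly from the declared dependencies. As a quick way to sanity-check a dependency configuration before registering it, the sketch below runs a topological sort with Python's standard graphlib module (Python 3.9+). It models only the three task names from this guide and does not call Studio or cz-cli.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "refresh_dwd_customer_orders": set(),
    "refresh_dws_customer_rfm": {"refresh_dwd_customer_orders"},
    "refresh_ads_churn_score": {"refresh_dws_customer_rfm"},
}

# static_order() yields every task after all of its upstream tasks;
# it raises graphlib.CycleError if the dependencies contain a cycle.
execution_order = list(TopologicalSorter(dependencies).static_order())
```

For this linear chain the order is DWD, then DWS, then ADS, the same sequence as the schedule above.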


Warehouse Object Overview

After the build completes, the best_practice_customer_360 schema contains the following objects:

SHOW TABLES IN best_practice_customer_360;

schema_name                | table_name                 | is_view | is_dynamic
---------------------------+----------------------------+---------+-----------
best_practice_customer_360 | doc_ods_customers          | false   | false
best_practice_customer_360 | doc_ods_orders             | false   | false
best_practice_customer_360 | doc_ods_order_items        | false   | false
best_practice_customer_360 | doc_ods_payments           | false   | false
best_practice_customer_360 | doc_ods_returns            | false   | false
best_practice_customer_360 | doc_ods_kafka_behavior_raw | false   | false
best_practice_customer_360 | doc_ods_behavior_events    | false   | false
best_practice_customer_360 | doc_dwd_customer_orders    | false   | true
best_practice_customer_360 | doc_dws_customer_rfm       | false   | true
best_practice_customer_360 | doc_ads_churn_score        | false   | true
best_practice_customer_360 | sv_churn_risk_customers    | true    | false

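Data flow architecture: Kafka PIPE and batch loads → Bronze (doc_ods_* tables) → Silver (doc_dwd_customer_orders → doc_dws_customer_rfm) → Gold (doc_ads_churn_score) → Semantic View (sv_churn_risk_customers).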


Notes

  • Dynamic Tables omit REFRESH INTERVAL: the DDL contains no REFRESH INTERVAL parameter. Refreshes are scheduled in DAG order by the Studio tasks (refresh_dwd_customer_orders, refresh_dws_customer_rfm, refresh_ads_churn_score). This lets you attach quality checks and alert rules to the same tasks, rather than refreshing only.

  • calc_churn_score UDF type matching: the second parameter, total_orders, is declared as INT, but COUNT(DISTINCT ...) in the Dynamic Table returns BIGINT. Cast explicitly with CAST(frequency AS INT) before the call; otherwise SQL compilation fails with a type-mismatch error.

  • MERGE INTO write mode and Dynamic Table incremental refresh: when the ODS layer is written with MERGE INTO, the Dynamic Table can track the inserted and updated rows and refreshes incrementally as usual. Switching to INSERT OVERWRITE (full overwrite) makes the Dynamic Table fall back to full recomputation. Prefer INSERT INTO or MERGE INTO so that writes stay incremental.

  • Bloomfilter Index limitation on existing data: CREATE BLOOMFILTER INDEX takes effect automatically only for data written after the index is created and does not accelerate filtering on existing data. The BLOOMFILTER type does not support the BUILD INDEX command; to cover existing data, rebuild the table.

  • Semantic View column naming: column names in sv_churn_risk_customers should use business language wherever possible (for example days_since_last_purchase instead of recency_days) to help the Analytics Agent resolve the intent of natural-language questions more accurately.

  • churn_score formula tuning: the current weights (recency contributes up to 40 points) suit e-commerce scenarios centered on recent activity. For B2B scenarios with high order values, consider lowering the recency weight and raising the monetary weight; for subscription businesses, consider replacing recency with "consecutive months without renewal".

