-- Pipe that copies Kafka messages from topic 'saas_feature_usage' into
-- best_practice_saas_dw.ods_kafka_raw_usage, storing each message value cast to STRING.
-- It is bound to the 'DEFAULT' virtual cluster with BATCH_INTERVAL_IN_SECONDS = '60'.
-- NOTE(review): READ_KAFKA takes positional arguments; the meaning of the empty
-- strings, the 'raw', 'raw' pair, the 0 and map() is not visible here -- confirm
-- each position against the READ_KAFKA reference before changing any of them.
CREATE PIPE IF NOT EXISTS best_practice_saas_dw.pipe_feature_usage
VIRTUAL_CLUSTER = 'DEFAULT'
BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO best_practice_saas_dw.ods_kafka_raw_usage
FROM (
SELECT CAST(value AS STRING) AS value
FROM READ_KAFKA(
'<kafka-broker>:9092', -- replace with the actual broker address
'saas_feature_usage', -- topic name
'',
'cz_saas_consumer', -- consumer group ID
'', '', '', '',
'raw', 'raw',
0,
map()
)
);
-- Step 1: upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/your/ods_feature_usage.csv' TO USER VOLUME FILE 'ods_feature_usage.csv';
-- Step 2: COPY INTO the table from the User Volume
-- The file is read as CSV with a header row and ',' as separator.
-- NOTE(review): 'nullValue'='' presumably makes empty fields load as NULL -- confirm.
COPY INTO best_practice_saas_dw.ods_feature_usage
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('ods_feature_usage.csv');
也可直接内联插入小批量测试数据(不需要 CSV 文件):
-- Seed four sample feature-usage rows inline, without going through a CSV file.
-- Only the eight listed columns are supplied; columns not listed (if any) are not
-- set by this statement.
INSERT INTO best_practice_saas_dw.ods_feature_usage (
    usage_id,
    subscription_id,
    usage_date,
    feature_name,
    usage_count,
    usage_duration_secs,
    error_count,
    is_beta_feature
)
VALUES
    ('U-1c6c24', 'S-0fcf7d', CAST('2023-07-27' AS DATE), 'feature_20', 9, 5004, 0, FALSE),
    ('U-f07cb8', 'S-c25263', CAST('2023-08-07' AS DATE), 'feature_5', 9, 369, 0, FALSE),
    ('U-a3b9d1', 'S-8cec59', CAST('2024-03-15' AS DATE), 'feature_10', 12, 3200, 1, TRUE),
    ('U-c4e2f0', 'S-0f6f44', CAST('2024-06-20' AS DATE), 'feature_26', 5, 2800, 0, FALSE);
验证 ODS 层数据量(以本文使用的完整数据集为例):
-- Row counts of the four ODS tables, returned side by side in a single row.
SELECT
    (
        SELECT COUNT(*)
        FROM best_practice_saas_dw.ods_accounts
    ) AS accounts,
    (
        SELECT COUNT(*)
        FROM best_practice_saas_dw.ods_subscriptions
    ) AS subscriptions,
    (
        SELECT COUNT(*)
        FROM best_practice_saas_dw.ods_feature_usage
    ) AS feature_usage,
    (
        SELECT COUNT(*)
        FROM best_practice_saas_dw.ods_churn_events
    ) AS churn_events;
-- Masking function for account names.
--   name:    the account name to mask.
--   returns: the clear-text name when current_user() is in the allow-list;
--            otherwise the first three characters followed by '****'.
-- Names of three characters or fewer are replaced entirely by '****': keeping
-- "first three characters + ****" for them would expose the whole value.
-- A NULL name does not match the length check and falls through to the ELSE
-- branch, exactly as before this check was added.
CREATE OR REPLACE FUNCTION best_practice_saas_dw.mask_account_name(name STRING)
RETURNS STRING
AS CASE
    WHEN current_user() IN ('privileged_user') THEN name -- replace with the actual authorized user name(s)
    WHEN LENGTH(name) <= 3 THEN '****'
    ELSE CONCAT(SUBSTR(name, 1, 3), '****')
END;
💡 提示:将 `'privileged_user'` 替换为实际需要查看明文数据的用户名。Column Masking 通过 `current_user()` 函数匹配当前连接的用户名,需将所有授权用户名显式列在 `IN()` 列表中。
-- Set best_practice_saas_dw.mask_account_name as the mask on the
-- account_name column of ods_accounts.
ALTER TABLE best_practice_saas_dw.ods_accounts
CHANGE COLUMN account_name
SET MASK best_practice_saas_dw.mask_account_name;
-- DWD layer: joins each feature-usage record to its subscription and to that
-- subscription's account, producing one wide row of usage, subscription and
-- account attributes.
-- Both joins are inner joins: usage rows without a matching subscription, or
-- whose subscription has no matching account, are not included.
-- NOTE(review): account_name is read from ods_accounts; when a column mask is
-- set on that column, the value stored here presumably depends on which user
-- runs the refresh -- confirm.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_saas_dw.dwd_tenant_feature_usage
AS
SELECT
    feat.usage_id,
    feat.subscription_id,
    sub.account_id,
    acct.account_name,
    acct.industry,
    acct.country,
    acct.plan_tier,
    acct.seats,
    acct.churn_flag AS account_churn_flag,
    feat.usage_date,
    feat.feature_name,
    feat.usage_count,
    feat.usage_duration_secs,
    feat.error_count,
    feat.is_beta_feature,
    sub.mrr_amount,
    sub.billing_frequency,
    sub.upgrade_flag,
    sub.downgrade_flag,
    sub.churn_flag AS subscription_churn_flag,
    feat.ingest_time
FROM best_practice_saas_dw.ods_feature_usage AS feat
INNER JOIN best_practice_saas_dw.ods_subscriptions AS sub
    ON feat.subscription_id = sub.subscription_id
INNER JOIN best_practice_saas_dw.ods_accounts AS acct
    ON sub.account_id = acct.account_id;
⚠️ 注意:Dynamic Table 的 DDL 中不写 `REFRESH INTERVAL`。定期刷新通过在 Studio 创建"刷新动态表"任务来调度(见下方说明),这样可以在同一任务上附加监控告警和数据质量检查。
-- DWS layer: monthly metrics per account, aggregated from the DWD usage table.
-- Grain: one row per (account_id, month of usage_date).
-- Descriptive attributes and mrr_amount are reduced with MAX(); the churn,
-- upgrade and downgrade flags are cast to INT and reduced with MAX() as well.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_saas_dw.dws_tenant_monthly_metrics
AS
SELECT
    account_id,
    MAX(account_name)                    AS account_name,
    MAX(industry)                        AS industry,
    MAX(country)                         AS country,
    MAX(plan_tier)                       AS plan_tier,
    MAX(seats)                           AS seats,
    DATE_TRUNC('month', usage_date)      AS usage_month,
    COUNT(DISTINCT feature_name)         AS distinct_features_used,
    -- Non-beta rows yield NULL from the CASE, which COUNT(DISTINCT ...) skips.
    COUNT(DISTINCT CASE WHEN is_beta_feature THEN feature_name END) AS beta_features_used,
    SUM(usage_count)                     AS total_usage_count,
    SUM(usage_duration_secs)             AS total_duration_secs,
    ROUND(AVG(usage_duration_secs), 1)   AS avg_duration_secs,
    SUM(error_count)                     AS total_errors,
    COUNT(*)                             AS usage_event_count,
    MAX(mrr_amount)                      AS mrr_amount,
    MAX(CAST(account_churn_flag AS INT)) AS churn_flag,
    MAX(CAST(upgrade_flag AS INT))       AS upgrade_flag,
    MAX(CAST(downgrade_flag AS INT))     AS downgrade_flag
FROM best_practice_saas_dw.dwd_tenant_feature_usage
GROUP BY
    account_id,
    DATE_TRUNC('month', usage_date);
同样在 Studio `best_practices/saas_dw/` 路径下创建刷新任务,调度周期可设为每小时(DWS 刷新时间晚于 DWD)。
手动触发首次刷新:
-- Refresh the DWS dynamic table on demand, then count its rows as a sanity check.
REFRESH DYNAMIC TABLE best_practice_saas_dw.dws_tenant_monthly_metrics;
SELECT COUNT(*) AS dws_row_count
FROM best_practice_saas_dw.dws_tenant_monthly_metrics;
dws_row_count
-------------
4032
查看最近一个月的高活跃租户:
-- Top 8 tenant-month rows: newest month first and, within a month, highest
-- total usage first.
-- account_id is the final sort key. The DWS table is grouped by
-- (account_id, usage_month), so with it the ordering is total and LIMIT no
-- longer picks arbitrary rows when total_usage_count ties.
-- NOTE(review): there is no filter on usage_month, so if the newest month has
-- fewer than 8 rows the result continues into earlier months -- confirm intent.
SELECT
    account_id,
    account_name,
    plan_tier,
    usage_month,
    distinct_features_used,
    total_usage_count,
    total_errors,
    mrr_amount,
    churn_flag
FROM best_practice_saas_dw.dws_tenant_monthly_metrics
ORDER BY
    usage_month DESC,
    total_usage_count DESC,
    account_id
LIMIT 8;
-- Spot check of calc_tenant_health_score with three hand-picked profiles.
-- Arguments are passed in the same order as the call in the
-- ads_tenant_health_score definition: distinct features used, total usage
-- count, total errors, usage event count, upgrade flag, downgrade flag.
SELECT
    -- High health: 6 features, 80 uses, no errors, upgraded
    best_practice_saas_dw.calc_tenant_health_score(
        CAST(6 AS INT),
        CAST(80 AS BIGINT),
        CAST(0 AS BIGINT),
        CAST(20 AS BIGINT),
        CAST(1 AS INT),
        CAST(0 AS INT)
    ) AS high_health,
    -- Medium: 4 features, 40 uses, error rate 2/15
    best_practice_saas_dw.calc_tenant_health_score(
        CAST(4 AS INT),
        CAST(40 AS BIGINT),
        CAST(2 AS BIGINT),
        CAST(15 AS BIGINT),
        CAST(0 AS INT),
        CAST(0 AS INT)
    ) AS mid_health,
    -- Low health: 2 features, 10 uses, error rate 5/10, downgraded
    best_practice_saas_dw.calc_tenant_health_score(
        CAST(2 AS INT),
        CAST(10 AS BIGINT),
        CAST(5 AS BIGINT),
        CAST(10 AS BIGINT),
        CAST(0 AS INT),
        CAST(1 AS INT)
    ) AS low_health;
-- ADS layer: tenant-month metrics from the DWS table, extended with a health
-- score, a health tier and any churn event recorded in the same month.
--
-- The score is computed once per DWS row in the inner query (raw_health_score)
-- and reused for both output columns, so the argument list of
-- calc_tenant_health_score exists in exactly one place:
--   health_score = raw score rounded to 1 decimal
--   health_tier  = 'HEALTHY' when raw score >= 70, 'AT_RISK' when >= 40,
--                  otherwise 'CHURN_RISK' (thresholds apply to the unrounded score)
--
-- The LEFT JOIN keeps every tenant-month row; churn_date / churn_reason are
-- NULL when no churn event matches the account and month.
-- NOTE(review): if an account has more than one churn event in the same month,
-- that tenant-month row appears once per event -- confirm whether this fan-out
-- is intended.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_saas_dw.ads_tenant_health_score
AS
SELECT
    s.account_id,
    s.account_name,
    s.industry,
    s.country,
    s.plan_tier,
    s.seats,
    s.usage_month,
    s.distinct_features_used,
    s.total_usage_count,
    s.total_errors,
    s.usage_event_count,
    s.beta_features_used,
    s.mrr_amount,
    s.churn_flag,
    s.upgrade_flag,
    s.downgrade_flag,
    ROUND(s.raw_health_score, 1) AS health_score,
    CASE
        WHEN s.raw_health_score >= 70 THEN 'HEALTHY'
        WHEN s.raw_health_score >= 40 THEN 'AT_RISK'
        ELSE 'CHURN_RISK'
    END AS health_tier,
    ce.churn_date,
    ce.reason_code AS churn_reason
FROM (
    SELECT
        m.account_id,
        m.account_name,
        m.industry,
        m.country,
        m.plan_tier,
        m.seats,
        m.usage_month,
        m.distinct_features_used,
        m.total_usage_count,
        m.total_errors,
        m.usage_event_count,
        m.beta_features_used,
        m.mrr_amount,
        m.churn_flag,
        m.upgrade_flag,
        m.downgrade_flag,
        best_practice_saas_dw.calc_tenant_health_score(
            CAST(m.distinct_features_used AS INT),
            m.total_usage_count,
            m.total_errors,
            m.usage_event_count,
            CAST(m.upgrade_flag AS INT),
            CAST(m.downgrade_flag AS INT)
        ) AS raw_health_score
    FROM best_practice_saas_dw.dws_tenant_monthly_metrics AS m
) AS s
LEFT JOIN best_practice_saas_dw.ods_churn_events AS ce
    ON s.account_id = ce.account_id
    AND DATE_TRUNC('month', ce.churn_date) = s.usage_month;
-- Refresh the ADS dynamic table on demand, then count its rows as a sanity check.
REFRESH DYNAMIC TABLE best_practice_saas_dw.ads_tenant_health_score;
SELECT COUNT(*) AS ads_count
FROM best_practice_saas_dw.ads_tenant_health_score;
ads_count
---------
4046
健康分层分布
-- Distribution of health tiers: number of tenant-month rows and the average
-- health score per tier, highest average first.
SELECT
    health_tier,
    COUNT(*)                    AS tenant_month_count,
    ROUND(AVG(health_score), 1) AS avg_score
FROM best_practice_saas_dw.ads_tenant_health_score
GROUP BY
    health_tier
ORDER BY
    avg_score DESC;
-- Cross-tabulation of health tier against churn flag, as row counts.
SELECT
    health_tier,
    churn_flag,
    COUNT(*) AS cnt
FROM best_practice_saas_dw.ads_tenant_health_score
GROUP BY
    health_tier,
    churn_flag
ORDER BY
    health_tier,
    churn_flag;
-- Churn events per reason code with the average refund amount, most frequent
-- reason first.
SELECT
    reason_code,
    COUNT(*)                         AS churn_count,
    ROUND(AVG(refund_amount_usd), 2) AS avg_refund
FROM best_practice_saas_dw.ods_churn_events
GROUP BY
    reason_code
ORDER BY
    churn_count DESC;
-- Per plan tier: account count, average seats, churned-account count and churn
-- rate in percent, highest churn rate first.
-- 100.0 (not 100) keeps the percentage division in decimal arithmetic.
SELECT
    plan_tier,
    COUNT(*)                                                  AS accounts,
    ROUND(AVG(seats), 1)                                      AS avg_seats,
    SUM(CAST(churn_flag AS INT))                              AS churned_accounts,
    ROUND(100.0 * SUM(CAST(churn_flag AS INT)) / COUNT(*), 1) AS churn_rate_pct
FROM best_practice_saas_dw.ods_accounts
GROUP BY
    plan_tier
ORDER BY
    churn_rate_pct DESC;
-- Top 6 features by number of distinct subscriptions that used them.
--   user_count   distinct subscription_id values per feature
--   avg_minutes  average usage_duration_secs divided by 60
--   beta_pct     share of the feature's rows flagged is_beta_feature, in percent
-- feature_name is the final sort key: the query groups by feature_name, so it is
-- unique per row and LIMIT no longer picks arbitrary rows when user_count ties.
SELECT
    feature_name,
    COUNT(DISTINCT subscription_id)                              AS user_count,
    SUM(usage_count)                                             AS total_usage,
    ROUND(AVG(usage_duration_secs) / 60.0, 1)                    AS avg_minutes,
    SUM(error_count)                                             AS total_errors,
    ROUND(100.0 * SUM(CAST(is_beta_feature AS INT)) / COUNT(*), 1) AS beta_pct
FROM best_practice_saas_dw.ods_feature_usage
GROUP BY
    feature_name
ORDER BY
    user_count DESC,
    feature_name
LIMIT 6;
-- View over the ADS health-score table that adds a risk_signal label per row.
--   mrr_usd      mrr_amount rounded to 2 decimals
--   risk_signal  the churn reason when churn_flag = 1 and a reason is present;
--                'predicted_risk' when the tier is 'CHURN_RISK' and churn_flag = 0;
--                'stable' in every other case.
-- Rows with churn_flag = 1 but no churn_reason fall through to 'stable'.
CREATE OR REPLACE VIEW best_practice_saas_dw.v_tenant_churn_risk AS
SELECT
    account_id,
    account_name,
    industry,
    country,
    plan_tier,
    seats,
    usage_month,
    ROUND(mrr_amount, 2) AS mrr_usd,
    distinct_features_used,
    total_usage_count,
    total_errors,
    beta_features_used,
    health_score,
    health_tier,
    churn_flag,
    churn_reason,
    CASE
        WHEN churn_flag = 1 AND churn_reason IS NOT NULL
            THEN churn_reason
        WHEN health_tier = 'CHURN_RISK' AND churn_flag = 0
            THEN 'predicted_risk'
        ELSE 'stable'
    END AS risk_signal
FROM best_practice_saas_dw.ads_tenant_health_score;
查询各流失信号的规模和平均 MRR(用于优先级排序):
-- Size and average MRR (rounded to whole units) of each risk signal, largest
-- group first.
SELECT
    risk_signal,
    COUNT(*)               AS cnt,
    ROUND(AVG(mrr_usd), 0) AS avg_mrr
FROM best_practice_saas_dw.v_tenant_churn_risk
GROUP BY
    risk_signal
ORDER BY
    cnt DESC;
-- Grant the CS role SELECT on the Semantic View
GRANT SELECT ON VIEW best_practice_saas_dw.v_tenant_churn_risk TO ROLE cs_team;
-- Grant analysts SELECT on the DWS / ADS layers
GRANT SELECT ON DYNAMIC TABLE best_practice_saas_dw.dws_tenant_monthly_metrics TO ROLE analyst;
GRANT SELECT ON DYNAMIC TABLE best_practice_saas_dw.ads_tenant_health_score TO ROLE analyst;
-- ODS layer is queryable by administrators only
GRANT SELECT ON TABLE best_practice_saas_dw.ods_accounts TO ROLE admin;
GRANT SELECT ON TABLE best_practice_saas_dw.ods_subscriptions TO ROLE admin;