-- Landing table for the Kafka pipe: the pipe writes each message into the
-- single STRING column as a raw JSON string.
CREATE TABLE IF NOT EXISTS best_practice_uplift_model.kafka_raw_conversions (
    value STRING
);
-- Kafka pipe: runs on the DEFAULT virtual cluster with a 60-second batch
-- interval and copies messages read from the marketing_conversions topic into
-- the raw landing table, casting each message value to STRING.
-- NOTE(review): READ_KAFKA is called positionally; only the broker, topic and
-- consumer-group arguments are labelled here. The empty-string, 'raw', 0 and
-- map() arguments are presumably offsets/timestamps, key/value formats, an
-- error tolerance and extra options — confirm against the READ_KAFKA reference.
CREATE PIPE IF NOT EXISTS best_practice_uplift_model.pipe_conversions
    VIRTUAL_CLUSTER = 'DEFAULT'
    BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO best_practice_uplift_model.kafka_raw_conversions
FROM (
    SELECT
        CAST(value AS STRING) AS value
    FROM READ_KAFKA(
        '<kafka-broker>:9092',       -- replace with the real broker address
        'marketing_conversions',     -- topic name
        '',
        'cz_uplift_consumer',        -- consumer group ID
        '', '', '', '',
        'raw', 'raw',
        0,
        map()
    )
);
-- Load doc_exposures from a local CSV file.
-- Step 1: upload the local CSV file to the User Volume with SQL PUT.
PUT '/path/to/doc_exposures.csv' TO USER VOLUME FILE 'doc_exposures.csv';

-- Step 2: COPY the uploaded file from the User Volume into the table.
-- The file is read as comma-separated CSV with a header row; 'nullValue' = ''
-- presumably makes empty fields load as NULL — confirm against the COPY docs.
COPY INTO best_practice_uplift_model.doc_exposures
FROM USER VOLUME
USING csv
OPTIONS (
    'header' = 'true',
    'sep' = ',',
    'nullValue' = ''
)
FILES ('doc_exposures.csv');
-- Load doc_conversions from a local CSV file.
-- Step 1: upload the local CSV file to the User Volume with SQL PUT.
PUT '/path/to/doc_conversions.csv' TO USER VOLUME FILE 'doc_conversions.csv';

-- Step 2: COPY the uploaded file from the User Volume into the table.
-- The file is read as comma-separated CSV with a header row; 'nullValue' = ''
-- presumably makes empty fields load as NULL — confirm against the COPY docs.
COPY INTO best_practice_uplift_model.doc_conversions
FROM USER VOLUME
USING csv
OPTIONS (
    'header' = 'true',
    'sep' = ',',
    'nullValue' = ''
)
FILES ('doc_conversions.csv');
也可直接内联插入小批量测试数据(不需要 CSV 文件):
-- Inline seed rows for doc_conversions: a small test batch inserted directly,
-- without going through a CSV file.
INSERT INTO best_practice_uplift_model.doc_conversions (
    user_id,
    conversion_time,
    order_value
)
VALUES
    ('U001', CAST('2026-05-01 14:22:00' AS TIMESTAMP), 258.00),
    ('U002', CAST('2026-05-01 15:10:00' AS TIMESTAMP), 189.50),
    ('U004', CAST('2026-05-01 16:05:00' AS TIMESTAMP), 320.00),
    ('U007', CAST('2026-05-01 18:00:00' AS TIMESTAMP), 450.00),
    ('U009', CAST('2026-05-01 19:10:00' AS TIMESTAMP), 175.00)
    -- ... 30 rows in total; only the first five are listed here
;
写入用户特征(20 条,来自 DMP 人群包):
从本地 CSV 导入(推荐):
-- Load doc_user_features from a local CSV file.
-- Step 1: upload the local CSV file to the User Volume with SQL PUT.
PUT '/path/to/doc_user_features.csv' TO USER VOLUME FILE 'doc_user_features.csv';

-- Step 2: COPY the uploaded file from the User Volume into the table.
-- The file is read as comma-separated CSV with a header row; 'nullValue' = ''
-- presumably makes empty fields load as NULL — confirm against the COPY docs.
COPY INTO best_practice_uplift_model.doc_user_features
FROM USER VOLUME
USING csv
OPTIONS (
    'header' = 'true',
    'sep' = ',',
    'nullValue' = ''
)
FILES ('doc_user_features.csv');
-- DWD fact table: one row per exposure row, enriched with the user's features
-- and conversion outcome.
--
-- doc_conversions is collapsed to one row per user before it is joined.
-- Joining the raw table on user_id alone repeats an exposure row once per
-- conversion row whenever a user has several conversions, which inflates every
-- COUNT(*) / SUM(is_converted) computed on top of this table.
--   order_value     -> sum of order_value over the user's conversion rows
--   conversion_time -> the user's earliest conversion_time
-- For users with at most one conversion row the row values are unchanged.
--
-- NOTE(review): conversions are matched on user_id only (not on campaign or
-- exposure time), so a conversion is credited to every exposure row of that
-- user — confirm this attribution rule is intended.
-- NOTE(review): assumes doc_user_features holds at most one row per user_id;
-- otherwise that join multiplies exposure rows as well.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_uplift_model.dwd_user_campaign_facts
AS
WITH user_conversions AS (
    -- One row per converting user.
    SELECT
        user_id,
        SUM(order_value) AS order_value,
        MIN(conversion_time) AS conversion_time
    FROM best_practice_uplift_model.doc_conversions
    GROUP BY user_id
)
SELECT
    e.user_id,
    e.campaign_id,
    e.channel,
    e.exposure_time,
    e.is_treated,
    f.age_group,
    f.region,
    f.historical_purchase_count,
    -- 1 when the user has any conversion row, 0 otherwise.
    CASE WHEN c.user_id IS NOT NULL THEN 1 ELSE 0 END AS is_converted,
    c.order_value,
    c.conversion_time
FROM best_practice_uplift_model.doc_exposures AS e
LEFT JOIN best_practice_uplift_model.doc_user_features AS f
    ON e.user_id = f.user_id
LEFT JOIN user_conversions AS c
    ON e.user_id = c.user_id;
-- DWS aggregate: conversion metrics per (campaign_id, channel, is_treated).
--   user_count      -> number of fact rows in the group
--   converted_count -> number of fact rows flagged is_converted = 1
--   cvr             -> converted_count / user_count, rounded to 4 decimals
--   avg_order_value -> average order_value over converted rows only,
--                      rounded to 2 decimals
-- avg_order_value deliberately has no ELSE 0 branch: counting non-converted
-- rows as 0 turns the figure into revenue per exposed row rather than an
-- average order value. AVG skips NULLs, so a group with no conversions yields
-- NULL instead of 0.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_uplift_model.dws_channel_uplift
AS
SELECT
    campaign_id,
    channel,
    is_treated,
    COUNT(*) AS user_count,
    SUM(is_converted) AS converted_count,
    -- * 1.0 forces non-integer division.
    ROUND(SUM(is_converted) * 1.0 / COUNT(*), 4) AS cvr,
    ROUND(AVG(CASE WHEN is_converted = 1 THEN order_value END), 2) AS avg_order_value
FROM best_practice_uplift_model.dwd_user_campaign_facts
GROUP BY
    campaign_id,
    channel,
    is_treated;
手动触发刷新并查看结果:
-- Trigger a manual refresh of the channel-level aggregate.
REFRESH DYNAMIC TABLE best_practice_uplift_model.dws_channel_uplift;

-- Preview the first 10 rows, ordered by campaign, channel and treatment flag.
SELECT
    campaign_id,
    channel,
    is_treated,
    user_count,
    converted_count,
    cvr,
    avg_order_value
FROM best_practice_uplift_model.dws_channel_uplift
ORDER BY
    campaign_id,
    channel,
    is_treated
LIMIT 10;
-- Headcount of the treated group and the control group in doc_exposures,
-- computed as bitmap cardinalities.
-- SUBSTR(user_id, 2) drops the first character of user_id so the remainder can
-- be cast to BIGINT (assumes IDs shaped like 'U001': one prefix character
-- followed by digits). Each CASE has no ELSE, so rows of the other group
-- contribute NULL to that aggregate.
SELECT
    GROUP_BITMAP(
        CASE
            WHEN is_treated = 1 THEN CAST(SUBSTR(user_id, 2) AS BIGINT)
        END
    ) AS treated_count,
    GROUP_BITMAP(
        CASE
            WHEN is_treated = 0 THEN CAST(SUBSTR(user_id, 2) AS BIGINT)
        END
    ) AS control_count
FROM best_practice_uplift_model.doc_exposures;
-- Number of users that are both treated and converted, computed by
-- intersecting two bitmaps of numeric user IDs (user_id without its first
-- character, cast to BIGINT).
WITH treated_users AS (
    -- Bitmap of treated users from the exposure log.
    SELECT
        GROUP_BITMAP_STATE(CAST(SUBSTR(user_id, 2) AS BIGINT)) AS user_bitmap
    FROM best_practice_uplift_model.doc_exposures
    WHERE is_treated = 1
),
converted_users AS (
    -- Bitmap of every user that appears in the conversion table.
    SELECT
        GROUP_BITMAP_STATE(CAST(SUBSTR(user_id, 2) AS BIGINT)) AS user_bitmap
    FROM best_practice_uplift_model.doc_conversions
)
-- Each CTE yields a single row, so the cross join produces exactly one row.
SELECT
    BITMAP_COUNT(BITMAP_AND(tu.user_bitmap, cu.user_bitmap)) AS treated_and_converted
FROM treated_users AS tu
CROSS JOIN converted_users AS cu;
-- ADS uplift score per (campaign_id, channel): compares the treated group with
-- the control group on conversion rate and on average revenue per fact row,
-- then assigns a tier label.
-- Groups that lack either a treated side or a control side are dropped by the
-- inner join. Tiering uses the unrounded CVR difference; only the output
-- columns are rounded.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_uplift_model.ads_uplift_score
AS
WITH treated_stats AS (
    -- Metrics over rows with is_treated = 1.
    SELECT
        campaign_id,
        channel,
        SUM(is_converted) * 1.0 / COUNT(*) AS cvr_treated,
        -- Non-converted rows count as 0, so this is revenue per row (ARPU).
        AVG(CASE WHEN is_converted = 1 THEN order_value ELSE 0 END) AS arpu_treated,
        COUNT(*) AS cnt_treated
    FROM best_practice_uplift_model.dwd_user_campaign_facts
    WHERE is_treated = 1
    GROUP BY
        campaign_id,
        channel
),
control_stats AS (
    -- Same metrics over rows with is_treated = 0.
    SELECT
        campaign_id,
        channel,
        SUM(is_converted) * 1.0 / COUNT(*) AS cvr_control,
        AVG(CASE WHEN is_converted = 1 THEN order_value ELSE 0 END) AS arpu_control,
        COUNT(*) AS cnt_control
    FROM best_practice_uplift_model.dwd_user_campaign_facts
    WHERE is_treated = 0
    GROUP BY
        campaign_id,
        channel
)
SELECT
    trt.campaign_id,
    trt.channel,
    ROUND(trt.cvr_treated, 4) AS cvr_treated,
    ROUND(ctl.cvr_control, 4) AS cvr_control,
    ROUND(trt.cvr_treated - ctl.cvr_control, 4) AS uplift_cvr,
    ROUND(trt.arpu_treated - ctl.arpu_control, 2) AS uplift_arpu,
    trt.cnt_treated,
    ctl.cnt_control,
    -- Thresholds are absolute CVR differences (0.5 and 0.2), checked in order.
    CASE
        WHEN trt.cvr_treated - ctl.cvr_control > 0.5 THEN 'HIGH'
        WHEN trt.cvr_treated - ctl.cvr_control > 0.2 THEN 'MEDIUM'
        ELSE 'LOW'
    END AS uplift_tier
FROM treated_stats AS trt
INNER JOIN control_stats AS ctl
    ON trt.campaign_id = ctl.campaign_id
    AND trt.channel = ctl.channel;
Uplift 分级阈值说明(uplift_cvr = cvr_treated − cvr_control,为转化率的绝对差值;按从上到下的顺序匹配):

| 等级 | 条件 | 含义 |
| --- | --- | --- |
| HIGH | uplift_cvr > 0.5 | 营销干预带来超过 50 个百分点的转化率增量,强烈建议加大投入 |
| MEDIUM | 0.2 < uplift_cvr ≤ 0.5 | 中等效果,结合客单价决定是否扩量 |
| LOW | uplift_cvr ≤ 0.2 | 营销效果弱,可能大量触达"本来就会购买"的用户 |
手动触发刷新并查看评分结果:
-- Trigger a manual refresh of the uplift score table.
REFRESH DYNAMIC TABLE best_practice_uplift_model.ads_uplift_score;

-- Read back every score row, ordered by campaign and channel.
SELECT
    campaign_id,
    channel,
    cvr_treated,
    cvr_control,
    uplift_cvr,
    uplift_arpu,
    cnt_treated,
    cnt_control,
    uplift_tier
FROM best_practice_uplift_model.ads_uplift_score
ORDER BY
    campaign_id,
    channel;
-- Per-channel revenue from treated rows, ranked by revenue per treated row.
-- Despite its name, roi_per_treated_user is treated revenue divided by the
-- number of treated rows; no cost term is involved.
WITH channel_totals AS (
    SELECT
        channel,
        -- Revenue counts only rows that are both treated and converted.
        SUM(CASE WHEN is_treated=1 AND is_converted=1 THEN order_value ELSE 0 END) AS treated_revenue,
        SUM(CASE WHEN is_treated=1 THEN 1 ELSE 0 END) AS treated_users
    FROM best_practice_uplift_model.dwd_user_campaign_facts
    GROUP BY channel
)
SELECT
    channel,
    treated_revenue,
    treated_users,
    -- NULLIF keeps channels with no treated rows from dividing by zero;
    -- the ratio is NULL for them.
    ROUND(treated_revenue / NULLIF(treated_users, 0), 2) AS roi_per_treated_user
FROM channel_totals
ORDER BY roi_per_treated_user DESC;