Marketing Attribution and Uplift Modeling: A Data Warehouse Walkthrough

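Building on multi-channel attribution analysis, this article adds a causal-inference view: it separates users who would have purchased anyway from users who purchased because of a marketing intervention, so that budget can go where it actually changes behavior. Using a dataset of 4 campaigns, 50 exposure records, and 30 conversion records, it walks end to end through the Kafka PIPE → ODS → DWD → DWS → ADS build and covers three platform capabilities: Dynamic Table incremental computation, BITMAP set operations, and ZettaPark Python Tasks.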


Overview

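The core challenge of uplift modeling is that attribution analysis can only tell you how many of the users who clicked an ad went on to convert. It cannot answer the counterfactual question: would those users have purchased anyway without the ad? Answering it requires a causal comparison between a treatment group and a control group, which at the warehouse level breaks down into the following subtasks: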

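Problem                                                 | Yunqi (云器) solution
--------------------------------------------------------+---------------------------------------------------------------------
Ingest in-app conversion events in real time            | Kafka PIPE consumes continuously; no hand-written consumer code
Bulk-import DMP exposure/click logs                     | OSS PIPE bulk import with automatic file tracking
Keep the user-campaign touchpoint wide table up to date | Dynamic Table: declarative SQL, incremental refresh
Set operations on treatment/control groups              | BITMAP functions: intersection, union, and difference over hundreds of millions of users in seconds
Run S-Learner / T-Learner / X-Learner                   | ZettaPark Python Task calling the EconML causal-inference library
Scheduled refresh and quality alerts                    | Studio Task scheduling with attached monitoring rules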

SQL commands used

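Command / function    | Purpose                                                      | Notes
----------------------+--------------------------------------------------------------+-----------------------------------------------------------
CREATE TABLE          | Create the ODS exposure, conversion, and user-feature tables | Regular tables, written by the Kafka PIPE and by INSERT
CREATE PIPE           | Create the continuous Kafka ingestion pipe                   | Bound to the conversion-event target table
CREATE DYNAMIC TABLE  | Create the DWD / DWS / ADS incremental tables                | No REFRESH INTERVAL; scheduling is handled by Studio Tasks
REFRESH DYNAMIC TABLE | Trigger one refresh manually                                 | Used for the initial build or for debugging
GROUP_BITMAP_STATE    | Build a bitmap of user IDs                                   | Treatment/control user sets
GROUP_BITMAP          | Compute set cardinality                                      | Bitmap equivalent of COUNT(DISTINCT)
BITMAP_AND            | Intersect two bitmaps                                        | Find users who are both treated and converted
BITMAP_COUNT          | Compute a bitmap's cardinality                               | Used together with BITMAP_AND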

Prerequisites

All examples in this article run in the best_practice_uplift_model schema.

CREATE SCHEMA IF NOT EXISTS best_practice_uplift_model;


ODS layer: raw data ingestion

Create tables

Three raw ODS tables: exposures, conversions, and user features.

-- Ad exposure table (includes the experiment assignment)
CREATE TABLE IF NOT EXISTS best_practice_uplift_model.doc_exposures (
    user_id STRING,
    campaign_id STRING,
    channel STRING,
    exposure_time TIMESTAMP,
    is_treated INT  -- 1 = treatment group (received the marketing intervention), 0 = control group
);

-- Conversion event table
CREATE TABLE IF NOT EXISTS best_practice_uplift_model.doc_conversions (
    user_id STRING,
    conversion_time TIMESTAMP,
    order_value DOUBLE
);

-- User profile feature table (from the DMP)
CREATE TABLE IF NOT EXISTS best_practice_uplift_model.doc_user_features (
    user_id STRING,
    age_group STRING,
    region STRING,
    historical_purchase_count INT
);

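Writing conversion events in real time through Kafka

Option 1: write through Kafka (recommended)

In production, in-app conversion events are reported in real time through a Kafka topic. With a Kafka broker already configured, first create a PIPE that consumes continuously: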

-- Create the raw string landing table first; the PIPE writes JSON strings into it
CREATE TABLE IF NOT EXISTS best_practice_uplift_model.kafka_raw_conversions (value STRING);

-- Create the Kafka PIPE
CREATE PIPE IF NOT EXISTS best_practice_uplift_model.pipe_conversions
    VIRTUAL_CLUSTER = 'DEFAULT'
    BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO best_practice_uplift_model.kafka_raw_conversions
FROM (
    SELECT CAST(value AS STRING) AS value
    FROM READ_KAFKA(
        '<kafka-broker>:9092',    -- replace with the actual broker address
        'marketing_conversions',  -- topic name
        '',
        'cz_uplift_consumer',     -- consumer group ID
        '', '', '', '',
        'raw', 'raw',
        0,
        map()
    )
);

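Once created, the PIPE runs by default and consumes in batches every 60 seconds. Each message lands as a raw JSON string in kafka_raw_conversions; parsing it into the typed doc_conversions table is a separate step. A Python example that sends a JSON message to the topic: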

from kafka import KafkaProducer
import json

# Serialize each event dict to UTF-8 JSON before sending
producer = KafkaProducer(
    bootstrap_servers=['<kafka-broker>:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

conversion_event = {
    "user_id": "U001",
    "conversion_time": "2026-05-01 14:22:00",
    "order_value": 258.00
}

producer.send('marketing_conversions', value=conversion_event)
producer.flush()  # block until the message has actually been sent
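The PIPE above stores each message as an untyped JSON string, while doc_conversions expects typed columns. The following is a minimal sketch of that mapping using only the Python standard library. The helper name parse_conversion is hypothetical, and the sketch assumes messages have exactly the shape produced by the producer example; it is not the platform's own parsing mechanism.

```python
import json
from datetime import datetime


def parse_conversion(raw_value: str) -> dict:
    """Turn one raw Kafka message value into a doc_conversions-shaped row.

    Hypothetical helper: assumes the JSON keys and the timestamp format
    used by the producer example (YYYY-MM-DD HH:MM:SS).
    """
    event = json.loads(raw_value)
    return {
        "user_id": str(event["user_id"]),
        "conversion_time": datetime.strptime(
            event["conversion_time"], "%Y-%m-%d %H:%M:%S"
        ),
        "order_value": float(event["order_value"]),
    }


raw = '{"user_id": "U001", "conversion_time": "2026-05-01 14:22:00", "order_value": 258.0}'
row = parse_conversion(raw)
print(row)
```

A malformed message raises KeyError or ValueError here; in a real pipeline such rows would more likely be routed to a reject table than allowed to stop the load.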

Option 2: simulate with INSERT (no Kafka environment)

Without Kafka, load the data in one of the following ways.

Import from a local CSV (recommended):

-- Step 1: upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/doc_exposures.csv' TO USER VOLUME FILE 'doc_exposures.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_uplift_model.doc_exposures
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_exposures.csv');
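The COPY options above expect a header row, a comma separator, and empty strings for NULL. As a rough sketch of producing a file in that shape, the snippet below builds the CSV text with Python's csv module. The helper exposures_to_csv and the EXPOSURE_COLUMNS constant are illustrative names, not part of any platform tooling; the column order simply follows the doc_exposures DDL.

```python
import csv
import io

# Column order follows the doc_exposures table definition.
EXPOSURE_COLUMNS = ["user_id", "campaign_id", "channel", "exposure_time", "is_treated"]


def exposures_to_csv(rows) -> str:
    """Render exposure rows as CSV text with a header; None becomes ''."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(EXPOSURE_COLUMNS)
    for r in rows:
        writer.writerow(["" if r.get(c) is None else r[c] for c in EXPOSURE_COLUMNS])
    return buf.getvalue()


sample = [
    {"user_id": "U001", "campaign_id": "CMP001", "channel": "wechat",
     "exposure_time": "2026-05-01 09:00:00", "is_treated": 1},
    {"user_id": "U003", "campaign_id": "CMP001", "channel": "wechat",
     "exposure_time": "2026-05-01 09:10:00", "is_treated": 0},
]
csv_text = exposures_to_csv(sample)
print(csv_text)
```

Writing csv_text to a local doc_exposures.csv gives a file that can then go through the PUT and COPY INTO steps.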

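Alternatively, insert a small batch of test data inline (no CSV file needed). Writing directly to the target tables with INSERT INTO simulates Kafka messages that have already been parsed and loaded, which makes it easy to validate the downstream Dynamic Tables and queries. The following INSERT was executed through cz-cli: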

INSERT INTO best_practice_uplift_model.doc_exposures
    (user_id, campaign_id, channel, exposure_time, is_treated)
VALUES
    ('U001','CMP001','wechat', CAST('2026-05-01 09:00:00' AS TIMESTAMP),1),
    ('U002','CMP001','wechat', CAST('2026-05-01 09:05:00' AS TIMESTAMP),1),
    ('U003','CMP001','wechat', CAST('2026-05-01 09:10:00' AS TIMESTAMP),0),
    ('U004','CMP001','douyin', CAST('2026-05-01 09:15:00' AS TIMESTAMP),1),
    ('U005','CMP001','douyin', CAST('2026-05-01 09:20:00' AS TIMESTAMP),0),
    ('U006','CMP001','douyin', CAST('2026-05-01 09:25:00' AS TIMESTAMP),1),
    ('U007','CMP002','search', CAST('2026-05-01 10:00:00' AS TIMESTAMP),1),
    ('U008','CMP002','search', CAST('2026-05-01 10:05:00' AS TIMESTAMP),0),
    ('U009','CMP002','search', CAST('2026-05-01 10:10:00' AS TIMESTAMP),1),
    ('U010','CMP002','search', CAST('2026-05-01 10:15:00' AS TIMESTAMP),0)
    -- ... 50 rows in total: 30 treatment-group users (is_treated=1) and 20 control-group users (is_treated=0)
;

Verify that the data was written:

SELECT is_treated, COUNT(*) AS users
FROM best_practice_uplift_model.doc_exposures
GROUP BY is_treated
ORDER BY is_treated;

is_treated | users
-----------+------
0          | 20
1          | 30

Load the conversion records (30 rows, covering high-intent users in the treatment group).

Import from a local CSV (recommended):

-- Step 1: upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/doc_conversions.csv' TO USER VOLUME FILE 'doc_conversions.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_uplift_model.doc_conversions
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_conversions.csv');

Alternatively, insert a small batch of test data inline (no CSV file needed):

INSERT INTO best_practice_uplift_model.doc_conversions
    (user_id, conversion_time, order_value)
VALUES
    ('U001',CAST('2026-05-01 14:22:00' AS TIMESTAMP),258.00),
    ('U002',CAST('2026-05-01 15:10:00' AS TIMESTAMP),189.50),
    ('U004',CAST('2026-05-01 16:05:00' AS TIMESTAMP),320.00),
    ('U007',CAST('2026-05-01 18:00:00' AS TIMESTAMP),450.00),
    ('U009',CAST('2026-05-01 19:10:00' AS TIMESTAMP),175.00)
    -- ... 30 rows in total
;

Load the user features (20 rows, from a DMP audience package).

Import from a local CSV (recommended):

-- Step 1: upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/doc_user_features.csv' TO USER VOLUME FILE 'doc_user_features.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_uplift_model.doc_user_features
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_user_features.csv');

Alternatively, insert a small batch of test data inline (no CSV file needed):

INSERT INTO best_practice_uplift_model.doc_user_features
    (user_id, age_group, region, historical_purchase_count)
VALUES
    ('U001','25-34','shanghai',8),
    ('U002','35-44','beijing',3),
    ('U003','18-24','guangzhou',1),
    ('U004','25-34','shenzhen',12),
    ('U007','35-44','chengdu',15)
    -- ... 20 rows in total
;


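DWD layer: user-campaign touchpoint wide table

Create the Dynamic Table

The DWD layer joins the three ODS tables into one wide table, which is the basis for all later analysis. Conversions are joined on user_id only, so this definition assumes each user has at most one conversion record; a user with several conversions would produce one output row per conversion.

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_uplift_model.dwd_user_campaign_facts AS
SELECT
    e.user_id,
    e.campaign_id,
    e.channel,
    e.exposure_time,
    e.is_treated,
    f.age_group,
    f.region,
    f.historical_purchase_count,
    CASE WHEN c.user_id IS NOT NULL THEN 1 ELSE 0 END AS is_converted,
    c.order_value,
    c.conversion_time
FROM best_practice_uplift_model.doc_exposures e
LEFT JOIN best_practice_uplift_model.doc_user_features f
    ON e.user_id = f.user_id
LEFT JOIN best_practice_uplift_model.doc_conversions c
    ON e.user_id = c.user_id;

Trigger the first refresh manually: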

REFRESH DYNAMIC TABLE best_practice_uplift_model.dwd_user_campaign_facts;

Inspect the first few rows of the wide table:

SELECT user_id, campaign_id, channel, is_treated, age_group, region, is_converted, order_value
FROM best_practice_uplift_model.dwd_user_campaign_facts
LIMIT 5;

user_id | campaign_id | channel | is_treated | age_group | region | is_converted | order_value
--------+-------------+---------+------------+-----------+--------+--------------+------------
U031    | CMP004      | wechat  | 1          | null      | null   | 1            | 480
U036    | CMP004      | email   | 1          | null      | null   | 1            | 175
U039    | CMP004      | sms     | 1          | null      | null   | 1            | 165
U044    | CMP002      | display | 1          | null      | null   | 1            | 280
U045    | CMP003      | email   | 1          | null      | null   | 1            | 350

Users with null age_group and region are those not covered by the ODS user-feature table (doc_user_features has only 20 rows while the exposure table has 50); the LEFT JOIN keeps every exposure record.
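To make the LEFT JOIN behavior concrete, here is a small standard-library sketch that mimics the DWD logic on three users taken from the sample data above. The function build_dwd is an illustrative stand-in, not the platform's implementation, and keying conversions by user_id in a dict assumes at most one conversion per user.

```python
# Three exposure rows from the sample data (channel omitted for brevity).
exposures = [
    {"user_id": "U001", "campaign_id": "CMP001", "is_treated": 1},
    {"user_id": "U003", "campaign_id": "CMP001", "is_treated": 0},
    {"user_id": "U031", "campaign_id": "CMP004", "is_treated": 1},
]
# U031 has no feature row, as in the query result above.
features = {
    "U001": {"age_group": "25-34", "region": "shanghai"},
    "U003": {"age_group": "18-24", "region": "guangzhou"},
}
# user_id -> order_value; assumes one conversion per user.
conversions = {"U001": 258.00, "U031": 480.00}


def build_dwd(exposures, features, conversions):
    """Left-join features and conversions onto every exposure row."""
    rows = []
    for e in exposures:
        f = features.get(e["user_id"], {})  # missing features -> None columns
        rows.append({
            **e,
            "age_group": f.get("age_group"),
            "region": f.get("region"),
            "is_converted": 1 if e["user_id"] in conversions else 0,
            "order_value": conversions.get(e["user_id"]),
        })
    return rows


dwd = build_dwd(exposures, features, conversions)
for row in dwd:
    print(row)
```

Every exposure survives the join: U031 keeps its conversion but has None for both feature columns, and U003 keeps its features with is_converted = 0.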


DWS layer: channel-level uplift aggregation

Create the Dynamic Table

The DWS layer aggregates at the campaign_id × channel × is_treated grain and outputs, for each channel, the conversion rate and average order value of the treatment and control groups.

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_uplift_model.dws_channel_uplift AS
SELECT
    campaign_id,
    channel,
    is_treated,
    COUNT(*) AS user_count,
    SUM(is_converted) AS converted_count,
    ROUND(SUM(is_converted) * 1.0 / COUNT(*), 4) AS cvr,
    ROUND(AVG(CASE WHEN is_converted = 1 THEN order_value ELSE 0 END), 2) AS avg_order_value
FROM best_practice_uplift_model.dwd_user_campaign_facts
GROUP BY campaign_id, channel, is_treated;

Trigger a refresh manually and inspect the result:

REFRESH DYNAMIC TABLE best_practice_uplift_model.dws_channel_uplift;

SELECT campaign_id, channel, is_treated, user_count, converted_count, cvr, avg_order_value
FROM best_practice_uplift_model.dws_channel_uplift
ORDER BY campaign_id, channel, is_treated
LIMIT 10;

campaign_id | channel | is_treated | user_count | converted_count | cvr    | avg_order_value
------------+---------+------------+------------+-----------------+--------+----------------
CMP001      | douyin  | 0          | 2          | 0               | 0.0000 | 0
CMP001      | douyin  | 1          | 3          | 3               | 1.0000 | 261.63
CMP001      | search  | 0          | 1          | 0               | 0.0000 | 0
CMP001      | search  | 1          | 1          | 1               | 1.0000 | 430
CMP001      | wechat  | 0          | 3          | 0               | 0.0000 | 0
CMP001      | wechat  | 1          | 4          | 4               | 1.0000 | 210.13
CMP002      | display | 0          | 2          | 0               | 0.0000 | 0
CMP002      | display | 1          | 4          | 4               | 1.0000 | 182
CMP002      | search  | 0          | 4          | 0               | 0.0000 | 0
CMP002      | search  | 1          | 3          | 3               | 1.0000 | 281.67

BITMAP set operations: intersecting the treatment group with converted users

BITMAP functions are suited to fast set operations between treatment/control groups and converted audiences at the scale of hundreds of millions of users, avoiding large JOINs.

-- Count treatment-group and control-group users (bitmap cardinality)
SELECT
    GROUP_BITMAP(CASE WHEN is_treated=1 THEN CAST(SUBSTR(user_id,2) AS BIGINT) END) AS treated_count,
    GROUP_BITMAP(CASE WHEN is_treated=0 THEN CAST(SUBSTR(user_id,2) AS BIGINT) END) AS control_count
FROM best_practice_uplift_model.doc_exposures;

treated_count | control_count
--------------+--------------
30            | 20

Count the users who are in the treatment group and have converted (the intersection):

WITH treated_set AS (
    SELECT GROUP_BITMAP_STATE(CAST(SUBSTR(user_id,2) AS BIGINT)) AS bm
    FROM best_practice_uplift_model.doc_exposures
    WHERE is_treated = 1
),
converted_set AS (
    SELECT GROUP_BITMAP_STATE(CAST(SUBSTR(user_id,2) AS BIGINT)) AS bm
    FROM best_practice_uplift_model.doc_conversions
)
SELECT BITMAP_COUNT(BITMAP_AND(t.bm, c.bm)) AS treated_and_converted
FROM treated_set t
CROSS JOIN converted_set c;

treated_and_converted
---------------------
30

All 30 treatment-group users appear in the conversion records, which matches the design of the simulated data (every conversion comes from a treatment-group user).
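For readers new to bitmaps, the idea behind the query can be sketched in plain Python by using an integer as a bit set: bit n is set when user n is in the set. The helpers user_key, bitmap_state, and bitmap_count are illustrative analogues of CAST(SUBSTR(user_id, 2) AS BIGINT), GROUP_BITMAP_STATE, and BITMAP_COUNT, not the platform's implementation, and the sample uses only the rows shown inline in the INSERT statements above.

```python
def user_key(user_id: str) -> int:
    """Mirror CAST(SUBSTR(user_id, 2) AS BIGINT): drop the one-letter prefix."""
    return int(user_id[1:])


def bitmap_state(user_ids) -> int:
    """Build a bit set: bit n is 1 when user n is a member."""
    bm = 0
    for uid in user_ids:
        bm |= 1 << user_key(uid)
    return bm


def bitmap_count(bm: int) -> int:
    """Cardinality of the set, i.e. the number of 1 bits."""
    return bin(bm).count("1")


# Treated CMP001 users and conversions visible in the inline INSERT samples.
treated_shown = ["U001", "U002", "U004", "U006"]
converted_shown = ["U001", "U002", "U004", "U007", "U009"]

# Bitwise AND plays the role of BITMAP_AND (set intersection).
both = bitmap_state(treated_shown) & bitmap_state(converted_shown)
print(bitmap_count(both))
```

The intersection is a single bitwise AND over the two states, with no row-level join, which is why the same approach scales to very large user sets.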


ADS layer: uplift scoring and ROI recommendations

Create the Dynamic Table

The ADS layer computes each channel's Uplift CVR (treatment-group conversion rate minus control-group conversion rate) and Uplift ARPU (incremental revenue per user), and labels each row with one of three tiers.

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_uplift_model.ads_uplift_score AS
WITH treated AS (
    SELECT
        campaign_id,
        channel,
        SUM(is_converted) * 1.0 / COUNT(*) AS cvr_treated,
        AVG(CASE WHEN is_converted = 1 THEN order_value ELSE 0 END) AS arpu_treated,
        COUNT(*) AS cnt_treated
    FROM best_practice_uplift_model.dwd_user_campaign_facts
    WHERE is_treated = 1
    GROUP BY campaign_id, channel
),
control AS (
    SELECT
        campaign_id,
        channel,
        SUM(is_converted) * 1.0 / COUNT(*) AS cvr_control,
        AVG(CASE WHEN is_converted = 1 THEN order_value ELSE 0 END) AS arpu_control,
        COUNT(*) AS cnt_control
    FROM best_practice_uplift_model.dwd_user_campaign_facts
    WHERE is_treated = 0
    GROUP BY campaign_id, channel
)
SELECT
    t.campaign_id,
    t.channel,
    ROUND(t.cvr_treated, 4) AS cvr_treated,
    ROUND(c.cvr_control, 4) AS cvr_control,
    ROUND(t.cvr_treated - c.cvr_control, 4) AS uplift_cvr,
    ROUND(t.arpu_treated - c.arpu_control, 2) AS uplift_arpu,
    t.cnt_treated,
    c.cnt_control,
    CASE
        WHEN t.cvr_treated - c.cvr_control > 0.5 THEN 'HIGH'
        WHEN t.cvr_treated - c.cvr_control > 0.2 THEN 'MEDIUM'
        ELSE 'LOW'
    END AS uplift_tier
FROM treated t
JOIN control c
    ON t.campaign_id = c.campaign_id
   AND t.channel = c.channel;

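Uplift tier thresholds

Tier   | Condition                     | Meaning
-------+-------------------------------+------------------------------------------------------------------------------------------
HIGH   | uplift_cvr > 0.5              | The intervention adds more than 50 percentage points of conversion rate; strongly consider increasing spend
MEDIUM | uplift_cvr > 0.2 (and ≤ 0.5)  | Moderate effect; decide whether to scale based on order value
LOW    | uplift_cvr ≤ 0.2              | Weak effect; the campaign may be reaching many users who would have purchased anyway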
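The tiering rule is small enough to restate as a function, which makes the boundary behavior explicit. The sketch below mirrors the CASE expression in ads_uplift_score; the function name uplift_tier is illustrative, and the comparisons are strict, so an uplift of exactly 0.5 is MEDIUM and exactly 0.2 is LOW.

```python
def uplift_tier(cvr_treated: float, cvr_control: float) -> str:
    """Mirror the CASE expression: strict > comparisons, evaluated in order."""
    uplift = cvr_treated - cvr_control
    if uplift > 0.5:
        return "HIGH"
    if uplift > 0.2:
        return "MEDIUM"
    return "LOW"


print(uplift_tier(1.0, 0.0))  # simulated data: no control conversions
print(uplift_tier(0.5, 0.0))  # exactly on the HIGH boundary
print(uplift_tier(0.1, 0.3))  # negative uplift also falls into LOW
```

Because negative uplift and weak positive uplift share the LOW tier, a channel where the intervention appears to hurt conversion is not distinguished from one where it merely does little.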

Trigger a refresh manually and inspect the scores:

REFRESH DYNAMIC TABLE best_practice_uplift_model.ads_uplift_score;

SELECT campaign_id, channel, cvr_treated, cvr_control, uplift_cvr, uplift_arpu,
       cnt_treated, cnt_control, uplift_tier
FROM best_practice_uplift_model.ads_uplift_score
ORDER BY campaign_id, channel;

campaign_id | channel | cvr_treated | cvr_control | uplift_cvr | uplift_arpu | cnt_treated | cnt_control | uplift_tier
------------+---------+-------------+-------------+------------+-------------+-------------+-------------+------------
CMP001      | douyin  | 1.0000      | 0.0000      | 1.0000     | 261.63      | 3           | 2           | HIGH
CMP001      | search  | 1.0000      | 0.0000      | 1.0000     | 430.00      | 1           | 1           | HIGH
CMP001      | wechat  | 1.0000      | 0.0000      | 1.0000     | 210.13      | 4           | 3           | HIGH
CMP002      | display | 1.0000      | 0.0000      | 1.0000     | 182.00      | 4           | 2           | HIGH
CMP002      | search  | 1.0000      | 0.0000      | 1.0000     | 281.67      | 3           | 4           | HIGH
CMP003      | email   | 1.0000      | 0.0000      | 1.0000     | 413.33      | 3           | 1           | HIGH
CMP003      | push    | 1.0000      | 0.0000      | 1.0000     | 217.50      | 2           | 2           | HIGH
CMP003      | sms     | 1.0000      | 0.0000      | 1.0000     | 244.50      | 2           | 1           | HIGH
CMP004      | douyin  | 1.0000      | 0.0000      | 1.0000     | 200.00      | 2           | 1           | HIGH
CMP004      | email   | 1.0000      | 0.0000      | 1.0000     | 175.00      | 1           | 1           | HIGH
CMP004      | push    | 1.0000      | 0.0000      | 1.0000     | 240.00      | 1           | 1           | HIGH
CMP004      | wechat  | 1.0000      | 0.0000      | 1.0000     | 300.00      | 2           | 1           | HIGH

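Interpreting the results

  • In the simulated data the control group (is_treated=0) has no conversions, so every channel has uplift_cvr = 1.0 and lands in the HIGH tier. In real production data the control group converts organically, uplift_cvr typically falls between 0.05 and 0.30, and channels separate into clearly different tiers.
  • Ranked by uplift_arpu, CMP001 search (¥430.00) and CMP003 email (¥413.33) show the highest per-user revenue increment, which makes them candidates for priority budget.
  • CMP002 display (¥182.00) has one of the lowest uplift_arpu values; only CMP004 email (¥175.00) is lower. Its conversion rate in this dataset is also 100%, but in practice display ads tend to have a high organic conversion rate, so their Uplift CVR is often low.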

Channel ROI analysis

SELECT
    channel,
    SUM(CASE WHEN is_treated=1 AND is_converted=1 THEN order_value ELSE 0 END) AS treated_revenue,
    SUM(CASE WHEN is_treated=1 THEN 1 ELSE 0 END) AS treated_users,
    ROUND(
        SUM(CASE WHEN is_treated=1 AND is_converted=1 THEN order_value ELSE 0 END)
        / NULLIF(SUM(CASE WHEN is_treated=1 THEN 1 ELSE 0 END), 0),
        2) AS roi_per_treated_user
FROM best_practice_uplift_model.dwd_user_campaign_facts
GROUP BY channel
ORDER BY roi_per_treated_user DESC;

channel | treated_revenue | treated_users | roi_per_treated_user
--------+-----------------+---------------+---------------------
email   | 1415.00         | 4             | 353.75
search  | 1275.00         | 4             | 318.75
wechat  | 1440.50         | 6             | 240.08
douyin  | 1184.90         | 5             | 236.98
sms     | 944.00          | 4             | 236.00
push    | 675.00          | 3             | 225.00
display | 728.00          | 4             | 182.00

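Interpreting the results

The email and search channels generate the highest average revenue per treatment-group user and are the first candidates for budget. The display channel ranks last on this measure: its ¥728 of treated revenue is spread over 4 treated users, or ¥182.00 each. Note that roi_per_treated_user is revenue per treated user; it does not account for spend, because the dataset has no cost column.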


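ZettaPark Python Task: Meta-Learner uplift modeling

Scenario

The SQL layer computes a simple CVR difference (the ATE, or average treatment effect), which suits channel-level summaries. The ZettaPark Python Task goes further and estimates the treatment effect at the individual level (ITE) using the Meta-Learner framework (S-Learner / T-Learner / X-Learner), identifying which users are true Persuadables, that is, users who convert only when treated.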
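The term Persuadables comes from a common four-way segmentation in the uplift literature, based on what a user would do with and without the intervention. The sketch below encodes that segmentation; the function name user_segment and the boolean inputs are illustrative. In practice the two potential outcomes are never both observed for the same user, which is exactly why the ITE has to be estimated by a model.

```python
def user_segment(converts_if_treated: bool, converts_if_untreated: bool) -> str:
    """Classify a user by their two (hypothetical) potential outcomes."""
    if converts_if_treated and not converts_if_untreated:
        return "Persuadable"   # positive effect: worth the marketing spend
    if converts_if_treated and converts_if_untreated:
        return "Sure Thing"    # would purchase anyway: spend is wasted
    if not converts_if_treated and not converts_if_untreated:
        return "Lost Cause"    # does not purchase either way
    return "Sleeping Dog"      # negative effect: treatment deters purchase


for treated_outcome in (True, False):
    for untreated_outcome in (True, False):
        print(treated_outcome, untreated_outcome,
              user_segment(treated_outcome, untreated_outcome))
```

An ITE estimate close to 1 points toward a Persuadable, close to 0 toward a Sure Thing or Lost Cause, and a negative estimate toward a Sleeping Dog.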

Code example (T-Learner)

from clickzetta_zettapark.session import Session
from sklearn.ensemble import GradientBoostingClassifier
import pandas as pd

# Connect to the Lakehouse through ZettaPark
session = Session.builder.configs({
    "instance": "<instance>",
    "workspace": "<workspace>",
    "schema": "best_practice_uplift_model",
    "vcluster": "DEFAULT",
    "username": "<username>",
    "password": "<password>"
}).create()

# Read the DWD wide table (only users that have DMP features)
df = session.sql("""
    SELECT
        user_id,
        is_treated,
        is_converted,
        COALESCE(historical_purchase_count, 0) AS hist_purchase,
        CASE age_group
            WHEN '18-24' THEN 1
            WHEN '25-34' THEN 2
            WHEN '35-44' THEN 3
            WHEN '45-54' THEN 4
            ELSE 0
        END AS age_bucket
    FROM best_practice_uplift_model.dwd_user_campaign_facts
    WHERE age_group IS NOT NULL
""").to_pandas()

# T-Learner: train one model on the treatment group and one on the control group.
# Each group must contain both converted and non-converted users; the simulated
# dataset in this article (no control conversions) does not meet that condition,
# so run this on data with organic control-group conversions.
features = ['hist_purchase', 'age_bucket']
treatment_df = df[df['is_treated'] == 1]
control_df = df[df['is_treated'] == 0]

m1 = GradientBoostingClassifier(n_estimators=50, random_state=42)
m0 = GradientBoostingClassifier(n_estimators=50, random_state=42)
m1.fit(treatment_df[features], treatment_df['is_converted'])
m0.fit(control_df[features], control_df['is_converted'])

# Predict the ITE (individual treatment effect) for every user
df['p1'] = m1.predict_proba(df[features])[:, 1]
df['p0'] = m0.predict_proba(df[features])[:, 1]
df['ite'] = df['p1'] - df['p0']

# Write the scores back to the Lakehouse
result_df = session.create_dataframe(df[['user_id', 'ite']])
result_df.write.save_as_table(
    "best_practice_uplift_model.ads_user_ite_scores",
    mode="overwrite"
)
print(f"ITE scores written: {len(df)} users")
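A T-Learner fits one classifier per group, and a binary classifier generally cannot be trained when a group contains only one outcome class. Since the simulated data has no control-group conversions, a pre-flight check along the following lines may be useful before training. This is a sketch with hypothetical helper names (arm_outcome_counts, can_fit_t_learner) that operates on plain dicts rather than the DataFrame used above.

```python
from collections import Counter


def arm_outcome_counts(rows) -> Counter:
    """Count rows per (is_treated, is_converted) combination."""
    return Counter((r["is_treated"], r["is_converted"]) for r in rows)


def can_fit_t_learner(rows) -> bool:
    """True only if both groups contain converters and non-converters."""
    counts = arm_outcome_counts(rows)
    return all(counts[(t, y)] > 0 for t in (0, 1) for y in (0, 1))


# Shape of the simulated data: every treated user converts, no control user does.
simulated = [
    {"is_treated": 1, "is_converted": 1},
    {"is_treated": 1, "is_converted": 1},
    {"is_treated": 0, "is_converted": 0},
]
# Hypothetical production-like data with organic control conversions.
realistic = simulated + [
    {"is_treated": 1, "is_converted": 0},
    {"is_treated": 0, "is_converted": 1},
]

print(can_fit_t_learner(simulated))
print(can_fit_t_learner(realistic))
```

When the check fails, the task can skip model training for that run instead of failing partway through.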

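X-Learner with EconML

When there are enough samples, X-Learner can give more accurate estimates, particularly when the treatment and control groups differ in size. It imputes each user's treatment effect using the outcome model trained on the opposite group, fits a second-stage model to those imputed effects, and combines the two resulting estimates with propensity-score weights.

from econml.metalearners import XLearner
from sklearn.ensemble import RandomForestRegressor

# Reuses df and features from the T-Learner example above.
# Regressors are used so that predict() returns an estimated conversion
# probability rather than a hard 0/1 label.
xl = XLearner(
    models=RandomForestRegressor(n_estimators=100, random_state=42)
)

X = df[features].values
T = df['is_treated'].values
Y = df['is_converted'].values

xl.fit(Y, T, X=X)
ite_scores = xl.effect(X)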
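For reference, the X-Learner procedure is usually written in three stages. The notation below is introduced here for illustration and does not appear elsewhere in this article: Y is is_converted, T is is_treated, X is the feature vector, and g(x) is a weight in [0, 1], commonly the estimated propensity score P(T = 1 | X = x).

```latex
\begin{aligned}
&\text{Stage 1 (outcome models):} &&
  \hat\mu_0(x) \approx \mathbb{E}[Y \mid X = x,\, T = 0], \qquad
  \hat\mu_1(x) \approx \mathbb{E}[Y \mid X = x,\, T = 1] \\[4pt]
&\text{Stage 2 (imputed effects):} &&
  \tilde D_i^{1} = Y_i - \hat\mu_0(X_i) \ \ \text{for } T_i = 1, \qquad
  \tilde D_i^{0} = \hat\mu_1(X_i) - Y_i \ \ \text{for } T_i = 0 \\
& && \hat\tau_1(x) \text{ is fit to } \tilde D^{1}, \qquad
     \hat\tau_0(x) \text{ is fit to } \tilde D^{0} \\[4pt]
&\text{Stage 3 (combination):} &&
  \hat\tau(x) = g(x)\,\hat\tau_0(x) + \bigl(1 - g(x)\bigr)\,\hat\tau_1(x)
\end{aligned}
```

The T-Learner corresponds to stopping after Stage 1 and reporting the difference between the two outcome models.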


Scheduling: Studio Task

Periodic refresh of the Dynamic Tables is not declared in the DDL. It is scheduled through Studio Tasks instead, so that monitoring alerts and data-quality rules can be attached to the same task.

In Studio, create the following refresh tasks under the best_practices/uplift_model/ path:

  1. refresh_dwd_user_campaign_facts

    • SQL: REFRESH DYNAMIC TABLE best_practice_uplift_model.dwd_user_campaign_facts
    • Schedule: hourly, on the hour
    • Quality check: the share of rows where is_converted IS NULL must not exceed 80%. Because the DWD definition maps missing conversions to 0, this rule only fires if that definition changes; a check on the share of rows where age_group IS NULL may be more informative.
  2. refresh_dws_channel_uplift

    • SQL: REFRESH DYNAMIC TABLE best_practice_uplift_model.dws_channel_uplift
    • Schedule: triggered after refresh_dwd_user_campaign_facts succeeds
    • Alert: send an anomaly notification when cvr is 0 for every channel (uplift_cvr itself is only computed in ads_uplift_score)
  3. refresh_ads_uplift_score

    • SQL: REFRESH DYNAMIC TABLE best_practice_uplift_model.ads_uplift_score
    • Schedule: triggered after refresh_dws_channel_uplift succeeds
    • Alert: send an alert when the number of HIGH-tier channels drops by more than 50% compared with the previous day
  4. run_uplift_ml_task (ZettaPark Python Task)

    • Script: the T-Learner code from the previous section
    • Schedule: daily at 02:00
    • Output: written to ads_user_ite_scores

Configure the Studio Task DAG dependencies so that the layers refresh in order from DWD to ADS before the machine-learning task is triggered.
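The dependency chain can be sanity-checked offline before it is configured in Studio. The sketch below uses Python's graphlib to derive a valid run order from the four task names above. The deps mapping is an illustrative representation of the dependencies, not a Studio API, and it assumes the ML task waits for the ADS refresh as described.

```python
from graphlib import TopologicalSorter

# task -> set of tasks that must succeed first (illustrative representation)
deps = {
    "refresh_dwd_user_campaign_facts": set(),
    "refresh_dws_channel_uplift": {"refresh_dwd_user_campaign_facts"},
    "refresh_ads_uplift_score": {"refresh_dws_channel_uplift"},
    "run_uplift_ml_task": {"refresh_ads_uplift_score"},
}

# static_order() raises graphlib.CycleError if the dependencies are circular
order = list(TopologicalSorter(deps).static_order())
for step, task in enumerate(order, start=1):
    print(step, task)
```

Because the dependencies form a single chain, there is exactly one valid order; adding a circular dependency by mistake would surface as a CycleError rather than a silent misconfiguration.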


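Notes on incremental computation

dwd_user_campaign_facts is a Dynamic Table built on the three ODS tables, and dws_channel_uplift and ads_uplift_score are Dynamic Tables built on top of it. When new exposure or conversion records land upstream, the next refresh detects the changes and processes them incrementally, with no full rerun.

This matters particularly for uplift modeling:

  • After each campaign round ends, bulk-load the final exposure and conversion data for the treatment and control groups into ODS
  • Trigger the chained DWD → DWS → ADS refresh
  • The ADS layer then shows the latest Uplift CVR and tiering results, which drive the next round of budget decisions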
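As a conceptual illustration of why incremental refresh is cheaper than a full rerun, the sketch below keeps running counts per campaign_id × channel × is_treated group and folds in only the newly arrived rows. It is a simplified, insert-only model with hypothetical names and hypothetical rows; it does not describe how the Dynamic Table engine is actually implemented.

```python
from collections import defaultdict


class ChannelStats:
    """Running aggregates per (campaign_id, channel, is_treated) group."""

    def __init__(self):
        self.users = defaultdict(int)
        self.converted = defaultdict(int)

    def apply(self, new_rows):
        """Fold in only the new rows; previously seen rows are not rescanned."""
        for r in new_rows:
            key = (r["campaign_id"], r["channel"], r["is_treated"])
            self.users[key] += 1
            self.converted[key] += r["is_converted"]

    def cvr(self, key):
        return round(self.converted[key] / self.users[key], 4)


def row(is_converted):
    # Hypothetical rows for a single group
    return {"campaign_id": "CMP001", "channel": "douyin",
            "is_treated": 1, "is_converted": is_converted}


stats = ChannelStats()
stats.apply([row(1), row(1)])   # first load
stats.apply([row(0)])           # later batch: only one row is processed
key = ("CMP001", "douyin", 1)
print(stats.users[key], stats.converted[key], stats.cvr(key))
```

Counts and sums can be updated this way because they are additive; the ratio is recomputed from the updated totals rather than maintained directly.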

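Notes

  • The Dynamic Table DDL does not declare REFRESH INTERVAL; scheduling is handled by Studio Tasks, which can carry monitoring alerts and quality rules.
  • The BITMAP functions (GROUP_BITMAP_STATE / BITMAP_AND / BITMAP_COUNT) require user IDs as BIGINT. When user_id is a string, extract the numeric part with CAST(SUBSTR(user_id, 2) AS BIGINT); this assumes IDs made of one letter followed by digits, as in U001.
  • Meta-Learners assume randomized assignment (RCT). If the experiment assignment is biased, the ITE estimates will be distorted and need a Propensity Score correction.
  • The Uplift CVR difference requires the treatment and control groups to be paired within the same channel and the same campaign. If a channel has only a treatment group and no control group, the JOIN filters it out and no row for it appears in ads_uplift_score.
  • avg_order_value in the DWS layer uses AVG(CASE WHEN is_converted = 1 THEN order_value ELSE 0 END) rather than AVG(order_value), so that the NULL order_value of non-converting users counts as 0 and the denominator is the total number of users, not only the converters. The result is therefore revenue per exposed user rather than the average value of an order.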
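The difference between the two AVG forms in the last note is easiest to see with numbers. The sketch below uses four hypothetical users, two of whom converted; the function names are illustrative, and the second function mimics SQL's AVG(order_value), which skips NULL values.

```python
# Four hypothetical users in one group; None stands in for SQL NULL.
rows = [
    {"is_converted": 1, "order_value": 300.0},
    {"is_converted": 1, "order_value": 100.0},
    {"is_converted": 0, "order_value": None},
    {"is_converted": 0, "order_value": None},
]


def avg_including_non_converters(rows) -> float:
    """AVG(CASE WHEN is_converted = 1 THEN order_value ELSE 0 END)."""
    total = sum(r["order_value"] if r["is_converted"] == 1 else 0 for r in rows)
    return total / len(rows)


def avg_over_converters_only(rows) -> float:
    """AVG(order_value): NULL rows are skipped, shrinking the denominator."""
    values = [r["order_value"] for r in rows if r["order_value"] is not None]
    return sum(values) / len(values)


print(avg_including_non_converters(rows))  # 400 / 4 users
print(avg_over_converters_only(rows))      # 400 / 2 converters
```

Only the first form is comparable across groups with different conversion rates, which is what the uplift_arpu subtraction in the ADS layer relies on.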
