先写入活动元数据,供 DWD 层补全活动名称、创意类型和预算,也供 Inverted Index 搜索使用:
-- Seed campaign metadata. One row per (campaign_id, creative_id), so the
-- campaign-level attributes (name, channel, budget, flight dates) repeat for
-- every creative that belongs to the same campaign.
INSERT INTO best_practice_ad_attribution.bronze_campaign_meta (
    campaign_id,
    campaign_name,
    channel,
    budget,
    start_date,
    end_date,
    creative_id,
    creative_type
)
VALUES
    ('camp_001', 'Google Search Spring Promo',      'google', 50000,  CAST('2024-01-01' AS DATE), CAST('2024-01-31' AS DATE), 'cr_g1',  'text'),
    ('camp_001', 'Google Search Spring Promo',      'google', 50000,  CAST('2024-01-01' AS DATE), CAST('2024-01-31' AS DATE), 'cr_g2',  'text'),
    ('camp_002', 'WeChat Moments Brand Awareness',  'wechat', 80000,  CAST('2024-01-01' AS DATE), CAST('2024-01-31' AS DATE), 'cr_w1',  'video'),
    ('camp_002', 'WeChat Moments Brand Awareness',  'wechat', 80000,  CAST('2024-01-01' AS DATE), CAST('2024-01-31' AS DATE), 'cr_w2',  'image'),
    ('camp_003', 'Douyin Short Video Retargeting',  'douyin', 120000, CAST('2024-01-01' AS DATE), CAST('2024-01-31' AS DATE), 'cr_d1',  'video'),
    ('camp_003', 'Douyin Short Video Retargeting',  'douyin', 120000, CAST('2024-01-01' AS DATE), CAST('2024-01-31' AS DATE), 'cr_d2',  'video'),
    ('camp_004', 'Weibo Topic Engagement',          'weibo',  30000,  CAST('2024-01-01' AS DATE), CAST('2024-01-31' AS DATE), 'cr_wb1', 'image');
-- Full-text (inverted) index on campaign_name, tokenized with the 'english'
-- analyzer, so MATCH_* predicates can search campaign names.
-- NOTE: the table name is not schema-qualified, so it resolves against the
-- session's current schema (expected: best_practice_ad_attribution).
CREATE INVERTED INDEX IF NOT EXISTS idx_inv_campaign_name
ON TABLE bronze_campaign_meta (campaign_name)
PROPERTIES('analyzer'='english');
构建索引(使存量数据可被搜索):
-- Index the rows that already exist in the table: creating the index only
-- covers data written afterwards.
-- BUILD INDEX accepts only an unqualified table name, so run it with
-- best_practice_ad_attribution as the current schema (e.g. via -s).
BUILD INDEX idx_inv_campaign_name ON bronze_campaign_meta;
⚠️ 注意:`BUILD INDEX` 只接受不带 Schema 前缀的表名,需在 `best_practice_ad_attribution` Schema 上下文下执行,或通过 `-s best_practice_ad_attribution` 参数切换。`CREATE INDEX` 只对之后新写入的数据生效,存量数据需执行 `BUILD INDEX` 后才可被搜索。
验证全文搜索效果:
-- Full-text search on the inverted index: campaigns whose name matches the
-- term 'video'. One row comes back per matching metadata row (i.e. per
-- creative), so a campaign with two matching creatives appears twice.
SELECT
    campaign_id,
    campaign_name,
    channel
FROM best_practice_ad_attribution.bronze_campaign_meta
WHERE MATCH_ALL(campaign_name, 'video');
campaign_id campaign_name channel
----------- ---------------------------------- -------
camp_003 Douyin Short Video Retargeting douyin
camp_003 Douyin Short Video Retargeting douyin
-- Create the OSS storage connection first (replace the placeholders with the
-- real AK/SK and endpoint; keep real credentials out of committed scripts).
CREATE STORAGE CONNECTION IF NOT EXISTS conn_oss_ga
TYPE = 'OSS'
ACCESS_ID = '<your-access-id>'
ACCESS_KEY = '<your-access-key>'
ENDPOINT = 'oss-cn-hangzhou.aliyuncs.com';
-- Mount the bucket's ga-exports/ prefix as an external Volume that uses the
-- conn_oss_ga connection.
CREATE EXTERNAL VOLUME IF NOT EXISTS vol_ga_exports
TYPE = 'OSS'
BUCKET = '<your-bucket>'
PATH = 'ga-exports/'
CONNECTION = conn_oss_ga;
-- OSS PIPE in LIST_PURGE mode: scans vol_ga_exports for new CSV files and
-- COPYs them into bronze_ad_events on the DEFAULT virtual cluster.
-- channel is not read from the file: every row loaded by this pipe is stamped
-- 'google'. ingest_time is set to the load time.
-- Expected CSV layout (header row present, comma-separated):
--   $1 event_id, $2 user_id, $3 event_type, $4 event_time,
--   $5 campaign_id, $6 creative_id, $7 platform, $8 region
CREATE PIPE IF NOT EXISTS best_practice_ad_attribution.pipe_ga_events
VIRTUAL_CLUSTER = 'DEFAULT'
INGEST_MODE = 'LIST_PURGE'
AS
COPY INTO best_practice_ad_attribution.bronze_ad_events
FROM (
SELECT
$1 AS event_id,
$2 AS user_id,
'google' AS channel,
$3 AS event_type,
CAST($4 AS TIMESTAMP) AS event_time,
$5 AS campaign_id,
$6 AS creative_id,
$7 AS platform,
$8 AS region,
CURRENT_TIMESTAMP() AS ingest_time
FROM @vol_ga_exports
)
USING csv
OPTIONS('header'='true', 'sep'=',');
💡 提示:`LIST_PURGE` 模式会在文件成功写入后将其从 Volume 扫描列表中标记为已处理,避免重复导入,适合 GA 每日全量文件场景。如需精确去重或支持文件重跑,请改用 `LIST_RETAIN` 模式,并由下游 Dynamic Table 做幂等处理。
配置 Kafka PIPE(实时点击流)
实时 Web / App 点击流通过 Kafka 接入,写入同一张 Bronze 表:
-- Create the Kafka storage connection for the clickstream brokers.
-- NOTE(review): the pipe below passes the broker address directly to
-- READ_KAFKA and does not reference conn_kafka_clickstream; confirm whether
-- the connection is needed.
CREATE STORAGE CONNECTION IF NOT EXISTS conn_kafka_clickstream
TYPE = 'KAFKA'
KAFKA_BROKERS = '<kafka-broker>:9092';
-- Kafka PIPE: consumes in batches every 60 seconds and writes into the same
-- Bronze table as the GA pipe. Each message value is parsed as JSON and its
-- fields are extracted with get_json_object; event_time is cast to TIMESTAMP
-- and ingest_time is set to the load time.
-- NOTE(review): READ_KAFKA takes positional arguments. Only the broker list,
-- the topic and 'cz_ad_consumer' (presumably the consumer group id) are set;
-- confirm the empty-string defaults and the 'json' key/value format arguments
-- against the READ_KAFKA reference.
CREATE PIPE IF NOT EXISTS best_practice_ad_attribution.pipe_kafka_clickstream
VIRTUAL_CLUSTER = 'DEFAULT'
BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO best_practice_ad_attribution.bronze_ad_events
FROM (
SELECT
get_json_object(value, '$.event_id') AS event_id,
get_json_object(value, '$.user_id') AS user_id,
get_json_object(value, '$.channel') AS channel,
get_json_object(value, '$.event_type') AS event_type,
CAST(get_json_object(value, '$.event_time') AS TIMESTAMP) AS event_time,
get_json_object(value, '$.campaign_id') AS campaign_id,
get_json_object(value, '$.creative_id') AS creative_id,
get_json_object(value, '$.platform') AS platform,
get_json_object(value, '$.region') AS region,
CURRENT_TIMESTAMP() AS ingest_time
FROM READ_KAFKA(
'<kafka-broker>:9092',
'ad_clickstream', -- topic name
'', 'cz_ad_consumer',
'','','','',
'json', 'json', 0, map()
)
);
写入模拟数据
从本地 CSV 导入(推荐)
-- Step 1: upload the local CSV file to the User Volume with SQL PUT.
PUT '/path/to/your/bronze_ad_events.csv' TO USER VOLUME FILE 'bronze_ad_events.csv';
-- Step 2: COPY INTO the table from the User Volume. The file has a header row
-- and is comma-separated; empty fields are read as NULL ('nullValue'='').
-- NOTE(review): no column list or SELECT is given, so the CSV columns are
-- presumably mapped to table columns by position. Confirm the file follows
-- the table's column order.
COPY INTO best_practice_ad_attribution.bronze_ad_events
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('bronze_ad_events.csv');
也可直接内联插入小批量测试数据(不需要 CSV 文件):
-- Inline test events for five users. ingest_time is not supplied here, so it
-- takes the column's default.
INSERT INTO best_practice_ad_attribution.bronze_ad_events (
    event_id, user_id, channel, event_type, event_time,
    campaign_id, creative_id, platform, region
)
VALUES
    -- u001: google -> wechat, converts on wechat
    ('e001', 'u001', 'google', 'impression', CAST('2024-01-01 08:00:00' AS TIMESTAMP), 'camp_001', 'cr_g1',  'web', 'cn-north'),
    ('e002', 'u001', 'google', 'click',      CAST('2024-01-01 08:05:00' AS TIMESTAMP), 'camp_001', 'cr_g1',  'web', 'cn-north'),
    ('e003', 'u001', 'wechat', 'impression', CAST('2024-01-01 09:00:00' AS TIMESTAMP), 'camp_002', 'cr_w1',  'app', 'cn-north'),
    ('e004', 'u001', 'wechat', 'click',      CAST('2024-01-01 09:10:00' AS TIMESTAMP), 'camp_002', 'cr_w1',  'app', 'cn-north'),
    ('e005', 'u001', 'wechat', 'conversion', CAST('2024-01-01 10:30:00' AS TIMESTAMP), 'camp_002', 'cr_w1',  'app', 'cn-north'),
    -- u002: douyin -> google, converts on google
    ('e006', 'u002', 'douyin', 'impression', CAST('2024-01-01 10:00:00' AS TIMESTAMP), 'camp_003', 'cr_d1',  'app', 'cn-south'),
    ('e007', 'u002', 'douyin', 'click',      CAST('2024-01-01 10:15:00' AS TIMESTAMP), 'camp_003', 'cr_d1',  'app', 'cn-south'),
    ('e008', 'u002', 'google', 'click',      CAST('2024-01-01 11:00:00' AS TIMESTAMP), 'camp_001', 'cr_g2',  'web', 'cn-south'),
    ('e009', 'u002', 'google', 'conversion', CAST('2024-01-01 11:45:00' AS TIMESTAMP), 'camp_001', 'cr_g2',  'web', 'cn-south'),
    -- u003: weibo -> douyin, converts on douyin
    ('e010', 'u003', 'weibo',  'impression', CAST('2024-01-01 12:00:00' AS TIMESTAMP), 'camp_004', 'cr_wb1', 'web', 'cn-east'),
    ('e011', 'u003', 'weibo',  'click',      CAST('2024-01-01 12:20:00' AS TIMESTAMP), 'camp_004', 'cr_wb1', 'web', 'cn-east'),
    ('e012', 'u003', 'douyin', 'impression', CAST('2024-01-01 13:00:00' AS TIMESTAMP), 'camp_003', 'cr_d2',  'app', 'cn-east'),
    ('e013', 'u003', 'douyin', 'click',      CAST('2024-01-01 13:10:00' AS TIMESTAMP), 'camp_003', 'cr_d2',  'app', 'cn-east'),
    ('e014', 'u003', 'douyin', 'conversion', CAST('2024-01-01 14:00:00' AS TIMESTAMP), 'camp_003', 'cr_d2',  'app', 'cn-east'),
    -- u004: google -> wechat, converts on wechat
    ('e015', 'u004', 'google', 'impression', CAST('2024-01-01 14:00:00' AS TIMESTAMP), 'camp_001', 'cr_g1',  'web', 'cn-north'),
    ('e016', 'u004', 'google', 'click',      CAST('2024-01-01 14:05:00' AS TIMESTAMP), 'camp_001', 'cr_g1',  'web', 'cn-north'),
    ('e017', 'u004', 'wechat', 'impression', CAST('2024-01-01 15:00:00' AS TIMESTAMP), 'camp_002', 'cr_w2',  'app', 'cn-north'),
    ('e018', 'u004', 'wechat', 'conversion', CAST('2024-01-01 16:00:00' AS TIMESTAMP), 'camp_002', 'cr_w2',  'app', 'cn-north'),
    -- u005: single douyin click, then converts on douyin
    ('e019', 'u005', 'douyin', 'click',      CAST('2024-01-01 16:00:00' AS TIMESTAMP), 'camp_003', 'cr_d1',  'app', 'cn-west'),
    ('e020', 'u005', 'douyin', 'conversion', CAST('2024-01-01 17:00:00' AS TIMESTAMP), 'camp_003', 'cr_d1',  'app', 'cn-west');
开启变更跟踪并创建 Table Stream
Table Stream 用于捕获 `bronze_ad_events` 的新增事件(含转化事件),以触发下游 Dynamic Table 的归因重算。使用前需先开启该表的 `change_tracking`:
-- Enable change tracking on the Bronze event table; this is required before a
-- Table Stream can be created on it.
ALTER TABLE best_practice_ad_attribution.bronze_ad_events
SET TBLPROPERTIES ('change_tracking' = 'true');
-- APPEND_ONLY stream over bronze_ad_events: records rows written after the
-- stream is created. Despite its name, it is not filtered to conversion events.
CREATE TABLE STREAM IF NOT EXISTS best_practice_ad_attribution.stream_conversion_events
ON TABLE best_practice_ad_attribution.bronze_ad_events
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');
💡 提示:选择 `APPEND_ONLY` 模式,是因为广告事件表以追加写入为主(常规链路不做 UPDATE/DELETE),此模式性能优于 STANDARD 模式。代价是 APPEND_ONLY Stream 只记录新增行,不会反映删除操作(例如下文清理测试数据的 DELETE)。Stream 只记录**创建之后**写入的新行,创建前已存在的历史数据不会出现在 Stream 中。
-- Clean up demo events e021-e023 so that later attribution results reflect
-- only the main dataset.
-- NOTE(review): these event_ids are not inserted anywhere in this section;
-- presumably they come from an earlier stream demo. Confirm before running.
DELETE FROM best_practice_ad_attribution.bronze_ad_events
WHERE event_id IN (
    'e021',
    'e022',
    'e023'
);
下游 Dynamic Table 会在下一个刷新周期自动感知上游数据变化(包括上述删除)并增量计算。本文后续归因结果均基于清理后的主数据集,Last Touch 中 google 的转化数保持为 1。
DWD 层:用户触点旅程表
建表
DWD 层将 Bronze 事件表与活动元数据 JOIN,补全 `campaign_name`、`creative_type`(以及 `budget`),并通过 `ROW_NUMBER` 为每个用户的触点序列打上 `touch_seq` 编号。
-- DWD user journey: every Bronze event enriched with campaign/creative
-- metadata and numbered per user in time order (touch_seq = 1 is the user's
-- first event). touch_seq counts ALL event types, including conversions.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ad_attribution.dwd_user_journey
REFRESH INTERVAL 5 MINUTE VCLUSTER DEFAULT
AS
SELECT
    e.user_id,
    e.channel,
    e.event_type,
    e.event_time,
    e.campaign_id,
    e.creative_id,
    e.platform,
    e.region,
    m.campaign_name,
    m.creative_type,
    m.budget,
    -- event_id breaks ties between a user's events that share an event_time.
    -- The numbering therefore stays deterministic across refreshes, and so do
    -- the first/last-touch decisions built on it.
    ROW_NUMBER() OVER (
        PARTITION BY e.user_id
        ORDER BY e.event_time, e.event_id
    ) AS touch_seq
FROM best_practice_ad_attribution.bronze_ad_events AS e
-- LEFT JOIN keeps events whose (campaign_id, creative_id) has no metadata
-- row; their campaign_name / creative_type / budget come back NULL.
LEFT JOIN best_practice_ad_attribution.bronze_campaign_meta AS m
    ON e.campaign_id = m.campaign_id
   AND e.creative_id = m.creative_id;
-- Spot-check the journey table: the first 10 touches in user / sequence order.
SELECT
    j.user_id,
    j.channel,
    j.event_type,
    j.event_time,
    j.campaign_name,
    j.touch_seq
FROM best_practice_ad_attribution.dwd_user_journey AS j
ORDER BY
    j.user_id,
    j.touch_seq
LIMIT 10;
user_id channel event_type event_time campaign_name touch_seq
------- ------- ---------- ------------------- -------------------------------- ---------
u001 google impression 2024-01-01T08:00:00 Google Search Spring Promo 1
u001 google click 2024-01-01T08:05:00 Google Search Spring Promo 2
u001 wechat impression 2024-01-01T09:00:00 WeChat Moments Brand Awareness 3
u001 wechat click 2024-01-01T09:10:00 WeChat Moments Brand Awareness 4
u001 wechat conversion 2024-01-01T10:30:00 WeChat Moments Brand Awareness 5
u002 douyin impression 2024-01-01T10:00:00 Douyin Short Video Retargeting 1
u002 douyin click 2024-01-01T10:15:00 Douyin Short Video Retargeting 2
u002 google click 2024-01-01T11:00:00 Google Search Spring Promo 3
u002 google conversion 2024-01-01T11:45:00 Google Search Spring Promo 4
u003 weibo impression 2024-01-01T12:00:00 Weibo Topic Engagement 1
-- Last Touch attribution: each conversion is credited entirely (1.0) to the
-- user's latest impression/click at or before the conversion time.
-- Conversions with no preceding impression/click are not attributed.
-- Output grain: one row per (attributed_channel, attributed_campaign_name).
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ad_attribution.dws_attribution_last_touch
REFRESH INTERVAL 5 MINUTE VCLUSTER DEFAULT
AS
WITH conversions AS (
    -- conv_seq (the conversion's own touch_seq) identifies each conversion
    -- uniquely within a user, even when two share a timestamp.
    SELECT
        user_id,
        event_time AS conv_time,
        touch_seq AS conv_seq
    FROM best_practice_ad_attribution.dwd_user_journey
    WHERE event_type = 'conversion'
),
ranked_touches AS (
    -- Candidate touches per conversion, newest first. A window over the joined
    -- rows replaces a correlated MAX(touch_seq) subquery that re-scanned the
    -- journey table for every conversion.
    SELECT
        c.user_id,
        c.conv_time,
        j.channel AS attributed_channel,
        j.campaign_name AS attributed_campaign_name,
        ROW_NUMBER() OVER (
            PARTITION BY c.user_id, c.conv_seq
            ORDER BY j.touch_seq DESC
        ) AS recency_rank
    FROM conversions AS c
    INNER JOIN best_practice_ad_attribution.dwd_user_journey AS j
        ON c.user_id = j.user_id
       AND j.event_type IN ('click', 'impression')
       AND j.event_time <= c.conv_time
),
last_touch AS (
    -- Exactly one row per conversion: its most recent qualifying touch.
    SELECT
        user_id,
        conv_time,
        attributed_channel,
        attributed_campaign_name
    FROM ranked_touches
    WHERE recency_rank = 1
)
SELECT
    attributed_channel,
    attributed_campaign_name,
    COUNT(*) AS conversions,
    -- Each conversion is worth 1.0, so the attributed value equals the count.
    1.0 * COUNT(*) AS attributed_value
FROM last_touch
GROUP BY attributed_channel, attributed_campaign_name;
-- Refresh now instead of waiting for the 5-minute schedule, then read the
-- result with an explicit column list (no SELECT *).
REFRESH DYNAMIC TABLE best_practice_ad_attribution.dws_attribution_last_touch;
SELECT
    attributed_channel,
    attributed_campaign_name,
    conversions,
    attributed_value
FROM best_practice_ad_attribution.dws_attribution_last_touch
ORDER BY conversions DESC, attributed_channel DESC;
attributed_channel attributed_campaign_name conversions attributed_value
------------------ -------------------------------- ----------- ----------------
wechat WeChat Moments Brand Awareness 2 2.0
douyin Douyin Short Video Retargeting 2 2.0
google Google Search Spring Promo 1 1.0
Last Touch 模型将转化完全归因给最后触点,微信和抖音各获得 2 次转化。但这低估了 google 在用户决策路径中的前置作用(u001 先看到 google 广告才最终在微信转化)。
Linear 归因
将转化价值平均分配给路径上的每个触点。
-- Linear attribution: each conversion's value (1.0) is split equally across
-- all impressions/clicks the user had at or before the conversion time.
-- A touch that precedes several conversions earns credit from each of them.
-- Output grain: one row per (attributed_channel, attributed_campaign_name).
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ad_attribution.dws_attribution_linear
REFRESH INTERVAL 5 MINUTE VCLUSTER DEFAULT
AS
WITH conversions AS (
    -- conv_seq (the conversion's own touch_seq) identifies each conversion
    -- uniquely within a user.
    SELECT
        user_id,
        event_time AS conv_time,
        touch_seq AS conv_seq
    FROM best_practice_ad_attribution.dwd_user_journey
    WHERE event_type = 'conversion'
),
touchpoints AS (
    SELECT
        c.user_id,
        j.channel,
        j.campaign_name,
        j.event_time,
        -- Count touches per conversion, not per user, so each conversion's
        -- credits sum to exactly 1.0 even when a user converts more than once.
        COUNT(*) OVER (PARTITION BY c.user_id, c.conv_seq) AS total_touches
    FROM conversions AS c
    INNER JOIN best_practice_ad_attribution.dwd_user_journey AS j
        ON c.user_id = j.user_id
       AND j.event_type IN ('click', 'impression')
       AND j.event_time <= c.conv_time
)
-- No ORDER BY: the stored row order of a table is not guaranteed, so sort
-- when querying it instead.
SELECT
    channel AS attributed_channel,
    campaign_name AS attributed_campaign_name,
    COUNT(*) AS touch_count,
    ROUND(SUM(1.0 / total_touches), 4) AS attributed_value
FROM touchpoints
GROUP BY channel, campaign_name;
-- Refresh on demand, then read with explicit columns. attributed_channel
-- breaks ties so that the row order is deterministic.
REFRESH DYNAMIC TABLE best_practice_ad_attribution.dws_attribution_linear;
SELECT
    attributed_channel,
    attributed_campaign_name,
    touch_count,
    attributed_value
FROM best_practice_ad_attribution.dws_attribution_linear
ORDER BY attributed_value DESC, attributed_channel;
attributed_channel attributed_campaign_name touch_count attributed_value
------------------ -------------------------------- ----------- ----------------
douyin Douyin Short Video Retargeting 5 2.1667
google Google Search Spring Promo 5 1.5000
wechat WeChat Moments Brand Awareness 3 0.8333
weibo Weibo Topic Engagement 2 0.5000
Linear 模型充分体现了全渠道贡献:抖音触点最多(5 次),获得最高归因价值;google 触点同样是 5 次,但用户路径较长,平均分配后价值为 1.5。微博虽然没有直接转化用户,但参与了用户的决策路径,也获得了 0.5 的归因价值。
-- Position-Based (U-shaped) attribution, computed per conversion over the
-- user's impressions/clicks at or before the conversion time:
--   1 touch    -> that touch gets 1.0
--   2 touches  -> first and last get 0.5 each
--   3+ touches -> first and last get 0.4 each; the remaining 0.2 is split
--                 equally across the middle touches
-- Every conversion therefore distributes exactly 1.0.
-- Output grain: one row per (attributed_channel, attributed_campaign_name).
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ad_attribution.dws_attribution_position_based
REFRESH INTERVAL 5 MINUTE VCLUSTER DEFAULT
AS
WITH conversions AS (
    -- conv_seq (the conversion's own touch_seq) identifies each conversion
    -- uniquely within a user.
    SELECT
        user_id,
        event_time AS conv_time,
        touch_seq AS conv_seq
    FROM best_practice_ad_attribution.dwd_user_journey
    WHERE event_type = 'conversion'
),
touchpoints AS (
    -- Path boundaries and length are taken per conversion (not per user), so
    -- a user who converts twice gets a correct first/last touch for each path.
    SELECT
        c.user_id,
        j.channel,
        j.campaign_name,
        j.touch_seq,
        j.event_time,
        MIN(j.touch_seq) OVER (PARTITION BY c.user_id, c.conv_seq) AS first_touch,
        MAX(j.touch_seq) OVER (PARTITION BY c.user_id, c.conv_seq) AS last_touch_seq,
        COUNT(*) OVER (PARTITION BY c.user_id, c.conv_seq) AS total_touches
    FROM conversions AS c
    INNER JOIN best_practice_ad_attribution.dwd_user_journey AS j
        ON c.user_id = j.user_id
       AND j.event_type IN ('click', 'impression')
       AND j.event_time <= c.conv_time
),
with_weight AS (
    SELECT
        user_id,
        channel,
        campaign_name,
        CASE
            WHEN total_touches = 1 THEN 1.0
            -- Without this branch a 2-touch path would distribute only
            -- 0.4 + 0.4 = 0.8 of the conversion.
            WHEN total_touches = 2 THEN 0.5
            WHEN touch_seq = first_touch OR touch_seq = last_touch_seq THEN 0.4
            -- Reached only when total_touches >= 3, so the divisor is >= 1.
            ELSE 0.2 / (total_touches - 2)
        END AS weight
    FROM touchpoints
)
-- No ORDER BY: the stored row order of a table is not guaranteed, so sort
-- when querying it instead.
SELECT
    channel AS attributed_channel,
    campaign_name AS attributed_campaign_name,
    COUNT(*) AS touch_count,
    ROUND(SUM(weight), 4) AS attributed_value
FROM with_weight
GROUP BY channel, campaign_name;
-- Refresh on demand, then read with explicit columns. attributed_channel
-- breaks ties so that the row order is deterministic.
REFRESH DYNAMIC TABLE best_practice_ad_attribution.dws_attribution_position_based;
SELECT
    attributed_channel,
    attributed_campaign_name,
    touch_count,
    attributed_value
FROM best_practice_ad_attribution.dws_attribution_position_based
ORDER BY attributed_value DESC, attributed_channel;
attributed_channel attributed_campaign_name touch_count attributed_value
------------------ -------------------------------- ----------- ----------------
douyin Douyin Short Video Retargeting 5 2.1000
google Google Search Spring Promo 5 1.5000
wechat WeChat Moments Brand Awareness 3 0.9000
weibo Weibo Topic Engagement 2 0.5000
与 Linear 相比,Position-Based 给微信更高的归因价值(0.9 vs 0.8333),因为微信在 u001 和 u004 两次转化中都是末触点,各获得 40% 的末触点权重,而不是与其他触点平均分摊。这一模型适合"重拉新 + 重收口"营销策略下的效果评估。
ADS 层:广告活动 ROI 报告
ADS 层按活动(及创意类型)汇总 DWD 层的曝光、点击和转化事件,结合活动预算输出 CTR、CVR、单次转化成本等 ROI 指标,供 BI 工具接入。上述三种归因模型的结果(dws_attribution_* 表)可在 BI 中按渠道、活动名称与本表对照分析。
-- ADS campaign ROI report, built from the DWD journey table.
-- Grain: one row per (campaign_id, campaign_name, channel, creative_type,
-- budget). A campaign with several creative types (e.g. camp_002 has video and
-- image creatives) yields several rows that all repeat the same
-- campaign-level budget.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ad_attribution.ads_campaign_roi
REFRESH INTERVAL 10 MINUTE VCLUSTER DEFAULT
AS
WITH evt_stats AS (
    -- Funnel counts per campaign x creative type.
    SELECT
        campaign_id,
        campaign_name,
        channel,
        creative_type,
        budget,
        COUNT(CASE WHEN event_type = 'impression' THEN 1 END) AS impressions,
        COUNT(CASE WHEN event_type = 'click' THEN 1 END) AS clicks,
        COUNT(CASE WHEN event_type = 'conversion' THEN 1 END) AS conversions,
        COUNT(DISTINCT user_id) AS unique_users
    FROM best_practice_ad_attribution.dwd_user_journey
    GROUP BY campaign_id, campaign_name, channel, creative_type, budget
)
-- No ORDER BY: the stored row order of a table is not guaranteed, so sort
-- when querying it instead.
SELECT
    campaign_id,
    campaign_name,
    channel,
    creative_type,
    budget,
    impressions,
    clicks,
    conversions,
    unique_users,
    -- ctr_pct can exceed 100 when clicks have no logged impression for the
    -- same campaign/creative type (e.g. e008 and e019 in the sample data).
    ROUND(CASE WHEN impressions > 0 THEN 100.0 * clicks / impressions ELSE 0 END, 2) AS ctr_pct,
    ROUND(CASE WHEN clicks > 0 THEN 100.0 * conversions / clicks ELSE 0 END, 2) AS cvr_pct,
    -- budget is campaign-level, so divide it by the campaign's total
    -- conversions across all of its creative-type rows. Dividing by one row's
    -- conversions would charge the full budget to every creative type.
    -- NULL when the campaign has no conversions.
    ROUND(
        budget / NULLIF(SUM(conversions) OVER (PARTITION BY campaign_id), 0),
        2
    ) AS cost_per_conversion
FROM evt_stats;
-- Refresh the ROI table now rather than waiting for its 10-minute schedule,
-- then list campaigns by conversions (channel, then clicks, as tie-breakers).
REFRESH DYNAMIC TABLE best_practice_ad_attribution.ads_campaign_roi;
SELECT
    r.campaign_name,
    r.channel,
    r.impressions,
    r.clicks,
    r.conversions,
    r.ctr_pct,
    r.cvr_pct,
    r.cost_per_conversion
FROM best_practice_ad_attribution.ads_campaign_roi AS r
ORDER BY
    r.conversions DESC,
    r.channel,
    r.clicks DESC;