-- Continuous ingestion pipe: reads messages from the Kafka topic
-- 'sdk_tracking_events' (consumer group 'cz_pa_consumer') and COPYs them into
-- doc_events, with the batch interval configured as 60 seconds.
-- Each Kafka message `value` is cast to STRING and parsed as JSON; the six
-- top-level JSON fields are aliased to the doc_events column names.
-- NOTE(review): replace '<kafka-broker>:9092' with the real bootstrap server(s).
CREATE PIPE IF NOT EXISTS best_practice_product_analytics.pipe_events
VIRTUAL_CLUSTER = 'DEFAULT'
BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO best_practice_product_analytics.doc_events
FROM (
SELECT
GET_JSON_OBJECT(CAST(value AS STRING), '$.user_id') AS user_id,
GET_JSON_OBJECT(CAST(value AS STRING), '$.session_id') AS session_id,
GET_JSON_OBJECT(CAST(value AS STRING), '$.event_name') AS event_name,
-- NOTE(review): CAST(... AS TIMESTAMP) assumes the SDK sends event_time as a
-- timestamp string such as '2026-05-10 10:00:00'; epoch numbers would need a
-- different conversion — confirm against the actual payload.
CAST(GET_JSON_OBJECT(CAST(value AS STRING), '$.event_time') AS TIMESTAMP) AS event_time,
GET_JSON_OBJECT(CAST(value AS STRING), '$.page_url') AS page_url,
-- properties is kept as a raw JSON string for downstream parsing.
GET_JSON_OBJECT(CAST(value AS STRING), '$.properties') AS properties
FROM READ_KAFKA(
'<kafka-broker>:9092', -- Kafka bootstrap server(s)
'sdk_tracking_events', -- topic
'', -- presumably a topic pattern, unused here — confirm against READ_KAFKA docs
'cz_pa_consumer', -- consumer group id
'','','','', -- presumably offset / timestamp bounds, left empty — confirm
'raw', 'raw', -- presumably key / value formats (raw bytes) — confirm
0, -- NOTE(review): positional option; its meaning is not visible here
map() -- empty option map
)
);
-- Step 1: upload a local CSV file to the User Volume with SQL PUT; it is stored
-- in the volume under the name 'data.csv'.
-- NOTE(review): replace '/path/to/your/data.csv' with the real local path.
PUT '/path/to/your/data.csv' TO USER VOLUME FILE 'data.csv';
-- Step 2: COPY that file from the User Volume into doc_events.
-- Options: first row is a header, fields are comma-separated, and 'nullValue'=''
-- presumably reads empty fields as NULL.
-- NOTE(review): the CSV columns must line up with doc_events (user_id,
-- session_id, event_name, event_time, page_url, properties). The JSON in
-- `properties` contains commas, so that field must be quoted in the CSV.
COPY INTO best_practice_product_analytics.doc_events
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('data.csv');
-- 也可以直接内联插入小批量测试数据(无需 CSV 文件):
-- Inline test dataset (no CSV needed): 52 events in 14 sessions from 10 users
-- on 2026-05-10 and 2026-05-11. It covers full funnels, cart/checkout
-- abandonment, and next-day return visits.
-- The explicit column list keeps the insert correct even if the physical
-- column order of doc_events differs from the value order below.
INSERT INTO best_practice_product_analytics.doc_events
    (user_id, session_id, event_name, event_time, page_url, properties)
VALUES
-- S001: U001 full funnel (2026-05-10)
('U001','S001','page_view', CAST('2026-05-10 10:00:00' AS TIMESTAMP),'/home', '{"referrer":"google","duration_s":12}'),
('U001','S001','product_view',CAST('2026-05-10 10:01:30' AS TIMESTAMP),'/product/101', '{"product_id":"101","category":"electronics"}'),
('U001','S001','add_to_cart', CAST('2026-05-10 10:03:00' AS TIMESTAMP),'/product/101', '{"product_id":"101","price":299}'),
('U001','S001','checkout', CAST('2026-05-10 10:05:00' AS TIMESTAMP),'/checkout', '{"cart_value":299,"currency":"CNY"}'),
('U001','S001','purchase', CAST('2026-05-10 10:07:00' AS TIMESTAMP),'/order/done', '{"order_id":"ORD001","amount":299}'),
-- S002: U002 full funnel (2026-05-10)
('U002','S002','page_view', CAST('2026-05-10 10:10:00' AS TIMESTAMP),'/home', '{"referrer":"direct"}'),
('U002','S002','product_view',CAST('2026-05-10 10:11:00' AS TIMESTAMP),'/product/202', '{"product_id":"202","category":"clothing"}'),
('U002','S002','add_to_cart', CAST('2026-05-10 10:13:00' AS TIMESTAMP),'/product/202', '{"product_id":"202","price":99}'),
('U002','S002','checkout', CAST('2026-05-10 10:15:00' AS TIMESTAMP),'/checkout', '{"cart_value":99,"currency":"CNY"}'),
('U002','S002','purchase', CAST('2026-05-10 10:17:00' AS TIMESTAMP),'/order/done', '{"order_id":"ORD002","amount":99}'),
-- S003: U003 full funnel (2026-05-10)
('U003','S003','page_view', CAST('2026-05-10 10:20:00' AS TIMESTAMP),'/home', '{"referrer":"email"}'),
('U003','S003','product_view',CAST('2026-05-10 10:21:30' AS TIMESTAMP),'/product/303', '{"product_id":"303","category":"books"}'),
('U003','S003','add_to_cart', CAST('2026-05-10 10:23:00' AS TIMESTAMP),'/product/303', '{"product_id":"303","price":49}'),
('U003','S003','checkout', CAST('2026-05-10 10:25:00' AS TIMESTAMP),'/checkout', '{"cart_value":49,"currency":"CNY"}'),
('U003','S003','purchase', CAST('2026-05-10 10:27:00' AS TIMESTAMP),'/order/done', '{"order_id":"ORD003","amount":49}'),
-- S004: U004 full funnel (2026-05-10)
('U004','S004','page_view', CAST('2026-05-10 10:30:00' AS TIMESTAMP),'/home', '{"referrer":"social"}'),
('U004','S004','product_view',CAST('2026-05-10 10:31:30' AS TIMESTAMP),'/product/404', '{"product_id":"404","category":"sports"}'),
('U004','S004','add_to_cart', CAST('2026-05-10 10:33:00' AS TIMESTAMP),'/product/404', '{"product_id":"404","price":159}'),
('U004','S004','checkout', CAST('2026-05-10 10:35:00' AS TIMESTAMP),'/checkout', '{"cart_value":159,"currency":"CNY"}'),
('U004','S004','purchase', CAST('2026-05-10 10:37:00' AS TIMESTAMP),'/order/done', '{"order_id":"ORD004","amount":159}'),
-- S005: U005 abandons after checkout; returns on 05-11 to complete the purchase (2026-05-10)
('U005','S005','page_view', CAST('2026-05-10 10:40:00' AS TIMESTAMP),'/home', '{"referrer":"google"}'),
('U005','S005','product_view',CAST('2026-05-10 10:41:30' AS TIMESTAMP),'/product/505', '{"product_id":"505","category":"beauty"}'),
('U005','S005','add_to_cart', CAST('2026-05-10 10:43:00' AS TIMESTAMP),'/product/505', '{"product_id":"505","price":89}'),
('U005','S005','checkout', CAST('2026-05-10 10:45:00' AS TIMESTAMP),'/checkout', '{"cart_value":89,"currency":"CNY"}'),
-- S006: U006 abandons after add-to-cart (2026-05-10)
('U006','S006','page_view', CAST('2026-05-10 11:00:00' AS TIMESTAMP),'/home', '{"referrer":"direct"}'),
('U006','S006','product_view',CAST('2026-05-10 11:01:30' AS TIMESTAMP),'/product/606', '{"product_id":"606","category":"electronics"}'),
('U006','S006','add_to_cart', CAST('2026-05-10 11:03:00' AS TIMESTAMP),'/product/606', '{"product_id":"606","price":499}'),
-- S007: U007 abandons after add-to-cart (2026-05-10)
('U007','S007','page_view', CAST('2026-05-10 11:10:00' AS TIMESTAMP),'/home', '{"referrer":"social"}'),
('U007','S007','product_view',CAST('2026-05-10 11:11:30' AS TIMESTAMP),'/product/707', '{"product_id":"707","category":"clothing"}'),
('U007','S007','add_to_cart', CAST('2026-05-10 11:13:00' AS TIMESTAMP),'/product/707', '{"product_id":"707","price":129}'),
-- S008: U008 leaves after viewing a product (2026-05-10)
('U008','S008','page_view', CAST('2026-05-10 11:20:00' AS TIMESTAMP),'/home', '{"referrer":"google"}'),
('U008','S008','product_view',CAST('2026-05-10 11:21:30' AS TIMESTAMP),'/product/808', '{"product_id":"808","category":"sports"}'),
-- S009: U009 leaves after viewing a product; returns on 05-11 to complete the purchase (2026-05-10)
('U009','S009','page_view', CAST('2026-05-10 11:30:00' AS TIMESTAMP),'/home', '{"referrer":"email"}'),
('U009','S009','product_view',CAST('2026-05-10 11:31:30' AS TIMESTAMP),'/product/909', '{"product_id":"909","category":"books"}'),
-- S010: U010 leaves after viewing a product (2026-05-10)
('U010','S010','page_view', CAST('2026-05-10 11:40:00' AS TIMESTAMP),'/home', '{"referrer":"direct"}'),
('U010','S010','product_view',CAST('2026-05-10 11:41:30' AS TIMESTAMP),'/product/101', '{"product_id":"101","category":"electronics"}'),
-- S011: U003 return visit, full funnel (2026-05-11)
('U003','S011','page_view', CAST('2026-05-11 09:00:00' AS TIMESTAMP),'/home', '{"referrer":"push_notification"}'),
('U003','S011','product_view',CAST('2026-05-11 09:01:30' AS TIMESTAMP),'/product/303', '{"product_id":"303","category":"books"}'),
('U003','S011','add_to_cart', CAST('2026-05-11 09:03:00' AS TIMESTAMP),'/product/303', '{"product_id":"303","price":49}'),
('U003','S011','checkout', CAST('2026-05-11 09:05:00' AS TIMESTAMP),'/checkout', '{"cart_value":49,"currency":"CNY"}'),
('U003','S011','purchase', CAST('2026-05-11 09:07:00' AS TIMESTAMP),'/order/done', '{"order_id":"ORD005","amount":49}'),
-- S012: U005 return visit, completes the previous day's unfinished order (2026-05-11)
('U005','S012','page_view', CAST('2026-05-11 09:15:00' AS TIMESTAMP),'/home', '{"referrer":"email"}'),
('U005','S012','product_view',CAST('2026-05-11 09:16:30' AS TIMESTAMP),'/product/505', '{"product_id":"505","category":"beauty"}'),
('U005','S012','add_to_cart', CAST('2026-05-11 09:18:00' AS TIMESTAMP),'/product/505', '{"product_id":"505","price":89}'),
('U005','S012','checkout', CAST('2026-05-11 09:20:00' AS TIMESTAMP),'/checkout', '{"cart_value":89,"currency":"CNY"}'),
('U005','S012','purchase', CAST('2026-05-11 09:22:00' AS TIMESTAMP),'/order/done', '{"order_id":"ORD006","amount":89}'),
-- S013: U009 return visit, completes the full funnel for the first time (2026-05-11)
('U009','S013','page_view', CAST('2026-05-11 09:30:00' AS TIMESTAMP),'/home', '{"referrer":"push_notification"}'),
('U009','S013','product_view',CAST('2026-05-11 09:31:30' AS TIMESTAMP),'/product/909', '{"product_id":"909","category":"books"}'),
('U009','S013','add_to_cart', CAST('2026-05-11 09:33:00' AS TIMESTAMP),'/product/909', '{"product_id":"909","price":79}'),
('U009','S013','checkout', CAST('2026-05-11 09:35:00' AS TIMESTAMP),'/checkout', '{"cart_value":79,"currency":"CNY"}'),
('U009','S013','purchase', CAST('2026-05-11 09:37:00' AS TIMESTAMP),'/order/done', '{"order_id":"ORD007","amount":79}'),
-- S014: U010 return visit, bounces after the home page (2026-05-11)
('U010','S014','page_view', CAST('2026-05-11 09:45:00' AS TIMESTAMP),'/home', '{"referrer":"direct"}')
;
-- 写入后验证行数:
-- Post-load sanity check: total number of rows in doc_events
-- (52 if only the inline test rows were loaded).
SELECT
    COUNT(*) AS event_count
FROM best_practice_product_analytics.doc_events;
-- A/B assignments for two experiments, all assigned at 2026-05-09 08:00:00.
--   exp_checkout_v2:     treatment U001, U002, U003, U005, U009
--                        control   U004, U006, U007, U008, U010
--   exp_homepage_banner: control   U001, U002
--                        treatment U003, U004, U006
-- The explicit column list guards against a mismatch between the table's column
-- order and the value order.
-- NOTE(review): the column names mirror stream_ab_assignments / ab_wide_table;
-- confirm them against the doc_ab_assignments DDL.
INSERT INTO best_practice_product_analytics.doc_ab_assignments
    (user_id, experiment_id, variant, assigned_at)
VALUES
('U001', 'exp_checkout_v2', 'treatment', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U002', 'exp_checkout_v2', 'treatment', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U003', 'exp_checkout_v2', 'treatment', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U005', 'exp_checkout_v2', 'treatment', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U009', 'exp_checkout_v2', 'treatment', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U004', 'exp_checkout_v2', 'control', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U006', 'exp_checkout_v2', 'control', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U007', 'exp_checkout_v2', 'control', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U008', 'exp_checkout_v2', 'control', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U010', 'exp_checkout_v2', 'control', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U001', 'exp_homepage_banner', 'control', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U002', 'exp_homepage_banner', 'control', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U003', 'exp_homepage_banner', 'treatment', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U004', 'exp_homepage_banner', 'treatment', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U006', 'exp_homepage_banner', 'treatment', CAST('2026-05-09 08:00:00' AS TIMESTAMP));
-- Silver 层 Dynamic Table:会话化重建与路径计算
-- Silver 层在 Bronze 原始事件基础上做三件事:
--   1. LEFT JOIN doc_users,关联用户注册日期、国家、平台等维度;
--   2. 用 LAG / LEAD 窗口函数计算每条事件在同一会话内的上一步和下一步,重建用户行为路径;
--   3. 计算 days_since_signup,用于后续新老用户漏斗对比。
-- Silver layer: one row per raw event, enriched with user dimensions and with
-- the previous / next event of the same (user_id, session_id), ordered by time.
--   prev_event / next_event: NULL at the first / last event of a session.
--   NOTE(review): ties on event_time make the prev/next order nondeterministic.
--   days_since_signup: DATEDIFF of the event date and signup_date; presumably
--   (event date - signup date) in days — confirm the dialect's argument order.
--   Dimension columns are NULL for users with no doc_users row, because the
--   LEFT JOIN keeps their events.
-- Refresh scheduling is not declared here (no REFRESH INTERVAL in the DDL).
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_product_analytics.silver_user_sessions
AS
SELECT
    ev.user_id,
    ev.session_id,
    ev.event_name,
    ev.event_time,
    ev.page_url,
    ev.properties,
    usr.signup_date,
    usr.country,
    usr.platform,
    LAG(ev.event_name)  OVER (PARTITION BY ev.user_id, ev.session_id ORDER BY ev.event_time) AS prev_event,
    LEAD(ev.event_name) OVER (PARTITION BY ev.user_id, ev.session_id ORDER BY ev.event_time) AS next_event,
    DATEDIFF(CAST(ev.event_time AS DATE), usr.signup_date) AS days_since_signup
FROM best_practice_product_analytics.doc_events AS ev
LEFT JOIN best_practice_product_analytics.doc_users AS usr
    ON ev.user_id = usr.user_id;
-- ⚠️ 注意:Dynamic Table DDL 中不写 REFRESH INTERVAL,刷新调度通过 Studio Task 管理(见下节),这样可以在同一任务上附加监控告警和数据质量规则。
-- Spot-check the reconstructed path of one session (U001 / S001),
-- listed in chronological order.
SELECT
    user_id,
    session_id,
    event_name,
    prev_event,
    next_event,
    days_since_signup
FROM best_practice_product_analytics.silver_user_sessions
WHERE session_id = 'S001'
  AND user_id = 'U001'
ORDER BY event_time;
-- Event-level funnel: distinct users per funnel step, plus each step's share of
-- the largest step (pct_of_top, a percentage rounded to 1 decimal).
-- Steps are counted independently: a user need not have done earlier steps.
SELECT
    event_name,
    COUNT(DISTINCT user_id) AS user_count,
    ROUND(COUNT(DISTINCT user_id) * 100.0 /
          MAX(COUNT(DISTINCT user_id)) OVER (), 1) AS pct_of_top
FROM best_practice_product_analytics.doc_events
WHERE event_name IN ('page_view', 'product_view', 'add_to_cart', 'checkout', 'purchase')
GROUP BY event_name
-- Steps with equal user counts (e.g. page_view vs product_view in the sample
-- data) would otherwise come back in arbitrary order, so ties are broken by
-- funnel position.
ORDER BY
    user_count DESC,
    CASE event_name
        WHEN 'page_view'    THEN 1
        WHEN 'product_view' THEN 2
        WHEN 'add_to_cart'  THEN 3
        WHEN 'checkout'     THEN 4
        WHEN 'purchase'     THEN 5
    END;
-- User-level funnel. First flag, per user, whether each step occurred at least
-- once (any session, any order). Then total the flags and compute
-- overall_cvr_pct = purchasers / page-view users * 100, rounded to 1 decimal
-- (NULL when there are no page-view users).
WITH user_steps AS (
    SELECT
        user_id,
        MAX(CASE WHEN event_name = 'page_view'    THEN 1 ELSE 0 END) AS did_page_view,
        MAX(CASE WHEN event_name = 'product_view' THEN 1 ELSE 0 END) AS did_product_view,
        MAX(CASE WHEN event_name = 'add_to_cart'  THEN 1 ELSE 0 END) AS did_add_to_cart,
        MAX(CASE WHEN event_name = 'checkout'     THEN 1 ELSE 0 END) AS did_checkout,
        MAX(CASE WHEN event_name = 'purchase'     THEN 1 ELSE 0 END) AS did_purchase
    FROM best_practice_product_analytics.doc_events
    GROUP BY user_id
)
SELECT
    SUM(did_page_view)    AS page_view,
    SUM(did_product_view) AS product_view,
    SUM(did_add_to_cart)  AS add_to_cart,
    SUM(did_checkout)     AS checkout,
    SUM(did_purchase)     AS purchase,
    ROUND(SUM(did_purchase) * 100.0 / NULLIF(SUM(did_page_view), 0), 1) AS overall_cvr_pct
FROM user_steps;
-- Gold layer: daily funnel. For each calendar date of event_time, count the
-- distinct users reaching each step on that date. overall_cvr_pct is
-- purchasers / visitors * 100, rounded to 1 decimal (NULL when there are no
-- visitors).
-- Steps are counted per day independently, so a user who browses on one day and
-- purchases on the next contributes to different dates.
-- Refresh scheduling is not declared here (no REFRESH INTERVAL in the DDL).
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_product_analytics.gold_funnel_daily
AS
SELECT
    CAST(ev.event_time AS DATE) AS event_date,
    COUNT(DISTINCT CASE WHEN ev.event_name = 'page_view'    THEN ev.user_id END) AS visitors,
    COUNT(DISTINCT CASE WHEN ev.event_name = 'product_view' THEN ev.user_id END) AS product_viewers,
    COUNT(DISTINCT CASE WHEN ev.event_name = 'add_to_cart'  THEN ev.user_id END) AS cart_adders,
    COUNT(DISTINCT CASE WHEN ev.event_name = 'checkout'     THEN ev.user_id END) AS checkouts,
    COUNT(DISTINCT CASE WHEN ev.event_name = 'purchase'     THEN ev.user_id END) AS purchasers,
    ROUND(
        COUNT(DISTINCT CASE WHEN ev.event_name = 'purchase' THEN ev.user_id END) * 100.0
        / NULLIF(COUNT(DISTINCT CASE WHEN ev.event_name = 'page_view' THEN ev.user_id END), 0),
        1
    ) AS overall_cvr_pct
FROM best_practice_product_analytics.doc_events AS ev
GROUP BY CAST(ev.event_time AS DATE);
-- ⚠️ 注意:DDL 中不写 REFRESH INTERVAL,刷新由 Studio Task refresh_gold_funnel_daily 管理。
-- Refresh the daily funnel on demand, then read it back in date order.
REFRESH DYNAMIC TABLE best_practice_product_analytics.gold_funnel_daily;
SELECT
    event_date,
    visitors,
    product_viewers,
    cart_adders,
    checkouts,
    purchasers,
    overall_cvr_pct
FROM best_practice_product_analytics.gold_funnel_daily
ORDER BY event_date;
-- Ad-hoc A/B readout for exp_checkout_v2. Per variant: assigned users, assigned
-- users with at least one purchase event, and the purchase rate (a percentage
-- rounded to 1 decimal).
-- Events are joined on user_id only, so purchases are not restricted to after
-- assigned_at.
SELECT
    a.variant,
    COUNT(DISTINCT a.user_id) AS total_users,
    COUNT(DISTINCT CASE WHEN e.event_name = 'purchase' THEN e.user_id END) AS purchasers,
    -- NULLIF turns an empty denominator into NULL instead of a division error,
    -- matching how gold_ab_metrics computes its rates.
    ROUND(COUNT(DISTINCT CASE WHEN e.event_name = 'purchase' THEN e.user_id END) * 100.0
          / NULLIF(COUNT(DISTINCT a.user_id), 0), 1) AS purchase_rate_pct
FROM best_practice_product_analytics.doc_ab_assignments a
LEFT JOIN best_practice_product_analytics.doc_events e
    ON a.user_id = e.user_id
WHERE a.experiment_id = 'exp_checkout_v2'
GROUP BY a.variant
ORDER BY a.variant;
-- 结果解读:treatment 组购买率(100%)远高于 control 组(20%),新版结账流程看起来效果明显;但本例每组仅 5 个用户,实际评估中需结合样本量和统计显著性检验(见 ZettaPark 章节)才能下结论。
-- BITMAP 用户集合运算
-- BITMAP 函数适合快速计算实验组用户数和行为用户数的基数,以及实验组与行为用户的交集:
-- Per variant of exp_checkout_v2, count users with GROUP_BITMAP, which
-- aggregates integer IDs into a Roaring Bitmap and returns its cardinality:
--   ab_users   - users assigned to the variant;
--   purchasers - assigned users with at least one purchase event. The LEFT JOIN
--                already limits them to the variant's users, i.e. the
--                intersection of the two sets.
-- GROUP_BITMAP needs integer input, so string IDs are mapped to integers by
-- deleting every NON-digit character: 'U001' -> '001' -> 1. The pattern must be
-- '[^0-9]'. Using '[0-9]' would delete the digits and leave 'U', which is not a
-- number.
-- Caveat: this mapping is only valid while the numeric part is unique across
-- IDs ('U001' and 'V001' would collide).
SELECT
    a.variant,
    GROUP_BITMAP(CAST(REGEXP_REPLACE(a.user_id, '[^0-9]', '') AS BIGINT)) AS ab_users,
    GROUP_BITMAP(CAST(REGEXP_REPLACE(e.user_id, '[^0-9]', '') AS BIGINT)) AS purchasers
FROM best_practice_product_analytics.doc_ab_assignments a
LEFT JOIN (
    -- distinct users with at least one purchase event
    SELECT DISTINCT user_id
    FROM best_practice_product_analytics.doc_events
    WHERE event_name = 'purchase'
) e ON a.user_id = e.user_id
WHERE a.experiment_id = 'exp_checkout_v2'
GROUP BY a.variant
ORDER BY a.variant;
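-- GROUP_BITMAP 返回集合基数(INT),内部用 Roaring Bitmap 压缩存储。当用户 ID 为纯数字时可直接 CAST;字母混合 ID 需先映射为整数(如哈希映射)。
-- 本例使用 REGEXP_REPLACE(user_id, '[^0-9]', '') 删除所有非数字字符,从 "U001" 提取数字部分 "001",CAST 后得到 1,适用于本例的简单 ID 格式。
-- 注意:若写成 '[0-9]',删除的是数字本身,只剩下 "U",无法转换为整数。此外,这种映射仅在数字部分唯一时成立(如 "U001" 与 "V001" 会冲突)。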
-- Gold 层 Dynamic Table:A/B 实验聚合指标
-- Gold layer: per experiment and variant, the number of assigned users, of
-- purchasers, and of cart adders. Rates are percentages of assigned users,
-- rounded to 2 decimals (NULL when the variant has no users).
-- Events are joined on user_id only, so any purchase / add-to-cart by an
-- assigned user counts, regardless of timing relative to assigned_at.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_product_analytics.gold_ab_metrics
AS
SELECT
    asg.experiment_id,
    asg.variant,
    COUNT(DISTINCT asg.user_id) AS total_users,
    COUNT(DISTINCT CASE WHEN ev.event_name = 'purchase' THEN ev.user_id END) AS purchasers,
    ROUND(
        COUNT(DISTINCT CASE WHEN ev.event_name = 'purchase' THEN ev.user_id END) * 100.0
        / NULLIF(COUNT(DISTINCT asg.user_id), 0),
        2
    ) AS purchase_rate,
    COUNT(DISTINCT CASE WHEN ev.event_name = 'add_to_cart' THEN ev.user_id END) AS cart_adders,
    ROUND(
        COUNT(DISTINCT CASE WHEN ev.event_name = 'add_to_cart' THEN ev.user_id END) * 100.0
        / NULLIF(COUNT(DISTINCT asg.user_id), 0),
        2
    ) AS cart_rate
FROM best_practice_product_analytics.doc_ab_assignments AS asg
LEFT JOIN best_practice_product_analytics.doc_events AS ev
    ON asg.user_id = ev.user_id
GROUP BY asg.experiment_id, asg.variant;
-- 刷新并查询:
-- Refresh the A/B metrics on demand, then read them back ordered by experiment
-- and variant.
REFRESH DYNAMIC TABLE best_practice_product_analytics.gold_ab_metrics;
SELECT
    experiment_id,
    variant,
    total_users,
    purchasers,
    purchase_rate,
    cart_adders,
    cart_rate
FROM best_practice_product_analytics.gold_ab_metrics
ORDER BY experiment_id, variant;
-- A/B analysis wide table, created only if it does not exist yet.
-- Intended to hold one row per (user_id, experiment_id); the DDL itself does
-- not enforce that key.
CREATE TABLE IF NOT EXISTS best_practice_product_analytics.ab_wide_table (
    user_id       STRING,
    experiment_id STRING,
    variant       STRING,
    assigned_at   TIMESTAMP
);
-- Incremental upsert into ab_wide_table, run once per scheduled execution. It
-- consumes the new INSERT rows from stream_ab_assignments (presumably a Table
-- Stream on doc_ab_assignments) and upserts them by (user_id, experiment_id).
-- A MERGE source must contain at most one row per target key. If the same
-- (user_id, experiment_id) appears more than once in a batch, the MERGE either
-- fails (several source rows match one target row) or inserts duplicates.
-- So only the latest assignment per key is kept, ordered by assigned_at and then
-- variant for a deterministic tie-break.
-- NOTE(review): only __change_type = 'INSERT' rows are consumed; updates to
-- existing assignments would need their post-update change type handled too.
MERGE INTO best_practice_product_analytics.ab_wide_table t
USING (
    SELECT user_id, experiment_id, variant, assigned_at
    FROM (
        SELECT
            user_id,
            experiment_id,
            variant,
            assigned_at,
            ROW_NUMBER() OVER (
                PARTITION BY user_id, experiment_id
                ORDER BY assigned_at DESC, variant
            ) AS rn
        FROM best_practice_product_analytics.stream_ab_assignments
        WHERE __change_type = 'INSERT'
    ) ranked
    WHERE rn = 1
) s ON t.user_id = s.user_id AND t.experiment_id = s.experiment_id
WHEN MATCHED THEN
    UPDATE SET variant = s.variant, assigned_at = s.assigned_at
WHEN NOT MATCHED THEN
    INSERT (user_id, experiment_id, variant, assigned_at)
    VALUES (s.user_id, s.experiment_id, s.variant, s.assigned_at);
-- ⚠️ 注意:Table Stream 的消费游标在每次成功的 DML 事务后自动推进。若 MERGE INTO 事务失败,Stream 偏移量不会移动,下次执行会重新处理同一批数据,确保不丢数据。