Product Analytics Data Warehouse in Practice (Funnel + A/B Testing)

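This guide builds a multi-layer analytics warehouse from a Web/App tracking event stream. It covers full-path funnel analysis (signup, browse, add to cart, order) and measures the difference in conversion rate between A/B experiment variants. Using a dataset of 52 simulated tracking events, 10 users, and 15 experiment assignment records, it walks end to end through the Kafka PIPE → Bronze → Silver → Gold build, and shows three key platform capabilities in use: window functions (LAG/LEAD), BITMAP user-set operations, and Table Stream + MERGE INTO.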


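Overview

A typical product analytics pipeline is: tracking SDK reporting → real-time ingestion → raw storage (Bronze) → session reconstruction (Silver) → funnel and A/B metric aggregation (Gold).

云器 Lakehouse addresses the core problems with the following combination:

Problem                                                        | Solution
---------------------------------------------------------------+---------------------------------------------------------------------------
High-frequency (millisecond-level) writes of tracking events   | Kafka PIPE for continuous ingestion, with no hand-written consumer
Automatic incremental computation from Bronze → Silver → Gold  | Dynamic Table: declarative SQL, with the dependency chain scheduled by the system
Reconstructing user behavior paths (prev_event / next_event)   | Window functions LAG/LEAD, partitioned by session_id and ordered by time
A/B user-set operations (experiment group ∩ behavior group)    | BITMAP functions for efficient union, intersection, and difference of user sets
Incremental updates to the A/B assignment table (reassignment) | Table Stream + MERGE INTO to capture new or changed assignments and merge them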

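SQL commands used

Command / function       | Purpose                                                      | Notes
-------------------------+--------------------------------------------------------------+------------------------------------------------------------------
CREATE TABLE             | Create the Bronze-layer raw event table and dimension tables | Regular tables that serve as upstream of the Dynamic Tables
CREATE BLOOMFILTER INDEX | Create a Bloomfilter index on the user_id column             | Suited to point-lookup filtering on high-cardinality columns
CREATE PIPE              | Create a continuous Kafka ingestion pipe                     | Bound to the Bronze-layer doc_events table
CREATE DYNAMIC TABLE     | Create the Silver / Gold incremental tables                  | The system detects upstream changes and refreshes incrementally
CREATE TABLE STREAM      | Create a Stream on doc_ab_assignments                        | Captures new experiment assignments and drives MERGE INTO
MERGE INTO               | Incrementally update the A/B assignment wide table           | Handles the upsert case where a user is reassigned
LAG / LEAD               | Compute the previous and next event step                     | Session path reconstruction in the Silver layer
GROUP_BITMAP             | Cardinality of a user set                                    | Counts of A/B variant users and of users who performed an action
REFRESH DYNAMIC TABLE    | Trigger one refresh manually                                 | Used for the initial build or when debugging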

Prerequisites

All examples in this guide run under the best_practice_product_analytics schema.

CREATE SCHEMA IF NOT EXISTS best_practice_product_analytics;


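Bronze Layer: Raw Tracking Event Table

Create the table

doc_events is the core event table for the whole guide. It holds the raw tracking event stream coming from the SDK.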

CREATE TABLE IF NOT EXISTS best_practice_product_analytics.doc_events (
    user_id    STRING,
    session_id STRING,
    event_name STRING,
    event_time TIMESTAMP,
    page_url   STRING,
    properties STRING  -- JSON string holding business attributes
);

properties stores JSON as a STRING and is parsed later with GET_JSON_OBJECT. With Parquet storage the JSON field compresses well, so splitting it into a wide multi-column table is not recommended.

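Create a Bloomfilter Index

Both the Silver and Gold layers later filter by user_id, which makes the column a good fit for a Bloomfilter Index that speeds up point lookups.

CREATE BLOOMFILTER INDEX idx_bf_pa_user_id ON TABLE best_practice_product_analytics.doc_events (user_id);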
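To make the point-lookup benefit concrete, here is a minimal Python sketch of how a Bloom filter answers "might this value be present?". It illustrates the general data structure only; the class, the sizes, and the one-filter-per-file layout are assumptions for illustration and are not taken from the Lakehouse implementation.

```python
import hashlib


class BloomFilter:
    """Tiny Bloom filter: k hashed bit positions per key over an m-bit array."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as the bit array

    def _positions(self, key: str):
        # Derive k positions by salting the key with the hash index.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key: str) -> bool:
        # False means "definitely absent"; True means "possibly present".
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


# Hypothetical layout: one filter per data file.
file_a, file_b = BloomFilter(), BloomFilter()
for n in range(1, 6):
    file_a.add(f"U{n:03d}")   # file_a holds U001-U005
for n in range(6, 11):
    file_b.add(f"U{n:03d}")   # file_b holds U006-U010

# A lookup for user_id = 'U003' must read file_a; a file whose filter
# answers False can be skipped without scanning it.
must_read_a = file_a.might_contain("U003")
```

A Bloom filter never reports a stored key as absent, but it can occasionally report an absent key as present, which is why it only speeds up filtering and never changes query results.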

User dimension table and A/B assignment table

CREATE TABLE IF NOT EXISTS best_practice_product_analytics.doc_users (
    user_id     STRING,
    signup_date DATE,
    country     STRING,
    platform    STRING
);

CREATE TABLE IF NOT EXISTS best_practice_product_analytics.doc_ab_assignments (
    user_id       STRING,
    experiment_id STRING,
    variant       STRING,
    assigned_at   TIMESTAMP
);

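Configure the Kafka PIPE

The Kafka PIPE continuously ingests tracking events from a Kafka topic into doc_events. With the DDL in this section it loads in batches every 60 seconds (BATCH_INTERVAL_IN_SECONDS). For production use, replace the broker address and topic name.

Option 1: Write through Kafka (recommended)

Once the Kafka broker is configured, send event messages to the topic with a Python producer and the PIPE ingests them automatically: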

import json

from kafka import KafkaProducer

# Serialize each event dict to UTF-8 JSON before sending.
producer = KafkaProducer(
    bootstrap_servers=['<kafka-broker>:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

events = [
    {
        "user_id": "U001",
        "session_id": "S001",
        "event_name": "page_view",
        "event_time": "2026-05-10 10:00:00",
        "page_url": "/home",
        "properties": json.dumps({"referrer": "google", "duration_s": 12}),
    },
    {
        "user_id": "U001",
        "session_id": "S001",
        "event_name": "product_view",
        "event_time": "2026-05-10 10:01:30",
        "page_url": "/product/101",
        "properties": json.dumps({"product_id": "101", "category": "electronics"}),
    },
    # ... more events
]

for evt in events:
    producer.send('sdk_tracking_events', evt)

# Block until all buffered messages are delivered.
producer.flush()
print(f"Sent {len(events)} events")

The corresponding Kafka PIPE DDL:

CREATE PIPE IF NOT EXISTS best_practice_product_analytics.pipe_events
  VIRTUAL_CLUSTER = 'DEFAULT'
  BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO best_practice_product_analytics.doc_events
FROM (
  SELECT
    GET_JSON_OBJECT(CAST(value AS STRING), '$.user_id')    AS user_id,
    GET_JSON_OBJECT(CAST(value AS STRING), '$.session_id') AS session_id,
    GET_JSON_OBJECT(CAST(value AS STRING), '$.event_name') AS event_name,
    CAST(GET_JSON_OBJECT(CAST(value AS STRING), '$.event_time') AS TIMESTAMP) AS event_time,
    GET_JSON_OBJECT(CAST(value AS STRING), '$.page_url')   AS page_url,
    GET_JSON_OBJECT(CAST(value AS STRING), '$.properties') AS properties
  FROM READ_KAFKA(
    '<kafka-broker>:9092',
    'sdk_tracking_events',
    '',
    'cz_pa_consumer',
    '', '', '', '',
    'raw', 'raw',
    0,
    map()
  )
);
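The SELECT inside the PIPE maps each Kafka message value to one doc_events row. The Python sketch below mirrors that mapping so the field handling can be checked locally without a broker. The helper name parse_event is hypothetical, and the sketch only models the projection, not the PIPE's batching or offset management.

```python
import json
from datetime import datetime


def parse_event(raw_value: bytes) -> tuple:
    """Map one Kafka message value to a doc_events row (hypothetical helper)."""
    msg = json.loads(raw_value.decode("utf-8"))
    return (
        msg.get("user_id"),
        msg.get("session_id"),
        msg.get("event_name"),
        datetime.strptime(msg["event_time"], "%Y-%m-%d %H:%M:%S"),
        msg.get("page_url"),
        msg.get("properties"),  # kept as a JSON string, like the STRING column
    )


# Same shape as the producer's first event: properties is itself a JSON string.
raw = json.dumps({
    "user_id": "U001",
    "session_id": "S001",
    "event_name": "page_view",
    "event_time": "2026-05-10 10:00:00",
    "page_url": "/home",
    "properties": json.dumps({"referrer": "google", "duration_s": 12}),
}).encode("utf-8")

row = parse_event(raw)

# Reading one attribute later, similar in spirit to
# GET_JSON_OBJECT(properties, '$.referrer') on the stored column.
referrer = json.loads(row[5])["referrer"]
```

Because the producer serializes properties with json.dumps before the outer message is serialized, the column receives a JSON string rather than a nested object, which matches its STRING type.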

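Option 2: Simulate with INSERT (no Kafka environment)

If no Kafka environment is configured yet, write directly to doc_events with INSERT INTO. This simulates the result of the PIPE parsing and loading messages, which is enough to validate the Dynamic Tables and queries that follow. All examples in the rest of this guide use data loaded this way.

Load data from a local CSV file (recommended):

-- Step 1: upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/your/data.csv' TO USER VOLUME FILE 'data.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_product_analytics.doc_events
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('data.csv');

Alternatively, insert a small batch of test data inline (no CSV file needed):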

INSERT INTO best_practice_product_analytics.doc_events VALUES
-- S001: U001 full funnel (2026-05-10)
('U001','S001','page_view',   CAST('2026-05-10 10:00:00' AS TIMESTAMP),'/home',        '{"referrer":"google","duration_s":12}'),
('U001','S001','product_view',CAST('2026-05-10 10:01:30' AS TIMESTAMP),'/product/101', '{"product_id":"101","category":"electronics"}'),
('U001','S001','add_to_cart', CAST('2026-05-10 10:03:00' AS TIMESTAMP),'/product/101', '{"product_id":"101","price":299}'),
('U001','S001','checkout',    CAST('2026-05-10 10:05:00' AS TIMESTAMP),'/checkout',    '{"cart_value":299,"currency":"CNY"}'),
('U001','S001','purchase',    CAST('2026-05-10 10:07:00' AS TIMESTAMP),'/order/done',  '{"order_id":"ORD001","amount":299}'),
-- S002: U002 full funnel (2026-05-10)
('U002','S002','page_view',   CAST('2026-05-10 10:10:00' AS TIMESTAMP),'/home',        '{"referrer":"direct"}'),
('U002','S002','product_view',CAST('2026-05-10 10:11:00' AS TIMESTAMP),'/product/202', '{"product_id":"202","category":"clothing"}'),
('U002','S002','add_to_cart', CAST('2026-05-10 10:13:00' AS TIMESTAMP),'/product/202', '{"product_id":"202","price":99}'),
('U002','S002','checkout',    CAST('2026-05-10 10:15:00' AS TIMESTAMP),'/checkout',    '{"cart_value":99,"currency":"CNY"}'),
('U002','S002','purchase',    CAST('2026-05-10 10:17:00' AS TIMESTAMP),'/order/done',  '{"order_id":"ORD002","amount":99}'),
-- S003: U003 full funnel (2026-05-10)
('U003','S003','page_view',   CAST('2026-05-10 10:20:00' AS TIMESTAMP),'/home',        '{"referrer":"email"}'),
('U003','S003','product_view',CAST('2026-05-10 10:21:30' AS TIMESTAMP),'/product/303', '{"product_id":"303","category":"books"}'),
('U003','S003','add_to_cart', CAST('2026-05-10 10:23:00' AS TIMESTAMP),'/product/303', '{"product_id":"303","price":49}'),
('U003','S003','checkout',    CAST('2026-05-10 10:25:00' AS TIMESTAMP),'/checkout',    '{"cart_value":49,"currency":"CNY"}'),
('U003','S003','purchase',    CAST('2026-05-10 10:27:00' AS TIMESTAMP),'/order/done',  '{"order_id":"ORD003","amount":49}'),
-- S004: U004 full funnel (2026-05-10)
('U004','S004','page_view',   CAST('2026-05-10 10:30:00' AS TIMESTAMP),'/home',        '{"referrer":"social"}'),
('U004','S004','product_view',CAST('2026-05-10 10:31:30' AS TIMESTAMP),'/product/404', '{"product_id":"404","category":"sports"}'),
('U004','S004','add_to_cart', CAST('2026-05-10 10:33:00' AS TIMESTAMP),'/product/404', '{"product_id":"404","price":159}'),
('U004','S004','checkout',    CAST('2026-05-10 10:35:00' AS TIMESTAMP),'/checkout',    '{"cart_value":159,"currency":"CNY"}'),
('U004','S004','purchase',    CAST('2026-05-10 10:37:00' AS TIMESTAMP),'/order/done',  '{"order_id":"ORD004","amount":159}'),
-- S005: U005 abandons after checkout, completes the purchase on a 05-11 return visit (2026-05-10)
('U005','S005','page_view',   CAST('2026-05-10 10:40:00' AS TIMESTAMP),'/home',        '{"referrer":"google"}'),
('U005','S005','product_view',CAST('2026-05-10 10:41:30' AS TIMESTAMP),'/product/505', '{"product_id":"505","category":"beauty"}'),
('U005','S005','add_to_cart', CAST('2026-05-10 10:43:00' AS TIMESTAMP),'/product/505', '{"product_id":"505","price":89}'),
('U005','S005','checkout',    CAST('2026-05-10 10:45:00' AS TIMESTAMP),'/checkout',    '{"cart_value":89,"currency":"CNY"}'),
-- S006: U006 abandons after add to cart (2026-05-10)
('U006','S006','page_view',   CAST('2026-05-10 11:00:00' AS TIMESTAMP),'/home',        '{"referrer":"direct"}'),
('U006','S006','product_view',CAST('2026-05-10 11:01:30' AS TIMESTAMP),'/product/606', '{"product_id":"606","category":"electronics"}'),
('U006','S006','add_to_cart', CAST('2026-05-10 11:03:00' AS TIMESTAMP),'/product/606', '{"product_id":"606","price":499}'),
-- S007: U007 abandons after add to cart (2026-05-10)
('U007','S007','page_view',   CAST('2026-05-10 11:10:00' AS TIMESTAMP),'/home',        '{"referrer":"social"}'),
('U007','S007','product_view',CAST('2026-05-10 11:11:30' AS TIMESTAMP),'/product/707', '{"product_id":"707","category":"clothing"}'),
('U007','S007','add_to_cart', CAST('2026-05-10 11:13:00' AS TIMESTAMP),'/product/707', '{"product_id":"707","price":129}'),
-- S008: U008 leaves after viewing a product (2026-05-10)
('U008','S008','page_view',   CAST('2026-05-10 11:20:00' AS TIMESTAMP),'/home',        '{"referrer":"google"}'),
('U008','S008','product_view',CAST('2026-05-10 11:21:30' AS TIMESTAMP),'/product/808', '{"product_id":"808","category":"sports"}'),
-- S009: U009 leaves after viewing a product, completes a purchase on a 05-11 return visit (2026-05-10)
('U009','S009','page_view',   CAST('2026-05-10 11:30:00' AS TIMESTAMP),'/home',        '{"referrer":"email"}'),
('U009','S009','product_view',CAST('2026-05-10 11:31:30' AS TIMESTAMP),'/product/909', '{"product_id":"909","category":"books"}'),
-- S010: U010 leaves after viewing a product (2026-05-10)
('U010','S010','page_view',   CAST('2026-05-10 11:40:00' AS TIMESTAMP),'/home',        '{"referrer":"direct"}'),
('U010','S010','product_view',CAST('2026-05-10 11:41:30' AS TIMESTAMP),'/product/101', '{"product_id":"101","category":"electronics"}'),
-- S011: U003 return visit, full funnel (2026-05-11)
('U003','S011','page_view',   CAST('2026-05-11 09:00:00' AS TIMESTAMP),'/home',        '{"referrer":"push_notification"}'),
('U003','S011','product_view',CAST('2026-05-11 09:01:30' AS TIMESTAMP),'/product/303', '{"product_id":"303","category":"books"}'),
('U003','S011','add_to_cart', CAST('2026-05-11 09:03:00' AS TIMESTAMP),'/product/303', '{"product_id":"303","price":49}'),
('U003','S011','checkout',    CAST('2026-05-11 09:05:00' AS TIMESTAMP),'/checkout',    '{"cart_value":49,"currency":"CNY"}'),
('U003','S011','purchase',    CAST('2026-05-11 09:07:00' AS TIMESTAMP),'/order/done',  '{"order_id":"ORD005","amount":49}'),
-- S012: U005 return visit, completes yesterday's unfinished order (2026-05-11)
('U005','S012','page_view',   CAST('2026-05-11 09:15:00' AS TIMESTAMP),'/home',        '{"referrer":"email"}'),
('U005','S012','product_view',CAST('2026-05-11 09:16:30' AS TIMESTAMP),'/product/505', '{"product_id":"505","category":"beauty"}'),
('U005','S012','add_to_cart', CAST('2026-05-11 09:18:00' AS TIMESTAMP),'/product/505', '{"product_id":"505","price":89}'),
('U005','S012','checkout',    CAST('2026-05-11 09:20:00' AS TIMESTAMP),'/checkout',    '{"cart_value":89,"currency":"CNY"}'),
('U005','S012','purchase',    CAST('2026-05-11 09:22:00' AS TIMESTAMP),'/order/done',  '{"order_id":"ORD006","amount":89}'),
-- S013: U009 return visit, first full funnel (2026-05-11)
('U009','S013','page_view',   CAST('2026-05-11 09:30:00' AS TIMESTAMP),'/home',        '{"referrer":"push_notification"}'),
('U009','S013','product_view',CAST('2026-05-11 09:31:30' AS TIMESTAMP),'/product/909', '{"product_id":"909","category":"books"}'),
('U009','S013','add_to_cart', CAST('2026-05-11 09:33:00' AS TIMESTAMP),'/product/909', '{"product_id":"909","price":79}'),
('U009','S013','checkout',    CAST('2026-05-11 09:35:00' AS TIMESTAMP),'/checkout',    '{"cart_value":79,"currency":"CNY"}'),
('U009','S013','purchase',    CAST('2026-05-11 09:37:00' AS TIMESTAMP),'/order/done',  '{"order_id":"ORD007","amount":79}'),
-- S014: U010 return visit, bounces (2026-05-11)
('U010','S014','page_view',   CAST('2026-05-11 09:45:00' AS TIMESTAMP),'/home',        '{"referrer":"direct"}');

Verify the row count after loading:

SELECT COUNT(*) AS event_count FROM best_practice_product_analytics.doc_events;

event_count
-----------
52

Insert the user dimension data (10 users; U001 signed up on 2026-01-10, exactly 120 days before 2026-05-10):

INSERT INTO best_practice_product_analytics.doc_users VALUES
('U001', CAST('2026-01-10' AS DATE), 'CN', 'iOS'),
('U002', CAST('2026-01-15' AS DATE), 'CN', 'Android'),
('U003', CAST('2026-02-01' AS DATE), 'CN', 'Web'),
('U004', CAST('2026-02-10' AS DATE), 'CN', 'iOS'),
('U005', CAST('2026-02-20' AS DATE), 'CN', 'Android'),
('U006', CAST('2026-03-01' AS DATE), 'CN', 'Web'),
('U007', CAST('2026-03-10' AS DATE), 'CN', 'iOS'),
('U008', CAST('2026-03-20' AS DATE), 'CN', 'Android'),
('U009', CAST('2026-04-01' AS DATE), 'CN', 'Web'),
('U010', CAST('2026-04-10' AS DATE), 'CN', 'iOS');
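The 120-day figure for U001 can be checked with plain date arithmetic. This is a small sketch using Python's standard library; the variable names are illustrative.

```python
from datetime import date

signup_date = date(2026, 1, 10)  # U001's signup_date in doc_users
event_date = date(2026, 5, 10)   # date of U001's events in session S001

# Jan 10 -> May 10 spans 31 + 28 + 31 + 30 days (2026 is not a leap year).
days_since_signup = (event_date - signup_date).days
print(days_since_signup)  # 120
```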

Insert the A/B experiment assignment data (15 records across two experiments):

  • exp_checkout_v2: treatment = {U001–U003, U005, U009}, control = {U004, U006–U008, U010}
  • exp_homepage_banner: control = {U001, U002}, treatment = {U003, U004, U006}

INSERT INTO best_practice_product_analytics.doc_ab_assignments VALUES
('U001', 'exp_checkout_v2',     'treatment', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U002', 'exp_checkout_v2',     'treatment', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U003', 'exp_checkout_v2',     'treatment', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U005', 'exp_checkout_v2',     'treatment', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U009', 'exp_checkout_v2',     'treatment', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U004', 'exp_checkout_v2',     'control',   CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U006', 'exp_checkout_v2',     'control',   CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U007', 'exp_checkout_v2',     'control',   CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U008', 'exp_checkout_v2',     'control',   CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U010', 'exp_checkout_v2',     'control',   CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U001', 'exp_homepage_banner', 'control',   CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U002', 'exp_homepage_banner', 'control',   CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U003', 'exp_homepage_banner', 'treatment', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U004', 'exp_homepage_banner', 'treatment', CAST('2026-05-09 08:00:00' AS TIMESTAMP)),
('U006', 'exp_homepage_banner', 'treatment', CAST('2026-05-09 08:00:00' AS TIMESTAMP));


Silver Layer Dynamic Table: Session Reconstruction and Path Computation

The Silver layer does three things on top of the raw Bronze events:

  1. LEFT JOIN doc_users to attach user dimensions such as signup date, country, and platform
  2. Use the LAG / LEAD window functions to compute the previous and next step of each event, reconstructing the user's behavior path
  3. Compute days_since_signup for later funnel comparisons between new and existing users

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_product_analytics.silver_user_sessions AS
SELECT
    e.user_id,
    e.session_id,
    e.event_name,
    e.event_time,
    e.page_url,
    e.properties,
    u.signup_date,
    u.country,
    u.platform,
    LAG(e.event_name)  OVER (PARTITION BY e.user_id, e.session_id ORDER BY e.event_time) AS prev_event,
    LEAD(e.event_name) OVER (PARTITION BY e.user_id, e.session_id ORDER BY e.event_time) AS next_event,
    DATEDIFF(CAST(e.event_time AS DATE), u.signup_date) AS days_since_signup
FROM best_practice_product_analytics.doc_events e
LEFT JOIN best_practice_product_analytics.doc_users u
    ON e.user_id = u.user_id;

Trigger the first refresh manually:

REFRESH DYNAMIC TABLE best_practice_product_analytics.silver_user_sessions;

Verify and inspect the reconstructed path:

SELECT user_id, session_id, event_name, prev_event, next_event, days_since_signup
FROM best_practice_product_analytics.silver_user_sessions
WHERE user_id = 'U001' AND session_id = 'S001'
ORDER BY event_time;

user_id | session_id | event_name   | prev_event   | next_event   | days_since_signup
--------+------------+--------------+--------------+--------------+------------------
U001    | S001       | page_view    | NULL         | product_view | 120
U001    | S001       | product_view | page_view    | add_to_cart  | 120
U001    | S001       | add_to_cart  | product_view | checkout     | 120
U001    | S001       | checkout     | add_to_cart  | purchase     | 120
U001    | S001       | purchase     | checkout     | NULL         | 120

prev_event = NULL marks the start of a session and next_event = NULL marks its end. days_since_signup = 120 shows that U001 is a returning user, active 120 days after signup.
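The following Python sketch imitates LAG/LEAD on a four-event subset of U003's data to show why the partition key matters. The helper with_lag_lead is hypothetical and only models the window semantics; it is not how the engine evaluates the query.

```python
from collections import defaultdict

# (user_id, session_id, event_name, event_time): end of S003, start of S011
events = [
    ("U003", "S003", "checkout",     "2026-05-10 10:25:00"),
    ("U003", "S003", "purchase",     "2026-05-10 10:27:00"),
    ("U003", "S011", "page_view",    "2026-05-11 09:00:00"),
    ("U003", "S011", "product_view", "2026-05-11 09:01:30"),
]


def with_lag_lead(rows, key):
    """Append (prev_event, next_event) per partition, ordered by event_time."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[key(row)].append(row)
    out = []
    for part in partitions.values():
        part.sort(key=lambda r: r[3])  # ORDER BY event_time
        for i, row in enumerate(part):
            prev_event = part[i - 1][2] if i > 0 else None              # LAG
            next_event = part[i + 1][2] if i + 1 < len(part) else None  # LEAD
            out.append(row + (prev_event, next_event))
    return out


# PARTITION BY user_id, session_id: each session starts with prev_event = None.
by_session = with_lag_lead(events, key=lambda r: (r[0], r[1]))
# PARTITION BY user_id only: S011's first event inherits S003's last event.
by_user = with_lag_lead(events, key=lambda r: r[0])

session_view = {(r[1], r[2]): (r[4], r[5]) for r in by_session}
user_view = {(r[1], r[2]): (r[4], r[5]) for r in by_user}
print(session_view[("S011", "page_view")])  # (None, 'product_view')
print(user_view[("S011", "page_view")])     # ('purchase', 'product_view')
```

With the session in the partition key, the page_view that opens S011 correctly has no predecessor. With user_id alone it appears to follow the previous day's purchase.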

Configure a Studio Task to manage refresh scheduling

Refresh scheduling for the Silver layer is managed through a Studio task:

  1. Create a SQL task named refresh_silver_user_sessions under the Studio path best_practices/product_analytics/
  2. Task content: REFRESH DYNAMIC TABLE best_practice_product_analytics.silver_user_sessions
  3. Configure Cron scheduling (for example, every 5 minutes): */5 * * * *
  4. Configure a data quality alert on the task: trigger an alert when the Silver-layer row count drops by more than 10% compared with the previous refresh

# Create and configure the task (actually executed with cz-cli)
cz-cli task create "refresh_silver_user_sessions" -p skill_test --type SQL --folder 187108
cz-cli task save-content refresh_silver_user_sessions -p skill_test \
  --content "REFRESH DYNAMIC TABLE best_practice_product_analytics.silver_user_sessions"
cz-cli task save-cron refresh_silver_user_sessions -p skill_test --cron "*/5 * * * *"
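The alert rule in step 4 is simple arithmetic on two row counts. The sketch below shows one way to express it; the function name and the handling of a missing baseline are assumptions, and the way Studio itself evaluates data quality rules is not shown in this guide.

```python
def row_count_dropped(previous: int, current: int, threshold: float = 0.10) -> bool:
    """Return True when `current` is more than `threshold` below `previous`."""
    if previous <= 0:
        return False  # no baseline yet, so there is nothing to compare
    return (previous - current) / previous > threshold


# With the 52-row dataset as the previous refresh:
unchanged = row_count_dropped(52, 52)    # no drop
small_drop = row_count_dropped(52, 47)   # 5/52 is about 9.6%, under the limit
large_drop = row_count_dropped(52, 46)   # 6/52 is about 11.5%, over the limit
```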


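Scenario 1: Funnel Conversion Analysis

Funnel analysis aggregates unique users per step and computes each step's conversion rate and the overall CVR.

Aggregate by event step

SELECT
    event_name,
    COUNT(DISTINCT user_id) AS user_count,
    ROUND(COUNT(DISTINCT user_id) * 100.0 / MAX(COUNT(DISTINCT user_id)) OVER (), 1) AS pct_of_top
FROM best_practice_product_analytics.doc_events
WHERE event_name IN ('page_view','product_view','add_to_cart','checkout','purchase')
GROUP BY event_name
ORDER BY user_count DESC;

event_name   | user_count | pct_of_top
-------------+------------+-----------
page_view    | 10         | 100.0
product_view | 10         | 100.0
add_to_cart  | 8          | 80.0
checkout     | 6          | 60.0
purchase     | 6          | 60.0

Interpretation: 20% of users drop off between product view and add to cart (10→8), and 25% drop off between add to cart and checkout (8→6). The overall CVR is 60%. The largest drop-off is at the add-to-cart to checkout step, so the checkout experience is the first thing to optimize.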
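The step-to-step drop-off figures quoted above follow directly from the user counts in the result table. A small Python sketch of that arithmetic (the function name is illustrative):

```python
# Unique users per step, copied from the query result above.
funnel = [
    ("page_view", 10),
    ("product_view", 10),
    ("add_to_cart", 8),
    ("checkout", 6),
    ("purchase", 6),
]


def step_drop_off(steps):
    """Return (from_step, to_step, drop_off_pct) for each adjacent pair."""
    result = []
    for (prev_name, prev_n), (name, n) in zip(steps, steps[1:]):
        drop = round((prev_n - n) * 100.0 / prev_n, 1) if prev_n else 0.0
        result.append((prev_name, name, drop))
    return result


drops = step_drop_off(funnel)
overall_cvr = round(funnel[-1][1] * 100.0 / funnel[0][1], 1)

for prev_name, name, drop in drops:
    print(f"{prev_name} -> {name}: {drop}% drop-off")
print(f"overall CVR: {overall_cvr}%")
```

Note that the drop-off is relative to the previous step, while pct_of_top in the SQL result is relative to the top of the funnel.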

Horizontal funnel (single-row output with overall CVR)

WITH funnel AS (
    SELECT
        user_id,
        MAX(CASE WHEN event_name = 'page_view'    THEN 1 ELSE 0 END) AS step1,
        MAX(CASE WHEN event_name = 'product_view' THEN 1 ELSE 0 END) AS step2,
        MAX(CASE WHEN event_name = 'add_to_cart'  THEN 1 ELSE 0 END) AS step3,
        MAX(CASE WHEN event_name = 'checkout'     THEN 1 ELSE 0 END) AS step4,
        MAX(CASE WHEN event_name = 'purchase'     THEN 1 ELSE 0 END) AS step5
    FROM best_practice_product_analytics.doc_events
    GROUP BY user_id
)
SELECT
    SUM(step1) AS page_view,
    SUM(step2) AS product_view,
    SUM(step3) AS add_to_cart,
    SUM(step4) AS checkout,
    SUM(step5) AS purchase,
    ROUND(SUM(step5) * 100.0 / NULLIF(SUM(step1), 0), 1) AS overall_cvr_pct
FROM funnel;

page_view | product_view | add_to_cart | checkout | purchase | overall_cvr_pct
----------+--------------+-------------+----------+----------+----------------
10        | 10           | 8           | 6        | 6        | 60.0


Gold Layer Dynamic Table: Daily Funnel Aggregation

gold_funnel_daily aggregates the unique users at each funnel step at daily granularity.

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_product_analytics.gold_funnel_daily AS
SELECT
    CAST(event_time AS DATE) AS event_date,
    COUNT(DISTINCT CASE WHEN event_name = 'page_view'    THEN user_id END) AS visitors,
    COUNT(DISTINCT CASE WHEN event_name = 'product_view' THEN user_id END) AS product_viewers,
    COUNT(DISTINCT CASE WHEN event_name = 'add_to_cart'  THEN user_id END) AS cart_adders,
    COUNT(DISTINCT CASE WHEN event_name = 'checkout'     THEN user_id END) AS checkouts,
    COUNT(DISTINCT CASE WHEN event_name = 'purchase'     THEN user_id END) AS purchasers,
    ROUND(
        COUNT(DISTINCT CASE WHEN event_name = 'purchase' THEN user_id END) * 100.0
        / NULLIF(COUNT(DISTINCT CASE WHEN event_name = 'page_view' THEN user_id END), 0),
        1
    ) AS overall_cvr_pct
FROM best_practice_product_analytics.doc_events
GROUP BY CAST(event_time AS DATE);

REFRESH DYNAMIC TABLE best_practice_product_analytics.gold_funnel_daily;

SELECT * FROM best_practice_product_analytics.gold_funnel_daily ORDER BY event_date;

event_date | visitors | product_viewers | cart_adders | checkouts | purchasers | overall_cvr_pct
-----------+----------+-----------------+-------------+-----------+------------+----------------
2026-05-10 | 10       | 10              | 7           | 5         | 4          | 40.0
2026-05-11 | 4        | 3               | 3           | 3         | 3          | 75.0

Interpretation: the overall CVR on May 11 (75%) is clearly higher than on May 10 (40%). The May 11 traffic comes from high-intent returning users (sessions S011–S014), so it converts better.

Configure the Studio Task

cz-cli task create "refresh_gold_funnel_daily" -p skill_test --type SQL --folder 187108
cz-cli task save-content refresh_gold_funnel_daily -p skill_test \
  --content "REFRESH DYNAMIC TABLE best_practice_product_analytics.gold_funnel_daily"
cz-cli task save-cron refresh_gold_funnel_daily -p skill_test --cron "*/5 * * * *"


Scenario 2: A/B Experiment Metric Analysis

A/B variant query

For the exp_checkout_v2 experiment, compute the purchase rate of the treatment and control groups:

SELECT
    a.variant,
    COUNT(DISTINCT a.user_id) AS total_users,
    COUNT(DISTINCT CASE WHEN e.event_name = 'purchase' THEN e.user_id END) AS purchasers,
    ROUND(
        COUNT(DISTINCT CASE WHEN e.event_name = 'purchase' THEN e.user_id END) * 100.0
        / COUNT(DISTINCT a.user_id),
        1
    ) AS purchase_rate_pct
FROM best_practice_product_analytics.doc_ab_assignments a
LEFT JOIN best_practice_product_analytics.doc_events e
    ON a.user_id = e.user_id
WHERE a.experiment_id = 'exp_checkout_v2'
GROUP BY a.variant
ORDER BY a.variant;

variant   | total_users | purchasers | purchase_rate_pct
----------+-------------+------------+------------------
control   | 5           | 1          | 20.0
treatment | 5           | 5          | 100.0

Interpretation: the treatment group's purchase rate (100%) is far higher than the control group's (20%), which points to a strong effect from the new checkout flow. A real evaluation must also take sample size into account and run a statistical significance test (see the ZettaPark section).
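As a rough illustration of what a significance check on these counts could look like, the sketch below computes a two-sided Fisher exact test with the standard library only. The choice of Fisher's test is an assumption made here because the groups are tiny (5 users each); it is not necessarily the test used in the ZettaPark section, and the function name is hypothetical.

```python
from fractions import Fraction
from math import comb


def fisher_exact_two_sided(a: int, b: int, c: int, d: int) -> Fraction:
    """Two-sided Fisher exact p-value for the 2x2 table [[a, b], [c, d]].

    Rows are variants; columns are (purchased, did not purchase).
    """
    row1 = a + b
    col1 = a + c
    total = a + b + c + d
    denom = comb(total, row1)

    def prob(x: int) -> Fraction:
        # Hypergeometric probability of x purchasers in row 1, margins fixed.
        return Fraction(comb(col1, x) * comb(total - col1, row1 - x), denom)

    observed = prob(a)
    lowest = max(0, row1 - (total - col1))
    highest = min(row1, col1)
    # Sum every table that is at least as unlikely as the observed one.
    return sum(
        (prob(x) for x in range(lowest, highest + 1) if prob(x) <= observed),
        Fraction(0),
    )


# treatment: 5 purchased, 0 did not; control: 1 purchased, 4 did not
p_value = fisher_exact_two_sided(5, 0, 1, 4)
print(p_value, float(p_value))  # 1/21, about 0.048
```

Exact fractions avoid floating-point ties when comparing table probabilities. A p-value near 0.05 on 10 users is weak evidence, which is consistent with the caution above about sample size.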

BITMAP user-set operations

BITMAP functions are well suited to quickly computing the cardinality of the experiment user set and of the set of users who performed an action, as well as the intersection of the two:

SELECT
    a.variant,
    -- Strip non-digit characters so 'U001' becomes 1 before the BIGINT cast
    GROUP_BITMAP(CAST(REGEXP_REPLACE(a.user_id, '[^0-9]', '') AS BIGINT)) AS ab_users,
    GROUP_BITMAP(CAST(REGEXP_REPLACE(e.user_id, '[^0-9]', '') AS BIGINT)) AS purchasers
FROM best_practice_product_analytics.doc_ab_assignments a
LEFT JOIN (
    SELECT DISTINCT user_id
    FROM best_practice_product_analytics.doc_events
    WHERE event_name = 'purchase'
) e ON a.user_id = e.user_id
WHERE a.experiment_id = 'exp_checkout_v2'
GROUP BY a.variant;

variant   | ab_users | purchasers
----------+----------+-----------
treatment | 5        | 5
control   | 5        | 1
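The idea behind bitmap set operations can be sketched in Python by using an integer as a bit array: each user's numeric ID sets one bit, intersection is a bitwise AND, and cardinality is a bit count. This is a conceptual model only; the helper names are hypothetical and the engine's bitmap encoding is not described in this guide.

```python
import re


def to_bit(user_id: str) -> int:
    """Extract the numeric part of the ID, e.g. 'U005' -> 5."""
    return int(re.sub(r"[^0-9]", "", user_id))


def build_bitmap(user_ids) -> int:
    """Set one bit per user in a Python int used as a bitmap."""
    bitmap = 0
    for uid in user_ids:
        bitmap |= 1 << to_bit(uid)
    return bitmap


def cardinality(bitmap: int) -> int:
    return bin(bitmap).count("1")


# exp_checkout_v2 variants and all purchasers, from the dataset in this guide
treatment = build_bitmap(["U001", "U002", "U003", "U005", "U009"])
control = build_bitmap(["U004", "U006", "U007", "U008", "U010"])
purchasers = build_bitmap(["U001", "U002", "U003", "U004", "U005", "U009"])

treatment_purchasers = cardinality(treatment & purchasers)  # intersection size
control_purchasers = cardinality(control & purchasers)
print(treatment_purchasers, control_purchasers)  # 5 1
```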


Gold Layer Dynamic Table: A/B Experiment Aggregate Metrics

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_product_analytics.gold_ab_metrics AS
SELECT
    a.experiment_id,
    a.variant,
    COUNT(DISTINCT a.user_id) AS total_users,
    COUNT(DISTINCT CASE WHEN e.event_name = 'purchase' THEN e.user_id END) AS purchasers,
    ROUND(
        COUNT(DISTINCT CASE WHEN e.event_name = 'purchase' THEN e.user_id END) * 100.0
        / NULLIF(COUNT(DISTINCT a.user_id), 0),
        2
    ) AS purchase_rate,
    COUNT(DISTINCT CASE WHEN e.event_name = 'add_to_cart' THEN e.user_id END) AS cart_adders,
    ROUND(
        COUNT(DISTINCT CASE WHEN e.event_name = 'add_to_cart' THEN e.user_id END) * 100.0
        / NULLIF(COUNT(DISTINCT a.user_id), 0),
        2
    ) AS cart_rate
FROM best_practice_product_analytics.doc_ab_assignments a
LEFT JOIN best_practice_product_analytics.doc_events e
    ON a.user_id = e.user_id
GROUP BY a.experiment_id, a.variant;

Refresh and query:

REFRESH DYNAMIC TABLE best_practice_product_analytics.gold_ab_metrics;

SELECT * FROM best_practice_product_analytics.gold_ab_metrics ORDER BY experiment_id, variant;

experiment_id       | variant   | total_users | purchasers | purchase_rate | cart_adders | cart_rate
--------------------+-----------+-------------+------------+---------------+-------------+----------
exp_checkout_v2     | control   | 5           | 1          | 20.00         | 3           | 60.00
exp_checkout_v2     | treatment | 5           | 5          | 100.00        | 5           | 100.00
exp_homepage_banner | control   | 2           | 2          | 100.00        | 2           | 100.00
exp_homepage_banner | treatment | 3           | 2          | 66.67         | 3           | 100.00

Configure the Studio Task

cz-cli task create "refresh_gold_ab_metrics" -p skill_test --type SQL --folder 187108
cz-cli task save-content refresh_gold_ab_metrics -p skill_test \
  --content "REFRESH DYNAMIC TABLE best_practice_product_analytics.gold_ab_metrics"
cz-cli task save-cron refresh_gold_ab_metrics -p skill_test --cron "*/5 * * * *"


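Table Stream + MERGE INTO: Incremental Updates to A/B Assignments

When a user is reassigned to a different experiment variant (regrouping or correction), the change must be captured and synced to the wide table. Use a Table Stream to watch doc_ab_assignments for new rows, then merge them into the analytics wide table with MERGE INTO.

Create the Table Stream

CREATE TABLE STREAM IF NOT EXISTS best_practice_product_analytics.stream_ab_assignments
ON TABLE best_practice_product_analytics.doc_ab_assignments
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');

APPEND_ONLY mode captures INSERT operations only. A/B assignments are normally append-only with no UPDATE/DELETE (a reassignment arrives as a new row), so this mode is used here.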

Create the analytics wide table and update it incrementally with MERGE INTO

The following example shows the Stream consumption pattern: MERGE INTO merges the new assignments in the Stream into the wide table, updating the variant for existing users and inserting new users.

-- Create the A/B analytics wide table (if it does not exist)
CREATE TABLE IF NOT EXISTS best_practice_product_analytics.ab_wide_table (
    user_id       STRING,
    experiment_id STRING,
    variant       STRING,
    assigned_at   TIMESTAMP
);

-- Incremental update with MERGE INTO (run once per schedule; consumes the new rows in the Stream)
MERGE INTO best_practice_product_analytics.ab_wide_table t
USING (
    SELECT user_id, experiment_id, variant, assigned_at
    FROM best_practice_product_analytics.stream_ab_assignments
    WHERE __change_type = 'INSERT'
) s
ON t.user_id = s.user_id AND t.experiment_id = s.experiment_id
WHEN MATCHED THEN
    UPDATE SET variant = s.variant, assigned_at = s.assigned_at
WHEN NOT MATCHED THEN
    INSERT (user_id, experiment_id, variant, assigned_at)
    VALUES (s.user_id, s.experiment_id, s.variant, s.assigned_at);
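The upsert semantics of the MERGE can be modeled with a dictionary keyed by (user_id, experiment_id). The sketch below is an illustration under assumptions: the function name, the U011 user, and the 2026-05-12 timestamps are hypothetical, and the step that keeps only the latest row per key is an addition not present in the SQL above, included because one batch could contain several rows for the same key.

```python
def merge_assignments(wide_table: dict, stream_rows: list) -> dict:
    """Upsert stream rows into wide_table keyed by (user_id, experiment_id)."""
    # Assumption: keep only the latest row per key, so each target row
    # matches at most one source row.
    latest = {}
    for row in stream_rows:
        key = (row["user_id"], row["experiment_id"])
        if key not in latest or row["assigned_at"] > latest[key]["assigned_at"]:
            latest[key] = row
    for key, row in latest.items():
        # WHEN MATCHED: overwrite; WHEN NOT MATCHED: insert. A dict write does both.
        wide_table[key] = {"variant": row["variant"], "assigned_at": row["assigned_at"]}
    return wide_table


wide = {
    ("U004", "exp_checkout_v2"): {"variant": "control", "assigned_at": "2026-05-09 08:00:00"},
}
stream = [
    # Hypothetical reassignment of an existing user
    {"user_id": "U004", "experiment_id": "exp_checkout_v2",
     "variant": "treatment", "assigned_at": "2026-05-12 08:00:00"},
    # Hypothetical new user
    {"user_id": "U011", "experiment_id": "exp_checkout_v2",
     "variant": "control", "assigned_at": "2026-05-12 08:00:00"},
]

result = merge_assignments(wide, stream)
```

Timestamps in the fixed "YYYY-MM-DD HH:MM:SS" format compare correctly as strings, which is why no parsing is needed here.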


Warehouse Object Overview

Once everything is built, the objects under the best_practice_product_analytics schema are:

SHOW TABLES IN best_practice_product_analytics;

schema_name                     | table_name           | is_dynamic
--------------------------------+----------------------+-----------
best_practice_product_analytics | doc_ab_assignments   | false
best_practice_product_analytics | doc_events           | false
best_practice_product_analytics | doc_users            | false
best_practice_product_analytics | gold_ab_metrics      | true
best_practice_product_analytics | gold_funnel_daily    | true
best_practice_product_analytics | silver_user_sessions | true

Tasks under the Studio path best_practices/product_analytics/:

Task name                    | Schedule    | Description
-----------------------------+-------------+---------------------------------
refresh_silver_user_sessions | */5 * * * * | Silver-layer incremental refresh
refresh_gold_funnel_daily    | */5 * * * * | Gold funnel aggregation refresh
refresh_gold_ab_metrics      | */5 * * * * | Gold A/B metrics refresh

Architecture:

Kafka Topic (sdk_tracking_events)
        │
        ▼  pipe_events (BATCH_INTERVAL=60s)
doc_events [Bronze]                        doc_users
  Bloomfilter Index (user_id)                  │
        │                                      │
        └───────────────┬──────────────────────┘
                        │ LEFT JOIN
                        ▼  Studio Task: refresh_silver_user_sessions
        silver_user_sessions [Silver Dynamic Table]
          LAG/LEAD: prev_event / next_event
          days_since_signup
                        │
             ┌──────────┴──────────┐
             ▼                     ▼
   gold_funnel_daily         gold_ab_metrics
   [Gold Dynamic Table]      [Gold Dynamic Table]
   daily CVR / visitors      experiment_id / variant
   Studio Task: *_funnel     purchase_rate / cart_rate
                             Studio Task: *_ab_metrics

doc_ab_assignments
        │ Table Stream (APPEND_ONLY)
        ▼
stream_ab_assignments
        MERGE INTO → ab_wide_table

The diagram shows the logical layering. In the DDL in this guide, gold_funnel_daily reads doc_events directly, and gold_ab_metrics joins doc_ab_assignments with doc_events.


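Notes

  • Dynamic Table DDL has no REFRESH INTERVAL: the DDL carries no scheduling parameters. The refresh cadence is managed through the Studio Task's Cron configuration, which makes it easy to attach monitoring alerts and data quality checks to the same task.

  • Partitioning for the LAG/LEAD window functions: the Silver layer uses PARTITION BY user_id, session_id so that path reconstruction stays within a single session. Partitioning by user_id alone would chain events across sessions and produce wrong paths.

  • BITMAP user ID format: GROUP_BITMAP requires a BIGINT argument. If user IDs contain non-numeric characters, either map the IDs when writing to the Bronze layer (for example, with a hash) or extract the numeric part with REGEXP_REPLACE. Different user IDs can collide after extraction, so a production environment should maintain a single integer mapping table.

  • Table Stream does not see existing data: a Stream created with the defaults (SHOW_INITIAL_ROWS=FALSE) captures only rows added after it is created. To process data that already exists when the Stream is created, add WITH PROPERTIES ('TABLE_STREAM_MODE'='APPEND_ONLY','SHOW_INITIAL_ROWS'='TRUE') at creation time.

  • Dynamic Table incremental refresh depends on upstream change tracking: the first REFRESH is a full computation, and later incremental refreshes process only the changes since the last refresh point. Writing to the Bronze layer with INSERT OVERWRITE makes the Dynamic Table fall back to a full refresh.

  • Bloomfilter Index and existing data: CREATE BLOOMFILTER INDEX takes effect only for data written after the index is created. If the table already holds a large amount of data, the Bloomfilter gives limited filtering speedup on that existing portion (the BLOOMFILTER type does not support covering existing data with BUILD INDEX).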
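The ID collision risk in the BITMAP note can be shown in a few lines of Python, together with a minimal in-memory stand-in for an integer mapping table. The IdMapper class and the V001 user are hypothetical; a production mapping would live in a persistent table rather than a dictionary.

```python
import re


def digits_only(user_id: str) -> int:
    """Extract the numeric part of an ID, as REGEXP_REPLACE plus CAST would."""
    return int(re.sub(r"[^0-9]", "", user_id))


# Two distinct (hypothetical) IDs collapse to the same integer.
collision = digits_only("U001") == digits_only("V001")


class IdMapper:
    """Assign each distinct string ID the next unused integer."""

    def __init__(self) -> None:
        self._ids = {}

    def get(self, user_id: str) -> int:
        if user_id not in self._ids:
            self._ids[user_id] = len(self._ids) + 1
        return self._ids[user_id]


mapper = IdMapper()
mapped = [mapper.get(u) for u in ["U001", "V001", "U001"]]
print(collision, mapped)  # True [1, 2, 1]
```

The mapper returns the same integer for a repeated ID and a new integer for every new ID, so distinct users never share a bit in the bitmap.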

