Building a Digital-Marketing CDP: An Omni-Channel OneID Data Warehouse

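This article consolidates user data from CRM, mini-programs, apps, offline retail, and other channels into a unified OneID system. On top of that system it computes RFM tags and builds audience packages, which produce the precisely targeted audiences that marketing campaigns need. Using the Online Retail II (Kaggle) retail transaction dataset, it walks end to end through the pipeline MySQL CDC → Kafka real-time ingestion → ID Mapping → Dynamic Table RFM → BITMAP audience selection. It also covers key platform capabilities such as incremental updates with MERGE INTO and calls to an external ID graph through an External Function.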


Overview

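The core challenge in building a multi-channel CDP is that the same person leaves different IDs in different channels: mobile number, WeChat union_id, CRM member number, device ID. Until those IDs are linked, you cannot accurately compute customer lifetime value or attribute conversions across channels.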

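Challenges and the 云器 solution for each:

  • Sync CRM member changes to the warehouse in real time: a MySQL CDC PIPE captures the binlog and writes it into ODS.
  • Ingest mini-program / app behavioral events in real time: a Studio Kafka real-time sync task (single-table ingestion).
  • Merge IDs from many sources incrementally without inserting the same OneID twice: MERGE INTO, which UPDATEs when the ON condition matches and INSERTs otherwise.
  • Call an external ID graph service for OneID matching: an External Function wraps the HTTP call so it can be used directly in SQL.
  • Refresh RFM tags and user segments automatically: a Dynamic Table, computed incrementally by the system and scheduled with a Studio Task.
  • Select audiences (intersection / difference / union): the BITMAP function family, which completes set operations over hundreds of millions of IDs in milliseconds.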

SQL commands used

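Command / function: purpose (notes)

  • CREATE TABLE: create static ODS / DWD / ADS tables (used as Dynamic Table upstreams or as final outputs).
  • MERGE INTO: incrementally update the ID Mapping table (insert new IDs; for existing ones, update last_seen and confidence).
  • CREATE DYNAMIC TABLE: create the DWS RFM and segment tables (computed incrementally by the system; no REFRESH INTERVAL is specified).
  • REFRESH DYNAMIC TABLE: manually trigger the first full refresh (run once after creating the table; routine scheduling goes through a Studio Task).
  • GROUP_BITMAP_STATE: aggregate integer user_id values into a BITMAP object (builds one bitmap per segment; returns the bitmap type).
  • GROUP_BITMAP_AND: intersect BITMAPs across rows and return the cardinality (computes the AND of several segments in a single scan).
  • GROUP_BITMAP_OR: union BITMAPs across rows and return the cardinality (deduplicated total across several segments).
  • BITMAP_AND: intersect two BITMAP objects (returns a BITMAP object that can be combined further or converted to an array).
  • BITMAP_OR: union two BITMAP objects (returns a BITMAP object).
  • BITMAP_ANDNOT: difference, i.e. IDs in A but not in B (returns a BITMAP object; used to exclude a segment).
  • BITMAP_COUNT: count the IDs in a BITMAP (reads the cardinality of a bitmap object).
  • BITMAP_TO_ARRAY: expand a BITMAP object into an integer array (used with EXPLODE to export the audience ID list).
  • CREATE FUNCTION: create a SQL UDF (the ID normalization example; encapsulates ID conversion logic and is replaced by an External Function in production).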

Prerequisites

CREATE SCHEMA IF NOT EXISTS best_practice_marketing_cdp;


ODS layer: ingesting raw multi-channel data

CRM member table

The CRM system runs on MySQL, and member changes are synchronized to the ODS layer in real time through MySQL CDC.

CREATE TABLE IF NOT EXISTS best_practice_marketing_cdp.ods_crm_members (
    member_id     STRING,
    mobile_hash   STRING,     -- SHA-256 of the mobile number; plaintext is never stored
    email_hash    STRING,     -- SHA-256 of the email address
    real_name     STRING,
    gender        STRING,
    birthday      DATE,
    register_date DATE,
    channel       STRING,     -- registration channel: offline / online / miniapp
    level         STRING,     -- membership tier: bronze / silver / gold
    total_points  INT,
    updated_at    TIMESTAMP
);

Configuring MySQL CDC

In Studio, create a "MySQL real-time sync task" under Studio → Data Integration → Real-time Tasks → New Task, with the following parameters:

Parameter: value

  • Data source: MySQL (configure host, port, database, username, password)
  • Sync mode: CDC (real-time binlog capture)
  • Source table: crm.members
  • Target schema: best_practice_marketing_cdp
  • Target table: ods_crm_members
  • Write strategy: UPSERT (primary key member_id)

Studio task path: best_practices/marketing_cdp/
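
The UPSERT write strategy means each change event is applied by primary key. Inserts and updates overwrite the row for that member_id, and deletes remove it. The following is a minimal Python sketch of those semantics only. It assumes a hypothetical event shape {"op": "c" | "u" | "d", "row": {...}}, loosely modeled on Debezium-style operation codes; this is not the Studio wire format.

```python
from typing import Any

Row = dict[str, Any]


def apply_cdc(table: dict[str, Row], events: list[dict[str, Any]], key: str = "member_id") -> dict[str, Row]:
    """Apply change events, in order, to an in-memory table keyed by its primary key."""
    for event in events:
        pk = event["row"][key]
        if event["op"] == "d":
            table.pop(pk, None)             # delete: drop the row if it exists
        else:
            table[pk] = dict(event["row"])  # insert or update: the latest row image wins
    return table


if __name__ == "__main__":
    changes = [
        {"op": "c", "row": {"member_id": "MBR001", "level": "silver"}},
        {"op": "u", "row": {"member_id": "MBR001", "level": "gold"}},
        {"op": "c", "row": {"member_id": "MBR002", "level": "silver"}},
        {"op": "d", "row": {"member_id": "MBR002"}},
    ]
    print(apply_cdc({}, changes))  # {'MBR001': {'member_id': 'MBR001', 'level': 'gold'}}
```

Event order matters. If events were applied out of order, an older row image could overwrite a newer one. This is why change events for the same key have to be applied in binlog order.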

Loading sample data (if no MySQL environment is available, load ods_crm_members directly instead of using CDC)

Import from a local CSV file (recommended)

Save the CRM member data as a CSV file, then bulk-load it through the User Volume:

-- Step 1: upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/crm_members.csv' TO USER VOLUME FILE 'crm_members.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_marketing_cdp.ods_crm_members
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('crm_members.csv');

Alternatively, insert a small batch of test data inline (no CSV file needed):

INSERT INTO best_practice_marketing_cdp.ods_crm_members VALUES
    ('MBR001','hash_mobile_001','hash_email_001','Alice Wang', 'F',CAST('1990-05-12' AS DATE),CAST('2020-01-15' AS DATE),'offline','gold',  1200,CAST('2024-11-01 10:00:00' AS TIMESTAMP)),
    ('MBR002','hash_mobile_002','hash_email_002','Bob Chen',   'M',CAST('1985-08-23' AS DATE),CAST('2019-06-20' AS DATE),'online', 'silver', 800,CAST('2024-10-15 14:30:00' AS TIMESTAMP)),
    ('MBR003','hash_mobile_003','hash_email_003','Carol Liu',  'F',CAST('1995-03-07' AS DATE),CAST('2021-09-10' AS DATE),'miniapp','bronze', 350,CAST('2024-11-10 09:15:00' AS TIMESTAMP)),
    ('MBR004','hash_mobile_004','hash_email_004','David Zhang','M',CAST('1988-12-01' AS DATE),CAST('2018-03-05' AS DATE),'offline','gold',  2500,CAST('2024-09-20 16:45:00' AS TIMESTAMP)),
    ('MBR005','hash_mobile_005','hash_email_005','Eve Li',     'F',CAST('1992-07-19' AS DATE),CAST('2022-11-08' AS DATE),'online', 'silver', 620,CAST('2024-11-05 11:00:00' AS TIMESTAMP));

Mini-program / app behavioral event table

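Mini-program and app behavioral events arrive in real time through Kafka. Each message represents one user action, such as a page view, an add-to-cart, or a payment.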

CREATE TABLE IF NOT EXISTS best_practice_marketing_cdp.ods_app_events (
    event_id    STRING,
    device_id   STRING,
    union_id    STRING,     -- WeChat union_id, links the same user across mini-programs
    open_id     STRING,     -- WeChat open_id, unique only within a single app
    event_type  STRING,     -- page_view / add_cart / purchase
    page_name   STRING,
    item_id     STRING,
    item_price  DOUBLE,
    channel     STRING,     -- miniapp / app
    platform    STRING,     -- wechat / ios / android
    event_time  TIMESTAMP,
    session_id  STRING,
    ingest_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

Option 1: ingest through Kafka (recommended)

在 Studio 中创建"Kafka 单表实时同步任务",任务路径:

best_practices/marketing_cdp/
best_practices/marketing_cdp/
,配置如下:

Parameter: value

  • Kafka Broker: <broker>:9092
  • Topic: miniapp_app_events
  • Consumer Group: cz_cdp_consumer
  • Target table: best_practice_marketing_cdp.ods_app_events
  • Message format: JSON

Python producer example (uses the kafka-python package):

import json
import uuid

from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers=['<broker>:9092'],
    # Serialize each event dict as a UTF-8 JSON message
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

# One purchase event; the field names match the ods_app_events columns
event = {
    "event_id": str(uuid.uuid4()),
    "device_id": "DEV001",
    "union_id": "union_001",
    "open_id": "open_001",
    "event_type": "purchase",
    "page_name": "checkout",
    "item_id": "SKU001",
    "item_price": 89.9,
    "channel": "miniapp",
    "platform": "wechat",
    "event_time": "2024-11-01 08:15:00",
    "session_id": "sess_001",
}

producer.send('miniapp_app_events', event)
producer.flush()  # block until the buffered message has been sent

Option 2: simulate with INSERT (no Kafka environment)

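If Kafka is not configured yet, write directly to the target table with INSERT INTO. This simulates messages that have already been parsed and ingested, which is enough to validate the downstream Dynamic Table and BITMAP logic: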

INSERT INTO best_practice_marketing_cdp.ods_app_events
    (event_id, device_id, union_id, open_id, event_type, page_name, item_id, item_price, channel, platform, event_time, session_id)
VALUES
    ('EVT001','DEV001','union_001','open_001','purchase','checkout','SKU001', 89.9,'miniapp','wechat', CAST('2024-11-01 08:15:00' AS TIMESTAMP),'sess_001'),
    ('EVT002','DEV002','union_002','open_002','purchase','checkout','SKU002',199.0,'app',    'ios',    CAST('2024-11-01 09:20:00' AS TIMESTAMP),'sess_002'),
    ('EVT003','DEV003','union_003','open_003','add_cart','product', 'SKU003',350.0,'miniapp','wechat', CAST('2024-11-02 10:05:00' AS TIMESTAMP),'sess_003'),
    ('EVT004','DEV004','union_004','open_004','purchase','checkout','SKU004',599.0,'app',    'android',CAST('2024-11-02 11:30:00' AS TIMESTAMP),'sess_004'),
    ('EVT005','DEV005','union_005','open_005','purchase','checkout','SKU005',129.0,'miniapp','wechat', CAST('2024-11-03 14:00:00' AS TIMESTAMP),'sess_005'),
    ('EVT006','DEV001','union_001','open_001','purchase','checkout','SKU006', 75.0,'miniapp','wechat', CAST('2024-11-05 16:30:00' AS TIMESTAMP),'sess_006'),
    ('EVT007','DEV006','union_006','open_006','purchase','checkout','SKU007',420.0,'app',    'ios',    CAST('2024-11-10 09:00:00' AS TIMESTAMP),'sess_007'),
    ('EVT008','DEV008','union_008','open_008','purchase','checkout','SKU008',259.0,'miniapp','wechat', CAST('2024-11-12 10:20:00' AS TIMESTAMP),'sess_008'),
    ('EVT009','DEV009','union_009','open_009','purchase','checkout','SKU009', 88.0,'app',    'ios',    CAST('2024-11-13 13:45:00' AS TIMESTAMP),'sess_009'),
    ('EVT010','DEV010','union_010','open_010','purchase','checkout','SKU010',315.0,'miniapp','wechat', CAST('2024-11-14 11:00:00' AS TIMESTAMP),'sess_010');

Online retail transaction table (Kaggle Online Retail II)

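The Online Retail II UCI dataset (Kaggle) serves as the raw ODS transaction data for the online retail channel. It simulates historical orders ingested from a third-party e-commerce platform.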

CREATE TABLE IF NOT EXISTS best_practice_marketing_cdp.ods_retail_transactions (
    invoice      STRING,
    stock_code   STRING,
    description  STRING,
    quantity     INT,
    invoice_date TIMESTAMP,
    price        DOUBLE,
    customer_id  STRING,
    country      STRING,
    ingest_time  TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

There are two ways to load the data:

Option 1: full import from CSV (recommended)

Download the dataset from Kaggle:

kaggle datasets download -d mashlyn/online-retail-ii-uci --unzip -p /tmp/marketing_cdp/

The download contains online_retail_II.xlsx. Convert it to CSV, for example with Python pandas or with Save As in Excel:

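import pandas as pd  # reading .xlsx also requires openpyxl

df = pd.read_excel('/tmp/marketing_cdp/online_retail_II.xlsx', sheet_name='Year 2009-2010')
# Write InvoiceDate as e.g. "12/1/2009 7:45" to match timestampFormat 'M/d/yyyy H:mm' in the COPY INTO below
# (by default pandas would write "2009-12-01 07:45:00")
df['InvoiceDate'] = df['InvoiceDate'].map(lambda t: f"{t.month}/{t.day}/{t.year} {t.hour}:{t.minute:02d}")
# Assumption: reuse the CUS013085-style IDs of the sample rows (Customer ID is read as a float, e.g. 13085.0)
df['Customer ID'] = df['Customer ID'].map(lambda x: f"CUS{int(x):06d}" if pd.notna(x) else '')
df.to_csv('/tmp/marketing_cdp/online_retail_II.csv', index=False)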

Then upload the CSV to Lakehouse through the User Volume and import it:

-- Step 1: upload the local CSV file to the User Volume
PUT '/tmp/marketing_cdp/online_retail_II.csv' TO USER VOLUME FILE 'online_retail_II.csv';

-- Step 2: COPY INTO the table from the User Volume (full import)
COPY INTO best_practice_marketing_cdp.ods_retail_transactions
    (invoice, stock_code, description, quantity, invoice_date, price, customer_id, country)
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='', 'timestampFormat'='M/d/yyyy H:mm')
FILES ('online_retail_II.csv');
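
Before uploading, it can help to confirm that every InvoiceDate value in the CSV matches the timestampFormat 'M/d/yyyy H:mm' used above, for example 12/1/2009 7:45 without zero padding. The small Python check below assumes that the strptime pattern %m/%d/%Y %H:%M is an acceptable stand-in for the COPY INTO pattern. strptime also accepts month, day, and hour values without a leading zero.

```python
import csv
from datetime import datetime

# Assumed strptime equivalent of timestampFormat='M/d/yyyy H:mm'
INVOICE_DATE_FORMAT = "%m/%d/%Y %H:%M"


def format_invoice_date(ts: datetime) -> str:
    """Render a timestamp in the unpadded M/d/yyyy H:mm form, e.g. 12/1/2009 7:45."""
    return f"{ts.month}/{ts.day}/{ts.year} {ts.hour}:{ts.minute:02d}"


def bad_invoice_dates(csv_path: str, column: str = "InvoiceDate") -> list[str]:
    """Return the values in `column` that do not parse with INVOICE_DATE_FORMAT."""
    bad = []
    with open(csv_path, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            try:
                datetime.strptime(row[column], INVOICE_DATE_FORMAT)
            except ValueError:
                bad.append(row[column])
    return bad


if __name__ == "__main__":
    print(format_invoice_date(datetime(2009, 12, 1, 7, 45)))  # 12/1/2009 7:45
```

Calling bad_invoice_dates('/tmp/marketing_cdp/online_retail_II.csv') should return an empty list before you run the PUT and COPY INTO steps. A value such as 2009-12-01 07:45:00 would show up in the list.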

Option 2: insert a representative subset with INSERT INTO

If you do not have the full CSV yet, insert a representative subset directly to validate the RFM and BITMAP logic that follows:

INSERT INTO best_practice_marketing_cdp.ods_retail_transactions
    (invoice, stock_code, description, quantity, invoice_date, price, customer_id, country)
VALUES
    ('489434','85048', '15CM CHRISTMAS GLASS BALL 20 LIGHTS', 12,CAST('2009-12-01 07:45:00' AS TIMESTAMP), 6.95,'CUS013085','United Kingdom'),
    ('489434','79323P','PINK CHERRY LIGHTS',                  12,CAST('2009-12-01 07:45:00' AS TIMESTAMP), 6.75,'CUS013085','United Kingdom'),
    ('489435','22111', 'SCOTTIE DOG HOT WATER BOTTLE',        24,CAST('2009-12-01 07:45:00' AS TIMESTAMP), 3.45,'CUS013748','United Kingdom'),
    ('489436','48173C','DOOR MAT UNION JACK CARS',            10,CAST('2009-12-01 09:00:00' AS TIMESTAMP), 5.95,'CUS014085','United Kingdom'),
    ('489437','21080', 'SET OF 6 NAUTICAL PAPER PLATES',      12,CAST('2009-12-01 09:30:00' AS TIMESTAMP), 3.25,'CUS012583','United Kingdom'),
    ('489437','22423', 'REGENCY CAKESTAND 3 TIER',            12,CAST('2009-12-01 09:30:00' AS TIMESTAMP),12.75,'CUS012583','United Kingdom'),
    ('489438','84970L','SINGLE HEART ZINC T-LIGHT HOLDER',    12,CAST('2009-12-01 10:00:00' AS TIMESTAMP), 1.25,'CUS012431','United Kingdom'),
    ('489440','23256', 'CHILDRENS CUTLERY SPACEBOY',          12,CAST('2009-12-01 10:30:00' AS TIMESTAMP), 4.15,'CUS013047','United Kingdom'),
    ('490100','22421', 'GINGHAM HEART',                        6,CAST('2009-12-10 09:30:00' AS TIMESTAMP), 4.95,'CUS013085','United Kingdom'),
    ('490200','84029E','TREE TOP STAR',                       12,CAST('2009-12-15 10:00:00' AS TIMESTAMP), 1.65,'CUS013748','United Kingdom')
    -- the full import has 30 rows; only the first 10 are shown here for illustration
;

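Check the row counts of the three ODS tables. The counts below reflect the full sample data set; running only the abbreviated INSERT statements above produces smaller counts: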

SELECT 'ods_crm_members' AS tbl, COUNT(*) AS cnt FROM best_practice_marketing_cdp.ods_crm_members
UNION ALL
SELECT 'ods_app_events', COUNT(*) FROM best_practice_marketing_cdp.ods_app_events
UNION ALL
SELECT 'ods_retail_transactions', COUNT(*) FROM best_practice_marketing_cdp.ods_retail_transactions;

tbl                      cnt
-----------------------  ---
ods_crm_members          10
ods_app_events           15
ods_retail_transactions  30


DWD layer: ID Mapping and unified events

ID Mapping table and External Function

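The ID Mapping table records the mapping between each OneID and the raw IDs from each channel. In production, an external ID graph service matches each new ID to a OneID, and the warehouse calls that service through an External Function.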
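
This article leaves the matching algorithm to the external ID graph service. To illustrate the underlying idea only: deterministic ID stitching is commonly modeled as finding connected components in a graph whose edges say "these two IDs belong to the same person". The following is a minimal union-find sketch. The links and the ONE### numbering are hypothetical, and every link is treated as certain. A real ID graph would also weigh probabilistic links (confidence < 1.0) and guard against over-merging.

```python
from collections import defaultdict


class UnionFind:
    """Disjoint-set forest over ID strings, with path compression."""

    def __init__(self) -> None:
        self.parent: dict[str, str] = {}

    def find(self, x: str) -> str:
        self.parent.setdefault(x, x)
        root = x
        while self.parent[root] != root:
            root = self.parent[root]
        while self.parent[x] != root:  # path compression: point each node on the path at the root
            nxt = self.parent[x]
            self.parent[x] = root
            x = nxt
        return root

    def union(self, a: str, b: str) -> None:
        ra, rb = self.find(a), self.find(b)
        if ra != rb:
            self.parent[rb] = ra


def assign_one_ids(links: list[tuple[str, str]]) -> dict[str, str]:
    """Give every typed ID (e.g. 'union_id:union_001') the OneID of its connected component."""
    uf = UnionFind()
    for a, b in links:
        uf.union(a, b)
    components: dict[str, list[str]] = defaultdict(list)
    for node in list(uf.parent):
        components[uf.find(node)].append(node)
    # Hypothetical numbering: order components by their smallest member, then ONE001, ONE002, ...
    mapping: dict[str, str] = {}
    for n, members in enumerate(sorted(components.values(), key=min), start=1):
        for member in members:
            mapping[member] = f"ONE{n:03d}"
    return mapping


if __name__ == "__main__":
    links = [
        ("member_id:MBR001", "mobile_hash:hash_mobile_001"),    # same CRM record
        ("mobile_hash:hash_mobile_001", "union_id:union_001"),  # hypothetical phone-number match
        ("member_id:MBR002", "union_id:union_002"),
    ]
    for typed_id, one_id in sorted(assign_one_ids(links).items()):
        print(one_id, typed_id)
```

Numbering components by their smallest member is not stable when new links arrive. A production service has to keep previously issued OneIDs fixed when components grow or merge.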

Creating the External Function (production)

In Studio, create an External Function that calls the external ID graph API through Alibaba Cloud FC / AWS Lambda:

-- First create an API CONNECTION (one-time setup that connects to the cloud-function runtime)
CREATE API CONNECTION IF NOT EXISTS conn_id_graph
    TYPE = 'ALIYUN'
    REGION = 'cn-hangzhou'
    ROLE_ARN = '<your-role-arn>'
    NAMESPACE = 'default'
    CODE_BUCKET = '<your-code-bucket>';

-- Create the external function (packages the HTTP call to the ID graph)
CREATE EXTERNAL FUNCTION IF NOT EXISTS best_practice_marketing_cdp.call_id_graph(
    id_value STRING,
    id_type  STRING
)
RETURNS STRING
LANGUAGE PYTHON
CONNECTION = conn_id_graph
RESOURCE_URIS = 'volume://func_volume/id_graph.zip';

SQL UDF substitute (test environment)

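If the external ID graph service is not connected yet, a SQL UDF can simulate the ID normalization logic so that you can validate the downstream Mapping table structure: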

CREATE OR REPLACE FUNCTION best_practice_marketing_cdp.normalize_id(
    id_value STRING,
    id_type  STRING
)
RETURNS STRING
AS CASE
    WHEN id_type = 'mobile_hash' THEN CONCAT('ONE_PHONE_', SUBSTR(id_value, -6))
    WHEN id_type = 'email_hash'  THEN CONCAT('ONE_EMAIL_', SUBSTR(id_value, -6))
    WHEN id_type = 'union_id'    THEN CONCAT('ONE_WX_',    SUBSTR(id_value, -6))
    WHEN id_type = 'device_id'   THEN CONCAT('ONE_DEV_',   SUBSTR(id_value, -6))
    ELSE CONCAT('ONE_UNKNOWN_', id_value)
END;

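Verify the UDF. The query reads dwd_id_mapping, so run it after that table has been created, loaded, and updated with MERGE INTO in the steps below. The email_hash row in the output comes from that MERGE: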

SELECT id_value,
       id_type,
       best_practice_marketing_cdp.normalize_id(id_value, id_type) AS normalized_id
FROM best_practice_marketing_cdp.dwd_id_mapping
WHERE one_id = 'ONE001';

id_value         id_type      normalized_id
---------------  -----------  ------------------
hash_email_001   email_hash   ONE_EMAIL_il_001
MBR001           member_id    ONE_UNKNOWN_MBR001
hash_mobile_001  mobile_hash  ONE_PHONE_le_001
union_001        union_id     ONE_WX_on_001
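
SUBSTR(id_value, -6) takes the last six characters, which is why hash_email_001 becomes ONE_EMAIL_il_001 rather than ONE_EMAIL_001. The Python transcription of the test UDF below makes that behavior explicit. It assumes, as the output above shows, that a negative SUBSTR start counts from the end. The transcription also exposes a limitation: different IDs of the same type that share their last six characters collapse to the same value. That is acceptable for a structural test but not for real matching.

```python
PREFIXES = {
    "mobile_hash": "ONE_PHONE_",
    "email_hash": "ONE_EMAIL_",
    "union_id": "ONE_WX_",
    "device_id": "ONE_DEV_",
}


def normalize_id(id_value: str, id_type: str) -> str:
    """Python transcription of the test-only SQL UDF best_practice_marketing_cdp.normalize_id."""
    prefix = PREFIXES.get(id_type)
    if prefix is None:
        return "ONE_UNKNOWN_" + id_value  # ELSE branch: keep the whole value
    return prefix + id_value[-6:]         # SUBSTR(id_value, -6): the last six characters


if __name__ == "__main__":
    print(normalize_id("hash_email_001", "email_hash"))                                   # ONE_EMAIL_il_001
    print(normalize_id("union_001", "union_id") == normalize_id("fusion_001", "union_id"))  # True
```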

Creating the ID Mapping table

CREATE TABLE IF NOT EXISTS best_practice_marketing_cdp.dwd_id_mapping (
    one_id         STRING,
    id_type        STRING,   -- member_id / mobile_hash / email_hash / union_id / device_id
    id_value       STRING,
    source_channel STRING,   -- crm / miniapp / app / pos
    confidence     DOUBLE,   -- match confidence: 1.0 = deterministic match, < 1.0 = probabilistic match
    first_seen     TIMESTAMP,
    last_seen      TIMESTAMP,
    is_active      BOOLEAN
);

Load the initial ID Mapping rows:

INSERT INTO best_practice_marketing_cdp.dwd_id_mapping VALUES
    ('ONE001','member_id',  'MBR001',         'crm',    1.0, CAST('2020-01-15' AS TIMESTAMP),CAST('2024-11-01' AS TIMESTAMP),true),
    ('ONE001','mobile_hash','hash_mobile_001','crm',    1.0, CAST('2020-01-15' AS TIMESTAMP),CAST('2024-11-01' AS TIMESTAMP),true),
    ('ONE001','union_id',   'union_001',      'miniapp',0.95,CAST('2021-03-10' AS TIMESTAMP),CAST('2024-11-05' AS TIMESTAMP),true),
    ('ONE002','member_id',  'MBR002',         'crm',    1.0, CAST('2019-06-20' AS TIMESTAMP),CAST('2024-10-15' AS TIMESTAMP),true),
    ('ONE002','union_id',   'union_002',      'app',    0.9, CAST('2021-08-15' AS TIMESTAMP),CAST('2024-11-01' AS TIMESTAMP),true)
    -- the full data set has 22 rows ...
;

MERGE INTO: incremental updates to ID Mapping

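When the ID graph service discovers a new ID link, or the confidence of an existing mapping changes, upsert it incrementally with MERGE INTO. If the (one_id, id_type, id_value) triple already exists, the row is updated; otherwise a new row is inserted.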

MERGE INTO best_practice_marketing_cdp.dwd_id_mapping AS t
USING (
    -- Newly discovered email mapping (returned by the ID graph service)
    SELECT 'ONE001'                                 AS one_id,
           'email_hash'                             AS id_type,
           'hash_email_001'                         AS id_value,
           'crm'                                    AS source_channel,
           1.0                                      AS confidence,
           CAST('2020-01-15 00:00:00' AS TIMESTAMP) AS first_seen,
           CAST('2024-11-20 10:00:00' AS TIMESTAMP) AS last_seen,
           true                                     AS is_active
) AS s
ON t.one_id = s.one_id
   AND t.id_type = s.id_type
   AND t.id_value = s.id_value
WHEN MATCHED THEN
    UPDATE SET last_seen = s.last_seen, confidence = s.confidence
WHEN NOT MATCHED THEN
    INSERT (one_id, id_type, id_value, source_channel, confidence, first_seen, last_seen, is_active)
    VALUES (s.one_id, s.id_type, s.id_value, s.source_channel, s.confidence, s.first_seen, s.last_seen, s.is_active);

After running it, check all mappings for ONE001. An email_hash row has been added, and the existing rows are unchanged:

SELECT one_id, id_type, id_value, confidence, last_seen
FROM best_practice_marketing_cdp.dwd_id_mapping
WHERE one_id = 'ONE001'
ORDER BY id_type;

one_id  id_type      id_value         confidence  last_seen
------  -----------  ---------------  ----------  -------------------
ONE001  email_hash   hash_email_001   1           2024-11-20T10:00:00
ONE001  member_id    MBR001           1           2024-11-01T10:00:00
ONE001  mobile_hash  hash_mobile_001  1           2024-11-01T10:00:00
ONE001  union_id     union_001        0.95        2024-11-05T16:30:00

The email_hash row was inserted by this MERGE. The last_seen values of the existing member_id, mobile_hash, and union_id rows were not overwritten, as expected.
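
The MERGE semantics can be modeled in a few lines of Python. The model shows why rerunning the same MERGE is safe and why the full triple has to be the key. The sketch below is keyed on (one_id, id_type, id_value). It differs from SQL MERGE in one respect: SQL engines typically reject, or treat as nondeterministic, several source rows matching the same target row, whereas this loop simply lets the last source row win.

```python
from datetime import datetime
from typing import Any

Key = tuple[str, str, str]  # (one_id, id_type, id_value): the MERGE ON key


def merge_id_mapping(target: dict[Key, dict[str, Any]], source: list[dict[str, Any]]) -> dict[Key, dict[str, Any]]:
    """In-memory model of the MERGE INTO above: update on match, insert otherwise."""
    for s in source:
        key = (s["one_id"], s["id_type"], s["id_value"])
        if key in target:
            # WHEN MATCHED: only last_seen and confidence change; first_seen and the rest are kept
            target[key]["last_seen"] = s["last_seen"]
            target[key]["confidence"] = s["confidence"]
        else:
            # WHEN NOT MATCHED: insert a copy of the whole source row
            target[key] = dict(s)
    return target


if __name__ == "__main__":
    new_row = {
        "one_id": "ONE001", "id_type": "email_hash", "id_value": "hash_email_001",
        "source_channel": "crm", "confidence": 1.0,
        "first_seen": datetime(2020, 1, 15), "last_seen": datetime(2024, 11, 20, 10), "is_active": True,
    }
    table = merge_id_mapping({}, [new_row])                                    # first run inserts
    merge_id_mapping(table, [dict(new_row, last_seen=datetime(2024, 12, 1))])  # rerun only updates
    print(len(table), table[("ONE001", "email_hash", "hash_email_001")]["last_seen"])
```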

Unified user event table (DWD)

Resolve events from each ODS channel to a one_id through ID Mapping, and consolidate them into unified user events:

CREATE TABLE IF NOT EXISTS best_practice_marketing_cdp.dwd_user_events (
    event_id   STRING,
    one_id     STRING,
    event_type STRING,
    channel    STRING,
    platform   STRING,
    item_id    STRING,
    item_price DOUBLE,
    quantity   INT,
    revenue    DOUBLE,
    event_time TIMESTAMP,
    event_date DATE,
    session_id STRING
);

Load mini-program / app purchases, joined with ID Mapping on union_id:

INSERT INTO best_practice_marketing_cdp.dwd_user_events
SELECT
    e.event_id,
    m.one_id,
    e.event_type,
    e.channel,
    e.platform,
    e.item_id,
    e.item_price,
    1                          AS quantity,
    e.item_price               AS revenue,
    e.event_time,
    CAST(e.event_time AS DATE) AS event_date,
    e.session_id
FROM best_practice_marketing_cdp.ods_app_events e
-- Inner join: events whose union_id has no mapping row are not loaded
JOIN best_practice_marketing_cdp.dwd_id_mapping m
  ON m.id_value = e.union_id
 AND m.id_type = 'union_id'
WHERE e.event_type = 'purchase'
  AND e.item_id IS NOT NULL;

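Map the retail transactions directly, using customer_id as the one_id. These rows bypass dwd_id_mapping, so their one_id values keep the CUS prefix: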

INSERT INTO best_practice_marketing_cdp.dwd_user_events
SELECT
    CONCAT('ORT-', invoice, '-', stock_code) AS event_id,
    customer_id                              AS one_id,
    'purchase'                               AS event_type,
    'online_retail'                          AS channel,
    'web'                                    AS platform,
    stock_code                               AS item_id,
    price                                    AS item_price,
    quantity,
    ROUND(price * quantity, 2)               AS revenue,
    invoice_date                             AS event_time,
    CAST(invoice_date AS DATE)               AS event_date,
    invoice                                  AS session_id
-- With the full CSV, consider filtering out cancellations (invoice starting with 'C')
-- and rows without a customer_id before computing RFM
FROM best_practice_marketing_cdp.ods_retail_transactions;

Purchase distribution by channel:

SELECT
    channel,
    COUNT(DISTINCT one_id) AS unique_users,
    COUNT(*)               AS purchase_count,
    ROUND(SUM(revenue), 2) AS total_revenue,
    ROUND(AVG(revenue), 2) AS avg_order_value
FROM best_practice_marketing_cdp.dwd_user_events
WHERE event_type = 'purchase'
GROUP BY channel
ORDER BY total_revenue DESC;

channel        unique_users  purchase_count  total_revenue  avg_order_value
-------------  ------------  --------------  -------------  ---------------
online_retail  16            30              1628.64        54.29
app            4             4               1306           326.5
miniapp        4             5               867.9          173.58

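In this sample, the app channel's average revenue per purchase record (326.5) is far higher than online retail's (54.29). This suggests that app users lean toward higher-priced items and are a good audience for premium new products. Keep two caveats in mind. First, avg_order_value here is AVG(revenue) per purchase record, which for online_retail is one line item rather than a whole invoice. Second, Online Retail II prices are in pounds sterling, so the channels are not directly comparable.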
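
The app and mini-program rows can be checked by hand against the ten sample events, because avg_order_value is simply total revenue divided by the number of purchase records. The short Python check below counts users by union_id. That works only on the assumption that each union_id resolves to its own one_id, which holds in this sample.

```python
from collections import defaultdict

# (event_type, channel, union_id, item_price) for EVT001-EVT010 in the sample INSERT
SAMPLE_EVENTS = [
    ("purchase", "miniapp", "union_001", 89.9),
    ("purchase", "app", "union_002", 199.0),
    ("add_cart", "miniapp", "union_003", 350.0),
    ("purchase", "app", "union_004", 599.0),
    ("purchase", "miniapp", "union_005", 129.0),
    ("purchase", "miniapp", "union_001", 75.0),
    ("purchase", "app", "union_006", 420.0),
    ("purchase", "miniapp", "union_008", 259.0),
    ("purchase", "app", "union_009", 88.0),
    ("purchase", "miniapp", "union_010", 315.0),
]


def channel_summary(events):
    """Per channel: (unique_users, purchase_count, total_revenue, avg revenue per record)."""
    users = defaultdict(set)
    revenue = defaultdict(list)
    for event_type, channel, user, price in events:
        if event_type != "purchase":  # WHERE event_type = 'purchase'
            continue
        users[channel].add(user)
        revenue[channel].append(price)  # quantity is 1 for app / mini-program events, so revenue = price
    return {
        channel: (len(users[channel]), len(rev), round(sum(rev), 2), round(sum(rev) / len(rev), 2))
        for channel, rev in revenue.items()
    }


if __name__ == "__main__":
    for channel, row in channel_summary(SAMPLE_EVENTS).items():
        print(channel, *row)
```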


DWS layer: RFM metrics and user segmentation

User RFM Dynamic Table

RFM (Recency / Frequency / Monetary) is the core measure of customer value. A Dynamic Table automatically maintains the latest RFM values for every one_id:

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_marketing_cdp.dws_user_rfm AS
SELECT
    one_id,
    DATEDIFF(CURRENT_DATE(), MAX(event_date))     AS recency_days,   -- days since the last purchase
    COUNT(DISTINCT DATE_TRUNC('day', event_time)) AS frequency,      -- number of distinct purchase days
    ROUND(SUM(revenue), 2)                        AS monetary,
    MAX(event_date)                               AS last_purchase_date,
    MIN(event_date)                               AS first_purchase_date
FROM best_practice_marketing_cdp.dwd_user_events
WHERE event_type = 'purchase'
GROUP BY one_id;

Trigger the first full refresh manually:

REFRESH DYNAMIC TABLE best_practice_marketing_cdp.dws_user_rfm;

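Inspect the RFM distribution, sorted by monetary in descending order to surface high-value users. recency_days is computed against CURRENT_DATE() at refresh time, so your values will differ from the output below: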

SELECT one_id, recency_days, frequency, monetary, last_purchase_date
FROM best_practice_marketing_cdp.dws_user_rfm
ORDER BY monetary DESC
LIMIT 10;

one_id     recency_days  frequency  monetary  last_purchase_date
---------  ------------  ---------  --------  ------------------
ONE004     581           1          599       2024-11-02
ONE006     573           1          420       2024-11-10
ONE010     569           1          315       2024-11-14
CUS013085  6022          2          294.9     2009-12-10
CUS013241  6010          2          292.5     2009-12-22
CUS012583  5996          2          268.5     2010-01-05
ONE008     571           1          259       2024-11-12
ONE002     582           1          199       2024-11-01
CUS014085  6012          2          177.1     2009-12-20
ONE001     578           2          164.9     2024-11-05

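ONE004 (David Zhang) has the highest monetary value (599 yuan), but its last purchase was 581 days ago. That makes ONE004 a high-value user at risk of churning and a priority for reactivation. CUS013085 spent 294.9 in total, but its last purchase was more than 16 years ago. It comes from the 2009 UK retail history data, which is priced in pounds sterling and lies on a different time axis from the current ONE% users.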
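
To make the three aggregates concrete, here is the same calculation for a single user in Python. The as-of date 2026-06-06 is an assumption, chosen because it reproduces the recency values in the sample output: 578 days for ONE001 and 581 for ONE004. Two purchases on the same day count as one for frequency.

```python
from datetime import date


def rfm(purchases: list[tuple[date, float]], as_of: date) -> tuple[int, int, float]:
    """Recency, frequency and monetary for one user, mirroring dws_user_rfm."""
    days = [d for d, _ in purchases]
    recency_days = (as_of - max(days)).days            # DATEDIFF(CURRENT_DATE(), MAX(event_date))
    frequency = len(set(days))                         # COUNT(DISTINCT DATE_TRUNC('day', event_time))
    monetary = round(sum(r for _, r in purchases), 2)  # ROUND(SUM(revenue), 2)
    return recency_days, frequency, monetary


if __name__ == "__main__":
    one001 = [(date(2024, 11, 1), 89.9), (date(2024, 11, 5), 75.0)]  # EVT001 and EVT006
    print(rfm(one001, as_of=date(2026, 6, 6)))  # (578, 2, 164.9)
```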

User segmentation Dynamic Table

Segment users by their RFM values for the BITMAP audience selection that follows:

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_marketing_cdp.dws_user_segment AS
SELECT
    r.one_id,
    r.recency_days,
    r.frequency,
    r.monetary,
    CASE
        WHEN r.recency_days <= 200 AND r.frequency >= 2 AND r.monetary >= 300 THEN 'Champions'
        WHEN r.recency_days <= 600 AND r.frequency >= 2 THEN 'Loyal Customers'
        WHEN r.recency_days <= 600 THEN 'At Risk'
        WHEN r.recency_days <= 2000 THEN 'Hibernating'
        ELSE 'Lost'
    END AS rfm_segment,
    r.last_purchase_date,
    r.first_purchase_date
FROM best_practice_marketing_cdp.dws_user_rfm r;

REFRESH DYNAMIC TABLE best_practice_marketing_cdp.dws_user_segment;

Segment distribution:

SELECT
    rfm_segment,
    COUNT(*)                    AS user_count,
    ROUND(AVG(monetary), 2)     AS avg_monetary,
    ROUND(AVG(frequency), 1)    AS avg_frequency,
    ROUND(AVG(recency_days), 0) AS avg_recency_days
FROM best_practice_marketing_cdp.dws_user_segment
GROUP BY rfm_segment
ORDER BY avg_monetary DESC;

rfm_segment      user_count  avg_monetary  avg_frequency  avg_recency_days
---------------  ----------  ------------  -------------  ----------------
At Risk          7           287           1              575
Loyal Customers  1           164.9         2              578
Lost             16          101.79        1.3            6025

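The 7 At Risk users have the highest average monetary value (287 yuan), but their last purchase was about 575 days ago on average. That makes them the first group to target for reactivation.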
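
The CASE expression is evaluated top to bottom and the first matching branch wins, so the order of the rules matters. Below is a direct Python transcription with the same thresholds. One consequence of these rules is worth noticing: a customer who bought once yesterday is labeled At Risk, because the only branches that separate recent buyers (Champions and Loyal Customers) require frequency >= 2.

```python
def rfm_segment(recency_days: int, frequency: int, monetary: float) -> str:
    """Same thresholds as dws_user_segment; like SQL CASE, the first matching branch wins."""
    if recency_days <= 200 and frequency >= 2 and monetary >= 300:
        return "Champions"
    if recency_days <= 600 and frequency >= 2:
        return "Loyal Customers"
    if recency_days <= 600:
        return "At Risk"
    if recency_days <= 2000:
        return "Hibernating"
    return "Lost"


if __name__ == "__main__":
    for one_id, r, f, m in [("ONE001", 578, 2, 164.9), ("ONE004", 581, 1, 599.0), ("CUS013085", 6022, 2, 294.9)]:
        print(one_id, rfm_segment(r, f, m))
```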

Configuring Studio refresh tasks

In Studio, create scheduled tasks for the two Dynamic Tables under best_practices/marketing_cdp/:

# Create the folders
cz-cli task create-folder "best_practices" -p skill_test
cz-cli task create-folder "marketing_cdp" --parent <best_practices-folder-id> -p skill_test

# Create the RFM refresh task
cz-cli task create "refresh_dws_user_rfm" --type SQL --folder <folder-id> -p skill_test
cz-cli task save-content "refresh_dws_user_rfm" \
    --content "REFRESH DYNAMIC TABLE best_practice_marketing_cdp.dws_user_rfm;" -p skill_test
cz-cli task save-cron "refresh_dws_user_rfm" --cron "0 00 02 * * ? *" -p skill_test

# Create the segment refresh task (runs 30 minutes after the RFM task;
# it refreshes dws_user_rfm again before dws_user_segment)
cz-cli task create "refresh_dws_user_segment" --type SQL --folder <folder-id> -p skill_test
cz-cli task save-content "refresh_dws_user_segment" \
    --content "REFRESH DYNAMIC TABLE best_practice_marketing_cdp.dws_user_rfm; REFRESH DYNAMIC TABLE best_practice_marketing_cdp.dws_user_segment;" -p skill_test
cz-cli task save-cron "refresh_dws_user_segment" --cron "0 30 02 * * ? *" -p skill_test

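Both tasks were created in Studio under best_practices/marketing_cdp/ (task_id 10354650 / 10354651 in this walkthrough's environment). They run daily at 02:00 and 02:30, in that order. On the task configuration page you can also add data-quality alert rules, for example a Feishu notification when the RFM table's row count drops to zero.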
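
The --cron values appear to use the 7-field Quartz layout: seconds, minutes, hours, day of month, month, day of week, year. In that layout, ? means "no specific value" for the day-of-week field. Under that reading, "0 00 02 * * ? *" fires daily at 02:00:00. The tiny Python helper below is hypothetical and rests only on that assumption. It labels the fields so that a schedule can be checked at a glance.

```python
QUARTZ_FIELDS = ("second", "minute", "hour", "day_of_month", "month", "day_of_week", "year")


def describe_quartz_cron(expr: str) -> dict[str, str]:
    """Split a 7-field Quartz-style cron expression into named fields."""
    parts = expr.split()
    if len(parts) != len(QUARTZ_FIELDS):
        raise ValueError(f"expected {len(QUARTZ_FIELDS)} fields, got {len(parts)}: {expr!r}")
    return dict(zip(QUARTZ_FIELDS, parts))


if __name__ == "__main__":
    for expr in ("0 00 02 * * ? *", "0 30 02 * * ? *"):
        print(expr, "->", describe_quartz_cron(expr))
```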


ADS layer: BITMAP audience selection and audience package export

Building user BITMAPs

Audience selection starts by building a BITMAP for each segment, aggregated with GROUP_BITMAP_STATE. All set operations then run at the BITMAP level, which avoids full-table JOINs:

CREATE TABLE IF NOT EXISTS best_practice_marketing_cdp.ads_user_bitmap (
    segment_tag STRING,
    user_bitmap BITMAP
);

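Build BITMAPs by RFM segment and by purchase channel. The user_id is the numeric part of the ONE identifier (1–10). Rows whose one_id does not start with ONE, such as the CUS retail customers, are excluded: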

-- Clear the table before every rebuild so that each segment_tag has exactly one row (idempotent)
TRUNCATE TABLE best_practice_marketing_cdp.ads_user_bitmap;

-- Build by RFM segment
INSERT INTO best_practice_marketing_cdp.ads_user_bitmap
SELECT
    rfm_segment AS segment_tag,
    GROUP_BITMAP_STATE(CAST(SUBSTRING(one_id, 4) AS INT)) AS user_bitmap
FROM best_practice_marketing_cdp.dws_user_segment
WHERE one_id LIKE 'ONE%'
GROUP BY rfm_segment;

-- Build by purchase channel
INSERT INTO best_practice_marketing_cdp.ads_user_bitmap
SELECT
    CONCAT('channel_', channel) AS segment_tag,
    GROUP_BITMAP_STATE(CAST(SUBSTRING(one_id, 4) AS INT)) AS user_bitmap
FROM best_practice_marketing_cdp.dwd_user_events
WHERE event_type = 'purchase'
  AND one_id LIKE 'ONE%'
GROUP BY channel;

-- High-value users (monetary >= 300)
INSERT INTO best_practice_marketing_cdp.ads_user_bitmap
SELECT
    'High Value' AS segment_tag,
    GROUP_BITMAP_STATE(CAST(SUBSTRING(one_id, 4) AS INT)) AS user_bitmap
FROM best_practice_marketing_cdp.dws_user_rfm
WHERE one_id LIKE 'ONE%'
  AND monetary >= 300;
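
Conceptually, GROUP_BITMAP_STATE sets one bit per integer ID, and BITMAP_COUNT counts the set bits. The toy Python model below uses an arbitrary-precision int as the bitset. Engines commonly use compressed formats such as Roaring bitmaps instead, so that memory does not grow with the largest ID. The model shows how the INSERTs above turn ONE004 into bit 4 and skip CUS customers.

```python
def group_bitmap_state(one_ids: list[str]) -> int:
    """Toy GROUP_BITMAP_STATE(CAST(SUBSTRING(one_id, 4) AS INT)) using a Python int as the bitset."""
    bitmap = 0
    for one_id in one_ids:
        if not one_id.startswith("ONE"):  # WHERE one_id LIKE 'ONE%'
            continue
        bitmap |= 1 << int(one_id[3:])    # SUBSTRING is 1-based, so position 4 is one_id[3:]
    return bitmap


def bitmap_count(bitmap: int) -> int:
    """Number of set bits, like BITMAP_COUNT."""
    return bin(bitmap).count("1")


def bitmap_to_array(bitmap: int) -> list[int]:
    """Set bit positions in ascending order, like BITMAP_TO_ARRAY."""
    return [i for i in range(bitmap.bit_length()) if (bitmap >> i) & 1]


if __name__ == "__main__":
    high_value = group_bitmap_state(["ONE004", "ONE006", "ONE010", "CUS013085"])
    print(bitmap_count(high_value), bitmap_to_array(high_value))  # 3 [4, 6, 10]
```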

Check the size of each segment:

SELECT segment_tag, BITMAP_COUNT(user_bitmap) AS user_count
FROM best_practice_marketing_cdp.ads_user_bitmap
ORDER BY user_count DESC;

segment_tag      user_count
---------------  ----------
At Risk          7
channel_app      4
channel_miniapp  4
High Value       3
Loyal Customers  1

Set operations: intersection / union / difference

Scenario 1: intersection across all selected segments (GROUP_BITMAP_AND)

Count the users who appear in both the At Risk and High Value segments, i.e. the AND of the two sets:

SELECT GROUP_BITMAP_AND(user_bitmap) AS users_in_all_segments
FROM best_practice_marketing_cdp.ads_user_bitmap
WHERE segment_tag IN ('At Risk', 'High Value');

users_in_all_segments
---------------------
3

These 3 users are both At Risk and High Value, which makes them the most valuable group to reactivate.

Scenario 2: union of two sets (BITMAP_OR)

Count the distinct users in At Risk or Loyal Customers:

SELECT BITMAP_COUNT(
           BITMAP_OR(
               (SELECT user_bitmap FROM best_practice_marketing_cdp.ads_user_bitmap WHERE segment_tag = 'At Risk'),
               (SELECT user_bitmap FROM best_practice_marketing_cdp.ads_user_bitmap WHERE segment_tag = 'Loyal Customers')
           )
       ) AS union_count;

union_count
-----------
8

Scenario 3: difference (BITMAP_ANDNOT)

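Among app buyers, exclude those who have also bought in the mini-program. The result is the set of users active only in the app, a good fit for app-exclusive benefits: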

SELECT BITMAP_COUNT(
           BITMAP_ANDNOT(
               (SELECT user_bitmap FROM best_practice_marketing_cdp.ads_user_bitmap WHERE segment_tag = 'channel_app'),
               (SELECT user_bitmap FROM best_practice_marketing_cdp.ads_user_bitmap WHERE segment_tag = 'channel_miniapp')
           )
       ) AS app_only_users;

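With the sample data above, this returns:

app_only_users
--------------
4

In this sample, the app buyers (ONE002 / ONE004 / ONE006 / ONE009) and the mini-program buyers (ONE001 / ONE005 / ONE008 / ONE010) do not overlap. All 4 app buyers are therefore app-only and are candidates for app-exclusive benefits.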

Scenario 4: compound selection (BITMAP_AND + BITMAP_ANDNOT)

Goal: At Risk ∩ High Value, excluding mini-program buyers (for app-exclusive repurchase offers):

SELECT BITMAP_COUNT(
           BITMAP_ANDNOT(
               BITMAP_AND(
                   (SELECT user_bitmap FROM best_practice_marketing_cdp.ads_user_bitmap WHERE segment_tag = 'At Risk'),
                   (SELECT user_bitmap FROM best_practice_marketing_cdp.ads_user_bitmap WHERE segment_tag = 'High Value')
               ),
               (SELECT user_bitmap FROM best_practice_marketing_cdp.ads_user_bitmap WHERE segment_tag = 'channel_miniapp')
           )
       ) AS target_audience_count;

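target_audience_count
---------------------
2

Of the 3 At Risk ∩ High Value users (ONE004 / ONE006 / ONE010), ONE010 bought in the mini-program and is excluded, leaving ONE004 and ONE006. Because (A ∩ B) \ C is a subset of A ∩ B, this count can never exceed the 3 returned in scenario 1.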
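
Because BITMAP operations are plain set algebra, you can sanity-check scenario results by replaying them on small Python sets. The memberships below are derived from the sample events and RFM output earlier in this article, where union_00N resolves to ONE00N. The last line also illustrates the idempotency pitfall described under Notes: a stale row for the same segment_tag shrinks the GROUP_BITMAP_AND result.

```python
from functools import reduce

# Segment memberships (numeric part of one_id) derived from the sample data above
AT_RISK = {2, 4, 5, 6, 8, 9, 10}
LOYAL_CUSTOMERS = {1}
HIGH_VALUE = {4, 6, 10}
CHANNEL_APP = {2, 4, 6, 9}
CHANNEL_MINIAPP = {1, 5, 8, 10}


def group_bitmap_and(rows: list[set[int]]) -> int:
    """Cardinality of the intersection across all rows, like GROUP_BITMAP_AND."""
    return len(reduce(lambda a, b: a & b, rows))


def scenario_counts() -> dict[str, int]:
    return {
        "1: At Risk AND High Value": group_bitmap_and([AT_RISK, HIGH_VALUE]),
        "2: At Risk OR Loyal Customers": len(AT_RISK | LOYAL_CUSTOMERS),
        "3: app ANDNOT miniapp": len(CHANNEL_APP - CHANNEL_MINIAPP),
        "4: (At Risk AND High Value) ANDNOT miniapp": len((AT_RISK & HIGH_VALUE) - CHANNEL_MINIAPP),
    }


if __name__ == "__main__":
    for name, count in scenario_counts().items():
        print(name, count)
    # A leftover At Risk row from an earlier run (hypothetical content {4, 6}) shrinks the AND
    print("with stale row:", group_bitmap_and([AT_RISK, {4, 6}, HIGH_VALUE]))
```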

Exporting the audience package

Expand the selection from a BITMAP object into an ID list and write it to an audience package table for downstream ad platforms:

CREATE TABLE IF NOT EXISTS best_practice_marketing_cdp.ads_audience_package (
    package_id   STRING,
    package_name STRING,
    segment_rule STRING,
    one_id       STRING,
    rfm_segment  STRING,
    create_time  TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

Export the At Risk ∩ High Value intersection:

INSERT INTO best_practice_marketing_cdp.ads_audience_package
    (package_id, package_name, segment_rule, one_id, rfm_segment)
SELECT
    'PKG001'                                             AS package_id,
    'At Risk High Value'                                 AS package_name,
    'At Risk AND High Value'                             AS segment_rule,
    CONCAT('ONE', LPAD(CAST(user_id AS STRING), 3, '0')) AS one_id,       -- 4 -> 'ONE004'
    'At Risk'                                            AS rfm_segment
FROM (
    SELECT BITMAP_TO_ARRAY(
               BITMAP_AND(
                   (SELECT user_bitmap FROM best_practice_marketing_cdp.ads_user_bitmap WHERE segment_tag = 'At Risk'),
                   (SELECT user_bitmap FROM best_practice_marketing_cdp.ads_user_bitmap WHERE segment_tag = 'High Value')
               )
           ) AS ids
) t
LATERAL VIEW EXPLODE(ids) tmp AS user_id;

Check the exported rows:

SELECT package_id, package_name, one_id, rfm_segment
FROM best_practice_marketing_cdp.ads_audience_package
ORDER BY one_id;

package_id  package_name        one_id  rfm_segment
----------  ------------------  ------  -----------
PKG001      At Risk High Value  ONE004  At Risk
PKG001      At Risk High Value  ONE006  At Risk
PKG001      At Risk High Value  ONE010  At Risk

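Three users (ONE004 / ONE006 / ONE010) enter the audience package, with monetary values of 599 / 420 / 315 yuan respectively. They are the priority targets for this repurchase campaign.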
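
The export reverses the ID encoding: each bit position becomes "ONE" plus the number left-padded to three digits. LPAD(..., 3, '0') only round-trips IDs 1–999. In Hive- and Spark-style SQL, LPAD truncates input longer than the target length (LPAD('1000', 3, '0') returns '100'), so a fourth digit would silently produce wrong one_ids. Check this platform's LPAD behavior. The hypothetical Python helper below raises an error instead of truncating.

```python
def export_one_ids(user_ids: list[int], width: int = 3) -> list[str]:
    """Rebuild one_id strings like CONCAT('ONE', LPAD(CAST(user_id AS STRING), 3, '0'))."""
    result = []
    for uid in sorted(user_ids):
        digits = str(uid)
        if len(digits) > width:
            # A truncating LPAD would silently turn 1000 into 'ONE100'; fail loudly instead
            raise ValueError(f"user_id {uid} does not fit in {width} digits")
        result.append("ONE" + digits.rjust(width, "0"))
    return result


if __name__ == "__main__":
    print(export_one_ids([10, 4, 6]))  # ['ONE004', 'ONE006', 'ONE010']
```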


Warehouse object overview

USE SCHEMA best_practice_marketing_cdp;
SHOW TABLES;

schema_name                  table_name               is_dynamic
---------------------------  -----------------------  ----------
best_practice_marketing_cdp  ads_audience_package     false
best_practice_marketing_cdp  ads_user_bitmap          false
best_practice_marketing_cdp  dwd_id_mapping           false
best_practice_marketing_cdp  dwd_user_events          false
best_practice_marketing_cdp  dws_user_rfm             true
best_practice_marketing_cdp  dws_user_segment         true
best_practice_marketing_cdp  ods_app_events           false
best_practice_marketing_cdp  ods_crm_members          false
best_practice_marketing_cdp  ods_retail_transactions  false


Notes

  • Uniqueness of the MERGE INTO ON clause: the ON clause of the ID Mapping MERGE must cover the full business key (one_id + id_type + id_value). Otherwise WHEN MATCHED can match multiple rows and the UPDATE becomes nondeterministic.

  • GROUP_BITMAP_STATE vs. GROUP_BITMAP: GROUP_BITMAP_STATE returns a BITMAP object, used to build bitmaps that can take part in further set operations. GROUP_BITMAP / GROUP_BITMAP_AND / GROUP_BITMAP_OR return the cardinality (an INT) directly and cannot be used in further set operations. The two groups serve different purposes and are not interchangeable.

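  • No REFRESH INTERVAL on Dynamic Tables: refresh scheduling is managed by Studio Tasks. The benefit is that data-quality monitoring, such as row-count alerts and NULL-rate checks, can be attached to the same task.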

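  • BITMAP IDs must be positive integers: GROUP_BITMAP_STATE only accepts positive integer input. If one_id is a string (such as ONE001), extract its numeric part or build an integer key mapping table first. In production, maintain an auto-incrementing integer column user_int_id in dwd_id_mapping dedicated to BITMAP use.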

  • ads_user_bitmap must be loaded idempotently: GROUP_BITMAP_AND intersects the bitmaps of all matching rows. If a segment_tag has several rows left over from repeated INSERT runs and their bitmaps differ, the AND result shrinks toward 0. Run TRUNCATE TABLE ads_user_bitmap before every rebuild so that each segment_tag has exactly one row. The BITMAP_OR subqueries are affected in the same way: once stale rows are merged in, the union count is inflated.

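  • External Functions require an API CONNECTION: before production code can call the external ID graph service, you must create an API CONNECTION and configure the cloud-function runtime (Alibaba Cloud FC / AWS Lambda). In test environments a SQL UDF can stand in; switch to the External Function once the Mapping table structure has been validated.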

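  • MySQL CDC requires binlog: the source MySQL must set binlog_format = ROW. Some cloud RDS instances ship with binlog disabled; enable it in the console and restart the instance. CDC sync latency is typically a few seconds, which suits near-real-time scenarios. If you need millisecond latency, evaluate the Kafka approach.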

