-- Step 1: upload the local CSV file to the User Volume with SQL PUT.
PUT '/path/to/crm_members.csv' TO USER VOLUME FILE 'crm_members.csv';

-- Step 2: COPY the uploaded file from the User Volume into the ODS table.
-- The options declare a header row and a comma separator; 'nullValue'=''
-- presumably makes empty fields load as NULL (confirm against the COPY docs).
COPY INTO best_practice_marketing_cdp.ods_crm_members
    FROM USER VOLUME
    USING csv
    OPTIONS(
        'header'='true',
        'sep'=',',
        'nullValue'=''
    )
    FILES ('crm_members.csv');
也可直接内联插入小批量测试数据(不需要 CSV 文件):
-- Inline seed data: five sample CRM members, usable when no CSV file is available.
-- NOTE(review): there is no column list, so values are bound by position; the order
-- must match the ods_crm_members definition, which is not visible here — confirm.
INSERT INTO best_practice_marketing_cdp.ods_crm_members VALUES
('MBR001','hash_mobile_001','hash_email_001','Alice Wang', 'F',CAST('1990-05-12' AS DATE),CAST('2020-01-15' AS DATE),'offline','gold', 1200,CAST('2024-11-01 10:00:00' AS TIMESTAMP)),
('MBR002','hash_mobile_002','hash_email_002','Bob Chen', 'M',CAST('1985-08-23' AS DATE),CAST('2019-06-20' AS DATE),'online', 'silver',800, CAST('2024-10-15 14:30:00' AS TIMESTAMP)),
('MBR003','hash_mobile_003','hash_email_003','Carol Liu', 'F',CAST('1995-03-07' AS DATE),CAST('2021-09-10' AS DATE),'miniapp','bronze',350, CAST('2024-11-10 09:15:00' AS TIMESTAMP)),
('MBR004','hash_mobile_004','hash_email_004','David Zhang','M',CAST('1988-12-01' AS DATE),CAST('2018-03-05' AS DATE),'offline','gold', 2500,CAST('2024-09-20 16:45:00' AS TIMESTAMP)),
('MBR005','hash_mobile_005','hash_email_005','Eve Li', 'F',CAST('1992-07-19' AS DATE),CAST('2022-11-08' AS DATE),'online', 'silver',620, CAST('2024-11-05 11:00:00' AS TIMESTAMP));
-- Step 1: upload the local CSV file to the User Volume.
PUT '/tmp/marketing_cdp/online_retail_II.csv' TO USER VOLUME FILE 'online_retail_II.csv';

-- Step 2: COPY the file from the User Volume into the table (full load).
-- The explicit column list fixes which CSV field lands in which column;
-- 'timestampFormat' describes how invoice_date is written in the file.
COPY INTO best_practice_marketing_cdp.ods_retail_transactions
    (
        invoice,
        stock_code,
        description,
        quantity,
        invoice_date,
        price,
        customer_id,
        country
    )
    FROM USER VOLUME
    USING csv
    OPTIONS(
        'header'='true',
        'sep'=',',
        'nullValue'='',
        'timestampFormat'='M/d/yyyy H:mm'
    )
    FILES ('online_retail_II.csv');
方案二:INSERT INTO 导入部分代表性数据
若暂无完整 CSV 文件,可直接插入部分代表性数据验证后续 RFM 和 BITMAP 逻辑:
-- Inline sample of retail transactions, one row per invoice line, for validating
-- the downstream RFM and BITMAP logic without the full CSV file.
INSERT INTO best_practice_marketing_cdp.ods_retail_transactions
(invoice, stock_code, description, quantity, invoice_date, price, customer_id, country)
VALUES
('489434','85048','15CM CHRISTMAS GLASS BALL 20 LIGHTS',12,CAST('2009-12-01 07:45:00' AS TIMESTAMP),6.95, 'CUS013085','United Kingdom'),
('489434','79323P','PINK CHERRY LIGHTS', 12,CAST('2009-12-01 07:45:00' AS TIMESTAMP),6.75, 'CUS013085','United Kingdom'),
('489435','22111','SCOTTIE DOG HOT WATER BOTTLE', 24,CAST('2009-12-01 07:45:00' AS TIMESTAMP),3.45, 'CUS013748','United Kingdom'),
('489436','48173C','DOOR MAT UNION JACK CARS', 10,CAST('2009-12-01 09:00:00' AS TIMESTAMP),5.95, 'CUS014085','United Kingdom'),
('489437','21080','SET OF 6 NAUTICAL PAPER PLATES', 12,CAST('2009-12-01 09:30:00' AS TIMESTAMP),3.25, 'CUS012583','United Kingdom'),
('489437','22423','REGENCY CAKESTAND 3 TIER', 12,CAST('2009-12-01 09:30:00' AS TIMESTAMP),12.75,'CUS012583','United Kingdom'),
('489438','84970L','SINGLE HEART ZINC T-LIGHT HOLDER', 12,CAST('2009-12-01 10:00:00' AS TIMESTAMP),1.25, 'CUS012431','United Kingdom'),
('489440','23256','CHILDRENS CUTLERY SPACEBOY', 12,CAST('2009-12-01 10:30:00' AS TIMESTAMP),4.15, 'CUS013047','United Kingdom'),
('490100','22421','GINGHAM HEART', 6, CAST('2009-12-10 09:30:00' AS TIMESTAMP),4.95, 'CUS013085','United Kingdom'),
('490200','84029E','TREE TOP STAR', 12,CAST('2009-12-15 10:00:00' AS TIMESTAMP),1.65, 'CUS013748','United Kingdom')
-- The full import has 30 rows; only the first 10 are shown here as a sample.
;
验证三张 ODS 表的数据量:
-- Row-count check for the three ODS tables: one output row per table.
-- Output column names (tbl, cnt) come from the first branch of the UNION ALL.
SELECT
    'ods_crm_members' AS tbl,
    COUNT(*) AS cnt
FROM best_practice_marketing_cdp.ods_crm_members

UNION ALL

SELECT
    'ods_app_events',
    COUNT(*)
FROM best_practice_marketing_cdp.ods_app_events

UNION ALL

SELECT
    'ods_retail_transactions',
    COUNT(*)
FROM best_practice_marketing_cdp.ods_retail_transactions;
ID Mapping 表记录每个 OneID 与各渠道原始 ID 之间的映射关系。实际生产中,新 ID 到 OneID 的匹配由外部 ID 图谱服务完成(通过 External Function 调用)。
创建 External Function(生产方式)
在 Studio 中创建 External Function,通过阿里云 FC / AWS Lambda 调用外部 ID 图谱 API:
-- Create the API CONNECTION first (one-time setup; it connects to the cloud function runtime).
-- The <your-role-arn> and <your-code-bucket> placeholders must be replaced before running.
-- NOTE(review): clause names and the TYPE value are kept as written — verify them against
-- the engine's CREATE API CONNECTION reference.
CREATE API CONNECTION IF NOT EXISTS conn_id_graph
TYPE = 'ALIYUN'
REGION = 'cn-hangzhou'
ROLE_ARN = '<your-role-arn>'
NAMESPACE = 'default'
CODE_BUCKET = '<your-code-bucket>';
-- Create the external function (packages the HTTP call logic for the ID graph service).
-- Takes an ID value and its type, returns a STRING; it runs through the conn_id_graph
-- connection and loads its code from the archive named in RESOURCE_URIS.
-- NOTE(review): presumably the returned string is the matched OneID — confirm with the
-- implementation inside id_graph.zip.
CREATE EXTERNAL FUNCTION IF NOT EXISTS best_practice_marketing_cdp.call_id_graph(
id_value STRING,
id_type STRING
)
RETURNS STRING
LANGUAGE PYTHON
CONNECTION = conn_id_graph
RESOURCE_URIS = 'volume://func_volume/id_graph.zip';
若暂未接入外部 ID 图谱服务,可用 SQL UDF 模拟 ID 规范化逻辑,验证下游 Mapping 表结构:
-- SQL UDF that mocks ID normalization: builds a prefixed ID from the ID type and
-- SUBSTR(id_value, -6). Any other type (including NULL) falls through to the
-- 'ONE_UNKNOWN_' prefix with the full id_value.
CREATE OR REPLACE FUNCTION best_practice_marketing_cdp.normalize_id(
    id_value STRING,
    id_type STRING
)
RETURNS STRING
AS CASE id_type
    WHEN 'mobile_hash' THEN CONCAT('ONE_PHONE_', SUBSTR(id_value, -6))
    WHEN 'email_hash' THEN CONCAT('ONE_EMAIL_', SUBSTR(id_value, -6))
    WHEN 'union_id' THEN CONCAT('ONE_WX_', SUBSTR(id_value, -6))
    WHEN 'device_id' THEN CONCAT('ONE_DEV_', SUBSTR(id_value, -6))
    ELSE CONCAT('ONE_UNKNOWN_', id_value)
END;
验证 UDF:
-- Smoke test for normalize_id: apply it to every mapping row of ONE001.
SELECT
    m.id_value,
    m.id_type,
    best_practice_marketing_cdp.normalize_id(m.id_value, m.id_type) AS normalized_id
FROM best_practice_marketing_cdp.dwd_id_mapping AS m
WHERE m.one_id = 'ONE001';
-- Seed the ID mapping table: one row per (one_id, id_type, id_value) link, with the
-- source channel, a confidence score, first/last seen timestamps and an active flag.
-- The explicit column list (same names and order as the MERGE into this table) keeps
-- the statement correct even if the table's physical column order changes.
INSERT INTO best_practice_marketing_cdp.dwd_id_mapping
    (one_id, id_type, id_value, source_channel, confidence, first_seen, last_seen, is_active)
VALUES
    ('ONE001', 'member_id', 'MBR001', 'crm', 1.0, CAST('2020-01-15' AS TIMESTAMP), CAST('2024-11-01' AS TIMESTAMP), true),
    ('ONE001', 'mobile_hash', 'hash_mobile_001', 'crm', 1.0, CAST('2020-01-15' AS TIMESTAMP), CAST('2024-11-01' AS TIMESTAMP), true),
    ('ONE001', 'union_id', 'union_001', 'miniapp', 0.95, CAST('2021-03-10' AS TIMESTAMP), CAST('2024-11-05' AS TIMESTAMP), true),
    ('ONE002', 'member_id', 'MBR002', 'crm', 1.0, CAST('2019-06-20' AS TIMESTAMP), CAST('2024-10-15' AS TIMESTAMP), true),
    ('ONE002', 'union_id', 'union_002', 'app', 0.9, CAST('2021-08-15' AS TIMESTAMP), CAST('2024-11-01' AS TIMESTAMP), true)
-- The full data set has 22 rows; only the first 5 are shown here.
;
MERGE INTO:ID Mapping 增量更新
当 ID 图谱服务发现新的 ID 关联关系,或现有映射的置信度发生变化时,使用 MERGE INTO 进行增量 upsert:相同的 (one_id, id_type, id_value) 三元组已存在则更新,不存在则插入。
-- Incremental upsert into the ID mapping table, keyed on (one_id, id_type, id_value).
-- A matching row only gets last_seen and confidence refreshed; a new key is inserted in full.
MERGE INTO best_practice_marketing_cdp.dwd_id_mapping AS tgt
USING (
    -- Newly discovered email mapping (as returned by the ID graph service).
    SELECT
        'ONE001' AS one_id,
        'email_hash' AS id_type,
        'hash_email_001' AS id_value,
        'crm' AS source_channel,
        1.0 AS confidence,
        CAST('2020-01-15 00:00:00' AS TIMESTAMP) AS first_seen,
        CAST('2024-11-20 10:00:00' AS TIMESTAMP) AS last_seen,
        true AS is_active
) AS src
    ON tgt.one_id = src.one_id
    AND tgt.id_type = src.id_type
    AND tgt.id_value = src.id_value
WHEN MATCHED THEN
    UPDATE SET
        last_seen = src.last_seen,
        confidence = src.confidence
WHEN NOT MATCHED THEN
    INSERT (
        one_id, id_type, id_value, source_channel,
        confidence, first_seen, last_seen, is_active
    )
    VALUES (
        src.one_id, src.id_type, src.id_value, src.source_channel,
        src.confidence, src.first_seen, src.last_seen, src.is_active
    );
执行后验证 ONE001 的全部映射(新增了 email_hash 行,原有行保持不变):
-- List every mapping row of ONE001 after the MERGE, ordered by ID type.
SELECT
    m.one_id,
    m.id_type,
    m.id_value,
    m.confidence,
    m.last_seen
FROM best_practice_marketing_cdp.dwd_id_mapping AS m
WHERE m.one_id = 'ONE001'
ORDER BY m.id_type;
-- Load app purchase events into the unified event table, resolving each event's
-- union_id to a one_id through the ID mapping table. Each event counts as quantity 1
-- with revenue equal to the item price.
-- NOTE(review): values bind by position (no column list); the target column order is
-- assumed to match this SELECT — confirm against the dwd_user_events definition.
INSERT INTO best_practice_marketing_cdp.dwd_user_events
SELECT
    ev.event_id,
    idm.one_id,
    ev.event_type,
    ev.channel,
    ev.platform,
    ev.item_id,
    ev.item_price,
    1 AS quantity,
    ev.item_price AS revenue,
    ev.event_time,
    CAST(ev.event_time AS DATE) AS event_date,
    ev.session_id
FROM best_practice_marketing_cdp.ods_app_events AS ev
INNER JOIN best_practice_marketing_cdp.dwd_id_mapping AS idm
    ON idm.id_value = ev.union_id
    AND idm.id_type = 'union_id'
WHERE ev.event_type = 'purchase'
    AND ev.item_id IS NOT NULL;
从零售交易表直接映射(customer_id 作为 one_id):
-- Load retail transactions into the unified event table as 'purchase' events.
-- customer_id is used directly as one_id; the event id is built from invoice and
-- stock code, and the invoice doubles as the session id.
INSERT INTO best_practice_marketing_cdp.dwd_user_events
SELECT
    CONCAT('ORT-', rt.invoice, '-', rt.stock_code) AS event_id,
    rt.customer_id AS one_id,
    'purchase' AS event_type,
    'online_retail' AS channel,
    'web' AS platform,
    rt.stock_code AS item_id,
    rt.price AS item_price,
    rt.quantity,
    ROUND(rt.price * rt.quantity, 2) AS revenue,
    rt.invoice_date AS event_time,
    CAST(rt.invoice_date AS DATE) AS event_date,
    rt.invoice AS session_id
FROM best_practice_marketing_cdp.ods_retail_transactions AS rt;
查看各渠道的购买分布:
-- Purchase summary per channel: distinct buyers, purchase rows, total and average revenue.
SELECT
    ue.channel,
    COUNT(DISTINCT ue.one_id) AS unique_users,
    COUNT(*) AS purchase_count,
    ROUND(SUM(ue.revenue), 2) AS total_revenue,
    ROUND(AVG(ue.revenue), 2) AS avg_order_value
FROM best_practice_marketing_cdp.dwd_user_events AS ue
WHERE ue.event_type = 'purchase'
GROUP BY ue.channel
ORDER BY total_revenue DESC;
-- Dynamic table with one RFM row per one_id, computed from purchase events only.
--   recency_days : DATEDIFF between the current date and the latest purchase date
--                  (presumably days since last purchase — confirm DATEDIFF argument order)
--   frequency    : number of distinct calendar days with at least one purchase
--   monetary     : total revenue, rounded to 2 decimals
-- No refresh interval is declared in this statement.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_marketing_cdp.dws_user_rfm
AS
SELECT
one_id,
DATEDIFF(CURRENT_DATE(), MAX(event_date)) AS recency_days,
COUNT(DISTINCT DATE_TRUNC('day', event_time)) AS frequency,
ROUND(SUM(revenue), 2) AS monetary,
MAX(event_date) AS last_purchase_date,
MIN(event_date) AS first_purchase_date
FROM best_practice_marketing_cdp.dwd_user_events
WHERE event_type = 'purchase'
GROUP BY one_id;
⚠️ 注意:CREATE DYNAMIC TABLE 不写 REFRESH INTERVAL。定期刷新通过在 Studio 创建调度任务来管理(见下方"配置调度任务"小节),这样可以在同一任务上附加数据质量监控和告警规则。
-- Dynamic table that labels every row of dws_user_rfm with an RFM segment.
-- The CASE branches are evaluated top to bottom and the first match wins, so their
-- order is significant:
--   Champions       : recency <= 200, frequency >= 2, monetary >= 300
--   Loyal Customers : recency <= 600, frequency >= 2 (and not a Champion)
--   At Risk         : recency <= 600 without reaching frequency 2
--   Hibernating     : recency <= 2000
--   Lost            : everything else
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_marketing_cdp.dws_user_segment
AS
SELECT
r.one_id,
r.recency_days,
r.frequency,
r.monetary,
CASE
WHEN r.recency_days <= 200 AND r.frequency >= 2 AND r.monetary >= 300 THEN 'Champions'
WHEN r.recency_days <= 600 AND r.frequency >= 2 THEN 'Loyal Customers'
WHEN r.recency_days <= 600 THEN 'At Risk'
WHEN r.recency_days <= 2000 THEN 'Hibernating'
ELSE 'Lost'
END AS rfm_segment,
r.last_purchase_date,
r.first_purchase_date
FROM best_practice_marketing_cdp.dws_user_rfm r;
-- Per-segment summary: user count plus average monetary, frequency and recency,
-- with the highest average spend first.
SELECT
    seg.rfm_segment,
    COUNT(*) AS user_count,
    ROUND(AVG(seg.monetary), 2) AS avg_monetary,
    ROUND(AVG(seg.frequency), 1) AS avg_frequency,
    ROUND(AVG(seg.recency_days), 0) AS avg_recency_days
FROM best_practice_marketing_cdp.dws_user_segment AS seg
GROUP BY seg.rfm_segment
ORDER BY avg_monetary DESC;
-- Audience bitmap table: one bitmap of numeric user ids per segment tag.
CREATE TABLE IF NOT EXISTS best_practice_marketing_cdp.ads_user_bitmap (
-- Segment label: an RFM segment name, 'channel_<channel>', or 'High Value'.
segment_tag STRING,
-- Bitmap holding the numeric part of each member's one_id.
user_bitmap BITMAP
);
按 RFM 分层和渠道分别构建 BITMAP(user_id 使用 ONE 编号的数字部分 1–10):
⚠️ 注意:每次重建 BITMAP 前必须先清空表,否则重复执行 INSERT 会导致同一 segment_tag 存在多行。GROUP_BITMAP_AND 是对多行 bitmap 求交集——若同一分群有两行不完全相同的 bitmap,它们的 AND 结果会趋向 0,与预期不符。BITMAP_OR 子查询同理,会拿到多行 bitmap 导致合并结果异常。
-- Rebuild the audience bitmaps from scratch.
-- Truncate first so each segment_tag ends up with exactly one row (idempotent rebuild);
-- the three INSERTs below must run after the TRUNCATE.
-- Only one_id values starting with 'ONE' are included; the bitmap stores the integer
-- parsed from the characters after that prefix.
-- Every INSERT names its target columns so it does not depend on the table's column order.
TRUNCATE TABLE best_practice_marketing_cdp.ads_user_bitmap;

-- One bitmap per RFM segment.
INSERT INTO best_practice_marketing_cdp.ads_user_bitmap
    (segment_tag, user_bitmap)
SELECT
    rfm_segment AS segment_tag,
    GROUP_BITMAP_STATE(CAST(SUBSTRING(one_id, 4) AS INT)) AS user_bitmap
FROM best_practice_marketing_cdp.dws_user_segment
WHERE one_id LIKE 'ONE%'
GROUP BY rfm_segment;

-- One bitmap per purchase channel, tagged 'channel_<channel>'.
INSERT INTO best_practice_marketing_cdp.ads_user_bitmap
    (segment_tag, user_bitmap)
SELECT
    CONCAT('channel_', channel) AS segment_tag,
    GROUP_BITMAP_STATE(CAST(SUBSTRING(one_id, 4) AS INT)) AS user_bitmap
FROM best_practice_marketing_cdp.dwd_user_events
WHERE event_type = 'purchase'
    AND one_id LIKE 'ONE%'
GROUP BY channel;

-- High-value users (monetary >= 300) as a single 'High Value' bitmap.
INSERT INTO best_practice_marketing_cdp.ads_user_bitmap
    (segment_tag, user_bitmap)
SELECT
    'High Value' AS segment_tag,
    GROUP_BITMAP_STATE(CAST(SUBSTRING(one_id, 4) AS INT)) AS user_bitmap
FROM best_practice_marketing_cdp.dws_user_rfm
WHERE one_id LIKE 'ONE%'
    AND monetary >= 300;
-- Size of each stored segment bitmap, largest first.
SELECT
    b.segment_tag,
    BITMAP_COUNT(b.user_bitmap) AS user_count
FROM best_practice_marketing_cdp.ads_user_bitmap AS b
ORDER BY user_count DESC;
segment_tag user_count
---------------- ----------
At Risk 7
channel_app 4
channel_miniapp 4
High Value 3
Loyal Customers 1
集合运算:交集 / 并集 / 差集
场景 1:ALL 分群交集(GROUP_BITMAP_AND)
统计同时出现在 At Risk 和 High Value 两个分群中的用户数,等价于两集合的 AND:
-- Number of users present in both the 'At Risk' and 'High Value' segments, obtained by
-- AND-ing the bitmaps of the selected rows.
-- NOTE(review): if only one of the two tags has a row, the aggregate sees a single
-- bitmap and presumably returns that segment's own size — confirm both tags exist.
SELECT
    GROUP_BITMAP_AND(user_bitmap) AS users_in_all_segments
FROM best_practice_marketing_cdp.ads_user_bitmap
WHERE segment_tag IN ('At Risk', 'High Value');
users_in_all_segments
---------------------
3
3 名用户既处于 At Risk 状态,又属于高价值用户,是最值得激活的目标群体。
场景 2:两集合并集(BITMAP_OR)
统计 At Risk 或 Loyal Customers 中的去重用户总数:
-- Distinct users in 'At Risk' OR 'Loyal Customers' (bitmap union).
-- Each scalar subquery is expected to return exactly one bitmap row.
SELECT
    BITMAP_COUNT(
        BITMAP_OR(
            (
                SELECT user_bitmap
                FROM best_practice_marketing_cdp.ads_user_bitmap
                WHERE segment_tag = 'At Risk'
            ),
            (
                SELECT user_bitmap
                FROM best_practice_marketing_cdp.ads_user_bitmap
                WHERE segment_tag = 'Loyal Customers'
            )
        )
    ) AS union_count;
-- Users who purchased through 'channel_app' but not through 'channel_miniapp'
-- (bitmap difference: first argument minus second).
SELECT
    BITMAP_COUNT(
        BITMAP_ANDNOT(
            (
                SELECT user_bitmap
                FROM best_practice_marketing_cdp.ads_user_bitmap
                WHERE segment_tag = 'channel_app'
            ),
            (
                SELECT user_bitmap
                FROM best_practice_marketing_cdp.ads_user_bitmap
                WHERE segment_tag = 'channel_miniapp'
            )
        )
    ) AS app_only_users;
目标:At Risk 用户 ∩ High Value ∩ 非 miniapp(适合 APP 专属复购优惠推送):
-- Target audience size: ('At Risk' AND 'High Value') minus 'channel_miniapp' buyers.
SELECT
    BITMAP_COUNT(
        BITMAP_ANDNOT(
            BITMAP_AND(
                (
                    SELECT user_bitmap
                    FROM best_practice_marketing_cdp.ads_user_bitmap
                    WHERE segment_tag = 'At Risk'
                ),
                (
                    SELECT user_bitmap
                    FROM best_practice_marketing_cdp.ads_user_bitmap
                    WHERE segment_tag = 'High Value'
                )
            ),
            (
                SELECT user_bitmap
                FROM best_practice_marketing_cdp.ads_user_bitmap
                WHERE segment_tag = 'channel_miniapp'
            )
        )
    ) AS target_audience_count;
-- Export audience package PKG001: users in both the 'At Risk' and 'High Value' segments.
-- The bitmap intersection is converted to an array and exploded to one row per numeric
-- user id, which is then turned back into a one_id ('ONE' + number zero-padded to 3 digits).
INSERT INTO best_practice_marketing_cdp.ads_audience_package
    (package_id, package_name, segment_rule, one_id, rfm_segment)
SELECT
    'PKG001' AS package_id,
    'At Risk High Value' AS package_name,
    'At Risk AND High Value' AS segment_rule,
    -- LPAD cuts its input down to the target width when the input is longer, which would
    -- turn id 1234 into 'ONE123'. Ids with more than three digits are therefore
    -- concatenated unpadded; shorter ids keep the original zero-padding.
    CASE
        WHEN LENGTH(CAST(user_id AS STRING)) > 3
            THEN CONCAT('ONE', CAST(user_id AS STRING))
        ELSE CONCAT('ONE', LPAD(CAST(user_id AS STRING), 3, '0'))
    END AS one_id,
    'At Risk' AS rfm_segment
FROM (
    SELECT
        BITMAP_TO_ARRAY(
            BITMAP_AND(
                (
                    SELECT user_bitmap
                    FROM best_practice_marketing_cdp.ads_user_bitmap
                    WHERE segment_tag = 'At Risk'
                ),
                (
                    SELECT user_bitmap
                    FROM best_practice_marketing_cdp.ads_user_bitmap
                    WHERE segment_tag = 'High Value'
                )
            )
        ) AS ids
) t
LATERAL VIEW EXPLODE(ids) tmp AS user_id;
验证导出结果:
-- Check the exported audience package rows, ordered by one_id.
SELECT
    pkg.package_id,
    pkg.package_name,
    pkg.one_id,
    pkg.rfm_segment
FROM best_practice_marketing_cdp.ads_audience_package AS pkg
ORDER BY pkg.one_id;
package_id package_name one_id rfm_segment
---------- ----------------- ------ -----------
PKG001 At Risk High Value ONE004 At Risk
PKG001 At Risk High Value ONE006 At Risk
PKG001 At Risk High Value ONE010 At Risk
External Function 需要 API CONNECTION:生产中调用外部 ID 图谱服务需先创建 API CONNECTION 并配置云函数运行环境(阿里云 FC / AWS Lambda)。测试环境可用 SQL UDF 替代,验证 Mapping 表结构正确后再切换为 External Function。