-- Schema used by every table, index, pipe and dynamic table in this walkthrough.
-- IF NOT EXISTS keeps the statement safe to re-run.
CREATE SCHEMA IF NOT EXISTS
    best_practice_content_rec;
Bronze 层:行为事件表(Kafka PIPE 目标)
建表
-- Bronze layer: user interaction events (watch / like / share).
CREATE TABLE IF NOT EXISTS best_practice_content_rec.bronze_interaction_events (
    event_id      STRING,
    user_id       STRING,
    content_id    STRING,
    event_type    STRING,     -- watch / like / share
    session_id    STRING,
    duration_sec  INT,        -- watch duration in seconds; 0 for like/share
    event_time    TIMESTAMP,
    platform      STRING,     -- pc / mobile / console
    ingest_time   TIMESTAMP DEFAULT CURRENT_TIMESTAMP()  -- defaults to the current timestamp when not supplied
);
创建 Bloomfilter Index
Silver 层查询会频繁按 `content_id` 过滤,该列基数高,适合 Bloomfilter Index。
-- Must run inside the best_practice_content_rec schema context:
-- the index statement below references the table without a schema prefix.
USE SCHEMA best_practice_content_rec;

-- Bloom filter index on content_id of the bronze interaction table.
CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_content_id
    ON TABLE bronze_interaction_events (content_id);
⚠️ 注意:`CREATE BLOOMFILTER INDEX` 要求与目标表在同一 Schema 上下文。若跨 Schema 执行会报 "index and table must in the same schema" 错误。可通过 `USE SCHEMA best_practice_content_rec;` 先切换到目标表所在的 Schema,再执行建索引语句。
-- Landing table for raw Kafka messages: a single STRING column holding the message value.
CREATE TABLE IF NOT EXISTS best_practice_content_rec.bronze_kafka_raw_events (
    value  STRING
);
-- Pipe that loads messages read from Kafka into bronze_kafka_raw_events via COPY INTO.
-- Each message value is cast to STRING before it is written.
CREATE PIPE IF NOT EXISTS best_practice_content_rec.pipe_user_events
    VIRTUAL_CLUSTER = 'DEFAULT'
    BATCH_INTERVAL_IN_SECONDS = '30'
AS
COPY INTO best_practice_content_rec.bronze_kafka_raw_events
FROM (
    SELECT CAST(value AS STRING) AS value
    FROM READ_KAFKA(
        '<kafka-broker>:9092',        -- replace with the real broker address
        'user_behavior_events',       -- topic name
        '',
        'cz_content_rec_consumer',    -- consumer group ID
        -- NOTE(review): the remaining positional arguments are passed through unchanged;
        -- presumably offsets/timestamps, key/value formats, an error limit and extra
        -- Kafka options -- confirm against the READ_KAFKA reference.
        '', '', '', '',
        'raw', 'raw',
        0,
        map()
    )
);
-- Step 1: upload the local CSV file to the User Volume with SQL PUT.
PUT '/path/to/interaction_events.csv'
    TO USER VOLUME FILE 'interaction_events.csv';

-- Step 2: COPY the uploaded file from the User Volume into the bronze table.
-- The file has a header row, is comma-separated, and empty fields are read as NULL.
COPY INTO best_practice_content_rec.bronze_interaction_events
FROM USER VOLUME
USING csv
OPTIONS (
    'header' = 'true',
    'sep' = ',',
    'nullValue' = ''
)
FILES ('interaction_events.csv');
也可直接内联插入小批量测试数据(不需要 CSV 文件):
-- Inline seed data: 15 interaction events from 5 users over two days.
-- ingest_time is omitted so the column default applies.
INSERT INTO best_practice_content_rec.bronze_interaction_events (
    event_id,
    user_id,
    content_id,
    event_type,
    session_id,
    duration_sec,
    event_time,
    platform
)
VALUES
    ('EVT001', 'USR001', 'GAME_730',     'watch', 'SES001', 3600, CAST('2026-05-01 10:00:00' AS TIMESTAMP), 'pc'),
    ('EVT002', 'USR001', 'GAME_570',     'like',  'SES001', 0,    CAST('2026-05-01 10:05:00' AS TIMESTAMP), 'pc'),
    ('EVT003', 'USR002', 'GAME_730',     'share', 'SES002', 0,    CAST('2026-05-01 11:00:00' AS TIMESTAMP), 'mobile'),
    ('EVT004', 'USR002', 'GAME_292030',  'watch', 'SES002', 7200, CAST('2026-05-01 11:30:00' AS TIMESTAMP), 'mobile'),
    ('EVT005', 'USR003', 'GAME_1091500', 'watch', 'SES003', 1800, CAST('2026-05-01 12:00:00' AS TIMESTAMP), 'pc'),
    ('EVT006', 'USR003', 'GAME_730',     'like',  'SES003', 0,    CAST('2026-05-01 12:10:00' AS TIMESTAMP), 'pc'),
    ('EVT007', 'USR004', 'GAME_570',     'watch', 'SES004', 5400, CAST('2026-05-01 13:00:00' AS TIMESTAMP), 'console'),
    ('EVT008', 'USR004', 'GAME_292030',  'like',  'SES004', 0,    CAST('2026-05-01 13:20:00' AS TIMESTAMP), 'console'),
    ('EVT009', 'USR005', 'GAME_1091500', 'share', 'SES005', 0,    CAST('2026-05-01 14:00:00' AS TIMESTAMP), 'mobile'),
    ('EVT010', 'USR005', 'GAME_730',     'watch', 'SES005', 2700, CAST('2026-05-01 14:15:00' AS TIMESTAMP), 'mobile'),
    ('EVT011', 'USR001', 'GAME_1091500', 'watch', 'SES006', 4320, CAST('2026-05-02 09:00:00' AS TIMESTAMP), 'pc'),
    ('EVT012', 'USR002', 'GAME_570',     'share', 'SES007', 0,    CAST('2026-05-02 10:00:00' AS TIMESTAMP), 'pc'),
    ('EVT013', 'USR003', 'GAME_292030',  'watch', 'SES008', 6600, CAST('2026-05-02 11:00:00' AS TIMESTAMP), 'console'),
    ('EVT014', 'USR004', 'GAME_730',     'share', 'SES009', 0,    CAST('2026-05-02 12:00:00' AS TIMESTAMP), 'mobile'),
    ('EVT015', 'USR005', 'GAME_570',     'like',  'SES010', 0,    CAST('2026-05-02 13:00:00' AS TIMESTAMP), 'pc');
验证 Bronze 层行数:
-- Sanity check: total number of rows in the bronze interaction table.
SELECT
    COUNT(*) AS bronze_event_count
FROM
    best_practice_content_rec.bronze_interaction_events;
bronze_event_count
------------------
15
Bronze 层:内容元数据表(OSS PIPE 导入)
建表
-- Bronze layer: descriptive metadata for each piece of content.
CREATE TABLE IF NOT EXISTS best_practice_content_rec.bronze_content_metadata (
    content_id    STRING,
    title         STRING,
    description   STRING,     -- Chinese synopsis; target of full-text search
    tags          STRING,     -- comma-separated tags
    category      STRING,
    release_date  DATE,
    language      STRING,
    developer     STRING,
    price         DOUBLE,
    positive_pct  DOUBLE,     -- positive-review ratio (0-1)
    load_time     TIMESTAMP DEFAULT CURRENT_TIMESTAMP()  -- defaults to the current timestamp when not supplied
);
创建 Inverted Index(中文全文检索)
内容描述为中文,对 `description` 列创建倒排索引,并指定 `chinese` 分词器:
-- The index statement references the table without a schema prefix, so switch schema first.
USE SCHEMA best_practice_content_rec;

-- Inverted index on the description column, using the 'chinese' analyzer.
CREATE INVERTED INDEX IF NOT EXISTS idx_inv_description
    ON TABLE bronze_content_metadata (description)
    WITH PROPERTIES (
        'analyzer' = 'chinese'
    );
对存量数据构建索引(`CREATE INDEX` 只对新写入数据生效,存量数据须手动 BUILD):
-- Build the inverted index over rows that already exist in the table.
-- Table and index are referenced unqualified, hence the schema switch.
USE SCHEMA best_practice_content_rec;

BUILD INDEX idx_inv_description
    ON bronze_content_metadata;
-- Seed one content row together with its embedding.
-- The embedding is given as an array-formatted string of 128 values and cast to VECTOR(128).
INSERT INTO best_practice_content_rec.bronze_content_embedding (
    content_id,
    title,
    description,
    tags,
    embedding
)
VALUES
    (
        'GAME_730',
        'Counter-Strike 2',
        '多人竞技射击游戏,玩家加入恐怖分子或反恐精英阵营对抗',
        'FPS,Shooter,Multiplayer,Competitive,Action',
        CAST('[0.3370,0.3740,0.4110,0.4480,0.4850,0.5220,0.5590,0.5960,0.6330,0.6700,0.7070,0.7440,0.7810,0.8180,0.8550,0.8920,0.9290,0.9660,0.0030,0.0400,0.0770,0.1140,0.1510,0.1880,0.2250,0.2620,0.2990,0.3360,0.3730,0.4100,0.4470,0.4840,0.5210,0.5580,0.5950,0.6320,0.6690,0.7060,0.7430,0.7800,0.8170,0.8540,0.8910,0.9280,0.9650,0.0020,0.0390,0.0760,0.1130,0.1500,0.1870,0.2240,0.2610,0.2980,0.3350,0.3720,0.4090,0.4460,0.4830,0.5200,0.5570,0.5940,0.6310,0.6680,0.7050,0.7420,0.7790,0.8160,0.8530,0.8900,0.9270,0.9640,0.0010,0.0380,0.0750,0.1120,0.1490,0.1860,0.2230,0.2600,0.2970,0.3340,0.3710,0.4080,0.4450,0.4820,0.5190,0.5560,0.5930,0.6300,0.6670,0.7040,0.7410,0.7780,0.8150,0.8520,0.8890,0.9260,0.9630,0.0000,0.0370,0.0740,0.1110,0.1480,0.1850,0.2220,0.2590,0.2960,0.3330,0.3700,0.4070,0.4440,0.4810,0.5180,0.5550,0.5920,0.6290,0.6660,0.7030,0.7400,0.7770,0.8140,0.8510,0.8880,0.9250,0.9620,0.9990,0.0360]' AS VECTOR(128))
    );
💡 提示:`TO_VECTOR` 函数在当前版本中不可用,生成 VECTOR 列数据需使用 `CAST('<array_string>' AS VECTOR(N))` 语法。VECTOR 维度 N 必须与建表时一致;维度不一致时会得到 `NULL` 向量,后续 `COSINE_DISTANCE` 也会返回 `NULL`。
创建 Vector Index(IVFPQ)
IVFPQ(倒排文件 + 乘积量化)是推荐系统中常用的 ANN 索引类型,通过量化和分区大幅降低搜索时间复杂度:
-- The index statement references the table without a schema prefix, so switch schema first.
USE SCHEMA best_practice_content_rec;

-- IVFPQ vector index on the embedding column, using cosine distance.
-- NOTE(review): the keys 'M' and 'm' differ only by letter case; confirm the engine
-- treats property keys case-sensitively and that both keys are intended.
CREATE VECTOR INDEX IF NOT EXISTS idx_vec_content_embedding
    ON TABLE bronze_content_embedding (embedding)
    PROPERTIES (
        'index_type'        = 'IVFPQ',
        'distance_function' = 'cosine',
        'nlist'             = '100',   -- number of cluster centroids; raise for larger data volumes
        'M'                 = '32',    -- number of sub-quantizers; trades accuracy against speed
        'm'                 = '4'      -- compressed size in bytes
    );
对存量数据构建索引:
-- Build the vector index over rows that already exist in the table.
-- Table and index are referenced unqualified, hence the schema switch.
USE SCHEMA best_practice_content_rec;

BUILD INDEX idx_vec_content_embedding
    ON bronze_content_embedding;
⚠️ 注意:`nlist` 建议设为 `sqrt(N)` 到 `4*sqrt(N)` 之间(N 为向量总数)。生产环境数十万条记录时,`nlist=100`、`M=32` 是常见起点,可根据召回率和延迟指标调优。
向量相似检索
以 The Witcher 3 的 Embedding 为查询向量,找出最相似的内容:
-- Rank all content by cosine distance to the embedding of GAME_292030 and keep the
-- four closest rows. The probe row itself is not excluded from the candidates.
SELECT
    cand.content_id,
    cand.title,
    ROUND(COSINE_DISTANCE(cand.embedding, probe.probe_embedding), 6) AS cos_dist
FROM best_practice_content_rec.bronze_content_embedding AS cand
CROSS JOIN (
    -- Embedding used as the query vector.
    SELECT
        embedding AS probe_embedding
    FROM best_practice_content_rec.bronze_content_embedding
    WHERE content_id = 'GAME_292030'
) AS probe
ORDER BY
    cos_dist ASC
LIMIT 4;
content_id | title | cos_dist
-------------+--------------------------+---------------------
GAME_292030 | The Witcher 3: Wild Hunt | 0
GAME_1091500 | Cyberpunk 2077 | 0.045093998312950134
GAME_271590 | Grand Theft Auto V | 0.09233599901199341
GAME_730 | Counter-Strike 2 | 0.16785000264644623
-- Silver layer: every bronze interaction event, enriched with content metadata and
-- annotated with an interaction weight and a validity flag.
-- LEFT JOIN keeps events whose content_id has no metadata row (metadata columns are NULL).
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_content_rec.silver_user_content_interactions
AS
SELECT
    e.event_id,
    e.user_id,
    e.content_id,
    e.event_type,
    e.session_id,
    e.duration_sec,
    e.event_time,
    e.platform,
    m.title     AS content_title,
    m.tags      AS content_tags,
    m.category  AS content_category,
    m.developer AS developer,
    -- Interaction weight, strongest signal first:
    -- share = 3.0, like = 2.0, watch of at least 60 seconds = 1.0, anything else = 0.0.
    CASE
        WHEN e.event_type = 'share' THEN 3.0
        WHEN e.event_type = 'like' THEN 2.0
        WHEN e.event_type = 'watch' AND e.duration_sec >= 60 THEN 1.0
        ELSE 0.0
    END AS interaction_weight,
    -- Validity flag: likes and shares always count; a watch counts only when it lasted
    -- at least 60 seconds (shorter watches are treated as noise).
    CASE
        WHEN e.event_type IN ('like','share')
          OR (e.event_type = 'watch' AND e.duration_sec >= 60) THEN 1
        ELSE 0
    END AS is_valid
FROM best_practice_content_rec.bronze_interaction_events AS e
LEFT JOIN best_practice_content_rec.bronze_content_metadata AS m
    ON m.content_id = e.content_id;
互动权重设计说明:

| 事件类型 | 权重 | 理由 |
| --- | --- | --- |
| `share` | 3.0 | 强意愿信号,主动传播 |
| `like` | 2.0 | 明确的正向反馈 |
| `watch`(≥60s) | 1.0 | 隐式偏好,时长阈值过滤误触 |
| `watch`(<60s) | 0.0(无效) | 可能是误点击或切换,去除噪声 |
手动触发首次刷新:
-- Trigger the first refresh of the silver dynamic table manually.
REFRESH DYNAMIC TABLE best_practice_content_rec.silver_user_content_interactions;

-- Row count of the silver table after the refresh.
SELECT
    COUNT(*) AS silver_count
FROM
    best_practice_content_rec.silver_user_content_interactions;
silver_count
------------
15
查看用户-内容交互序列样例:
-- Sample of the per-user interaction sequence from the silver table (first 10 rows).
-- event_id is added as a final sort key so that rows sharing the same user_id and
-- event_time come back in a stable order; without it, which rows fall inside the
-- LIMIT is not deterministic when such ties exist.
SELECT
    user_id,
    content_id,
    content_title,
    event_type,
    interaction_weight,
    is_valid
FROM best_practice_content_rec.silver_user_content_interactions
ORDER BY
    user_id,
    event_time,
    event_id
LIMIT 10;
-- Gold layer: daily popularity statistics per content item, partitioned by stat_date.
-- Only rows flagged is_valid = 1 in the silver table are aggregated.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_content_rec.gold_content_popularity
    PARTITIONED BY (stat_date)
    TBLPROPERTIES ('static_partitions' = 'true')
AS
SELECT
    content_id,
    content_title,
    content_category,
    developer,
    -- Calendar day of the event; also the partition column.
    CAST(DATE_TRUNC('day', event_time) AS DATE) AS stat_date,
    COUNT(*) AS total_interactions,
    SUM(
        CASE WHEN event_type = 'watch' AND duration_sec >= 60 THEN 1 ELSE 0 END
    ) AS watch_count,
    SUM(
        CASE WHEN event_type = 'like' THEN 1 ELSE 0 END
    ) AS like_count,
    SUM(
        CASE WHEN event_type = 'share' THEN 1 ELSE 0 END
    ) AS share_count,
    SUM(interaction_weight) AS weighted_score,
    COUNT(DISTINCT user_id) AS unique_users,
    -- Average duration over watch events only; NULL when the group has no watch event.
    ROUND(
        AVG(CASE WHEN event_type = 'watch' THEN duration_sec ELSE NULL END),
        2
    ) AS avg_watch_sec
FROM best_practice_content_rec.silver_user_content_interactions
WHERE is_valid = 1
GROUP BY
    content_id,
    content_title,
    content_category,
    developer,
    CAST(DATE_TRUNC('day', event_time) AS DATE);
⚠️ 注意:有分区的 Dynamic Table 必须显式声明 `TBLPROPERTIES ('static_partitions' = 'true')`,使用静态分区模式。不声明时系统默认动态分区推断,在增量刷新场景下可能导致分区数据覆盖异常。
手动触发首次刷新并查看结果:
-- Trigger the first refresh of the popularity table manually.
REFRESH DYNAMIC TABLE best_practice_content_rec.gold_content_popularity;

-- Popularity statistics, highest weighted score first.
SELECT
    content_id,
    content_title,
    content_category,
    stat_date,
    total_interactions,
    watch_count,
    like_count,
    share_count,
    weighted_score,
    unique_users
FROM best_practice_content_rec.gold_content_popularity
ORDER BY
    weighted_score DESC;
-- Gold layer: one interest-profile row per user, built from valid silver interactions.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_content_rec.gold_user_interest_profile
AS
SELECT
    user_id,
    -- Calendar day of the user's most recent valid interaction.
    CAST(DATE_TRUNC('day', MAX(event_time)) AS DATE) AS profile_date,
    COUNT(DISTINCT content_id) AS content_count,
    SUM(interaction_weight) AS total_weight,
    -- One entry per aggregated row, so (assuming standard COLLECT_LIST semantics) a
    -- content_id may repeat here even though content_count counts distinct ids.
    COLLECT_LIST(content_id) AS interacted_content_ids,
    -- Per-category weight totals forming the category interest vector.
    SUM(
        CASE WHEN content_category = 'Action' THEN interaction_weight ELSE 0 END
    ) AS action_score,
    SUM(
        CASE WHEN content_category = 'Strategy' THEN interaction_weight ELSE 0 END
    ) AS strategy_score,
    SUM(
        CASE WHEN content_category = 'RPG' THEN interaction_weight ELSE 0 END
    ) AS rpg_score
FROM best_practice_content_rec.silver_user_content_interactions
WHERE is_valid = 1
GROUP BY
    user_id;
-- Trigger the first refresh of the user interest profile manually.
REFRESH DYNAMIC TABLE best_practice_content_rec.gold_user_interest_profile;

-- User profiles, highest total interaction weight first.
SELECT
    user_id,
    content_count,
    total_weight,
    action_score,
    strategy_score,
    rpg_score,
    interacted_content_ids
FROM best_practice_content_rec.gold_user_interest_profile
ORDER BY
    total_weight DESC;