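Building a Recommendation-System Data Warehouse for a Content Platform

Based on the Steam game platform dataset (Steam Dataset 2025, about 240,000 games and 3.9 million reviews), this guide combines user interaction behavior (watch, like, share) with content metadata to build a three-layer data warehouse that supports feature engineering for recommendation models. It walks end to end through the full pipeline, Kafka PIPE → OSS PIPE → External Function → Bronze → Silver → Gold → ZettaPark, and covers the following key capabilities: Vector Index (IVFPQ vector recall), Inverted Index (Chinese full-text search), and Dynamic Table (daily refresh of the Gold layer).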


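Overview

The core challenge of a content recommendation system is bringing user behavior signals and content semantics into the same feature space. 云器 Lakehouse addresses the key problems of building the warehouse with the following combination:

Problem | Solution
--------+---------
User behavior events are written in real time at high frequency and need low-latency ingestion | Kafka PIPE ingests continuously, with no hand-written consumer
Content metadata (titles, tags, descriptions) is imported in batches | OSS PIPE scans and imports; a file landing triggers the import automatically
Content text needs Embeddings for similarity recall | External Function calls an embedding model and stores the result in a VECTOR column
Approximate nearest neighbor (ANN) search over massive content vectors | Vector Index IVFPQ, vector recall in sublinear time
Full-text search over Chinese titles and descriptions | Inverted Index + Chinese analyzer, queried with the MATCH_ALL function
Automatic incremental computation for Bronze → Silver → Gold | Dynamic Table: declarative SQL, with the dependency chain scheduled by the system
Feature engineering scripts need access to Gold-layer data | ZettaPark Python Task operates on Lakehouse tables directly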

SQL Commands Used

Command / Function | Purpose | Notes
-------------------+---------+------
CREATE TABLE | Create the Bronze raw tables and the Embedding storage table | VECTOR(N) column type
CREATE BLOOMFILTER INDEX | Create a Bloomfilter index on the content_id column | Speeds up point lookups on high-cardinality columns
CREATE INVERTED INDEX | Create a Chinese inverted index on the description column | analyzer='chinese'
BUILD INDEX | Build an index over existing data | Required for both Vector and Inverted indexes
CREATE VECTOR INDEX | IVFPQ vector index that speeds up ANN search | PROPERTIES('index_type'='IVFPQ')
COSINE_DISTANCE | Compute the cosine distance between two vectors | Used for similarity ranking; smaller means more similar
MATCH_ALL | Full-text search; returns a boolean | Requires an Inverted Index that has been created and built
CREATE PIPE | Create a continuous Kafka or OSS ingestion pipe | Bound to a Bronze-layer target table
CREATE DYNAMIC TABLE | Create the Silver / Gold incremental computation tables | No REFRESH INTERVAL; scheduled by a Studio Task
REFRESH DYNAMIC TABLE | Trigger one refresh manually | Use for the initial build or when debugging
PARTITIONED BY | Date-partitioned Gold-layer DT | Used with TBLPROPERTIES('static_partitions'='true')

Prerequisites

All examples in this guide run under the best_practice_content_rec schema.

CREATE SCHEMA IF NOT EXISTS best_practice_content_rec;


Bronze Layer: Behavior Event Table (Kafka PIPE Target)

Create the table

CREATE TABLE IF NOT EXISTS best_practice_content_rec.bronze_interaction_events (
    event_id     STRING,
    user_id      STRING,
    content_id   STRING,
    event_type   STRING,      -- watch / like / share
    session_id   STRING,
    duration_sec INT,         -- watch duration in seconds; 0 for like/share
    event_time   TIMESTAMP,
    platform     STRING,      -- pc / mobile / console
    ingest_time  TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

Create the Bloomfilter Index

Silver-layer queries filter on content_id frequently. The column has high cardinality, which makes it a good fit for a Bloomfilter Index.

-- Must run in the best_practice_content_rec schema context
USE SCHEMA best_practice_content_rec;

CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_content_id
    ON TABLE bronze_interaction_events (content_id);

Configure the Kafka PIPE

User behavior events are collected by the client SDK and sent to a Kafka topic. The PIPE consumes them continuously and writes them to the Bronze layer.

First create the raw landing table (the PIPE writes JSON strings into it), then create the PIPE:

CREATE TABLE IF NOT EXISTS best_practice_content_rec.bronze_kafka_raw_events (
    value STRING
);

CREATE PIPE IF NOT EXISTS best_practice_content_rec.pipe_user_events
    VIRTUAL_CLUSTER = 'DEFAULT'
    BATCH_INTERVAL_IN_SECONDS = '30'
AS
COPY INTO best_practice_content_rec.bronze_kafka_raw_events
FROM (
    SELECT CAST(value AS STRING) AS value
    FROM READ_KAFKA(
        '<kafka-broker>:9092',        -- replace with the actual broker address
        'user_behavior_events',       -- topic name
        '',
        'cz_content_rec_consumer',    -- consumer group ID
        '', '', '', '',
        'raw', 'raw',
        0,
        map()
    )
);

Option 1: Write through Kafka (recommended)

In production, the client SDK serializes behavior events to JSON and sends them to the Kafka topic, and the PIPE consumes them automatically and writes them to bronze_kafka_raw_events. The following example builds a message with kafka-python:

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['<kafka-broker>:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

event = {
    "event_id": "EVT001",
    "user_id": "USR001",
    "content_id": "GAME_730",
    "event_type": "watch",
    "session_id": "SES001",
    "duration_sec": 3600,
    "event_time": "2026-05-01 10:00:00",
    "platform": "pc"
}

producer.send('user_behavior_events', value=event)
producer.flush()
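The PIPE lands each Kafka message as a raw JSON string in bronze_kafka_raw_events, while the Silver layer reads the typed table bronze_interaction_events. This guide does not show the parsing step between the two, so the following is only a minimal Python sketch of the per-message logic, assuming the payload shape of the producer example. The names parse_event and EVENT_COLUMNS are hypothetical, and in practice this step may well be written in SQL instead.

```python
import json
from datetime import datetime

# Column order of bronze_interaction_events (ingest_time is left to its DEFAULT)
EVENT_COLUMNS = (
    "event_id", "user_id", "content_id", "event_type",
    "session_id", "duration_sec", "event_time", "platform",
)
VALID_EVENT_TYPES = {"watch", "like", "share"}


def parse_event(raw_value: str):
    """Turn one raw JSON message into a typed row tuple, or None if it is unusable."""
    try:
        payload = json.loads(raw_value)
    except json.JSONDecodeError:
        return None  # malformed JSON: skip the message instead of failing the batch
    if not isinstance(payload, dict):
        return None
    event_type = payload.get("event_type")
    if not isinstance(event_type, str) or event_type not in VALID_EVENT_TYPES:
        return None
    try:
        row = dict(payload)
        row["duration_sec"] = int(payload.get("duration_sec", 0))
        row["event_time"] = datetime.strptime(payload["event_time"], "%Y-%m-%d %H:%M:%S")
    except (KeyError, TypeError, ValueError):
        return None  # missing or badly typed fields
    return tuple(row.get(col) for col in EVENT_COLUMNS)


raw = json.dumps({
    "event_id": "EVT001", "user_id": "USR001", "content_id": "GAME_730",
    "event_type": "watch", "session_id": "SES001", "duration_sec": 3600,
    "event_time": "2026-05-01 10:00:00", "platform": "pc",
})
row = parse_event(raw)
print(row)
```

Returning None for bad messages keeps one malformed event from blocking the rest; whether to drop, log, or quarantine such rows is a design choice this sketch leaves open.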

Option 2: Simulate with INSERT (when no Kafka environment is available)

If Kafka is not configured yet, you can write to bronze_interaction_events in either of the two ways below. This simulates Kafka messages that have already been parsed and written, so the downstream Silver-layer logic can still be verified.

Import from a local CSV (recommended)

-- Step 1: upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/interaction_events.csv' TO USER VOLUME FILE 'interaction_events.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_content_rec.bronze_interaction_events
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('interaction_events.csv');

Alternatively, insert a small batch of test data inline (no CSV file needed):

INSERT INTO best_practice_content_rec.bronze_interaction_events
    (event_id, user_id, content_id, event_type, session_id, duration_sec, event_time, platform)
VALUES
    ('EVT001', 'USR001', 'GAME_730',     'watch', 'SES001', 3600, CAST('2026-05-01 10:00:00' AS TIMESTAMP), 'pc'),
    ('EVT002', 'USR001', 'GAME_570',     'like',  'SES001', 0,    CAST('2026-05-01 10:05:00' AS TIMESTAMP), 'pc'),
    ('EVT003', 'USR002', 'GAME_730',     'share', 'SES002', 0,    CAST('2026-05-01 11:00:00' AS TIMESTAMP), 'mobile'),
    ('EVT004', 'USR002', 'GAME_292030',  'watch', 'SES002', 7200, CAST('2026-05-01 11:30:00' AS TIMESTAMP), 'mobile'),
    ('EVT005', 'USR003', 'GAME_1091500', 'watch', 'SES003', 1800, CAST('2026-05-01 12:00:00' AS TIMESTAMP), 'pc'),
    ('EVT006', 'USR003', 'GAME_730',     'like',  'SES003', 0,    CAST('2026-05-01 12:10:00' AS TIMESTAMP), 'pc'),
    ('EVT007', 'USR004', 'GAME_570',     'watch', 'SES004', 5400, CAST('2026-05-01 13:00:00' AS TIMESTAMP), 'console'),
    ('EVT008', 'USR004', 'GAME_292030',  'like',  'SES004', 0,    CAST('2026-05-01 13:20:00' AS TIMESTAMP), 'console'),
    ('EVT009', 'USR005', 'GAME_1091500', 'share', 'SES005', 0,    CAST('2026-05-01 14:00:00' AS TIMESTAMP), 'mobile'),
    ('EVT010', 'USR005', 'GAME_730',     'watch', 'SES005', 2700, CAST('2026-05-01 14:15:00' AS TIMESTAMP), 'mobile'),
    ('EVT011', 'USR001', 'GAME_1091500', 'watch', 'SES006', 4320, CAST('2026-05-02 09:00:00' AS TIMESTAMP), 'pc'),
    ('EVT012', 'USR002', 'GAME_570',     'share', 'SES007', 0,    CAST('2026-05-02 10:00:00' AS TIMESTAMP), 'pc'),
    ('EVT013', 'USR003', 'GAME_292030',  'watch', 'SES008', 6600, CAST('2026-05-02 11:00:00' AS TIMESTAMP), 'console'),
    ('EVT014', 'USR004', 'GAME_730',     'share', 'SES009', 0,    CAST('2026-05-02 12:00:00' AS TIMESTAMP), 'mobile'),
    ('EVT015', 'USR005', 'GAME_570',     'like',  'SES010', 0,    CAST('2026-05-02 13:00:00' AS TIMESTAMP), 'pc');

Verify the Bronze-layer row count:

SELECT COUNT(*) AS bronze_event_count
FROM best_practice_content_rec.bronze_interaction_events;

bronze_event_count
------------------
15


Bronze Layer: Content Metadata Table (OSS PIPE Import)

Create the table

CREATE TABLE IF NOT EXISTS best_practice_content_rec.bronze_content_metadata (
    content_id   STRING,
    title        STRING,
    description  STRING,      -- Chinese synopsis; supports full-text search
    tags         STRING,      -- comma-separated tags
    category     STRING,
    release_date DATE,
    language     STRING,
    developer    STRING,
    price        DOUBLE,
    positive_pct DOUBLE,      -- positive review ratio (0–1)
    load_time    TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

Create the Inverted Index (Chinese full-text search)

The content descriptions are in Chinese, so create an inverted index on the description column and specify the chinese analyzer:

USE SCHEMA best_practice_content_rec;

CREATE INVERTED INDEX IF NOT EXISTS idx_inv_description
    ON TABLE bronze_content_metadata (description)
    WITH PROPERTIES ('analyzer' = 'chinese');

Build the index over existing data (CREATE INDEX only takes effect for newly written data; existing data must be built manually):

USE SCHEMA best_practice_content_rec;

BUILD INDEX idx_inv_description ON bronze_content_metadata;

Configure the OSS PIPE (batch import of content metadata)

The content operations team periodically uploads new game information to OSS as CSV files. The OSS PIPE scans and imports them automatically in LIST_PURGE mode:

-- First create the Storage Connection (OSS access credentials)
CREATE STORAGE CONNECTION IF NOT EXISTS best_practice_content_rec.conn_content_oss
    TYPE = OSS
    ACCESS_ID = '<your-access-id>'
    ACCESS_KEY = '<your-access-key>'
    ENDPOINT = 'oss-cn-hangzhou.aliyuncs.com';

-- Create the External Volume
CREATE EXTERNAL VOLUME IF NOT EXISTS best_practice_content_rec.vol_content_metadata
    TYPE = OSS
    BUCKET = '<your-bucket>'
    PATH = 'content-rec/metadata/'
    CONNECTION = conn_content_oss;

-- Create the OSS PIPE (LIST_PURGE: delete the source file after import to avoid re-importing it)
CREATE PIPE IF NOT EXISTS best_practice_content_rec.pipe_content_metadata
    VIRTUAL_CLUSTER = 'DEFAULT'
    INGEST_MODE = 'LIST_PURGE'
AS
COPY INTO best_practice_content_rec.bronze_content_metadata
FROM VOLUME vol_content_metadata
USING csv
OPTIONS('header'='true', 'sep'=',', 'quote'='"');

Load sample data

Five classic games from Steam Dataset 2025 serve as the sample:

INSERT INTO best_practice_content_rec.bronze_content_metadata
    (content_id, title, description, tags, category, release_date, language, developer, price, positive_pct)
VALUES
    ('GAME_730', 'Counter-Strike 2', '多人竞技射击游戏,玩家加入恐怖分子或反恐精英阵营对抗',
     'FPS,Shooter,Multiplayer,Competitive,Action', 'Action', CAST('2023-09-27' AS DATE), 'zh', 'Valve', 0.0, 0.78),
    ('GAME_570', 'Dota 2', '团队对战策略游戏,两队5人争夺胜利',
     'MOBA,Strategy,Multiplayer,Free to Play', 'Strategy', CAST('2013-07-09' AS DATE), 'zh', 'Valve', 0.0, 0.84),
    ('GAME_292030', 'The Witcher 3: Wild Hunt', '开放世界角色扮演游戏,剧情丰富',
     'RPG,Open World,Adventure,Story Rich,Fantasy', 'RPG', CAST('2015-05-18' AS DATE), 'zh', 'CD Projekt Red', 39.99, 0.97),
    ('GAME_1091500', 'Cyberpunk 2077', '未来世界开放世界RPG,赛博朋克风格',
     'RPG,Open World,Cyberpunk,Action,Sci-fi', 'RPG', CAST('2020-12-10' AS DATE), 'zh', 'CD Projekt Red', 59.99, 0.79),
    ('GAME_271590', 'Grand Theft Auto V', '开放世界动作冒险游戏',
     'Action,Open World,Multiplayer,Crime', 'Action', CAST('2015-04-14' AS DATE), 'en', 'Rockstar Games', 29.99, 0.88);

Verify Chinese full-text search:

SELECT content_id, title, description
FROM best_practice_content_rec.bronze_content_metadata
WHERE MATCH_ALL(description, '开放世界');

content_id   | title                    | description
-------------+--------------------------+----------------------
GAME_292030  | The Witcher 3: Wild Hunt | 开放世界角色扮演游戏,剧情丰富
GAME_1091500 | Cyberpunk 2077           | 未来世界开放世界RPG,赛博朋克风格
GAME_271590  | Grand Theft Auto V       | 开放世界动作冒险游戏

All three Open World games are recalled correctly.


Bronze Layer: Content Embedding Table (External Function + Vector Type)

Create the table

CREATE TABLE IF NOT EXISTS best_practice_content_rec.bronze_content_embedding (
    content_id  STRING,
    title       STRING,
    description STRING,
    tags        STRING,
    embedding   VECTOR(128)   -- 128-dimensional Embedding generated by the External Function
);

Generate Embeddings with an External Function

In production, deploy an External Function that calls an embedding model (such as DashScope text-embedding-v3) to generate Embeddings automatically:

-- Assumes an External Function named text2vec is already deployed
-- Example call (run from an operations script or a ZettaPark Task)
INSERT INTO best_practice_content_rec.bronze_content_embedding
    (content_id, title, description, tags, embedding)
SELECT
    content_id,
    title,
    description,
    tags,
    CAST(best_practice_content_rec.text2vec(description) AS VECTOR(128))
FROM best_practice_content_rec.bronze_content_metadata;

In a test environment where no External Function is deployed yet, you can generate deterministic 128-dimensional test vectors in SQL. The vector_seed below exists only to build runnable sample data and does not represent Embeddings from a real model:

INSERT INTO best_practice_content_rec.bronze_content_embedding
    (content_id, title, description, tags, embedding)
SELECT
    content_id,
    title,
    description,
    tags,
    CAST(
        CONCAT(
            '[',
            ARRAY_JOIN(
                TRANSFORM(
                    SEQUENCE(1, 128),
                    x -> CAST(ROUND(((x * 37 + vector_seed) % 1000) / 1000.0, 4) AS STRING)
                ),
                ','
            ),
            ']'
        ) AS VECTOR(128)
    ) AS embedding
FROM (
    SELECT 'GAME_730' AS content_id, 'Counter-Strike 2' AS title,
           '多人竞技射击游戏,玩家加入恐怖分子或反恐精英阵营对抗' AS description,
           'FPS,Shooter,Multiplayer,Competitive,Action' AS tags, 300 AS vector_seed
    UNION ALL
    SELECT 'GAME_570', 'Dota 2', '团队对战策略游戏,两队5人争夺胜利',
           'MOBA,Strategy,Multiplayer,Free to Play', 420
    UNION ALL
    SELECT 'GAME_292030', 'The Witcher 3: Wild Hunt', '开放世界角色扮演游戏,剧情丰富',
           'RPG,Open World,Adventure,Story Rich,Fantasy', 180
    UNION ALL
    SELECT 'GAME_1091500', 'Cyberpunk 2077', '未来世界开放世界RPG,赛博朋克风格',
           'RPG,Open World,Cyberpunk,Action,Sci-fi', 190
    UNION ALL
    SELECT 'GAME_271590', 'Grand Theft Auto V', '开放世界动作冒险游戏',
           'Action,Open World,Multiplayer,Crime', 260
) s;
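The seed formula can also be reproduced outside the warehouse, which is handy for checking a literal before casting it to VECTOR(128). The following Python sketch mirrors the expression ROUND(((x * 37 + vector_seed) % 1000) / 1000.0, 4) for x = 1..128; the function names make_test_vector and to_vector_literal are hypothetical, and the four-decimal formatting is an assumption about how the literal should look.

```python
def make_test_vector(vector_seed: int, dim: int = 128) -> list[float]:
    """Same arithmetic as the SQL expression, evaluated for x = 1..dim."""
    return [round(((x * 37 + vector_seed) % 1000) / 1000.0, 4) for x in range(1, dim + 1)]


def to_vector_literal(values: list[float]) -> str:
    """Format values as a bracketed, comma-separated string with four decimals."""
    return "[" + ",".join(f"{v:.4f}" for v in values) + "]"


vec = make_test_vector(300)  # 300 is the seed used for GAME_730
literal = to_vector_literal(vec)
print(len(vec), vec[:3], vec[-1])
print(literal[:30])
```

Because the values depend only on the seed and the position, two titles with close seeds (for example 180 and 190) get numerically close vectors, which is why the test data still yields a plausible similarity ranking.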

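Verify the dimension of the written vectors:

SELECT content_id, size(embedding) AS vector_dim
FROM best_practice_content_rec.bronze_content_embedding
ORDER BY content_id;

content_id   | vector_dim
-------------+-----------
GAME_1091500 | 128
GAME_271590  | 128
GAME_292030  | 128
GAME_570     | 128
GAME_730     | 128

To construct a single test row by hand, you can also CAST a complete 128-dimensional array string directly to VECTOR(128). The following example can be executed as is. Its values are the ones the seed formula produces for GAME_730, so use it instead of the generated insert rather than in addition to it; running both inserts GAME_730 twice: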

INSERT INTO best_practice_content_rec.bronze_content_embedding (content_id, title, description, tags, embedding) VALUES ('GAME_730', 'Counter-Strike 2', '多人竞技射击游戏,玩家加入恐怖分子或反恐精英阵营对抗', 'FPS,Shooter,Multiplayer,Competitive,Action', CAST('[0.3370,0.3740,0.4110,0.4480,0.4850,0.5220,0.5590,0.5960,0.6330,0.6700,0.7070,0.7440,0.7810,0.8180,0.8550,0.8920,0.9290,0.9660,0.0030,0.0400,0.0770,0.1140,0.1510,0.1880,0.2250,0.2620,0.2990,0.3360,0.3730,0.4100,0.4470,0.4840,0.5210,0.5580,0.5950,0.6320,0.6690,0.7060,0.7430,0.7800,0.8170,0.8540,0.8910,0.9280,0.9650,0.0020,0.0390,0.0760,0.1130,0.1500,0.1870,0.2240,0.2610,0.2980,0.3350,0.3720,0.4090,0.4460,0.4830,0.5200,0.5570,0.5940,0.6310,0.6680,0.7050,0.7420,0.7790,0.8160,0.8530,0.8900,0.9270,0.9640,0.0010,0.0380,0.0750,0.1120,0.1490,0.1860,0.2230,0.2600,0.2970,0.3340,0.3710,0.4080,0.4450,0.4820,0.5190,0.5560,0.5930,0.6300,0.6670,0.7040,0.7410,0.7780,0.8150,0.8520,0.8890,0.9260,0.9630,0.0000,0.0370,0.0740,0.1110,0.1480,0.1850,0.2220,0.2590,0.2960,0.3330,0.3700,0.4070,0.4440,0.4810,0.5180,0.5550,0.5920,0.6290,0.6660,0.7030,0.7400,0.7770,0.8140,0.8510,0.8880,0.9250,0.9620,0.9990,0.0360]' AS VECTOR(128)));

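Create the Vector Index (IVFPQ)

IVFPQ (inverted file + product quantization) is an ANN index type commonly used in recommendation systems. It partitions the vectors into clusters and compresses them with quantization, so a query scans only a fraction of the data:

USE SCHEMA best_practice_content_rec;

CREATE VECTOR INDEX IF NOT EXISTS idx_vec_content_embedding
    ON TABLE bronze_content_embedding (embedding)
    PROPERTIES(
        'index_type' = 'IVFPQ',
        'distance_function' = 'cosine',
        'nlist' = '100',   -- number of cluster centroids; raise it for larger datasets
        'M' = '32',        -- number of sub-quantizers; affects accuracy and speed
        'm' = '4'          -- compressed bytes
    );

Build the index over existing data:

USE SCHEMA best_practice_content_rec;

BUILD INDEX idx_vec_content_embedding ON bronze_content_embedding;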
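To make the partitioning idea behind IVF concrete, here is a toy Python sketch of the inverted-file half only: vectors are bucketed by their nearest centroid, and a query scans just the closest buckets. It is not the Lakehouse implementation, it omits product quantization entirely, and the centroids are fixed by hand where a real index would learn them (for example with k-means). The names build_ivf, ivf_search, and nprobe are illustrative assumptions.

```python
import math


def cosine_distance(a, b):
    """1 - cosine similarity; assumes neither vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm


def build_ivf(vectors, centroids):
    """Assign every vector id to the bucket of its nearest centroid."""
    buckets = {i: [] for i in range(len(centroids))}
    for vec_id, vec in vectors.items():
        nearest = min(range(len(centroids)), key=lambda i: cosine_distance(vec, centroids[i]))
        buckets[nearest].append(vec_id)
    return buckets


def ivf_search(query, vectors, centroids, buckets, nprobe=1, k=2):
    """Scan only the nprobe buckets whose centroids are closest to the query."""
    probe = sorted(range(len(centroids)), key=lambda i: cosine_distance(query, centroids[i]))[:nprobe]
    candidates = [vec_id for i in probe for vec_id in buckets[i]]
    candidates.sort(key=lambda vec_id: cosine_distance(query, vectors[vec_id]))
    return candidates[:k]


vectors = {
    "a": [1.0, 0.1], "b": [0.9, 0.2],   # cluster near the x-axis
    "c": [0.1, 1.0], "d": [0.2, 0.9],   # cluster near the y-axis
}
centroids = [[1.0, 0.0], [0.0, 1.0]]    # hand-picked for the sketch
buckets = build_ivf(vectors, centroids)
result = ivf_search([1.0, 0.0], vectors, centroids, buckets, nprobe=1, k=2)
print(buckets)
print(result)
```

With nprobe=1 the query never touches the y-axis bucket. That is the source of the speedup, and also why ANN results are approximate: a true neighbor sitting in an unprobed bucket is missed.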

Vector similarity search

Using the Embedding of The Witcher 3 as the query vector, find the most similar content:

SELECT
    e.content_id,
    e.title,
    ROUND(COSINE_DISTANCE(e.embedding, q.query_embedding), 6) AS cos_dist
FROM best_practice_content_rec.bronze_content_embedding e
CROSS JOIN (
    SELECT embedding AS query_embedding
    FROM best_practice_content_rec.bronze_content_embedding
    WHERE content_id = 'GAME_292030'
) q
ORDER BY cos_dist ASC
LIMIT 4;

content_id   | title                    | cos_dist
-------------+--------------------------+---------------------
GAME_292030  | The Witcher 3: Wild Hunt | 0
GAME_1091500 | Cyberpunk 2077           | 0.045093998312950134
GAME_271590  | Grand Theft Auto V       | 0.09233599901199341
GAME_730     | Counter-Strike 2         | 0.16785000264644623

A COSINE_DISTANCE of 0 means the vectors are identical (here, the query vector itself). The test vectors are generated from fixed seeds and only verify that the VECTOR writes, the index build, and the similarity SQL run as written. In production, generate the vectors with a real Embedding model; only then do the similarity results carry business meaning.
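For reference, cosine distance is usually defined as one minus cosine similarity. The sketch below assumes COSINE_DISTANCE follows that common definition (the source states only the 0 to 2 range and that smaller is more similar) and shows the three landmark values.

```python
import math


def cosine_distance(a, b):
    """1 - (a · b) / (|a| * |b|); assumes neither vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


same = cosine_distance([1.0, 2.0], [1.0, 2.0])        # same direction
orthogonal = cosine_distance([1.0, 0.0], [0.0, 1.0])  # unrelated directions
opposite = cosine_distance([1.0, 0.0], [-1.0, 0.0])   # opposite directions
print(same, orthogonal, opposite)
```

The three results are approximately 0, 1, and 2. Floating-point rounding can leave a tiny nonzero residue for identical vectors, which is one reason to compare distances with a tolerance rather than testing for exact zero.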


Silver Layer Dynamic Table: Denoising and Interaction Sequences

The Silver layer does two things on top of the Bronze behavior events:

  1. LEFT JOIN bronze_content_metadata to attach the content title, category, developer, and other dimensions to each event.
  2. Assign an interaction weight and flag invalid interactions (is_valid). A watch shorter than 60 seconds is treated as browsing noise. Rows are only flagged here; the Gold layer filters on is_valid = 1.

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_content_rec.silver_user_content_interactions
AS
SELECT
    e.event_id,
    e.user_id,
    e.content_id,
    e.event_type,
    e.session_id,
    e.duration_sec,
    e.event_time,
    e.platform,
    m.title     AS content_title,
    m.tags      AS content_tags,
    m.category  AS content_category,
    m.developer AS developer,
    -- Interaction weight: signal strength from high to low
    CASE
        WHEN e.event_type = 'share' THEN 3.0
        WHEN e.event_type = 'like'  THEN 2.0
        WHEN e.event_type = 'watch' AND e.duration_sec >= 60 THEN 1.0
        ELSE 0.0
    END AS interaction_weight,
    -- Valid-interaction flag (a watch shorter than 60s is treated as noise)
    CASE
        WHEN e.event_type IN ('like', 'share') THEN 1
        WHEN e.event_type = 'watch' AND e.duration_sec >= 60 THEN 1
        ELSE 0
    END AS is_valid
FROM best_practice_content_rec.bronze_interaction_events e
LEFT JOIN best_practice_content_rec.bronze_content_metadata m
    ON e.content_id = m.content_id;

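Interaction weight design

Event type   | Weight        | Rationale
-------------+---------------+----------
share        | 3.0           | Strong intent signal; the user actively spreads the content
like         | 2.0           | Explicit positive feedback
watch (≥60s) | 1.0           | Implicit preference; the duration threshold filters out accidental taps
watch (<60s) | 0.0 (invalid) | Probably a misclick or a switch-away; removed as noise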
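The weighting rules are easy to unit-test outside the warehouse. The following Python sketch mirrors the branches of the Silver-layer CASE expressions; the function name score_interaction is hypothetical, and treating a missing duration as noise matches how a NULL comparison falls through to ELSE in SQL.

```python
from typing import Optional


def score_interaction(event_type: str, duration_sec: Optional[int]) -> tuple[float, int]:
    """Return (interaction_weight, is_valid) with the same branches as the Silver CASE."""
    if event_type == "share":
        return 3.0, 1
    if event_type == "like":
        return 2.0, 1
    if event_type == "watch" and duration_sec is not None and duration_sec >= 60:
        return 1.0, 1
    return 0.0, 0  # short watches, missing durations, and unknown event types are noise


for event in [("share", 0), ("like", 0), ("watch", 3600), ("watch", 60), ("watch", 59)]:
    print(event, score_interaction(*event))
```

The boundary case is worth noting: a watch of exactly 60 seconds counts as valid, while 59 seconds does not.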

Trigger the first refresh manually:

REFRESH DYNAMIC TABLE best_practice_content_rec.silver_user_content_interactions;

SELECT COUNT(*) AS silver_count
FROM best_practice_content_rec.silver_user_content_interactions;

silver_count
------------
15

View a sample of the user-content interaction sequence:

SELECT user_id, content_id, content_title, event_type, interaction_weight, is_valid
FROM best_practice_content_rec.silver_user_content_interactions
ORDER BY user_id, event_time
LIMIT 10;

user_id | content_id   | content_title            | event_type | interaction_weight | is_valid
--------+--------------+--------------------------+------------+--------------------+---------
USR001  | GAME_730     | Counter-Strike 2         | watch      | 1.0                | 1
USR001  | GAME_570     | Dota 2                   | like       | 2.0                | 1
USR001  | GAME_1091500 | Cyberpunk 2077           | watch      | 1.0                | 1
USR002  | GAME_730     | Counter-Strike 2         | share      | 3.0                | 1
USR002  | GAME_292030  | The Witcher 3: Wild Hunt | watch      | 1.0                | 1
USR002  | GAME_570     | Dota 2                   | share      | 3.0                | 1
USR003  | GAME_1091500 | Cyberpunk 2077           | watch      | 1.0                | 1
USR003  | GAME_730     | Counter-Strike 2         | like       | 2.0                | 1
USR003  | GAME_292030  | The Witcher 3: Wild Hunt | watch      | 1.0                | 1
USR004  | GAME_570     | Dota 2                   | watch      | 1.0                | 1

All 15 sample rows are valid interactions (is_valid=1), because every watch in the INSERT-simulated data lasts at least 60 seconds.


Gold Layer Dynamic Table: Content Popularity Metrics

At the granularity of content_id plus date, the Gold layer aggregates content popularity metrics from the Silver layer for use as content-side features in the recommendation model.

This DT is partitioned by stat_date and declared in static partition mode:

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_content_rec.gold_content_popularity
PARTITIONED BY (stat_date)
TBLPROPERTIES ('static_partitions' = 'true')
AS
SELECT
    content_id,
    content_title,
    content_category,
    developer,
    CAST(DATE_TRUNC('day', event_time) AS DATE) AS stat_date,
    COUNT(*) AS total_interactions,
    SUM(CASE WHEN event_type = 'watch' AND duration_sec >= 60 THEN 1 ELSE 0 END) AS watch_count,
    SUM(CASE WHEN event_type = 'like'  THEN 1 ELSE 0 END) AS like_count,
    SUM(CASE WHEN event_type = 'share' THEN 1 ELSE 0 END) AS share_count,
    SUM(interaction_weight) AS weighted_score,
    COUNT(DISTINCT user_id) AS unique_users,
    ROUND(AVG(CASE WHEN event_type = 'watch' THEN duration_sec ELSE NULL END), 2) AS avg_watch_sec
FROM best_practice_content_rec.silver_user_content_interactions
WHERE is_valid = 1
GROUP BY
    content_id,
    content_title,
    content_category,
    developer,
    CAST(DATE_TRUNC('day', event_time) AS DATE);

Trigger the first refresh manually and inspect the result:

REFRESH DYNAMIC TABLE best_practice_content_rec.gold_content_popularity;

SELECT content_id, content_title, content_category, stat_date,
       total_interactions, watch_count, like_count, share_count,
       weighted_score, unique_users
FROM best_practice_content_rec.gold_content_popularity
ORDER BY weighted_score DESC;

content_id   | content_title            | content_category | stat_date  | total_interactions | watch_count | like_count | share_count | weighted_score | unique_users
-------------+--------------------------+------------------+------------+--------------------+-------------+------------+-------------+----------------+-------------
GAME_730     | Counter-Strike 2         | Action           | 2026-05-01 | 4                  | 2           | 1          | 1           | 7.0            | 4
GAME_570     | Dota 2                   | Strategy         | 2026-05-02 | 2                  | 0           | 1          | 1           | 5.0            | 2
GAME_1091500 | Cyberpunk 2077           | RPG              | 2026-05-01 | 2                  | 1           | 0          | 1           | 4.0            | 2
GAME_730     | Counter-Strike 2         | Action           | 2026-05-02 | 1                  | 0           | 0          | 1           | 3.0            | 1
GAME_570     | Dota 2                   | Strategy         | 2026-05-01 | 2                  | 1           | 1          | 0           | 3.0            | 2
GAME_292030  | The Witcher 3: Wild Hunt | RPG              | 2026-05-01 | 2                  | 1           | 1          | 0           | 3.0            | 2
GAME_1091500 | Cyberpunk 2077           | RPG              | 2026-05-02 | 1                  | 1           | 0          | 0           | 1.0            | 1
GAME_292030  | The Witcher 3: Wild Hunt | RPG              | 2026-05-02 | 1                  | 1           | 0          | 0           | 1.0            | 1

Reading the result: Counter-Strike 2 (GAME_730) has the highest weighted score (7.0) on 2026-05-01 because four different users interacted with it that day (watch×2, like×1, share×1), and the high share weight lifts the total. On May 1, Cyberpunk 2077 scores higher (4.0) than Dota 2 (3.0), mainly because of a single share (weight 3.0).
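The weighted scores can be recomputed by hand from the 15 sample events. The following Python sketch reproduces the weighted_score and unique_users columns per (content_id, stat_date); it is an illustration of the arithmetic, not how the Dynamic Table is evaluated, and the helper names are hypothetical.

```python
from collections import defaultdict

WEIGHTS = {"share": 3.0, "like": 2.0, "watch": 1.0}

# (user_id, content_id, event_type, duration_sec, stat_date) from the sample INSERT
EVENTS = [
    ("USR001", "GAME_730", "watch", 3600, "2026-05-01"),
    ("USR001", "GAME_570", "like", 0, "2026-05-01"),
    ("USR002", "GAME_730", "share", 0, "2026-05-01"),
    ("USR002", "GAME_292030", "watch", 7200, "2026-05-01"),
    ("USR003", "GAME_1091500", "watch", 1800, "2026-05-01"),
    ("USR003", "GAME_730", "like", 0, "2026-05-01"),
    ("USR004", "GAME_570", "watch", 5400, "2026-05-01"),
    ("USR004", "GAME_292030", "like", 0, "2026-05-01"),
    ("USR005", "GAME_1091500", "share", 0, "2026-05-01"),
    ("USR005", "GAME_730", "watch", 2700, "2026-05-01"),
    ("USR001", "GAME_1091500", "watch", 4320, "2026-05-02"),
    ("USR002", "GAME_570", "share", 0, "2026-05-02"),
    ("USR003", "GAME_292030", "watch", 6600, "2026-05-02"),
    ("USR004", "GAME_730", "share", 0, "2026-05-02"),
    ("USR005", "GAME_570", "like", 0, "2026-05-02"),
]


def is_valid(event_type, duration_sec):
    """Same rule as the Silver layer: like/share always count, watch needs >= 60s."""
    return event_type in ("like", "share") or (event_type == "watch" and duration_sec >= 60)


def content_popularity(events):
    """Aggregate (weighted_score, unique_users) per (content_id, stat_date)."""
    scores = defaultdict(float)
    users = defaultdict(set)
    for user_id, content_id, event_type, duration_sec, stat_date in events:
        if not is_valid(event_type, duration_sec):
            continue
        key = (content_id, stat_date)
        scores[key] += WEIGHTS[event_type]
        users[key].add(user_id)
    return {key: (scores[key], len(users[key])) for key in scores}


popularity = content_popularity(EVENTS)
print(popularity[("GAME_730", "2026-05-01")])
```

For GAME_730 on 2026-05-01 the sum is 1.0 + 3.0 + 2.0 + 1.0 = 7.0 across four distinct users, matching the first row of the query result.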


Gold Layer Dynamic Table: User Interest Vectors

The user interest vector sums each user's weighted interaction score per content category and serves as the user-side feature for the recommendation model:

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_content_rec.gold_user_interest_profile
AS
SELECT
    user_id,
    CAST(DATE_TRUNC('day', MAX(event_time)) AS DATE) AS profile_date,
    COUNT(DISTINCT content_id) AS content_count,
    SUM(interaction_weight) AS total_weight,
    COLLECT_LIST(content_id) AS interacted_content_ids,
    -- Accumulate weights per content category to form the category interest vector
    SUM(CASE WHEN content_category = 'Action'   THEN interaction_weight ELSE 0 END) AS action_score,
    SUM(CASE WHEN content_category = 'Strategy' THEN interaction_weight ELSE 0 END) AS strategy_score,
    SUM(CASE WHEN content_category = 'RPG'      THEN interaction_weight ELSE 0 END) AS rpg_score
FROM best_practice_content_rec.silver_user_content_interactions
WHERE is_valid = 1
GROUP BY user_id;

REFRESH DYNAMIC TABLE best_practice_content_rec.gold_user_interest_profile;

SELECT user_id, content_count, total_weight,
       action_score, strategy_score, rpg_score,
       interacted_content_ids
FROM best_practice_content_rec.gold_user_interest_profile
ORDER BY total_weight DESC;

user_id | content_count | total_weight | action_score | strategy_score | rpg_score | interacted_content_ids
--------+---------------+--------------+--------------+----------------+-----------+-----------------------
USR002  | 3             | 7.0          | 3.0          | 3.0            | 1.0       | [GAME_730, GAME_292030, GAME_570]
USR004  | 3             | 6.0          | 3.0          | 1.0            | 2.0       | [GAME_570, GAME_292030, GAME_730]
USR005  | 3             | 6.0          | 1.0          | 2.0            | 3.0       | [GAME_1091500, GAME_730, GAME_570]
USR001  | 3             | 4.0          | 1.0          | 2.0            | 1.0       | [GAME_730, GAME_570, GAME_1091500]
USR003  | 3             | 4.0          | 2.0          | 0.0            | 2.0       | [GAME_1091500, GAME_730, GAME_292030]

Reading the result: USR002 has a balanced category distribution (action=3.0, strategy=3.0), which suggests broad interests. USR005 has an rpg_score (3.0) higher than its other category scores, so this user leans toward RPG and the recommender should prioritize RPG content for recall.

The interacted_content_ids field can be used directly to build training samples for item-to-item collaborative filtering.
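One simple way to turn interacted_content_ids into item-to-item signals is to count how often two items appear in the same user's list. The Python sketch below does this for the five sample users; co-occurrence counting is an illustrative choice, not the method prescribed by this guide, and the function name co_occurrence is hypothetical.

```python
from collections import Counter
from itertools import combinations

# interacted_content_ids per user, copied from the sample query output
USER_ITEMS = {
    "USR001": ["GAME_730", "GAME_570", "GAME_1091500"],
    "USR002": ["GAME_730", "GAME_292030", "GAME_570"],
    "USR003": ["GAME_1091500", "GAME_730", "GAME_292030"],
    "USR004": ["GAME_570", "GAME_292030", "GAME_730"],
    "USR005": ["GAME_1091500", "GAME_730", "GAME_570"],
}


def co_occurrence(user_items):
    """Count how many users interacted with both items of each unordered pair."""
    pairs = Counter()
    for items in user_items.values():
        # COLLECT_LIST can repeat an id, so deduplicate before pairing
        for a, b in combinations(sorted(set(items)), 2):
            pairs[(a, b)] += 1
    return pairs


pairs = co_occurrence(USER_ITEMS)
print(pairs[("GAME_570", "GAME_730")])
```

In the sample data GAME_570 and GAME_730 co-occur for four of the five users, the strongest pair, while GAME_1091500 and GAME_292030 co-occur only once.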


Dynamic Table Refresh Scheduling (Studio Task)

REFRESH INTERVAL is deliberately left out of the DDL. Instead, refresh tasks are created in Studio so that scheduling, alerting, and data quality checks are managed in one place.

Set up the Studio Task (path: best_practices/content_rec/) as follows:

  1. Under the best_practices/content_rec/ path in Studio, create a SQL task named refresh_gold_content_popularity.
  2. Task content:

REFRESH DYNAMIC TABLE best_practice_content_rec.gold_content_popularity;
REFRESH DYNAMIC TABLE best_practice_content_rec.gold_user_interest_profile;

  3. Configure the schedule (runs daily at 02:00):

cz-cli task save-cron refresh_gold_content_popularity -p skill_test --cron "0 2 * * *"

  4. Deploy the task (scheduling starts once it is online):

cz-cli task deploy refresh_gold_content_popularity -p skill_test

The Silver-layer DT should also get its own refresh task (same path, task name refresh_silver_interactions). Alternatively, combine the Silver and Gold refreshes into one DAG that runs in dependency order: the Gold refresh is triggered after the Silver refresh completes.
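The dependency order of such a DAG can be checked with a topological sort. The Python sketch below uses the task names from this guide and the standard-library graphlib module; it only illustrates the ordering and makes no claim about how Studio represents or executes DAGs.

```python
from graphlib import TopologicalSorter

# task -> set of upstream tasks that must finish first
DEPENDENCIES = {
    "refresh_silver_interactions": set(),
    "refresh_gold_content_popularity": {"refresh_silver_interactions"},
    "feature_engineering_export": {"refresh_gold_content_popularity"},
}

order = list(TopologicalSorter(DEPENDENCIES).static_order())
print(order)
```

TopologicalSorter also raises CycleError if a dependency loop is introduced by mistake, which makes it a cheap sanity check when the task graph grows.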


ZettaPark Python Task: Feature Engineering and Sample Export

A ZettaPark Task runs a Python script that reads the Gold layer directly and produces the feature matrix needed to train the recommendation model.

Under the best_practices/content_rec/ path in Studio, create a VIRTUAL task named feature_engineering_export. Sample script:

from clickzetta_zettapark.session import Session

session = Session.builder.configs({
    "instance": "<instance>",
    "workspace": "<workspace>",
    "schema": "best_practice_content_rec",
    "vcluster": "DEFAULT",
    "username": "<username>",
    "password": "<password>",
}).create()

# Read the Gold-layer user interest vectors
user_profile_df = session.table("gold_user_interest_profile")

# Read the Gold-layer content popularity
content_pop_df = session.table("gold_content_popularity")

# Cross join to build user-content feature pairs
feature_df = user_profile_df.join(
    content_pop_df, how="cross"
).select(
    "user_id", "content_id",
    "total_weight", "action_score", "strategy_score", "rpg_score",
    "weighted_score", "unique_users",
)

# Export as Parquet to the Volume
feature_df.write.mode("overwrite").parquet(
    "volume://vol_content_metadata/features/user_content_features.parquet"
)

print(f"Feature matrix exported: {feature_df.count()} rows")
session.close()
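The shape of the cross join is worth checking before running it at scale. The plain-Python sketch below pairs a few illustrative rows (values taken from the sample outputs in this guide) and shows that, because gold_content_popularity holds one row per content item per day, the result has one row per user, content item, and day. It is a stand-in for the DataFrame logic, not ZettaPark code.

```python
from itertools import product

# Illustrative rows with values from the sample query outputs
user_rows = [
    {"user_id": "USR002", "total_weight": 7.0, "rpg_score": 1.0},
    {"user_id": "USR005", "total_weight": 6.0, "rpg_score": 3.0},
]
content_rows = [
    {"content_id": "GAME_730", "stat_date": "2026-05-01", "weighted_score": 7.0},
    {"content_id": "GAME_730", "stat_date": "2026-05-02", "weighted_score": 3.0},
    {"content_id": "GAME_292030", "stat_date": "2026-05-01", "weighted_score": 3.0},
]

# Cross join: every user row is paired with every content-day row
features = [{**user, **content} for user, content in product(user_rows, content_rows)]

pairs = [(f["user_id"], f["content_id"]) for f in features]
repeated = len(pairs) - len(set(pairs))  # pairs that repeat once stat_date is dropped
print(len(features), repeated)
```

Here 2 users × 3 content-day rows give 6 feature rows, and two (user_id, content_id) pairs appear twice. Since the script's select list omits stat_date, it may be worth keeping that column or filtering to a single day so repeated pairs stay distinguishable.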


Warehouse Object Overview

After everything is built, the best_practice_content_rec schema contains the following objects:

SHOW TABLES IN best_practice_content_rec;

schema_name               | table_name                       | is_dynamic
--------------------------+----------------------------------+-----------
best_practice_content_rec | bronze_content_embedding         | false
best_practice_content_rec | bronze_content_metadata          | false
best_practice_content_rec | bronze_interaction_events        | false
best_practice_content_rec | gold_content_popularity          | true
best_practice_content_rec | gold_user_interest_profile       | true
best_practice_content_rec | silver_user_content_interactions | true

Layer relationships:

Kafka Topic (user_behavior_events)
        │  Kafka PIPE (pipe_user_events · BATCH_INTERVAL=30s)
        ▼
bronze_interaction_events          bronze_content_metadata
Bloomfilter Index (content_id)     Inverted Index (description, chinese)
        │                                  │
        └────────────┬─────────────────────┘
                     ▼
silver_user_content_interactions (Dynamic Table)
interaction_weight · is_valid denoising
                     │
        ┌────────────┴──────────────────┐
        ▼                               ▼
gold_content_popularity (DT)       gold_user_interest_profile (DT)
PARTITIONED BY (stat_date)         category-level interest vectors
static_partitions=true
        │
        ▼
ZettaPark Python Task
feature_engineering_export → user × content feature matrix → Volume

OSS Volume (vol_content_metadata)
        │  OSS PIPE (pipe_content_metadata · LIST_PURGE)
        ▼
bronze_content_metadata
        │  External Function (text2vec)
        ▼
bronze_content_embedding (VECTOR(128))
Vector Index IVFPQ (idx_vec_content_embedding) → COSINE_DISTANCE ANN recall

Studio Task scheduling path: best_practices/content_rec/

  • refresh_gold_content_popularity (daily at 02:00; refreshes both Gold DTs)
  • feature_engineering_export (triggered after the Gold refresh completes)

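Notes

  • Bloomfilter Index does not support BUILD INDEX: a Bloomfilter index only takes effect for data written after it is created, and BUILD INDEX cannot be run against existing data (unlike Vector / Inverted Index). If a table already holds a large amount of historical data that must be covered, recreate the table and reload the data.

  • Vector Index and Inverted Index must be built explicitly: CREATE INDEX only handles data added afterwards. For existing data, run BUILD INDEX <index_name> ON <table_name> manually (in the same schema context); otherwise vector search and full-text search results exclude data written before the index was created.

  • Partitioned Dynamic Tables must declare static_partitions: a DT with PARTITIONED BY must set TBLPROPERTIES ('static_partitions' = 'true'). Without it the system infers partitions dynamically, which can cause old partition data to be overwritten or lost during incremental refresh.

  • Do not write REFRESH INTERVAL in a Dynamic Table: manage refresh scheduling centrally through Studio Tasks (path best_practices/content_rec/), where alert rules and data quality checks can be attached to the same task node. Do not put a REFRESH INTERVAL parameter in the CREATE DYNAMIC TABLE DDL.

  • Smaller COSINE_DISTANCE means more similar: unlike cosine similarity (-1 to 1), COSINE_DISTANCE returns values from 0 to 2, where 0 means the two vectors are identical. Sort with ORDER BY cos_dist ASC and take the top K as the approximate nearest neighbors.

  • Kafka PIPE DDL validates the broker connection: on CREATE PIPE, the system tries to connect to the Kafka broker to verify that the topic exists. In a development setup without Kafka, create the target table first, validate the downstream Silver/Gold logic with INSERT-simulated data, and create the PIPE once the Kafka environment is ready.

  • OSS PIPE LIST_PURGE mode is irreversible: after a successful import, the original files in the Volume are deleted. If the files must be kept (for example, for re-runs), switch to LIST mode and add deduplication in the Bronze layer (for example, keep only the row with the latest load_time for each content_id).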
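Deduplicating re-imported metadata can be sketched as keeping only the most recent row per content_id. The Python example below illustrates that rule on a few hypothetical rows; in the warehouse the same logic would normally be written in SQL, and the function name latest_per_content is an assumption made for this sketch.

```python
def latest_per_content(rows):
    """Keep, for each content_id, the row with the greatest load_time."""
    latest = {}
    for row in rows:
        current = latest.get(row["content_id"])
        if current is None or row["load_time"] > current["load_time"]:
            latest[row["content_id"]] = row
    return list(latest.values())


# Hypothetical rows: GAME_730 arrives twice because its file was imported again
rows = [
    {"content_id": "GAME_730", "price": 0.0, "load_time": "2026-05-01 02:00:00"},
    {"content_id": "GAME_570", "price": 0.0, "load_time": "2026-05-01 02:00:00"},
    {"content_id": "GAME_730", "price": 0.0, "load_time": "2026-05-02 02:00:00"},
]
deduped = latest_per_content(rows)
print(deduped)
```

Timestamps in this fixed-width format compare correctly as strings. A plain DISTINCT over content_id and load_time would not remove the older copy, because each import gets its own load_time.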

