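Zettapark Feature Engineering
Feature engineering is a core stage of machine learning. Zettapark translates DataFrame operations into SQL that runs distributed inside the Lakehouse, so it can process large-scale user behavior data efficiently and build structured feature tables for model training.
Prerequisites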
    from clickzetta.zettapark.session import Session
    from clickzetta.zettapark import functions as F
    from clickzetta.zettapark.window import Window

    session = Session.builder.configs({
        "username": "your_username",
        "password": "your_password",
        "service": "cn-shanghai-alicloud.api.clickzetta.com",
        "instance": "your_instance",
        "workspace": "your_workspace",
        "schema": "public",
        "vcluster": "default"
    }).create()
Create the test data (a user behavior events table and a users table):
    session.sql("""
        CREATE TABLE IF NOT EXISTS user_events (
            user_id INT,
            event_type STRING,
            amount DECIMAL(10, 2),
            category STRING,
            event_date STRING,
            hour INT
        )
    """).collect()

    session.sql("""
        INSERT INTO user_events VALUES
            (101, 'purchase',  299.0, 'electronics', '2024-01-10', 14),
            (101, 'purchase',   99.0, 'clothing',    '2024-01-12', 10),
            (101, 'browse',      0.0, 'electronics', '2024-01-13', 20),
            (101, 'purchase',  599.0, 'electronics', '2024-01-15', 15),
            (102, 'purchase',   49.0, 'food',        '2024-01-10', 12),
            (102, 'browse',      0.0, 'clothing',    '2024-01-11', 18),
            (102, 'purchase',  199.0, 'clothing',    '2024-01-14', 11),
            (103, 'purchase',  999.0, 'electronics', '2024-01-08', 16),
            (103, 'purchase', 1299.0, 'electronics', '2024-01-15', 14),
            (103, 'browse',      0.0, 'food',        '2024-01-16',  9)
    """).collect()

    session.sql("""
        CREATE TABLE IF NOT EXISTS users (
            user_id INT,
            age INT,
            city STRING,
            register_date STRING
        )
    """).collect()

    session.sql("""
        INSERT INTO users VALUES
            (101, 28, 'Beijing',   '2023-06-01'),
            (102, 35, 'Shanghai',  '2023-03-15'),
            (103, 22, 'Guangzhou', '2023-09-20')
    """).collect()
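Feature 1: Statistical Features
Aggregate each user's purchase events to extract statistical features such as purchase frequency and spend distribution: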
    events = session.table("user_events")

    stat_features = events.filter(F.col("event_type") == "purchase") \
        .group_by("user_id") \
        .agg(
            F.count(F.col("user_id")).alias("purchase_count"),
            F.sum(F.col("amount")).alias("total_spend"),
            F.avg(F.col("amount")).alias("avg_order_value"),
            F.max(F.col("amount")).alias("max_order_value"),
            F.min(F.col("amount")).alias("min_order_value"),
            F.count_distinct(F.col("category")).alias("category_diversity"),
        )
    stat_features.show()
    +-------+--------------+-----------+---------------+---------------+---------------+------------------+
    |user_id|purchase_count|total_spend|avg_order_value|max_order_value|min_order_value|category_diversity|
    +-------+--------------+-----------+---------------+---------------+---------------+------------------+
    |    101|             3|     997.00|     332.333333|         599.00|          99.00|                 2|
    |    102|             2|     248.00|     124.000000|         199.00|          49.00|                 2|
    |    103|             2|    2298.00|    1149.000000|        1299.00|         999.00|                 1|
    +-------+--------------+-----------+---------------+---------------+---------------+------------------+
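As a local sanity check, the same aggregates can be recomputed in plain Python from the sample purchase rows. This is only a sketch for verifying the expected numbers without a Lakehouse connection; `local_stat_features` is a hypothetical helper, not part of Zettapark.

```python
from collections import defaultdict
from decimal import Decimal

# Purchase rows copied from the sample INSERT: (user_id, amount, category)
purchases = [
    (101, Decimal("299.00"), "electronics"),
    (101, Decimal("99.00"), "clothing"),
    (101, Decimal("599.00"), "electronics"),
    (102, Decimal("49.00"), "food"),
    (102, Decimal("199.00"), "clothing"),
    (103, Decimal("999.00"), "electronics"),
    (103, Decimal("1299.00"), "electronics"),
]

def local_stat_features(rows):
    """Group by user_id and compute the same aggregates as stat_features."""
    by_user = defaultdict(list)
    for user_id, amount, category in rows:
        by_user[user_id].append((amount, category))

    features = {}
    for user_id, items in by_user.items():
        amounts = [amount for amount, _ in items]
        features[user_id] = {
            "purchase_count": len(amounts),
            "total_spend": sum(amounts),
            "avg_order_value": sum(amounts) / len(amounts),
            "max_order_value": max(amounts),
            "min_order_value": min(amounts),
            "category_diversity": len({category for _, category in items}),
        }
    return features

local_stats = local_stat_features(purchases)
```

Comparing `local_stats` with the `show()` output is a quick way to confirm that the purchase filter and grouping behave as intended before scaling up to real data.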
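Feature 2: Time-Based Features
Extract time-dimension features: the first and last active dates, the number of days since the last activity, and the average active hour: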
    time_features = events.group_by("user_id") \
        .agg(
            F.max(F.to_date(F.col("event_date"))).alias("last_active_date"),
            F.min(F.to_date(F.col("event_date"))).alias("first_active_date"),
            F.datediff(
                F.to_date(F.lit("2024-01-16")),  # reference date for the statistics
                F.max(F.to_date(F.col("event_date")))
            ).alias("days_since_last_active"),
            F.avg(F.col("hour")).alias("avg_active_hour"),
        )
    time_features.show()
    +-------+----------------+-----------------+----------------------+---------------+
    |user_id|last_active_date|first_active_date|days_since_last_active|avg_active_hour|
    +-------+----------------+-----------------+----------------------+---------------+
    |    101|      2024-01-15|       2024-01-10|                     1|          14.75|
    |    102|      2024-01-14|       2024-01-10|                     2|          13.67|
    |    103|      2024-01-16|       2024-01-08|                     0|             13|
    +-------+----------------+-----------------+----------------------+---------------+
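The time features can be cross-checked the same way with the standard `datetime` module. The sketch below is plain Python, not Zettapark code. It also computes `active_days` (the number of distinct active dates), which is an extra illustration and not a column produced by the query above.

```python
from datetime import date

REFERENCE_DATE = date(2024, 1, 16)  # same reference date as the query

# (user_id, event_date, hour) for every sample event
events_local = [
    (101, "2024-01-10", 14), (101, "2024-01-12", 10),
    (101, "2024-01-13", 20), (101, "2024-01-15", 15),
    (102, "2024-01-10", 12), (102, "2024-01-11", 18), (102, "2024-01-14", 11),
    (103, "2024-01-08", 16), (103, "2024-01-15", 14), (103, "2024-01-16", 9),
]

def local_time_features(rows, reference=REFERENCE_DATE):
    """Compute per-user time features from (user_id, ISO date, hour) rows."""
    grouped = {}
    for user_id, event_date, hour in rows:
        entry = grouped.setdefault(user_id, {"dates": [], "hours": []})
        entry["dates"].append(date.fromisoformat(event_date))
        entry["hours"].append(hour)

    return {
        user_id: {
            "last_active_date": max(g["dates"]),
            "first_active_date": min(g["dates"]),
            "days_since_last_active": (reference - max(g["dates"])).days,
            "active_days": len(set(g["dates"])),  # extra: distinct active dates
            "avg_active_hour": sum(g["hours"]) / len(g["hours"]),
        }
        for user_id, g in grouped.items()
    }

local_time = local_time_features(events_local)
```

In production the reference date would normally be the run date of the feature job instead of a hard-coded literal, so that `days_since_last_active` stays meaningful across runs.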
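Feature 3: Window-Based Behavior Features
Use window functions to extract each user's most recent purchase together with the cumulative spend up to that purchase: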
    # Most recent purchase first within each user
    w_time = Window.partition_by("user_id").order_by(F.col("event_date").desc())
    # Chronological order for the running total
    w_cumulative = Window.partition_by("user_id").order_by("event_date")

    behavior_features = events.filter(F.col("event_type") == "purchase") \
        .with_column("purchase_rank", F.rank().over(w_time)) \
        .with_column("cumulative_spend", F.sum("amount").over(w_cumulative)) \
        .filter(F.col("purchase_rank") == 1) \
        .select("user_id", "amount", "category", "cumulative_spend")
    behavior_features.show()
    +-------+-------+-----------+----------------+
    |user_id| amount|   category|cumulative_spend|
    +-------+-------+-----------+----------------+
    |    101| 599.00|electronics|          997.00|
    |    102| 199.00|   clothing|          248.00|
    |    103|1299.00|electronics|         2298.00|
    +-------+-------+-----------+----------------+
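One caveat with filtering on `rank() == 1`: because `event_date` has day granularity, two purchases on the same latest date tie for rank 1, and both rows survive the filter. The plain-Python sketch below mimics SQL `RANK()` and `ROW_NUMBER()` semantics to show the difference. The helper names and the tied dates are hypothetical and not taken from the sample data.

```python
def rank_desc(values):
    """Mimic SQL RANK() with descending order: ties share a rank, gaps follow."""
    ordered = sorted(values, reverse=True)
    return [ordered.index(value) + 1 for value in values]

def row_number_desc(values):
    """Mimic SQL ROW_NUMBER() with descending order: every row gets a distinct number.

    Ties are broken by input order here; in SQL the tie order is arbitrary
    unless additional ORDER BY keys are given.
    """
    order = sorted(range(len(values)), key=lambda i: values[i], reverse=True)
    numbers = [0] * len(values)
    for position, index in enumerate(order, start=1):
        numbers[index] = position
    return numbers

# Hypothetical user with two purchases on the same latest date
purchase_dates = ["2024-01-15", "2024-01-15", "2024-01-10"]

ranks = rank_desc(purchase_dates)               # [1, 1, 3]
row_numbers = row_number_desc(purchase_dates)   # [1, 2, 3]
```

If exactly one row per user is required, a row-number style ranking with an extra tiebreaker column (for example `hour`) in the window ordering is the usual remedy.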
Feature 4: Categorical Feature Encoding
Convert the categorical feature (product category) into numeric features (spend per category):
    category_features = events.filter(F.col("event_type") == "purchase") \
        .group_by("user_id") \
        .agg(
            F.sum(F.iff(F.col("category") == "electronics", F.col("amount"), F.lit(0))).alias("electronics_spend"),
            F.sum(F.iff(F.col("category") == "clothing", F.col("amount"), F.lit(0))).alias("clothing_spend"),
            F.sum(F.iff(F.col("category") == "food", F.col("amount"), F.lit(0))).alias("food_spend"),
        )
    category_features.show()
    +-------+-----------------+--------------+----------+
    |user_id|electronics_spend|clothing_spend|food_spend|
    +-------+-----------------+--------------+----------+
    |    101|           898.00|         99.00|      0.00|
    |    102|             0.00|        199.00|     49.00|
    |    103|          2298.00|          0.00|      0.00|
    +-------+-----------------+--------------+----------+
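The conditional-sum pattern above is effectively a pivot: each category becomes one column, and rows from other categories contribute zero. A minimal plain-Python sketch of the same logic, driven by a category list instead of hand-written columns, might look like this (`spend_by_category` and `CATEGORIES` are illustrative names, not Zettapark APIs):

```python
CATEGORIES = ["electronics", "clothing", "food"]

# Purchase rows from the sample data: (user_id, amount, category)
purchase_rows = [
    (101, 299.0, "electronics"),
    (101, 99.0, "clothing"),
    (101, 599.0, "electronics"),
    (102, 49.0, "food"),
    (102, 199.0, "clothing"),
    (103, 999.0, "electronics"),
    (103, 1299.0, "electronics"),
]

def spend_by_category(rows, categories=CATEGORIES):
    """Pivot (user, amount, category) rows into one spend column per category."""
    result = {}
    for user_id, amount, category in rows:
        spend = result.setdefault(user_id, {f"{c}_spend": 0.0 for c in categories})
        column = f"{category}_spend"
        if column in spend:  # categories outside the list are ignored
            spend[column] += amount
    return result

category_spend = spend_by_category(purchase_rows)

# In Zettapark, the same loop could build the agg() arguments (untested sketch
# that reuses only the calls shown in this section):
#   [F.sum(F.iff(F.col("category") == c, F.col("amount"), F.lit(0))).alias(f"{c}_spend")
#    for c in CATEGORIES]
```

Driving the columns from a list keeps the feature schema explicit and stable even when new categories appear in the source data.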
Feature 5: Merging the Feature Tables
Join the per-dimension features into a single wide table keyed by user_id:
    users = session.table("users")

    all_features = stat_features \
        .join(time_features, "user_id") \
        .join(category_features, "user_id") \
        .join(users.select("user_id", "age", "city"), "user_id")
    all_features.show()
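One thing to watch when merging: `stat_features` and `category_features` are built from purchase events only, so a user who has never purchased has no row in them. Assuming `join` defaults to an inner join (an assumption about Zettapark's default; check the API reference), such a user would drop out of the wide table. The plain-Python sketch below contrasts inner and left join behavior using a hypothetical browse-only user 104.

```python
# 104 is a hypothetical browse-only user, not part of the sample data
all_user_ids = [101, 102, 104]
purchase_counts = {101: 3, 102: 2}  # no entry for 104: they never purchased

# Inner join: only users present on both sides survive
inner_joined = {
    user_id: purchase_counts[user_id]
    for user_id in all_user_ids
    if user_id in purchase_counts
}

# Left join from the user list, filling missing features with 0
left_joined = {user_id: purchase_counts.get(user_id, 0) for user_id in all_user_ids}
```

Whether non-purchasing users belong in the training set depends on the model's goal; starting the join chain from the users table and filling missing values is one way to keep them.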
Feature 6: Normalization (Min-Max Scaling)
Scale a numeric feature to the [0, 1] range.
First compute the global minimum and maximum:
    stats = all_features.select(
        F.min("total_spend").alias("min_spend"),
        F.max("total_spend").alias("max_spend"),
    ).collect()[0]
    min_val = float(stats["min_spend"])
    max_val = float(stats["max_spend"])
Then apply the normalization:
    normalized = all_features.with_column(
        "total_spend_normalized",
        F.round(
            (F.col("total_spend") - F.lit(min_val)) / F.lit(max_val - min_val),
            4
        )
    )
    normalized.select("user_id", "total_spend", "total_spend_normalized").show()
    +-------+-----------+----------------------+
    |user_id|total_spend|total_spend_normalized|
    +-------+-----------+----------------------+
    |    101|     997.00|                0.3654|
    |    102|     248.00|                0.0000|
    |    103|    2298.00|                1.0000|
    +-------+-----------+----------------------+
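The scaling applies x' = (x - min) / (max - min). For user 101 that is (997 - 248) / (2298 - 248) = 749 / 2050 ≈ 0.3654. The formula divides by zero when every value is identical, which the query above does not guard against. A minimal plain-Python sketch with that guard added (`min_max_scale` is a hypothetical helper, and mapping a constant column to 0.0 is just one possible convention):

```python
def min_max_scale(values, ndigits=4):
    """Scale values to [0, 1]; a constant column maps to all zeros."""
    lowest, highest = min(values), max(values)
    span = highest - lowest
    if span == 0:
        # All values are equal, so there is no spread to normalize
        return [0.0 for _ in values]
    return [round((value - lowest) / span, ndigits) for value in values]

# total_spend for users 101, 102, 103
scaled = min_max_scale([997.0, 248.0, 2298.0])
constant = min_max_scale([5.0, 5.0, 5.0])
```

In the Zettapark version, the equivalent guard would be a check on `max_val - min_val` in Python before building the `with_column` expression.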
Writing to the Feature Store
Once feature engineering is complete, write the result to a feature table for training:
    all_features.write.save_as_table("user_features", mode="overwrite")
    print(f"Feature table row count: {session.table('user_features').count()}")
Alternatively, create a Dynamic Table so that the features refresh automatically as the source data changes:
    all_features.create_or_replace_dynamic_table(
        "user_features_auto",
        lag="1 hour",
        warehouse="default"
    )
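Exporting Features for Model Training
After the feature table is written, it can be converted to a pandas DataFrame and used directly for model training.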
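Read the features from the Lakehouse:
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier

    df = session.table("user_features").to_pandas()
Select the numeric feature columns:
    feature_cols = [
        "purchase_count", "total_spend", "avg_order_value",
        "days_since_last_active", "electronics_spend",
        "clothing_spend", "food_spend", "age"
    ]
    X = df[feature_cols].fillna(0)
Target variable (the feature table built above has no label column, so a "label" column must be joined in from your own labeling data first):
    y = df["label"]
Related Documentation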
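| Document | Description |
|---|---|
| Zettapark DataFrame API Guide | Basic DataFrame operations |
| Zettapark Common Functions Reference | Detailed reference for window and aggregate functions |
| Creating a Dynamic Table with Zettapark | Automatic feature refresh |
| Credit Scoring with Zettapark and Python Machine Learning Libraries | End-to-end ML example |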
