pandas → ZettaPark 迁移实战:零售数据分析

GitHub:pandas2lakehouse-retail

原始项目

数据来源:UCI Online Retail II Dataset,英国零售商 2009–2011 年的真实交易记录。

指标
原始行数1,067,371
清洗后行数805,549
客户数5,878
时间跨度2009-12 至 2011-12(25 个月)
主要字段Invoice、StockCode、Quantity、InvoiceDate、Price、Customer ID、Country

pandas 脚本实现了三个分析场景:

  1. 扩展 RFM:购买间隔 P25/P50/P75、最长断购周期、首购到复购天数
  2. 周粒度同期群留存:按首购周分组,追踪后续各周留存率
  3. 商品关联分析(Market Basket):找出同一订单中经常一起购买的商品对,计算 support 和 lift

结论先行

你不需要重新学一套新工具,也不需要推倒重写现有脚本。pandas 代码直接作为迁移蓝图——分析逻辑、指标定义、分层规则全部保留,只替换数据读取方式和少数 API。

场景pandas 耗时pandas 峰值内存ZettaPark 耗时ZettaPark 本地内存
扩展 RFM8.2s203MB3.2s0
周粒度同期群9.4s236MB5.9s0
商品关联分析(self-join)9.3s2203MB3.2s0

商品关联分析是最能体现差距的场景:self-join 产生的中间结果让 pandas 峰值内存达到 2203MB,数据量翻倍就会 OOM;ZettaPark 在服务端处理,本地内存始终为 0。

技术栈对比

pandasZettaPark
运行环境本地 Python 进程ClickZetta Lakehouse(云端)
数据读取
pd.read_csv(path)
pd.read_csv(path)
FROM VOLUME ... USING CSV OPTIONS
FROM VOLUME ... USING CSV OPTIONS
计算引擎单机内存分布式 SQL 引擎
内存占用峰值 203–2203MB(1M 行)本地 0(数据不落本地)
结果写出
to_csv()
to_csv()
/
to_parquet()
to_parquet()
write.save_as_table()
write.save_as_table()
扩展上限受本地内存限制(self-join 场景 2GB+,易 OOM)无上限

项目背景

pandas 是数据科学家的首选工具,但在生产环境中面临两个瓶颈:

  1. 内存瓶颈:1M 行数据峰值内存 200MB+,10M 行时容易 OOM
  2. 扩展瓶颈
    groupby().apply(shift())
    groupby().apply(shift())
    等操作在大数据集上极慢

ZettaPark 提供与 pandas 相似的 DataFrame API,同时支持

session.sql()
session.sql()
直接写 SQL,让迁移路径非常平滑。

迁移步骤

第一步:替换数据读取

pandas 直接读本地文件;ZettaPark 从 Volume 读取,且不支持

inferSchema
inferSchema
,必须用 SQL 指定列。

原始数据有

Customer ID
Customer ID
(含空格),在初始 SQL 里一次性重命名,避免后续所有操作都需要反引号。

# pandas df = pd.read_csv("online_retail_II.csv", parse_dates=["InvoiceDate"]) df = df[~df["Invoice"].astype(str).str.startswith("C")] df = df.dropna(subset=["Customer ID"]) df = df[df["Quantity"] > 0] df["Revenue"] = df["Quantity"] * df["Price"]

# ZettaPark — 读取 + 清洗合并到一条 SQL df = session.sql(""" SELECT Invoice, StockCode, Description, Quantity, InvoiceDate, Price, `Customer ID` AS CustomerID, Country FROM VOLUME retail_schema.retail_vol USING CSV OPTIONS ('header' = 'true', 'nullValue' = '') FILES ('raw/online_retail_II.csv') WHERE Invoice NOT LIKE 'C%' AND `Customer ID` IS NOT NULL AND Quantity > 0 AND Price > 0 """) df = df.with_column("Revenue", F.col("Quantity") * F.col("Price"))

第二步:替换窗口函数(扩展 RFM)

pandas 用

shift()
shift()
计算相邻订单间隔,需要多次
groupby + merge
groupby + merge
,内存峰值约为数据量的 3–4 倍。ZettaPark 用
LAG()
LAG()
窗口函数,在一次 SQL 扫描中完成。

# pandas — 多次 groupby + merge orders["prev_date"] = orders.groupby("Customer ID")["InvoiceDate"].shift(1) orders["gap_days"] = (orders["InvoiceDate"] - orders["prev_date"]).dt.days gap_stats = gaps.groupby("Customer ID")["gap_days"].agg( gap_p25=lambda x: np.percentile(x, 25), gap_p50=lambda x: np.percentile(x, 50), gap_p75=lambda x: np.percentile(x, 75), )

-- ZettaPark — 单次 SQL,LAG + PERCENTILE WITH with_gaps AS ( SELECT customerid, order_date, DATEDIFF(DAY, LAG(order_date) OVER (PARTITION BY customerid ORDER BY order_date), order_date ) AS gap_days FROM order_level ), gap_percentiles AS ( SELECT customerid, PERCENTILE(gap_days, 0.25) AS gap_p25, PERCENTILE(gap_days, 0.50) AS gap_p50, PERCENTILE(gap_days, 0.75) AS gap_p75, MAX(gap_days) AS max_gap FROM with_gaps WHERE gap_days IS NOT NULL GROUP BY customerid )

第三步:替换 self-join(商品关联分析)

商品关联分析的核心是找同一订单里的商品对,需要对

invoice_products
invoice_products
做自连接。pandas 的
merge(df, df, on="Invoice")
merge(df, df, on="Invoice")
会把中间结果全部加载到本地内存——1M 行数据产生约 760 万行中间结果,峰值内存 2203MB。ZettaPark 在服务端处理,本地内存为 0。

# pandas — self-join 中间结果全在本地内存 invoice_products = df[["Invoice", "StockCode"]].drop_duplicates() pairs = invoice_products.merge(invoice_products, on="Invoice", suffixes=("_a", "_b")) pairs = pairs[pairs["StockCode_a"] < pairs["StockCode_b"]] pair_counts = pairs.groupby(["StockCode_a", "StockCode_b"]).size().reset_index(name="co_count") # 峰值内存 2203MB,数据量翻倍直接 OOM

-- ZettaPark — self-join 在服务端执行,本地内存 0 WITH invoice_products AS ( SELECT DISTINCT invoice, stockcode FROM retail_clean ), pairs AS ( SELECT a.stockcode AS stockcode_a, b.stockcode AS stockcode_b, COUNT(*) AS co_count FROM invoice_products a JOIN invoice_products b ON a.invoice = b.invoice AND a.stockcode < b.stockcode GROUP BY a.stockcode, b.stockcode HAVING COUNT(*) >= 10 )

第四步:替换日期操作

pandasZettaPark SQL
dt.to_period("W").dt.start_time
dt.to_period("W").dt.start_time
DATE_TRUNC('week', CAST(col AS DATE))
DATE_TRUNC('week', CAST(col AS DATE))
(date_a - date_b).dt.days
(date_a - date_b).dt.days
DATEDIFF(DAY, date_b, date_a)
DATEDIFF(DAY, date_b, date_a)
(week_a - week_b).dt.days // 7
(week_a - week_b).dt.days // 7
DATEDIFF(WEEK, week_b, week_a)
DATEDIFF(WEEK, week_b, week_a)

完全兼容的部分(无需修改)

分析逻辑本身不需要改动:

  • RFM 指标定义(Recency = 距今天数,Frequency = 去重订单数,Monetary = 总消费金额)
  • 客户分层规则(Champions / Recent Customers / At Risk / Lost)
  • Cohort 分析逻辑(首购周、留存率 = 活跃客户数 / Cohort 初始规模)
  • 所有聚合逻辑(SUM、COUNT DISTINCT、AVG、MIN、MAX)
  • 条件逻辑(
    np.where
    np.where
    F.when().otherwise()
    F.when().otherwise()
    ,语义完全一致)

注意事项

1. inferSchema 不支持

现象

session.read.option("inferSchema", "true").csv(...)
session.read.option("inferSchema", "true").csv(...)
报错。

原因:ZettaPark 不支持

inferSchema
inferSchema
,必须显式指定列。

解决:改用

session.sql("SELECT ... FROM VOLUME ... USING CSV OPTIONS ...")
session.sql("SELECT ... FROM VOLUME ... USING CSV OPTIONS ...")
,在 SQL 里直接写列名。

2. 含空格的列名

现象

Customer ID
Customer ID
在 ZettaPark DataFrame 里变成
customer id
customer id
(小写+空格),后续
F.col("CustomerID")
F.col("CustomerID")
找不到列。

原因:ZettaPark 将所有列名转为小写,含空格的列名无法直接用标识符引用。

解决:在初始 SQL 的

SELECT
SELECT
里用反引号引用并重命名:

`Customer ID` AS CustomerID

3. COUNT(DISTINCT) 与 PERCENTILE 冲突

现象:在同一

GROUP BY
GROUP BY
里同时写
COUNT(DISTINCT invoice)
COUNT(DISTINCT invoice)
PERCENTILE(gap_days, 0.5)
PERCENTILE(gap_days, 0.5)
报错。

解决:拆成两个 CTE,分别计算。

4. Excel 原始文件的编码问题

现象:UCI 数据集原始格式为 Excel(

.xlsx
.xlsx
),
Description
Description
字段含
£
£
等特殊字符,直接转 CSV 后出现无效 UTF-8。

解决:转换时用

errors='replace'
errors='replace'
替换无法编码的字符:

df.to_csv("online_retail_II.csv", index=False, encoding="utf-8", errors="replace")

端到端验证

03_lakehouse/e2e.py
03_lakehouse/e2e.py
在完整数据集(106 万行)上执行 5 项自动化检查:

检查项预期值结果
清洗后行数805,549
客户数5,878
分层总和 = 客户数5,878
Top 国家United Kingdom
月份数25

实际运行结果:5/5 通过,总耗时 12.8s。

迁移结论

API 层面的收益

  • groupby().apply(shift())
    groupby().apply(shift())
    LAG() OVER (PARTITION BY ...)
    LAG() OVER (PARTITION BY ...)
    :代码更简洁,无需多次 merge
  • rolling(4).sum()
    rolling(4).sum()
    ROWS BETWEEN 3 PRECEDING AND CURRENT ROW
    ROWS BETWEEN 3 PRECEDING AND CURRENT ROW
    :标准 SQL,无需 set_index
  • np.percentile()
    np.percentile()
    PERCENTILE(col, 0.25)
    PERCENTILE(col, 0.25)
    :直接在 SQL 里完成,无需 apply
  • 多次 groupby + merge → 单次 SQL WITH 链:内存峰值从 200MB+ 降到 0

部署模式的收益

从本地 pandas 脚本迁移到 ClickZetta Lakehouse SaaS,带来的不只是性能提升:

pandas(本地)ZettaPark(SaaS)
数据规模上限受本地内存限制(~10M 行 OOM)无上限,PB 级数据同等代码
运行环境需要本地 Python 环境云端执行,无需本地资源
协作脚本文件共享结果写入共享表,团队直接查询
调度手动运行或 cron平台内置任务调度
版本管理手动维护平台自动管理

迁移后,数据科学家可以把精力从"让 pandas 跑完"转移到"让分析产生价值"。

参考

联系我们
预约咨询
微信咨询
电话咨询