pandas → ZettaPark 迁移实战:零售数据分析
GitHub:pandas2lakehouse-retail
原始项目
数据来源:UCI Online Retail II Dataset ,英国零售商 2009–2011 年的真实交易记录。
指标 值 原始行数 1,067,371 清洗后行数 805,549 客户数 5,878 时间跨度 2009-12 至 2011-12(25 个月) 主要字段 Invoice、StockCode、Quantity、InvoiceDate、Price、Customer ID、Country
pandas 脚本实现了三个分析场景:
扩展 RFM :购买间隔 P25/P50/P75、最长断购周期、首购到复购天数
周粒度同期群留存 :按首购周分组,追踪后续各周留存率
商品关联分析(Market Basket) :找出同一订单中经常一起购买的商品对,计算 support 和 lift
结论先行
你不需要重新学一套新工具,也不需要推倒重写现有脚本。pandas 代码直接作为迁移蓝图——分析逻辑、指标定义、分层规则全部保留,只替换数据读取方式和少数 API。
场景 pandas 耗时 pandas 峰值内存 ZettaPark 耗时 ZettaPark 本地内存 扩展 RFM 8.2s 203MB 3.2s 0 周粒度同期群 9.4s 236MB 5.9s 0 商品关联分析(self-join) 9.3s 2203MB 3.2s 0
商品关联分析是最能体现差距的场景:self-join 产生的中间结果让 pandas 峰值内存达到 2203MB ,数据量翻倍就会 OOM;ZettaPark 在服务端处理,本地内存始终为 0。
技术栈对比
pandas ZettaPark 运行环境 本地 Python 进程 ClickZetta Lakehouse(云端) 数据读取 pd.read_csv(path)pd.read_csv(path)
FROM VOLUME ... USING CSV OPTIONSFROM VOLUME ... USING CSV OPTIONS
计算引擎 单机内存 分布式 SQL 引擎 内存占用 峰值 203–2203MB(1M 行) 本地 0(数据不落本地) 结果写出 to_csv()to_csv()
/ to_parquet()to_parquet()
write.save_as_table()write.save_as_table()
扩展上限 受本地内存限制(self-join 场景 2GB+,易 OOM) 无上限
项目背景
pandas 是数据科学家的首选工具,但在生产环境中面临两个瓶颈:
内存瓶颈 :1M 行数据峰值内存 200MB+,10M 行时容易 OOM
扩展瓶颈 :groupby().apply(shift())groupby().apply(shift())
等操作在大数据集上极慢
ZettaPark 提供与 pandas 相似的 DataFrame API,同时支持
session.sql()session.sql()
直接写 SQL,让迁移路径非常平滑。
迁移步骤
第一步:替换数据读取
pandas 直接读本地文件;ZettaPark 从 Volume 读取,且不支持
inferSchemainferSchema
,必须用 SQL 指定列。
原始数据有
Customer IDCustomer ID
(含空格),在初始 SQL 里一次性重命名,避免后续所有操作都需要反引号。
# pandas
df = pd.read_csv("online_retail_II.csv", parse_dates=["InvoiceDate"])
df = df[~df["Invoice"].astype(str).str.startswith("C")]
df = df.dropna(subset=["Customer ID"])
df = df[df["Quantity"] > 0]
df["Revenue"] = df["Quantity"] * df["Price"]
# ZettaPark — 读取 + 清洗合并到一条 SQL
df = session.sql("""
SELECT
Invoice, StockCode, Description, Quantity, InvoiceDate, Price,
`Customer ID` AS CustomerID,
Country
FROM VOLUME retail_schema.retail_vol
USING CSV OPTIONS ('header' = 'true', 'nullValue' = '')
FILES ('raw/online_retail_II.csv')
WHERE Invoice NOT LIKE 'C%'
AND `Customer ID` IS NOT NULL
AND Quantity > 0 AND Price > 0
""")
df = df.with_column("Revenue", F.col("Quantity") * F.col("Price"))
第二步:替换窗口函数(扩展 RFM)
pandas 用
shift()shift()
计算相邻订单间隔,需要多次
groupby + mergegroupby + merge
,内存峰值约为数据量的 3–4 倍。ZettaPark 用
LAG()LAG()
窗口函数,在一次 SQL 扫描中完成。
# pandas — 多次 groupby + merge
orders["prev_date"] = orders.groupby("Customer ID")["InvoiceDate"].shift(1)
orders["gap_days"] = (orders["InvoiceDate"] - orders["prev_date"]).dt.days
gap_stats = gaps.groupby("Customer ID")["gap_days"].agg(
gap_p25=lambda x: np.percentile(x, 25),
gap_p50=lambda x: np.percentile(x, 50),
gap_p75=lambda x: np.percentile(x, 75),
)
-- ZettaPark — 单次 SQL,LAG + PERCENTILE
WITH with_gaps AS (
SELECT
customerid,
order_date,
DATEDIFF(DAY,
LAG(order_date) OVER (PARTITION BY customerid ORDER BY order_date),
order_date
) AS gap_days
FROM order_level
),
gap_percentiles AS (
SELECT
customerid,
PERCENTILE(gap_days, 0.25) AS gap_p25,
PERCENTILE(gap_days, 0.50) AS gap_p50,
PERCENTILE(gap_days, 0.75) AS gap_p75,
MAX(gap_days) AS max_gap
FROM with_gaps
WHERE gap_days IS NOT NULL
GROUP BY customerid
)
⚠️ 注意 :
COUNT(DISTINCT ...)COUNT(DISTINCT ...)
和
PERCENTILE(...)PERCENTILE(...)
不能在同一个
GROUP BYGROUP BY
里共存,需要拆成两个 CTE(
customer_statscustomer_stats
和
gap_percentilesgap_percentiles
)。
第三步:替换 self-join(商品关联分析)
商品关联分析的核心是找同一订单里的商品对,需要对
invoice_productsinvoice_products
做自连接。pandas 的
merge(df, df, on="Invoice")merge(df, df, on="Invoice")
会把中间结果全部加载到本地内存——1M 行数据产生约 760 万行中间结果,峰值内存
2203MB 。ZettaPark 在服务端处理,本地内存为 0。
# pandas — self-join 中间结果全在本地内存
invoice_products = df[["Invoice", "StockCode"]].drop_duplicates()
pairs = invoice_products.merge(invoice_products, on="Invoice", suffixes=("_a", "_b"))
pairs = pairs[pairs["StockCode_a"] < pairs["StockCode_b"]]
pair_counts = pairs.groupby(["StockCode_a", "StockCode_b"]).size().reset_index(name="co_count")
# 峰值内存 2203MB,数据量翻倍直接 OOM
-- ZettaPark — self-join 在服务端执行,本地内存 0
WITH invoice_products AS (
SELECT DISTINCT invoice, stockcode FROM retail_clean
),
pairs AS (
SELECT
a.stockcode AS stockcode_a,
b.stockcode AS stockcode_b,
COUNT(*) AS co_count
FROM invoice_products a
JOIN invoice_products b
ON a.invoice = b.invoice
AND a.stockcode < b.stockcode
GROUP BY a.stockcode, b.stockcode
HAVING COUNT(*) >= 10
)
第四步:替换日期操作
pandas ZettaPark SQL dt.to_period("W").dt.start_timedt.to_period("W").dt.start_time
DATE_TRUNC('week', CAST(col AS DATE))DATE_TRUNC('week', CAST(col AS DATE))
(date_a - date_b).dt.days(date_a - date_b).dt.days
DATEDIFF(DAY, date_b, date_a)DATEDIFF(DAY, date_b, date_a)
(week_a - week_b).dt.days // 7(week_a - week_b).dt.days // 7
DATEDIFF(WEEK, week_b, week_a)DATEDIFF(WEEK, week_b, week_a)
⚠️ 注意 :
DATEDIFF(WEEK, ...)DATEDIFF(WEEK, ...)
中
WEEKWEEK
不加引号;
DATE_TRUNC('week', ...)DATE_TRUNC('week', ...)
中
'week''week'
加引号。
完全兼容的部分(无需修改)
分析逻辑本身不需要改动:
RFM 指标定义(Recency = 距今天数,Frequency = 去重订单数,Monetary = 总消费金额)
客户分层规则(Champions / Recent Customers / At Risk / Lost)
Cohort 分析逻辑(首购周、留存率 = 活跃客户数 / Cohort 初始规模)
所有聚合逻辑(SUM、COUNT DISTINCT、AVG、MIN、MAX)
条件逻辑(np.wherenp.where
→ F.when().otherwise()F.when().otherwise()
,语义完全一致)
注意事项
1. inferSchema 不支持
现象 :
session.read.option("inferSchema", "true").csv(...)session.read.option("inferSchema", "true").csv(...)
报错。
原因 :ZettaPark 不支持
inferSchemainferSchema
,必须显式指定列。
解决 :改用
session.sql("SELECT ... FROM VOLUME ... USING CSV OPTIONS ...")session.sql("SELECT ... FROM VOLUME ... USING CSV OPTIONS ...")
,在 SQL 里直接写列名。
2. 含空格的列名
现象 :
Customer IDCustomer ID
在 ZettaPark DataFrame 里变成
customer idcustomer id
(小写+空格),后续
F.col("CustomerID")F.col("CustomerID")
找不到列。
原因 :ZettaPark 将所有列名转为小写,含空格的列名无法直接用标识符引用。
解决 :在初始 SQL 的
SELECTSELECT
里用反引号引用并重命名:
`Customer ID` AS CustomerID
3. COUNT(DISTINCT) 与 PERCENTILE 冲突
现象 :在同一
GROUP BYGROUP BY
里同时写
COUNT(DISTINCT invoice)COUNT(DISTINCT invoice)
和
PERCENTILE(gap_days, 0.5)PERCENTILE(gap_days, 0.5)
报错。
解决 :拆成两个 CTE,分别计算。
4. Excel 原始文件的编码问题
现象 :UCI 数据集原始格式为 Excel(
.xlsx.xlsx
),
DescriptionDescription
字段含
££
等特殊字符,直接转 CSV 后出现无效 UTF-8。
解决 :转换时用
errors='replace'errors='replace'
替换无法编码的字符:
df.to_csv("online_retail_II.csv", index=False, encoding="utf-8", errors="replace")
端到端验证
03_lakehouse/e2e.py03_lakehouse/e2e.py
在完整数据集(106 万行)上执行 5 项自动化检查:
检查项 预期值 结果 清洗后行数 805,549 ✓ 客户数 5,878 ✓ 分层总和 = 客户数 5,878 ✓ Top 国家 United Kingdom ✓ 月份数 25 ✓
实际运行结果:5/5 通过 ,总耗时 12.8s。
迁移结论
API 层面的收益
groupby().apply(shift())groupby().apply(shift())
→ LAG() OVER (PARTITION BY ...)LAG() OVER (PARTITION BY ...)
:代码更简洁,无需多次 merge
rolling(4).sum()rolling(4).sum()
→ ROWS BETWEEN 3 PRECEDING AND CURRENT ROWROWS BETWEEN 3 PRECEDING AND CURRENT ROW
:标准 SQL,无需 set_index
np.percentile()np.percentile()
→ PERCENTILE(col, 0.25)PERCENTILE(col, 0.25)
:直接在 SQL 里完成,无需 apply
多次 groupby + merge → 单次 SQL WITH 链:内存峰值从 200MB+ 降到 0
部署模式的收益
从本地 pandas 脚本迁移到 ClickZetta Lakehouse SaaS,带来的不只是性能提升:
pandas(本地) ZettaPark(SaaS) 数据规模上限 受本地内存限制(~10M 行 OOM) 无上限,PB 级数据同等代码 运行环境 需要本地 Python 环境 云端执行,无需本地资源 协作 脚本文件共享 结果写入共享表,团队直接查询 调度 手动运行或 cron 平台内置任务调度 版本管理 手动维护 平台自动管理
迁移后,数据科学家可以把精力从"让 pandas 跑完"转移到"让分析产生价值"。
参考