RDD → ZettaPark 迁移实战:Web 日志分析
如果你的 Spark 代码还停留在 RDD 时代,迁移到 ZettaPark 的收益比你想象的大得多。不只是换个 API——RDD 的命令式写法(告诉 Spark 怎么做)会被替换成声明式写法(告诉 Lakehouse 要什么),代码量减少,执行效率提升,可读性也更好。
本文用一个真实项目验证这一点:将基于 PySpark RDD 的 Web 日志分析项目迁移到 ClickZetta Lakehouse,覆盖 9 种迁移模式,经过 18 项自动化验证,全部通过。
完整代码见 GitHub:spark2lakehouse-weblog
原始项目
spark2lakehouse-weblog fork 自 XD-DENG/Spark-practice(272 stars),是一个经典的 PySpark RDD 入门教程,使用 RStudio CRAN 镜像的真实下载日志作为数据集,演示
map
map
、
filter
filter
、
reduceByKey
reduceByKey
、
sortBy
sortBy
、
join
join
、
aggregateByKey
aggregateByKey
等核心 RDD 操作。
数据集:2015 年 12 月 12 日的 R 包下载记录,421,969 行,8,659 个唯一包名,237 个国家。
| 字段 | 说明 |
|---|
| date / time | 下载时间 |
| size | 文件大小(字节) |
| r_version / r_arch / r_os | R 运行环境 |
| package / version | R 包名和版本 |
| country / ip_id | 来源国家和匿名 IP |

为什么 RDD 代码值得迁移
RDD 是 Spark 最早的编程模型,在 DataFrame API 出现之前(Spark 1.x 时代)是主流写法。现在仍有大量遗留 RDD 代码在生产环境运行。
RDD 的问题不是功能,而是表达方式。 用 RDD 写"统计每个包的下载次数"需要:
content.map(lambda x: (x[6], 1)).reduceByKey(lambda a, b: a + b)
这段代码告诉 Spark 怎么做(map 成 key-value 对,然后 reduce),而不是要什么(按包名分组计数)。当逻辑变复杂时,RDD 代码会越来越难读,也越来越难优化。
ZettaPark 的声明式写法:
df.group_by("package").agg(F.count("*").alias("downloads"))
直接表达意图,执行计划由 Lakehouse 优化器决定。
迁移结论先行
你不会丢失任何分析能力,代码反而会更短。RDD 需要告诉引擎"怎么做",ZettaPark 只需说"要什么"——业务逻辑不变,但 6 步二次排序可以变成一行窗口函数,手写 cogroup 迭代器可以变成 HAVING 子句。
| RDD 操作 | ZettaPark 等价 | 代码量变化 |
|---|
sc.textFile
sc.textFile + map(split)
map(split) | session.read.csv()
session.read.csv() | 减少 |
map(lambda x: (x[6], 1)).reduceByKey(+)
map(lambda x: (x[6], 1)).reduceByKey(+) | group_by().agg(F.count("*"))
group_by().agg(F.count("*")) | 相当 |
aggregateByKey((0,0), seq_op, comb_op)
aggregateByKey((0,0), seq_op, comb_op) | group_by().agg(F.avg())
group_by().agg(F.avg()) | 大幅减少 |
rdd1.join(rdd2)
rdd1.join(rdd2) | group_by().agg(count, avg)
group_by().agg(count, avg) | 减少(合并为一次扫描) |
.filter(lambda x: x[3] != "NA")
.filter(lambda x: x[3] != "NA") | .filter(F.col("r_version") != "NA")
.filter(F.col("r_version") != "NA") | 相当 |
.distinct().count()
.distinct().count() | F.count_distinct("package")
F.count_distinct("package") | 减少 |
.sortBy(lambda x: x[1], ascending=False)
.sortBy(lambda x: x[1], ascending=False) | .sort(F.col("downloads").desc())
.sort(F.col("downloads").desc()) | 相当 |
mapPartitions
mapPartitions + accumulator
accumulator | COUNT(CASE WHEN ...)
COUNT(CASE WHEN ...) | 减少(无需分区感知) |
cogroup
cogroup + 手写迭代器过滤 | GROUP BY ... HAVING
GROUP BY ... HAVING | 大幅减少 |
| 6 步二次排序(groupByKey + flatMap) | RANK() OVER (PARTITION BY ...)
RANK() OVER (PARTITION BY ...) | 大幅减少 |
sc.broadcast(dict)
sc.broadcast(dict) + .value.get()
.value.get() | LEFT JOIN
LEFT JOIN 小表 | 减少(优化器自动 broadcast) |
9 个迁移模式
模式 1:数据加载
RDD 需要手动读取文件、跳过 header、split 每一行:
RDD 版本:
raw_content = sc.textFile("2015-12-12.csv")
header = raw_content.first()
content = (
raw_content
.filter(lambda x: x != header)
.map(lambda x: [field.strip('"') for field in x.split(",")])
)
ZettaPark 内置 CSV 解析,header 自动处理:
ZettaPark 版本:
df = session.read.option("header", "true").csv("vol://public.weblog_vol/2015-12-12.csv")
模式 2:计数聚合(reduceByKey)
这是 RDD 代码里最常见的模式——先 map 成
(key, 1)
(key, 1)
对,再 reduceByKey 求和:
RDD 版本:
package_count = (
content
.map(lambda x: (x[6], 1))
.reduceByKey(lambda a, b: a + b)
.sortBy(lambda x: x[1], ascending=False)
)
top10 = package_count.take(10)
ZettaPark 用
group_by().agg()
group_by().agg()
直接表达:
ZettaPark 版本:
top10 = (
df
.group_by("package")
.agg(F.count("*").alias("downloads"))
.sort(F.col("downloads").desc())
.limit(10)
)
top10.show()
模式 3:平均值聚合(aggregateByKey → F.avg)
这是 RDD 迁移中代码量减少最多的地方。RDD 没有内置的 avg,需要手写累加器:
RDD 版本:
def seq_op(acc, val):
return (acc[0] + int(val), acc[1] + 1)
def comb_op(a, b):
return (a[0] + b[0], a[1] + b[1])
pkg_avg_size = (
content
.filter(lambda x: x[2].isdigit())
.map(lambda x: (x[6], x[2]))
.aggregateByKey((0, 0), seq_op, comb_op)
.map(lambda x: (x[0], x[1][0] // x[1][1]))
.sortBy(lambda x: x[1], ascending=False)
)
ZettaPark 用内置
F.avg()
F.avg()
,一行完成:
ZettaPark 版本:
top10_by_size = (
df
.filter(F.col("size").is_not_null())
.group_by("package")
.agg(F.avg("size").alias("avg_size_bytes"))
.sort(F.col("avg_size_bytes").desc())
.limit(10)
)
模式 4:两个 RDD join → 单次 GROUP BY
原始代码为了演示 RDD join,把计数和求和分成两个 RDD 再 join:
RDD 版本:
pkg_count_rdd = content.map(lambda x: (x[6], 1)).reduceByKey(lambda a, b: a + b)
pkg_size_rdd = (
content
.filter(lambda x: x[2].isdigit())
.map(lambda x: (x[6], int(x[2])))
.reduceByKey(lambda a, b: a + b)
)
pkg_joined = pkg_count_rdd.join(pkg_size_rdd)
pkg_summary = pkg_joined.map(lambda x: (x[0], x[1][0], x[1][1] // x[1][0]))
ZettaPark 一次 GROUP BY 完成,扫描数据一次:
ZettaPark 版本:
pkg_summary = (
df
.filter(F.col("size").is_not_null())
.group_by("package")
.agg(
F.count("*").alias("downloads"),
F.avg("size").alias("avg_size_bytes"),
)
.sort(F.col("downloads").desc())
.limit(5)
)
模式 5:distinct count
RDD 版本:
unique_packages = content.map(lambda x: x[6]).distinct().count()
ZettaPark 版本:
unique_count = df.select(F.count_distinct("package").alias("unique_packages"))
unique_count.show()
模式 6:mapPartitions + accumulator → COUNT(CASE WHEN)
mapPartitions
mapPartitions
是 RDD 的批量处理原语:每次接收整个分区的迭代器,比逐行
map
map
减少函数调用开销,适合批量校验。
accumulator
accumulator
是 driver 端的共享计数器,executor 只能
.add()
.add()
,driver 读
.value
.value
。
注意:accumulator 必须在 action 触发后才保证准确。
RDD 版本:
bad_row_acc = sc.accumulator(0)
def validate_partition(rows):
for row in rows:
size_ok = row[2].isdigit() and int(row[2]) > 0
country_ok = len(row) > 8 and row[8].strip() != ""
if size_ok and country_ok:
yield row
else:
bad_row_acc.add(1)
valid_content = content.mapPartitions(validate_partition)
valid_count = valid_content.count() # action 触发后 accumulator 才准确
bad_count = bad_row_acc.value
ZettaPark 用
COUNT(CASE WHEN ...)
COUNT(CASE WHEN ...)
单次扫描,无需分区感知,无需 accumulator:
ZettaPark 版本:
session.sql("""
SELECT
COUNT(CASE WHEN size > 0
AND country IS NOT NULL
AND country != ''
THEN 1 END) AS valid_rows,
COUNT(*) - COUNT(CASE WHEN size > 0
AND country IS NOT NULL
AND country != ''
THEN 1 END) AS invalid_rows
FROM weblog.cran_downloads
""").show()
| 维度 | RDD | ZettaPark |
|---|
| 分区感知 | 必须(mapPartitions 接收分区迭代器) | 不需要(优化器决定执行计划) |
| 坏行计数 | accumulator(driver/executor 分离) | COUNT(*) - COUNT(CASE WHEN ...) |
| 代码量 | 约 15 行(含 accumulator 声明) | 约 8 行 SQL |
模式 7:cogroup → HAVING 多条件聚合
cogroup
cogroup
是 RDD join 的泛化:即使某侧没有数据也不丢弃 key(类似 full outer join)。两侧迭代器都是 lazy 的,
必须 list()
list()
才能多次遍历。这里的场景是找出"下载量 ≥ 100 且平均大小 ≥ 1MB"的包。
RDD 版本:
pkg_count_for_cg = content.map(lambda x: (x[6], 1)).reduceByKey(lambda a, b: a + b)
pkg_sizes_for_cg = (
content
.filter(lambda x: x[2].isdigit())
.map(lambda x: (x[6], int(x[2])))
.groupByKey()
)
def is_high_traffic_high_volume(item):
pkg, (count_iter, size_iter) = item
counts = list(count_iter) # lazy 迭代器必须 list() 才能多次遍历
sizes = list(size_iter)
if not counts or not sizes:
return False
total_count = counts[0]
all_sizes = [s for sublist in sizes for s in sublist]
if total_count < 100 or not all_sizes:
return False
avg_size = sum(all_sizes) / len(all_sizes)
return avg_size >= 1_048_576
high_vol_pkgs = (
pkg_count_for_cg
.cogroup(pkg_sizes_for_cg)
.filter(is_high_traffic_high_volume)
.map(lambda item: item[0])
.collect()
)
ZettaPark 用
HAVING
HAVING
同时约束 COUNT 和 AVG,优化器合并为一次聚合。cogroup 的"两侧数据结构不同"在 SQL 里自然消失:
ZettaPark 版本:
session.sql("""
SELECT package, COUNT(*) AS downloads, AVG(size) AS avg_size_bytes
FROM weblog.cran_downloads
WHERE size IS NOT NULL
GROUP BY package
HAVING COUNT(*) >= 100
AND AVG(size) >= 1048576
ORDER BY downloads DESC
""").show()
结果:102 个包满足条件(下载量 ≥ 100 且平均大小 ≥ 1MB)。
模式 8:二次排序 → 窗口函数
这是本文最有说服力的迁移案例。场景:找出每个国家下载量 Top 3 的包。
RDD 没有"分组内排序"原语,需要 6 步手工实现:
RDD 版本(6 步):
def top3_with_rank(item):
country, pkg_counts = item
sorted_pkgs = sorted(pkg_counts, key=lambda x: x[1], reverse=True)[:3]
return [(country, pkg, rank, count)
for rank, (pkg, count) in enumerate(sorted_pkgs, start=1)]
top3_per_country = (
content
.map(lambda x: ((x[8], x[6]), 1)) # Step 1: key=(country,pkg)
.reduceByKey(lambda a, b: a + b) # Step 1: 统计下载量
.map(lambda x: (x[0][0], (x[0][1], x[1]))) # Step 2: 重排 key → country
.groupByKey() # Step 2: 聚合同 country 的所有 (pkg,count)
.flatMap(top3_with_rank) # Step 3: 组内 Top 3 + rank
.sortBy(lambda x: (x[0], x[2])) # Step 4: 按 (country, rank) 排序
)
groupByKey
groupByKey
会把同 key 的所有 value 拉到同一 executor 内存。对于超大数据集,可改用
repartitionAndSortWithinPartitions
repartitionAndSortWithinPartitions
——构造复合 key
(country, -count)
(country, -count)
,按 country 分区,分区内按
-count
-count
升序(即 count 降序),之后仍需手写 rank 逻辑。这正是 RDD 二次排序繁琐的根源:没有
PARTITION BY
PARTITION BY
语义,一切都要手工实现。
ZettaPark 用 2 层 CTE +
RANK() OVER (PARTITION BY ...)
RANK() OVER (PARTITION BY ...)
直接表达:
ZettaPark 版本:
session.sql("""
WITH pkg_country_counts AS (
SELECT country, package, COUNT(*) AS downloads
FROM weblog.cran_downloads
GROUP BY country, package
),
ranked AS (
SELECT country, package, downloads,
RANK() OVER (PARTITION BY country ORDER BY downloads DESC) AS rnk
FROM pkg_country_counts
)
SELECT country, package, downloads, rnk
FROM ranked
WHERE rnk <= 3
ORDER BY country, rnk
""").show(20)
对比:
| 维度 | RDD | ZettaPark |
|---|
| 步骤数 | 6 步:map / reduceByKey / map / groupByKey / flatMap / sortBy | 2 层 CTE + WHERE rnk <= 3 |
| 内存压力 | groupByKey 把同 country 所有数据拉到单 executor | 优化器决定,无需手动控制 |
| 大数据集方案 | repartitionAndSortWithinPartitions(需手写 rank 逻辑) | 同一 SQL,优化器自动处理 |
| 代码量 | 约 15 行(含辅助函数) | 约 10 行 SQL |
结果:1,255 行(含 RANK 并列),US Top 1 为 Rcpp(2,042 次)。
模式 9:broadcast 变量 → JOIN 小表
sc.broadcast()
sc.broadcast()
把小表序列化一次发送到每个 executor 节点缓存,避免每个 task 都序列化传输。适合几 MB 以内的小表。用完后
.unpersist()
.unpersist()
主动释放 executor 内存。
RDD 版本:
import csv as csv_module
region_map_data = {}
with open("sample_data/country_region.csv") as f:
reader = csv_module.DictReader(f)
for row in reader:
region_map_data[row["country"]] = row["region"]
region_bc = sc.broadcast(region_map_data)
region_count = (
content
.map(lambda x: (region_bc.value.get(x[8], "Other"), 1))
.reduceByKey(lambda a, b: a + b)
.sortBy(lambda x: x[1], ascending=False)
)
for region, cnt in region_count.collect():
print(f" {region}: {cnt:,}")
region_bc.unpersist()
ZettaPark 用
LEFT JOIN
LEFT JOIN
小表,优化器自动识别小表并执行 broadcast join(与 RDD 手动 broadcast 等价)。
COALESCE
COALESCE
处理未匹配的国家代码,等价于
.get(key, "Other")
.get(key, "Other")
:
ZettaPark 版本:
session.sql("""
SELECT
COALESCE(cr.region, 'Other') AS region,
COUNT(*) AS downloads
FROM weblog.cran_downloads d
LEFT JOIN weblog.country_region cr ON d.country = cr.country
GROUP BY COALESCE(cr.region, 'Other')
ORDER BY downloads DESC
""").show()
结果:Americas 164,615 次(Top 1),Asia 153,772 次。
不适合迁移的 RDD 操作
原始项目中有几个 RDD 操作是为了演示 API 特性,不对应实际业务查询,在 ZettaPark 版本中不需要等价实现:
| RDD 操作 | 原因 | ZettaPark 处理方式 |
|---|
raw_content.union(raw_content)
raw_content.union(raw_content) | 演示 union 语义,无业务意义 | 不迁移 |
raw_content.intersection(raw_content)
raw_content.intersection(raw_content) | 演示 intersection,无业务意义 | 不迁移 |
cache()
cache() / persist()
persist() | ZettaPark 执行计划由优化器管理 | 不需要手动缓存 |
cartesian()
cartesian() | 笛卡尔积演示,无对应业务需求 | 不迁移 |
判断标准:如果一个 RDD 操作对应一个真实的业务问题("每个包下载了多少次"),就迁移。如果只是演示 API 特性("union 一个 RDD 和自身"),就不迁移。
端到端验证
迁移完成后,用 18 项自动化检查验证数据正确性:
python e2e.py
验证覆盖:
- 总行数:421,969 行
- Top 包名:Rcpp(4,783 次下载)
- Top 包下载量:4,783
- Top 国家:US
- Top R 版本:3.2.3
- 唯一包数:8,659
- 平均大小 > 0:业务合理性检查
- OS 种类数:20 个非 NA 操作系统
- Join 结果 top 包:与 Q1 一致
- Q8 有效行数:421,969(全量数据无坏行)
- Q8 无效行数:0
- Q9 高流量高体积包数:102 个(下载量 ≥ 100 且平均大小 ≥ 1MB)
- Q10 US Top 1 包名:Rcpp
- Q10 US Top 1 下载量:2,042
- Q10 结果总行数:1,255(含 RANK 并列)
- Q11 Top 地区:Americas
- Q11 Americas 下载量:164,615
- Q11 Asia 下载量:153,772
实际运行结果:18/18 全部通过。
完整兼容性对照
| 操作 | RDD | ZettaPark | 状态 |
|---|
| 数据加载 | sc.textFile
sc.textFile + map(split)
map(split) | session.read.csv()
session.read.csv() | ✅ 更简洁 |
| 计数聚合 | map(k,1).reduceByKey(+)
map(k,1).reduceByKey(+) | group_by().agg(count)
group_by().agg(count) | ✅ 完全等价 |
| 平均值 | aggregateByKey
aggregateByKey + 手写累加 | group_by().agg(F.avg())
group_by().agg(F.avg()) | ✅ 大幅简化 |
| 过滤 | .filter(lambda x: x[3] != "NA")
.filter(lambda x: x[3] != "NA") | .filter(F.col("r_version") != "NA")
.filter(F.col("r_version") != "NA") | ✅ 完全等价 |
| 排序 | .sortBy(lambda x: x[1], False)
.sortBy(lambda x: x[1], False) | ORDER BY ... DESC
ORDER BY ... DESC | ✅ 完全等价 |
| distinct count | .distinct().count()
.distinct().count() | F.count_distinct()
F.count_distinct() | ✅ 更简洁 |
| join | rdd1.join(rdd2)
rdd1.join(rdd2) | group_by().agg(count, avg)
group_by().agg(count, avg) | ✅ 合并为单次扫描 |
| 取前 N 条 | .take(10)
.take(10) | .limit(10).collect()
.limit(10).collect() | ✅ 完全等价 |
| 打印结果 | print(rdd.take(10))
print(rdd.take(10)) | df.show()
df.show() | ✅ 更友好 |
| 批量校验 | mapPartitions
mapPartitions + accumulator
accumulator | COUNT(CASE WHEN ...)
COUNT(CASE WHEN ...) | ✅ 单次扫描,无需分区感知 |
| 多条件聚合过滤 | cogroup
cogroup + 手写迭代器 | GROUP BY ... HAVING
GROUP BY ... HAVING | ✅ 大幅简化 |
| 分组内排序 | 6 步(groupByKey + flatMap + sortBy) | RANK() OVER (PARTITION BY ...)
RANK() OVER (PARTITION BY ...) | ✅ 代码量减少 60% |
| 小表广播 | sc.broadcast(dict)
sc.broadcast(dict) + .value.get()
.value.get() | LEFT JOIN
LEFT JOIN 小表 | ✅ 优化器自动 broadcast |
迁移结论
RDD 迁移到 ZettaPark 的核心变化是从命令式到声明式:不再告诉引擎怎么做,而是告诉它要什么。
代码量减少最多的地方:
完全等价的操作:
filter
filter
、
sortBy
sortBy
、
take
take
、
distinct
distinct
——这些操作的语义完全一致,只是语法更简洁。
进阶模式的迁移收益更大:
mapPartitions
mapPartitions
、
cogroup
cogroup
、二次排序、broadcast 变量这些 RDD 进阶操作,在 SQL 里都有更直接的表达方式,且不需要手动管理分区、迭代器、广播生命周期。
不需要迁移的操作:演示性的集合运算(union/intersection 自身)、手动缓存(cache/persist)——这些在 ZettaPark 中要么不需要,要么由优化器自动处理。
本项目 18/18 验证通过,证明 RDD 的业务逻辑可以完整迁移到 ZettaPark,且代码更简洁、执行效率更高。
参考