RDD → ZettaPark 迁移实战:Web 日志分析

如果你的 Spark 代码还停留在 RDD 时代,迁移到 ZettaPark 的收益比你想象的大得多。不只是换个 API——RDD 的命令式写法(告诉 Spark 怎么做)会被替换成声明式写法(告诉 Lakehouse 要什么),代码量减少,执行效率提升,可读性也更好。

本文用一个真实项目验证这一点:将基于 PySpark RDD 的 Web 日志分析项目迁移到 ClickZetta Lakehouse,覆盖 9 种迁移模式,经过 18 项自动化验证,全部通过。

完整代码见 GitHub:spark2lakehouse-weblog


原始项目

spark2lakehouse-weblog fork 自 XD-DENG/Spark-practice(272 stars),是一个经典的 PySpark RDD 入门教程,使用 RStudio CRAN 镜像的真实下载日志作为数据集,演示

map
map
filter
filter
reduceByKey
reduceByKey
sortBy
sortBy
join
join
aggregateByKey
aggregateByKey
等核心 RDD 操作。

数据集:2015 年 12 月 12 日的 R 包下载记录,421,969 行,8,659 个唯一包名,237 个国家。

字段说明
date / time下载时间
size文件大小(字节)
r_version / r_arch / r_osR 运行环境
package / versionR 包名和版本
country / ip_id来源国家和匿名 IP


为什么 RDD 代码值得迁移

RDD 是 Spark 最早的编程模型,在 DataFrame API 出现之前(Spark 1.x 时代)是主流写法。现在仍有大量遗留 RDD 代码在生产环境运行。

RDD 的问题不是功能,而是表达方式。 用 RDD 写"统计每个包的下载次数"需要:

content.map(lambda x: (x[6], 1)).reduceByKey(lambda a, b: a + b)

这段代码告诉 Spark 怎么做(map 成 key-value 对,然后 reduce),而不是要什么(按包名分组计数)。当逻辑变复杂时,RDD 代码会越来越难读,也越来越难优化。

ZettaPark 的声明式写法:

df.group_by("package").agg(F.count("*").alias("downloads"))

直接表达意图,执行计划由 Lakehouse 优化器决定。


迁移结论先行

你不会丢失任何分析能力,代码反而会更短。RDD 需要告诉引擎"怎么做",ZettaPark 只需说"要什么"——业务逻辑不变,但 6 步二次排序可以变成一行窗口函数,手写 cogroup 迭代器可以变成 HAVING 子句。

RDD 操作ZettaPark 等价代码量变化
sc.textFile
sc.textFile
+
map(split)
map(split)
session.read.csv()
session.read.csv()
减少
map(lambda x: (x[6], 1)).reduceByKey(+)
map(lambda x: (x[6], 1)).reduceByKey(+)
group_by().agg(F.count("*"))
group_by().agg(F.count("*"))
相当
aggregateByKey((0,0), seq_op, comb_op)
aggregateByKey((0,0), seq_op, comb_op)
group_by().agg(F.avg())
group_by().agg(F.avg())
大幅减少
rdd1.join(rdd2)
rdd1.join(rdd2)
group_by().agg(count, avg)
group_by().agg(count, avg)
减少(合并为一次扫描)
.filter(lambda x: x[3] != "NA")
.filter(lambda x: x[3] != "NA")
.filter(F.col("r_version") != "NA")
.filter(F.col("r_version") != "NA")
相当
.distinct().count()
.distinct().count()
F.count_distinct("package")
F.count_distinct("package")
减少
.sortBy(lambda x: x[1], ascending=False)
.sortBy(lambda x: x[1], ascending=False)
.sort(F.col("downloads").desc())
.sort(F.col("downloads").desc())
相当
mapPartitions
mapPartitions
+
accumulator
accumulator
COUNT(CASE WHEN ...)
COUNT(CASE WHEN ...)
减少(无需分区感知)
cogroup
cogroup
+ 手写迭代器过滤
GROUP BY ... HAVING
GROUP BY ... HAVING
大幅减少
6 步二次排序(groupByKey + flatMap)
RANK() OVER (PARTITION BY ...)
RANK() OVER (PARTITION BY ...)
大幅减少
sc.broadcast(dict)
sc.broadcast(dict)
+
.value.get()
.value.get()
LEFT JOIN
LEFT JOIN
小表
减少(优化器自动 broadcast)

9 个迁移模式

模式 1:数据加载

RDD 需要手动读取文件、跳过 header、split 每一行:

RDD 版本:

raw_content = sc.textFile("2015-12-12.csv") header = raw_content.first() content = ( raw_content .filter(lambda x: x != header) .map(lambda x: [field.strip('"') for field in x.split(",")]) )

ZettaPark 内置 CSV 解析,header 自动处理:

ZettaPark 版本:

df = session.read.option("header", "true").csv("vol://public.weblog_vol/2015-12-12.csv")

模式 2:计数聚合(reduceByKey)

这是 RDD 代码里最常见的模式——先 map 成

(key, 1)
(key, 1)
对,再 reduceByKey 求和:

RDD 版本:

package_count = ( content .map(lambda x: (x[6], 1)) .reduceByKey(lambda a, b: a + b) .sortBy(lambda x: x[1], ascending=False) ) top10 = package_count.take(10)

ZettaPark 用

group_by().agg()
group_by().agg()
直接表达:

ZettaPark 版本:

top10 = ( df .group_by("package") .agg(F.count("*").alias("downloads")) .sort(F.col("downloads").desc()) .limit(10) ) top10.show()

模式 3:平均值聚合(aggregateByKey → F.avg)

这是 RDD 迁移中代码量减少最多的地方。RDD 没有内置的 avg,需要手写累加器:

RDD 版本:

def seq_op(acc, val): return (acc[0] + int(val), acc[1] + 1) def comb_op(a, b): return (a[0] + b[0], a[1] + b[1]) pkg_avg_size = ( content .filter(lambda x: x[2].isdigit()) .map(lambda x: (x[6], x[2])) .aggregateByKey((0, 0), seq_op, comb_op) .map(lambda x: (x[0], x[1][0] // x[1][1])) .sortBy(lambda x: x[1], ascending=False) )

ZettaPark 用内置

F.avg()
F.avg()
,一行完成:

ZettaPark 版本:

top10_by_size = ( df .filter(F.col("size").is_not_null()) .group_by("package") .agg(F.avg("size").alias("avg_size_bytes")) .sort(F.col("avg_size_bytes").desc()) .limit(10) )

模式 4:两个 RDD join → 单次 GROUP BY

原始代码为了演示 RDD join,把计数和求和分成两个 RDD 再 join:

RDD 版本:

pkg_count_rdd = content.map(lambda x: (x[6], 1)).reduceByKey(lambda a, b: a + b) pkg_size_rdd = ( content .filter(lambda x: x[2].isdigit()) .map(lambda x: (x[6], int(x[2]))) .reduceByKey(lambda a, b: a + b) ) pkg_joined = pkg_count_rdd.join(pkg_size_rdd) pkg_summary = pkg_joined.map(lambda x: (x[0], x[1][0], x[1][1] // x[1][0]))

ZettaPark 一次 GROUP BY 完成,扫描数据一次:

ZettaPark 版本:

pkg_summary = ( df .filter(F.col("size").is_not_null()) .group_by("package") .agg( F.count("*").alias("downloads"), F.avg("size").alias("avg_size_bytes"), ) .sort(F.col("downloads").desc()) .limit(5) )

模式 5:distinct count

RDD 版本:

unique_packages = content.map(lambda x: x[6]).distinct().count()

ZettaPark 版本:

unique_count = df.select(F.count_distinct("package").alias("unique_packages")) unique_count.show()

模式 6:mapPartitions + accumulator → COUNT(CASE WHEN)

mapPartitions
mapPartitions
是 RDD 的批量处理原语:每次接收整个分区的迭代器,比逐行
map
map
减少函数调用开销,适合批量校验。
accumulator
accumulator
是 driver 端的共享计数器,executor 只能
.add()
.add()
,driver 读
.value
.value
注意:accumulator 必须在 action 触发后才保证准确。

RDD 版本:

bad_row_acc = sc.accumulator(0) def validate_partition(rows): for row in rows: size_ok = row[2].isdigit() and int(row[2]) > 0 country_ok = len(row) > 8 and row[8].strip() != "" if size_ok and country_ok: yield row else: bad_row_acc.add(1) valid_content = content.mapPartitions(validate_partition) valid_count = valid_content.count() # action 触发后 accumulator 才准确 bad_count = bad_row_acc.value

ZettaPark 用

COUNT(CASE WHEN ...)
COUNT(CASE WHEN ...)
单次扫描,无需分区感知,无需 accumulator:

ZettaPark 版本:

session.sql(""" SELECT COUNT(CASE WHEN size > 0 AND country IS NOT NULL AND country != '' THEN 1 END) AS valid_rows, COUNT(*) - COUNT(CASE WHEN size > 0 AND country IS NOT NULL AND country != '' THEN 1 END) AS invalid_rows FROM weblog.cran_downloads """).show()

维度RDDZettaPark
分区感知必须(mapPartitions 接收分区迭代器)不需要(优化器决定执行计划)
坏行计数accumulator(driver/executor 分离)COUNT(*) - COUNT(CASE WHEN ...)
代码量约 15 行(含 accumulator 声明)约 8 行 SQL

模式 7:cogroup → HAVING 多条件聚合

cogroup
cogroup
是 RDD join 的泛化:即使某侧没有数据也不丢弃 key(类似 full outer join)。两侧迭代器都是 lazy 的,必须
list()
list()
才能多次遍历
。这里的场景是找出"下载量 ≥ 100 且平均大小 ≥ 1MB"的包。

RDD 版本:

pkg_count_for_cg = content.map(lambda x: (x[6], 1)).reduceByKey(lambda a, b: a + b) pkg_sizes_for_cg = ( content .filter(lambda x: x[2].isdigit()) .map(lambda x: (x[6], int(x[2]))) .groupByKey() ) def is_high_traffic_high_volume(item): pkg, (count_iter, size_iter) = item counts = list(count_iter) # lazy 迭代器必须 list() 才能多次遍历 sizes = list(size_iter) if not counts or not sizes: return False total_count = counts[0] all_sizes = [s for sublist in sizes for s in sublist] if total_count < 100 or not all_sizes: return False avg_size = sum(all_sizes) / len(all_sizes) return avg_size >= 1_048_576 high_vol_pkgs = ( pkg_count_for_cg .cogroup(pkg_sizes_for_cg) .filter(is_high_traffic_high_volume) .map(lambda item: item[0]) .collect() )

ZettaPark 用

HAVING
HAVING
同时约束 COUNT 和 AVG,优化器合并为一次聚合。cogroup 的"两侧数据结构不同"在 SQL 里自然消失:

ZettaPark 版本:

session.sql(""" SELECT package, COUNT(*) AS downloads, AVG(size) AS avg_size_bytes FROM weblog.cran_downloads WHERE size IS NOT NULL GROUP BY package HAVING COUNT(*) >= 100 AND AVG(size) >= 1048576 ORDER BY downloads DESC """).show()

结果:102 个包满足条件(下载量 ≥ 100 且平均大小 ≥ 1MB)。

模式 8:二次排序 → 窗口函数

这是本文最有说服力的迁移案例。场景:找出每个国家下载量 Top 3 的包。

RDD 没有"分组内排序"原语,需要 6 步手工实现:

RDD 版本(6 步):

def top3_with_rank(item): country, pkg_counts = item sorted_pkgs = sorted(pkg_counts, key=lambda x: x[1], reverse=True)[:3] return [(country, pkg, rank, count) for rank, (pkg, count) in enumerate(sorted_pkgs, start=1)] top3_per_country = ( content .map(lambda x: ((x[8], x[6]), 1)) # Step 1: key=(country,pkg) .reduceByKey(lambda a, b: a + b) # Step 1: 统计下载量 .map(lambda x: (x[0][0], (x[0][1], x[1]))) # Step 2: 重排 key → country .groupByKey() # Step 2: 聚合同 country 的所有 (pkg,count) .flatMap(top3_with_rank) # Step 3: 组内 Top 3 + rank .sortBy(lambda x: (x[0], x[2])) # Step 4: 按 (country, rank) 排序 )

groupByKey
groupByKey
会把同 key 的所有 value 拉到同一 executor 内存。对于超大数据集,可改用
repartitionAndSortWithinPartitions
repartitionAndSortWithinPartitions
——构造复合 key
(country, -count)
(country, -count)
,按 country 分区,分区内按
-count
-count
升序(即 count 降序),之后仍需手写 rank 逻辑。这正是 RDD 二次排序繁琐的根源:没有
PARTITION BY
PARTITION BY
语义,一切都要手工实现。

ZettaPark 用 2 层 CTE +

RANK() OVER (PARTITION BY ...)
RANK() OVER (PARTITION BY ...)
直接表达:

ZettaPark 版本:

session.sql(""" WITH pkg_country_counts AS ( SELECT country, package, COUNT(*) AS downloads FROM weblog.cran_downloads GROUP BY country, package ), ranked AS ( SELECT country, package, downloads, RANK() OVER (PARTITION BY country ORDER BY downloads DESC) AS rnk FROM pkg_country_counts ) SELECT country, package, downloads, rnk FROM ranked WHERE rnk <= 3 ORDER BY country, rnk """).show(20)

对比:

维度RDDZettaPark
步骤数6 步:map / reduceByKey / map / groupByKey / flatMap / sortBy2 层 CTE + WHERE rnk <= 3
内存压力groupByKey 把同 country 所有数据拉到单 executor优化器决定,无需手动控制
大数据集方案repartitionAndSortWithinPartitions(需手写 rank 逻辑)同一 SQL,优化器自动处理
代码量约 15 行(含辅助函数)约 10 行 SQL

结果:1,255 行(含 RANK 并列),US Top 1 为 Rcpp(2,042 次)。

模式 9:broadcast 变量 → JOIN 小表

sc.broadcast()
sc.broadcast()
把小表序列化一次发送到每个 executor 节点缓存,避免每个 task 都序列化传输。适合几 MB 以内的小表。用完后
.unpersist()
.unpersist()
主动释放 executor 内存。

RDD 版本:

import csv as csv_module region_map_data = {} with open("sample_data/country_region.csv") as f: reader = csv_module.DictReader(f) for row in reader: region_map_data[row["country"]] = row["region"] region_bc = sc.broadcast(region_map_data) region_count = ( content .map(lambda x: (region_bc.value.get(x[8], "Other"), 1)) .reduceByKey(lambda a, b: a + b) .sortBy(lambda x: x[1], ascending=False) ) for region, cnt in region_count.collect(): print(f" {region}: {cnt:,}") region_bc.unpersist()

ZettaPark 用

LEFT JOIN
LEFT JOIN
小表,优化器自动识别小表并执行 broadcast join(与 RDD 手动 broadcast 等价)。
COALESCE
COALESCE
处理未匹配的国家代码,等价于
.get(key, "Other")
.get(key, "Other")

ZettaPark 版本:

session.sql(""" SELECT COALESCE(cr.region, 'Other') AS region, COUNT(*) AS downloads FROM weblog.cran_downloads d LEFT JOIN weblog.country_region cr ON d.country = cr.country GROUP BY COALESCE(cr.region, 'Other') ORDER BY downloads DESC """).show()

结果:Americas 164,615 次(Top 1),Asia 153,772 次。


不适合迁移的 RDD 操作

原始项目中有几个 RDD 操作是为了演示 API 特性,不对应实际业务查询,在 ZettaPark 版本中不需要等价实现:

RDD 操作原因ZettaPark 处理方式
raw_content.union(raw_content)
raw_content.union(raw_content)
演示 union 语义,无业务意义不迁移
raw_content.intersection(raw_content)
raw_content.intersection(raw_content)
演示 intersection,无业务意义不迁移
cache()
cache()
/
persist()
persist()
ZettaPark 执行计划由优化器管理不需要手动缓存
cartesian()
cartesian()
笛卡尔积演示,无对应业务需求不迁移

判断标准:如果一个 RDD 操作对应一个真实的业务问题("每个包下载了多少次"),就迁移。如果只是演示 API 特性("union 一个 RDD 和自身"),就不迁移。


端到端验证

迁移完成后,用 18 项自动化检查验证数据正确性:

python e2e.py

验证覆盖:

  1. 总行数:421,969 行
  2. Top 包名:Rcpp(4,783 次下载)
  3. Top 包下载量:4,783
  4. Top 国家:US
  5. Top R 版本:3.2.3
  6. 唯一包数:8,659
  7. 平均大小 > 0:业务合理性检查
  8. OS 种类数:20 个非 NA 操作系统
  9. Join 结果 top 包:与 Q1 一致
  10. Q8 有效行数:421,969(全量数据无坏行)
  11. Q8 无效行数:0
  12. Q9 高流量高体积包数:102 个(下载量 ≥ 100 且平均大小 ≥ 1MB)
  13. Q10 US Top 1 包名:Rcpp
  14. Q10 US Top 1 下载量:2,042
  15. Q10 结果总行数:1,255(含 RANK 并列)
  16. Q11 Top 地区:Americas
  17. Q11 Americas 下载量:164,615
  18. Q11 Asia 下载量:153,772

实际运行结果:18/18 全部通过


完整兼容性对照

操作RDDZettaPark状态
数据加载
sc.textFile
sc.textFile
+
map(split)
map(split)
session.read.csv()
session.read.csv()
✅ 更简洁
计数聚合
map(k,1).reduceByKey(+)
map(k,1).reduceByKey(+)
group_by().agg(count)
group_by().agg(count)
✅ 完全等价
平均值
aggregateByKey
aggregateByKey
+ 手写累加
group_by().agg(F.avg())
group_by().agg(F.avg())
✅ 大幅简化
过滤
.filter(lambda x: x[3] != "NA")
.filter(lambda x: x[3] != "NA")
.filter(F.col("r_version") != "NA")
.filter(F.col("r_version") != "NA")
✅ 完全等价
排序
.sortBy(lambda x: x[1], False)
.sortBy(lambda x: x[1], False)
ORDER BY ... DESC
ORDER BY ... DESC
✅ 完全等价
distinct count
.distinct().count()
.distinct().count()
F.count_distinct()
F.count_distinct()
✅ 更简洁
join
rdd1.join(rdd2)
rdd1.join(rdd2)
group_by().agg(count, avg)
group_by().agg(count, avg)
✅ 合并为单次扫描
取前 N 条
.take(10)
.take(10)
.limit(10).collect()
.limit(10).collect()
✅ 完全等价
打印结果
print(rdd.take(10))
print(rdd.take(10))
df.show()
df.show()
✅ 更友好
批量校验
mapPartitions
mapPartitions
+
accumulator
accumulator
COUNT(CASE WHEN ...)
COUNT(CASE WHEN ...)
✅ 单次扫描,无需分区感知
多条件聚合过滤
cogroup
cogroup
+ 手写迭代器
GROUP BY ... HAVING
GROUP BY ... HAVING
✅ 大幅简化
分组内排序6 步(groupByKey + flatMap + sortBy)
RANK() OVER (PARTITION BY ...)
RANK() OVER (PARTITION BY ...)
✅ 代码量减少 60%
小表广播
sc.broadcast(dict)
sc.broadcast(dict)
+
.value.get()
.value.get()
LEFT JOIN
LEFT JOIN
小表
✅ 优化器自动 broadcast

迁移结论

RDD 迁移到 ZettaPark 的核心变化是从命令式到声明式:不再告诉引擎怎么做,而是告诉它要什么。

代码量减少最多的地方

  • aggregateByKey
    aggregateByKey
    替换为
    F.avg()
    F.avg()
  • 两个 RDD join 合并为单次
    group_by().agg()
    group_by().agg()
  • 6 步二次排序替换为
    RANK() OVER (PARTITION BY ...)
    RANK() OVER (PARTITION BY ...)
  • cogroup
    cogroup
    + 手写迭代器替换为
    HAVING
    HAVING
    多条件聚合

完全等价的操作

filter
filter
sortBy
sortBy
take
take
distinct
distinct
——这些操作的语义完全一致,只是语法更简洁。

进阶模式的迁移收益更大

mapPartitions
mapPartitions
cogroup
cogroup
、二次排序、broadcast 变量这些 RDD 进阶操作,在 SQL 里都有更直接的表达方式,且不需要手动管理分区、迭代器、广播生命周期。

不需要迁移的操作:演示性的集合运算(union/intersection 自身)、手动缓存(cache/persist)——这些在 ZettaPark 中要么不需要,要么由优化器自动处理。

本项目 18/18 验证通过,证明 RDD 的业务逻辑可以完整迁移到 ZettaPark,且代码更简洁、执行效率更高。


参考

联系我们
预约咨询
微信咨询
电话咨询
邮件咨询