Spark 任务平滑迁移 Lakehouse 实战指南

本文档旨在帮助拥有存量 Spark 任务的团队快速迁移至云器 Lakehouse。

为什么选择迁移?

这是什么: 云器 Lakehouse 原生支持 Spark 类型的虚拟集群(VCluster)。你的 RDD、DataFrame、UDF 逻辑可以几乎无缝运行,只需将数据读写入口从 Hive/Parquet 替换为 Spark Connector 即可。

迁移收益

  • 无需重写逻辑:核心计算代码(RDD/DataFrame)100% 兼容。
  • 统一入口:计算任务直接跑在 Lakehouse 存算分离架构上,弹性扩容,告别传统 Hadoop 集群维护。
  • 数据湖原生:直接读写 Iceberg/Parquet 数据,无需中转。

1. 兼容性概览

在迁移前,请先评估你的代码是否符合支持范围:

Spark 操作支持情况说明与建议
RDD 操作✅ 支持老版本代码、灵活控制逻辑无需修改。
DataFrame 操作✅ 支持数仓任务开发、大数据处理。
读写数仓数据⚠️ 需适配不支持
df.write.format("parquet").saveAsTable("hive_db.table")
df.write.format("parquet").saveAsTable("hive_db.table")
。必须改为
df.write.format("clickzetta")
df.write.format("clickzetta")
SQL 操作⚠️ 需适配不支持
spark.sql("SELECT ... FROM lakehouse_table")
spark.sql("SELECT ... FROM lakehouse_table")
直接查询。需先用 Connector 读取并注册为临时表。
Spark Streaming❌ 暂不支持实时流计算建议迁移至 Kafka Pipe实时同步任务
Hive 元数据访问❌ 暂不支持无法通过
enableHiveSupport()
enableHiveSupport()
访问 Hive 表。需先将数据导入 Lakehouse。

2. 迁移核心步骤

2.1 环境准备

  1. 创建 Spark VCluster: 在 Lakehouse SQL 窗口执行:

    CREATE VCLUSTER spark_vc TYPE SPARK;

  2. 下载工具

2.2 代码改造:封装读写方法

由于不支持直接通过

spark.sql
spark.sql
操作 Lakehouse 表,我们需要封装两个通用方法:

读取表 (Read):通过 Connector 加载数据并注册为临时视图,这样后续的

spark.sql
spark.sql
就可以像操作普通表一样操作它。

import org.apache.spark.sql.SparkSession import com.clickzetta.spark.connector.ClickzettaOptions def readClickzettaTable(spark: SparkSession, tableName: String, schema: String = "public"): Unit = { val df = spark.read.format("clickzetta") .option(ClickzettaOptions.CZ_ENDPOINT, "cn-shanghai-alicloud.api.clickzetta.com") .option(ClickzettaOptions.CZ_USERNAME, "your_username") .option(ClickzettaOptions.CZ_PASSWORD, "your_password") .option(ClickzettaOptions.CZ_WORKSPACE, "your_workspace") .option(ClickzettaOptions.CZ_VIRTUAL_CLUSTER, "spark_vc") .option(ClickzettaOptions.CZ_SCHEMA, schema) .option(ClickzettaOptions.CZ_TABLE, tableName) .load() // 注册为临时表,供后续 spark.sql 使用 df.createOrReplaceTempView(tableName) }

写入表 (Write):使用

format("clickzetta")
format("clickzetta")
将 DataFrame 写入 Lakehouse。

def writeClickzettaTable(df: org.apache.spark.sql.DataFrame, tableName: String, mode: String = "overwrite"): Unit = { df.write.format("clickzetta") .option(ClickzettaOptions.CZ_ENDPOINT, "cn-shanghai-alicloud.api.clickzetta.com") .option(ClickzettaOptions.CZ_USERNAME, "your_username") .option(ClickzettaOptions.CZ_PASSWORD, "your_password") .option(ClickzettaOptions.CZ_WORKSPACE, "your_workspace") .option(ClickzettaOptions.CZ_VIRTUAL_CLUSTER, "spark_vc") .option(ClickzettaOptions.CZ_SCHEMA, "public") .option(ClickzettaOptions.CZ_TABLE, tableName) .mode(mode) .save() }

2.3 改造案例对比

原 Spark 代码 (Hive)改造后代码 (Lakehouse)
spark.sql("SELECT * FROM hive_db.users")
spark.sql("SELECT * FROM hive_db.users")
1.
readClickzettaTable(spark, "users")
readClickzettaTable(spark, "users")

2.
spark.sql("SELECT * FROM users")
spark.sql("SELECT * FROM users")
df.write.saveAsTable("hive_db.result")
df.write.saveAsTable("hive_db.result")
writeClickzettaTable(df, "result")
writeClickzettaTable(df, "result")

3. 作业提交指南

将打包好的 JAR 上传至 OSS,使用 Lakehouse 提供的

spark-submit
spark-submit
客户端提交。

命令示例

./bin/spark-submit oss://your-bucket/jars/app-1.0-SNAPSHOT.jar \\ --jars oss://your-bucket/jars/spark-clickzetta-connector.jar \\ --master cn-shanghai-alicloud.api.clickzetta.com \\ --name my_spark_job \\ --class com.example.MyMainClass \\ --conf spark.cz.instance.name=your_instance_id \\ --conf spark.cz.workspace=your_workspace \\ --conf spark.cz.user.name=your_username \\ --conf spark.cz.password=your_password \\ --conf spark.cz.vcluster=spark_vc \\ --conf spark.cz.job.type=SPARK

关键参数说明

参数说明获取路径
--master
--master
Lakehouse API EndpointStudio -> 管理 -> 工作空间 -> JDBC 连接串域名
--jars
--jars
必填 Connector 依赖 JARSpark Connector 概述
spark.cz.vcluster
spark.cz.vcluster
指定的计算集群必须填写类型为 SPARK 的 VCluster 名称
spark.cz.instance.name
spark.cz.instance.name
实例 IDStudio -> 管理 -> 工作空间 -> 实例 ID

4. Maven 依赖配置建议

pom.xml
pom.xml
中,建议将 Spark 核心依赖设为
<scope>provided</scope>
<scope>provided</scope>
,而 Connector 建议通过
--jars
--jars
引入或设为
compile
compile
(如果打包进 Fat Jar)。

<dependency> <groupId>com.clickzetta</groupId> <artifactId>spark-clickzetta</artifactId> <version>1.0.0-SNAPSHOT</version> <!-- 若使用 --jars 提交,此处可不设或设 provided --> </dependency> <!-- Spark 核心库通常集群已有,设为 provided 以减小包体积 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.5.0</version> <scope>provided</scope> </dependency>


相关文档

联系我们
预约咨询
微信咨询
电话咨询