Spark 任务平滑迁移 Lakehouse 实战指南
本文档旨在帮助拥有存量 Spark 任务的团队快速迁移至云器 Lakehouse。
为什么选择迁移?
这是什么:
云器 Lakehouse 原生支持 Spark 类型的虚拟集群(VCluster)。你的 RDD、DataFrame、UDF 逻辑可以几乎无缝运行,只需将数据读写入口从 Hive/Parquet 替换为 Spark Connector 即可。
迁移收益:
- 无需重写逻辑:核心计算代码(RDD/DataFrame)100% 兼容。
- 统一入口:计算任务直接跑在 Lakehouse 存算分离架构上,弹性扩容,告别传统 Hadoop 集群维护。
- 数据湖原生:直接读写 Iceberg/Parquet 数据,无需中转。
1. 兼容性概览
在迁移前,请先评估你的代码是否符合支持范围:
| Spark 操作 | 支持情况 | 说明与建议 |
|---|
| RDD 操作 | ✅ 支持 | 老版本代码、灵活控制逻辑无需修改。 |
| DataFrame 操作 | ✅ 支持 | 数仓任务开发、大数据处理。 |
| 读写数仓数据 | ⚠️ 需适配 | 不支持 df.write.format("parquet").saveAsTable("hive_db.table")
df.write.format("parquet").saveAsTable("hive_db.table") 。必须改为 df.write.format("clickzetta")
df.write.format("clickzetta") 。 |
| SQL 操作 | ⚠️ 需适配 | 不支持 spark.sql("SELECT ... FROM lakehouse_table")
spark.sql("SELECT ... FROM lakehouse_table") 直接查询。需先用 Connector 读取并注册为临时表。 |
| Spark Streaming | ❌ 暂不支持 | 实时流计算建议迁移至 Kafka Pipe 或 实时同步任务。 |
| Hive 元数据访问 | ❌ 暂不支持 | 无法通过 enableHiveSupport()
enableHiveSupport() 访问 Hive 表。需先将数据导入 Lakehouse。 |
核心建议:
如果任务依赖
spark.sql
spark.sql
直接读写 Hive 表,或者使用了
Spark Streaming
Spark Streaming
,本方案不适用。
2. 迁移核心步骤
2.1 环境准备
- 创建 Spark VCluster:
在 Lakehouse SQL 窗口执行:
CREATE VCLUSTER spark_vc TYPE SPARK;
- 下载工具:
2.2 代码改造:封装读写方法
由于不支持直接通过
spark.sql
spark.sql
操作 Lakehouse 表,我们需要封装两个通用方法:
读取表 (Read):通过 Connector 加载数据并注册为临时视图,这样后续的
spark.sql
spark.sql
就可以像操作普通表一样操作它。
import org.apache.spark.sql.SparkSession
import com.clickzetta.spark.connector.ClickzettaOptions
def readClickzettaTable(spark: SparkSession, tableName: String, schema: String = "public"): Unit = {
val df = spark.read.format("clickzetta")
.option(ClickzettaOptions.CZ_ENDPOINT, "cn-shanghai-alicloud.api.clickzetta.com")
.option(ClickzettaOptions.CZ_USERNAME, "your_username")
.option(ClickzettaOptions.CZ_PASSWORD, "your_password")
.option(ClickzettaOptions.CZ_WORKSPACE, "your_workspace")
.option(ClickzettaOptions.CZ_VIRTUAL_CLUSTER, "spark_vc")
.option(ClickzettaOptions.CZ_SCHEMA, schema)
.option(ClickzettaOptions.CZ_TABLE, tableName)
.load()
// 注册为临时表,供后续 spark.sql 使用
df.createOrReplaceTempView(tableName)
}
写入表 (Write):使用
format("clickzetta")
format("clickzetta")
将 DataFrame 写入 Lakehouse。
def writeClickzettaTable(df: org.apache.spark.sql.DataFrame, tableName: String, mode: String = "overwrite"): Unit = {
df.write.format("clickzetta")
.option(ClickzettaOptions.CZ_ENDPOINT, "cn-shanghai-alicloud.api.clickzetta.com")
.option(ClickzettaOptions.CZ_USERNAME, "your_username")
.option(ClickzettaOptions.CZ_PASSWORD, "your_password")
.option(ClickzettaOptions.CZ_WORKSPACE, "your_workspace")
.option(ClickzettaOptions.CZ_VIRTUAL_CLUSTER, "spark_vc")
.option(ClickzettaOptions.CZ_SCHEMA, "public")
.option(ClickzettaOptions.CZ_TABLE, tableName)
.mode(mode)
.save()
}
2.3 改造案例对比
| 原 Spark 代码 (Hive) | 改造后代码 (Lakehouse) |
|---|
spark.sql("SELECT * FROM hive_db.users")
spark.sql("SELECT * FROM hive_db.users") | 1. readClickzettaTable(spark, "users")
readClickzettaTable(spark, "users") 2. spark.sql("SELECT * FROM users")
spark.sql("SELECT * FROM users") |
df.write.saveAsTable("hive_db.result")
df.write.saveAsTable("hive_db.result") | writeClickzettaTable(df, "result")
writeClickzettaTable(df, "result") |
3. 作业提交指南
将打包好的 JAR 上传至 OSS,使用 Lakehouse 提供的
spark-submit
spark-submit
客户端提交。
命令示例:
./bin/spark-submit oss://your-bucket/jars/app-1.0-SNAPSHOT.jar \\
--jars oss://your-bucket/jars/spark-clickzetta-connector.jar \\
--master cn-shanghai-alicloud.api.clickzetta.com \\
--name my_spark_job \\
--class com.example.MyMainClass \\
--conf spark.cz.instance.name=your_instance_id \\
--conf spark.cz.workspace=your_workspace \\
--conf spark.cz.user.name=your_username \\
--conf spark.cz.password=your_password \\
--conf spark.cz.vcluster=spark_vc \\
--conf spark.cz.job.type=SPARK
关键参数说明:
| 参数 | 说明 | 获取路径 |
|---|
--master
--master | Lakehouse API Endpoint | Studio -> 管理 -> 工作空间 -> JDBC 连接串域名 |
--jars
--jars | 必填 Connector 依赖 JAR | Spark Connector 概述 |
spark.cz.vcluster
spark.cz.vcluster | 指定的计算集群 | 必须填写类型为 SPARK 的 VCluster 名称 |
spark.cz.instance.name
spark.cz.instance.name | 实例 ID | Studio -> 管理 -> 工作空间 -> 实例 ID |
4. Maven 依赖配置建议
在
pom.xml
pom.xml
中,建议将 Spark 核心依赖设为
<scope>provided</scope>
<scope>provided</scope>
,而 Connector 建议通过
--jars
--jars
引入或设为
compile
compile
(如果打包进 Fat Jar)。
<dependency>
<groupId>com.clickzetta</groupId>
<artifactId>spark-clickzetta</artifactId>
<version>1.0.0-SNAPSHOT</version>
<!-- 若使用 --jars 提交,此处可不设或设 provided -->
</dependency>
<!-- Spark 核心库通常集群已有,设为 provided 以减小包体积 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.5.0</version>
<scope>provided</scope>
</dependency>
相关文档