Spark SQL 语法迁移指南

本文档提供从 Spark SQL 迁移到 Lakehouse 的完整指南,涵盖迁移评估、类型映射、语法差异、函数兼容性和 UDF 迁移。

迁移复杂度评估

Lakehouse 的数据类型、 SQL 里的 DDL、DML、SELECT 语法和绝大多数高频函数均和Spark兼容。对于纯 SQL 的 ETL 和分析作业,通常只需少量改动即可完成迁移。

除 Python/Java UDF 外,所有迁移项均属于一次性的批量替换或简单改写,不涉及逻辑重构。

UDF 迁移的工作量取决于函数数量——每个 Python/Java UDF 都需要单独打包部署到云函数服务,这是迁移中唯一需要较多投入的部分。如果 UDF 逻辑可以用 SQL 表达,改写为 SQL Function 可以显著降低工作量。

快速自检表

对照下表检查你的作业代码,可以在动手前快速评估迁移工作量。

检查项(在代码库中搜索)Lakehouse 行为使用频率影响工作量
以下各项均无DDL、SELECT、UPDATE/DELETE/MERGE 完全兼容
spark.udf.register
spark.udf.register
/ Python UDF / Java UDF
需改写为 External Function,部署到云函数服务UDF 越多,工作量线性增长
CREATE TEMP VIEW
CREATE TEMP VIEW
/
CREATE TEMPORARY VIEW
CREATE TEMPORARY VIEW
不支持会话级临时视图,需改为 CTE 或持久化 VIEW高频使用时改写量大,但每处改动简单低–中
aggregate(
aggregate(
/
reduce(
reduce(
/
session_window(
session_window(
/
window(
window(
无等价函数,需手动改写偶尔出现影响有限;核心逻辑依赖时需重新设计
SQL UDF(
CREATE FUNCTION
CREATE FUNCTION
调用时需加 Schema 前缀,或配置查找策略高频调用可一次性配置
udf_first
udf_first
解决
LATERAL VIEW posexplode
LATERAL VIEW posexplode
该写法不支持,需改为表函数语法出现次数通常少,逐一替换即可
PARTITIONED BY
PARTITIONED BY
子句在 CTAS 中
CTAS 不支持该子句,需拆为建表 + INSERT出现次数通常少,逐一替换即可
通过 Spark Connector 读写类型自动映射,无需修改代码

类型映射

通过 Spark Connector 读写数据时,Spark 类型会自动映射为 Lakehouse 类型,无需手动修改代码。

类型对照表

Spark 类型Lakehouse 类型说明
BooleanType
BooleanType
BOOLEAN
BOOLEAN
自动映射
ByteType
ByteType
TINYINT
TINYINT
自动映射
ShortType
ShortType
SMALLINT
SMALLINT
自动映射
IntegerType
IntegerType
INT
INT
自动映射
LongType
LongType
BIGINT
BIGINT
自动映射
FloatType
FloatType
FLOAT
FLOAT
自动映射
DoubleType
DoubleType
DOUBLE
DOUBLE
自动映射
DecimalType
DecimalType
DECIMAL(p,s)
DECIMAL(p,s)
自动映射,精度保持一致
StringType
StringType
STRING
STRING
自动映射
BinaryType
BinaryType
BINARY
BINARY
自动映射
DateType
DateType
DATE
DATE
自动映射
TimestampType
TimestampType
TIMESTAMP_LTZ
TIMESTAMP_LTZ
带时区类型
TimestampNTZType
TimestampNTZType
TIMESTAMP_NTZ
TIMESTAMP_NTZ
无时区类型(Spark 3.4+)
ArrayType
ArrayType
ARRAY<T>
ARRAY<T>
自动映射,元素类型递归映射
MapType
MapType
MAP<K,V>
MAP<K,V>
自动映射,键值类型递归映射
StructType
StructType
STRUCT<...>
STRUCT<...>
自动映射,字段类型递归映射

Timestamp 时区说明

Spark 的

TimestampType
TimestampType
默认带时区,对应 Lakehouse 的
TIMESTAMP_LTZ
TIMESTAMP_LTZ
。若 Spark 代码中使用
TimestampNTZType
TimestampNTZType
(无时区),在 Lakehouse 中建表时应使用
TIMESTAMP_NTZ
TIMESTAMP_NTZ
类型,避免时区转换导致时间值偏移。

DDL 语法

建表语法

Spark 的建表语法在 Lakehouse 中高度兼容。以下语法均支持,迁移时无需修改:

-- 注释语法 CREATE TABLE orders ( id INT COMMENT '主键', name STRING COMMENT '姓名' ) COMMENT '订单表'; -- NOT NULL 约束 CREATE TABLE users (id INT NOT NULL, name STRING); -- 默认值 CREATE TABLE tasks (id INT, status STRING DEFAULT 'pending'); -- 生成列 CREATE TABLE order_items ( id INT, price DOUBLE, quantity INT, total DOUBLE GENERATED ALWAYS AS (price * quantity) ); -- 表属性 CREATE TABLE events (id INT) TBLPROPERTIES ('key1' = 'value1'); -- 条件建表 CREATE TABLE IF NOT EXISTS backup_table AS SELECT * FROM original_table;

分区表

Spark 的分区语法在 Lakehouse 中完全兼容,迁移时无需修改。

-- Spark 原始写法,Lakehouse 完全支持 CREATE TABLE orders ( order_id INT, customer_id INT, amount DOUBLE ) PARTITIONED BY (order_date STRING);

Lakehouse 额外支持 Iceberg 风格的分区语法,分区列可以出现在字段列表中,语义更清晰:

-- Lakehouse 推荐写法(Iceberg 风格) CREATE TABLE orders ( order_id INT, customer_id INT, amount DOUBLE, order_date STRING -- 分区列在字段列表中 ) PARTITIONED BY (order_date); -- 此处仅声明列名

两种写法效果相同,迁移时保持原有写法即可,新建表推荐使用 Iceberg 风格。

CTAS 建表

CTAS(Create Table As Select)语法在 Lakehouse 中基本兼容,但有以下差异:

USING
USING
关键字可选

-- Spark CREATE TABLE orders_ctas USING parquet AS SELECT * FROM orders; -- Lakehouse(USING 可保留,也可省略) CREATE TABLE orders_ctas AS SELECT * FROM orders;

CTAS 不支持分区子句

Lakehouse 的 CTAS 语法不支持

PARTITIONED BY
PARTITIONED BY
子句。如需创建分区表,应先建表再插入数据:

-- 错误写法:CTAS 不支持 PARTITIONED BY CREATE TABLE orders_partitioned PARTITIONED BY (order_date) AS SELECT * FROM orders; -- 正确写法:先建表,再插入 CREATE TABLE orders_partitioned ( order_id INT, customer_id INT, amount DOUBLE, order_date STRING ) PARTITIONED BY (order_date); INSERT INTO orders_partitioned SELECT * FROM orders;

分桶表

分桶表语法与 Spark 完全一致,无需修改:

-- Spark 和 Lakehouse 写法完全一致 CREATE TABLE users ( id INT, name STRING ) CLUSTERED BY (id) INTO 16 BUCKETS;

USING parquet
USING parquet
关键字在 Lakehouse 中也是可选的。

分区转换函数

Lakehouse 与 Spark 在分区转换函数上完全一致,无需修改代码:

功能函数
按年分区
years(ts)
years(ts)
按月分区
months(ts)
months(ts)
按天分区
days(ts)
days(ts)
按小时分区
hours(ts)
hours(ts)
哈希分桶
bucket(N, col)
bucket(N, col)
截断分区
truncate(col, W)
truncate(col, W)

隐式分区

Lakehouse 采用类似 Apache Iceberg 的隐式分区(Hidden Partitioning)机制:

  • 分区信息存储在元数据中,不依赖文件路径
  • 可随时修改分区策略,无需重写数据
  • 分区数量无限制
  • 查询时优化器自动进行分区裁剪

动态分区限制

Lakehouse 单任务最多写入 2048 个动态分区。若分区数超过此限制,建议分批写入或使用 Cluster Key 替代分区。

DML 语法

INSERT

-- 完全兼容 INSERT INTO orders VALUES (1, 100, '2024-01-15'); INSERT INTO orders SELECT * FROM staging_orders;

INSERT OVERWRITE

Lakehouse 支持三种

INSERT OVERWRITE
INSERT OVERWRITE
模式,但默认语义与 Spark 不同,迁移时需注意:

覆盖整张表

-- 清空表并写入新数据 INSERT OVERWRITE orders VALUES (1, 100, '2024-01-15');

静态分区覆盖

-- 只覆盖指定分区,其他分区不受影响 INSERT OVERWRITE orders PARTITION (order_date='2024-01-15') SELECT order_id, customer_id, amount FROM staging WHERE order_date='2024-01-15';

动态分区覆盖

不指定具体分区值,系统自动覆盖本次写入涉及的所有分区,未涉及的分区保留不变。这与 Spark 的

spark.sql.sources.partitionOverwriteMode=dynamic
spark.sql.sources.partitionOverwriteMode=dynamic
行为一致:

-- 只覆盖 SELECT 结果中出现的分区,其他分区保留 INSERT OVERWRITE orders SELECT order_id, customer_id, amount, order_date FROM staging;

UPDATE / DELETE

-- 完全兼容 UPDATE orders SET status = 'shipped' WHERE id = 1; DELETE FROM orders WHERE status = 'cancelled';

MERGE INTO

-- 完全兼容 MERGE INTO orders AS target USING staging_orders AS source ON target.id = source.id WHEN MATCHED THEN UPDATE SET target.amount = source.amount WHEN NOT MATCHED THEN INSERT (id, amount, order_date) VALUES (source.id, source.amount, source.order_date);

SELECT 查询

完全兼容的语法

以下 Spark SQL 常用语法在 Lakehouse 中完全兼容,无需修改:

语法示例状态
反引号引用列名
SELECT `id`, `name` FROM t
SELECT `id`, `name` FROM t
字符串连接`SELECT 'Hello'
CASE WHEN
CASE WHEN x > 0 THEN 'positive' END
CASE WHEN x > 0 THEN 'positive' END
GROUP BY 位置引用
GROUP BY 1, 2
GROUP BY 1, 2
ORDER BY 位置引用
ORDER BY 1 DESC
ORDER BY 1 DESC
HAVING 使用别名
HAVING total > 100
HAVING total > 100
LIMIT
LIMIT 10
LIMIT 10
VALUES 子句
SELECT * FROM VALUES (1, 'a'), (2, 'b') AS t(id, name)
SELECT * FROM VALUES (1, 'a'), (2, 'b') AS t(id, name)
JOIN ... USING
JOIN b USING (id)
JOIN b USING (id)
CROSS JOIN
CROSS JOIN
CROSS JOIN
RLIKE / REGEXP
'abc' RLIKE '[a-z]+$'
'abc' RLIKE '[a-z]+$'
NULLIF / NVL
NULLIF(a, b)
NULLIF(a, b)
,
NVL(col, 'default')
NVL(col, 'default')
EXCEPT / INTERSECT
SELECT ... EXCEPT SELECT ...
SELECT ... EXCEPT SELECT ...
QUALIFY
QUALIFY ROW_NUMBER() = 1
QUALIFY ROW_NUMBER() = 1
隐式类型转换
WHERE str_col = 123
WHERE str_col = 123
除以零
SELECT 1/0
SELECT 1/0
NULL
NULL

相关子查询

Lakehouse 完全支持相关子查询(Correlated Subquery),包括

EXISTS
EXISTS
/
NOT EXISTS
NOT EXISTS
中引用外部列,语法与 Spark 一致,无需修改:

-- 完全兼容,无需改写 SELECT id FROM orders a WHERE EXISTS (SELECT 1 FROM customers b WHERE b.id = a.customer_id); SELECT id FROM orders a WHERE NOT EXISTS (SELECT 1 FROM blacklist b WHERE b.order_id = a.id);

窗口函数

Lakehouse 完全兼容 Spark SQL 的窗口函数,语法与 Spark 完全一致:

SELECT id, amount, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY created_at) AS rn, SUM(amount) OVER (PARTITION BY user_id ORDER BY created_at ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_total FROM orders;

支持的窗口函数:

ROW_NUMBER()
ROW_NUMBER()
RANK()
RANK()
DENSE_RANK()
DENSE_RANK()
LAG()
LAG()
LEAD()
LEAD()
FIRST_VALUE()
FIRST_VALUE()
LAST_VALUE()
LAST_VALUE()
NTILE()
NTILE()
SUM() OVER()
SUM() OVER()
AVG() OVER()
AVG() OVER()
等。

表生成函数

Lakehouse 支持 Spark 常用的表生成函数:

函数支持说明
explode()
explode()
完全兼容,支持
LATERAL VIEW
LATERAL VIEW
和表函数语法
posexplode()
posexplode()
⚠️
AS (pos, val)
AS (pos, val)
别名语法支持;
LATERAL VIEW ... AS pos, val
LATERAL VIEW ... AS pos, val
语法不支持,见下方说明
inline()
inline()
直接调用
stack()
stack()
直接调用
json_tuple()
json_tuple()
完全兼容,支持
LATERAL VIEW
LATERAL VIEW
语法

-- explode 两种写法均支持 SELECT id, item FROM orders LATERAL VIEW explode(items) t AS item; SELECT id, item FROM orders, explode(items) AS t(item); -- posexplode:AS (pos, val) 别名语法支持 SELECT pos, val FROM posexplode(ARRAY('a', 'b', 'c')) AS t(pos, val); -- 返回:(0,'a'), (1,'b'), (2,'c') -- posexplode:LATERAL VIEW AS pos, val 语法不支持,需改写 -- Spark 写法(不支持): -- SELECT pos, val FROM t LATERAL VIEW posexplode(items) tmp AS pos, val -- Lakehouse 改写: SELECT pos, val FROM t, posexplode(items) AS tmp(pos, val); -- inline/stack/json_tuple 直接调用 SELECT inline(ARRAY(named_struct('a', 1), named_struct('a', 2))); SELECT stack(2, 'a', 1, 'b', 2); SELECT a, b FROM json_tuple('{"a":1,"b":2}', 'a', 'b') AS j(a, b);

JSON 处理函数

Spark 函数Lakehouse 函数说明
from_json(str, schema)
from_json(str, schema)
from_json(str, schema)
from_json(str, schema)
✅ 完全兼容,返回 STRUCT 类型
to_json(struct)
to_json(struct)
to_json(expr)
to_json(expr)
✅ 完全兼容
get_json_object(str, path)
get_json_object(str, path)
get_json_object(str, path)
get_json_object(str, path)
✅ 完全兼容
json_tuple(str, path1, path2)
json_tuple(str, path1, path2)
json_tuple(str, path1, path2)
json_tuple(str, path1, path2)
✅ 完全兼容
parse_json(str)
parse_json(str)
parse_json(str)
parse_json(str)
✅ Lakehouse 特有,返回 JSON 类型

-- from_json 完全兼容 SELECT from_json('{"a":1}', 'a INT').a; -- get_json_object 完全兼容 SELECT get_json_object('{"a":{"b":123}}', '$.a.b'); -- parse_json 也可用(返回 JSON 类型) SELECT parse_json('{"a":1}')['a'];

临时视图与 CTE

临时视图

Lakehouse 不支持

TEMPORARY VIEW
TEMPORARY VIEW
/
TEMP VIEW
TEMP VIEW
语法,这是与 Spark 的一个重要差异。

Spark 的 TEMP VIEW 是会话级对象,会话结束自动销毁。Lakehouse 没有等价的会话级视图,迁移时有两种选择:

方案一:改为持久化 VIEW(适合多次复用的逻辑)

-- Spark CREATE OR REPLACE TEMP VIEW temp_orders AS SELECT * FROM orders WHERE status = 'active'; -- Lakehouse:改为持久化视图,需要有 Schema 写权限 CREATE OR REPLACE VIEW my_schema.temp_orders AS SELECT * FROM orders WHERE status = 'active';

方案二:改为 CTE(适合单次查询内的临时逻辑)

-- Spark CREATE OR REPLACE TEMP VIEW daily_orders AS SELECT order_date, SUM(amount) AS total FROM orders GROUP BY order_date; SELECT * FROM daily_orders WHERE total > 1000; -- Lakehouse:改为 CTE,逻辑内联在查询中 WITH daily_orders AS ( SELECT order_date, SUM(amount) AS total FROM orders GROUP BY order_date ) SELECT * FROM daily_orders WHERE total > 1000;

CTE 方案不需要任何权限,也不产生持久化对象,是最简单的替代方式。

CTE(WITH 子句)

CTE 语法与 Spark 完全一致,无需修改。

函数兼容性

完全兼容的函数

以下 Spark SQL 常用函数在 Lakehouse 中完全兼容:

数组函数

split
split
,
regexp_replace
regexp_replace
,
regexp_extract
regexp_extract
,
concat_ws
concat_ws
,
size
size
,
array_sort
array_sort
,
sort_array
sort_array
,
array_contains
array_contains
,
array_position
array_position
,
slice
slice
,
sequence
sequence
,
flatten
flatten
,
arrays_zip
arrays_zip
,
array_repeat
array_repeat
,
array_distinct
array_distinct
,
array_union
array_union
,
array_intersect
array_intersect
,
array_except
array_except
,
arrays_overlap
arrays_overlap
,
array_min
array_min
,
array_max
array_max
,
array_join
array_join
,
array_remove
array_remove
,
cardinality
cardinality
,
reverse
reverse
,
element_at
element_at

Map 函数

map_keys
map_keys
,
map_values
map_values
,
map_from_arrays
map_from_arrays
,
map_concat
map_concat
,
str_to_map
str_to_map
,
map_filter
map_filter
,
transform_keys
transform_keys
,
transform_values
transform_values
,
map_zip_with
map_zip_with
,
map_from_entries
map_from_entries
,
element_at
element_at

高阶函数

transform
transform
,
filter
filter
,
exists
exists
,
forall
forall
,
zip_with
zip_with

聚合函数

collect_list
collect_list
,
collect_set
collect_set
,
first
first
,
last
last
,
approx_count_distinct
approx_count_distinct
,
percentile
percentile
,
percentile_approx
percentile_approx
,
corr
corr
,
covar_pop
covar_pop
,
covar_samp
covar_samp

字符串函数

split
split
,
regexp_replace
regexp_replace
,
regexp_extract
regexp_extract
,
concat_ws
concat_ws

日期函数

date_format
date_format
,
to_date
to_date
,
current_date
current_date
,
current_timestamp
current_timestamp

条件函数

CASE WHEN
CASE WHEN
,
NULLIF
NULLIF
,
NVL
NVL
,
IFNULL
IFNULL
,
COALESCE
COALESCE
,
TRY_CAST
TRY_CAST
,
TRY_ELEMENT_AT
TRY_ELEMENT_AT

其他函数

typeof
typeof
,
named_struct
named_struct
,
monotonically_increasing_id
monotonically_increasing_id
,
current_database
current_database
,
current_schema
current_schema
,
current_user
current_user
,
version
version
,
raise_error
raise_error
,
assert_true
assert_true
,
aes_encrypt
aes_encrypt
,
aes_decrypt
aes_decrypt

不支持的函数

以下 Spark SQL 函数在 Lakehouse 中不支持,迁移时需替换:

Spark 函数替代方案说明
aggregate(arr, init, merge)
aggregate(arr, init, merge)
使用子查询或 UDF数组聚合
reduce(arr, init, merge)
reduce(arr, init, merge)
同上数组归约
nanvl(x, y)
nanvl(x, y)
CASE WHEN isnan(x) THEN y ELSE x END
CASE WHEN isnan(x) THEN y ELSE x END
NaN 处理
bin(n)
bin(n)
conv(n, 10, 2)
conv(n, 10, 2)
二进制转换
hash(x)
hash(x)
murmurhash3_32(x)
murmurhash3_32(x)
sha2(x, 256)
sha2(x, 256)
哈希函数
xxhash64(x)
xxhash64(x)
murmurhash3_32(x)
murmurhash3_32(x)
64 位哈希
shuffle(arr)
shuffle(arr)
不支持数组随机打乱
array_sort(arr, comparator)
array_sort(arr, comparator)
仅支持单参数版本自定义排序
soundex(str)
soundex(str)
不支持语音编码
levenshtein(s1, s2)
levenshtein(s1, s2)
不支持编辑距离
overlay(str, replace, pos)
overlay(str, replace, pos)
不支持字符串替换
sentences(str)
sentences(str)
不支持句子分割
session_window(ts, gap)
session_window(ts, gap)
使用
LAG
LAG
/
LEAD
LEAD
手动计算
会话窗口
window(ts, interval)
window(ts, interval)
不支持时间窗口
width_bucket(v, min, max, n)
width_bucket(v, min, max, n)
不支持分桶函数
histogram_numeric(col, n)
histogram_numeric(col, n)
不支持数值直方图
kurtosis(col)
kurtosis(col)
不支持峰度
skewness(col)
skewness(col)
不支持偏度
reflect(class, method, args)
reflect(class, method, args)
不支持Java 反射
java_method(class, method, args)
java_method(class, method, args)
不支持Java 方法调用
xpath_string(xml, xpath)
xpath_string(xml, xpath)
不支持XML 解析
input_file_name()
input_file_name()
不支持文件名
input_file_block_start()
input_file_block_start()
不支持文件块起始
spark_partition_id()
spark_partition_id()
不支持Spark 分区 ID
entries_to_map(keys, values)
entries_to_map(keys, values)
map_from_arrays(keys, values)
map_from_arrays(keys, values)
Map 构建
length(array)
length(array)
size(array)
size(array)
cardinality(array)
cardinality(array)
数组长度

UDF 迁移

Spark 支持多种 UDF 类型,Lakehouse 提供两类对应机制:SQL Function(纯 SQL 逻辑)和 External Function(Python/Java 代码)。

迁移对照

Spark UDF 类型典型写法Lakehouse 对应迁移复杂度
SQL UDF(纯表达式)
CREATE FUNCTION f(x INT) RETURNS INT RETURN x * 2
CREATE FUNCTION f(x INT) RETURNS INT RETURN x * 2
SQL Function低,语法基本一致
Python UDF(标量)
spark.udf.register("f", lambda x: x*2, IntegerType())
spark.udf.register("f", lambda x: x*2, IntegerType())
External Function(Python,仅支持 UDF)中,需部署到云函数服务
Java UDF实现
UDF1<T,R>
UDF1<T,R>
等 Spark 接口,或继承 Hive
GenericUDF
GenericUDF
/
UDF
UDF
External Function(Java)中,需打包部署;若已用 Hive API 写法,代码可部分复用
Java UDAF继承
Aggregator
Aggregator
UserDefinedAggregateFunction
UserDefinedAggregateFunction
External Function(Java UDAF)高,需重写聚合逻辑
Java UDTF继承
GenericUDTF
GenericUDTF
External Function(Java UDTF)高,需重写表函数逻辑
Python UDAF / Python UDTFpandas_udf 聚合、yield 多行不支持,需用 Java 重写或改用 SQL Function

SQL Function(纯 SQL 逻辑)

Spark 的 SQL UDF 可直接迁移为 Lakehouse SQL Function,语法高度兼容:

-- Spark SQL UDF CREATE FUNCTION multiply(x INT, y INT) RETURNS INT RETURN x * y; -- Lakehouse SQL Function(语法相同) CREATE FUNCTION my_schema.multiply(x INT, y INT) RETURNS INT RETURN x * y;

与 Spark 的关键差异:必须加 Schema 前缀

Lakehouse SQL Function 是 Schema 级对象,调用时默认需要加 Schema 前缀:

-- 报错:找不到函数 SELECT multiply(3, 4); -- 正确:加 Schema 前缀 SELECT my_schema.multiply(3, 4);

如果原 Spark 代码大量调用无前缀 UDF,可通过以下配置开启 UDF 优先查找,使无前缀调用生效:

-- 开启后,无前缀函数名优先在用户 UDF 中查找,再查内置函数 SET cz.sql.remote.udf.lookup.policy = udf_first; -- 之后可以无前缀调用 SELECT multiply(3, 4);

SQL Function 也支持表函数(返回多行),对应 Spark 的 UDTF 中纯 SQL 可表达的场景:

-- 返回多行的 SQL Function(表函数) CREATE FUNCTION my_schema.get_employees(dept INT) RETURNS TABLE(name STRING) RETURN SELECT name FROM employee WHERE deptno = dept; SELECT * FROM my_schema.get_employees(10);

External Function(Python/Java 代码)

Spark 的 Python UDF、Scala/Java UDF 需迁移为 Lakehouse External Function。External Function 将函数逻辑部署到云函数服务(阿里云 FC、腾讯云 SCF 或 AWS Lambda),Lakehouse 通过 HTTP 调用执行。

支持范围:

  • Python 3.10:仅支持 UDF(标量函数)
  • Java 8:支持 UDF、UDAF(聚合函数)、UDTF(表函数)

迁移步骤概览:

  1. 将 UDF 代码改写为云函数 Handler(Python 3.10 或 Java 8 Hive 风格 UDF)
  2. 打包上传至对象存储或 Lakehouse Volume
  3. 在 Lakehouse 中创建 API Connection(存储云函数服务的认证信息)
  4. 创建 External Function 并绑定 Connection

-- 第 4 步:创建 External Function(Java UDF 示例) CREATE EXTERNAL FUNCTION my_schema.my_upper AS 'com.example.GenericUdfUpper' USING ARCHIVE 'volume://fc_volume/udfs/my_upper.zip' CONNECTION my_fc_conn WITH PROPERTIES ('remote.udf.api' = 'java8.hive2.v0'); -- 调用方式与普通函数相同(需加 Schema 前缀) SELECT my_schema.my_upper(name) FROM users;

-- Python UDF 示例 CREATE EXTERNAL FUNCTION my_schema.clean_phone AS 'handler.clean_phone' USING FILE 'volume:user://~/clean_phone.zip' CONNECTION my_fc_conn WITH PROPERTIES ('remote.udf.api' = 'python3.mc.v0'); SELECT my_schema.clean_phone(phone_number) FROM users;

会话级 UDF 注册

Spark 支持在 Driver 端动态注册 UDF(

spark.udf.register
spark.udf.register
),注册后当前 Session 立即可用。Lakehouse 没有等价的会话级注册机制,所有函数都是持久化的 Schema 对象。

迁移时,将

spark.udf.register
spark.udf.register
的注册逻辑改为
CREATE OR REPLACE FUNCTION
CREATE OR REPLACE FUNCTION
,在部署脚本或初始化阶段执行一次即可。

配置参数

Spark 的查询优化参数在 Lakehouse 中由系统自动管理,无需手动配置:

Spark 配置Lakehouse 行为
spark.sql.adaptive.enabled
spark.sql.adaptive.enabled
默认开启自适应查询优化
spark.sql.shuffle.partitions
spark.sql.shuffle.partitions
自动管理并行度
spark.sql.broadcastTimeout
spark.sql.broadcastTimeout
自动处理广播 Join
spark.sql.files.maxPartitionBytes
spark.sql.files.maxPartitionBytes
自动优化文件切分

DataFrame 写入限制

通过 Spark Connector 写入 Lakehouse 时,需注意以下限制:

  • 必须全字段写入,不支持部分列写入
  • 不支持写入带主键的表(PK 表)
  • 写入模式仅支持
    append
    append
    ,不支持
    overwrite
    overwrite
    单分区覆盖

迁移检查清单

  • 确认
    TimestampType
    TimestampType
    映射为
    TIMESTAMP_LTZ
    TIMESTAMP_LTZ
    TimestampNTZType
    TimestampNTZType
    映射为
    TIMESTAMP_NTZ
    TIMESTAMP_NTZ
  • CTAS 建表:不支持
    PARTITIONED BY
    PARTITIONED BY
    ,需拆为建表 + INSERT
  • INSERT OVERWRITE
    INSERT OVERWRITE
    :Lakehouse 默认动态分区覆盖;若原代码依赖 Spark 2.x 的整表覆盖,需改为
    TRUNCATE TABLE
    TRUNCATE TABLE
    +
    INSERT INTO
    INSERT INTO
  • TEMP VIEW
    TEMP VIEW
    :不支持,改为持久化
    VIEW
    VIEW
    或内联
    CTE
    CTE
  • posexplode
    posexplode
    LATERAL VIEW ... AS pos, val
    LATERAL VIEW ... AS pos, val
    语法不支持,改为表函数语法
    FROM t, posexplode(col) AS tmp(pos, val)
    FROM t, posexplode(col) AS tmp(pos, val)
  • 函数兼容性:检查是否使用了不支持的函数(如
    aggregate
    aggregate
    reduce
    reduce
    hash
    hash
    shuffle
    shuffle
    session_window
    session_window
    window
    window
    levenshtein
    levenshtein
    overlay
    overlay
    等),见函数兼容性章节
  • DataFrame 写入:确认全字段写入,不使用 PK 表
  • 动态分区:确认单任务分区数不超过 2048
  • SQL UDF:调用时需加 Schema 前缀,或配置
    cz.sql.remote.udf.lookup.policy = udf_first
    cz.sql.remote.udf.lookup.policy = udf_first
    兼容无前缀调用
  • Python/Scala/Java UDF:迁移为 External Function,部署到云函数服务并创建 API Connection
  • spark.udf.register
    spark.udf.register
    动态注册:改为
    CREATE OR REPLACE FUNCTION
    CREATE OR REPLACE FUNCTION
    ,在部署脚本中执行一次

相关文档

联系我们
预约咨询
微信咨询
电话咨询