MaxCompute → Lakehouse 迁移实战:电商数据工程项目
如果你的数据工程项目跑在 MaxCompute + DataWorks 上,迁移到 ClickZetta Lakehouse 的核心工作量集中在两块:SQL 语法适配 和任务编排替换 。MaxCompute SQL 与 Lakehouse SQL 的差异比你想象的少——大部分标准 SQL(JOIN、窗口函数、CTE、聚合)写法完全一致,需要改的主要是 6 处 MaxCompute 专有语法,以及把 DataWorks 任务节点迁移为 Studio 任务。
本文用一个真实项目验证这一点:将基于 MaxCompute + DataWorks 的电商数据工程项目完整迁移到 ClickZetta Lakehouse,覆盖 ODS/DWD/ADS 三层、8 张原始表、5 个 ETL 任务,经过完整端到端验证,全部通过。
原始项目
来源 :rcdelacruz/dataworks-maxcompute-practice
数据集 :电商场景,8 张表(customers / products / orders / order_items / web_sessions / page_views / user_events / suppliers)
任务编排 :DataWorks daily_etl_workflow.jsondaily_etl_workflow.json
,5 个节点,每天 02:00 触发
UDF :Python 文本分析(情感分析、关键词提取)+ Java 字符串处理
结论先行
你不需要重写业务逻辑,也不需要重新搭建任务编排体系。6 处 SQL 语法替换覆盖所有改动;DataWorks 的任务依赖关系可以完整迁移到 Studio,迁移后用
cz-clicz-cli
全自动化,比原来更容易维护。
SQL 改动量 :6 处语法替换,其余标准 SQL 零改动
任务编排 :5 个 DataWorks 节点 → 5 个 Studio 任务,依赖关系完整保留;迁移后通过 cz-clicz-cli
全自动化,setup.pysetup.py
一键完成
数据验证 :ODS 8 张表 / DWD 3 张汇总表 / ADS 3 张分析表,全部跑通
UDF :代码逻辑零改动,部署方式从"引擎内"改为"云函数服务"
技术栈对比
维度 MaxCompute + DataWorks ClickZetta Lakehouse 计算引擎 MaxCompute(ODPS) Lakehouse SQL Engine 任务编排 DataWorks Workflow JSON(描述性配置,原项目未实现 API 自动化,需手动在控制台创建任务) Studio 任务 + cz-cli (全命令行操作,setup.pysetup.py
一键自动创建任务、配置依赖、发布上线) 对象存储 OSS(LOAD DATA INPATH) Volume(COPY INTO FROM VOLUME) UDF 内联 Python/Java(引擎内执行) External Function(云函数服务) 数据保留 LIFECYCLE 365LIFECYCLE 365
(自动删除数据,最长无限制)TBLPROPERTIES ('data_retention_days' = '7')TBLPROPERTIES ('data_retention_days' = '7')
(Time Travel 历史版本保留,最长 90 天;数据物理删除需手动或定期任务处理)分区 PARTITIONED BY (ds STRING) 相同 参数变量 ${bizdate}${bizdate}
Studio 任务 SQL 中可直接使用 ${bizdate}${bizdate}
;Python/cz-cli 直接执行时用 f-string 拼接
项目背景
原始项目是一个标准的电商数据工程实践,包含三层数据架构:
ODS 层 :8 张原始表,从 OSS 加载 CSV 数据
DWD 层 :每日销售汇总、客户分层、商品表现分析
ADS 层 :Web 流量分析、增量变更检测、数据质量监控
DataWorks Workflow 定义了 5 个任务节点的依赖关系:
data_quality_check
├── customer_segmentation
├── product_performance_etl
└── web_analytics_etl
└── daily_sales_summary(依赖前两个)
迁移步骤
第一步:建立 Repo 和连接配置
Fork 原始 repo 到
clickzetta/clickzetta/
组织,重组目录结构:
gh repo fork rcdelacruz/dataworks-maxcompute-practice \
--org clickzetta \
--fork-name maxcompute2lakehouse-ecommerce
gh repo clone clickzetta/maxcompute2lakehouse-ecommerce
原始代码归入
01_source/01_source/
,迁移代码放
03_lakehouse/03_lakehouse/
:
maxcompute2lakehouse-ecommerce/
├── 01_source/ # 原始 MaxCompute 代码(原样保留)
├── 02_migration/ # 语法差异说明、任务映射文档
├── 03_lakehouse/ # 迁移后代码
│ ├── sql/ # Lakehouse SQL
│ ├── tasks/ # Studio 任务列表
│ ├── udf/ # External Function 代码
│ ├── setup.py # 一键初始化
│ └── e2e.py # 端到端验证
└── data/ # 8 个 CSV 样本文件
配置连接(
.env.env
):
cp .env.example .env
# 填写 CLICKZETTA_SERVICE / INSTANCE / WORKSPACE / USERNAME / PASSWORD
初始化环境(建 profile、建表、上传数据、创建 Studio 任务):
pip install -r requirements.txt
python 03_lakehouse/setup.py
setup.pysetup.py
自动完成 6 步:创建 cz-cli profile → 建 Schema → 建 Volume 并上传 CSV → 建表 → COPY INTO 加载数据 → 创建 Studio 任务(含依赖配置和 cron 调度)。
第二步:SQL 语法适配(6 处改动)
MaxCompute 与 Lakehouse SQL 的差异集中在以下 6 处,其余标准 SQL 写法完全一致。
1. LIFECYCLE → TBLPROPERTIES data_retention_days
MaxCompute 用
LIFECYCLELIFECYCLE
控制数据自动删除天数(无上限)。Lakehouse 用
data_retention_daysdata_retention_days
控制 Time Travel 历史版本保留期(最长 90 天),两者语义不完全相同——
data_retention_daysdata_retention_days
不会自动删除当前数据,只影响历史版本可回溯的时间窗口:
-- MaxCompute(自动删除超期数据)
CREATE TABLE orders (...) LIFECYCLE 365;
-- Lakehouse(设置 Time Travel 保留期,最长 90 天)
CREATE TABLE orders (...);
ALTER TABLE orders SET PROPERTIES ('data_retention_days' = '7');
-- 或建表时直接指定
CREATE TABLE orders (...)
TBLPROPERTIES ('data_retention_days' = '7');
⚠️ 注意 :如果原来依赖
LIFECYCLELIFECYCLE
自动清理历史分区数据,迁移后需要用定期 Studio 任务执行
ALTER TABLE ... DROP PARTITIONALTER TABLE ... DROP PARTITION
替代。
2. DATETIME → STRING(ODS 层)
MaxCompute 的
DATETIMEDATETIME
类型在
COPY INTOCOPY INTO
时无法从 CSV 字符串隐式转换,ODS 层统一用
STRINGSTRING
接收原始值,DWD 层转换时再
CASTCAST
:
-- MaxCompute ODS
order_date DATETIME
-- Lakehouse ODS(接收 CSV 原始字符串)
order_date STRING
-- Lakehouse DWD(转换时显式 CAST)
CAST(order_date AS TIMESTAMP)
3. LOAD DATA INPATH → COPY INTO FROM VOLUME
MaxCompute 从 OSS 加载数据用
LOAD DATA INPATHLOAD DATA INPATH
,Lakehouse 用
COPY INTO FROM VOLUMECOPY INTO FROM VOLUME
,语法结构不同:
-- MaxCompute
LOAD DATA INPATH 'oss://bucket/data/customers.csv' INTO TABLE customers;
-- Lakehouse
COPY INTO ecommerce.customers
FROM VOLUME ecommerce.ecommerce_vol
USING CSV
OPTIONS ('header' = 'true')
FILES ('raw/customers.csv');
⚠️ 注意 :Lakehouse 的
COPY INTOCOPY INTO
不支持
FILE_FORMAT = (TYPE = 'CSV' ...)FILE_FORMAT = (TYPE = 'CSV' ...)
语法,必须用
USING CSV OPTIONS(...)USING CSV OPTIONS(...)
形式。
4. ${bizdate} 参数变量
Studio 任务 SQL 中可以直接使用
${bizdate}${bizdate}
,运行时由调度系统替换,与 DataWorks 行为一致。只有在 Python 代码或 cz-cli 直接执行 SQL 时,
${bizdate}${bizdate}
不会被替换(返回空字符串),此时需要用 f-string 动态拼接:
# Studio 任务 SQL 中可直接保留(调度运行时替换)
INSERT OVERWRITE TABLE daily_sales PARTITION (ds = '${bizdate}') ...
# Python / cz-cli 直接执行时,需要 f-string 传入
bizdate = "20240115"
session.sql(f"""
INSERT OVERWRITE TABLE ecommerce_dwd.daily_sales
PARTITION (ds = '{bizdate}')
SELECT ...
""").collect()
5. GETDATE() → CURRENT_TIMESTAMP()
-- MaxCompute
GETDATE()
-- Lakehouse
CURRENT_TIMESTAMP()
6. RLIKE → REGEXP,CAST AS STRING → CAST AS VARCHAR
-- MaxCompute
email RLIKE '[A-Za-z0-9+_.-]+@...$'
CAST(count AS STRING)
-- Lakehouse
email REGEXP '[A-Za-z0-9+_.-]+@...$'
CAST(count AS VARCHAR)
完全兼容、零改动的部分:
JOINJOIN
(INNER / LEFT / RIGHT / FULL OUTER / SELF)
窗口函数(ROW_NUMBERROW_NUMBER
、RANKRANK
、DENSE_RANKDENSE_RANK
、NTILENTILE
、LAGLAG
、LEADLEAD
、SUM OVERSUM OVER
、AVG OVERAVG OVER
)
CTE(WITH ... AS (...)WITH ... AS (...)
)
CASE WHENCASE WHEN
、COALESCECOALESCE
、NULLIFNULLIF
DATE_FORMATDATE_FORMAT
、DATEDIFFDATEDIFF
、YEARYEAR
、MONTHMONTH
、DAYOFWEEKDAYOFWEEK
CONCATCONCAT
、UPPERUPPER
、LOWERLOWER
、LIKELIKE
PARTITIONED BY (ds STRING)PARTITIONED BY (ds STRING)
分区语法
INSERT OVERWRITE TABLE ... PARTITION (...)INSERT OVERWRITE TABLE ... PARTITION (...)
UNION ALLUNION ALL
、HAVINGHAVING
、LIMITLIMIT
、OFFSETOFFSET
第三步:数据加载
setup.pysetup.py
把
data/data/
目录下的 8 个 CSV 文件上传到 Volume,再用
COPY INTOCOPY INTO
加载:
# 上传到 Volume
session.file.put(str(csv_file), f"vol://ecommerce.ecommerce_vol/raw/")
# 加载(SQL 文件中)
COPY INTO ecommerce.orders PARTITION (ds = '20240115')
FROM VOLUME ecommerce.ecommerce_vol
USING CSV OPTIONS ('header' = 'true')
FILES ('raw/orders.csv');
实测加载结果:
表 行数 customers 10 products 10 orders 10 order_items 30 web_sessions 20 page_views 30 user_events 30 suppliers 9
第四步:迁移 DataWorks Workflow → Studio 任务
原始项目的
daily_etl_workflow.jsondaily_etl_workflow.json
是描述性配置文件。DataWorks 提供了 API 可以实现任务自动化,但原项目未实现,实际使用时需要登录控制台手动逐个创建节点、配置依赖和调度。
迁移后,
setup.pysetup.py
的第 6 步通过
cz-cli taskcz-cli task
全自动完成:创建任务、写入 SQL 内容、配置依赖、设置 cron、发布上线,无需任何手动操作。
如需手动操作或了解底层命令,核心步骤如下:
# 创建任务(--profile 保证上下文一致)
cz-cli task create data_quality_check --type SQL \
--folder ecommerce_etl --profile ecommerce_dev
# 写入 SQL 内容
cz-cli task save-content data_quality_check \
--file 03_lakehouse/sql/06_data_quality.sql \
--profile ecommerce_dev
# 配置依赖(dep-tasks 需要 taskId + taskName 的 JSON 数组,taskId 通过 cz-cli task list 获取)
cz-cli task save-config customer_segmentation \
--deps replace \
--dep-tasks '[{"taskId":10353489,"taskName":"data_quality_check"}]' \
--profile ecommerce_dev
# 配置 cron 调度(每天 02:00)
cz-cli task save-cron data_quality_check \
--cron "0 2 * * *" --profile ecommerce_dev
# 发布上线
cz-cli task deploy data_quality_check --profile ecommerce_dev
DataWorks → Studio 任务映射:
DataWorks 节点 Studio 任务 依赖 data_quality_check data_quality_check 无(入口) customer_segmentation customer_segmentation data_quality_check product_performance product_performance_etl data_quality_check web_analytics_summary web_analytics_etl data_quality_check daily_sales_summary daily_sales_summary customer_segmentation + product_performance_etl
第五步:UDF 迁移
MaxCompute UDF 在引擎内直接执行,Lakehouse UDF 需要部署到云函数服务(阿里云 FC / 腾讯云 SCF)。代码逻辑零改动,只需适配 Lakehouse External Function 规范:
# MaxCompute(继承 com.aliyun.odps.udf.UDF)
from odps.udf import annotate
@annotate("string->string")
class Upper(BaseUDTF):
def evaluate(self, arg):
return arg.upper()
# Lakehouse(适配 cz.udf,其余不变)
try:
from cz.udf import annotate
except ImportError:
annotate = lambda _: lambda cls: cls # 本地开发占位
@annotate("string->string")
class Upper(object):
def evaluate(self, arg):
return arg.upper() if arg else None
注册函数:
CREATE EXTERNAL FUNCTION IF NOT EXISTS ecommerce.text_sentiment(text STRING)
RETURNS STRING
AS 'text_analytics.TextSentiment'
USING FILE = 'volume:user://~/text_analytics.zip'
CONNECTION = ecommerce_fc_conn
WITH PROPERTIES ('remote.udf.api' = 'python3.mc.v0');
⚠️ 注意 :
CREATE EXTERNAL FUNCTIONCREATE EXTERNAL FUNCTION
不支持
OR REPLACEOR REPLACE
,只能用
IF NOT EXISTSIF NOT EXISTS
。调用时必须带完整 schema 前缀:
ecommerce.text_sentiment(...)ecommerce.text_sentiment(...)
。
注意事项
1. COPY INTO 语法与 Snowflake 不同
第一次写成了 Snowflake 风格的
FILE_FORMAT = (TYPE = 'CSV' ...)FILE_FORMAT = (TYPE = 'CSV' ...)
语法,报
CZLH-60001 parser return nullCZLH-60001 parser return null
。Lakehouse 的正确写法是
USING CSV OPTIONS(...)USING CSV OPTIONS(...)
:
-- ❌ 报错(Snowflake 风格)
COPY INTO ecommerce.customers
FROM VOLUME ecommerce_vol
FILES = ('raw/customers.csv')
FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = ',' SKIP_HEADER = 1);
-- ✅ 正确
COPY INTO ecommerce.customers
FROM VOLUME ecommerce_vol
USING CSV OPTIONS ('header' = 'true')
FILES ('raw/customers.csv');
2. ODS 层日期列不能用 TIMESTAMP
COPY INTOCOPY INTO
不支持从 CSV 字符串隐式转换为
TIMESTAMPTIMESTAMP
,报
CZLH-42000 implicit cast not allowedCZLH-42000 implicit cast not allowed
。ODS 层日期列必须用
STRINGSTRING
接收,DWD 层再
CASTCAST
:
-- ❌ ODS 建表用 TIMESTAMP,COPY INTO 报错
order_date TIMESTAMP
-- ✅ ODS 用 STRING,DWD 转换时 CAST
order_date STRING -- ODS 层
CAST(order_date AS TIMESTAMP) -- DWD 层使用时
3. CSV 数据中含逗号的字段导致列数溢出
user_events.csvuser_events.csv
第 30 行的
event_dataevent_data
字段值为
products:PROD006,PROD007products:PROD006,PROD007
,含逗号但没有引号包裹,导致
COPY INTOCOPY INTO
报
Expected 9 columns, got 10Expected 9 columns, got 10
。Lakehouse 的
on_error='continue'on_error='continue'
选项在 Arrow 解析层之前就失败,无法跳过。
解决方案:在 Python 中修复源数据,把多余列合并回最后一列:
import csv
rows = list(csv.reader(open('user_events.csv')))
header_len = len(rows[0])
fixed = [r[:header_len-1] + [','.join(r[header_len-1:])]
if len(r) > header_len else r for r in rows]
csv.writer(open('user_events.csv', 'w')).writerows(fixed)
4. cz-cli task save-config --deps 参数含义
--deps--deps
不是传任务名,而是控制依赖操作模式(
keepkeep
/
replacereplace
/
clearclear
)。实际的上游任务通过
--dep-tasks--dep-tasks
传 JSON 数组:
# ❌ 错误(--deps 不接受任务名)
cz-cli task save-config customer_segmentation \
--deps data_quality_check
# ✅ 正确
cz-cli task save-config customer_segmentation \
--deps replace \
--dep-tasks '[{"taskId":10353489,"taskName":"data_quality_check"}]'
端到端验证
运行
python 03_lakehouse/e2e.py --resetpython 03_lakehouse/e2e.py --reset
全量验证:
=== 数据汇总 ===
ecommerce(ODS):
customers 10 行
products 10 行
orders 10 行
order_items 30 行
web_sessions 20 行
page_views 30 行
user_events 30 行
suppliers 9 行
ecommerce_dwd(DWD):
daily_sales_summary 4 行
customer_segments 10 行
product_performance 10 行
ecommerce_ads(ADS):
web_analytics_summary 1 行
customer_changes 0 行
data_quality_metrics 3 行
dq_rules 6 行
dq_assessment 6 行
data_profile 3 行
=== 数据校验 ===
[行数断言]
PASS ecommerce.customers (10)
PASS ecommerce.orders (10)
...(17 张表全部通过)
[ODS 完整性]
PASS customers.customer_id 无 NULL (0)
PASS orders.total_amount 全部 > 0 (0)
PASS order_items.order_id 全部存在于 orders (0)
...
[DWD 业务断言]
PASS customer_segments 覆盖全部客户 (10)
PASS customer_segments 包含 3 个分层 (3)
...
[ADS 业务断言]
PASS dq_assessment 至少 1 条 PASS (6 条)
PASS data_profile 覆盖 3 张表 (3)
...
校验结果:26/26 通过 ✓ 全部通过
=== Studio 任务验证 ===
data_quality_check 触发成功
customer_segmentation 触发成功
product_performance_etl 触发成功
web_analytics_etl 触发成功
daily_sales_summary 触发成功
完整语法对照
场景 MaxCompute Lakehouse 数据保留 LIFECYCLE 365LIFECYCLE 365
TBLPROPERTIES ('data_retention_days' = 'N')TBLPROPERTIES ('data_retention_days' = 'N')
(最长 90 天,控制 Time Travel 保留期)日期时间类型 DATETIMEDATETIME
TIMESTAMPTIMESTAMP
(ODS 层用 STRINGSTRING
)当前时间 GETDATE()GETDATE()
CURRENT_TIMESTAMP()CURRENT_TIMESTAMP()
分区写入 INSERT OVERWRITE TABLE t PARTITION (ds='${bizdate}')INSERT OVERWRITE TABLE t PARTITION (ds='${bizdate}')
Studio 任务中可保留 ${bizdate}${bizdate}
;Python 直接执行时用 f-string 参数变量 ${bizdate}${bizdate}
Studio 任务 SQL 支持;Python/cz-cli 直接执行时用 f-string 数据加载 LOAD DATA INPATH 'oss://...'LOAD DATA INPATH 'oss://...'
COPY INTO ... FROM VOLUME ... USING CSV OPTIONS(...)COPY INTO ... FROM VOLUME ... USING CSV OPTIONS(...)
正则匹配 col RLIKE 'pattern'col RLIKE 'pattern'
col REGEXP 'pattern'col REGEXP 'pattern'
类型转换为字符串 CAST(x AS STRING)CAST(x AS STRING)
CAST(x AS VARCHAR)CAST(x AS VARCHAR)
UDF 注册 CREATE FUNCTION f AS 'Class' USING 'file.py'CREATE FUNCTION f AS 'Class' USING 'file.py'
CREATE EXTERNAL FUNCTION f ... CONNECTION = connCREATE EXTERNAL FUNCTION f ... CONNECTION = conn
任务编排 DataWorks Workflow JSON Studio 任务 + cz-cli taskcz-cli task
任务依赖配置 Workflow JSON dependenciesdependencies
字段 cz-cli task save-config --deps replace --dep-tasks '[...]'cz-cli task save-config --deps replace --dep-tasks '[...]'
相关文档