MaxCompute → Lakehouse 迁移实战:电商数据工程项目

如果你的数据工程项目跑在 MaxCompute + DataWorks 上,迁移到 ClickZetta Lakehouse 的核心工作量集中在两块:SQL 语法适配任务编排替换。MaxCompute SQL 与 Lakehouse SQL 的差异比你想象的少——大部分标准 SQL(JOIN、窗口函数、CTE、聚合)写法完全一致,需要改的主要是 6 处 MaxCompute 专有语法,以及把 DataWorks 任务节点迁移为 Studio 任务。

本文用一个真实项目验证这一点:将基于 MaxCompute + DataWorks 的电商数据工程项目完整迁移到 ClickZetta Lakehouse,覆盖 ODS/DWD/ADS 三层、8 张原始表、5 个 ETL 任务,经过完整端到端验证,全部通过。

原始项目

  • 来源rcdelacruz/dataworks-maxcompute-practice
  • 数据集:电商场景,8 张表(customers / products / orders / order_items / web_sessions / page_views / user_events / suppliers)
  • 任务编排:DataWorks
    daily_etl_workflow.json
    daily_etl_workflow.json
    ,5 个节点,每天 02:00 触发
  • UDF:Python 文本分析(情感分析、关键词提取)+ Java 字符串处理

结论先行

你不需要重写业务逻辑,也不需要重新搭建任务编排体系。6 处 SQL 语法替换覆盖所有改动;DataWorks 的任务依赖关系可以完整迁移到 Studio,迁移后用

cz-cli
cz-cli
全自动化,比原来更容易维护。

  • SQL 改动量:6 处语法替换,其余标准 SQL 零改动
  • 任务编排:5 个 DataWorks 节点 → 5 个 Studio 任务,依赖关系完整保留;迁移后通过
    cz-cli
    cz-cli
    全自动化,
    setup.py
    setup.py
    一键完成
  • 数据验证:ODS 8 张表 / DWD 3 张汇总表 / ADS 3 张分析表,全部跑通
  • UDF:代码逻辑零改动,部署方式从"引擎内"改为"云函数服务"

技术栈对比

维度MaxCompute + DataWorksClickZetta Lakehouse
计算引擎MaxCompute(ODPS)Lakehouse SQL Engine
任务编排DataWorks Workflow JSON(描述性配置,原项目未实现 API 自动化,需手动在控制台创建任务)Studio 任务 + cz-cli(全命令行操作,
setup.py
setup.py
一键自动创建任务、配置依赖、发布上线)
对象存储OSS(LOAD DATA INPATH)Volume(COPY INTO FROM VOLUME)
UDF内联 Python/Java(引擎内执行)External Function(云函数服务)
数据保留
LIFECYCLE 365
LIFECYCLE 365
(自动删除数据,最长无限制)
TBLPROPERTIES ('data_retention_days' = '7')
TBLPROPERTIES ('data_retention_days' = '7')
(Time Travel 历史版本保留,最长 90 天;数据物理删除需手动或定期任务处理)
分区PARTITIONED BY (ds STRING)相同
参数变量
${bizdate}
${bizdate}
Studio 任务 SQL 中可直接使用
${bizdate}
${bizdate}
;Python/cz-cli 直接执行时用 f-string 拼接

项目背景

原始项目是一个标准的电商数据工程实践,包含三层数据架构:

  • ODS 层:8 张原始表,从 OSS 加载 CSV 数据
  • DWD 层:每日销售汇总、客户分层、商品表现分析
  • ADS 层:Web 流量分析、增量变更检测、数据质量监控

DataWorks Workflow 定义了 5 个任务节点的依赖关系:

data_quality_check ├── customer_segmentation ├── product_performance_etl └── web_analytics_etl └── daily_sales_summary(依赖前两个)

迁移步骤

第一步:建立 Repo 和连接配置

Fork 原始 repo 到

clickzetta/
clickzetta/
组织,重组目录结构:

gh repo fork rcdelacruz/dataworks-maxcompute-practice \ --org clickzetta \ --fork-name maxcompute2lakehouse-ecommerce gh repo clone clickzetta/maxcompute2lakehouse-ecommerce

原始代码归入

01_source/
01_source/
,迁移代码放
03_lakehouse/
03_lakehouse/

maxcompute2lakehouse-ecommerce/ ├── 01_source/ # 原始 MaxCompute 代码(原样保留) ├── 02_migration/ # 语法差异说明、任务映射文档 ├── 03_lakehouse/ # 迁移后代码 │ ├── sql/ # Lakehouse SQL │ ├── tasks/ # Studio 任务列表 │ ├── udf/ # External Function 代码 │ ├── setup.py # 一键初始化 │ └── e2e.py # 端到端验证 └── data/ # 8 个 CSV 样本文件

配置连接(

.env
.env
):

cp .env.example .env # 填写 CLICKZETTA_SERVICE / INSTANCE / WORKSPACE / USERNAME / PASSWORD

初始化环境(建 profile、建表、上传数据、创建 Studio 任务):

pip install -r requirements.txt python 03_lakehouse/setup.py

setup.py
setup.py
自动完成 6 步:创建 cz-cli profile → 建 Schema → 建 Volume 并上传 CSV → 建表 → COPY INTO 加载数据 → 创建 Studio 任务(含依赖配置和 cron 调度)。

第二步:SQL 语法适配(6 处改动)

MaxCompute 与 Lakehouse SQL 的差异集中在以下 6 处,其余标准 SQL 写法完全一致。

1. LIFECYCLE → TBLPROPERTIES data_retention_days

MaxCompute 用

LIFECYCLE
LIFECYCLE
控制数据自动删除天数(无上限)。Lakehouse 用
data_retention_days
data_retention_days
控制 Time Travel 历史版本保留期(最长 90 天),两者语义不完全相同——
data_retention_days
data_retention_days
不会自动删除当前数据,只影响历史版本可回溯的时间窗口:

-- MaxCompute(自动删除超期数据) CREATE TABLE orders (...) LIFECYCLE 365; -- Lakehouse(设置 Time Travel 保留期,最长 90 天) CREATE TABLE orders (...); ALTER TABLE orders SET PROPERTIES ('data_retention_days' = '7'); -- 或建表时直接指定 CREATE TABLE orders (...) TBLPROPERTIES ('data_retention_days' = '7');

2. DATETIME → STRING(ODS 层)

MaxCompute 的

DATETIME
DATETIME
类型在
COPY INTO
COPY INTO
时无法从 CSV 字符串隐式转换,ODS 层统一用
STRING
STRING
接收原始值,DWD 层转换时再
CAST
CAST

-- MaxCompute ODS order_date DATETIME -- Lakehouse ODS(接收 CSV 原始字符串) order_date STRING -- Lakehouse DWD(转换时显式 CAST) CAST(order_date AS TIMESTAMP)

3. LOAD DATA INPATH → COPY INTO FROM VOLUME

MaxCompute 从 OSS 加载数据用

LOAD DATA INPATH
LOAD DATA INPATH
,Lakehouse 用
COPY INTO FROM VOLUME
COPY INTO FROM VOLUME
,语法结构不同:

-- MaxCompute LOAD DATA INPATH 'oss://bucket/data/customers.csv' INTO TABLE customers; -- Lakehouse COPY INTO ecommerce.customers FROM VOLUME ecommerce.ecommerce_vol USING CSV OPTIONS ('header' = 'true') FILES ('raw/customers.csv');

4. ${bizdate} 参数变量

Studio 任务 SQL 中可以直接使用

${bizdate}
${bizdate}
,运行时由调度系统替换,与 DataWorks 行为一致。只有在 Python 代码或 cz-cli 直接执行 SQL 时,
${bizdate}
${bizdate}
不会被替换(返回空字符串),此时需要用 f-string 动态拼接:

# Studio 任务 SQL 中可直接保留(调度运行时替换) INSERT OVERWRITE TABLE daily_sales PARTITION (ds = '${bizdate}') ... # Python / cz-cli 直接执行时,需要 f-string 传入 bizdate = "20240115" session.sql(f""" INSERT OVERWRITE TABLE ecommerce_dwd.daily_sales PARTITION (ds = '{bizdate}') SELECT ... """).collect()

5. GETDATE() → CURRENT_TIMESTAMP()

-- MaxCompute GETDATE() -- Lakehouse CURRENT_TIMESTAMP()

6. RLIKE → REGEXP,CAST AS STRING → CAST AS VARCHAR

-- MaxCompute email RLIKE '[A-Za-z0-9+_.-]+@...$' CAST(count AS STRING) -- Lakehouse email REGEXP '[A-Za-z0-9+_.-]+@...$' CAST(count AS VARCHAR)

完全兼容、零改动的部分:

  • JOIN
    JOIN
    (INNER / LEFT / RIGHT / FULL OUTER / SELF)
  • 窗口函数(
    ROW_NUMBER
    ROW_NUMBER
    RANK
    RANK
    DENSE_RANK
    DENSE_RANK
    NTILE
    NTILE
    LAG
    LAG
    LEAD
    LEAD
    SUM OVER
    SUM OVER
    AVG OVER
    AVG OVER
  • CTE(
    WITH ... AS (...)
    WITH ... AS (...)
  • CASE WHEN
    CASE WHEN
    COALESCE
    COALESCE
    NULLIF
    NULLIF
  • DATE_FORMAT
    DATE_FORMAT
    DATEDIFF
    DATEDIFF
    YEAR
    YEAR
    MONTH
    MONTH
    DAYOFWEEK
    DAYOFWEEK
  • CONCAT
    CONCAT
    UPPER
    UPPER
    LOWER
    LOWER
    LIKE
    LIKE
  • PARTITIONED BY (ds STRING)
    PARTITIONED BY (ds STRING)
    分区语法
  • INSERT OVERWRITE TABLE ... PARTITION (...)
    INSERT OVERWRITE TABLE ... PARTITION (...)
  • UNION ALL
    UNION ALL
    HAVING
    HAVING
    LIMIT
    LIMIT
    OFFSET
    OFFSET

第三步:数据加载

setup.py
setup.py
data/
data/
目录下的 8 个 CSV 文件上传到 Volume,再用
COPY INTO
COPY INTO
加载:

# 上传到 Volume session.file.put(str(csv_file), f"vol://ecommerce.ecommerce_vol/raw/") # 加载(SQL 文件中) COPY INTO ecommerce.orders PARTITION (ds = '20240115') FROM VOLUME ecommerce.ecommerce_vol USING CSV OPTIONS ('header' = 'true') FILES ('raw/orders.csv');

实测加载结果:

行数
customers10
products10
orders10
order_items30
web_sessions20
page_views30
user_events30
suppliers9

第四步:迁移 DataWorks Workflow → Studio 任务

原始项目的

daily_etl_workflow.json
daily_etl_workflow.json
是描述性配置文件。DataWorks 提供了 API 可以实现任务自动化,但原项目未实现,实际使用时需要登录控制台手动逐个创建节点、配置依赖和调度。

迁移后,

setup.py
setup.py
的第 6 步通过
cz-cli task
cz-cli task
全自动完成:创建任务、写入 SQL 内容、配置依赖、设置 cron、发布上线,无需任何手动操作。

如需手动操作或了解底层命令,核心步骤如下:

# 创建任务(--profile 保证上下文一致) cz-cli task create data_quality_check --type SQL \ --folder ecommerce_etl --profile ecommerce_dev # 写入 SQL 内容 cz-cli task save-content data_quality_check \ --file 03_lakehouse/sql/06_data_quality.sql \ --profile ecommerce_dev # 配置依赖(dep-tasks 需要 taskId + taskName 的 JSON 数组,taskId 通过 cz-cli task list 获取) cz-cli task save-config customer_segmentation \ --deps replace \ --dep-tasks '[{"taskId":10353489,"taskName":"data_quality_check"}]' \ --profile ecommerce_dev # 配置 cron 调度(每天 02:00) cz-cli task save-cron data_quality_check \ --cron "0 2 * * *" --profile ecommerce_dev # 发布上线 cz-cli task deploy data_quality_check --profile ecommerce_dev

DataWorks → Studio 任务映射:

DataWorks 节点Studio 任务依赖
data_quality_checkdata_quality_check无(入口)
customer_segmentationcustomer_segmentationdata_quality_check
product_performanceproduct_performance_etldata_quality_check
web_analytics_summaryweb_analytics_etldata_quality_check
daily_sales_summarydaily_sales_summarycustomer_segmentation + product_performance_etl

第五步:UDF 迁移

MaxCompute UDF 在引擎内直接执行,Lakehouse UDF 需要部署到云函数服务(阿里云 FC / 腾讯云 SCF)。代码逻辑零改动,只需适配 Lakehouse External Function 规范:

# MaxCompute(继承 com.aliyun.odps.udf.UDF) from odps.udf import annotate @annotate("string->string") class Upper(BaseUDTF): def evaluate(self, arg): return arg.upper() # Lakehouse(适配 cz.udf,其余不变) try: from cz.udf import annotate except ImportError: annotate = lambda _: lambda cls: cls # 本地开发占位 @annotate("string->string") class Upper(object): def evaluate(self, arg): return arg.upper() if arg else None

注册函数:

CREATE EXTERNAL FUNCTION IF NOT EXISTS ecommerce.text_sentiment(text STRING) RETURNS STRING AS 'text_analytics.TextSentiment' USING FILE = 'volume:user://~/text_analytics.zip' CONNECTION = ecommerce_fc_conn WITH PROPERTIES ('remote.udf.api' = 'python3.mc.v0');

注意事项

1. COPY INTO 语法与 Snowflake 不同

第一次写成了 Snowflake 风格的

FILE_FORMAT = (TYPE = 'CSV' ...)
FILE_FORMAT = (TYPE = 'CSV' ...)
语法,报
CZLH-60001 parser return null
CZLH-60001 parser return null
。Lakehouse 的正确写法是
USING CSV OPTIONS(...)
USING CSV OPTIONS(...)

-- ❌ 报错(Snowflake 风格) COPY INTO ecommerce.customers FROM VOLUME ecommerce_vol FILES = ('raw/customers.csv') FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = ',' SKIP_HEADER = 1); -- ✅ 正确 COPY INTO ecommerce.customers FROM VOLUME ecommerce_vol USING CSV OPTIONS ('header' = 'true') FILES ('raw/customers.csv');

2. ODS 层日期列不能用 TIMESTAMP

COPY INTO
COPY INTO
不支持从 CSV 字符串隐式转换为
TIMESTAMP
TIMESTAMP
,报
CZLH-42000 implicit cast not allowed
CZLH-42000 implicit cast not allowed
。ODS 层日期列必须用
STRING
STRING
接收,DWD 层再
CAST
CAST

-- ❌ ODS 建表用 TIMESTAMP,COPY INTO 报错 order_date TIMESTAMP -- ✅ ODS 用 STRING,DWD 转换时 CAST order_date STRING -- ODS 层 CAST(order_date AS TIMESTAMP) -- DWD 层使用时

3. CSV 数据中含逗号的字段导致列数溢出

user_events.csv
user_events.csv
第 30 行的
event_data
event_data
字段值为
products:PROD006,PROD007
products:PROD006,PROD007
,含逗号但没有引号包裹,导致
COPY INTO
COPY INTO
Expected 9 columns, got 10
Expected 9 columns, got 10
。Lakehouse 的
on_error='continue'
on_error='continue'
选项在 Arrow 解析层之前就失败,无法跳过。

解决方案:在 Python 中修复源数据,把多余列合并回最后一列:

import csv rows = list(csv.reader(open('user_events.csv'))) header_len = len(rows[0]) fixed = [r[:header_len-1] + [','.join(r[header_len-1:])] if len(r) > header_len else r for r in rows] csv.writer(open('user_events.csv', 'w')).writerows(fixed)

4. cz-cli task save-config --deps 参数含义

--deps
--deps
不是传任务名,而是控制依赖操作模式(
keep
keep
/
replace
replace
/
clear
clear
)。实际的上游任务通过
--dep-tasks
--dep-tasks
传 JSON 数组:

# ❌ 错误(--deps 不接受任务名) cz-cli task save-config customer_segmentation \ --deps data_quality_check # ✅ 正确 cz-cli task save-config customer_segmentation \ --deps replace \ --dep-tasks '[{"taskId":10353489,"taskName":"data_quality_check"}]'

端到端验证

运行

python 03_lakehouse/e2e.py --reset
python 03_lakehouse/e2e.py --reset
全量验证:

=== 数据汇总 === ecommerce(ODS): customers 10 行 products 10 行 orders 10 行 order_items 30 行 web_sessions 20 行 page_views 30 行 user_events 30 行 suppliers 9 行 ecommerce_dwd(DWD): daily_sales_summary 4 行 customer_segments 10 行 product_performance 10 行 ecommerce_ads(ADS): web_analytics_summary 1 行 customer_changes 0 行 data_quality_metrics 3 行 dq_rules 6 行 dq_assessment 6 行 data_profile 3 行 === 数据校验 === [行数断言] PASS ecommerce.customers (10) PASS ecommerce.orders (10) ...(17 张表全部通过) [ODS 完整性] PASS customers.customer_id 无 NULL (0) PASS orders.total_amount 全部 > 0 (0) PASS order_items.order_id 全部存在于 orders (0) ... [DWD 业务断言] PASS customer_segments 覆盖全部客户 (10) PASS customer_segments 包含 3 个分层 (3) ... [ADS 业务断言] PASS dq_assessment 至少 1 条 PASS (6 条) PASS data_profile 覆盖 3 张表 (3) ... 校验结果:26/26 通过 ✓ 全部通过 === Studio 任务验证 === data_quality_check 触发成功 customer_segmentation 触发成功 product_performance_etl 触发成功 web_analytics_etl 触发成功 daily_sales_summary 触发成功

完整语法对照

场景MaxComputeLakehouse
数据保留
LIFECYCLE 365
LIFECYCLE 365
TBLPROPERTIES ('data_retention_days' = 'N')
TBLPROPERTIES ('data_retention_days' = 'N')
(最长 90 天,控制 Time Travel 保留期)
日期时间类型
DATETIME
DATETIME
TIMESTAMP
TIMESTAMP
(ODS 层用
STRING
STRING
当前时间
GETDATE()
GETDATE()
CURRENT_TIMESTAMP()
CURRENT_TIMESTAMP()
分区写入
INSERT OVERWRITE TABLE t PARTITION (ds='${bizdate}')
INSERT OVERWRITE TABLE t PARTITION (ds='${bizdate}')
Studio 任务中可保留
${bizdate}
${bizdate}
;Python 直接执行时用 f-string
参数变量
${bizdate}
${bizdate}
Studio 任务 SQL 支持;Python/cz-cli 直接执行时用 f-string
数据加载
LOAD DATA INPATH 'oss://...'
LOAD DATA INPATH 'oss://...'
COPY INTO ... FROM VOLUME ... USING CSV OPTIONS(...)
COPY INTO ... FROM VOLUME ... USING CSV OPTIONS(...)
正则匹配
col RLIKE 'pattern'
col RLIKE 'pattern'
col REGEXP 'pattern'
col REGEXP 'pattern'
类型转换为字符串
CAST(x AS STRING)
CAST(x AS STRING)
CAST(x AS VARCHAR)
CAST(x AS VARCHAR)
UDF 注册
CREATE FUNCTION f AS 'Class' USING 'file.py'
CREATE FUNCTION f AS 'Class' USING 'file.py'
CREATE EXTERNAL FUNCTION f ... CONNECTION = conn
CREATE EXTERNAL FUNCTION f ... CONNECTION = conn
任务编排DataWorks Workflow JSONStudio 任务 +
cz-cli task
cz-cli task
任务依赖配置Workflow JSON
dependencies
dependencies
字段
cz-cli task save-config --deps replace --dep-tasks '[...]'
cz-cli task save-config --deps replace --dep-tasks '[...]'

相关文档

联系我们
预约咨询
微信咨询
电话咨询