Hive → Lakehouse 迁移实战:电商用户行为数仓
如果你的数仓跑在 Hive 上,迁移到 ClickZetta Lakehouse 的工作量比你想象的低。Hive SQL 的核心语法——SELECT、JOIN、GROUP BY、窗口函数、条件聚合——在 Lakehouse 里直接运行,不需要改。改动集中在 4 处:存储格式声明、数据加载方式、动态分区 SET 语句、SerDe 配置。
本文用一个真实项目验证这一点:将基于 Hive 4.0 的电商用户行为数仓(ODS → DWD → DWS → ADS 四层架构)完整迁移到 ClickZetta Lakehouse,经过 10 项自动化验证,全部通过。
完整代码见 GitHub:hive2lakehouse-ecommerce-events
原始项目
数据来源是 Kaggle 电商用户行为数据集,包含 2019 年 10–11 月某化妆品电商网站的用户行为日志,字段包括
event_time
event_time
、
event_type
event_type
(view/cart/purchase)、
product_id
product_id
、
category_code
category_code
、
brand
brand
、
price
price
、
user_id
user_id
、
user_session
user_session
,共约 650 万行。
Hive 实现在 Docker 容器(
apache/hive:4.0.1
apache/hive:4.0.1
)中运行,四层架构如下:
| 层 | 表 | 说明 |
|---|
| ODS | ods_events_raw
ods_events_raw | 原始事件,按日期分区,ORC 格式 |
| DWD | dwd_events_clean
dwd_events_clean | 清洗后事件,按 user_id 分 8 桶,ORC 格式 |
| DWS | dws_user_behavior
dws_user_behavior | 用户行为日汇总(view/cart/purchase 次数和金额) |
| ADS | ads_funnel_daily
ads_funnel_daily | 每日漏斗转化率(view→cart→purchase) |
迁移后的代码在
03_lakehouse/
03_lakehouse/
目录,可与
01_hive/
01_hive/
逐文件对照。
结论先行
你的 SQL 查询逻辑一行不用改。5 处改动全在建表语句和数据加载方式上——去掉 Hive 特有的存储配置,替换 LOAD DATA 语法,业务逻辑完整保留。
| 改动项 | 工作量 | 说明 |
|---|
去掉 STORED AS ORC
STORED AS ORC / TBLPROPERTIES
TBLPROPERTIES | 极低 | 直接删除,无逻辑改动 |
LOAD DATA
LOAD DATA → COPY INTO FROM VOLUME
COPY INTO FROM VOLUME | 低 | 语法结构不同,但逻辑一致 |
| 删除动态分区 SET 语句 | 极低 | 直接删除 3 行 SET |
| 去掉 SerDe 配置 | 极低 | 直接删除 ROW FORMAT SERDE
ROW FORMAT SERDE 块 |
SELECT / JOIN / GROUP BY / 窗口函数 / 条件聚合——这些数仓的核心操作,语法完全一致,不需要改。
技术栈对比
| Hive 4.0 | Lakehouse |
|---|
| 存储格式 | ORC(需显式声明 STORED AS ORC
STORED AS ORC ) | 原生 Parquet(无需声明) |
| CSV 解析 | ROW FORMAT SERDE 'OpenCSVSerde'
ROW FORMAT SERDE 'OpenCSVSerde' | COPY INTO ... USING CSV OPTIONS (...)
COPY INTO ... USING CSV OPTIONS (...) |
| 分桶加速 | CLUSTERED BY (col) INTO N BUCKETS
CLUSTERED BY (col) INTO N BUCKETS | 语法相同,直接兼容 |
| 动态分区 | 需要 3 条 SET 语句才能开启 | 默认开启,无需 SET |
| 数据加载 | LOAD DATA LOCAL INPATH
LOAD DATA LOCAL INPATH + staging 表 | COPY INTO FROM VOLUME
COPY INTO FROM VOLUME |
| 运行环境 | Docker 容器(beeline 客户端) | cz-cli / Python SDK |

项目背景

数据架构分四层,每层对应一个 Schema:
- ODS(
ecommerce_ods
ecommerce_ods
):原始数据,按日期分区,保留原始字段
- DWD(
ecommerce_dwd
ecommerce_dwd
):清洗层,event_time
event_time
转 TIMESTAMP,category_code
category_code
拆分为三级,过滤脏数据
- DWS(
ecommerce_dws
ecommerce_dws
):用户行为日汇总,每用户每天的 view/cart/purchase 次数和消费金额
- ADS(
ecommerce_ads
ecommerce_ads
):漏斗分析,每日 view→cart→purchase 转化率
sample 数据(20 行)验证结果:
| 指标 | 值 |
|---|
| ODS 行数 | 19(过滤 1 行列偏移脏数据) |
| DWD 行数 | 19 |
| DWS 用户数 | 6 |
| view 用户数 | 6 |
| cart 用户数 | 4 |
| purchase 用户数 | 3 |
| view→cart 转化率 | 66.67% |
| cart→purchase 转化率 | 75% |
迁移步骤
第一步:去掉存储格式声明
Hive 每张表都需要声明存储格式,Lakehouse 原生 Parquet,直接删除。
-- Hive
CREATE TABLE dwd_events_clean (...)
PARTITIONED BY (dt STRING)
CLUSTERED BY (user_id) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- Lakehouse:删掉最后三行即可
CREATE TABLE dwd_events_clean (...)
PARTITIONED BY (dt STRING);
第二步:替换数据加载方式
Hive 用
LOAD DATA
LOAD DATA
加文件到 EXTERNAL TABLE,Lakehouse 用
COPY INTO FROM VOLUME
COPY INTO FROM VOLUME
。
-- Hive:两步(staging 表 + INSERT)
LOAD DATA LOCAL INPATH '/tmp/events.csv'
OVERWRITE INTO TABLE ods_events_staging;
INSERT OVERWRITE TABLE ods_events_raw PARTITION (dt)
SELECT ..., SUBSTR(event_time, 1, 10) AS dt
FROM ods_events_staging
WHERE event_type IN ('view', 'cart', 'purchase');
-- Lakehouse:同样两步,但语法不同
-- COPY INTO 不支持计算列(如提取 dt),先 COPY INTO staging,再 INSERT INTO 分区表
COPY INTO ecommerce_ods.ods_events_staging
FROM VOLUME ecommerce_ods.ecommerce_vol
USING CSV
OPTIONS ('header' = 'true', 'nullValue' = '')
FILES ('raw/events_sample.csv')
ON_ERROR = CONTINUE;
INSERT OVERWRITE TABLE ecommerce_ods.ods_events_raw PARTITION (dt)
SELECT ..., SUBSTR(event_time, 1, 10) AS dt
FROM ecommerce_ods.ods_events_staging
WHERE event_type IN ('view', 'cart', 'purchase')
AND user_id > 100000;
第三步:删除动态分区 SET 语句
Hive 动态分区默认关闭,需要 3 条 SET 语句开启。Lakehouse 默认开启,直接删除。
-- Hive(必须,否则报错)
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.enforce.bucketing=true;
INSERT OVERWRITE TABLE dwd_events_clean PARTITION (dt)
SELECT ..., dt FROM ods_events_raw;
-- Lakehouse:直接写,删掉所有 SET
INSERT OVERWRITE TABLE ecommerce_dwd.dwd_events_clean PARTITION (dt)
SELECT ..., dt FROM ecommerce_ods.ods_events_raw;
第四步:去掉 SerDe 配置
Hive 解析 CSV 需要配置 SerDe,Lakehouse 在
COPY INTO
COPY INTO
的
OPTIONS
OPTIONS
里指定,建表时无需任何格式配置。
-- Hive:建表时必须配置 SerDe
CREATE EXTERNAL TABLE ods_events_staging (...)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"separatorChar" = ",",
"quoteChar" = "\""
)
STORED AS TEXTFILE
TBLPROPERTIES ("skip.header.line.count"="1");
-- Lakehouse:建表只声明列,格式在 COPY INTO 时指定
CREATE TABLE ecommerce_ods.ods_events_staging (
event_time STRING, event_type STRING, ...
);
-- 加载时指定格式
COPY INTO ecommerce_ods.ods_events_staging
FROM VOLUME ecommerce_ods.ecommerce_vol
USING CSV
OPTIONS ('header' = 'true', 'nullValue' = '')
FILES ('raw/events_sample.csv');
完全兼容的部分
以下 Hive SQL 在 Lakehouse 中直接运行,无需任何修改:
ETL 转换逻辑(ODS → DWD):
-- 两侧完全一致,直接复用
INSERT OVERWRITE TABLE dwd_events_clean PARTITION (dt)
SELECT
CAST(REGEXP_REPLACE(event_time, ' UTC$', '') AS TIMESTAMP) AS event_ts,
event_type,
product_id,
SPLIT(category_code, '\\.')[0] AS category_l1,
CASE WHEN SIZE(SPLIT(category_code, '\\.')) > 1
THEN SPLIT(category_code, '\\.')[1] END AS category_l2,
brand, price, user_id, user_session, dt
FROM ods_events_raw
WHERE price > 0 OR event_type != 'purchase';
聚合逻辑(DWD → DWS/ADS):
-- 两侧完全一致,直接复用
SELECT
COUNT(DISTINCT CASE WHEN event_type = 'view' THEN user_id END) AS view_users,
COUNT(DISTINCT CASE WHEN event_type = 'cart' THEN user_id END) AS cart_users,
COUNT(DISTINCT CASE WHEN event_type = 'purchase' THEN user_id END) AS purchase_users,
ROUND(
COUNT(DISTINCT CASE WHEN event_type = 'cart' THEN user_id END) * 1.0 /
NULLIF(COUNT(DISTINCT CASE WHEN event_type = 'view' THEN user_id END), 0),
4
) AS view_to_cart_rate,
dt
FROM dwd_events_clean
GROUP BY dt;
窗口函数分析:
-- 两侧完全一致,直接复用
SELECT
user_id,
purchase_amt,
RANK() OVER (ORDER BY purchase_amt DESC) AS spending_rank
FROM dws_user_behavior
WHERE dt = '2019-10-01';
注意事项
1. OpenCSVSerde 列偏移
现象:CSV 中有连续空字段(如
brand
brand
为空,出现
,,
,,
)时,OpenCSVSerde 解析会导致后续列向左偏移,
price
price
的值被读成
user_id
user_id
。
示例:
2019-10-01,view,28719074,...,apparel.shoes.keds,,35.79,541312140,...
brand
brand
为空 →
price=35.79
price=35.79
被读成
user_id
user_id
,产生
user_id=35
user_id=35
的脏数据行。
处理方式:在 INSERT INTO ODS 时过滤异常 user_id:
WHERE CAST(user_id AS BIGINT) > 100000
Lakehouse 影响:
COPY INTO
COPY INTO
使用
ON_ERROR=CONTINUE
ON_ERROR=CONTINUE
跳过格式错误行,同样在 INSERT 时过滤。
2. Hive 4.0 分桶表 GROUP BY 返回空
现象:Hive 4.0 + Tez 引擎下,对
CLUSTERED BY
CLUSTERED BY
分桶 ORC 表执行
GROUP BY
GROUP BY
查询返回空结果,但
COUNT(*)
COUNT(*)
走 metadata stats 返回正确行数。
根因:
CombineHiveInputFormat
CombineHiveInputFormat
(默认)在分桶 ORC 表上的执行计划有 bug。
临时解决:
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
Lakehouse 影响:无此问题。Lakehouse 同样支持
CLUSTERED BY ... INTO N BUCKETS
CLUSTERED BY ... INTO N BUCKETS
分桶,但不存在这个 bug,GROUP BY 正常工作。
3. COPY INTO 不支持列引用语法
现象:Lakehouse
COPY INTO
COPY INTO
不支持
$1
$1
、
$2
$2
列引用:
-- 报错:Syntax error at or near '$'
COPY INTO t FROM (SELECT $1, $2 FROM VOLUME ...)
正确做法:使用
FROM VOLUME ... USING CSV OPTIONS (...) FILES (...)
FROM VOLUME ... USING CSV OPTIONS (...) FILES (...)
语法,按列顺序直接映射。需要计算列(如提取
dt
dt
)时,先 COPY INTO 无分区 staging 表,再 INSERT INTO 目标表。
4. 动态分区默认行为差异
| 行为 | Hive | Lakehouse |
|---|
| 动态分区开关 | 默认关闭,需 SET hive.exec.dynamic.partition=true
SET hive.exec.dynamic.partition=true | 默认开启 |
| 严格模式 | 默认严格,需 SET ... mode=nonstrict
SET ... mode=nonstrict | 无此限制 |
| 分桶写入 | 需 SET hive.enforce.bucketing=true
SET hive.enforce.bucketing=true | 默认支持,无需 SET |
端到端验证
03_lakehouse/e2e.py
03_lakehouse/e2e.py
对迁移结果执行 10 项自动化检查:
| 检查项 | 预期值 | 结果 |
|---|
| ODS 行数 | 19 | ✓ |
| DWD 行数 | 19 | ✓ |
| DWS 用户数 | 6 | ✓ |
| 漏斗 view_users | 6 | ✓ |
| 漏斗 cart_users | 4 | ✓ |
| 漏斗 purchase_users | 3 | ✓ |
| view→cart 转化率 | 0.6667 | ✓ |
| cart→purchase 转化率 | 0.75 | ✓ |
| 最高消费用户 ID | 526595547 | ✓ |
| 最高消费金额 | 1422.0 | ✓ |
实际运行结果:10/10 通过。
完整兼容性对照
| 类别 | Hive 语法 | Lakehouse | 兼容性 |
|---|
| 分区表 | PARTITIONED BY (dt STRING)
PARTITIONED BY (dt STRING) | 相同 | ✅ 完全兼容 |
| 动态分区写入 | INSERT OVERWRITE ... PARTITION (dt)
INSERT OVERWRITE ... PARTITION (dt) | 相同 | ✅ 完全兼容 |
| REGEXP_REPLACE | REGEXP_REPLACE(col, pattern, replace)
REGEXP_REPLACE(col, pattern, replace) | 相同 | ✅ 完全兼容 |
| SPLIT | SPLIT(col, '\\.')
SPLIT(col, '\\.') | 相同 | ✅ 完全兼容 |
| SIZE | SIZE(SPLIT(...))
SIZE(SPLIT(...)) | 相同 | ✅ 完全兼容 |
| SUBSTR | SUBSTR(col, 1, 10)
SUBSTR(col, 1, 10) | 相同 | ✅ 完全兼容 |
| CAST | CAST(col AS TIMESTAMP)
CAST(col AS TIMESTAMP) | 相同 | ✅ 完全兼容 |
| NULLIF | NULLIF(expr, 0)
NULLIF(expr, 0) | 相同 | ✅ 完全兼容 |
| 条件聚合 | SUM(CASE WHEN ... END)
SUM(CASE WHEN ... END) | 相同 | ✅ 完全兼容 |
| COUNT DISTINCT | COUNT(DISTINCT CASE WHEN ...)
COUNT(DISTINCT CASE WHEN ...) | 相同 | ✅ 完全兼容 |
| 窗口函数 | RANK() OVER (ORDER BY ...)
RANK() OVER (ORDER BY ...) | 相同 | ✅ 完全兼容 |
| STORED AS ORC | STORED AS ORC
STORED AS ORC | 不需要 | ✅ 删除即可 |
| CLUSTERED BY | CLUSTERED BY (col) INTO N BUCKETS
CLUSTERED BY (col) INTO N BUCKETS | 语法相同 | ✅ 完全兼容 |
| OpenCSVSerde | ROW FORMAT SERDE 'OpenCSVSerde'
ROW FORMAT SERDE 'OpenCSVSerde' | 不需要 | ✅ 删除,COPY INTO 替代 |
| LOAD DATA | LOAD DATA LOCAL INPATH
LOAD DATA LOCAL INPATH | 不支持 | ⚠️ 改用 COPY INTO FROM VOLUME |
| 动态分区 SET | SET hive.exec.dynamic.partition=true
SET hive.exec.dynamic.partition=true | 不需要 | ✅ 删除即可 |
迁移结论
Hive 数仓迁移到 Lakehouse 的工作量主要在 DDL 和加载语句,不在业务逻辑。 本项目的 ETL 转换 SQL(REGEXP_REPLACE、SPLIT、条件聚合)和分析查询(窗口函数、漏斗计算)全部直接复用,改动集中在 4 处已知差异点,均为机械性替换。
SQL 层面的收益
- 分桶 ORC 表的 GROUP BY bug 自然消失
- 无需维护 SerDe 配置和 TBLPROPERTIES
- 动态分区默认开启,无需 SET 语句
- COPY INTO 比 LOAD DATA + staging 表更简洁
部署模式的收益
从 Docker 自运维 Hive 集群迁移到 ClickZetta Lakehouse SaaS,带来的不只是 SQL 层面的简化:
| Hive(Docker 自运维) | Lakehouse(SaaS) |
|---|
| 集群运维 | 需要维护 Docker 容器、JVM 参数、YARN 队列 | 无需运维,全托管 |
| 计算资源 | 固定资源,闲时浪费 | 弹性伸缩,按查询计费 |
| 存储格式 | ORC 文件需要定期 compaction | 原生 Parquet,自动管理 |
| 调优成本 | 需要调 Hive 参数、Tez 配置、分桶策略 | 专注 SQL 逻辑,无需底层调优 |
| 版本升级 | 手动升级 Hive、Hadoop 依赖 | 平台自动升级 |
迁移后,数据工程师可以把精力从"让 Hive 跑起来"转移到"让数据产生价值"。
参考