Best Practices for a Multi-Engine Iceberg Data Lake Federated Query Pipeline
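Upstream Spark, Flink, or PyIceberg jobs write data in Iceberg format to OSS/S3/COS. ClickZetta Lakehouse queries that data in place through an External Catalog (Iceberg REST), without copying it. Dynamic Tables then perform incremental Silver/Gold processing to produce report metrics. This article uses a dataset of 30 simulated orders plus a 10-row product dimension table to walk through building this architecture end to end.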
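Overview
In a multi-engine Iceberg data lake, several writer engines each maintain their own Iceberg tables. The analytics layer must query this data together with internal tables, and it must handle Iceberg's DELETE/UPDATE semantics correctly.
ClickZetta Lakehouse addresses these core problems with the following combination:
| Problem | Solution |
|---|---|
| Iceberg tables written by Spark contain DELETE files that PIPE cannot recognize | External Catalog reads the snapshot through the REST API and correctly applies deletion vectors |
| Multiple engines write independently and the schema may evolve | External Catalog automatically tracks Iceberg schema versions, so no column mapping has to be maintained by hand |
| Data volume is large and a full copy into Lakehouse is undesirable | External Catalog provides zero-copy federated queries; data files stay in the original OSS/S3 location |
| The analytics layer needs multi-layer Silver/Gold processing | Dynamic Tables use External Catalog tables as upstream sources and refresh incrementally |
| Downstream Spark/Trino needs to read Lakehouse internal tables | Lakehouse itself exposes an Iceberg REST Catalog interface, enabling two-way interoperability |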
SQL Commands Used
| Command / feature | Purpose | Notes |
|---|---|---|
| CREATE STORAGE CONNECTION | Declares credentials for accessing OSS/S3/COS | Used by the External Catalog when reading Parquet data files |
| CREATE CATALOG CONNECTION TYPE ICEBERG_REST | Connects to an Iceberg REST Catalog service | Stores connection and authentication details (URI, OAuth, etc.) |
| CREATE EXTERNAL CATALOG | Mounts an Iceberg catalog and maps it to three-level catalog.schema.table naming | Entry point for federated queries |
| SELECT ... FROM catalog.schema.table | Federated query over Iceberg data without loading it into Lakehouse | Supports snapshot skipping and delete-file merging |
| CREATE DYNAMIC TABLE | Defines Silver/Gold processing logic with External Catalog tables as upstream | Declarative SQL; the system refreshes incrementally |
| REFRESH DYNAMIC TABLE | Triggers a single manual refresh | Used for the initial build or for debugging |
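Choosing Between PIPE and External Catalog (Iceberg REST)
The two common approaches to ingesting Iceberg data suit different scenarios:
| Dimension | PIPE (LIST_PURGE / EVENT_NOTIFICATION) | External Catalog (Iceberg REST) |
|---|---|---|
| How files are interpreted | Scans Parquet files; does not read Iceberg metadata | Reads snapshots/manifests through the REST API |
| DELETE/UPDATE handling | Cannot recognize delete files; sees only data files | Correctly applies deletion vectors; results are accurate |
| Schema evolution | Column mapping must be maintained by hand; error-prone | Detects column changes automatically and follows the Iceberg schema version |
| Where data lives | Data is written into Lakehouse internal tables | Data stays in OSS/S3; zero-copy federation |
| Suitable scenarios | One-time import of historical files; append-only writes | Iceberg shared by multiple engines; workloads with UPDATE/DELETE |
| Prerequisites | A Volume + Storage Connection | An Iceberg REST Catalog service |
💡 Tip: If upstream Spark only appends (no UPDATE/DELETE), the PIPE approach is simpler. When there are row-level changes, use External Catalog to get accurate query results.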
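The DELETE row in the table above can be illustrated with a toy Python model. It is a simplification, not Iceberg's file format or either engine's implementation. A snapshot holds data files plus position deletes, given as (file path, row position) pairs. Deletion vectors in Iceberg v3 encode the same information as a bitmap. This mirrors how a merge-on-read DELETE leaves the data file in place and records the removal separately. A reader that only lists data files, as PIPE does, returns the deleted row. A snapshot-aware reader, as External Catalog is described to be, skips it. The file name and order IDs are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple

@dataclass
class Snapshot:
    """Toy model of one Iceberg snapshot (hypothetical; not Iceberg's real metadata layout)."""
    # data file path -> rows stored in that file, in file order
    data_files: Dict[str, List[dict]]
    # position deletes: (data file path, 0-based row position) marked as deleted
    position_deletes: Set[Tuple[str, int]] = field(default_factory=set)

def read_data_files_only(snap: Snapshot) -> List[dict]:
    """Like a file-listing loader: returns every row of every data file."""
    return [row for rows in snap.data_files.values() for row in rows]

def read_with_deletes(snap: Snapshot) -> List[dict]:
    """Like a snapshot-aware reader: skips rows covered by a position delete."""
    return [
        row
        for path, rows in snap.data_files.items()
        for pos, row in enumerate(rows)
        if (path, pos) not in snap.position_deletes
    ]

# Three orders were appended, then a merge-on-read DELETE removed ORD102.
# The data file is left untouched; the delete is recorded separately.
snap = Snapshot(
    data_files={"data/00000.parquet": [
        {"order_id": "ORD101"}, {"order_id": "ORD102"}, {"order_id": "ORD103"},
    ]},
    position_deletes={("data/00000.parquet", 1)},
)

print(len(read_data_files_only(snap)))  # 3: the deleted row comes back
print(len(read_with_deletes(snap)))     # 2: what the snapshot actually contains
```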
Preparation
All examples in this article run in the best_practice_iceberg_fed schema.
CREATE SCHEMA IF NOT EXISTS best_practice_iceberg_fed;
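External Catalog Layer: Connecting to an Iceberg REST Catalog
Prerequisites
External Catalog (Iceberg REST) requires the following environment to be set up in advance:
Writer side: an engine that can write Iceberg format (Apache Spark, Flink, PyIceberg)
Iceberg REST Catalog service: any one of the following
Open source, self-hosted: Apache Polaris, Apache Gravitino, Project Nessie
Cloud-hosted: Snowflake Open Catalog, AWS Glue (Iceberg REST mode)
Object storage: OSS (Alibaba Cloud), S3 (AWS), or COS (Tencent Cloud), holding the Parquet data files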
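⚠️ Note: Without a real Iceberg REST Catalog service you cannot run the CREATE CATALOG CONNECTION and CREATE EXTERNAL CATALOG DDL, because the connection checks that the REST API is reachable when it is created. The DDL below shows the full syntax so you can use it directly once you have a real environment.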
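Because the connection is validated at creation time, it can help to confirm reachability first from a machine on the same network. The Iceberg REST Catalog specification defines a `GET /v1/config` endpoint relative to the catalog base URI. The sketch below builds that URL from the URI you would pass to CREATE CATALOG CONNECTION and, optionally, sends the request with the standard library. It assumes your catalog serves the spec's paths directly under that URI, as Polaris does under /api/catalog. The `warehouse` query parameter is optional. A 401 from an OAuth-protected catalog still shows the endpoint is answering.

```python
import urllib.error
import urllib.parse
import urllib.request
from typing import Optional

def config_url(base_uri: str, warehouse: Optional[str] = None) -> str:
    """Build the Iceberg REST `GET /v1/config` URL relative to the catalog base URI."""
    url = base_uri.rstrip("/") + "/v1/config"
    if warehouse:
        url += "?" + urllib.parse.urlencode({"warehouse": warehouse})
    return url

def probe(base_uri: str, warehouse: Optional[str] = None, timeout: float = 5.0) -> int:
    """Return the HTTP status of the config endpoint.

    200 or 401/403 both show the service is answering; network failures
    (DNS, refused connection, timeout) raise an exception, usually URLError.
    """
    try:
        with urllib.request.urlopen(config_url(base_uri, warehouse), timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        return err.code

if __name__ == "__main__":
    # Placeholder URI from the DDL in Step 2; replace it with your catalog's endpoint.
    print(config_url("https://your-iceberg-catalog.example.com/api/catalog"))
```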
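Step 1: Create a Storage Connection
The Storage Connection holds the credentials needed to access the Parquet data files. The External Catalog authenticates through this connection when it reads data files.
OSS (Alibaba Cloud) example: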
CREATE STORAGE CONNECTION IF NOT EXISTS iceberg_oss_conn
TYPE OSS
ACCESS_ID = '<your-access-key-id>'
ACCESS_KEY = '<your-access-key-secret>'
ENDPOINT = 'oss-cn-hangzhou-internal.aliyuncs.com';
S3 (AWS) example:
CREATE STORAGE CONNECTION IF NOT EXISTS iceberg_s3_conn
TYPE S3
ACCESS_KEY = '<your-access-key-id>'
SECRET_KEY = '<your-secret-access-key>'
ENDPOINT = 's3.cn-north-1.amazonaws.com.cn'
REGION = 'cn-north-1';
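Step 2: Create a Catalog Connection (TYPE ICEBERG_REST)
The Catalog Connection stores the API endpoint and authentication credentials of the Iceberg REST Catalog service.
Generic Iceberg REST Catalog without authentication (e.g. Nessie or a self-hosted Gravitino):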
CREATE CATALOG CONNECTION IF NOT EXISTS iceberg_rest_conn
TYPE ICEBERG_REST
URI = 'https://your-iceberg-catalog.example.com/api/catalog'
ACCESS_REGION = 'cn-hangzhou';
With OAuth authentication (e.g. Apache Polaris or Snowflake Open Catalog):
CREATE CATALOG CONNECTION IF NOT EXISTS polaris_conn
TYPE ICEBERG_REST
URI = 'https://<account>.snowflakecomputing.com/polaris/api/catalog'
ACCESS_REGION = 'ap-southeast-1'
OAUTH_CLIENT_ID = '<your-client-id>'
OAUTH_CLIENT_SECRET = '<your-client-secret>'
OAUTH_SCOPE = 'PRINCIPAL_ROLE:ALL'
NAMESPACE = '<your_database>'
WAREHOUSE = '<your_catalog_name>'
WITH PROPERTIES (
'client.region' = 'ap-southeast-1',
'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'
);
⚠️ Note: Do not put = after TYPE, and do not separate the top-level parameters with commas (commas appear only inside the WITH PROPERTIES list). Writing TYPE = ICEBERG_REST or TYPE ICEBERG_REST, causes a syntax error.
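OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET and OAUTH_SCOPE presumably drive an OAuth2 client-credentials exchange. To test those credentials outside Lakehouse, the sketch below builds the form-encoded request for the `POST /v1/oauth/tokens` endpoint defined in the Iceberg REST specification. Polaris implements that endpoint. Catalogs that delegate to an external identity provider use a different token URL, so treat the path as an assumption. The sketch only constructs the request; it does not send it.

```python
import urllib.parse
import urllib.request

def token_request(base_uri: str, client_id: str, client_secret: str,
                  scope: str = "PRINCIPAL_ROLE:ALL") -> urllib.request.Request:
    """Build (but do not send) an OAuth2 client-credentials request to <base_uri>/v1/oauth/tokens."""
    body = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": scope,
    }).encode("utf-8")
    return urllib.request.Request(
        base_uri.rstrip("/") + "/v1/oauth/tokens",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )

if __name__ == "__main__":
    # Placeholder values; substitute the same ones used in CREATE CATALOG CONNECTION.
    req = token_request("https://catalog.example.com/api/catalog",
                        "<your-client-id>", "<your-client-secret>")
    print(req.get_method(), req.full_url)
    # To actually send it: json.load(urllib.request.urlopen(req))["access_token"]
```

A 200 response containing an access_token confirms the client ID, secret and scope before you copy them into the DDL.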
Step 3: Create the External Catalog
Create an External Catalog on top of the Catalog Connection. Its contents are then addressed with three-level catalog.schema.table names:
CREATE EXTERNAL CATALOG iceberg_catalog
CONNECTION iceberg_rest_conn;
Once it is created, you can list the schemas and tables in the catalog:
-- List all schemas in the catalog
SHOW SCHEMAS IN iceberg_catalog;
-- List the tables in a schema
SHOW TABLES IN iceberg_catalog.ecommerce;
-- Federated query on an Iceberg table (no data copied)
SELECT * FROM iceberg_catalog.ecommerce.orders LIMIT 10;
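⚠️ Note: Currently only the instance_admin role can query an External Catalog; regular workspace users cannot access it directly. A Dynamic Table's defining SQL can, however, reference External Catalog tables and materialize the results inside Lakehouse, where downstream users access them like ordinary tables.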
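Simulated Data Layer: Local Tables in Place of Iceberg External Tables
When no real Iceberg REST Catalog is available, use Lakehouse internal tables to simulate the data as it would be read from Iceberg. You can then validate the downstream Dynamic Table logic against them.
Create the product dimension table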
CREATE TABLE IF NOT EXISTS best_practice_iceberg_fed.doc_products_local (
product_id STRING,
product_name STRING,
category STRING,
brand STRING,
cost_price DOUBLE,
list_price DOUBLE
);
Import from a local CSV file (recommended):
-- Step 1: upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/doc_products_local.csv' TO USER VOLUME FILE 'doc_products_local.csv';
-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_iceberg_fed.doc_products_local
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_products_local.csv');
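The PUT step assumes a local doc_products_local.csv whose header row matches the table columns, because COPY INTO uses 'header'='true'. If you do not have such a file, this Python sketch generates it with the standard csv module. It uses the same 10 rows as the inline INSERT below. The output path is an assumption; adjust it to match the source path in PUT.

```python
import csv
import io

# Column order must match best_practice_iceberg_fed.doc_products_local.
COLUMNS = ["product_id", "product_name", "category", "brand", "cost_price", "list_price"]

# The same 10 rows as the inline INSERT INTO ... doc_products_local statement.
ROWS = [
    ("P001", "Smartphone X1", "Electronics", "TechBrand", 1800.0, 3299.0),
    ("P002", "Laptop Pro 14", "Electronics", "TechBrand", 4200.0, 7999.0),
    ("P003", "Wireless Earbuds", "Electronics", "SoundMax", 180.0, 499.0),
    ("P004", "Cotton T-Shirt", "Apparel", "FashionCo", 30.0, 129.0),
    ("P005", "Running Shoes", "Apparel", "SportMax", 220.0, 699.0),
    ("P006", "Coffee Maker", "Kitchen", "HomeChef", 350.0, 899.0),
    ("P007", "Yoga Mat", "Sports", "FitLife", 45.0, 199.0),
    ("P008", "Backpack 30L", "Accessories", "TravelPro", 80.0, 299.0),
    ("P009", "LED Desk Lamp", "Furniture", "LightUp", 55.0, 199.0),
    ("P010", "Protein Powder 1kg", "Health", "NutriPlus", 120.0, 349.0),
]

def products_csv_text() -> str:
    """Render header + rows as comma-separated text ('sep'=',', 'header'='true')."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(COLUMNS)
    writer.writerows(ROWS)
    return buf.getvalue()

def write_products_csv(path: str = "doc_products_local.csv") -> None:
    with open(path, "w", encoding="utf-8", newline="") as f:
        f.write(products_csv_text())

if __name__ == "__main__":
    write_products_csv()  # then upload it with the PUT statement above
```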
Alternatively, insert a small batch of test data inline (no CSV file needed):
INSERT INTO best_practice_iceberg_fed.doc_products_local VALUES
('P001','Smartphone X1','Electronics','TechBrand',1800.0,3299.0),
('P002','Laptop Pro 14','Electronics','TechBrand',4200.0,7999.0),
('P003','Wireless Earbuds','Electronics','SoundMax',180.0,499.0),
('P004','Cotton T-Shirt','Apparel','FashionCo',30.0,129.0),
('P005','Running Shoes','Apparel','SportMax',220.0,699.0),
('P006','Coffee Maker','Kitchen','HomeChef',350.0,899.0),
('P007','Yoga Mat','Sports','FitLife',45.0,199.0),
('P008','Backpack 30L','Accessories','TravelPro',80.0,299.0),
('P009','LED Desk Lamp','Furniture','LightUp',55.0,199.0),
('P010','Protein Powder 1kg','Health','NutriPlus',120.0,349.0);
Create the order fact table (simulating data read from an Iceberg external table)
CREATE TABLE IF NOT EXISTS best_practice_iceberg_fed.doc_orders_local (
order_id STRING,
customer_id STRING,
product_id STRING,
region STRING,
order_date DATE,
quantity INT,
unit_price DOUBLE,
discount_rate DOUBLE,
status STRING,
ingest_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);
Import from a local CSV file (recommended):
-- Step 1: upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/doc_orders_local.csv' TO USER VOLUME FILE 'doc_orders_local.csv';
-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_iceberg_fed.doc_orders_local
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_orders_local.csv');
Alternatively, insert a small batch of test data inline (no CSV file needed):
INSERT INTO best_practice_iceberg_fed.doc_orders_local
(order_id, customer_id, product_id, region, order_date, quantity, unit_price, discount_rate, status)
VALUES
('ORD001','C101','P001','East',DATE '2024-01-05',1,3299.0,0.0,'completed'),
('ORD002','C102','P002','West',DATE '2024-01-06',1,7999.0,0.05,'completed'),
('ORD003','C103','P003','East',DATE '2024-01-07',2,499.0,0.0,'completed'),
('ORD004','C104','P004','South',DATE '2024-01-08',3,129.0,0.1,'completed'),
('ORD005','C105','P005','North',DATE '2024-01-09',1,699.0,0.0,'completed'),
('ORD006','C101','P006','East',DATE '2024-01-10',1,899.0,0.0,'completed'),
('ORD007','C106','P007','West',DATE '2024-01-11',2,199.0,0.0,'completed'),
('ORD008','C107','P008','South',DATE '2024-01-12',1,299.0,0.0,'completed'),
('ORD009','C108','P001','North',DATE '2024-01-13',2,3299.0,0.1,'completed'),
('ORD010','C109','P009','East',DATE '2024-01-14',1,199.0,0.0,'completed'),
('ORD011','C110','P010','West',DATE '2024-01-15',2,349.0,0.0,'completed'),
('ORD012','C102','P003','East',DATE '2024-01-16',1,499.0,0.0,'completed'),
('ORD013','C111','P002','North',DATE '2024-01-17',1,7999.0,0.0,'completed'),
('ORD014','C112','P004','South',DATE '2024-01-18',5,129.0,0.15,'completed'),
('ORD015','C103','P005','East',DATE '2024-01-19',1,699.0,0.0,'completed'),
('ORD016','C113','P006','West',DATE '2024-01-20',1,899.0,0.05,'completed'),
('ORD017','C114','P007','North',DATE '2024-01-21',3,199.0,0.0,'completed'),
('ORD018','C115','P008','East',DATE '2024-01-22',2,299.0,0.0,'completed'),
('ORD019','C116','P001','South',DATE '2024-01-23',1,3299.0,0.0,'completed'),
('ORD020','C117','P009','West',DATE '2024-01-24',2,199.0,0.0,'completed'),
('ORD021','C118','P010','North',DATE '2024-01-25',1,349.0,0.0,'completed'),
('ORD022','C101','P002','East',DATE '2024-02-01',1,7999.0,0.0,'completed'),
('ORD023','C119','P003','West',DATE '2024-02-02',3,499.0,0.1,'completed'),
('ORD024','C120','P004','South',DATE '2024-02-03',2,129.0,0.0,'completed'),
('ORD025','C104','P005','East',DATE '2024-02-04',1,699.0,0.0,'completed'),
('ORD026','C121','P001','North',DATE '2024-02-05',1,3299.0,0.0,'cancelled'),
('ORD027','C122','P006','West',DATE '2024-02-06',1,899.0,0.0,'completed'),
('ORD028','C123','P007','East',DATE '2024-02-07',2,199.0,0.0,'completed'),
('ORD029','C124','P010','South',DATE '2024-02-08',3,349.0,0.0,'completed'),
('ORD030','C125','P002','North',DATE '2024-02-09',1,7999.0,0.05,'completed');
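Verify the data load:
SELECT COUNT(*) AS order_count FROM best_practice_iceberg_fed.doc_orders_local;
Expected result: order_count = 30, one row per sample order, assuming the data was loaded once (via either CSV or INSERT).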
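Silver Layer: Cleansing and Dimension Join with a Dynamic Table
The Silver layer joins the order table (standing in for the Iceberg external table) with the product dimension table, filters out cancelled orders, and computes net revenue and gross profit.
In a real Iceberg federation environment, replace best_practice_iceberg_fed.doc_orders_local with iceberg_catalog.ecommerce.orders. The rest of the DDL stays the same, provided the Iceberg table has the same columns (including ingest_ts).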
CREATE DYNAMIC TABLE best_practice_iceberg_fed.dt_silver_orders AS
SELECT
o.order_id,
o.customer_id,
o.product_id,
p.product_name,
p.category,
p.brand,
o.region,
o.order_date,
YEAR(o.order_date) AS order_year,
MONTH(o.order_date) AS order_month,
o.quantity,
o.unit_price,
o.discount_rate,
ROUND(o.quantity * o.unit_price * (1 - o.discount_rate), 2) AS net_revenue,
ROUND(o.quantity * (o.unit_price - p.cost_price) * (1 - o.discount_rate), 2) AS gross_profit,
o.status,
o.ingest_ts
FROM best_practice_iceberg_fed.doc_orders_local o
LEFT JOIN best_practice_iceberg_fed.doc_products_local p
ON o.product_id = p.product_id
WHERE o.status = 'completed';
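⚠️ Note: The DDL does not specify REFRESH INTERVAL. Refresh scheduling is managed by Studio tasks, as described in the "Configuring Refresh Scheduling" section below.
Trigger the first refresh manually, then query the Silver results: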
REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_silver_orders;
SELECT order_id, customer_id, product_name, category, region, order_date, net_revenue, gross_profit
FROM best_practice_iceberg_fed.dt_silver_orders
ORDER BY order_date
LIMIT 10;
Returns:
| order_id | customer_id | product_name | category | region | order_date | net_revenue | gross_profit |
|---|---|---|---|---|---|---|---|
| ORD001 | C101 | Smartphone X1 | Electronics | East | 2024-01-05 | 3299.0 | 1499.0 |
| ORD002 | C102 | Laptop Pro 14 | Electronics | West | 2024-01-06 | 7599.05 | 3609.05 |
| ORD003 | C103 | Wireless Earbuds | Electronics | East | 2024-01-07 | 998.0 | 638.0 |
| ORD004 | C104 | Cotton T-Shirt | Apparel | South | 2024-01-08 | 348.3 | 267.3 |
| ORD005 | C105 | Running Shoes | Apparel | North | 2024-01-09 | 699.0 | 479.0 |
| ORD006 | C101 | Coffee Maker | Kitchen | East | 2024-01-10 | 899.0 | 549.0 |
| ORD007 | C106 | Yoga Mat | Sports | West | 2024-01-11 | 398.0 | 308.0 |
| ORD008 | C107 | Backpack 30L | Accessories | South | 2024-01-12 | 299.0 | 219.0 |
| ORD009 | C108 | Smartphone X1 | Electronics | North | 2024-01-13 | 5938.2 | 2698.2 |
| ORD010 | C109 | LED Desk Lamp | Furniture | East | 2024-01-14 | 199.0 | 144.0 |
The Silver layer drops ORD026 (status = 'cancelled', 3,299 yuan) and keeps the 29 completed orders. net_revenue has the discount applied. gross_profit is computed as quantity × (unit_price − cost_price) × (1 − discount_rate).
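To check the Silver arithmetic by hand, the sketch below re-implements the two expressions from the DDL in Python. It uses decimal.Decimal to avoid binary floating-point noise (the SQL itself uses DOUBLE), and ROUND_HALF_UP is assumed to behave like ROUND for these values. As written, gross_profit applies the discount to the unit margin. It therefore equals net_revenue − quantity × cost_price × (1 − discount_rate). If your definition of gross profit is net_revenue − quantity × cost_price, adjust the SQL expression accordingly.

```python
from decimal import Decimal, ROUND_HALF_UP

CENT = Decimal("0.01")

def silver_metrics(quantity: int, unit_price: str, discount_rate: str, cost_price: str):
    """Mirror the net_revenue / gross_profit expressions of dt_silver_orders for one order."""
    q = Decimal(quantity)
    price, disc, cost = Decimal(unit_price), Decimal(discount_rate), Decimal(cost_price)
    net_revenue = (q * price * (1 - disc)).quantize(CENT, rounding=ROUND_HALF_UP)
    # Discount is applied to the unit margin, matching the SQL expression.
    gross_profit = (q * (price - cost) * (1 - disc)).quantize(CENT, rounding=ROUND_HALF_UP)
    return net_revenue, gross_profit

# ORD002: 1 x Laptop Pro 14 (list 7999, cost 4200) at 5% off -> 7599.05 / 3609.05
print(silver_metrics(1, "7999.0", "0.05", "4200.0"))
# ORD009: 2 x Smartphone X1 (list 3299, cost 1800) at 10% off -> 5938.20 / 2698.20
print(silver_metrics(2, "3299.0", "0.1", "1800.0"))
```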
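Gold Layer: Aggregated Metrics with a Dynamic Table
Building on Silver, the Gold layer aggregates by region, category, and year/month. For each combination it produces order count, total quantity, total revenue, gross profit, and profit margin, ready for BI tools to query directly.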
CREATE DYNAMIC TABLE best_practice_iceberg_fed.dt_gold_regional_metrics AS
SELECT
region,
category,
order_year,
order_month,
COUNT(order_id) AS order_count,
SUM(quantity) AS total_qty,
ROUND(SUM(net_revenue), 2) AS total_revenue,
ROUND(SUM(gross_profit), 2) AS total_profit,
ROUND(SUM(gross_profit) / NULLIF(SUM(net_revenue), 0) * 100, 2) AS profit_margin_pct
FROM best_practice_iceberg_fed.dt_silver_orders
GROUP BY region, category, order_year, order_month;
REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_gold_regional_metrics;
SELECT region, category, order_year, order_month,
order_count, total_revenue, total_profit, profit_margin_pct
FROM best_practice_iceberg_fed.dt_gold_regional_metrics
ORDER BY total_revenue DESC
LIMIT 10;
Returns:
| region | category | order_year | order_month | order_count | total_revenue | total_profit | profit_margin_pct |
|---|---|---|---|---|---|---|---|
| North | Electronics | 2024 | 1 | 2 | 13937.2 | 6497.2 | 46.62 |
| East | Electronics | 2024 | 2 | 1 | 7999.0 | 3799.0 | 47.49 |
| West | Electronics | 2024 | 1 | 1 | 7599.05 | 3609.05 | 47.49 |
| North | Electronics | 2024 | 2 | 1 | 7599.05 | 3609.05 | 47.49 |
| East | Electronics | 2024 | 1 | 3 | 4796.0 | 2456.0 | 51.21 |
| South | Electronics | 2024 | 1 | 1 | 3299.0 | 1499.0 | 45.44 |
| West | Electronics | 2024 | 2 | 1 | 1347.3 | 861.3 | 63.93 |
| South | Health | 2024 | 2 | 1 | 1047.0 | 687.0 | 65.62 |
| East | Kitchen | 2024 | 1 | 1 | 899.0 | 549.0 | 61.07 |
| West | Kitchen | 2024 | 2 | 1 | 899.0 | 549.0 | 61.07 |
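Electronics brings in the most revenue (13,937.20 yuan for North in January). Among the rows shown, the Health and Kitchen rows have margins above 60%, versus roughly 45% to 51% for most Electronics rows. BI tools can connect to this table directly to build a regional sales dashboard.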
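The Gold numbers can be cross-checked the same way. This Python sketch reproduces the GROUP BY and the NULLIF-guarded margin over a hand-picked subset of Silver rows: the five completed January Electronics orders in North and East. It uses decimal.Decimal with ROUND_HALF_UP as an assumed stand-in for SQL ROUND. total_qty is omitted for brevity.

```python
from collections import defaultdict
from decimal import Decimal, ROUND_HALF_UP

CENT = Decimal("0.01")

# (region, category, order_year, order_month, net_revenue, gross_profit).
# ORD001/ORD003/ORD009 appear in the Silver output above; ORD012/ORD013 are
# computed from the sample orders with the Silver formulas.
SILVER_SUBSET = [
    ("North", "Electronics", 2024, 1, Decimal("5938.2"), Decimal("2698.2")),  # ORD009
    ("North", "Electronics", 2024, 1, Decimal("7999.0"), Decimal("3799.0")),  # ORD013
    ("East", "Electronics", 2024, 1, Decimal("3299.0"), Decimal("1499.0")),   # ORD001
    ("East", "Electronics", 2024, 1, Decimal("998.0"), Decimal("638.0")),     # ORD003
    ("East", "Electronics", 2024, 1, Decimal("499.0"), Decimal("319.0")),     # ORD012
]

def gold_metrics(rows):
    """GROUP BY region, category, year, month -> (order_count, revenue, profit, margin %)."""
    groups = defaultdict(list)
    for region, category, year, month, revenue, profit in rows:
        groups[(region, category, year, month)].append((revenue, profit))
    result = {}
    for key, items in groups.items():
        revenue = sum(r for r, _ in items)
        profit = sum(p for _, p in items)
        # NULLIF(SUM(net_revenue), 0): no margin when revenue is zero
        margin = (profit / revenue * 100).quantize(CENT, rounding=ROUND_HALF_UP) if revenue else None
        result[key] = (
            len(items),
            revenue.quantize(CENT, rounding=ROUND_HALF_UP),
            profit.quantize(CENT, rounding=ROUND_HALF_UP),
            margin,
        )
    return result

for key, metrics in sorted(gold_metrics(SILVER_SUBSET).items()):
    print(key, metrics)
```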
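Configuring Refresh Scheduling
Periodic Dynamic Table refreshes are managed through Studio tasks rather than in the DDL. This lets you attach monitoring alerts and data-quality check rules to the same task, giving a single entry point for operations.
The following tasks have been created under the best_practices/iceberg_fed/ path: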
| Task name | SQL content | Schedule |
|---|---|---|
| refresh_dt_silver_orders | REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_silver_orders | Hourly (0 0/1 * * ?) |
| refresh_dt_gold_metrics | REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_gold_regional_metrics | Hourly (0 0/1 * * ?) |
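Both tasks run on the same hourly cron. Because dt_gold_regional_metrics is defined on dt_silver_orders, a Gold refresh that starts before the Silver refresh finishes may aggregate the previous cycle's Silver data. One way to reason about the required order is a topological sort of the table dependencies, sketched below with Python's graphlib (3.9+). The dependency map is written by hand from the two CREATE DYNAMIC TABLE statements above. How you enforce the order, for example with task dependencies or staggered schedules in Studio, depends on what your scheduler supports.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# table -> tables it reads from (taken from the CREATE DYNAMIC TABLE statements)
DEPENDS_ON: Dict[str, Set[str]] = {
    "best_practice_iceberg_fed.dt_silver_orders": {
        "best_practice_iceberg_fed.doc_orders_local",
        "best_practice_iceberg_fed.doc_products_local",
    },
    "best_practice_iceberg_fed.dt_gold_regional_metrics": {
        "best_practice_iceberg_fed.dt_silver_orders",
    },
}

def refresh_order(deps: Dict[str, Set[str]]) -> List[str]:
    """Return Dynamic Tables in an order where every upstream DT refreshes first."""
    order = TopologicalSorter(deps).static_order()  # predecessors come first
    # Base tables appear in the sort but are not refreshed; keep only the DTs.
    return [t for t in order if t in deps]

for table in refresh_order(DEPENDS_ON):
    print(f"REFRESH DYNAMIC TABLE {table};")
```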
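cz-cli steps for creating the tasks:
💡 Tip: The examples below use cz-cli, the ClickZetta Lakehouse command-line tool. If cz-cli is not installed, see the cz-cli installation and usage guide. If you prefer not to use the command line, you can run the SQL in ClickZetta Studio under Development -> SQL Editor, and configure and trigger scheduled tasks on the Studio -> Tasks page.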
# 1. Create a subfolder under best_practices
cz-cli task create-folder iceberg_fed -p skill_test --parent 186117
# 2. Create the Silver refresh task (<folder_id> is the folder ID returned by the previous step)
cz-cli task create refresh_dt_silver_orders -p skill_test --type SQL --folder <folder_id>
cz-cli task save-content refresh_dt_silver_orders -p skill_test \
--content "REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_silver_orders"
cz-cli task save-cron refresh_dt_silver_orders -p skill_test --cron "0 0/1 * * ?"
# 3. Create the Gold refresh task
cz-cli task create refresh_dt_gold_metrics -p skill_test --type SQL --folder <folder_id>
cz-cli task save-content refresh_dt_gold_metrics -p skill_test \
--content "REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_gold_regional_metrics"
cz-cli task save-cron refresh_dt_gold_metrics -p skill_test --cron "0 0/1 * * ?"
# 4. Deploy the tasks
cz-cli task deploy refresh_dt_silver_orders -p skill_test
cz-cli task deploy refresh_dt_gold_metrics -p skill_test
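The same four cz-cli calls repeat for every Dynamic Table. If you add more layers, a small Python wrapper can generate them. This sketch uses only the subcommands and flags shown above and assumes cz-cli is on PATH. The project name and folder ID are inputs. The helper names task_commands and run are hypothetical. By default the sketch does a dry run and only prints the commands.

```python
import shlex
import subprocess
from typing import List

def task_commands(task: str, sql: str, project: str, folder_id: str,
                  cron: str = "0 0/1 * * ?") -> List[List[str]]:
    """Build the cz-cli calls used above to create, fill, schedule and deploy one SQL task."""
    return [
        ["cz-cli", "task", "create", task, "-p", project, "--type", "SQL", "--folder", folder_id],
        ["cz-cli", "task", "save-content", task, "-p", project, "--content", sql],
        ["cz-cli", "task", "save-cron", task, "-p", project, "--cron", cron],
        ["cz-cli", "task", "deploy", task, "-p", project],
    ]

def run(commands: List[List[str]], dry_run: bool = True) -> None:
    for cmd in commands:
        print(shlex.join(cmd))  # shell-quoted, copy-pasteable form
        if not dry_run:
            subprocess.run(cmd, check=True)  # stop at the first failing call

TASKS = {
    "refresh_dt_silver_orders": "REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_silver_orders",
    "refresh_dt_gold_metrics": "REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_gold_regional_metrics",
}

if __name__ == "__main__":
    for name, sql in TASKS.items():
        run(task_commands(name, sql, project="skill_test", folder_id="<folder_id>"))
```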
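💡 Tip: After deployment, you can add "data quality check" rules to each task on the Studio task page, for example requiring a non-zero Silver row count. You can also add "alert notifications", for example a Feishu/DingTalk message when a refresh fails. Neither requires modifying the Dynamic Table DDL.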
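Two-Way Interoperability: Lakehouse as an Iceberg REST Provider
Besides reading external Iceberg tables, ClickZetta Lakehouse itself exposes a standard Iceberg REST Catalog interface. External engines such as Spark and Trino can therefore read Lakehouse internal tables in the reverse direction, enabling two-way data sharing:
Direction 1 (main flow of this article): external Spark writes Iceberg → Lakehouse External Catalog federated read
Direction 2 (reverse): Lakehouse internal tables → exposed through the Iceberg REST API → read by external Spark/Trino
For configuring Iceberg REST access from outside, see: Accessing Lakehouse from Spark via the Iceberg REST Catalog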
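On the Spark side, reading through any Iceberg REST catalog uses Iceberg's standard Spark catalog properties (SparkCatalog with type rest). The sketch below assembles them as a dict. The catalog name lakehouse, the URI, the warehouse value and the use of the client-credentials credential property are placeholders and assumptions. The actual endpoint and authentication settings for ClickZetta Lakehouse must come from the guide referenced above.

```python
from typing import Dict, Optional

def iceberg_rest_spark_conf(catalog: str, uri: str, warehouse: str,
                            credential: Optional[str] = None) -> Dict[str, str]:
    """Spark properties for an Iceberg REST catalog (Iceberg's standard SparkCatalog settings)."""
    prefix = f"spark.sql.catalog.{catalog}"
    conf = {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",
        f"{prefix}.uri": uri,
        f"{prefix}.warehouse": warehouse,
    }
    if credential:  # "client_id:client_secret" for OAuth2 client credentials
        conf[f"{prefix}.credential"] = credential
    return conf

if __name__ == "__main__":
    # Placeholder endpoint and warehouse; take the real values from the Lakehouse guide.
    conf = iceberg_rest_spark_conf("lakehouse", "https://<lakehouse-iceberg-rest-endpoint>", "<warehouse>")
    for key, value in conf.items():
        print(f"--conf {key}={value}")
```

Pass the resulting pairs as --conf options to spark-sql or spark-submit, or through SparkSession.builder.config(key, value), with the iceberg-spark-runtime jar on the classpath. Tables are then addressed as lakehouse.<schema>.<table>.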
Notes
Related Documentation