Best Practices: A Federated Query Pipeline over a Multi-Engine Iceberg Data Lake

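Upstream Spark, Flink, or PyIceberg jobs write Iceberg-format data to OSS/S3/COS. Yunqi (云器) Lakehouse queries that data in place through an External Catalog (Iceberg REST), with no data copy. Dynamic Tables then build the incremental Silver/Gold layers that produce the reporting metrics. This guide walks through the complete setup end to end, using a dataset of 30 simulated orders and a 10-row product dimension table.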


Overview

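The typical challenge in a multi-engine Iceberg data lake is that several write engines each maintain their own Iceberg tables. The analytics layer has to query this data together with internal tables, and it must also handle Iceberg's DELETE/UPDATE semantics correctly.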

Yunqi Lakehouse addresses these core problems with the following combination:

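| Problem | Solution |
|---|---|
| Iceberg tables written by Spark contain DELETE files that PIPE cannot interpret | External Catalog reads the snapshot through the REST API and correctly applies the deletion vectors |
| Several engines write independently, and the schema may evolve | External Catalog tracks Iceberg schema versions automatically, so no column mapping has to be maintained by hand |
| Data volume is large and a full copy into Lakehouse is undesirable | External Catalog provides zero-copy federated queries, and the data files stay in the original OSS/S3 |
| The analytics layer needs multi-layer Silver/Gold processing | Dynamic Tables use External Catalog tables as upstream and refresh incrementally and automatically |
| Downstream Spark/Trino needs to read Lakehouse internal tables | Lakehouse itself exposes an Iceberg REST Catalog interface, so interoperability works in both directions |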
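
The following minimal Python sketch shows why a loader that only lists Parquet files over-counts rows. It models Iceberg merge-on-read position deletes in simplified form. Each position delete names a data file and a 0-based row position, matching the `file_path`/`pos` pair in the Iceberg spec. The class and function names are hypothetical. A snapshot-aware reader drops the deleted rows, while a file-only scan (PIPE-style) still returns them.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataFile:
    path: str
    rows: tuple  # row values in file order; the index is the row position


@dataclass(frozen=True)
class PositionDelete:
    file_path: str  # data file that holds the deleted row
    pos: int        # 0-based row position inside that file


def scan_files_only(data_files):
    """File-listing read (PIPE-style): every row of every data file is returned."""
    return [row for f in data_files for row in f.rows]


def scan_snapshot(data_files, deletes):
    """Snapshot-aware read: skip rows whose (file, position) appears in a delete file."""
    deleted = {(d.file_path, d.pos) for d in deletes}
    return [
        row
        for f in data_files
        for pos, row in enumerate(f.rows)
        if (f.path, pos) not in deleted
    ]


if __name__ == "__main__":
    files = [
        DataFile("data/00000.parquet", ("ORD001", "ORD002", "ORD003")),
        DataFile("data/00001.parquet", ("ORD004", "ORD005")),
    ]
    # A merge-on-read DELETE of ORD002 adds a position delete instead of
    # rewriting data/00000.parquet.
    deletes = [PositionDelete("data/00000.parquet", 1)]
    print(scan_files_only(files))         # ORD002 is still counted
    print(scan_snapshot(files, deletes))  # ORD002 is gone
```

In merge-on-read mode, an UPDATE becomes a delete plus a new row in a new data file. A file-only scan therefore returns both the old and the new version of the row.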

SQL Commands Used

| Command / feature | Purpose | Notes |
|---|---|---|
| CREATE STORAGE CONNECTION | Declares the credentials for accessing OSS/S3/COS | Used by the External Catalog when it reads Parquet data files |
| CREATE CATALOG CONNECTION TYPE ICEBERG_REST | Connects to the Iceberg REST Catalog service | Stores the authentication settings (URI, OAuth, etc.) |
| CREATE EXTERNAL CATALOG | Mounts the Iceberg catalog under three-part catalog.schema.table names | Entry point for federated queries |
| SELECT catalog.schema.table | Queries Iceberg data in place, without loading it | Supports snapshot skipping and delete-file merging |
| CREATE DYNAMIC TABLE | Defines the Silver/Gold processing logic with an External Catalog table as upstream | Declarative SQL; the system refreshes it incrementally |
| REFRESH DYNAMIC TABLE | Triggers one manual refresh | Used for the initial build or for debugging |

Choosing Between PIPE and External Catalog (Iceberg REST)

There are two common ways to bring in Iceberg data, and each suits different scenarios:

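| Dimension | PIPE (LIST_PURGE / EVENT_NOTIFICATION) | External Catalog (Iceberg REST) |
|---|---|---|
| How files are interpreted | Scans Parquet files; does not read Iceberg metadata | Reads snapshots/manifests through the REST API |
| DELETE/UPDATE handling | Cannot recognize delete files; sees only data files | Correctly applies deletion vectors; results are accurate |
| Schema evolution | Column mapping is maintained by hand and error-prone | Detects column changes automatically and follows the Iceberg schema version |
| Where the data lives | Written into Lakehouse internal tables | Stays in OSS/S3; zero-copy federation |
| Best for | One-time import of historical files; append-only writes | Iceberg shared across multiple engines; workloads with UPDATE/DELETE |
| Prerequisites | A Volume plus a Storage Connection | An Iceberg REST Catalog service |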
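
The schema-evolution row comes down to how columns are matched. Iceberg identifies every column by a field ID, and that ID is also written into the Parquet files, so a rename only changes the name attached to the ID. The Python sketch below contrasts an ID-based read with a hand-maintained, name-based mapping. The schemas, the `price` rename, and the `coupon_code` column are hypothetical.

```python
# Current name of each Iceberg field ID, before and after a schema change.
SCHEMA_V1 = {1: "order_id", 2: "unit_price"}
SCHEMA_V2 = {1: "order_id", 2: "price", 3: "coupon_code"}  # field 2 renamed, field 3 added

# One row from a data file written under V1, keyed by field ID.
FILE_V1_ROWS = [{1: "ORD001", 2: 3299.0}]


def read_by_field_id(rows, current_schema):
    """Iceberg-style projection: match by ID; fields absent from old files read as None."""
    return [{name: row.get(fid) for fid, name in current_schema.items()} for row in rows]


def read_by_name(rows, written_schema, expected_columns):
    """Name-based mapping: resolve names from the schema the file was written with."""
    named = [{written_schema[fid]: value for fid, value in row.items()} for row in rows]
    return [{col: row.get(col) for col in expected_columns} for row in named]


if __name__ == "__main__":
    print(read_by_field_id(FILE_V1_ROWS, SCHEMA_V2))
    # With a name-based mapping, the renamed column silently comes back empty.
    print(read_by_name(FILE_V1_ROWS, SCHEMA_V1, list(SCHEMA_V2.values())))
```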

Preparation

All examples in this guide run in the best_practice_iceberg_fed schema.

CREATE SCHEMA IF NOT EXISTS best_practice_iceberg_fed;


External Catalog Layer: Connecting to an Iceberg REST Catalog

Prerequisites

An External Catalog (Iceberg REST) requires the following environment to be in place:

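  • Write side: an engine that can write Iceberg (Apache Spark, Flink, PyIceberg)
  • Iceberg REST Catalog service, any one of:
    • Self-hosted open source: Apache Polaris, Apache Gravitino, Project Nessie
    • Cloud-hosted: Snowflake Open Catalog, AWS Glue (Iceberg REST mode)
  • Object storage: OSS (Alibaba Cloud), S3 (AWS), or COS (Tencent Cloud), holding the Parquet data files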

Step 1: Create a Storage Connection

A Storage Connection holds the credentials needed to access the Parquet data files. The External Catalog authenticates through this connection whenever it reads data files.

OSS (Alibaba Cloud) example:

CREATE STORAGE CONNECTION IF NOT EXISTS iceberg_oss_conn
    TYPE OSS
    ACCESS_ID = '<your-access-key-id>'
    ACCESS_KEY = '<your-access-key-secret>'
    -- '-internal' endpoints are reachable only from within the same region (cn-hangzhou)
    ENDPOINT = 'oss-cn-hangzhou-internal.aliyuncs.com';

S3 (AWS) example:

CREATE STORAGE CONNECTION IF NOT EXISTS iceberg_s3_conn
    TYPE S3
    ACCESS_KEY = '<your-access-key-id>'
    SECRET_KEY = '<your-secret-access-key>'
    ENDPOINT = 's3.cn-north-1.amazonaws.com.cn'
    REGION = 'cn-north-1';

Step 2: Create a Catalog Connection (TYPE ICEBERG_REST)

A Catalog Connection stores the API endpoint and the authentication credentials of the Iceberg REST Catalog service.

Generic Iceberg REST Catalog without authentication (e.g., Nessie or a self-hosted Gravitino):

CREATE CATALOG CONNECTION IF NOT EXISTS iceberg_rest_conn
    TYPE ICEBERG_REST
    URI = 'https://your-iceberg-catalog.example.com/api/catalog'
    ACCESS_REGION = 'cn-hangzhou';

With OAuth authentication (e.g., Apache Polaris / Snowflake Open Catalog):

CREATE CATALOG CONNECTION IF NOT EXISTS polaris_conn
    TYPE ICEBERG_REST
    URI = 'https://<account>.snowflakecomputing.com/polaris/api/catalog'
    ACCESS_REGION = 'ap-southeast-1'
    OAUTH_CLIENT_ID = '<your-client-id>'
    OAUTH_CLIENT_SECRET = '<your-client-secret>'
    OAUTH_SCOPE = 'PRINCIPAL_ROLE:ALL'
    NAMESPACE = '<your_database>'
    WAREHOUSE = '<your_catalog_name>'
    WITH PROPERTIES (
        'client.region' = 'ap-southeast-1',
        'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'
    );
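
The OAuth parameters correspond to the client-credentials flow in the Iceberg REST Catalog spec. The client posts a form to `<URI>/v1/oauth/tokens` and then calls `/v1/config` with the warehouse. The Python sketch below only builds those requests and sends nothing. It assumes the connection follows that standard flow, which this guide does not state, and the function names are hypothetical.

```python
from urllib.parse import urlencode


def token_request(catalog_uri, client_id, client_secret, scope):
    """Return (url, form body) for the REST spec's client-credentials token call."""
    url = catalog_uri.rstrip("/") + "/v1/oauth/tokens"
    body = urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,          # OAUTH_CLIENT_ID
        "client_secret": client_secret,  # OAUTH_CLIENT_SECRET
        "scope": scope,                  # OAUTH_SCOPE, e.g. PRINCIPAL_ROLE:ALL
    })
    return url, body


def config_url(catalog_uri, warehouse):
    """GET /v1/config?warehouse=... returns catalog defaults/overrides such as a path prefix."""
    return catalog_uri.rstrip("/") + "/v1/config?" + urlencode({"warehouse": warehouse})


if __name__ == "__main__":
    uri = "https://<account>.snowflakecomputing.com/polaris/api/catalog"
    print(token_request(uri, "<your-client-id>", "<your-client-secret>", "PRINCIPAL_ROLE:ALL"))
    print(config_url(uri, "<your_catalog_name>"))
```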

Step 3: Create the External Catalog

Create the External Catalog on top of the Catalog Connection. Its tables are then addressed by the three-part name catalog.schema.table:

CREATE EXTERNAL CATALOG iceberg_catalog CONNECTION iceberg_rest_conn;

Once the catalog is created, you can list its schemas and tables:

-- List all schemas in the catalog
SHOW SCHEMAS IN iceberg_catalog;
-- List the tables under one schema
SHOW TABLES IN iceberg_catalog.ecommerce;
-- Federated query on an Iceberg table (zero data copy)
SELECT * FROM iceberg_catalog.ecommerce.orders LIMIT 10;
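
An Iceberg REST client turns these statements into the catalog's standard endpoints: list namespaces, list tables, and load a table. Loading a table returns metadata that points at the current snapshot. The data files are then read from object storage, which is presumably where the Storage Connection credentials come in. The Python sketch below maps a three-part name to those paths according to the Iceberg REST spec. Which calls Lakehouse actually issues is an assumption, and `prefix` stands for whatever `/v1/config` returns.

```python
from urllib.parse import quote

# The REST spec joins multi-level namespaces with the 0x1F unit separator.
NAMESPACE_SEP = "\x1f"


def rest_paths(qualified_name, prefix=""):
    """Map 'catalog.schema.table' to the REST paths behind SHOW SCHEMAS / SHOW TABLES / SELECT.

    The first part names the External Catalog inside Lakehouse and is not sent
    to the REST service.
    """
    _catalog, *namespace, table = qualified_name.split(".")
    if not namespace:
        raise ValueError("expected catalog.schema.table")
    base = "/v1/" + prefix if prefix else "/v1"
    ns = quote(NAMESPACE_SEP.join(namespace), safe="")
    return {
        "SHOW SCHEMAS": f"{base}/namespaces",
        "SHOW TABLES": f"{base}/namespaces/{ns}/tables",
        "SELECT": f"{base}/namespaces/{ns}/tables/{quote(table, safe='')}",
    }


if __name__ == "__main__":
    for stmt, path in rest_paths("iceberg_catalog.ecommerce.orders").items():
        print(f"{stmt:13} GET {path}")
```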


Simulated Data Layer: Local Tables in Place of Iceberg External Tables

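When no real Iceberg REST Catalog environment is available, Lakehouse internal tables can stand in for the data as it would be read from Iceberg. This lets you validate the downstream Dynamic Table logic anyway.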

Create the product dimension table

CREATE TABLE IF NOT EXISTS best_practice_iceberg_fed.doc_products_local (
    product_id   STRING,
    product_name STRING,
    category     STRING,
    brand        STRING,
    cost_price   DOUBLE,
    list_price   DOUBLE
);

Import from a local CSV file (recommended):

-- Step 1: upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/doc_products_local.csv' TO USER VOLUME FILE 'doc_products_local.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_iceberg_fed.doc_products_local
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_products_local.csv');
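
The guide does not include doc_products_local.csv itself. The minimal Python sketch below writes the file from the same ten rows as the inline INSERT. It adds a header row to match 'header'='true' and uses commas to match 'sep'=','. The file name matches the PUT/COPY INTO statements. The Unix line endings are a choice made here, not a documented requirement.

```python
import csv

COLUMNS = ["product_id", "product_name", "category", "brand", "cost_price", "list_price"]

# Same rows as the inline INSERT for doc_products_local.
PRODUCTS = [
    ("P001", "Smartphone X1", "Electronics", "TechBrand", 1800.0, 3299.0),
    ("P002", "Laptop Pro 14", "Electronics", "TechBrand", 4200.0, 7999.0),
    ("P003", "Wireless Earbuds", "Electronics", "SoundMax", 180.0, 499.0),
    ("P004", "Cotton T-Shirt", "Apparel", "FashionCo", 30.0, 129.0),
    ("P005", "Running Shoes", "Apparel", "SportMax", 220.0, 699.0),
    ("P006", "Coffee Maker", "Kitchen", "HomeChef", 350.0, 899.0),
    ("P007", "Yoga Mat", "Sports", "FitLife", 45.0, 199.0),
    ("P008", "Backpack 30L", "Accessories", "TravelPro", 80.0, 299.0),
    ("P009", "LED Desk Lamp", "Furniture", "LightUp", 55.0, 199.0),
    ("P010", "Protein Powder 1kg", "Health", "NutriPlus", 120.0, 349.0),
]


def write_products_csv(path="doc_products_local.csv"):
    """Write a header row plus the product rows; return the number of data rows."""
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f, lineterminator="\n")  # default delimiter is ','
        writer.writerow(COLUMNS)
        writer.writerows(PRODUCTS)
    return len(PRODUCTS)


if __name__ == "__main__":
    print(write_products_csv(), "rows written")
```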

Alternatively, insert a small batch of test data inline (no CSV file needed):

INSERT INTO best_practice_iceberg_fed.doc_products_local VALUES
    ('P001','Smartphone X1','Electronics','TechBrand',1800.0,3299.0),
    ('P002','Laptop Pro 14','Electronics','TechBrand',4200.0,7999.0),
    ('P003','Wireless Earbuds','Electronics','SoundMax',180.0,499.0),
    ('P004','Cotton T-Shirt','Apparel','FashionCo',30.0,129.0),
    ('P005','Running Shoes','Apparel','SportMax',220.0,699.0),
    ('P006','Coffee Maker','Kitchen','HomeChef',350.0,899.0),
    ('P007','Yoga Mat','Sports','FitLife',45.0,199.0),
    ('P008','Backpack 30L','Accessories','TravelPro',80.0,299.0),
    ('P009','LED Desk Lamp','Furniture','LightUp',55.0,199.0),
    ('P010','Protein Powder 1kg','Health','NutriPlus',120.0,349.0);

Create the order fact table (simulating data read from the Iceberg external table)

CREATE TABLE IF NOT EXISTS best_practice_iceberg_fed.doc_orders_local (
    order_id      STRING,
    customer_id   STRING,
    product_id    STRING,
    region        STRING,
    order_date    DATE,
    quantity      INT,
    unit_price    DOUBLE,
    discount_rate DOUBLE,
    status        STRING,
    ingest_ts     TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

Import from a local CSV file (recommended):

-- Step 1: upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/doc_orders_local.csv' TO USER VOLUME FILE 'doc_orders_local.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_iceberg_fed.doc_orders_local
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_orders_local.csv');

Alternatively, insert a small batch of test data inline (no CSV file needed):

-- ingest_ts is omitted and filled by its DEFAULT CURRENT_TIMESTAMP()
INSERT INTO best_practice_iceberg_fed.doc_orders_local
    (order_id, customer_id, product_id, region, order_date, quantity, unit_price, discount_rate, status)
VALUES
    ('ORD001','C101','P001','East',DATE '2024-01-05',1,3299.0,0.0,'completed'),
    ('ORD002','C102','P002','West',DATE '2024-01-06',1,7999.0,0.05,'completed'),
    ('ORD003','C103','P003','East',DATE '2024-01-07',2,499.0,0.0,'completed'),
    ('ORD004','C104','P004','South',DATE '2024-01-08',3,129.0,0.1,'completed'),
    ('ORD005','C105','P005','North',DATE '2024-01-09',1,699.0,0.0,'completed'),
    ('ORD006','C101','P006','East',DATE '2024-01-10',1,899.0,0.0,'completed'),
    ('ORD007','C106','P007','West',DATE '2024-01-11',2,199.0,0.0,'completed'),
    ('ORD008','C107','P008','South',DATE '2024-01-12',1,299.0,0.0,'completed'),
    ('ORD009','C108','P001','North',DATE '2024-01-13',2,3299.0,0.1,'completed'),
    ('ORD010','C109','P009','East',DATE '2024-01-14',1,199.0,0.0,'completed'),
    ('ORD011','C110','P010','West',DATE '2024-01-15',2,349.0,0.0,'completed'),
    ('ORD012','C102','P003','East',DATE '2024-01-16',1,499.0,0.0,'completed'),
    ('ORD013','C111','P002','North',DATE '2024-01-17',1,7999.0,0.0,'completed'),
    ('ORD014','C112','P004','South',DATE '2024-01-18',5,129.0,0.15,'completed'),
    ('ORD015','C103','P005','East',DATE '2024-01-19',1,699.0,0.0,'completed'),
    ('ORD016','C113','P006','West',DATE '2024-01-20',1,899.0,0.05,'completed'),
    ('ORD017','C114','P007','North',DATE '2024-01-21',3,199.0,0.0,'completed'),
    ('ORD018','C115','P008','East',DATE '2024-01-22',2,299.0,0.0,'completed'),
    ('ORD019','C116','P001','South',DATE '2024-01-23',1,3299.0,0.0,'completed'),
    ('ORD020','C117','P009','West',DATE '2024-01-24',2,199.0,0.0,'completed'),
    ('ORD021','C118','P010','North',DATE '2024-01-25',1,349.0,0.0,'completed'),
    ('ORD022','C101','P002','East',DATE '2024-02-01',1,7999.0,0.0,'completed'),
    ('ORD023','C119','P003','West',DATE '2024-02-02',3,499.0,0.1,'completed'),
    ('ORD024','C120','P004','South',DATE '2024-02-03',2,129.0,0.0,'completed'),
    ('ORD025','C104','P005','East',DATE '2024-02-04',1,699.0,0.0,'completed'),
    ('ORD026','C121','P001','North',DATE '2024-02-05',1,3299.0,0.0,'cancelled'),
    ('ORD027','C122','P006','West',DATE '2024-02-06',1,899.0,0.0,'completed'),
    ('ORD028','C123','P007','East',DATE '2024-02-07',2,199.0,0.0,'completed'),
    ('ORD029','C124','P010','South',DATE '2024-02-08',3,349.0,0.0,'completed'),
    ('ORD030','C125','P002','North',DATE '2024-02-09',1,7999.0,0.05,'completed');

Verify the data load:

SELECT COUNT(*) AS order_count FROM best_practice_iceberg_fed.doc_orders_local;

Result:

| order_count |
|---|
| 30 |

Silver Layer: Cleansing and Dimension Join with a Dynamic Table

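The Silver layer joins the order table (which stands in for the Iceberg external table) with the product dimension table. It filters out cancelled orders and computes net revenue and gross profit.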

In a real Iceberg federation environment, replace doc_orders_local with iceberg_catalog.ecommerce.orders and the same DDL applies, provided the Iceberg table exposes the same columns (including ingest_ts).

CREATE DYNAMIC TABLE best_practice_iceberg_fed.dt_silver_orders AS
SELECT
    o.order_id,
    o.customer_id,
    o.product_id,
    p.product_name,
    p.category,
    p.brand,
    o.region,
    o.order_date,
    YEAR(o.order_date)  AS order_year,
    MONTH(o.order_date) AS order_month,
    o.quantity,
    o.unit_price,
    o.discount_rate,
    ROUND(o.quantity * o.unit_price * (1 - o.discount_rate), 2)                  AS net_revenue,
    ROUND(o.quantity * (o.unit_price - p.cost_price) * (1 - o.discount_rate), 2) AS gross_profit,
    o.status,
    o.ingest_ts
FROM best_practice_iceberg_fed.doc_orders_local o
LEFT JOIN best_practice_iceberg_fed.doc_products_local p
    ON o.product_id = p.product_id
WHERE o.status = 'completed';
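
You can check the Silver expressions before pointing them at a live catalog. The minimal Python sketch below runs the same join, filter, and arithmetic on SQLite over four of the inline orders, including the cancelled ORD026. SQLite has no YEAR()/MONTH(), so strftime() stands in for them, and only the columns needed for the check are kept. This is a local approximation of the query, not the Dynamic Table engine.

```python
import sqlite3

PRODUCTS = [  # (product_id, product_name, category, cost_price)
    ("P001", "Smartphone X1", "Electronics", 1800.0),
    ("P002", "Laptop Pro 14", "Electronics", 4200.0),
    ("P004", "Cotton T-Shirt", "Apparel", 30.0),
]
ORDERS = [  # (order_id, product_id, order_date, quantity, unit_price, discount_rate, status)
    ("ORD001", "P001", "2024-01-05", 1, 3299.0, 0.0, "completed"),
    ("ORD002", "P002", "2024-01-06", 1, 7999.0, 0.05, "completed"),
    ("ORD004", "P004", "2024-01-08", 3, 129.0, 0.1, "completed"),
    ("ORD026", "P001", "2024-02-05", 1, 3299.0, 0.0, "cancelled"),
]

SILVER_SQL = """
SELECT o.order_id, p.product_name, p.category,
       CAST(strftime('%Y', o.order_date) AS INTEGER) AS order_year,   -- YEAR()
       CAST(strftime('%m', o.order_date) AS INTEGER) AS order_month,  -- MONTH()
       ROUND(o.quantity * o.unit_price * (1 - o.discount_rate), 2) AS net_revenue,
       ROUND(o.quantity * (o.unit_price - p.cost_price) * (1 - o.discount_rate), 2) AS gross_profit
FROM orders o
LEFT JOIN products p ON o.product_id = p.product_id
WHERE o.status = 'completed'
ORDER BY o.order_date
"""


def build_silver(orders, products):
    """Load the rows into an in-memory database and return the Silver result rows."""
    con = sqlite3.connect(":memory:")
    try:
        con.execute("CREATE TABLE products (product_id TEXT, product_name TEXT, "
                    "category TEXT, cost_price REAL)")
        con.execute("CREATE TABLE orders (order_id TEXT, product_id TEXT, order_date TEXT, "
                    "quantity INTEGER, unit_price REAL, discount_rate REAL, status TEXT)")
        con.executemany("INSERT INTO products VALUES (?, ?, ?, ?)", products)
        con.executemany("INSERT INTO orders VALUES (?, ?, ?, ?, ?, ?, ?)", orders)
        return con.execute(SILVER_SQL).fetchall()
    finally:
        con.close()


if __name__ == "__main__":
    for row in build_silver(ORDERS, PRODUCTS):
        print(row)
```

Because the join is a LEFT JOIN, an order whose product_id has no match in the dimension table still appears in Silver, with NULL product columns and a NULL gross_profit.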

Trigger the first refresh manually, then query the Silver layer:

REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_silver_orders;

SELECT order_id, customer_id, product_name, category, region, order_date, net_revenue, gross_profit FROM best_practice_iceberg_fed.dt_silver_orders ORDER BY order_date LIMIT 10;

Result:

| order_id | customer_id | product_name | category | region | order_date | net_revenue | gross_profit |
|---|---|---|---|---|---|---|---|
| ORD001 | C101 | Smartphone X1 | Electronics | East | 2024-01-05 | 3299.0 | 1499.0 |
| ORD002 | C102 | Laptop Pro 14 | Electronics | West | 2024-01-06 | 7599.05 | 3609.05 |
| ORD003 | C103 | Wireless Earbuds | Electronics | East | 2024-01-07 | 998.0 | 638.0 |
| ORD004 | C104 | Cotton T-Shirt | Apparel | South | 2024-01-08 | 348.3 | 267.3 |
| ORD005 | C105 | Running Shoes | Apparel | North | 2024-01-09 | 699.0 | 479.0 |
| ORD006 | C101 | Coffee Maker | Kitchen | East | 2024-01-10 | 899.0 | 549.0 |
| ORD007 | C106 | Yoga Mat | Sports | West | 2024-01-11 | 398.0 | 308.0 |
| ORD008 | C107 | Backpack 30L | Accessories | South | 2024-01-12 | 299.0 | 219.0 |
| ORD009 | C108 | Smartphone X1 | Electronics | North | 2024-01-13 | 5938.2 | 2698.2 |
| ORD010 | C109 | LED Desk Lamp | Furniture | East | 2024-01-14 | 199.0 | 144.0 |

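The Silver layer drops ORD026 (status='cancelled', 3,299 CNY) and keeps the 29 completed orders. net_revenue is after the discount. gross_profit is after cost and is computed as quantity × (unit_price − cost_price) × (1 − discount_rate), which means the discount rate is applied to the unit margin.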
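
The gross_profit expression multiplies the unit margin by (1 − discount_rate), so the discount also reduces the cost. If your definition of gross profit is net revenue minus the full cost of goods, the two results differ by quantity × cost_price × discount_rate. The small Python sketch below compares both on ORD002. The alternative formula is an option to consider, not the guide's definition, and both function names are hypothetical.

```python
def gross_profit_as_defined(quantity, unit_price, cost_price, discount_rate):
    """Formula used in dt_silver_orders: discount applied to the unit margin."""
    return round(quantity * (unit_price - cost_price) * (1 - discount_rate), 2)


def gross_profit_full_cost(quantity, unit_price, cost_price, discount_rate):
    """Alternative: discounted revenue minus undiscounted cost of goods."""
    net_revenue = quantity * unit_price * (1 - discount_rate)
    return round(net_revenue - quantity * cost_price, 2)


if __name__ == "__main__":
    # ORD002: 1 x Laptop Pro 14, unit price 7999.0, cost 4200.0, 5% discount
    a = gross_profit_as_defined(1, 7999.0, 4200.0, 0.05)
    b = gross_profit_full_cost(1, 7999.0, 4200.0, 0.05)
    print(a, b, round(a - b, 2))  # the gap equals 1 * 4200.0 * 0.05
```

With no discount (for example ORD001), both functions return 1499.0, so the zero-discount rows in the Silver output are unaffected by the choice.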


Gold Layer: Aggregated Metrics with a Dynamic Table

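The Gold layer builds on Silver and aggregates by region, category, and year/month. For each combination it produces the order count, total revenue, gross profit, and profit margin, which BI tools can query directly.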

CREATE DYNAMIC TABLE best_practice_iceberg_fed.dt_gold_regional_metrics AS
SELECT
    region,
    category,
    order_year,
    order_month,
    COUNT(order_id)                                                  AS order_count,
    SUM(quantity)                                                    AS total_qty,
    ROUND(SUM(net_revenue), 2)                                       AS total_revenue,
    ROUND(SUM(gross_profit), 2)                                      AS total_profit,
    ROUND(SUM(gross_profit) / NULLIF(SUM(net_revenue), 0) * 100, 2)  AS profit_margin_pct
FROM best_practice_iceberg_fed.dt_silver_orders
GROUP BY region, category, order_year, order_month;
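
The Gold query can be checked locally in the same way. The Python sketch below runs it on SQLite over five Silver rows. ORD001, ORD003, and ORD009 are taken from the Silver output, and ORD012 and ORD013 are computed from the inline orders with the Silver formulas. NULLIF turns a zero revenue sum into NULL, so the margin comes out as NULL instead of a division by zero. The zero-revenue row used in the check is hypothetical.

```python
import sqlite3

# (order_id, region, category, order_year, order_month, quantity, net_revenue, gross_profit)
SILVER_ROWS = [
    ("ORD001", "East", "Electronics", 2024, 1, 1, 3299.0, 1499.0),
    ("ORD003", "East", "Electronics", 2024, 1, 2, 998.0, 638.0),
    ("ORD012", "East", "Electronics", 2024, 1, 1, 499.0, 319.0),
    ("ORD009", "North", "Electronics", 2024, 1, 2, 5938.2, 2698.2),
    ("ORD013", "North", "Electronics", 2024, 1, 1, 7999.0, 3799.0),
]

GOLD_SQL = """
SELECT region, category, order_year, order_month,
       COUNT(order_id) AS order_count,
       SUM(quantity) AS total_qty,
       ROUND(SUM(net_revenue), 2) AS total_revenue,
       ROUND(SUM(gross_profit), 2) AS total_profit,
       ROUND(SUM(gross_profit) / NULLIF(SUM(net_revenue), 0) * 100, 2) AS profit_margin_pct
FROM silver
GROUP BY region, category, order_year, order_month
ORDER BY total_revenue DESC
"""


def build_gold(silver_rows):
    """Return {(region, category, year, month): (count, qty, revenue, profit, margin_pct)}."""
    con = sqlite3.connect(":memory:")
    try:
        con.execute("CREATE TABLE silver (order_id TEXT, region TEXT, category TEXT, "
                    "order_year INTEGER, order_month INTEGER, quantity INTEGER, "
                    "net_revenue REAL, gross_profit REAL)")
        con.executemany("INSERT INTO silver VALUES (?, ?, ?, ?, ?, ?, ?, ?)", silver_rows)
        return {row[:4]: row[4:] for row in con.execute(GOLD_SQL)}
    finally:
        con.close()


if __name__ == "__main__":
    for key, metrics in build_gold(SILVER_ROWS).items():
        print(key, metrics)
```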

REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_gold_regional_metrics;

SELECT region, category, order_year, order_month, order_count, total_revenue, total_profit, profit_margin_pct FROM best_practice_iceberg_fed.dt_gold_regional_metrics ORDER BY total_revenue DESC LIMIT 10;

Result:

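| region | category | order_year | order_month | order_count | total_revenue | total_profit | profit_margin_pct |
|---|---|---|---|---|---|---|---|
| North | Electronics | 2024 | 1 | 2 | 13937.2 | 6497.2 | 46.62 |
| East | Electronics | 2024 | 2 | 1 | 7999.0 | 3799.0 | 47.49 |
| West | Electronics | 2024 | 1 | 1 | 7599.05 | 3609.05 | 47.49 |
| North | Electronics | 2024 | 2 | 1 | 7599.05 | 3609.05 | 47.49 |
| East | Electronics | 2024 | 1 | 3 | 4796.0 | 2456.0 | 51.21 |
| South | Electronics | 2024 | 1 | 1 | 3299.0 | 1499.0 | 45.44 |
| West | Electronics | 2024 | 2 | 1 | 1347.3 | 861.3 | 63.93 |
| South | Health | 2024 | 2 | 1 | 1047.0 | 687.0 | 65.62 |
| East | Kitchen | 2024 | 1 | 1 | 899.0 | 549.0 | 61.07 |
| West | Kitchen | 2024 | 2 | 1 | 899.0 | 549.0 | 61.07 |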

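Electronics has the highest revenue (North, January: 13,937.2 CNY). Among the rows shown, Health (65.62%) and Kitchen (61.07%) have relatively high profit margins, above 60%. BI tools can connect to this table directly to build a regional sales dashboard.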


Configuring the Refresh Schedule

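Periodic refreshes of the Dynamic Tables are managed through Studio Tasks rather than written into the DDL. This lets you attach monitoring alerts and data-quality check rules to the same task, giving the pipeline a single operational entry point.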

The following tasks have been created under the best_practices/iceberg_fed/ path:

| Task name | SQL content | Schedule |
|---|---|---|
| refresh_dt_silver_orders | REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_silver_orders | Hourly (0 0/1 * * ?) |
| refresh_dt_gold_metrics | REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_gold_regional_metrics | Hourly (0 0/1 * * ?) |
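
The guide does not say which cron dialect Studio uses. The Python sketch below assumes `0 0/1 * * ?` is read as five fields (minute, hour, day of month, month, day of week), with `?` meaning "no specific value". That reading matches the "hourly" description. The sketch expands the minute and hour fields into daily fire times and ignores the day and month fields. It also shows that both tasks fire at minute 0, so the Gold refresh may start before the Silver refresh finishes and pick up the previous Silver state. If your scheduler offers no task dependencies, offsetting the Gold minute (the `15 0/1 * * ?` below is hypothetical) is one way to reduce that risk.

```python
def expand_field(field, lo, hi):
    """Expand one cron field: '*', '?', 'n', 'a-b', 'a/step', '*/step', and comma lists."""
    if field in ("*", "?"):
        return list(range(lo, hi + 1))
    values = set()
    for part in field.split(","):
        base, _, step = part.partition("/")
        if base == "*":
            start, end = lo, hi
        elif "-" in base:
            a, b = base.split("-")
            start, end = int(a), int(b)
        else:
            start = int(base)
            end = hi if step else start  # 'a/step' runs from a up to the field maximum
        values.update(range(start, end + 1, int(step) if step else 1))
    return sorted(values)


def daily_fire_times(expr):
    """(hour, minute) pairs in one day; only the minute and hour fields are evaluated."""
    minute, hour, *_rest = expr.split()
    return [(h, m) for h in expand_field(hour, 0, 23) for m in expand_field(minute, 0, 59)]


if __name__ == "__main__":
    both = daily_fire_times("0 0/1 * * ?")          # schedule of both tasks in the table
    offset_gold = daily_fire_times("15 0/1 * * ?")  # hypothetical offset for Gold
    print(len(both), both[:3])
    print(offset_gold[:3])
```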

Creating the tasks with cz-cli:

# 1. Create a subfolder under best_practices (186117 is the best_practices folder ID in this example)
cz-cli task create-folder iceberg_fed -p skill_test --parent 186117
# 2. Create the Silver refresh task (replace <folder_id> with the ID returned by step 1)
cz-cli task create refresh_dt_silver_orders -p skill_test --type SQL --folder <folder_id>
cz-cli task save-content refresh_dt_silver_orders -p skill_test \
    --content "REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_silver_orders"
cz-cli task save-cron refresh_dt_silver_orders -p skill_test --cron "0 0/1 * * ?"
# 3. Create the Gold refresh task
cz-cli task create refresh_dt_gold_metrics -p skill_test --type SQL --folder <folder_id>
cz-cli task save-content refresh_dt_gold_metrics -p skill_test \
    --content "REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_gold_regional_metrics"
cz-cli task save-cron refresh_dt_gold_metrics -p skill_test --cron "0 0/1 * * ?"
# 4. Deploy both tasks
cz-cli task deploy refresh_dt_silver_orders -p skill_test
cz-cli task deploy refresh_dt_gold_metrics -p skill_test
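
To script these steps, the minimal Python sketch below generates the same command sequence for each task. It either prints the commands (dry run) or executes them with subprocess. It uses only the cz-cli subcommands and flags shown above. `FOLDER_ID` is a placeholder for the value returned by `create-folder`. Passing the arguments as a list avoids shell quoting of the SQL text. Unlike the listing, the sketch creates and deploys one task before moving on to the next.

```python
import shlex
import subprocess

PROJECT = "skill_test"     # project used in the listing above
FOLDER_ID = "<folder_id>"  # placeholder: the ID printed by `cz-cli task create-folder`
CRON_HOURLY = "0 0/1 * * ?"

TASKS = {
    "refresh_dt_silver_orders":
        "REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_silver_orders",
    "refresh_dt_gold_metrics":
        "REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_gold_regional_metrics",
}


def task_commands(name, sql, cron=CRON_HOURLY, project=PROJECT, folder_id=FOLDER_ID):
    """argv lists for create -> save-content -> save-cron -> deploy of one SQL task."""
    return [
        ["cz-cli", "task", "create", name, "-p", project, "--type", "SQL", "--folder", folder_id],
        ["cz-cli", "task", "save-content", name, "-p", project, "--content", sql],
        ["cz-cli", "task", "save-cron", name, "-p", project, "--cron", cron],
        ["cz-cli", "task", "deploy", name, "-p", project],
    ]


def run_all(dry_run=True):
    """Print (dry run) or execute every command; stop at the first failing step."""
    for name, sql in TASKS.items():
        for argv in task_commands(name, sql):
            if dry_run:
                print(shlex.join(argv))
            else:
                subprocess.run(argv, check=True)


if __name__ == "__main__":
    run_all(dry_run=True)
```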


Two-Way Interoperability: Lakehouse as an Iceberg REST Provider

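Besides reading external Iceberg tables, Yunqi Lakehouse also exposes a standard Iceberg REST Catalog interface. External engines such as Spark and Trino can use it to read Lakehouse internal tables in the reverse direction, so data is shared both ways: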

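  • Direction 1 (the main flow of this guide): external Spark writes Iceberg → Lakehouse reads it through an External Catalog
  • Direction 2 (reverse): Lakehouse internal tables → exposed through the Iceberg REST API → read by external Spark/Trino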

For configuring the outbound Iceberg REST API, see: Accessing Lakehouse from Spark via the Iceberg REST Catalog
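
On the Spark side, a REST catalog is registered with the standard Iceberg Spark catalog properties. The Python sketch below only assembles those `--conf` pairs. The catalog name `lakehouse`, the URI, the warehouse value, and the use of an OAuth2 `credential` are placeholders and assumptions, so take the actual endpoint and authentication settings from the referenced Lakehouse document. The Iceberg Spark runtime jar must also be on the Spark classpath.

```python
from typing import Optional


def iceberg_rest_spark_conf(catalog: str, uri: str, warehouse: Optional[str] = None,
                            credential: Optional[str] = None) -> dict:
    """Spark properties that register an Iceberg REST catalog under the name `catalog`."""
    key = f"spark.sql.catalog.{catalog}"
    conf = {
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        key: "org.apache.iceberg.spark.SparkCatalog",
        f"{key}.type": "rest",
        f"{key}.uri": uri,
    }
    if warehouse:
        conf[f"{key}.warehouse"] = warehouse
    if credential:
        conf[f"{key}.credential"] = credential  # OAuth2 client credentials as "id:secret"
    return conf


def spark_submit_args(conf: dict) -> list:
    """Flatten the properties into spark-submit / spark-sql --conf arguments."""
    args = []
    for k, v in conf.items():
        args += ["--conf", f"{k}={v}"]
    return args


if __name__ == "__main__":
    conf = iceberg_rest_spark_conf("lakehouse", "https://<lakehouse-iceberg-rest-endpoint>",
                                   warehouse="<warehouse>")
    print(" ".join(spark_submit_args(conf)))
```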


Notes

  • Creating an External Catalog or a Catalog Connection checks that the REST API is reachable. If the Iceberg REST Catalog service is not running or the network path is blocked, the DDL fails.
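  • Currently only the instance_admin role can query an External Catalog. Once a Dynamic Table writes the results into an internal table, downstream access can be controlled with ordinary table permissions.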
  • When a Dynamic Table references an External Catalog table, the DDL uses the three-part name (catalog.schema.table). Internal tables use the two-part name (schema.table).
  • REFRESH DYNAMIC TABLE triggers a full read of the Iceberg snapshot. If the upstream Iceberg table changes frequently, keep the schedule interval at 5 minutes or longer to avoid excessive calls to the REST API.
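  • In CREATE CATALOG CONNECTION TYPE ICEBERG_REST, do not put = after TYPE, and do not separate the top-level parameters with commas (commas appear only inside the WITH PROPERTIES list). Both mistakes are frequent syntax errors.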
