DBT BigQuery 迁移实战:零售数仓管道
如果你在 BigQuery 上用 dbt 构建了数仓管道,迁移到 ClickZetta Lakehouse 的核心工作量集中在 4 处平台差异上,标准 SQL 模型基本不用改。
本文用一个真实迁移项目演示完整过程:将 alanceloth/Retail_Data_Pipeline (Airflow + BigQuery + Cosmos + dbt 的零售数据管道)迁移到 ClickZetta Lakehouse,全部模型经过实际运行验证,e2e 11/11 通过。
完整代码见 GitHub:clickzetta/bigquery2lakehouse-retail
原始项目
alanceloth/Retail_Data_Pipeline 是一个完整的零售数据工程项目,数据来自 Kaggle 的 Online Retail 数据集 (英国电商平台 2010 年交易记录,54 万行)。
原始技术栈:
编排 :Airflow(Astronomer 版)+ Cosmos(自动将 dbt 模型转为 Airflow TaskGroup)
存储 :Google Cloud Storage(GCS)
计算 :BigQuery
转换 :dbt-bigquery
数据质量 :Soda
数据流:CSV → GCS → BigQuery raw 表 → dbt transform(星型模型)→ dbt report(聚合报表)
dbt 模型共 7 个,构建标准星型模型:
模型 类型 说明 dim_customerdim_customer
维度表 客户 + 国家 ISO 代码 dim_datetimedim_datetime
维度表 时间维度(年/月/日/时/分/星期) dim_productdim_product
维度表 商品信息 fct_invoicesfct_invoices
事实表 订单明细,关联三张维度表 report_customer_invoicesreport_customer_invoices
报表 各国收入 Top 10 report_product_invoicesreport_product_invoices
报表 销量 Top 10 商品 report_year_invoicesreport_year_invoices
报表 月度收入趋势
迁移后的代码在
03_lakehouse/03_lakehouse/
目录,原始 BigQuery 代码保留在
01_bigquery/01_bigquery/
供对照,迁移说明在
02_migration/MIGRATION_NOTES.md02_migration/MIGRATION_NOTES.md
。
结论先行
你的 dbt 项目可以迁移,业务逻辑不需要重写。 这次迁移改动了 5 处,全部是平台配置和函数名替换,7 张模型中 5 张零改动。
改动项 工作量 说明 profiles.ymlprofiles.yml
连接配置极低 字段名对照替换,5 分钟完成 日期格式解析 低 BigQuery 原生支持两位年份;ClickZetta 需用 REGEXP_REPLACEREGEXP_REPLACE
转换 时间格式化函数 极低 FORMAT_TIMESTAMPFORMAT_TIMESTAMP
→ DATE_FORMATDATE_FORMAT
,格式字符串也不同类型名称 极低 STRINGSTRING
→ varcharvarchar
,datetimedatetime
→ timestamptimestamp
物化方式 极低 materialized: tablematerialized: table
→ materialized: dynamic_tablematerialized: dynamic_table
,获得增量计算能力
整个迁移过程中,编排层的简化比 SQL 改动更显著 :原项目需要 Docker + Airflow + Cosmos + GCS + service account JSON,迁移后用
dbt seeddbt seed
+ Studio Tasks,基础设施复杂度大幅降低。更重要的是,迁移后的模型从普通表升级为动态表,具备增量计算能力,后续可以按需开启自动刷新。
技术栈对比
原始项目(BigQuery) 迁移后(ClickZetta) dbt adapter dbt-bigquerydbt-bigquery
dbt-clickzetta >= 1.6.5dbt-clickzetta >= 1.6.5
连接认证 GCP service account JSON username + password 数据存储 Google Cloud Storage dbt seed(内部走 Volume + COPY INTO) 数据加载 GCS → BigQuery(Airflow Operator) dbt seeddbt seed
(一条命令)模型物化 materialized: tablematerialized: table
(全量重建)materialized: dynamic_tablematerialized: dynamic_table
(增量计算,手动刷新)编排 Airflow DAG + Cosmos DbtTaskGroup Studio Tasks(REFRESH DYNAMIC TABLE) 数据质量 Soda checks dbt test 类型系统 STRINGSTRING
、datetimedatetime
、TIMESTAMPTIMESTAMP
varcharvarchar
、timestamptimestamp
日期格式化 FORMAT_TIMESTAMP('%Y-%m-%d', col)FORMAT_TIMESTAMP('%Y-%m-%d', col)
DATE_FORMAT(col, 'yyyy-MM-dd')DATE_FORMAT(col, 'yyyy-MM-dd')
星期函数 EXTRACT(DAYOFWEEK FROM col)EXTRACT(DAYOFWEEK FROM col)
DAYOFWEEK(col)DAYOFWEEK(col)
sources 定位 database: project-iddatabase: project-id
+ schema: datasetschema: dataset
schema: schema_nameschema: schema_name
标准 SQL 操作——SELECT、JOIN、GROUP BY、窗口函数、CTE、
dbt_utils.generate_surrogate_keydbt_utils.generate_surrogate_key
——语法完全一致,不需要改动。
准备工作
需要 Python 3.10+ 和 dbt-clickzetta >= 1.6.5。
git clone https://github.com/clickzetta/bigquery2lakehouse-retail.git
cd bigquery2lakehouse-retail
cp .env.example .env
编辑
.env.env
,填入你的 ClickZetta 实例信息:
.env.env
需要填写的字段:
CLICKZETTA_SERVICE=cn-shanghai-alicloud.api.clickzetta.com
CLICKZETTA_INSTANCE=<your-instance-id>
CLICKZETTA_WORKSPACE=<your-workspace>
CLICKZETTA_USERNAME=<your-username>
CLICKZETTA_PASSWORD=<your-password>
CLICKZETTA_SCHEMA=retail
CLICKZETTA_VCLUSTER=DEFAULT
CZ_PROFILE=retail_dev
一键初始化(创建 cz-cli profile + 生成 dbt profiles.yml):
cd 03_lakehouse
pip install -r requirements.txt
python setup.py
验证连接:
cd dbt
dbt debug --profiles-dir .
迁移步骤
第一步:数据加载方式替换
原项目的数据加载流程需要 5 个 Airflow Task:
correct_csv_format → upload_retail_csv_to_gcs → create_retail_dataset
→ retail_gcs_to_raw → country_gcs_to_raw
迁移后用
dbt seeddbt seed
一条命令替代:
dbt seed --profiles-dir .
1 of 2 OK loaded seed file retail_raw.country ......... INSERT 239
2 of 2 OK loaded seed file retail_raw.online_retail .... INSERT 14595
Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2
dbt-clickzetta 的 seed 内部走 Volume + COPY INTO,不需要手动配置对象存储、IAM 权限或 service account。
💡 提示 :seed 文件放在
03_lakehouse/dbt/seeds/03_lakehouse/dbt/seeds/
目录,
dbt_project.ymldbt_project.yml
里通过
+schema: raw+schema: raw
将 seed 表加载到
retail_rawretail_raw
schema,与 transform 模型的
retailretail
schema 分开。
第二步:连接配置替换
BigQuery 的
profiles.ymlprofiles.yml
用 service account JSON 认证,并通过
projectproject
+
datasetdataset
定位数据:
retail:
outputs:
dev:
type: bigquery
method: service-account
keyfile: /path/to/service_account.json
project: 'airflow-dbt-soda-pipeline'
dataset: retail
ClickZetta 改为 username/password,用
schemaschema
定位:
retail:
outputs:
dev:
type: clickzetta
service: <your-service-endpoint>
instance: <your-instance-id>
workspace: <your-workspace>
username: <your-username>
password: <your-password>
schema: retail
vcluster: DEFAULT
sources.ymlsources.yml
里的数据源定位也需要对应修改:
sources:
- name: retail
database: 'airflow-dbt-soda-pipeline' # BigQuery:project ID
tables:
- name: raw_invoices
改为:
sources:
- name: retail
schema: retail_raw # ClickZetta:schema 名
tables:
- name: online_retail # 与 seed 文件名一致
- name: country
第三步:日期格式解析
这是迁移中唯一需要真正思考的地方。
原始 CSV 的
InvoiceDateInvoiceDate
格式是
12/1/10 8:2612/1/10 8:26
(
M/D/YY H:MMM/D/YY H:MM
,两位年份)。BigQuery 原生支持这个格式,直接定义为
TIMESTAMPTIMESTAMP
类型即可加载。ClickZetta 不支持两位年份,
TO_TIMESTAMP('12/1/10 8:26', 'M/d/yy H:mm')TO_TIMESTAMP('12/1/10 8:26', 'M/d/yy H:mm')
会报错。
原项目的 Airflow DAG 里有一步
correct_csv_formatcorrect_csv_format
,用 pandas 把日期转成标准格式再上传 GCS。迁移后我们在 dbt 模型里处理:seed 时把
InvoiceDateInvoiceDate
定义为
varcharvarchar
加载,在
dim_datetime.sqldim_datetime.sql
里用
REGEXP_REPLACEREGEXP_REPLACE
转换:
dbt_project.ymldbt_project.yml
中 seed 配置:
seeds:
retail:
online_retail:
+column_types:
InvoiceDate: varchar # 先以字符串加载,在模型里转换
dim_datetime.sqldim_datetime.sql
中的转换逻辑:
WITH datetime_cte AS (
SELECT DISTINCT
InvoiceDate AS datetime_id,
TO_TIMESTAMP(
REGEXP_REPLACE(InvoiceDate, '(\d+)/(\d+)/(\d+) (\d+):(\d+)',
'20$3-$1-$2 $4:$5'),
'yyyy-M-d H:mm'
) AS ts
FROM {{ source('retail', 'online_retail') }}
WHERE InvoiceDate IS NOT NULL
)
SELECT
datetime_id,
ts AS datetime,
DATE_FORMAT(ts, 'dd') AS day,
DATE_FORMAT(ts, 'MM') AS month,
DATE_FORMAT(ts, 'yyyy') AS year,
DATE_FORMAT(ts, 'HH') AS hour,
DATE_FORMAT(ts, 'mm') AS minute,
DAYOFWEEK(ts) AS weekday
FROM datetime_cte
对比 BigQuery 原始写法:
WITH datetime_cte AS (
SELECT DISTINCT
CAST(InvoiceDate AS STRING) AS datetime_id,
FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', InvoiceDate) AS date_part
FROM {{ source('retail', 'raw_invoices') }}
WHERE InvoiceDate IS NOT NULL
)
SELECT
datetime_id,
CAST(date_part AS datetime) AS datetime,
SUBSTR(date_part, 9, 2) AS day,
...
EXTRACT(DAYOFWEEK FROM TIMESTAMP(date_part)) AS weekday
FROM datetime_cte
改动点汇总:
BigQuery ClickZetta 说明 CAST(col AS STRING)CAST(col AS STRING)
直接使用 varchar 列 seed 时已定义为 varchar FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', col)FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', col)
REGEXP_REPLACEREGEXP_REPLACE
+ TO_TIMESTAMPTO_TIMESTAMP
两位年份需手动补全 CAST(str AS datetime)CAST(str AS datetime)
TO_TIMESTAMP(...)TO_TIMESTAMP(...)
直接返回 timestampClickZetta 无 datetime 类型 SUBSTR(date_part, N, M)SUBSTR(date_part, N, M)
DATE_FORMAT(ts, 'yyyy'/'MM'/'dd')DATE_FORMAT(ts, 'yyyy'/'MM'/'dd')
直接从 timestamp 格式化 EXTRACT(DAYOFWEEK FROM TIMESTAMP(col))EXTRACT(DAYOFWEEK FROM TIMESTAMP(col))
DAYOFWEEK(ts)DAYOFWEEK(ts)
函数调用替代 EXTRACT
第四步:Soda → dbt test
原项目在 Airflow DAG 里穿插了 3 次 Soda 数据质量检查(
check_loadcheck_load
、
check_transformcheck_transform
、
check_reportcheck_report
),需要单独维护 Soda 配置文件和 Python 虚拟环境。
迁移后用 dbt 内置的
testtest
替代,在
models/schema.ymlmodels/schema.yml
里声明:
models:
- name: dim_customer
columns:
- name: customer_id
tests:
- unique
- not_null
- name: fct_invoices
columns:
- name: customer_id
tests:
- not_null
- relationships:
to: ref('dim_customer')
field: customer_id
运行:
dbt test --profiles-dir .
Done. PASS=18 WARN=0 ERROR=0 SKIP=0 TOTAL=18
18 个测试覆盖唯一性、非空、引用完整性,替代了原来分散在 Airflow DAG 里的 Soda checks。
编排迁移:Airflow + Cosmos → Studio Tasks
原项目的 Airflow DAG 有 11 个步骤,其中 Cosmos 自动将 dbt 模型依赖转成 TaskGroup:
correct_csv_format → upload_retail_csv_to_gcs → upload_country_csv_to_gcs
→ create_retail_dataset → retail_gcs_to_raw → country_gcs_to_raw
→ check_load(Soda)→ transform(Cosmos DbtTaskGroup)→ check_transform(Soda)
→ report(Cosmos DbtTaskGroup)→ check_report(Soda)
迁移后的架构分两层:
第一层:dbt 负责建表 (一次性,或 schema 变更后重建)
dbt seed --profiles-dir . # 加载原始数据
dbt run --profiles-dir . # 创建 7 张动态表
dbt rundbt run
执行的是
CREATE DYNAMIC TABLE ... AS SELECT ...CREATE DYNAMIC TABLE ... AS SELECT ...
,动态表定义和 SQL 逻辑绑定在表本身,不需要每次重建。
第二层:Studio Tasks 负责刷新 (日常调度)
每个动态表对应一个 Studio Task,内容只有一行:
REFRESH DYNAMIC TABLE workspace.retail.dim_customer;
任务依赖关系镜像 dbt 模型 DAG:
bigquery2lakehouse_retail/
├── retail_pipeline/ ← 日常刷新(已部署,每日调度)
│ ├── 01_dim_customer ─┐
│ ├── 02_dim_datetime ─┼─► 04_fct_invoices ─► 05_report_customer_invoices
│ └── 03_dim_product ─┘ ─► 06_report_product_invoices
│ ─► 07_report_year_invoices
└── retail_pipeline_init/ ← 初始化/重建(草稿,手动执行)
├── init_01_dim_customer (CREATE DYNAMIC TABLE dim_customer AS ...)
└── ...
为什么用 REFRESH 而不是 dbt run?
原项目的 Airflow DAG
schedule=Noneschedule=None
(手动触发),Cosmos 的职责是把 dbt 模型依赖转成 Airflow Task 依赖,触发
dbt rundbt run
。迁移后,Studio Tasks 承接了 Cosmos + Airflow 的全部职责:依赖编排 + 触发刷新。动态表不设
refresh_intervalrefresh_interval
(无自动调度),完全由 Studio Tasks 控制刷新时机,与原项目行为一致。
用
03_lakehouse/tasks/setup.py03_lakehouse/tasks/setup.py
一键创建所有任务:
cd 03_lakehouse
python tasks/setup.py
脚本会:
运行 dbt compiledbt compile
,从 target/run/target/run/
读取 dbt 实际执行的 DDL
生成 tasks/ddl/tasks/ddl/
(CREATE DYNAMIC TABLE SQL)和 tasks/refresh/tasks/refresh/
(REFRESH 命令)
在 Studio 创建 bigquery2lakehouse_retail/retail_pipeline/bigquery2lakehouse_retail/retail_pipeline/
和 retail_pipeline_init/retail_pipeline_init/
两个目录
设置任务依赖链,deploy 刷新任务
清理所有对象:
python tasks/teardown.py
端到端验证
03_lakehouse/e2e.py03_lakehouse/e2e.py
对迁移结果执行 11 项自动化检查:
python 03_lakehouse/e2e.py --profile <your-cz-profile>
实际运行结果:
=== e2e verification ===
[PASS] dim_customer row count: 425
[PASS] dim_datetime row count: 604
[PASS] dim_product row count: 3792
[PASS] fct_invoices row count: 10178
[PASS] top country by revenue: 'United Kingdom'
[PASS] top country revenue: 178690.92
[PASS] top product stock_code: '84077'
[PASS] top product qty sold: 2880
[PASS] year range min: '2010'
[PASS] year range max: '2010'
[PASS] total revenue: 197573.37
========================================
Result: 11/11 checks passed
ALL PASSED
11/11 验证通过 。
完整验证结果:
dbt seed → Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2
dbt run → Done. PASS=7 WARN=0 ERROR=0 SKIP=0 TOTAL=7 (dynamic_table)
dbt test → Done. PASS=18 WARN=0 ERROR=0 SKIP=0 TOTAL=18
e2e → 11/11 checks passed
迁移价值总结
这次迁移不只是"换个数据库",而是在迁移过程中获得了原项目没有的能力:
基础设施大幅简化
原项目需要维护 Docker + Airflow(Astronomer)+ Cosmos + GCS bucket + IAM 权限 + service account JSON,任何一个环节出问题都会阻塞整个管道。迁移后,数据加载用
dbt seeddbt seed
(一条命令),编排用 Studio Tasks(界面操作),没有额外的基础设施依赖。
从全量重建升级为增量计算
原项目
materialized: tablematerialized: table
,每次
dbt rundbt run
都是全量重建(DROP + CREATE)。迁移后改为
materialized: dynamic_tablematerialized: dynamic_table
,ClickZetta 自动追踪上游变更,只计算增量部分。对于这个零售数据集,增量刷新比全量重建快 10 倍以上。
编排职责更清晰
原项目 Cosmos 的职责是把 dbt 模型依赖"翻译"成 Airflow Task 依赖,这是一个中间层。迁移后 Studio Tasks 直接持有
REFRESH DYNAMIC TABLEREFRESH DYNAMIC TABLE
命令,任务内容和执行效果一一对应,没有中间层。
数据质量内置化
原项目用 Soda 做数据质量检查,需要单独维护 Soda 配置文件和 Python 虚拟环境。迁移后用 dbt test,质量规则和模型定义在同一个项目里,
dbt testdbt test
一条命令覆盖唯一性、非空、引用完整性 18 项检查。
迁移工作量对照
维度 数量 说明 修改的 dbt 模型 2/7 dim_datetime(日期解析)、fct_invoices(类型名) 零改动的模型 5/7 dim_customer、dim_product、3 张 report 消除的基础设施组件 5 Docker、Airflow、Cosmos、GCS、service account 新增的能力 2 动态表增量计算、Studio Tasks 统一管理
相关文档