数据质量检查(DQC):SQL 驱动的自动化验证

数据管道跑通了,数据真的对吗?行数是否一致、关键字段有没有空值、聚合指标是否合理——这些是每个数据工程师每天都要面对的问题。云器 Lakehouse 的数据质量检查(DQC)用纯 SQL 实现自动化验证,将质量监控融入数据管道,让问题在影响下游之前被发现。

本文以 NHL Medallion 架构为例,演示如何为 Bronze → Silver → Gold 三层建立完整的 DQC 体系。


DQC 核心概念

Data Pipeline DQC Gate ──────────── ───────── Bronze (raw data) ── after load ──→ row count + freshness │ ▼ Silver (clean DT) ── after refresh ──→ null rate + uniqueness + value range │ ▼ Gold (aggregate DT) ── after refresh ──→ aggregation consistency + volatility │ ▼ BI / Apps ←── consume PASS only

DQC 不是一次性活动,而是嵌入管道的自动化过程——每次数据刷新后自动运行检查,结果写入

dqc_results
dqc_results
表,异常通过监控 DT 暴露。


涉及的 SQL 命令

命令 / 函数用途适用场景
CREATE SCHEMA
CREATE SCHEMA
创建独立 DQC 层隔离质量检查表
CREATE TABLE
CREATE TABLE
建 DQC 结果表存储每次检查的历史记录
INSERT INTO ... SELECT
INSERT INTO ... SELECT
写入 DQC 检查结果每项检查一条记录
CASE WHEN
CASE WHEN
判断 PASS/WARN/FAIL所有检查规则的核心逻辑
COUNT(*)
COUNT(*)
/
SUM(CASE WHEN)
SUM(CASE WHEN)
行数统计、条件计数行数校验、空值率、唯一性
MIN
MIN
/
MAX
MAX
值范围上下限值范围检查、新鲜度检查
CREATE DYNAMIC TABLE
CREATE DYNAMIC TABLE
创建 DQC 仪表盘 DT自动刷新质量状态汇总
REFRESH DYNAMIC TABLE
REFRESH DYNAMIC TABLE
手动触发 DT 刷新首次创建后初始化数据

DQC 检查类型

类型说明示例
行数一致性上下游行数是否匹配Bronze team_info(33) = Silver dim_team(33)
空值率关键字段 NULL 比例goals 空值率应为 0%
唯一性ID 字段是否重复player_id 应唯一
值范围数值是否在合理区间goals ≥ 0, save_pct ∈ [0,1]
新鲜度数据是否更新到最新最新赛季 ≥ 2019
聚合一致性汇总指标是否自洽胜场 + 负场 = 比赛场次
引用完整性JOIN 键匹配率skater_stats.player_id 在 player_info 中存在

第一步:建 DQC 结果表

CREATE SCHEMA IF NOT EXISTS dqc COMMENT '数据质量检查层'; CREATE TABLE dqc.dqc_results ( check_id STRING COMMENT '检查ID,如 DQC-001', check_name STRING COMMENT '检查类型:row_match/null_rate/uniqueness...', layer STRING COMMENT '数据层:bronze/silver/gold', metric STRING COMMENT '指标名', expected STRING COMMENT '期望值或范围', actual STRING COMMENT '实际值', status STRING COMMENT 'PASS / WARN / FAIL', detail STRING COMMENT '检查说明', checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP() ) COMMENT 'DQC 检查结果表';


第二步:定义检查规则

行数一致性

验证 Silver 层的维度表行数与 Bronze 源表一致:

INSERT INTO dqc.dqc_results (check_id, check_name, layer, metric, expected, actual, status, detail) SELECT 'DQC-001', 'dim_row_match', 'silver', 'dim_team_rows', CAST((SELECT COUNT(*) FROM nhl_game_data.team_info) AS STRING), CAST((SELECT COUNT(*) FROM silver.dim_team) AS STRING), CASE WHEN (SELECT COUNT(*) FROM nhl_game_data.team_info) = (SELECT COUNT(*) FROM silver.dim_team) THEN 'PASS' ELSE 'FAIL' END, 'Bronze team_info 行数应与 Silver dim_team 一致';

空值率

INSERT INTO dqc.dqc_results (check_id, check_name, layer, metric, expected, actual, status, detail) SELECT 'DQC-003', 'null_rate', 'silver', 'skater_goals_null_pct', '=0', CAST(ROUND(SUM(CASE WHEN goals IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS STRING), CASE WHEN SUM(CASE WHEN goals IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) = 0 THEN 'PASS' ELSE 'WARN' END, 'Silver fact_skater_stats.goals 不应有 NULL' FROM silver.fact_skater_stats;

唯一性

INSERT INTO dqc.dqc_results (check_id, check_name, layer, metric, expected, actual, status, detail) SELECT 'DQC-005', 'uniqueness', 'silver', 'dim_player_id_unique', 'TRUE', CAST(CASE WHEN COUNT(*) = COUNT(DISTINCT player_id) THEN 'TRUE' ELSE 'FALSE' END AS STRING), CASE WHEN COUNT(*) = COUNT(DISTINCT player_id) THEN 'PASS' ELSE 'FAIL' END, 'Silver dim_player.player_id 应唯一' FROM silver.dim_player;

值范围

INSERT INTO dqc.dqc_results (check_id, check_name, layer, metric, expected, actual, status, detail) SELECT 'DQC-006', 'value_range', 'silver', 'skater_goals_positive', '>=0', CAST(MIN(goals) AS STRING), CASE WHEN MIN(goals) >= 0 THEN 'PASS' ELSE 'FAIL' END, 'Silver fact_skater_stats.goals 不应为负数' FROM silver.fact_skater_stats;

新鲜度

INSERT INTO dqc.dqc_results (check_id, check_name, layer, metric, expected, actual, status, detail) SELECT 'DQC-008', 'freshness', 'bronze', 'max_season', '>=2019', CAST(MAX(season) AS STRING), CASE WHEN MAX(season) >= 2019 THEN 'PASS' ELSE 'WARN' END, 'Bronze 最新赛季不应早于 2019' FROM nhl_game_data.game;


第三步:DQC 仪表盘

dqc_results
dqc_results
聚合为仪表盘 Dynamic Table,一键查看各层质量状态:

CREATE OR REPLACE DYNAMIC TABLE dqc.dqc_dashboard REFRESH INTERVAL 1 DAY vcluster DEFAULT COMMENT 'DQC 仪表盘——按层汇总质量状态' AS SELECT layer, COUNT(*) AS total_checks, SUM(CASE WHEN status = 'PASS' THEN 1 ELSE 0 END) AS pass_cnt, SUM(CASE WHEN status = 'WARN' THEN 1 ELSE 0 END) AS warn_cnt, SUM(CASE WHEN status = 'FAIL' THEN 1 ELSE 0 END) AS fail_cnt, ROUND(SUM(CASE WHEN status = 'PASS' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 1) AS pass_rate FROM dqc.dqc_results GROUP BY layer;

查看仪表盘:

SELECT * FROM dqc.dqc_dashboard ORDER BY layer;


实测结果

在 NHL Medallion 架构(Bronze 10 张表 → Silver 4 张 DT → Gold 5 张 DT)上执行 10 项 DQC 检查:

ID类型指标期望实际状态
DQC-001行数silverdim_team_rows3333PASS
DQC-002行数silverdim_player_rows39253925PASS
DQC-003空值silvergoals_null_pct=00.00%PASS
DQC-004空值silverplayer_name_null_pct<1%0.00%PASS
DQC-005唯一性silverplayer_id_uniqueTRUETRUEPASS
DQC-006值范围silvergoals ≥ 0>=00PASS
DQC-007值范围silverpoints ≥ 0>=00PASS
DQC-008新鲜度bronzemax_season>=20192020PASS
DQC-009聚合goldunique_seasons>019PASS
DQC-010聚合goldwins ≥ 0>=00PASS

全部 PASS,通过率 100%。


与数据管道集成

方式一:手动触发(适合开发验证)

-- 执行全部 DQC 检查后查看结果 SELECT check_id, status, metric, actual FROM dqc.dqc_results WHERE status != 'PASS'; -- 只看异常

方式二:Dynamic Table 自动执行

将 DQC 检查逻辑封装为 Dynamic Table,每次源表刷新后自动重跑:

-- DQC 检查 DT:空值率监控 CREATE OR REPLACE DYNAMIC TABLE dqc.skater_null_monitor REFRESH INTERVAL 1 DAY vcluster DEFAULT COMMENT 'Silver 层球员统计空值率监控' AS SELECT 'DQC-003' AS check_id, 'null_rate' AS check_name, 'silver' AS layer, 'skater_goals_null_pct' AS metric, '=0' AS expected, CAST(ROUND(SUM(CASE WHEN goals IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS STRING) AS actual, CASE WHEN SUM(CASE WHEN goals IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) = 0 THEN 'PASS' ELSE 'WARN' END AS status FROM silver.fact_skater_stats;

方式三:Studio 任务调度

在 Studio 中创建 DQC 任务,配置 Cron + 依赖 ETL 任务:

00_sync(Cron 02:00) ↓ 04_etl(Cron 02:30,依赖 00) ↓ 05_dqc(Cron 03:00,依赖 04) ← DQC 在 ETL 完成后执行

如果 DQC 发现 FAIL,可通过 Studio 的告警规则发送通知。


DQC 检查清单

入库后必查建议检查
Bronze行数 ≥ 源端、最新数据日期
_op
_op
分布(I/U/D)、文件数
Silver行数 ≤ Bronze、关键字段 NULL < 1%、ID 唯一LEFT JOIN 匹配率、值范围、类型转换成功率
Gold聚合结果非空、指标 ≥ 0环比波动 < 20%、TOP N 结果合理

与告警联动

-- 查询所有 FAIL 的检查 SELECT * FROM dqc.dqc_results WHERE status = 'FAIL'; -- 查询本轮检查的异常汇总 SELECT layer, SUM(CASE WHEN status = 'FAIL' THEN 1 ELSE 0 END) AS fails, SUM(CASE WHEN status = 'WARN' THEN 1 ELSE 0 END) AS warns FROM dqc.dqc_results WHERE checked_at > CURRENT_TIMESTAMP() - INTERVAL 1 DAY GROUP BY layer HAVING SUM(CASE WHEN status = 'FAIL' THEN 1 ELSE 0 END) > 0;

在 Studio 中可配置:DQC 任务结果中任意 FAIL → 触发企业微信/钉钉/飞书通知。


注意事项

注意点说明
DQC 结果表建议用普通表保留历史记录方便趋势分析,DT 会覆盖历史
WARN 不断路、FAIL 应阻断WARN 是"需关注",FAIL 是"不能发布"
DQC 检查本身也有成本每项检查都是一次表扫描,控制检查数量(建议每层 3-5 项)
阈值需要业务校准不同业务域的 NULL 容忍度不同,先用历史数据确定基线
新鲜度检查注意时区
CURRENT_TIMESTAMP()
CURRENT_TIMESTAMP()
是 UTC,与业务时区可能有偏差

相关文档

联系我们
预约咨询
微信咨询
电话咨询
邮件咨询