DataOps Pipeline Data Quality Gates: Best Practices

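Closed-loop quality control in DataOps rests on three things:
- Assertion checks run automatically after each layer of the data pipeline refreshes.
- Non-conforming data is isolated in a Quarantine zone.
- Alerts fire when the gate fails.

This article uses an e-commerce event stream as its dataset. It walks end to end through building Bronze → Quality Gate → Passed (Silver) / Quarantine, and covers three key capabilities:
- quality filtering embedded in Dynamic Tables
- dependency orchestration with a Studio Task DAG
- trend tracking with information_schema.job_history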


Overview

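The core question of DataOps data quality is this: after the warehouse refreshes, how do you know whether the data that just arrived can be trusted? 云器 (Yunqi) Lakehouse answers it with the following combination: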

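Problem | Solution
--------+---------
The Bronze layer contains NULLs, negative values, and out-of-range outliers | A WHERE filter embedded in a Dynamic Table lets only rows that pass the checks flow into Silver
Non-conforming data must be stored separately for manual review | The doc_events_quarantine Dynamic Table, which automatically tags each row with a quarantine_reason
Quality check results must be traceable and support trend analysis | doc_quality_results + doc_quality_summary persist the details of every check run
Pipeline refreshes and quality checks must run in a guaranteed order | Studio Task DAG: refresh Passed/Quarantine → refresh Summary → evaluate the gate → trigger Gold
Gate failures must be reported promptly | A Webhook configured on a Studio Task pushes alerts to a Feishu/DingTalk operations group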

SQL commands used

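Command / function | Purpose | Notes
-------------------+---------+------
CREATE TABLE | Create the Bronze raw event table, the quality rules table, and the quality results table | Static tables that serve as upstream sources for the Dynamic Tables
CREATE DYNAMIC TABLE | Create the Passed / Quarantine / Summary layers | No REFRESH INTERVAL clause; scheduling is managed by Studio Task
REFRESH DYNAMIC TABLE | Manually trigger the first refresh | Used for the initial build or for debugging
FILTER (WHERE ...) | Conditional filtering inside an aggregate function, counting each category of dirty rows | Used to compute the failed-row count of each quality rule
SHOW TABLES | List all objects in the Schema | Confirm that the tables were created
sys.information_schema.job_history | Track the duration and status of each Dynamic Table refresh | Analyze trends across quality-check runs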

Prerequisites

All examples in this article run in the best_practice_dataops_quality Schema.

CREATE SCHEMA IF NOT EXISTS best_practice_dataops_quality;


Bronze layer: raw event table containing dirty data

Create the table

CREATE TABLE IF NOT EXISTS best_practice_dataops_quality.doc_raw_events ( event_id STRING, user_id STRING, event_type STRING, amount DOUBLE, ts TIMESTAMP, region STRING, platform STRING, status STRING );

Load simulated data (including dirty data)

The dataset deliberately contains four kinds of quality problems:
- user_id IS NULL (3 rows)
- amount IS NULL (4 rows)
- amount < 0 on purchase events (1 row: EVT012)
- amount > 10000 (2 rows)

It also contains two patterns that are not defects in the same sense:
- amount < 0 on refund events (4 rows: EVT003/EVT011/EVT021/EVT028). These are legitimate negative refund amounts.
- Logical duplicates across events: EVT001/EVT006/EVT016 share the same user_id + ts + amount.
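A logical duplicate is a row that shares user_id + ts + amount with another row but carries a different event_id. The minimal Python sketch below shows how such groups can be found. It assumes rows are plain dicts keyed by the doc_raw_events column names. The helper names are illustrative, and the sample rows use only values stated above.

```python
from collections import defaultdict
from datetime import datetime

def find_logical_duplicates(rows):
    """Group rows by (user_id, ts, amount); keep only keys shared by more than one event_id."""
    groups = defaultdict(list)
    for row in rows:
        key = (row["user_id"], row["ts"], row["amount"])
        groups[key].append(row["event_id"])
    return {key: ids for key, ids in groups.items() if len(ids) > 1}

def excess_rows(duplicate_groups):
    """Rows beyond the first in each group (COUNT(*) - COUNT(DISTINCT key) for non-NULL keys)."""
    return sum(len(ids) - 1 for ids in duplicate_groups.values())

ts_0800 = datetime(2026, 6, 1, 8, 0, 0)
sample = [
    {"event_id": "EVT001", "user_id": "U001", "ts": ts_0800, "amount": 99.9},
    {"event_id": "EVT002", "user_id": "U002", "ts": datetime(2026, 6, 1, 8, 5, 0), "amount": 159.0},
    {"event_id": "EVT006", "user_id": "U001", "ts": ts_0800, "amount": 99.9},
    {"event_id": "EVT016", "user_id": "U001", "ts": ts_0800, "amount": 99.9},
]

dups = find_logical_duplicates(sample)
print(dups)
print(excess_rows(dups))  # 2
```

event_id is deliberately left out of the key. Because event_id is unique per row, including it would make every key distinct and hide these duplicates.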

Import from a local CSV file (recommended):

-- Step 1: upload the local CSV file to the User Volume with a SQL PUT
PUT '/path/to/doc_raw_events.csv' TO USER VOLUME FILE 'doc_raw_events.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_dataops_quality.doc_raw_events FROM USER VOLUME USING csv OPTIONS('header'='true', 'sep'=',', 'nullValue'='') FILES ('doc_raw_events.csv');
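As its name suggests, 'nullValue'='' makes an empty CSV field load as NULL rather than as an empty string. That is what keeps the missing user_id and amount values detectable by the IS NULL rules. The Python sketch below applies the same mapping with the standard csv module, which can help you inspect a local file before running PUT. The conversion rules are assumptions of this sketch, not a description of COPY INTO internals.

```python
import csv
import io

def load_events_csv(text):
    """Parse CSV text with a header row, mapping empty fields to None (like nullValue='')."""
    rows = []
    for raw in csv.DictReader(io.StringIO(text)):
        row = {col: (value if value != "" else None) for col, value in raw.items()}
        # amount is DOUBLE in doc_raw_events, so convert non-null values to float
        if row["amount"] is not None:
            row["amount"] = float(row["amount"])
        rows.append(row)
    return rows

sample_csv = (
    "event_id,user_id,event_type,amount,ts,region,platform,status\n"
    "EVT004,,purchase,80.0,2026-06-01 08:15:00,US,iOS,completed\n"
    "EVT005,U005,purchase,,2026-06-01 08:20:00,US,Android,completed\n"
)
rows = load_events_csv(sample_csv)
print(rows[0]["user_id"], rows[1]["amount"])  # None None
```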

Alternatively, insert a small batch of test data inline (no CSV file needed):

INSERT INTO best_practice_dataops_quality.doc_raw_events VALUES
('EVT001','U001','purchase', 99.9,    CAST('2026-06-01 08:00:00' AS TIMESTAMP), 'CN', 'iOS',     'completed'),
('EVT002','U002','purchase', 159.0,   CAST('2026-06-01 08:05:00' AS TIMESTAMP), 'CN', 'Android', 'completed'),
('EVT003','U003','refund',   -50.0,   CAST('2026-06-01 08:10:00' AS TIMESTAMP), 'CN', 'Web',     'completed'),
('EVT004',NULL,  'purchase', 80.0,    CAST('2026-06-01 08:15:00' AS TIMESTAMP), 'US', 'iOS',     'completed'),
('EVT005','U005','purchase', NULL,    CAST('2026-06-01 08:20:00' AS TIMESTAMP), 'US', 'Android', 'completed'),
('EVT006','U001','purchase', 99.9,    CAST('2026-06-01 08:00:00' AS TIMESTAMP), 'CN', 'iOS',     'completed'),
('EVT007','U007','purchase', 12500.0, CAST('2026-06-01 08:25:00' AS TIMESTAMP), 'EU', 'Web',     'completed'),
-- ... 33 rows in total; dirty data is distributed as follows:
-- null user_id: EVT004, EVT013, EVT022
-- null amount: EVT005, EVT014, EVT023, EVT033
-- negative purchase: EVT012 (-999) (quarantined)
-- negative refund: EVT003 (-50), EVT011 (-30), EVT021 (-80), EVT028 (-200) (legitimate refunds, not quarantined)
-- amount > 10000: EVT007 (12500), EVT019 (15000)
-- logical duplicates: EVT001/EVT006/EVT016 (U001 @ 08:00:00, 99.9)
...

Verify the total row count:

SELECT COUNT(*) AS total_rows FROM best_practice_dataops_quality.doc_raw_events;

total_rows
----------
33


Quality rule definitions

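Quality rules are maintained in a table-driven way. Each rule contains:
- a check SQL expression, which returns the failure rate
- an acceptable threshold
- an alert severity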

Create the table

CREATE TABLE IF NOT EXISTS best_practice_dataops_quality.doc_quality_rules ( rule_id STRING, rule_name STRING, target_table STRING, target_col STRING, sql_expr STRING, threshold DOUBLE, severity STRING, description STRING );

Insert 10 quality rules

Import from a local CSV file (recommended):

-- Step 1: upload the local CSV file to the User Volume with a SQL PUT
PUT '/path/to/doc_quality_rules.csv' TO USER VOLUME FILE 'doc_quality_rules.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_dataops_quality.doc_quality_rules FROM USER VOLUME USING csv OPTIONS('header'='true', 'sep'=',', 'nullValue'='') FILES ('doc_quality_rules.csv');

Alternatively, insert a small batch of test data inline (no CSV file needed):

INSERT INTO best_practice_dataops_quality.doc_quality_rules VALUES
('R001','user_id_not_null',    'doc_raw_events','user_id',    'COUNT(*) FILTER (WHERE user_id IS NULL) * 1.0 / COUNT(*)', 0.02, 'ERROR', 'user_id null rate must be below 2%'),
('R002','amount_not_null',     'doc_raw_events','amount',     'COUNT(*) FILTER (WHERE amount IS NULL) * 1.0 / COUNT(*)', 0.05, 'ERROR', 'amount null rate must be below 5%'),
('R003','amount_positive',     'doc_raw_events','amount',     'COUNT(*) FILTER (WHERE event_type = ''purchase'' AND amount < 0) * 1.0 / COUNT(*)', 0.0, 'ERROR', 'purchase amount must not be negative'),
('R004','amount_range_check',  'doc_raw_events','amount',     'COUNT(*) FILTER (WHERE amount > 10000) * 1.0 / COUNT(*)', 0.01, 'WARNING', 'amount > 10000 rate must be below 1%'),
('R005','event_type_whitelist','doc_raw_events','event_type', 'COUNT(*) FILTER (WHERE event_type NOT IN (''purchase'',''refund'',''login'')) * 1.0 / COUNT(*)', 0.0, 'ERROR', 'event_type must be in whitelist'),
('R006','status_whitelist',    'doc_raw_events','status',     'COUNT(*) FILTER (WHERE status NOT IN (''completed'',''pending'',''error'')) * 1.0 / COUNT(*)', 0.0, 'ERROR', 'status must be in whitelist'),
('R007','duplicate_event_id',  'doc_raw_events','event_id',   '(COUNT(*) - COUNT(DISTINCT event_id)) * 1.0 / COUNT(*)', 0.0, 'ERROR', 'event_id must be unique'),
('R008','ts_not_future',       'doc_raw_events','ts',         'COUNT(*) FILTER (WHERE ts > CURRENT_TIMESTAMP()) * 1.0 / COUNT(*)', 0.0, 'ERROR', 'ts must not be in the future'),
('R009','region_not_null',     'doc_raw_events','region',     'COUNT(*) FILTER (WHERE region IS NULL) * 1.0 / COUNT(*)', 0.0, 'WARNING', 'region should not be null'),
-- logical duplicates share user_id + ts + amount; event_id is unique per row, so it must not be part of the key
('R010','duplicate_events',    'doc_raw_events','*',          '(COUNT(*) - COUNT(DISTINCT COALESCE(user_id,'''') || ''|'' || COALESCE(CAST(ts AS STRING),'''') || ''|'' || COALESCE(CAST(amount AS STRING),''''))) * 1.0 / COUNT(*)', 0.05, 'WARNING', 'logical duplicate rate must be below 5%');

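The sql_expr column stores a directly executable SQL expression. The quality check task assembles and executes it dynamically at check time. threshold = 0.0 means zero tolerance: a single failed row triggers an alert.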
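Each sql_expr is an aggregate expression that returns a failure rate. A check task therefore only has to wrap it in a SELECT over the target table and compare the result with threshold. The minimal Python sketch below shows that assembly step. It assumes a hypothetical build_check_sql helper, plus the convention that a rule passes when fail_rate <= threshold. That convention is what makes threshold = 0.0 behave as zero tolerance, and it agrees with the passed flags recorded later in doc_quality_results.

```python
SCHEMA = "best_practice_dataops_quality"

def build_check_sql(rule, schema=SCHEMA):
    """Wrap a rule's sql_expr in a SELECT over its target table (hypothetical helper).

    Plain string formatting is acceptable only because the rules table is trusted;
    never build SQL this way from user input.
    """
    return (
        f"SELECT '{rule['rule_id']}' AS rule_id, "
        f"{rule['sql_expr']} AS fail_rate "
        f"FROM {schema}.{rule['target_table']}"
    )

def rule_passed(fail_rate, threshold):
    """A rule passes when its failure rate does not exceed the threshold."""
    return fail_rate <= threshold

r001 = {
    "rule_id": "R001",
    "target_table": "doc_raw_events",
    "sql_expr": "COUNT(*) FILTER (WHERE user_id IS NULL) * 1.0 / COUNT(*)",
}
print(build_check_sql(r001))
print(rule_passed(0.0909, 0.02), rule_passed(0.0, 0.0))  # False True
```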


Quality checks: run the assertions and record the results

Create the results table

CREATE TABLE IF NOT EXISTS best_practice_dataops_quality.doc_quality_results ( check_id STRING, rule_id STRING, rule_name STRING, target_table STRING, check_ts TIMESTAMP, total_rows BIGINT, failed_rows BIGINT, fail_rate DOUBLE, threshold DOUBLE, passed BOOLEAN, severity STRING, pipeline_run STRING );

Run the quality checks

Run each rule's SQL expression against the Bronze layer to collect the actual failure rates:

SELECT
  COUNT(*) AS total_rows,
  COUNT(*) FILTER (WHERE user_id IS NULL) AS null_user_id,
  ROUND(COUNT(*) FILTER (WHERE user_id IS NULL) * 1.0 / COUNT(*), 4) AS null_user_id_rate,
  COUNT(*) FILTER (WHERE amount IS NULL) AS null_amount,
  ROUND(COUNT(*) FILTER (WHERE amount IS NULL) * 1.0 / COUNT(*), 4) AS null_amount_rate,
  COUNT(*) FILTER (WHERE event_type = 'purchase' AND amount < 0) AS negative_amount,
  ROUND(COUNT(*) FILTER (WHERE event_type = 'purchase' AND amount < 0) * 1.0 / COUNT(*), 4) AS negative_amount_rate,
  COUNT(*) FILTER (WHERE amount > 10000) AS outlier_amount,
  ROUND(COUNT(*) FILTER (WHERE amount > 10000) * 1.0 / COUNT(*), 4) AS outlier_amount_rate,
  COUNT(*) - COUNT(DISTINCT event_id) AS dup_event_id
FROM best_practice_dataops_quality.doc_raw_events;

total_rows | null_user_id | null_user_id_rate | null_amount | null_amount_rate | negative_amount | negative_amount_rate | outlier_amount | outlier_amount_rate | dup_event_id
-----------+--------------+-------------------+-------------+------------------+-----------------+----------------------+----------------+---------------------+-------------
33         | 3            | 0.0909            | 4           | 0.1212           | 1               | 0.0303               | 2              | 0.0606              | 0

Interpreting the results

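- The user_id null rate is 9.09%, far above the R001 threshold of 2%.
- The amount null rate is 12.12%, above the R002 threshold of 5%.
- The 3.03% negative-value rate on purchase events trips the zero-tolerance rule R003. Negative values on refund events are legitimate refunds and are not counted.
- The 6.06% out-of-range rate exceeds the R004 threshold of 1%.
- event_id itself has no duplicates (R007 passes), but there are logical duplicates (events with the same user_id + ts + amount).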
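Each rate above is failed rows divided by total rows, rounded to four decimals. Each rule's verdict is that rate compared with its threshold. The short Python sketch below reproduces the four failing checks from these counts. The thresholds are copied from doc_quality_rules. It assumes a rule passes when fail_rate <= threshold, which is consistent with every passed flag written to doc_quality_results.

```python
TOTAL_ROWS = 33

# (rule_id, failed_rows, threshold): counts from the query output above,
# thresholds from doc_quality_rules
CHECKS = [
    ("R001", 3, 0.02),
    ("R002", 4, 0.05),
    ("R003", 1, 0.0),
    ("R004", 2, 0.01),
]

def evaluate(failed_rows, total_rows, threshold):
    """Return (fail_rate rounded to 4 decimals for display, passed flag)."""
    raw_rate = failed_rows / total_rows
    # compare the unrounded rate so a tiny failure rate cannot round down to 0.0
    # and slip through a zero-tolerance rule
    return round(raw_rate, 4), raw_rate <= threshold

for rule_id, failed, threshold in CHECKS:
    rate, passed = evaluate(failed, TOTAL_ROWS, threshold)
    print(rule_id, rate, passed)
```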

Write the check results

Import from a local CSV file (recommended):

-- Step 1: upload the local CSV file to the User Volume with a SQL PUT
PUT '/path/to/doc_quality_results.csv' TO USER VOLUME FILE 'doc_quality_results.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_dataops_quality.doc_quality_results FROM USER VOLUME USING csv OPTIONS('header'='true', 'sep'=',', 'nullValue'='') FILES ('doc_quality_results.csv');

Alternatively, insert a small batch of test data inline (no CSV file needed):

INSERT INTO best_practice_dataops_quality.doc_quality_results VALUES
('CHK001','R001','user_id_not_null',    'doc_raw_events', CAST('2026-06-01 12:00:00' AS TIMESTAMP), 33, 3, 0.0909, 0.02, false, 'ERROR',   'run_2026060601'),
('CHK002','R002','amount_not_null',     'doc_raw_events', CAST('2026-06-01 12:00:00' AS TIMESTAMP), 33, 4, 0.1212, 0.05, false, 'ERROR',   'run_2026060601'),
('CHK003','R003','amount_positive',     'doc_raw_events', CAST('2026-06-01 12:00:00' AS TIMESTAMP), 33, 1, 0.0303, 0.0,  false, 'ERROR',   'run_2026060601'),
('CHK004','R004','amount_range_check',  'doc_raw_events', CAST('2026-06-01 12:00:00' AS TIMESTAMP), 33, 2, 0.0606, 0.01, false, 'WARNING', 'run_2026060601'),
('CHK005','R005','event_type_whitelist','doc_raw_events', CAST('2026-06-01 12:00:00' AS TIMESTAMP), 33, 0, 0.0,    0.0,  true,  'ERROR',   'run_2026060601'),
('CHK006','R006','status_whitelist',    'doc_raw_events', CAST('2026-06-01 12:00:00' AS TIMESTAMP), 33, 0, 0.0,    0.0,  true,  'ERROR',   'run_2026060601'),
('CHK007','R007','duplicate_event_id',  'doc_raw_events', CAST('2026-06-01 12:00:00' AS TIMESTAMP), 33, 0, 0.0,    0.0,  true,  'ERROR',   'run_2026060601'),
('CHK008','R008','ts_not_future',       'doc_raw_events', CAST('2026-06-01 12:00:00' AS TIMESTAMP), 33, 0, 0.0,    0.0,  true,  'ERROR',   'run_2026060601'),
('CHK009','R009','region_not_null',     'doc_raw_events', CAST('2026-06-01 12:00:00' AS TIMESTAMP), 33, 0, 0.0,    0.0,  true,  'WARNING', 'run_2026060601'),
('CHK010','R010','duplicate_events',    'doc_raw_events', CAST('2026-06-01 12:00:00' AS TIMESTAMP), 33, 5, 0.1515, 0.05, false, 'WARNING', 'run_2026060601');


Quality gate layer: Passed and Quarantine Dynamic Tables

Passed layer: only clean data flows into Silver

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_dataops_quality.doc_events_passed AS
SELECT event_id, user_id, event_type, amount, ts, region, platform, status
FROM best_practice_dataops_quality.doc_raw_events
WHERE user_id IS NOT NULL
  AND amount IS NOT NULL
  AND (event_type != 'purchase' OR amount >= 0)  -- negative amounts are allowed only outside purchases (e.g. refunds)
  AND amount <= 10000
  AND event_type IN ('purchase','refund','login')
  AND status IN ('completed','pending','error');
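The WHERE clause of doc_events_passed is a row-level predicate. Mirroring it in a few lines of Python lets you unit-test the gate on hand-picked rows before changing the DDL. In this sketch None stands in for SQL NULL. A comparison with NULL is never TRUE in SQL, so a None user_id or amount makes the row fail here too. The function name is illustrative. The rows are EVT001, EVT003, EVT004, EVT005, and EVT007 from the inline INSERT, plus one hypothetical negative purchase (X001).

```python
EVENT_TYPES = {"purchase", "refund", "login"}
STATUSES = {"completed", "pending", "error"}

def passes_gate(row):
    """Mirror of the doc_events_passed WHERE clause; None plays the role of SQL NULL."""
    if row["user_id"] is None or row["amount"] is None:
        return False
    if row["event_type"] == "purchase" and row["amount"] < 0:
        return False  # negative purchase amounts are rejected
    return (
        row["amount"] <= 10000
        and row["event_type"] in EVENT_TYPES
        and row["status"] in STATUSES
    )

COLUMNS = ("event_id", "user_id", "event_type", "amount", "status")
rows = [dict(zip(COLUMNS, values)) for values in [
    ("EVT001", "U001", "purchase", 99.9, "completed"),
    ("EVT003", "U003", "refund", -50.0, "completed"),
    ("EVT004", None, "purchase", 80.0, "completed"),
    ("EVT005", "U005", "purchase", None, "completed"),
    ("EVT007", "U007", "purchase", 12500.0, "completed"),
    ("X001", "U999", "purchase", -999.0, "completed"),  # hypothetical negative purchase
]]
print([row["event_id"] for row in rows if passes_gate(row)])  # ['EVT001', 'EVT003']
```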

Quarantine layer: an isolation zone for non-conforming data

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_dataops_quality.doc_events_quarantine AS
SELECT event_id, user_id, event_type, amount, ts, region, platform, status,
       CASE  -- the first matching condition determines the reason
         WHEN user_id IS NULL THEN 'null_user_id'
         WHEN amount IS NULL THEN 'null_amount'
         WHEN event_type = 'purchase' AND amount < 0 THEN 'negative_amount'
         WHEN amount > 10000 THEN 'amount_out_of_range'
         WHEN event_type NOT IN ('purchase','refund','login') THEN 'invalid_event_type'
         ELSE 'other'
       END AS quarantine_reason
FROM best_practice_dataops_quality.doc_raw_events
WHERE user_id IS NULL
   OR amount IS NULL
   OR (event_type = 'purchase' AND amount < 0)
   OR amount > 10000
   OR event_type NOT IN ('purchase','refund','login');
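The CASE expression assigns exactly one reason per row, and the first matching WHEN wins. A row with both a NULL user_id and a NULL amount is therefore tagged null_user_id only. The WHERE clause lists the same five conditions as the WHEN branches, so the ELSE 'other' branch cannot be reached.

The Python sketch below reproduces that ordering. None stands in for SQL NULL, and the function returns None for rows the WHERE clause would not select. The function name and the combined-NULL row (X002) are illustrative.

```python
EVENT_TYPES = {"purchase", "refund", "login"}

def quarantine_reason(row):
    """Mirror of the CASE/WHERE in doc_events_quarantine; None means 'not quarantined'."""
    user_id, amount, event_type = row["user_id"], row["amount"], row["event_type"]
    if user_id is None:
        return "null_user_id"
    if amount is None:
        return "null_amount"
    if event_type == "purchase" and amount < 0:
        return "negative_amount"
    if amount > 10000:
        return "amount_out_of_range"
    # NOT IN on a NULL event_type is UNKNOWN in SQL, so it does not select the row
    if event_type is not None and event_type not in EVENT_TYPES:
        return "invalid_event_type"
    return None

sample = [
    {"event_id": "EVT004", "user_id": None, "event_type": "purchase", "amount": 80.0},
    {"event_id": "EVT005", "user_id": "U005", "event_type": "purchase", "amount": None},
    {"event_id": "EVT007", "user_id": "U007", "event_type": "purchase", "amount": 12500.0},
    {"event_id": "EVT003", "user_id": "U003", "event_type": "refund", "amount": -50.0},
    {"event_id": "X002", "user_id": None, "event_type": "purchase", "amount": None},  # hypothetical
]
for row in sample:
    print(row["event_id"], quarantine_reason(row))
```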

Quality summary Dynamic Table

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_dataops_quality.doc_quality_summary AS
SELECT pipeline_run,
       check_ts,
       COUNT(*) AS total_rules,
       COUNT(*) FILTER (WHERE passed = true) AS passed_rules,
       COUNT(*) FILTER (WHERE passed = false) AS failed_rules,
       COUNT(*) FILTER (WHERE passed = false AND severity = 'ERROR') AS error_count,
       COUNT(*) FILTER (WHERE passed = false AND severity = 'WARNING') AS warning_count,
       ROUND(COUNT(*) FILTER (WHERE passed = true) * 1.0 / COUNT(*), 4) AS pass_rate,
       CASE
         WHEN COUNT(*) FILTER (WHERE passed = false AND severity = 'ERROR') > 0 THEN 'BLOCKED'
         WHEN COUNT(*) FILTER (WHERE passed = false AND severity = 'WARNING') > 0 THEN 'WARNING'
         ELSE 'PASSED'
       END AS gate_decision
FROM best_practice_dataops_quality.doc_quality_results
GROUP BY pipeline_run, check_ts;

gate_decision is the gate's final verdict. If even one ERROR rule fails, it outputs BLOCKED. When the downstream Gold-layer Studio Task sees BLOCKED, it does not trigger a refresh.
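The CASE in doc_quality_summary reduces one run's results in severity order:
1. Any failed ERROR rule yields BLOCKED.
2. Otherwise, any failed WARNING rule yields WARNING.
3. Otherwise, the decision is PASSED.

The Python sketch below performs the same reduction and also computes pass_rate. It is fed the (passed, severity) pairs of run_2026060601 as inserted into doc_quality_results. The function name is illustrative.

```python
def gate_decision(results):
    """results: non-empty list of (passed, severity) pairs for one pipeline run."""
    failed = [severity for passed, severity in results if not passed]
    pass_rate = round((len(results) - len(failed)) / len(results), 4)
    if "ERROR" in failed:
        decision = "BLOCKED"
    elif "WARNING" in failed:
        decision = "WARNING"
    else:
        decision = "PASSED"
    return decision, pass_rate

run_2026060601 = [
    (False, "ERROR"), (False, "ERROR"), (False, "ERROR"), (False, "WARNING"),  # R001-R004
    (True, "ERROR"), (True, "ERROR"), (True, "ERROR"), (True, "ERROR"),        # R005-R008
    (True, "WARNING"), (False, "WARNING"),                                      # R009-R010
]
print(gate_decision(run_2026060601))  # ('BLOCKED', 0.5)
```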

Manually trigger the first refresh

REFRESH DYNAMIC TABLE best_practice_dataops_quality.doc_events_passed;
REFRESH DYNAMIC TABLE best_practice_dataops_quality.doc_events_quarantine;
REFRESH DYNAMIC TABLE best_practice_dataops_quality.doc_quality_summary;

View the Passed layer results

SELECT COUNT(*) AS passed_count FROM best_practice_dataops_quality.doc_events_passed;

passed_count
------------
23

View the Quarantine layer distribution

SELECT quarantine_reason, COUNT(*) AS cnt FROM best_practice_dataops_quality.doc_events_quarantine GROUP BY quarantine_reason ORDER BY cnt DESC;

quarantine_reason   | cnt
--------------------+----
null_amount         | 4
null_user_id        | 3
amount_out_of_range | 2
negative_amount     | 1

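Interpretation: of the 33 raw events, 23 passed every quality gate and entered the Silver layer, and 10 were quarantined. The reasons break down as follows:
- NULL amount: 4 rows (EVT005/EVT014/EVT023/EVT033)
- NULL user_id: 3 rows
- oversized amounts: EVT007/EVT019
- a negative purchase amount: EVT012

Negative amounts on refund events (EVT003/EVT011/EVT021/EVT028) are legitimate refunds. They pass the quality gate into the Silver layer.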

View the gate decision

SELECT pipeline_run, total_rules, passed_rules, failed_rules, error_count, warning_count, pass_rate, gate_decision FROM best_practice_dataops_quality.doc_quality_summary;

pipeline_run   | total_rules | passed_rules | failed_rules | error_count | warning_count | pass_rate | gate_decision
---------------+-------------+--------------+--------------+-------------+---------------+-----------+--------------
run_2026060601 | 10          | 5            | 5            | 3           | 2             | 0.5000    | BLOCKED

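Interpretation: 5 of the 10 rules failed in this run.
- 3 ERRORs: user_id null rate, amount null rate, negative purchase amount
- 2 WARNINGs: oversized amounts, logical duplicates

gate_decision = BLOCKED, so the downstream Gold layer should not refresh after this run.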

View per-rule details

SELECT rule_id, rule_name, total_rows, failed_rows, fail_rate, threshold, passed, severity FROM best_practice_dataops_quality.doc_quality_results ORDER BY passed ASC, severity DESC, fail_rate DESC;

rule_id | rule_name            | total_rows | failed_rows | fail_rate | threshold | passed | severity
--------+----------------------+------------+-------------+-----------+-----------+--------+---------
R010    | duplicate_events     | 33         | 5           | 0.1515    | 0.05      | false  | WARNING
R004    | amount_range_check   | 33         | 2           | 0.0606    | 0.01      | false  | WARNING
R002    | amount_not_null      | 33         | 4           | 0.1212    | 0.05      | false  | ERROR
R001    | user_id_not_null     | 33         | 3           | 0.0909    | 0.02      | false  | ERROR
R003    | amount_positive      | 33         | 1           | 0.0303    | 0.0       | false  | ERROR
R009    | region_not_null      | 33         | 0           | 0.0       | 0.0       | true   | WARNING
R005    | event_type_whitelist | 33         | 0           | 0.0       | 0.0       | true   | ERROR
R006    | status_whitelist     | 33         | 0           | 0.0       | 0.0       | true   | ERROR
R007    | duplicate_event_id   | 33         | 0           | 0.0       | 0.0       | true   | ERROR
R008    | ts_not_future        | 33         | 0           | 0.0       | 0.0       | true   | ERROR


Studio Task DAG orchestration

Create the Studio Task refresh tasks

Do not write REFRESH INTERVAL in the Dynamic Table DDL. Instead, manage refresh scheduling and dependencies centrally in Studio.

In Studio, go to Development → Tasks and create the following tasks, in order, under the path best_practices/dataops_quality/:

  1. task_refresh_gate: runs REFRESH DYNAMIC TABLE best_practice_dataops_quality.doc_events_passed and REFRESH DYNAMIC TABLE best_practice_dataops_quality.doc_events_quarantine

  2. task_refresh_summary: runs REFRESH DYNAMIC TABLE best_practice_dataops_quality.doc_quality_summary

    • Depends on: task_refresh_gate
  3. task_gate_eval: queries doc_quality_summary; if gate_decision = 'BLOCKED', it stops the run and triggers an alert

    • Depends on: task_refresh_summary
  4. task_alert_webhook (optional): calls a Webhook to push gate_decision + fail_rate to a Feishu/DingTalk operations group

    • Depends on: task_gate_eval (runs only on the BLOCKED branch)
  5. task_refresh_gold: runs the downstream Gold-layer refresh

    • Depends on: task_gate_eval (runs only on the PASSED branch)
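The dependency chain and its two conditional branches form a small DAG. The Python sketch below models that DAG with the standard-library graphlib module (Python 3.9+). It assumes task_gate_eval produces the gate_decision it read, and that each branch task runs only when its condition matches. It models execution order only; it is not the Studio Task API. Under these assumptions a WARNING decision runs neither branch, because the task list attaches task_refresh_gold only to PASSED.

```python
from graphlib import TopologicalSorter

# task -> set of upstream tasks, taken from the dependencies in the task list
DEPENDENCIES = {
    "task_refresh_gate": set(),
    "task_refresh_summary": {"task_refresh_gate"},
    "task_gate_eval": {"task_refresh_summary"},
    "task_alert_webhook": {"task_gate_eval"},
    "task_refresh_gold": {"task_gate_eval"},
}

# branch tasks run only when the gate decision matches
BRANCH_CONDITION = {
    "task_alert_webhook": "BLOCKED",
    "task_refresh_gold": "PASSED",
}

def run_pipeline(gate_decision):
    """Return the tasks that would execute, in dependency order, for a given gate decision."""
    executed = []
    for task in TopologicalSorter(DEPENDENCIES).static_order():
        required = BRANCH_CONDITION.get(task)
        if required is not None and required != gate_decision:
            continue  # branch not taken for this decision
        executed.append(task)
    return executed

print(run_pipeline("BLOCKED"))
print(run_pipeline("PASSED"))
```

If warnings should still let Gold refresh, change the condition for task_refresh_gold to accept both PASSED and WARNING.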

Configure the schedule: in the task properties of task_refresh_gate, set the Cron expression 0/30 * * * ?, which fires every 30 minutes.
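Read as minute / hour / day-of-month / month / day-of-week, 0/30 * * * ? fires at minute 0 and every 30 minutes after it. In other words, it fires at :00 and :30 of every hour. The ? ("no specific value") is Quartz-style syntax, and it is an assumption that Studio uses exactly this field order. The Python sketch below computes the next trigger time under that reading. The function name is illustrative.

```python
from datetime import datetime, timedelta

def next_half_hour_trigger(now):
    """Next time strictly after `now` whose minute is 0 or 30 (seconds zeroed)."""
    base = now.replace(second=0, microsecond=0)
    minutes_to_add = 30 - base.minute % 30
    return base + timedelta(minutes=minutes_to_add)

print(next_half_hour_trigger(datetime(2026, 6, 1, 8, 10, 5)))  # 2026-06-01 08:30:00
```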


Tracking quality trends with information_schema

sys.information_schema.job_history records the execution status of every Dynamic Table refresh. You can use it to track the run history of the quality-check tasks:

SELECT job_id,
       status,
       ROUND(execution_time, 2) AS exec_s,
       rows_produced,
       rows_inserted,
       start_time,
       SUBSTR(job_text, 1, 80) AS sql_preview
FROM sys.information_schema.job_history
WHERE pt_date = CAST(CURRENT_DATE() AS STRING)
  AND LOWER(job_text) LIKE '%best_practice_dataops_quality%'
ORDER BY start_time DESC
LIMIT 10;
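To turn these rows into a trend, aggregate run counts, failures, and average execution_time per day. For more than one day you would need to widen the pt_date filter in the query. The Python sketch below works on rows already fetched as dicts with status, execution_time (seconds), and start_time. The rows are hypothetical, not real job_history output. The success status string 'SUCCEED' is an assumption, so check the values your job_history actually returns.

```python
from collections import defaultdict
from datetime import datetime

def daily_trend(jobs, success_status="SUCCEED"):
    """Aggregate job rows into {date: (runs, failures, avg_exec_s)}."""
    buckets = defaultdict(list)
    for job in jobs:
        buckets[job["start_time"].date()].append(job)
    trend = {}
    for day, day_jobs in sorted(buckets.items()):
        failures = sum(1 for job in day_jobs if job["status"] != success_status)
        avg_exec = round(sum(job["execution_time"] for job in day_jobs) / len(day_jobs), 2)
        trend[day] = (len(day_jobs), failures, avg_exec)
    return trend

jobs = [  # hypothetical rows for illustration only
    {"status": "SUCCEED", "execution_time": 1.5, "start_time": datetime(2026, 6, 1, 8, 0)},
    {"status": "FAILED", "execution_time": 0.5, "start_time": datetime(2026, 6, 1, 8, 30)},
    {"status": "SUCCEED", "execution_time": 2.0, "start_time": datetime(2026, 6, 2, 8, 0)},
]
for day, (runs, failures, avg_exec) in daily_trend(jobs).items():
    print(day, runs, failures, avg_exec)
```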


Warehouse object overview

SHOW TABLES IN best_practice_dataops_quality;

schema_name                   | table_name            | is_dynamic
------------------------------+-----------------------+-----------
best_practice_dataops_quality | doc_events_passed     | true
best_practice_dataops_quality | doc_events_quarantine | true
best_practice_dataops_quality | doc_quality_results   | false
best_practice_dataops_quality | doc_quality_rules     | false
best_practice_dataops_quality | doc_quality_summary   | true
best_practice_dataops_quality | doc_raw_events        | false

Architecture:

doc_raw_events (Bronze, 33 rows)
    │
    ├──[WHERE passed]──→ doc_events_passed (DT, 23 rows) ──→ Gold Layer
    │
    └──[WHERE failed]──→ doc_events_quarantine (DT, 10 rows)
                             quarantine_reason: null_amount / null_user_id /
                             amount_out_of_range / negative_amount (purchase only)

doc_quality_rules (10 rules)
    │
    └──[check results written]──→ doc_quality_results (10 records/run)
                                      │
                                      └──→ doc_quality_summary (DT)
                                             gate_decision: BLOCKED / WARNING / PASSED

Studio Task DAG:
task_refresh_gate → task_refresh_summary → task_gate_eval
    → (BLOCKED) task_alert_webhook
    → (PASSED)  task_refresh_gold


Notes

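  • Do not write REFRESH INTERVAL in Dynamic Tables: manage the refresh cycle of every Dynamic Table centrally in Studio Task. You can then attach monitoring alerts, dependency conditions, and other rules to the same task, and the DDL and the scheduling configuration are not split across two places.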

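  • Quarantine data is not repaired automatically: data in doc_events_quarantine must be reviewed manually. The reviewer then decides whether to fix it and write it back to the Bronze layer, or to discard it. Run a periodic cleanup task on the Quarantine table so that stale bad data does not keep piling up.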

  • Semantics of FILTER (WHERE ...): COUNT(*) FILTER (WHERE condition) counts only the rows that satisfy the condition. It is equivalent to SUM(CASE WHEN condition THEN 1 ELSE 0 END), but the syntax is clearer. Not every aggregate function supports FILTER; MEDIAN, for example, does not.

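  • The first Dynamic Table refresh is a full refresh: the first REFRESH DYNAMIC TABLE scans the upstream table in full. Later incremental refreshes process only the rows added or changed since the last refresh point. If the Bronze layer is written with INSERT OVERWRITE, the Dynamic Table degrades to a full refresh every time.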

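  • Tune gate thresholds to the business: the R001 threshold of 0.02 (2%) in this example is reasonable for a real-time data stream. A bulk migration of historical data may need it relaxed temporarily. Keep threshold versions in the doc_quality_rules table, and restore the strict thresholds once the migration is complete.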

  • gate_decision = BLOCKED does not affect Silver data that has already been refreshed: BLOCKED only tells the downstream Gold layer not to refresh after this run. Data already in doc_events_passed is not rolled back. If you need a rollback, use Time Travel (RESTORE TABLE ... TO TIMESTAMP) to restore the snapshot from before the last refresh.
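The FILTER equivalence described in the notes also holds when the condition evaluates to NULL. FILTER keeps only rows where the condition is TRUE, and CASE WHEN sends both FALSE and NULL to the ELSE 0 branch. The small Python sketch below models SQL three-valued logic with None. It applies an amount > 10000 condition, which is UNKNOWN for a NULL amount, to amounts taken from the sample data. The helper names are illustrative.

```python
def gt(value, limit):
    """SQL-style `value > limit`: returns None (UNKNOWN) when value is NULL."""
    return None if value is None else value > limit

def is_outlier(amount):
    """amount > 10000 under SQL three-valued logic."""
    return gt(amount, 10000)

def count_filter(values, cond):
    """COUNT(*) FILTER (WHERE cond): counts rows where cond is TRUE."""
    return sum(1 for value in values if cond(value) is True)

def sum_case(values, cond):
    """SUM(CASE WHEN cond THEN 1 ELSE 0 END): FALSE and UNKNOWN both fall to ELSE 0."""
    return sum(1 if cond(value) is True else 0 for value in values)

amounts = [99.9, None, 12500.0, 15000.0, -50.0]
print(count_filter(amounts, is_outlier), sum_case(amounts, is_outlier))  # 2 2
```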

