-- Step 1: upload the local CSV file to the User Volume with SQL PUT.
-- NOTE: '/path/to/...' is a placeholder; replace it with the real local path.
PUT '/path/to/doc_raw_events.csv' TO USER VOLUME FILE 'doc_raw_events.csv';

-- Step 2: load the uploaded file from the User Volume into the table.
-- 'header'='true' presumably treats the first CSV line as column names, and
-- 'nullValue'='' presumably loads empty fields as NULL — confirm against the
-- COPY INTO option reference.
COPY INTO best_practice_dataops_quality.doc_raw_events
FROM USER VOLUME
USING csv
OPTIONS (
    'header'    = 'true',
    'sep'       = ',',
    'nullValue' = ''
)
FILES ('doc_raw_events.csv');
-- Step 1: upload the local rule-catalogue CSV to the User Volume with SQL PUT.
-- NOTE: '/path/to/...' is a placeholder; replace it with the real local path.
PUT '/path/to/doc_quality_rules.csv' TO USER VOLUME FILE 'doc_quality_rules.csv';

-- Step 2: load the uploaded file from the User Volume into the rules table.
-- Same CSV options as the other loads in this script (header row, comma
-- separator, empty string mapped to NULL — presumably; confirm in the docs).
COPY INTO best_practice_dataops_quality.doc_quality_rules
FROM USER VOLUME
USING csv
OPTIONS (
    'header'    = 'true',
    'sep'       = ',',
    'nullValue' = ''
)
FILES ('doc_quality_rules.csv');
也可直接内联插入小批量测试数据(不需要 CSV 文件):
-- Inline seed for the rule catalogue: ten checks against doc_raw_events.
-- Positional values appear to be: rule id, rule name, target table, target
-- column, metric SQL expression, threshold, severity, description.
-- Every metric expression is a ratio "offending rows / all rows"; quotes inside
-- the expressions are escaped by doubling them ('').
-- NOTE(review): there is no column list, so the tuple order must match the
-- physical column order of doc_quality_rules exactly — consider naming the
-- columns explicitly once the schema is confirmed.
INSERT INTO best_practice_dataops_quality.doc_quality_rules VALUES
    -- completeness: null rates
    ('R001', 'user_id_not_null', 'doc_raw_events', 'user_id',
        'COUNT(*) FILTER (WHERE user_id IS NULL) * 1.0 / COUNT(*)',
        0.02, 'ERROR', 'user_id null rate must be below 2%'),
    ('R002', 'amount_not_null', 'doc_raw_events', 'amount',
        'COUNT(*) FILTER (WHERE amount IS NULL) * 1.0 / COUNT(*)',
        0.05, 'ERROR', 'amount null rate must be below 5%'),

    -- validity: value ranges
    ('R003', 'amount_positive', 'doc_raw_events', 'amount',
        'COUNT(*) FILTER (WHERE event_type = ''purchase'' AND amount < 0) * 1.0 / COUNT(*)',
        0.0, 'ERROR', 'purchase amount must not be negative'),
    ('R004', 'amount_range_check', 'doc_raw_events', 'amount',
        'COUNT(*) FILTER (WHERE amount > 10000) * 1.0 / COUNT(*)',
        0.01, 'WARNING', 'amount > 10000 rate must be below 1%'),

    -- validity: whitelisted categorical values
    ('R005', 'event_type_whitelist', 'doc_raw_events', 'event_type',
        'COUNT(*) FILTER (WHERE event_type NOT IN (''purchase'',''refund'',''login'')) * 1.0 / COUNT(*)',
        0.0, 'ERROR', 'event_type must be in whitelist'),
    ('R006', 'status_whitelist', 'doc_raw_events', 'status',
        'COUNT(*) FILTER (WHERE status NOT IN (''completed'',''pending'',''error'')) * 1.0 / COUNT(*)',
        0.0, 'ERROR', 'status must be in whitelist'),

    -- uniqueness
    ('R007', 'duplicate_event_id', 'doc_raw_events', 'event_id',
        '(COUNT(*) - COUNT(DISTINCT event_id)) * 1.0 / COUNT(*)',
        0.0, 'ERROR', 'event_id must be unique'),

    -- timeliness
    ('R008', 'ts_not_future', 'doc_raw_events', 'ts',
        'COUNT(*) FILTER (WHERE ts > CURRENT_TIMESTAMP()) * 1.0 / COUNT(*)',
        0.0, 'ERROR', 'ts must not be in the future'),

    -- completeness (advisory)
    ('R009', 'region_not_null', 'doc_raw_events', 'region',
        'COUNT(*) FILTER (WHERE region IS NULL) * 1.0 / COUNT(*)',
        0.0, 'WARNING', 'region should not be null'),

    -- logical duplicates over (event_id, ts, user_id).
    -- NOTE(review): the key parts are concatenated without a separator, so
    -- different tuples can in principle produce the same string; and a NULL
    -- event_id or ts makes the whole key NULL, which COUNT(DISTINCT ...) ignores.
    ('R010', 'duplicate_events', 'doc_raw_events', '*',
        '(COUNT(*) - COUNT(DISTINCT event_id || CAST(ts AS STRING) || COALESCE(user_id,''''))) * 1.0 / COUNT(*)',
        0.05, 'WARNING', 'logical duplicate rate must be below 5%');
-- Step 1: upload the local quality-results CSV to the User Volume with SQL PUT.
-- NOTE: '/path/to/...' is a placeholder; replace it with the real local path.
PUT '/path/to/doc_quality_results.csv' TO USER VOLUME FILE 'doc_quality_results.csv';

-- Step 2: load the uploaded file from the User Volume into the results table.
-- Same CSV options as the other loads in this script (header row, comma
-- separator, empty string mapped to NULL — presumably; confirm in the docs).
COPY INTO best_practice_dataops_quality.doc_quality_results
FROM USER VOLUME
USING csv
OPTIONS (
    'header'    = 'true',
    'sep'       = ',',
    'nullValue' = ''
)
FILES ('doc_quality_results.csv');
-- Clean-events dynamic table: the raw events that satisfy every row-level
-- check below. Rows failing (or evaluating to UNKNOWN on) any check are excluded.
-- NOTE(review): R004 is only a WARNING in the rule catalogue, yet amounts above
-- 10000 are excluded here; the R007 (duplicate event_id), R008 (future ts) and
-- R009 (null region) checks are not applied at all — confirm both are intended.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_dataops_quality.doc_events_passed
AS
SELECT
    event_id,
    user_id,
    event_type,
    amount,
    ts,
    region,
    platform,
    status
FROM best_practice_dataops_quality.doc_raw_events
WHERE
    -- completeness
    user_id IS NOT NULL
    AND amount IS NOT NULL
    -- value ranges: purchases must be non-negative; nothing above 10000
    AND (event_type <> 'purchase' OR amount >= 0)
    AND amount <= 10000
    -- whitelists (a NULL value makes IN evaluate to UNKNOWN, so it is excluded)
    AND event_type IN ('purchase', 'refund', 'login')
    AND status IN ('completed', 'pending', 'error');
-- Quarantine dynamic table: every raw event that does NOT meet the filter of
-- doc_events_passed, tagged with the first check it violates.
-- The WHERE clause is the exact complement of doc_events_passed's filter, so the
-- two tables together cover doc_raw_events with no row lost or double-counted.
-- Why the explicit IS NULL tests: for a NULL value, `x NOT IN (...)` evaluates
-- to UNKNOWN rather than TRUE. Without them, rows with a NULL event_type/status
-- would be rejected by doc_events_passed and silently missing from here too.
-- The status checks mirror doc_events_passed's `status IN (...)` filter;
-- without them, rows with an invalid status also vanished from both tables.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_dataops_quality.doc_events_quarantine
AS
SELECT
    event_id,
    user_id,
    event_type,
    amount,
    ts,
    region,
    platform,
    status,
    -- Branches are evaluated in order; the first violated check wins.
    CASE
        WHEN user_id IS NULL THEN 'null_user_id'
        WHEN amount IS NULL THEN 'null_amount'
        WHEN event_type = 'purchase' AND amount < 0 THEN 'negative_amount'
        WHEN amount > 10000 THEN 'amount_out_of_range'
        WHEN event_type IS NULL
          OR event_type NOT IN ('purchase','refund','login') THEN 'invalid_event_type'
        WHEN status IS NULL
          OR status NOT IN ('completed','pending','error') THEN 'invalid_status'
        -- Unreachable given the WHERE clause; kept as a defensive fallback.
        ELSE 'other'
    END AS quarantine_reason
FROM best_practice_dataops_quality.doc_raw_events
WHERE user_id IS NULL
   OR amount IS NULL
   OR (event_type = 'purchase' AND amount < 0)
   OR amount > 10000
   OR event_type IS NULL
   OR event_type NOT IN ('purchase','refund','login')
   OR status IS NULL
   OR status NOT IN ('completed','pending','error');
质量汇总 Dynamic Table
-- Per-run quality summary: one row per (pipeline_run, check_ts) group of
-- doc_quality_results, with rule counts, pass rate and a release-gate decision.
--   gate_decision: any failed ERROR rule   -> 'BLOCKED'
--                  else any failed WARNING -> 'WARNING'
--                  else                    -> 'PASSED'
-- Rows whose passed flag is NULL count toward total_rules only.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_dataops_quality.doc_quality_summary
AS
SELECT
    pipeline_run,
    check_ts,
    COUNT(*)                                                        AS total_rules,
    COUNT(*) FILTER (WHERE passed = TRUE)                           AS passed_rules,
    COUNT(*) FILTER (WHERE passed = FALSE)                          AS failed_rules,
    COUNT(*) FILTER (WHERE passed = FALSE AND severity = 'ERROR')   AS error_count,
    COUNT(*) FILTER (WHERE passed = FALSE AND severity = 'WARNING') AS warning_count,
    -- the group is never empty, so COUNT(*) > 0 here
    ROUND(COUNT(*) FILTER (WHERE passed = TRUE) * 1.0 / COUNT(*), 4) AS pass_rate,
    CASE
        WHEN COUNT(*) FILTER (WHERE passed = FALSE AND severity = 'ERROR') > 0
            THEN 'BLOCKED'
        WHEN COUNT(*) FILTER (WHERE passed = FALSE AND severity = 'WARNING') > 0
            THEN 'WARNING'
        ELSE 'PASSED'
    END AS gate_decision
FROM best_practice_dataops_quality.doc_quality_results
GROUP BY
    pipeline_run,
    check_ts;
-- Monitoring: the 10 most recent jobs started today whose SQL text mentions
-- the best_practice_dataops_quality schema, newest first.
SELECT
    job_id,
    status,
    ROUND(execution_time, 2) AS exec_s,       -- unit presumably seconds (per alias); confirm
    rows_produced,
    rows_inserted,
    start_time,
    SUBSTR(job_text, 1, 80)  AS sql_preview   -- first 80 characters of the statement
FROM sys.information_schema.job_history
-- pt_date is compared as a string; presumably the daily partition column
WHERE pt_date = CAST(CURRENT_DATE() AS STRING)
  AND LOWER(job_text) LIKE '%best_practice_dataops_quality%'
ORDER BY start_time DESC
LIMIT 10;
💡 提示:将上述查询(去掉仅限当天的 `pt_date` 过滤条件后)封装成一个 Dynamic Table `doc_pipeline_run_trend`,按天聚合成功/失败次数和平均耗时,可以直接接入 BI 工具作为 DataOps 质量仪表板的数据源。