-- Bronze landing table for the Kafka feed: a single STRING column holding each
-- message value as-is (no parsing into typed columns happens here).
CREATE TABLE IF NOT EXISTS best_practice_manufacturing_spc.kafka_raw_defects (value STRING);
-- Pipe that runs the COPY INTO below on virtual cluster 'DEFAULT' with a
-- 60-second batch interval, loading Kafka message values into kafka_raw_defects.
CREATE PIPE IF NOT EXISTS best_practice_manufacturing_spc.pipe_defect_events
VIRTUAL_CLUSTER = 'DEFAULT'
BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO best_practice_manufacturing_spc.kafka_raw_defects
FROM (
-- Keep only the message value, cast to STRING.
SELECT CAST(value AS STRING) AS value
-- NOTE(review): READ_KAFKA takes positional arguments; the roles noted below are
-- inferred from the values and should be confirmed against the READ_KAFKA reference.
FROM READ_KAFKA(
-- Broker address (placeholder; replace before running).
'<kafka-broker>:9092',
-- Presumably the topic name.
'mes_defect_events',
'',
-- Presumably the consumer group id.
'cz_mes_consumer',
'','','','',
-- Presumably the key and value formats ('raw' = unparsed) — confirm.
'raw', 'raw',
0,
map()
)
);
如果暂未配置 Kafka,可先将数据保存为本地 CSV 文件,通过 SQL PUT 命令(例如在 cz-cli 中执行)上传到 User Volume,再用 COPY INTO 导入(推荐):
从本地 CSV 导入(推荐)
-- Step 1: upload the local CSV file to the User Volume with the SQL PUT command.
-- Replace the local path placeholder before running.
PUT '/path/to/defect_events_data.csv' TO USER VOLUME FILE 'defect_events_data.csv';
-- Step 2: COPY INTO the table from that User Volume file.
-- No target column list is given, so the CSV columns presumably must line up with
-- the table's columns — NOTE(review): confirm how unlisted/extra columns
-- (e.g. ingest_time) are handled.
COPY INTO best_practice_manufacturing_spc.doc_defect_events
FROM USER VOLUME
USING csv
-- First line is a header; comma-separated; 'nullValue'='' presumably makes empty
-- fields load as NULL — confirm against the CSV option reference.
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('defect_events_data.csv');
将人工抽样表格上传到 Volume 后,通过 COPY INTO 批量导入,支持自动跳过已导入文件(幂等):
-- Bulk-load a manually sampled CSV from the volume
-- best_practice_manufacturing_spc.sampling_volume into doc_defect_events.
-- CSV fields are addressed by position ($1..$8), cast, and mapped onto the explicit
-- target column list; table columns not listed here (e.g. ingest_time) are not
-- supplied by this load.
COPY INTO best_practice_manufacturing_spc.doc_defect_events
(defect_id, product_id, defect_type, defect_date,
defect_location, severity, inspection_method, repair_cost)
FROM (
SELECT
$1::INT AS defect_id,
$2::INT AS product_id,
$3 AS defect_type,
-- NOTE(review): the ::DATE cast expects a date string the engine can parse — confirm the CSV date format.
$4::DATE AS defect_date,
$5 AS defect_location,
$6 AS severity,
$7 AS inspection_method,
-- NOTE(review): DOUBLE for a monetary amount can introduce rounding error; DECIMAL is safer if the table schema allows it.
$8::DOUBLE AS repair_cost
FROM @best_practice_manufacturing_spc.sampling_volume/defects_data.csv
)
USING csv
-- First line is a header; fields are comma-separated.
OPTIONS('header'='true', 'sep'=',');
💡 提示:COPY INTO 默认以文件为单位做幂等去重,同一文件多次执行不会重复导入。如果需要允许重复导入同一文件,需要在语句中显式添加相应的选项(具体选项名称请参考 COPY INTO 官方文档)。
-- Map a defect severity label to a numeric weight:
--   'Critical' -> 3, 'Moderate' -> 2, 'Minor' -> 1, anything else (including NULL) -> 0.
-- Labels are compared exactly (case-sensitive, no trimming).
CREATE OR REPLACE FUNCTION best_practice_manufacturing_spc.severity_score(
    severity STRING
)
RETURNS INT
AS CASE
    WHEN severity = 'Critical' THEN 3
    WHEN severity = 'Moderate' THEN 2
    WHEN severity = 'Minor'    THEN 1
    ELSE 0
END;
Silver 层 Dynamic Table:清洗与维度关联
Silver 层在 Bronze 原始缺陷事件基础上完成两件事:
1. 通过 LEFT JOIN `doc_product_master`,为每条事件关联生产线、产品分类、规格上下限等维度字段;
2. 计算 `severity_score` 和 `is_critical` 标记,方便 Gold 层直接聚合。
-- Silver layer: one output row per defect event, enriched with product-master
-- dimensions. The LEFT JOIN keeps events that have no matching product; their
-- dimension columns are NULL. Derived columns (severity_score, is_critical,
-- defect_month) are precomputed so Gold tables can aggregate directly.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_manufacturing_spc.silver_defect_enriched
REFRESH INTERVAL 10 MINUTE VCLUSTER DEFAULT
AS
SELECT
    -- event attributes
    evt.defect_id,
    evt.product_id,
    evt.defect_type,
    evt.defect_date,
    evt.defect_location,
    evt.severity,
    evt.inspection_method,
    evt.repair_cost,
    evt.ingest_time,
    -- product-master dimensions
    prod.product_name,
    prod.production_line,
    prod.product_category,
    prod.spec_ucl,
    prod.spec_lcl,
    prod.spec_target,
    -- derived fields
    best_practice_manufacturing_spc.severity_score(evt.severity) AS severity_score,
    CASE evt.severity WHEN 'Critical' THEN 1 ELSE 0 END AS is_critical,
    DATE_TRUNC('month', evt.defect_date) AS defect_month
FROM best_practice_manufacturing_spc.doc_defect_events AS evt
LEFT JOIN best_practice_manufacturing_spc.doc_product_master AS prod
    ON prod.product_id = evt.product_id;
⚠️ 注意:Dynamic Table 的定期调度不依赖 DDL 中的 `REFRESH INTERVAL`,而是在 Studio 中创建"刷新动态表"任务并配置 Cron 表达式来管理。这样可以在同一任务上追加监控告警和数据质量检查规则(见"Studio 刷新任务配置"章节)。本文 DDL 中的 `REFRESH INTERVAL 10 MINUTE` 仅用于设定 DT 的刷新能力,实际触发由 Studio Task 控制。
手动触发首次刷新:
-- Kick off the first refresh by hand, then check how many rows landed in Silver.
REFRESH DYNAMIC TABLE best_practice_manufacturing_spc.silver_defect_enriched;
SELECT
    COUNT(*) AS silver_count
FROM best_practice_manufacturing_spc.silver_defect_enriched;
silver_count
------------
1000
查看按生产线和缺陷类型的分布(Silver 层直查):
-- Defect distribution per production line and defect type, read directly from Silver:
-- row count, number of critical defects, and mean repair cost (2 decimals).
SELECT
    s.production_line,
    s.defect_type,
    COUNT(*)                     AS defect_count,
    SUM(s.is_critical)           AS critical_count,
    ROUND(AVG(s.repair_cost), 2) AS avg_repair_cost
FROM best_practice_manufacturing_spc.silver_defect_enriched AS s
GROUP BY
    s.production_line,
    s.defect_type
ORDER BY
    production_line,
    defect_count DESC;
SPC 控制图通过统计过程均值(μ)和标准差(σ)计算过程控制上下限(UCL/LCL),识别过程失控点。本节用窗口函数实现 c 控制图(计数型),适合缺陷计数数据。
控制限公式(计数数据下限不能为负):

$$\mathrm{UCL} = \mu + 3\sigma, \qquad \mathrm{LCL} = \max(0,\ \mu - 3\sigma)$$

注:教科书式的 c 控制图基于泊松假设取 $\sigma = \sqrt{\mu}$;下方查询以各月缺陷数的样本标准差来估计 $\sigma$。
-- SPC control chart per product: monthly defect counts compared against
-- process-mean ± 3σ control limits (LCL floored at 0 for count data).
-- σ is the sample standard deviation of the monthly counts (STDDEV_SAMP). It is
-- NULL for a product with only one month, which makes UCL NULL and lets the
-- status fall through to 'IN_CONTROL'.
-- NOTE(review): months with zero defects produce no rows in silver_defect_enriched,
-- so they are absent from monthly_stats. This biases the mean/σ, and the 3-row
-- rolling window spans more than 3 calendar months whenever months are skipped.
WITH monthly_stats AS (
    -- Step 1: one row per product and month. Aggregating here first keeps the
    -- window functions below from nesting aggregates.
    -- The product filter is applied here rather than after the windows. Every
    -- window is partitioned by product_id, so dropping whole products early does
    -- not change the statistics of the remaining products.
    SELECT
        product_id,
        defect_month,
        COUNT(*) AS monthly_defects
    FROM best_practice_manufacturing_spc.silver_defect_enriched
    WHERE product_id IN (10, 14, 15)
    GROUP BY product_id, defect_month
),
process_stats AS (
    -- Step 2: per-product statistics, each computed exactly once.
    SELECT
        product_id,
        defect_month,
        monthly_defects,
        -- Moving average over the current and the two preceding monthly rows.
        AVG(monthly_defects) OVER (
            PARTITION BY product_id
            ORDER BY defect_month
            ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
        ) AS rolling_avg_value,
        -- Whole-history process mean (chart centre line) and standard deviation.
        AVG(monthly_defects) OVER (PARTITION BY product_id) AS mean_value,
        STDDEV_SAMP(monthly_defects) OVER (PARTITION BY product_id) AS std_value
    FROM monthly_stats
),
control_limits AS (
    -- Step 3: unrounded control limits, shared by the displayed values and the status test.
    SELECT
        product_id,
        defect_month,
        monthly_defects,
        rolling_avg_value,
        mean_value,
        std_value,
        mean_value + 3 * std_value AS ucl_value,
        GREATEST(0, mean_value - 3 * std_value) AS lcl_value
    FROM process_stats
)
SELECT
    product_id,
    defect_month,
    monthly_defects,
    ROUND(rolling_avg_value, 2) AS rolling_3m_avg,
    ROUND(mean_value, 2) AS process_mean,
    ROUND(std_value, 2) AS process_std,
    ROUND(ucl_value, 2) AS ucl,
    ROUND(lcl_value, 2) AS lcl,
    -- Compare against the unrounded limits; rounding above is for display only.
    CASE
        WHEN monthly_defects > ucl_value THEN 'OUT_OF_CONTROL'
        WHEN monthly_defects < lcl_value THEN 'OUT_OF_CONTROL'
        ELSE 'IN_CONTROL'
    END AS spc_status
FROM control_limits
ORDER BY product_id, defect_month;
⚠️ 注意:在窗口函数中直接嵌套聚合函数(例如 `AVG(COUNT(*)) OVER (...)`),在分组聚合场景外会报"aggregate function cannot contain another aggregate function"错误。正确做法是先用 CTE 完成分组聚合,再在外层应用窗口函数。
Cpk 分析:按生产线计算工序能力
将 `calc_cpk` UDF 应用于按生产线汇总的修复费用数据(以修复费用替代物理尺寸测量值做演示):
-- Process capability (Cpk) per production line, using repair_cost as a stand-in
-- for a physical measurement (demonstration only).
-- calc_cpk is a UDF defined elsewhere. This call site passes
-- (mean, standard deviation, upper spec limit, lower spec limit) — confirm the
-- argument order against the UDF definition.
WITH line_stats AS (
    -- Keep full precision here. Rounding happens only in the output columns, so
    -- the Cpk input is not distorted by intermediate rounding.
    SELECT
        production_line,
        COUNT(*) AS total_defects,
        AVG(repair_cost) AS mean_repair_cost,
        STDDEV_SAMP(repair_cost) AS stddev_repair_cost
    FROM best_practice_manufacturing_spc.silver_defect_enriched
    GROUP BY production_line
)
SELECT
    production_line,
    total_defects,
    ROUND(mean_repair_cost, 2) AS avg_repair_cost,
    ROUND(stddev_repair_cost, 2) AS std_repair_cost,
    ROUND(best_practice_manufacturing_spc.calc_cpk(
        mean_repair_cost,
        stddev_repair_cost,
        1000.0,  -- upper spec limit: maximum acceptable repair cost
        0.0      -- lower spec limit
    ), 3) AS repair_cost_cpk
FROM line_stats
ORDER BY production_line;
-- Gold layer: monthly defect KPIs per production line, aggregated from Silver.
-- critical_rate_pct is the percentage of the group's defects flagged is_critical = 1.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_manufacturing_spc.gold_production_line_monthly
PARTITIONED BY (production_line)
REFRESH INTERVAL 10 MINUTE VCLUSTER DEFAULT
TBLPROPERTIES ('static_partitions' = 'true')
AS
SELECT
    s.production_line,
    s.defect_month,
    COUNT(*)                                         AS total_defects,
    SUM(s.is_critical)                               AS critical_defects,
    ROUND(SUM(s.is_critical) * 100.0 / COUNT(*), 2)  AS critical_rate_pct,
    ROUND(SUM(s.repair_cost), 2)                     AS total_repair_cost,
    ROUND(AVG(s.repair_cost), 2)                     AS avg_repair_cost,
    COUNT(DISTINCT s.product_id)                     AS affected_products
FROM best_practice_manufacturing_spc.silver_defect_enriched AS s
GROUP BY
    s.production_line,
    s.defect_month;
-- Month-by-month KPI trend for production line 'Line-3', read from the Gold table.
SELECT
    g.production_line,
    g.defect_month,
    g.total_defects,
    g.critical_rate_pct,
    g.total_repair_cost
FROM best_practice_manufacturing_spc.gold_production_line_monthly AS g
WHERE g.production_line = 'Line-3'
ORDER BY g.defect_month;
-- Gold layer: Pareto analysis of defects per (defect_type, severity, defect_location).
-- For each combination: defect count, total repair cost, share of all defects (%),
-- and the running count / running share when combinations are ranked by count desc.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_manufacturing_spc.gold_defect_pareto
REFRESH INTERVAL 10 MINUTE VCLUSTER DEFAULT
AS
SELECT
    defect_type,
    severity,
    defect_location,
    COUNT(*) AS defect_count,
    ROUND(SUM(repair_cost), 2) AS total_repair_cost,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) AS defect_pct,
    -- The running totals use a ROWS frame, so rows with equal counts need a fixed
    -- order. Ordering by COUNT(*) alone made the cumulative values arbitrary for
    -- ties and allowed them to change between refreshes; the group keys break the ties.
    SUM(COUNT(*)) OVER (
        ORDER BY COUNT(*) DESC, defect_type, severity, defect_location
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS cumulative_count,
    ROUND(SUM(COUNT(*)) OVER (
        ORDER BY COUNT(*) DESC, defect_type, severity, defect_location
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) * 100.0 / SUM(COUNT(*)) OVER (), 2) AS cumulative_pct
FROM best_practice_manufacturing_spc.silver_defect_enriched
GROUP BY defect_type, severity, defect_location
-- NOTE(review): a stored table has no guaranteed row order; readers should apply
-- their own ORDER BY. The tie-breakers here mirror the window ordering.
ORDER BY defect_count DESC, defect_type, severity, defect_location;
-- Roll the Pareto table up to defect_type level: total defect count and total
-- repair cost per type, largest first.
SELECT
    pr.defect_type,
    SUM(pr.defect_count)                AS total,
    ROUND(SUM(pr.total_repair_cost), 2) AS total_cost
FROM best_practice_manufacturing_spc.gold_defect_pareto AS pr
GROUP BY pr.defect_type
ORDER BY total DESC;
PPM(Parts Per Million)衡量每百万个检验单位中的缺陷数,是 SPC 和六西格玛体系的标准化质量指标。计算时分母应为检验(或生产)的单位总数,而不是缺陷记录数:
-- Defect PPM (parts per million) per defect type:
--   ppm_rate = defect_count * 1,000,000 / inspected_units
-- The denominator must be the number of units inspected/produced in the period.
-- It cannot be derived from the defect events themselves, so it is kept as one
-- named parameter in the params CTE instead of an unexplained inline literal.
-- NOTE(review): 1000 is the demo value carried over from the original query;
-- replace it with the real inspected-unit count.
WITH params AS (
    SELECT 1000 AS inspected_units
)
SELECT
    d.defect_type,
    COUNT(*) AS defect_count,
    -- NULLIF guards against a zero denominator (yields a NULL rate).
    ROUND(COUNT(*) * 1000000.0 / NULLIF(p.inspected_units, 0), 0) AS ppm_rate
FROM best_practice_manufacturing_spc.doc_defect_events AS d
CROSS JOIN params AS p
GROUP BY d.defect_type, p.inspected_units
-- defect_type breaks ties so the output order is deterministic.
ORDER BY ppm_rate DESC, d.defect_type;