-- Bloom filter indexes on columns used in equality filters/joins
-- (zone_id on sensor/location tables, event_level on safety events).
-- IF NOT EXISTS keeps this setup re-runnable: idx_bf_vent_zone is declared
-- in more than one setup section, and a plain CREATE fails the second time.
CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_vent_zone ON TABLE mine_ventilation_sensors(zone_id);
CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_person_zone ON TABLE mine_personnel_location(zone_id);
CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_safety_level ON TABLE mine_safety_events(event_level);
三层数据管道
第一层:聚合层 Dynamic Table(每子系统一张,5 分钟刷新)
-- Ventilation aggregation: per (zone_id, mine_id), summarize the last 10 minutes
-- of sensor readings (peak CH4, lowest wind speed) and grade the CH4 peak.
-- The inner query computes each aggregate once; the outer query only grades it.
CREATE DYNAMIC TABLE mine_ventilation_metrics
REFRESH INTERVAL 5 MINUTE
AS
SELECT
    agg.zone_id,
    agg.mine_id,
    agg.ch4_max_pct,
    agg.wind_speed_min,
    CASE
        WHEN agg.ch4_max_pct >= 1.5 THEN 'CRITICAL'
        WHEN agg.ch4_max_pct >= 1.0 THEN 'HIGH'
        WHEN agg.ch4_max_pct >= 0.7 THEN 'MEDIUM'
        ELSE 'NORMAL'   -- also the result when the zone has no CH4 reading (NULL peak)
    END AS ch4_risk_level
FROM (
    SELECT
        zone_id,
        mine_id,
        MAX(CASE WHEN sensor_type = 'CH4' THEN value END)        AS ch4_max_pct,
        MIN(CASE WHEN sensor_type = 'WIND_SPEED' THEN value END) AS wind_speed_min
    FROM mine_ventilation_sensors
    WHERE collected_at >= CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE
    GROUP BY zone_id, mine_id
) AS agg;
第二层:跨系统关联异常检测 Dynamic Table(核心差异化)
-- Layer 2: join the five per-subsystem aggregation tables and emit a combined
-- risk rating per zone. Personnel, power and transport rows are matched exactly
-- on zone_id + mine_id; excavation is matched on mine_id plus a working-face id
-- derived from the zone id. `...` marks parts elided from this excerpt.
CREATE DYNAMIC TABLE mine_cross_system_anomalies
REFRESH INTERVAL 5 MINUTE
AS
SELECT
    v.zone_id,
    v.ch4_risk_level,
    -- Output column is named feeder_switch_status; the expressions below still read pw.switch_status.
    pw.switch_status AS feeder_switch_status,
    -- Combined risk rating across subsystems.
    CASE
        WHEN v.ch4_risk_level = 'CRITICAL'
             AND COALESCE(p.persons_in_danger, 0) > 0
             AND pw.switch_status = 'CLOSED'
            THEN 'CRITICAL'
        WHEN v.ch4_risk_level IN ('CRITICAL', 'HIGH')
             AND COALESCE(p.persons_in_danger, 0) > 0
            THEN 'HIGH'
        ...
    END AS overall_risk_level,
    -- Structured summary; also the input to AI_CLASSIFY and AI_COMPLETE downstream.
    CONCAT_WS(';',
        CASE WHEN v.ch4_risk_level != 'NORMAL'
             THEN CONCAT('CH4浓度', v.ch4_max_pct, '%') END,
        CASE WHEN COALESCE(p.persons_in_danger, 0) > 0
             THEN CONCAT(p.persons_in_danger, '人处于危险区域') END,
        CASE WHEN pw.switch_status = 'CLOSED' AND v.ch4_risk_level IN ('HIGH', 'CRITICAL')
             THEN '馈电开关未断开(需联动断电)' END
    ) AS anomaly_summary,
    -- Appended at the end so existing column positions are unchanged. Downstream
    -- consumers (e.g. the group safety dashboard) group by mine_id and sum
    -- persons_in_danger, so both must be exposed; no personnel row counts as 0.
    v.mine_id,
    COALESCE(p.persons_in_danger, 0) AS persons_in_danger
FROM mine_ventilation_metrics v
LEFT JOIN mine_personnel_metrics p ON v.zone_id = p.zone_id AND v.mine_id = p.mine_id
LEFT JOIN mine_power_metrics pw ON v.zone_id = pw.zone_id AND v.mine_id = pw.mine_id
LEFT JOIN mine_excavation_metrics ex
    ON v.mine_id = ex.mine_id
   AND ex.working_face_id = CONCAT('FACE-', SUBSTR(v.zone_id, 6))
-- Transport is joined on zone_id too, so a belt fault only affects its own zone.
LEFT JOIN mine_transport_metrics tr
    ON v.zone_id = tr.zone_id AND v.mine_id = tr.mine_id AND tr.device_type = 'CONVEYOR'
WHERE v.ch4_risk_level != 'NORMAL' OR v.co_risk_level != 'NORMAL' OR ...;
第三层:AI 分类 + AI 预警工单生成(CTE 单次调用)
-- Layer 3: classify each HIGH/CRITICAL anomaly and draft an alert work order.
-- AI_CLASSIFY and AI_COMPLETE are each invoked once per row in their own CTE, and
-- every output column below is parsed from those two JSON strings, so the fields
-- of one alert cannot come from different (possibly inconsistent) model calls.
-- REGEXP_EXTRACT keeps the span from the first '{' to the last '}' of each reply
-- ((?s) lets '.' cross newlines), presumably to strip prose around the JSON.
WITH anomalies AS (
SELECT * FROM mine_cross_system_anomalies WHERE overall_risk_level IN ('HIGH', 'CRITICAL')
),
-- classify_json: AI_CLASSIFY over six disaster labels; $.label and $.score are read below.
classified AS (
SELECT *,
REGEXP_EXTRACT(AI_CLASSIFY('conn_dashscope:deepseek-v3', anomaly_summary,
ARRAY('GAS','FIRE','ROOF','FLOOD','EQUIPMENT','COMPOUND')), '(?s)\{.*\}', 0) AS classify_json
FROM anomalies
),
-- complete_json: prompt built from the classified label (falls back to '未知'
-- when absent) and the anomaly summary; the rest of the prompt is elided here.
completed AS (
SELECT *,
REGEXP_EXTRACT(AI_COMPLETE('conn_dashscope:deepseek-v3',
CONCAT('你是矿山安全工程师,灾害类型:',
COALESCE(GET_JSON_OBJECT(classify_json,'$.label'),'未知'),
',异常:', anomaly_summary, ...)), '(?s)\{.*\}', 0) AS complete_json
FROM classified
)
-- Target column list is elided in this excerpt; values are matched by position.
INSERT INTO mine_ai_safety_alerts (...)
SELECT
GET_JSON_OBJECT(classify_json, '$.label') AS disaster_type,
CAST(GET_JSON_OBJECT(classify_json, '$.score') AS DOUBLE) AS disaster_confidence,
GET_JSON_OBJECT(complete_json, '$.alert_title') AS alert_title,
GET_JSON_OBJECT(complete_json, '$.immediate_actions') AS immediate_actions,
-- NOTE(review): these casts depend on the model emitting JSON booleans/"true"/"false".
CAST(GET_JSON_OBJECT(complete_json, '$.evacuation_required') AS BOOLEAN) AS evacuation_required,
CAST(GET_JSON_OBJECT(complete_json, '$.power_cutoff_required') AS BOOLEAN) AS power_cutoff_required,
...
FROM completed;
-- Real-time path: over a 1-minute window, keep only sensors whose worst reading
-- breached its threshold. "Worst" is the maximum for CH4 / CO / TEMP but the
-- minimum for WIND_SPEED, because insufficient airflow is the hazard (matching
-- wind_speed_min in the aggregation layer). Testing MAX(value) < 0.25 would only
-- fire when every reading in the minute was low, and would report the wrong value.
-- NOTE(review): alert_rule is neither grouped nor aggregated; it must be derived
-- (e.g. from sensor_type) or added to GROUP BY for this to be valid SQL.
INSERT INTO mine_realtime_alerts (...)
SELECT
    sensor_id,
    sensor_type,
    CASE WHEN sensor_type = 'WIND_SPEED' THEN MIN(value) ELSE MAX(value) END AS current_value,
    alert_rule, ...
FROM mine_ventilation_sensors
WHERE collected_at >= CURRENT_TIMESTAMP() - INTERVAL 1 MINUTE
GROUP BY mine_id, zone_id, sensor_id, sensor_type
HAVING (sensor_type = 'CH4' AND MAX(value) >= 0.75)
    OR (sensor_type = 'CO' AND MAX(value) >= 24)
    OR (sensor_type = 'WIND_SPEED' AND MIN(value) < 0.25)
    OR (sensor_type = 'TEMP' AND MAX(value) >= 26);
链路 B:离线非结构化数据处理链路
矿山积累了大量非结构化数据:历年事故调查报告、设备检修记录、地质勘探档案、规程培训材料。这些数据是最宝贵的安全知识资产,但长期以 PDF/Word/扫描件形式孤立存放,无法被 AI 检索利用。
离线链路将这些文档转化为可检索的结构化知识库:
PDF/Word/图片(对象存储)
↓ OCR / 文档解析
mine_unstructured_docs(原始文本 + 元数据)
↓ Studio Task(每日批量)
↓ AI_COMPLETE(矿山安全专家 Prompt)
mine_doc_knowledge(结构化知识:摘要/风险/经验教训)
↓
实时预警 AI 增强(RAG 检索历史相似事故的处置方案)
-- Fetch the 3 most recently processed knowledge entries for this mine that
-- involve the zone and the disaster type, for injection into the alert prompt.
-- The zone match requires a non-digit (or end of string) right after 'ZONE-101':
-- a plain LIKE '%ZONE-101%' would also match ZONE-1010, ZONE-1011, ...
-- NOTE(review): assumes involved_zones is a delimited list / JSON array string.
SELECT summary, lessons_learned
FROM mine_doc_knowledge
WHERE mine_id = 'MINE-001'
  AND involved_zones RLIKE 'ZONE-101([^0-9]|$)'
  AND disaster_types LIKE '%GAS%'
ORDER BY processed_at DESC
LIMIT 3;
-- CH4 trend per (zone_id, mine_id) from the last 30 minutes of CH4 readings.
--   ch4_current         : peak CH4 reading in the 30-minute window.
--   ch4_slope_per_min   : (avg of newest 5 min - avg of oldest 5 min) / 25; with
--                         roughly uniform sampling the bucket midpoints are ~25 min
--                         apart. NULL when either bucket has no readings.
--   ch4_predicted_30min : ch4_current extrapolated 30 minutes ahead at that slope
--                         (the earlier draft added only 25 minutes of change).
--                         With no slope it falls back to ch4_current, so a zone with
--                         short history is graded on its level instead of reading
--                         STABLE because the prediction was NULL.
--   trend_level         : TREND_WARN at >= 1.0, TREND_CAUTION at >= 0.75.
CREATE DYNAMIC TABLE mine_ch4_trend
REFRESH INTERVAL 5 MINUTE
AS
SELECT
    zone_id,
    mine_id,
    ch4_current,
    ch4_slope_per_min,
    ch4_predicted_30min,
    CASE
        WHEN ch4_predicted_30min >= 1.0  THEN 'TREND_WARN'
        WHEN ch4_predicted_30min >= 0.75 THEN 'TREND_CAUTION'
        ELSE 'STABLE'
    END AS trend_level
FROM (
    SELECT
        zone_id,
        mine_id,
        ch4_current,
        ch4_slope_per_min,
        ch4_current + COALESCE(ch4_slope_per_min, 0) * 30 AS ch4_predicted_30min
    FROM (
        -- sensor_type is filtered in WHERE, so plain value aggregates are CH4-only.
        SELECT
            zone_id,
            mine_id,
            MAX(value) AS ch4_current,
            (AVG(CASE WHEN collected_at >= CURRENT_TIMESTAMP() - INTERVAL 5 MINUTE THEN value END)
             - AVG(CASE WHEN collected_at <= CURRENT_TIMESTAMP() - INTERVAL 25 MINUTE THEN value END)
            ) / 25.0 AS ch4_slope_per_min
        FROM mine_ventilation_sensors
        WHERE collected_at >= CURRENT_TIMESTAMP() - INTERVAL 30 MINUTE
          AND sensor_type = 'CH4'
        GROUP BY zone_id, mine_id
    ) AS ch4_window
) AS ch4_forecast;
-- Conveyor belt health score (0-100) per device over the last 480 minutes,
-- weighted 40% DEVIATION, 30% TEMP and 30% CURRENT penalties, then bucketed.
-- The score is computed in a subquery so the status CASE can reference it;
-- referring to an alias defined in the same SELECT list (lateral column alias)
-- is not standard SQL and is rejected by many engines.
CREATE DYNAMIC TABLE mine_belt_health_score
REFRESH INTERVAL 30 MINUTE
AS
SELECT
    s.device_id,
    s.mine_id,
    s.health_score,
    CASE
        WHEN s.health_score < 40 THEN 'CRITICAL'   -- repair immediately
        WHEN s.health_score < 65 THEN 'DEGRADED'   -- schedule repair within 72h
        WHEN s.health_score < 80 THEN 'WARNING'    -- monitor closely
        ELSE 'HEALTHY'
    END AS health_status
FROM (
    SELECT
        device_id,
        mine_id,
        ROUND(
            GREATEST(0, 100 - MAX(CASE WHEN metric_name = 'DEVIATION' THEN value ELSE 0 END) / 70.0 * 40) * 0.4 +
            GREATEST(0, 100 - GREATEST(0, MAX(CASE WHEN metric_name = 'TEMP' THEN value ELSE 0 END) - 50) * 2) * 0.3 +
            GREATEST(0, 100 - GREATEST(0, MAX(CASE WHEN metric_name = 'CURRENT' THEN value ELSE 0 END) - 85) / 85.0 * 30) * 0.3,
            1) AS health_score
    FROM mine_transport_sensors
    WHERE device_type = 'CONVEYOR'
      AND collected_at >= CURRENT_TIMESTAMP() - INTERVAL 480 MINUTE
    GROUP BY device_id, mine_id
) AS s;
监控系统扩展:人员异常行为检测
对轨迹做静止时长分析,识别晕倒/困伤等紧急状态,与人员风险评级联动触发搜救预案:
-- Per-person stillness detection (e.g. fainting or being trapped).
-- One row per person, taken from the latest location sample. static_seconds is
-- the elapsed time between the latest sample and the most recent sample at which
-- the (location_x, location_y) position changed, i.e. the current unbroken still run.
-- Changes from the previous draft:
--   * a window function cannot be nested inside another window aggregate, and an
--     aggregate cannot be mixed with ungrouped columns; stages are now split;
--   * it counted all equal-position sample pairs in the window (not seconds, and
--     not the current run), and emitted one row per sample;
--   * the lookback is 15 minutes: with a 10-minute window a still run can never
--     exceed 600 seconds, so the ABNORMAL rule was unreachable.
-- NOTE(review): static_seconds is measured up to last_seen_at, not "now"; signal
-- loss should be handled separately. UNIX_TIMESTAMP is engine-specific.
CREATE DYNAMIC TABLE mine_personnel_behavior
REFRESH INTERVAL 3 MINUTE
AS
SELECT
    person_id,
    mine_id,
    zone_id,
    shift_id,
    last_seen_at,
    static_seconds,
    is_in_danger_zone,
    CASE
        WHEN static_seconds > 300 AND is_in_danger_zone THEN 'EMERGENCY'  -- still > 5 min in a danger zone
        WHEN static_seconds > 600 THEN 'ABNORMAL'                         -- still > 10 min anywhere
        ELSE 'NORMAL'
    END AS behavior_status
FROM (
    -- Keep each person's latest sample; zone, shift and danger flag come from it.
    SELECT
        person_id,
        mine_id,
        zone_id,
        shift_id,
        is_in_danger_zone,
        collected_at AS last_seen_at,
        UNIX_TIMESTAMP(collected_at) - UNIX_TIMESTAMP(still_since) AS static_seconds
    FROM (
        SELECT
            m.person_id,
            m.mine_id,
            m.zone_id,
            m.shift_id,
            m.is_in_danger_zone,
            m.collected_at,
            -- Start of the current still run: latest sample where the position changed.
            MAX(CASE WHEN m.is_move_point = 1 THEN m.collected_at END)
                OVER (PARTITION BY m.person_id) AS still_since,
            ROW_NUMBER() OVER (PARTITION BY m.person_id ORDER BY m.collected_at DESC) AS rn
        FROM (
            SELECT
                person_id,
                mine_id,
                zone_id,
                shift_id,
                is_in_danger_zone,
                collected_at,
                -- 0 when the position equals the previous sample's; the first sample
                -- (no predecessor) and NULL coordinates count as a move point.
                CASE
                    WHEN location_x = LAG(location_x) OVER (PARTITION BY person_id ORDER BY collected_at)
                     AND location_y = LAG(location_y) OVER (PARTITION BY person_id ORDER BY collected_at)
                    THEN 0 ELSE 1
                END AS is_move_point
            FROM mine_personnel_location
            WHERE collected_at >= CURRENT_TIMESTAMP() - INTERVAL 15 MINUTE
        ) AS m
    ) AS r
    WHERE r.rn = 1
) AS latest;
-- Weekly load profile per substation: for every (day-of-week, hour-of-day) bucket
-- seen in the last 10080 minutes (7 days), the mean ACTIVE_POWER reading and the
-- lowest POWER_FACTOR reading. Buckets are derived once in the inner query.
CREATE DYNAMIC TABLE mine_power_load_features
REFRESH INTERVAL 30 MINUTE
AS
SELECT
    b.substation_id,
    b.mine_id,
    b.day_of_week,
    b.hour_of_day,
    AVG(CASE WHEN b.metric_name = 'ACTIVE_POWER' THEN b.value END) AS avg_load_kw,
    MIN(CASE WHEN b.metric_name = 'POWER_FACTOR' THEN b.value END) AS min_pf
FROM (
    SELECT
        substation_id,
        mine_id,
        metric_name,
        value,
        DAYOFWEEK(collected_at) AS day_of_week,
        HOUR(collected_at)      AS hour_of_day
    FROM mine_power_sensors
    WHERE collected_at >= CURRENT_TIMESTAMP() - INTERVAL 10080 MINUTE
) AS b
GROUP BY b.substation_id, b.mine_id, b.day_of_week, b.hour_of_day;
-- Excavation equipment features per device and working face over the last
-- 480 minutes: CURRENT readings are averaged, their standard deviation taken and
-- readings above 420 counted; PRESSURE keeps its minimum.
CREATE DYNAMIC TABLE mine_excavation_features
REFRESH INTERVAL 30 MINUTE
AS
SELECT
    device_id,
    device_type,
    mine_id,
    working_face_id,
    AVG(current_reading)                              AS avg_current,
    STDDEV(current_reading)                           AS current_stddev,
    MIN(pressure_reading)                             AS min_pressure,
    COUNT(CASE WHEN current_reading > 420 THEN 1 END) AS overload_count
FROM (
    -- Pivot each row into a per-metric column (NULL when the row is another metric).
    SELECT
        device_id,
        device_type,
        mine_id,
        working_face_id,
        CASE WHEN metric_name = 'CURRENT'  THEN value END AS current_reading,
        CASE WHEN metric_name = 'PRESSURE' THEN value END AS pressure_reading
    FROM mine_excavation_sensors
    WHERE collected_at >= CURRENT_TIMESTAMP() - INTERVAL 480 MINUTE
) AS readings
GROUP BY device_id, device_type, mine_id, working_face_id;
-- Group-level safety dashboard: one row per mine_id present in
-- mine_cross_system_anomalies, with anomaly counts, people at risk and a
-- safety score = max(0, 100 - 20*CRITICAL - 10*HIGH - 5*persons at risk).
-- total_persons_at_risk is COALESCEd: SUM over all-NULL values is NULL, which
-- made safety_score NULL (or engine-dependent through GREATEST).
-- No ORDER BY: row order of a stored table is not guaranteed; sort when reading
-- (e.g. ORDER BY safety_score ASC).
-- NOTE(review): mines with no anomaly rows do not appear; join a mine dimension
-- table if every mine must be listed.
CREATE DYNAMIC TABLE group_mine_safety_dashboard
REFRESH INTERVAL 10 MINUTE
AS
SELECT
    c.mine_id,
    c.critical_count,
    c.high_count,
    c.total_persons_at_risk,
    GREATEST(0, 100
        - c.critical_count * 20
        - c.high_count * 10
        - c.total_persons_at_risk * 5) AS safety_score
FROM (
    SELECT
        mine_id,
        COUNT(CASE WHEN overall_risk_level = 'CRITICAL' THEN 1 END) AS critical_count,
        COUNT(CASE WHEN overall_risk_level = 'HIGH' THEN 1 END)     AS high_count,
        COALESCE(SUM(persons_in_danger), 0)                         AS total_persons_at_risk
    FROM mine_cross_system_anomalies
    GROUP BY mine_id
) AS c;
-- Scenario: after an alert fires, the dispatcher searches past handling experience
-- for the keywords below; the same MATCH ... AGAINST expression filters the rows
-- and supplies the ranking score.
-- NOTE(review): in MySQL, MATCH's column list must equal a FULLTEXT index's column
-- list exactly; the index on mine_doc_knowledge covers four columns, not two.
SELECT
    doc_id,
    doc_type,
    summary,
    lessons_learned,
    MATCH(chunk_text, summary) AGAINST ('ZONE-101 瓦斯 断电' IN NATURAL LANGUAGE MODE) AS relevance
FROM mine_doc_knowledge
WHERE mine_id = 'MINE-001'
  AND MATCH(chunk_text, summary) AGAINST ('ZONE-101 瓦斯 断电' IN NATURAL LANGUAGE MODE)
ORDER BY relevance DESC
LIMIT 5;
-- Full-text index on the knowledge base (built in setup.sql). IF NOT EXISTS makes
-- re-running harmless when the index is already present, since it is declared
-- in more than one place.
CREATE FULLTEXT INDEX IF NOT EXISTS idx_ft_know_text
ON TABLE mine_doc_knowledge(chunk_text, summary, key_findings, lessons_learned)
WITH PROPERTIES ('analyzer' = 'ik_max_word');
向量检索:基于语义相似度检索,捕捉"措辞不同但语义相同"的案例,比关键词检索覆盖更广:
-- Scenario: embed the current alert's summary and return the 5 accident reports
-- of the same mine whose content vectors are most similar to it.
WITH alert_vec AS (
    -- Query embedding for one alert (presumably alert_id is unique -> one row).
    SELECT AI_EMBEDDING('conn_dashscope:text-embedding-v3', anomaly_summary) AS q_vec
    FROM mine_ai_safety_alerts
    WHERE alert_id = 'ALERT-ZONE-101-xxx'
)
SELECT
    kd.doc_id,
    kd.summary,
    kd.lessons_learned,
    COSINE_SIMILARITY(kd.content_vector, av.q_vec) AS similarity
FROM mine_doc_knowledge AS kd
CROSS JOIN alert_vec AS av
WHERE kd.mine_id = 'MINE-001'
  AND kd.doc_type = 'ACCIDENT_REPORT'
ORDER BY similarity DESC
LIMIT 5;
-- Vector index (HNSW approximate nearest neighbour, cosine metric) on content_vector.
-- IF NOT EXISTS: the same index is declared in more than one setup section.
CREATE VECTOR INDEX IF NOT EXISTS idx_vec_know_content
ON TABLE mine_doc_knowledge(content_vector)
WITH PROPERTIES ('metric_type' = 'cosine', 'index_type' = 'hnsw');
Hybrid Search(混合检索):先用全文检索召回候选,再用向量语义相似度重排序,兼顾精确率与召回率:
-- Hybrid search: recall the 20 best full-text matches, re-rank them by semantic
-- similarity, and return the top 5 by 0.3 * keyword + 0.7 * vector score.
--   * kw_results needs ORDER BY: LIMIT 20 alone returns an arbitrary 20 matches.
--   * The query embedding is computed once, not once per candidate row.
--   * Full-text relevance is unbounded while cosine similarity is at most 1, so
--     keyword scores are scaled to [0, 1] against the best hit; otherwise the
--     0.3 keyword weight would still dominate.
--   * content_vector is carried from the recall step, so no self-join is needed.
WITH query_vec AS (
    SELECT AI_EMBEDDING('conn_dashscope:text-embedding-v3', 'CH4超限人员撤离断电处置方案') AS q_vec
),
kw_results AS (
    SELECT
        doc_id,
        chunk_id,
        summary,
        lessons_learned,
        content_vector,
        MATCH(chunk_text, summary) AGAINST ('瓦斯 断电 撤人') AS kw_score
    FROM mine_doc_knowledge
    WHERE mine_id = 'MINE-001'
      AND MATCH(chunk_text, summary) AGAINST ('瓦斯 断电 撤人')
    ORDER BY kw_score DESC
    LIMIT 20
),
reranked AS (
    SELECT
        k.doc_id,
        k.summary,
        k.lessons_learned,
        k.kw_score / NULLIF(MAX(k.kw_score) OVER (), 0) AS kw_score_norm,
        COSINE_SIMILARITY(k.content_vector, q.q_vec) AS vec_score
    FROM kw_results AS k
    CROSS JOIN query_vec AS q
)
SELECT
    doc_id,
    summary,
    lessons_learned,
    COALESCE(kw_score_norm, 0) * 0.3 + vec_score * 0.7 AS hybrid_score  -- vector signal weighted higher
FROM reranked
ORDER BY hybrid_score DESC
LIMIT 5;
-- Excerpt: the layer-3 CTE with retrieved historical cases injected into the prompt.
-- NOTE(review): `anomalies` and `rag_context` are not defined in this excerpt;
-- presumably anomalies is the HIGH/CRITICAL filter CTE from layer 3 and
-- rag_context a column carrying retrieved lessons - confirm before running.
WITH classified AS (
SELECT *, REGEXP_EXTRACT(AI_CLASSIFY(...), '(?s)\{.*\}', 0) AS classify_json
FROM anomalies
),
-- NOTE(review): unlike layer 3, the label is not wrapped in COALESCE here; where
-- CONCAT returns NULL for any NULL argument, a missing label or rag_context would
-- turn the whole prompt into NULL.
completed AS (
SELECT *, REGEXP_EXTRACT(AI_COMPLETE('conn_dashscope:deepseek-v3',
CONCAT('灾害类型:', GET_JSON_OBJECT(classify_json,'$.label'),
';历史相似案例:', rag_context,
';当前异常:', anomaly_summary, ...)),
'(?s)\{.*\}', 0) AS complete_json
FROM classified
)
-- Every field is extracted from the same complete_json, so one alert's fields come
-- from a single model reply and cannot disagree with each other.
SELECT GET_JSON_OBJECT(complete_json, '$.alert_title') AS alert_title,
GET_JSON_OBJECT(complete_json, '$.immediate_actions') AS immediate_actions, ...
FROM completed;
相比独立 AI 平台,数据无需搬运出库,省去了额外的 ETL 链路及其带来的同步延迟和运维成本。
API Connection:多模型统一管理
所有 AI 调用都经由 `conn_dashscope` 这一个连接入口:密钥轮换、限流熔断等只需修改 Connection 配置;切换模型只需替换调用参数中冒号后的模型名(如 `deepseek-v3` → `qwen-max`),其余 SQL 逻辑不变:
-- One API connection holds the endpoint and credential used by every AI_* call.
-- Rotating the key or changing the endpoint only touches this connection.
-- The model is chosen per call by the suffix after the colon in the first
-- argument, so switching from DeepSeek-V3 to Qwen-Max means changing that suffix
-- in the SQL text, not the connection.
CREATE API CONNECTION IF NOT EXISTS conn_dashscope
TYPE ai_function
PROPERTIES (
    'BASE_URL' = 'https://dashscope.aliyuncs.com/compatible-mode/v1',
    'API_KEY' = '<your-api-key>'
);
-- Example: AI_COMPLETE('conn_dashscope:deepseek-v3', ...)
--       -> AI_COMPLETE('conn_dashscope:qwen-max', ...)
-- Bloom filter indexes for equality filters on zone and alert-level columns.
-- IF NOT EXISTS keeps the script re-runnable: idx_bf_vent_zone is also created
-- in an earlier setup section, so a plain CREATE would fail here.
CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_vent_zone ON TABLE mine_ventilation_sensors(zone_id);
CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_alert_level ON TABLE mine_ai_safety_alerts(overall_risk_level);
CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_rt_level ON TABLE mine_realtime_alerts(threshold_level);
全文检索:事故知识库关键词快速定位
矿山历年积累的事故调查报告、规程条文、检修记录,用 FULLTEXT INDEX 建立全文索引,调度员可直接用中文关键词(如"ZONE-101 瓦斯 断电")检索相关案例,无需额外 Elasticsearch 集群:
-- Full-text index on the knowledge base using the IK Chinese analyzer
-- (ik_max_word), so mining terminology can be matched by keyword.
-- IF NOT EXISTS: the same index may already have been created by setup.sql.
CREATE FULLTEXT INDEX IF NOT EXISTS idx_ft_know_text
ON TABLE mine_doc_knowledge(chunk_text, summary, key_findings, lessons_learned)
WITH PROPERTIES ('analyzer' = 'ik_max_word');
-- Full-text index over alert handling records (actions taken and alert titles).
CREATE FULLTEXT INDEX IF NOT EXISTS idx_ft_alert_actions
ON TABLE mine_ai_safety_alerts(immediate_actions, alert_title)
WITH PROPERTIES ('analyzer' = 'ik_max_word');
为什么本场景适合全文检索: 矿山事故报告存在大量行业术语("综采工作面""顶板来压""采空区自燃")。全文索引按分词结果走倒排索引,避免了 LIKE '%…%' 无法利用索引而必须全表扫描的问题,并支持按相关度排序,帮助调度员快速找到最相关的历史处置方案。
-- After an alert fires, pull related historical experience for this mine, ranked
-- by full-text relevance (the MATCH expression is both filter and score).
SELECT
    summary,
    lessons_learned,
    MATCH(chunk_text, summary) AGAINST ('CH4超限 断电 撤人' IN NATURAL LANGUAGE MODE) AS score
FROM mine_doc_knowledge
WHERE mine_id = 'MINE-001'
  AND MATCH(chunk_text, summary) AGAINST ('CH4超限 断电 撤人' IN NATURAL LANGUAGE MODE)
ORDER BY score DESC
LIMIT 5;
-- Vector index (HNSW, cosine similarity) on content_vector.
-- IF NOT EXISTS: the same index is declared elsewhere, so re-creating it must not fail.
CREATE VECTOR INDEX IF NOT EXISTS idx_vec_know_content
ON TABLE mine_doc_knowledge(content_vector)
WITH PROPERTIES ('metric_type' = 'cosine', 'index_type' = 'hnsw');
-- Select-list fragment (called in pipeline.sql, path B): AI_EMBEDDING turns
-- summary_text into the content_vector column that the vector index is built on.
AI_EMBEDDING('conn_dashscope:text-embedding-v3', summary_text) AS content_vector
RAG 三步流程(全在 SQL 内完成):
-- RAG in three steps, all in SQL:
--   1. embed the current alert's summary;
--   2. fetch the 3 most similar historical accident reports;
--   3. inject their lessons into the AI_COMPLETE prompt.
-- The alert row is read once and carries both its summary and its vector, and the
-- lessons are aggregated in their own CTE. The earlier draft mixed GROUP_CONCAT
-- with an ungrouped column over an implicit comma join.
-- COALESCE keeps the prompt non-NULL when no similar case is found.
WITH query_vec AS (
    SELECT
        anomaly_summary,
        AI_EMBEDDING('conn_dashscope:text-embedding-v3', anomaly_summary) AS vec
    FROM mine_ai_safety_alerts
    WHERE alert_id = 'ALERT-ZONE-101-xxx'
),
top_k AS (
    SELECT
        k.lessons_learned,
        COSINE_SIMILARITY(k.content_vector, q.vec) AS sim
    FROM mine_doc_knowledge AS k
    CROSS JOIN query_vec AS q
    WHERE k.doc_type = 'ACCIDENT_REPORT'
    ORDER BY sim DESC
    LIMIT 3
),
lessons_ctx AS (
    -- Most similar case first; GROUP_CONCAT order is otherwise unspecified.
    SELECT GROUP_CONCAT(lessons_learned ORDER BY sim DESC SEPARATOR '\n') AS lessons
    FROM top_k
)
SELECT AI_COMPLETE('conn_dashscope:deepseek-v3',
           CONCAT('历史相似事故经验:\n',
                  COALESCE(c.lessons, ''),
                  '\n\n基于以上经验,给出处置建议:', q.anomaly_summary)
       ) AS rag_advice
FROM query_vec AS q
CROSS JOIN lessons_ctx AS c;
与标量过滤(BoolFilter)的协同: 在向量检索前,先按 mine_id、doc_type 等标量条件过滤以缩小候选集,再做 ANN 搜索,可显著降低检索延迟(实际收益取决于过滤后的候选规模)。
-- Excerpt of the layer-2 FROM clause: the ventilation aggregation is LEFT JOINed to
-- the personnel, power, excavation and transport aggregations, i.e. five subsystems
-- correlated in a single statement. `...` marks elided join conditions.
FROM mine_ventilation_metrics v
LEFT JOIN mine_personnel_metrics p ON v.zone_id = p.zone_id AND v.mine_id = p.mine_id
LEFT JOIN mine_power_metrics pw ON v.zone_id = pw.zone_id AND v.mine_id = pw.mine_id
LEFT JOIN mine_excavation_metrics ex ON ...
LEFT JOIN mine_transport_metrics tr ON ...