智慧矿山安全预警解决方案

基于 ClickZetta Lakehouse,将煤矿/非煤矿山六大子系统数据统一接入,实现跨系统关联分析与 AI 驱动的主动预警,从"事后报警"升级为"事前预测",6 周完成 PoC 上线。


一、业务背景

中国原煤年产量约 47 亿吨,占全球近 50%。矿山开采是高危行业——2001–2022 年煤矿水害事故累计死亡 4,667 人,一次重大瓦斯事故直接损失超 5,000 万元,且每班 200~500 人在井下作业,预警失效即群死群伤。

当前矿山数字化的根本矛盾:传感器每秒产生海量数据,但六大子系统各自独立运行,数据孤岛严重,跨系统关联分析几乎是空白。

政策强制要求(2025 年起刚需)

要求规范依据
CH₄ 超限 1 秒内自动断电,违规停产《煤矿安全规程》2022 版
安全监控数据实时上报省级/国家平台AQ 1029-2019
传感器历史数据保存 ≥ 1 年GB 16423-2020
每班入井人员位置实时可查《矿山智能化建设指南 2025》

全球互联矿山市场 2033 年预计达 487 亿美元(CAGR 13.4%),中国约 4,000 座规模以上煤矿中仅 30% 完成初步智能化,数据融合层是最大空白

六大子系统

子系统核心指标采集频率安全权重
通风系统CH₄、CO、风速、温度1~10s★★★★★
监控系统人员位置、违规行为1s/人★★★★★
运输系统跑偏量、绳张力、车辆位置1~5s★★★★☆
供电系统三相电压/电流、功率因数50ms★★★★☆
采掘系统截割电流、支架压力100ms~1s★★★☆☆
调度系统产量计划、安全事件、工单业务驱动★★★☆☆

通风 + 监控权重最高:瓦斯突出 × 人员在场 = 群死群伤,两个系统数据在传统架构中完全隔离,正是本方案的核心切入点。


二、行业痛点

量化事故数据

  • 2001–2022 年,中国煤矿水害事故共 1,103 起,死亡 4,667 人
  • 顶板事故占煤矿总事故约 30%~40%,地下矿山为重灾区
  • 瓦斯事故仍是煤矿最大威胁,突出矿井瓦斯浓度超限事件频发

传统方案三大缺陷

① 数据孤岛,无法联动

六大子系统由不同厂商独立建设,Modbus/OPC-UA/MQTT/私有协议并存。典型失效场景:

  • ZONE-101 CH₄ 浓度超 1.0% 报警 → 但供电馈电开关未自动断开
  • 人员定位显示 3 人在危险区 → 但通风系统不知道,调度也没收到推送

② 预警滞后,只能救火

现有系统对五大灾害(瓦斯/火灾/水害/顶板/粉尘)仅做阈值报警,不具备趋势预测能力。CH₄ 从 0.7% 上升至 1.0% 平均需 20~30 分钟,这段时间完全可以预警撤人——但传统系统不会主动提示。

③ 人工依赖,难以规模化

调度指挥依赖人工经验,每班次核查 2,000+ 监测点不现实。巡检人员难以覆盖全矿区,隐患排查不及时。

数据平台视角的痛点

问题表现
协议异构多厂商系统协议不统一,接入成本高
实时性不足安全预警需秒级响应,传统批处理架构无法满足
AI 落地难模型训练与推理分离,迭代周期长,无统一 Feature Store
标准化缺失各系统数据模型不统一,跨系统分析依赖人工

核心缺口正好是 ClickZetta Lakehouse 的强项——统一接入、实时聚合、SQL 内嵌 AI,一套平台打通六个孤岛。


三、解决方案

整体架构

架构图

数据模型

原始层(8 张表)

-- 通风系统:CH₄/CO/风速/温度等传感器数据 CREATE TABLE mine_ventilation_sensors ( collected_at TIMESTAMP_NTZ NOT NULL, sensor_id STRING NOT NULL, mine_id STRING NOT NULL, zone_id STRING NOT NULL, sensor_type STRING NOT NULL, -- CH4/CO/CO2/WIND_SPEED/TEMP/HUMIDITY value DOUBLE NOT NULL, unit STRING NOT NULL, device_status STRING NOT NULL, PRIMARY KEY (collected_at, sensor_id) ) PARTITIONED BY (DAYS(collected_at)); -- 人员定位(UWB,1 秒/次) CREATE TABLE mine_personnel_location ( collected_at TIMESTAMP_NTZ NOT NULL, person_id STRING NOT NULL, mine_id STRING NOT NULL, zone_id STRING NOT NULL, location_x DOUBLE NOT NULL, location_y DOUBLE NOT NULL, depth DOUBLE NOT NULL, is_in_danger_zone BOOLEAN NOT NULL DEFAULT FALSE, shift_id STRING NOT NULL, PRIMARY KEY (collected_at, person_id) ) PARTITIONED BY (DAYS(collected_at)); -- 安全事件台账(跨系统联动事件标记) CREATE TABLE mine_safety_events ( event_at TIMESTAMP_NTZ NOT NULL, event_id STRING NOT NULL, mine_id STRING NOT NULL, zone_id STRING NOT NULL, source_system STRING NOT NULL, event_level STRING NOT NULL, -- INFO/WARNING/ALARM/EMERGENCY is_cross_system BOOLEAN NOT NULL DEFAULT FALSE, related_event_ids STRING, PRIMARY KEY (event_at, event_id) ) PARTITIONED BY (DAYS(event_at));

关键索引

CREATE BLOOMFILTER INDEX idx_bf_vent_zone ON TABLE mine_ventilation_sensors(zone_id); CREATE BLOOMFILTER INDEX idx_bf_person_zone ON TABLE mine_personnel_location(zone_id); CREATE BLOOMFILTER INDEX idx_bf_safety_level ON TABLE mine_safety_events(event_level);

三层数据管道

第一层:聚合层 Dynamic Table(每子系统一张,5 分钟刷新)

-- 通风指标聚合:按区域统计最近 10 分钟各气体指标 CREATE DYNAMIC TABLE mine_ventilation_metrics REFRESH INTERVAL 5 MINUTE AS SELECT zone_id, mine_id, MAX(CASE WHEN sensor_type = 'CH4' THEN value END) AS ch4_max_pct, MIN(CASE WHEN sensor_type = 'WIND_SPEED' THEN value END) AS wind_speed_min, CASE WHEN MAX(CASE WHEN sensor_type = 'CH4' THEN value END) >= 1.5 THEN 'CRITICAL' WHEN MAX(CASE WHEN sensor_type = 'CH4' THEN value END) >= 1.0 THEN 'HIGH' WHEN MAX(CASE WHEN sensor_type = 'CH4' THEN value END) >= 0.7 THEN 'MEDIUM' ELSE 'NORMAL' END AS ch4_risk_level FROM mine_ventilation_sensors WHERE collected_at >= CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE GROUP BY zone_id, mine_id;

第二层:跨系统关联异常检测 Dynamic Table(核心差异化)

-- 将五个聚合层 JOIN 在一起,供电通过 zone_id + mine_id 精确关联,输出综合风险评级 CREATE DYNAMIC TABLE mine_cross_system_anomalies REFRESH INTERVAL 5 MINUTE AS SELECT v.zone_id, v.ch4_risk_level, pw.switch_status AS feeder_switch_status, -- 注意别名,非 pw.switch_status -- 综合风险评级:多系统联合判断 CASE WHEN v.ch4_risk_level = 'CRITICAL' AND COALESCE(p.persons_in_danger, 0) > 0 AND pw.switch_status = 'CLOSED' THEN 'CRITICAL' WHEN v.ch4_risk_level IN ('CRITICAL','HIGH') AND COALESCE(p.persons_in_danger, 0) > 0 THEN 'HIGH' ... END AS overall_risk_level, -- 结构化摘要,同时作为 AI_CLASSIFY 和 AI_COMPLETE 的输入 CONCAT_WS(';', CASE WHEN v.ch4_risk_level != 'NORMAL' THEN CONCAT('CH4浓度', v.ch4_max_pct, '%') END, CASE WHEN COALESCE(p.persons_in_danger, 0) > 0 THEN CONCAT(p.persons_in_danger, '人处于危险区域') END, CASE WHEN pw.switch_status = 'CLOSED' AND v.ch4_risk_level IN ('HIGH','CRITICAL') THEN '馈电开关未断开(需联动断电)' END ) AS anomaly_summary FROM mine_ventilation_metrics v LEFT JOIN mine_personnel_metrics p ON v.zone_id = p.zone_id AND v.mine_id = p.mine_id LEFT JOIN mine_power_metrics pw ON v.zone_id = pw.zone_id AND v.mine_id = pw.mine_id LEFT JOIN mine_excavation_metrics ex ON v.mine_id = ex.mine_id AND ex.working_face_id = CONCAT('FACE-', SUBSTR(v.zone_id, 6)) LEFT JOIN mine_transport_metrics tr -- zone_id 精确关联:皮带异常只关联本区域,不跨区污染 ON v.zone_id = tr.zone_id AND v.mine_id = tr.mine_id AND tr.device_type = 'CONVEYOR' WHERE v.ch4_risk_level != 'NORMAL' OR v.co_risk_level != 'NORMAL' OR ...;

第三层:AI 分类 + AI 预警工单生成(CTE 单次调用)

-- CTE 结构:AI_CLASSIFY 一次调用 + AI_COMPLETE 一次调用,避免多次调用结果不一致 WITH anomalies AS ( SELECT * FROM mine_cross_system_anomalies WHERE overall_risk_level IN ('HIGH', 'CRITICAL') ), classified AS ( SELECT *, REGEXP_EXTRACT(AI_CLASSIFY('conn_dashscope:deepseek-v3', anomaly_summary, ARRAY('GAS','FIRE','ROOF','FLOOD','EQUIPMENT','COMPOUND')), '(?s)\{.*\}', 0) AS classify_json FROM anomalies ), completed AS ( SELECT *, REGEXP_EXTRACT(AI_COMPLETE('conn_dashscope:deepseek-v3', CONCAT('你是矿山安全工程师,灾害类型:', COALESCE(GET_JSON_OBJECT(classify_json,'$.label'),'未知'), ',异常:', anomaly_summary, ...)), '(?s)\{.*\}', 0) AS complete_json FROM classified ) INSERT INTO mine_ai_safety_alerts (...) SELECT GET_JSON_OBJECT(classify_json, '$.label') AS disaster_type, CAST(GET_JSON_OBJECT(classify_json, '$.score') AS DOUBLE) AS disaster_confidence, GET_JSON_OBJECT(complete_json, '$.alert_title') AS alert_title, GET_JSON_OBJECT(complete_json, '$.immediate_actions') AS immediate_actions, CAST(GET_JSON_OBJECT(complete_json, '$.evacuation_required') AS BOOLEAN) AS evacuation_required, CAST(GET_JSON_OBJECT(complete_json, '$.power_cutoff_required') AS BOOLEAN) AS power_cutoff_required, ... FROM completed;

三大联动场景

场景涉及系统触发条件AI 处置建议
瓦斯-人员-供电联动通风+监控+供电CH₄≥1.0% + 人员在场 + 馈电未断立即断电/撤人/加强通风
皮带故障-生产链路中断运输+调度皮带跑偏>50mm + 采煤机作业中停机检修/调整产量计划
采空区自燃预警通风+调度CO 浓度趋势上升(>24ppm 且增速>2ppm/10min)注浆封堵/调整通风方式

链路 A:实时数据处理链路

现有 Dynamic Table 5 分钟刷新对多数场景已足够,但部分安全事件(CH₄ 突升、风速骤降)需要在 1 分钟内被感知。实时链路在现有 5 层管道基础上叠加一条高频旁路:

传感器 → MQTT 采集网关 → mine_ventilation_sensors ↓ Change Tracking 驱动 Studio Task(1 分钟 Cron) ↓ 单点阈值 HAVING 过滤 mine_realtime_alerts ↓ 推送:WebSocket / 调度台 / 移动端

关键设计决策:

  • 1 分钟窗口聚合(非逐条触发):避免传感器抖动产生海量误报,同时将延迟控制在 ≤90 秒
  • 规程阈值硬编码:CH₄ 0.75%/1.0%/1.5%、CO 24/50ppm、风速 0.25m/s、温度 26/30℃ 均直接写入 CASE 逻辑,不依赖外部配置
  • ingestion_lag_ms 监控:记录每条数据从传感器到入库的延迟,用于 Studio 监控面板检测采集链路健康状态
  • 与 Dynamic Table 分工:实时链路负责单点极值触发,Dynamic Table 负责多传感器聚合和跨系统关联,两者互补

-- 实时链路核心:1 分钟窗口,只保留超阈值记录 INSERT INTO mine_realtime_alerts (...) SELECT sensor_id, sensor_type, MAX(value) AS current_value, alert_rule, ... FROM mine_ventilation_sensors WHERE collected_at >= CURRENT_TIMESTAMP() - INTERVAL 1 MINUTE GROUP BY mine_id, zone_id, sensor_id, sensor_type HAVING (sensor_type = 'CH4' AND MAX(value) >= 0.75) OR (sensor_type = 'CO' AND MAX(value) >= 24) OR (sensor_type = 'WIND_SPEED' AND MAX(value) < 0.25) OR (sensor_type = 'TEMP' AND MAX(value) >= 26);

链路 B:离线非结构化数据处理链路

矿山积累了大量非结构化数据:历年事故调查报告、设备检修记录、地质勘探档案、规程培训材料。这些数据是最宝贵的安全知识资产,但长期以 PDF/Word/扫描件形式孤立存放,无法被 AI 检索利用。

离线链路将这些文档转化为可检索的结构化知识库:

PDF/Word/图片(对象存储) ↓ OCR / 文档解析 mine_unstructured_docs(原始文本 + 元数据) ↓ Studio Task(每日批量) ↓ AI_COMPLETE(矿山安全专家 Prompt) mine_doc_knowledge(结构化知识:摘要/风险/经验教训) ↓ 实时预警 AI 增强(RAG 检索历史相似事故的处置方案)

支持的文档类型:

doc_type来源AI 提取重点
ACCIDENT_REPORT事故调查报告事故经过、根本原因、经验教训(lessons_learned)
MAINTENANCE_LOG设备检修记录故障模式、更换部件、预防建议
GEO_SURVEY地质勘探档案地层结构、断层位置、瓦斯赋存规律
REGULATION规程/标准文件适用条款、关键阈值、操作要求
TRAINING培训材料安全操作规范、应急处置流程

RAG 增强: 当发生新告警时,可从

mine_doc_knowledge
mine_doc_knowledge
检索同区域、同灾害类型的历史事故经验,将其注入 AI_COMPLETE 的 Prompt,使生成的处置建议更具针对性。

-- 检索相似历史事故,注入预警 AI 的 Prompt SELECT summary, lessons_learned FROM mine_doc_knowledge WHERE mine_id = 'MINE-001' AND involved_zones LIKE '%ZONE-101%' AND disaster_types LIKE '%GAS%' ORDER BY processed_at DESC LIMIT 3;

完整技术实现矩阵

以下为各业务场景的完整技术方案,均基于 ClickZetta Lakehouse 原生能力实现,无需引入外部框架。

通风系统扩展:瓦斯浓度趋势预测

当前实时链路只做单点阈值判断,以下动态表补充趋势预测,CH₄ 缓慢爬升期间提前 20~30 分钟预警:

CREATE DYNAMIC TABLE mine_ch4_trend REFRESH INTERVAL 5 MINUTE AS SELECT zone_id, mine_id, MAX(CASE WHEN sensor_type = 'CH4' THEN value END) AS ch4_current, (AVG(CASE WHEN sensor_type = 'CH4' AND collected_at >= CURRENT_TIMESTAMP() - INTERVAL 5 MINUTE THEN value END) - AVG(CASE WHEN sensor_type = 'CH4' AND collected_at <= CURRENT_TIMESTAMP() - INTERVAL 25 MINUTE THEN value END) ) / 25.0 AS ch4_slope_per_min, MAX(CASE WHEN sensor_type = 'CH4' THEN value END) + (AVG(CASE WHEN sensor_type = 'CH4' AND collected_at >= CURRENT_TIMESTAMP() - INTERVAL 5 MINUTE THEN value END) - AVG(CASE WHEN sensor_type = 'CH4' AND collected_at <= CURRENT_TIMESTAMP() - INTERVAL 25 MINUTE THEN value END) ) AS ch4_predicted_30min, CASE WHEN MAX(CASE WHEN sensor_type = 'CH4' THEN value END) + (AVG(CASE WHEN sensor_type = 'CH4' AND collected_at >= CURRENT_TIMESTAMP() - INTERVAL 5 MINUTE THEN value END) - AVG(CASE WHEN sensor_type = 'CH4' AND collected_at <= CURRENT_TIMESTAMP() - INTERVAL 25 MINUTE THEN value END) ) >= 1.0 THEN 'TREND_WARN' WHEN MAX(CASE WHEN sensor_type = 'CH4' THEN value END) + (AVG(CASE WHEN sensor_type = 'CH4' AND collected_at >= CURRENT_TIMESTAMP() - INTERVAL 5 MINUTE THEN value END) - AVG(CASE WHEN sensor_type = 'CH4' AND collected_at <= CURRENT_TIMESTAMP() - INTERVAL 25 MINUTE THEN value END) ) >= 0.75 THEN 'TREND_CAUTION' ELSE 'STABLE' END AS trend_level FROM mine_ventilation_sensors WHERE collected_at >= CURRENT_TIMESTAMP() - INTERVAL 30 MINUTE AND sensor_type = 'CH4' GROUP BY zone_id, mine_id;

运输系统扩展:皮带全生命周期健康评分

按班次综合跑偏量(权重 40%)、温度(30%)、电流过载(30%)计算健康分 0~100,驱动检修排期:

CREATE DYNAMIC TABLE mine_belt_health_score REFRESH INTERVAL 30 MINUTE AS SELECT device_id, mine_id, ROUND( GREATEST(0, 100 - MAX(CASE WHEN metric_name = 'DEVIATION' THEN value ELSE 0 END) / 70.0 * 40) * 0.4 + GREATEST(0, 100 - GREATEST(0, MAX(CASE WHEN metric_name = 'TEMP' THEN value ELSE 0 END) - 50) * 2) * 0.3 + GREATEST(0, 100 - GREATEST(0, MAX(CASE WHEN metric_name = 'CURRENT' THEN value ELSE 0 END) - 85) / 85.0 * 30) * 0.3, 1) AS health_score, CASE WHEN health_score < 40 THEN 'CRITICAL' -- 立即检修 WHEN health_score < 65 THEN 'DEGRADED' -- 72h 内计划检修 WHEN health_score < 80 THEN 'WARNING' -- 关注监控 ELSE 'HEALTHY' END AS health_status FROM mine_transport_sensors WHERE device_type = 'CONVEYOR' AND collected_at >= CURRENT_TIMESTAMP() - INTERVAL 480 MINUTE GROUP BY device_id, mine_id;

监控系统扩展:人员异常行为检测

对轨迹做静止时长分析,识别晕倒/困伤等紧急状态,与人员风险评级联动触发搜救预案:

CREATE DYNAMIC TABLE mine_personnel_behavior REFRESH INTERVAL 3 MINUTE AS SELECT person_id, mine_id, zone_id, shift_id, MAX(collected_at) AS last_seen_at, SUM(CASE WHEN location_x = LAG(location_x) OVER (PARTITION BY person_id ORDER BY collected_at) AND location_y = LAG(location_y) OVER (PARTITION BY person_id ORDER BY collected_at) THEN 1 ELSE 0 END) OVER (PARTITION BY person_id) AS static_seconds, is_in_danger_zone, CASE WHEN static_seconds > 300 AND is_in_danger_zone THEN 'EMERGENCY' -- 危险区静止 5 分钟 WHEN static_seconds > 600 THEN 'ABNORMAL' -- 任意区静止 10 分钟 ELSE 'NORMAL' END AS behavior_status FROM mine_personnel_location WHERE collected_at >= CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE;

供电系统扩展:负荷预测与无功补偿优化

基于历史同期特征(同星期、同时段)用 AI_COMPLETE 预测下一小时负荷,提前调度无功补偿投入时序,功率因数目标 ≥ 0.92,年节电 30~80 万元/矿:

CREATE DYNAMIC TABLE mine_power_load_features REFRESH INTERVAL 30 MINUTE AS SELECT substation_id, mine_id, DAYOFWEEK(collected_at) AS day_of_week, HOUR(collected_at) AS hour_of_day, AVG(CASE WHEN metric_name = 'ACTIVE_POWER' THEN value END) AS avg_load_kw, MIN(CASE WHEN metric_name = 'POWER_FACTOR' THEN value END) AS min_pf FROM mine_power_sensors WHERE collected_at >= CURRENT_TIMESTAMP() - INTERVAL 10080 MINUTE GROUP BY substation_id, mine_id, DAYOFWEEK(collected_at), HOUR(collected_at);

采掘系统扩展:设备预测性维护(PHM)

基于电流波动、过载次数、液压压力特征向量,AI 预测剩余可用时间(RUL)并自动派发 P1/P2 维修工单:

CREATE DYNAMIC TABLE mine_excavation_features REFRESH INTERVAL 30 MINUTE AS SELECT device_id, device_type, mine_id, working_face_id, AVG(CASE WHEN metric_name = 'CURRENT' THEN value END) AS avg_current, STDDEV(CASE WHEN metric_name = 'CURRENT' THEN value END) AS current_stddev, MIN(CASE WHEN metric_name = 'PRESSURE' THEN value END) AS min_pressure, COUNT(CASE WHEN metric_name = 'CURRENT' AND value > 420 THEN 1 END) AS overload_count FROM mine_excavation_sensors WHERE collected_at >= CURRENT_TIMESTAMP() - INTERVAL 480 MINUTE GROUP BY device_id, device_type, mine_id, working_face_id;

overload_count >= 3
overload_count >= 3
时触发 AI_COMPLETE RUL 预测,输出
{"rul_hours": 数值, "priority": "P1/P2", "advice": "..."}
{"rul_hours": 数值, "priority": "P1/P2", "advice": "..."}
并自动写入
mine_maintenance_orders
mine_maintenance_orders

调度系统扩展:通风网络智能优化

通风网络解算:根据各区域 CH₄ 浓度反推所需风量,AI 输出局扇调节方案写入调度表,通风电耗目标降低 15~20%,同时保障所有区域 CH₄ < 0.5%。

全矿区:集团化安全管理

集团层统一 workspace,各矿安全评分实时排名,安全评分最低的矿优先介入:

CREATE DYNAMIC TABLE group_mine_safety_dashboard REFRESH INTERVAL 10 MINUTE AS SELECT mine_id, COUNT(CASE WHEN overall_risk_level = 'CRITICAL' THEN 1 END) AS critical_count, COUNT(CASE WHEN overall_risk_level = 'HIGH' THEN 1 END) AS high_count, SUM(persons_in_danger) AS total_persons_at_risk, GREATEST(0, 100 - COUNT(CASE WHEN overall_risk_level = 'CRITICAL' THEN 1 END) * 20 - COUNT(CASE WHEN overall_risk_level = 'HIGH' THEN 1 END) * 10 - SUM(persons_in_danger) * 5) AS safety_score FROM mine_cross_system_anomalies GROUP BY mine_id ORDER BY safety_score ASC;

矿山长期积累的事故报告、检修记录、地质档案是最宝贵的安全知识资产。ClickZetta 原生支持全文检索和向量检索,将这些非结构化文档变成可机器检索的知识库,为实时预警提供 RAG 增强。

全文检索:调度员用自然语言关键词快速定位相关案例,支持中文 IK 分词:

-- 场景:告警触发后,调度员搜索"ZONE-101 瓦斯 断电"找历史处置经验 SELECT doc_id, doc_type, summary, lessons_learned, MATCH(chunk_text, summary) AGAINST ('ZONE-101 瓦斯 断电' IN NATURAL LANGUAGE MODE) AS relevance FROM mine_doc_knowledge WHERE mine_id = 'MINE-001' AND MATCH(chunk_text, summary) AGAINST ('ZONE-101 瓦斯 断电' IN NATURAL LANGUAGE MODE) ORDER BY relevance DESC LIMIT 5; -- 全文检索索引(在 setup.sql 中建立) CREATE FULLTEXT INDEX idx_ft_know_text ON TABLE mine_doc_knowledge(chunk_text, summary, key_findings, lessons_learned) WITH PROPERTIES ('analyzer' = 'ik_max_word');

向量检索:基于语义相似度检索,捕捉"措辞不同但语义相同"的案例,比关键词检索覆盖更广:

-- 场景:将当前告警摘要向量化,找语义最相似的历史事故 WITH alert_vec AS ( SELECT AI_EMBEDDING('conn_dashscope:text-embedding-v3', anomaly_summary) AS q_vec FROM mine_ai_safety_alerts WHERE alert_id = 'ALERT-ZONE-101-xxx' ) SELECT k.doc_id, k.summary, k.lessons_learned, COSINE_SIMILARITY(k.content_vector, a.q_vec) AS similarity FROM mine_doc_knowledge k CROSS JOIN alert_vec a WHERE k.mine_id = 'MINE-001' AND k.doc_type = 'ACCIDENT_REPORT' ORDER BY similarity DESC LIMIT 5; -- 向量索引(HNSW 近似最近邻) CREATE VECTOR INDEX idx_vec_know_content ON TABLE mine_doc_knowledge(content_vector) WITH PROPERTIES ('metric_type' = 'cosine', 'index_type' = 'hnsw');

Hybrid Search(混合检索):全文召回 × 向量重排序,精准率和召回率双优:

-- 全文检索召回 Top-20,再用向量语义重排序,加权融合输出 Top-5 WITH kw_results AS ( SELECT doc_id, chunk_id, summary, lessons_learned, MATCH(chunk_text, summary) AGAINST ('瓦斯 断电 撤人') AS kw_score FROM mine_doc_knowledge WHERE mine_id = 'MINE-001' AND MATCH(chunk_text, summary) AGAINST ('瓦斯 断电 撤人') LIMIT 20 ), reranked AS ( SELECT k.*, COSINE_SIMILARITY(d.content_vector, AI_EMBEDDING('conn_dashscope:text-embedding-v3', 'CH4超限人员撤离断电处置方案') ) AS vec_score FROM kw_results k JOIN mine_doc_knowledge d ON k.doc_id = d.doc_id AND k.chunk_id = d.chunk_id ) SELECT doc_id, summary, lessons_learned, kw_score * 0.3 + vec_score * 0.7 AS hybrid_score -- 向量权重更高 FROM reranked ORDER BY hybrid_score DESC LIMIT 5;

这三种检索能力直接用 SQL 表达,无需独立搜索引擎(Elasticsearch/Milvus)。


四、ClickZetta 技术优势

矿山传感器每秒产生数千条数据,安全预警不能等批处理窗口。ClickZetta 用一种机制同时覆盖分钟级聚合和秒级趋势计算:

层次刷新间隔用途传统方案等价
实时链路1 min(Studio Cron)单点超阈值极值触发Flink CEP 规则引擎
聚合层 DT5 min多传感器滚动聚合 + 风险评级Spark Structured Streaming
异常层 DT5 min跨系统关联 + 综合评级独立关联计算服务
趋势预测 DT5 minCH₄ 斜率 + 30min 预测独立时序预测服务

-- 4 种时间窗口,同一 SQL 范式,零额外系统 CREATE DYNAMIC TABLE mine_ch4_trend REFRESH INTERVAL 5 MINUTE AS ...; CREATE DYNAMIC TABLE mine_belt_health REFRESH INTERVAL 30 MINUTE AS ...; CREATE DYNAMIC TABLE mine_power_features REFRESH INTERVAL 30 MINUTE AS ...;

Change Tracking 驱动增量计算,TB 级历史数据不影响刷新性能。传统方案需要独立的流处理集群(Flink/Spark Streaming)+ 批处理调度两套系统,ClickZetta 用 SQL 声明式替代。

AI_CLASSIFY + AI_COMPLETE:SQL 内嵌两级 AI 推理

矿山安全需要两种截然不同的 AI 能力,ClickZetta 在同一 CTE 内串联完成:

第一级:AI_CLASSIFY 快速分类(低延迟,低成本)

对异常摘要做多分类,输出灾害类型标签(GAS/FIRE/ROOF/FLOOD/EQUIPMENT/COMPOUND)。相比用 AI_COMPLETE 问"这是什么灾害",分类速度更快、token 消耗更少,适合高频触发场景:

AI_CLASSIFY( 'conn_dashscope:deepseek-v3', anomaly_summary, ARRAY('GAS', 'FIRE', 'ROOF', 'FLOOD', 'EQUIPMENT', 'COMPOUND') ) AS classify_json -- 返回 {"label":"GAS","score":0.92}

第二级:AI_COMPLETE 生成结构化处置方案(深度推理)

将灾害类型 + 跨系统异常摘要 + RAG 检索到的历史事故经验一起注入 Prompt,生成完整处置方案。CTE 结构保证两级各调用一次,避免重复调用导致结果不一致:

WITH classified AS ( SELECT *, REGEXP_EXTRACT(AI_CLASSIFY(...), '(?s)\{.*\}', 0) AS classify_json FROM anomalies ), completed AS ( SELECT *, REGEXP_EXTRACT(AI_COMPLETE('conn_dashscope:deepseek-v3', CONCAT('灾害类型:', GET_JSON_OBJECT(classify_json,'$.label'), ';历史相似案例:', rag_context, ';当前异常:', anomaly_summary, ...)), '(?s)\{.*\}', 0) AS complete_json FROM classified ) -- 所有字段从同一 JSON 提取,结果完全一致 SELECT GET_JSON_OBJECT(complete_json, '$.alert_title') AS alert_title, GET_JSON_OBJECT(complete_json, '$.immediate_actions') AS immediate_actions, ... FROM completed;

相比独立 AI 平台,零数据搬运,无额外延迟,无 ETL 成本。

API Connection:多模型统一管理

所有 AI 调用通过

conn_dashscope
conn_dashscope
一个连接入口,支持运行时切换模型、密钥轮换、限流熔断,不修改任何 SQL 逻辑:

-- 当前使用 DeepSeek-V3,切换至 Qwen-Max 只需修改 Connection 配置 CREATE API CONNECTION IF NOT EXISTS conn_dashscope TYPE ai_function PROPERTIES ( 'BASE_URL' = 'https://dashscope.aliyuncs.com/compatible-mode/v1', 'API_KEY' = '<your-api-key>' ); -- SQL 中引用保持不变:AI_COMPLETE('conn_dashscope:deepseek-v3', ...) -- → AI_COMPLETE('conn_dashscope:qwen-max', ...)

支持接入多家 LLM(DashScope、DeepSeek、OpenAI 兼容接口),以及领域微调后的矿山安全专有模型。

BoolFilter:千万级监测点毫秒级检索

单矿 500~2,000 个传感器点位,中型矿区可达数万个,集团级可达百万级。BoolFilter 索引将 zone_id/sensor_type/event_level 的点查延迟从秒级降至毫秒级,是高频告警查询的基础:

CREATE BLOOMFILTER INDEX idx_bf_vent_zone ON TABLE mine_ventilation_sensors(zone_id); CREATE BLOOMFILTER INDEX idx_bf_alert_level ON TABLE mine_ai_safety_alerts(overall_risk_level); CREATE BLOOMFILTER INDEX idx_bf_rt_level ON TABLE mine_realtime_alerts(threshold_level);

全文检索:事故知识库关键词快速定位

矿山历年积累的事故调查报告、规程条文、检修记录,用 FULLTEXT INDEX 建立全文索引,调度员可直接用中文关键词(如"ZONE-101 瓦斯 断电")检索相关案例,无需额外 Elasticsearch 集群:

-- 全文索引(IK 中文分词,支持矿山专业术语) CREATE FULLTEXT INDEX idx_ft_know_text ON TABLE mine_doc_knowledge(chunk_text, summary, key_findings, lessons_learned) WITH PROPERTIES ('analyzer' = 'ik_max_word'); -- 安全预警处置记录全文检索 CREATE FULLTEXT INDEX idx_ft_alert_actions ON TABLE mine_ai_safety_alerts(immediate_actions, alert_title) WITH PROPERTIES ('analyzer' = 'ik_max_word');

为什么本场景适合全文检索: 矿山事故报告存在大量行业术语("综采工作面""顶板来压""采空区自燃"),关键词精确匹配比 LIKE 模糊查询快 100 倍以上,且支持相关度排序,帮助调度员在秒级内找到最相关的历史处置方案。

-- 告警触发后,自动检索相关历史经验(毫秒级响应) SELECT summary, lessons_learned, MATCH(chunk_text, summary) AGAINST ('CH4超限 断电 撤人' IN NATURAL LANGUAGE MODE) AS score FROM mine_doc_knowledge WHERE mine_id = 'MINE-001' AND MATCH(chunk_text, summary) AGAINST ('CH4超限 断电 撤人' IN NATURAL LANGUAGE MODE) ORDER BY score DESC LIMIT 5;

向量检索:语义相似度搜索与 RAG 增强

关键词检索依赖精确词汇匹配,对"描述方式不同但语义相同"的案例无能为力。向量检索基于 AI_EMBEDDING 生成的语义向量,即使措辞完全不同,只要语义相近就能被检索到:

-- 向量索引(HNSW 算法,余弦相似度) CREATE VECTOR INDEX idx_vec_know_content ON TABLE mine_doc_knowledge(content_vector) WITH PROPERTIES ('metric_type' = 'cosine', 'index_type' = 'hnsw'); -- AI_EMBEDDING 生成向量(在 pipeline.sql 链路 B 中调用) AI_EMBEDDING('conn_dashscope:text-embedding-v3', summary_text) AS content_vector

RAG 三步流程(全在 SQL 内完成):

-- Step 1:将当前告警向量化 -- Step 2:向量检索 Top-K 相似历史案例 -- Step 3:将检索结果注入 AI_COMPLETE Prompt WITH query_vec AS ( SELECT AI_EMBEDDING('conn_dashscope:text-embedding-v3', anomaly_summary) AS vec FROM mine_ai_safety_alerts WHERE alert_id = 'ALERT-ZONE-101-xxx' ), top_k AS ( SELECT k.lessons_learned, COSINE_SIMILARITY(k.content_vector, q.vec) AS sim FROM mine_doc_knowledge k CROSS JOIN query_vec q WHERE k.doc_type = 'ACCIDENT_REPORT' ORDER BY sim DESC LIMIT 3 ) SELECT AI_COMPLETE('conn_dashscope:deepseek-v3', CONCAT('历史相似事故经验:\n', GROUP_CONCAT(lessons_learned SEPARATOR '\n'), '\n\n基于以上经验,给出处置建议:', anomaly_summary) ) AS rag_advice FROM top_k, mine_ai_safety_alerts WHERE alert_id = 'ALERT-ZONE-101-xxx';

与 BoolFilter 的协同: 在向量检索前先用 BoolFilter 过滤 mine_id、doc_type 缩小候选集,再做 ANN 搜索,检索延迟从秒级降至百毫秒级。

为什么不需要独立向量数据库(Milvus/Pinecone): ClickZetta 原生支持 VECTOR 字段类型和 VECTOR INDEX,向量检索和结构化过滤(mine_id、zone_id、doc_type)在同一条 SQL 中完成,无需跨系统数据同步。

Lakehouse 统一架构:消灭数据孤岛

传统矿山六大子系统各自独立,跨系统关联分析需要定制集成层。ClickZetta Lakehouse 将所有数据落在同一存储层,跨系统 JOIN 与单表查询性能差异微乎其微:

-- 六个子系统,一条 SQL 完成跨系统关联,无中间层,无数据复制 FROM mine_ventilation_metrics v LEFT JOIN mine_personnel_metrics p ON v.zone_id = p.zone_id AND v.mine_id = p.mine_id LEFT JOIN mine_power_metrics pw ON v.zone_id = pw.zone_id AND v.mine_id = pw.mine_id LEFT JOIN mine_excavation_metrics ex ON ... LEFT JOIN mine_transport_metrics tr ON ...

同一平台承载实时流(Dynamic Table)、批处理(INSERT SELECT)、AI 推理(AI_COMPLETE)、非结构化处理(离线链路 B)四种模式,统一运维,统一权限管控。

Studio:端到端运维自动化

Studio 将整条数据管道纳入统一调度,不需要外部调度工具(Airflow/DolphinScheduler):

Studio 能力矿山方案中的用途
Task 定时调度(Cron)实时链路 1min 告警 + 离线链路每日知识库更新
工作流 DAG 编排REFRESH DT → AI 推理 → 预警写入的依赖顺序保障
API Connection 管理DeepSeek 密钥加密存储,运行时切换模型
运行监控面板DT 刷新延迟、AI 推理成功率、ingestion_lag_ms 可视化
SQL 编辑器Dynamic Table 交互式调试、AI_COMPLETE Prompt 迭代

技术优势对比总览

能力维度传统方案ClickZetta Lakehouse
实时预警延迟5~15 分钟(批处理窗口)≤90 秒(实时链路)/ 5 分钟(DT 聚合)
跨系统关联定制开发集成层,数月周期SQL JOIN,天级交付
AI 处置建议独立 AI 平台 + ETL 同步AI_COMPLETE 内嵌,零搬运
灾害类型分类人工判断 / 规则引擎AI_CLASSIFY 毫秒级分类
历史知识检索人工翻阅档案全文检索(IK 分词)+ 向量检索(HNSW)+ Hybrid Search
RAG 增强独立向量库(Milvus)+ 同步管道SQL 内嵌 AI_EMBEDDING + VECTOR INDEX,零外部依赖
非结构化文档独立文档管理系统,数据孤立离线链路统一入湖,知识沉淀
运维复杂度Kafka + Flink + Spark + AI 平台 + ETL单一 ClickZetta + Studio
PoC 交付周期6~12 个月6 周

五、客户价值

ROI 估算

指标改造前改造后改善幅度
重大安全事故基准-50%+合规风险降低,监管处罚减少
非计划停机时长基准-40%+以煤价 820 元/吨计,每减少 1h 停机约损失 20~80 万元
违规行为识别率~30%(人工巡检)90%+违规导致的事故率下降 60%
事故调查周期3~7 天< 1 天系统自动关联多系统日志
PoC 建设周期传统方案 6~12 个月6 周SQL-first,无需独立 ETL + AI 平台

运营效率提升

方向改善
调度决策数据支撑替代经验判断,预警提前量从 0 升至 15~30 分钟
合规上报自动生成监管所需报表,数据保留 ≥ 1 年
维修计划AI 预测故障剩余可用时间,变"抢修"为"预防性维护"

与传统方案的建设成本对比

维度传统方案(分散系统)ClickZetta Lakehouse
系统数量6+ 套独立系统1 套统一平台
跨系统联动需定制开发集成层SQL JOIN 原生支持
AI 能力独立 AI 平台 + MLOpsAI_COMPLETE 内置
运维人员每套系统需专职运维统一运维
数据保留成本分散存储,重复成本高统一 Lakehouse,存储成本低

完整能力全景

本方案已覆盖当前阶段核心能力,并内置以下进阶场景的技术基础:

场景核心技术预期价值状态
六大子系统实时预警Dynamic Table + 跨系统 JOIN预警提前量 15~30min已实现
AI 灾害分类 + 处置建议AI_CLASSIFY + AI_COMPLETE CTE处置方案自动生成已实现
实时链路(1min 窗口)Change Tracking + Studio Cron延迟 ≤90s,规程阈值硬编码已实现
离线知识库 + RAGAI_COMPLETE + mine_doc_knowledge历史经验注入 Prompt已实现
全文检索(事故知识库)FULLTEXT INDEX + IK 分词中文关键词秒级定位案例已实现
向量检索(语义相似度)VECTOR INDEX + AI_EMBEDDING + HNSWRAG 语义增强,无需独立向量库已实现
Hybrid Search(混合检索)全文召回 + 向量重排序,加权融合精准率+召回率双优已实现
瓦斯浓度趋势预测Dynamic Table 滑动窗口斜率预警提前量再 +20~30min可扩展
皮带健康评分班次聚合加权评分非计划停机 -40%可扩展
人员异常行为检测窗口函数轨迹静止分析缩短搜救响应时间可扩展
供电负荷预测历史特征 + AI_COMPLETE 时序预测年节电 30~80 万元/矿可扩展
设备 PHM(RUL 预测)特征工程 + AI 预测 + 自动工单停机减少 50%可扩展
通风网络智能优化风网解算 + AI 局扇调度通风电耗 -15~20%可扩展
数字孪生三维可视化3D 数据接口 + 实时视图 DT调度决策时间缩短至秒级规划中
矿山安全领域大模型语料构建 + SFT 微调AI 建议准确率 +30~50%规划中
合规自动上报AQ 1029 格式 + Studio 推送零人工,零漏报规划中
多矿区集团化管理跨 workspace 同步 + 安全评分集团事故率 -30%规划中

六、快速上手

前置依赖

  1. ClickZetta Lakehouse workspace(已创建 schema)
  2. DashScope / DeepSeek API Key(用于 AI_COMPLETE)

执行顺序

# 步骤 1:建表、索引、Change Tracking(仅首次执行) execute setup.sql # 步骤 2:写入测试数据 execute test_data.sql # ⏱ 步骤 2 完成后请立即执行步骤 3(8 分钟内) # 步骤 3:创建 Dynamic Table 并立即 REFRESH execute pipeline.sql # 包含 CREATE DYNAMIC TABLE 语句 REFRESH DYNAMIC TABLE mine_ventilation_metrics; REFRESH DYNAMIC TABLE mine_transport_metrics; REFRESH DYNAMIC TABLE mine_excavation_metrics; REFRESH DYNAMIC TABLE mine_power_metrics; REFRESH DYNAMIC TABLE mine_personnel_metrics; REFRESH DYNAMIC TABLE mine_production_metrics; REFRESH DYNAMIC TABLE mine_cross_system_anomalies; # 步骤 4:执行 AI 预警生成(需先在 pipeline.sql 中替换 API Key) # 执行 pipeline.sql 中的 INSERT INTO mine_ai_safety_alerts 部分

验证查询

-- 1. 聚合层:确认 ZONE-101 CH4 超停产断电阈值(≥1.0%) SELECT zone_id, ch4_max_pct, co_max_ppm, wind_speed_min, temp_max_celsius, ch4_risk_level, co_risk_level, temp_risk_level FROM mine_ventilation_metrics ORDER BY ch4_max_pct DESC; -- 期望:ZONE-101 ch4_max_pct=1.62(CRITICAL),co_max_ppm=32,ch4_risk_level=CRITICAL -- 2. 跨系统异常层:ZONE-101 CRITICAL(CH4超限+人员在场+馈电未断) SELECT zone_id, overall_risk_level, persons_in_danger, feeder_switch_status, anomaly_summary FROM mine_cross_system_anomalies ORDER BY overall_risk_level; -- 期望:ZONE-101 overall_risk_level=CRITICAL,persons_in_danger=3 -- anomaly_summary 含"CH4浓度1.62%【紧急超限】;3人处于危险区域;馈电开关未断开" -- 3. 人员风险核查(危险区 3 人) SELECT zone_id, total_persons, persons_in_danger, personnel_risk_level FROM mine_personnel_metrics WHERE personnel_risk_level != 'NORMAL'; -- 期望:ZONE-101 persons_in_danger=3,personnel_risk_level=CRITICAL -- 4. 运输风险(ZONE-201 皮带跑偏) SELECT zone_id, device_id, belt_deviation_mm, transport_risk_level, device_status FROM mine_transport_metrics WHERE transport_risk_level != 'NORMAL'; -- 期望:ZONE-201 belt_deviation_mm=68,transport_risk_level=HIGH,device_status=ALARM -- 5. AI 预警工单 SELECT alert_id, zone_id, overall_risk_level, disaster_type, disaster_confidence, alert_title, evacuation_required, power_cutoff_required, immediate_actions, responsible_team FROM mine_ai_safety_alerts ORDER BY alert_at DESC; -- 期望:HIGH/CRITICAL 告警有完整 immediate_actions,disaster_type=GAS, -- evacuation_required=true,power_cutoff_required=true

重新验证(数据已过期时)

如果之前的测试数据写入时间超过 10 分钟,Dynamic Table 将返回空结果。重置步骤:

# 方式 A:仅清空数据,保留表结构(推荐,速度快) # 取消注释 teardown.sql 中的 TRUNCATE 语句并执行, # 然后从步骤 2 重新开始 # 方式 B:完全重建(彻底清除所有对象后重新执行 setup.sql) execute teardown.sql # DROP 所有表和 DT execute setup.sql # 重建表结构 # 然后从步骤 2 开始

清理环境

execute teardown.sql # 默认执行 DROP 模式(确认已取消 TRUNCATE 注释或不执行 TRUNCATE 部分)


相关文档

AI 函数

文档说明
AI 函数概述AI 函数整体介绍,模型选择、调用方式、计费说明
AI_CLASSIFY多标签分类函数,本方案用于快速分类灾害类型(GAS/FIRE/ROOF/FLOOD/EQUIPMENT/COMPOUND),成本低于 AI_COMPLETE
AI_COMPLETE通用 LLM 推理函数,本方案用于生成结构化处置方案(撤人/断电/应急指令),并支持 RAG 历史事故经验注入
AI_EMBEDDING文本向量化函数,本方案用于将事故报告和告警摘要转为语义向量,驱动向量检索和 RAG 增强
CREATE API CONNECTION创建 API Connection,统一管理 LLM 服务凭据,支持运行时切换模型无需修改 SQL

Dynamic Table

文档说明
动态表简介Dynamic Table 核心概念、增量刷新机制,以及与 Flink/Spark Streaming 的选型对比
动态表开发入门端到端建表、刷新、查看历史的完整示例
CREATE DYNAMIC TABLE建表语法参考,含
REFRESH INTERVAL
REFRESH INTERVAL
change_tracking
change_tracking
等参数说明
动态表刷新调度定时刷新配置,控制各聚合层(5 分钟)、健康评分层(30 分钟)、集团大盘(10 分钟)的刷新频率
使用 Studio 开发监控动态表通过 Studio 监控 Dynamic Table 刷新延迟和多层管道健康状态

索引

文档说明
索引概述各类索引(BloomFilter / 倒排 / 向量)适用场景和选型对比
布隆过滤器索引BloomFilter 工作原理,本方案用于 zone_id / event_level / sensor_type 高基数列等值查询加速
CREATE BLOOMFILTER INDEXBloomFilter 索引建立语法参考
倒排索引倒排索引概述,支持全文检索和 IK 中文分词
CREATE INVERTED INDEX全文索引建立语法,本方案用于事故报告、处置记录的中文关键词检索(支持矿山专业术语)
向量索引向量索引概述,支持 HNSW 近似最近邻检索和余弦相似度
CREATE VECTOR INDEX向量索引建立语法,本方案用于语义相似度检索,与 BoolFilter 协同缩小候选集
向量检索与 RAG 应用实战RAG 完整实战,含向量化、检索、Prompt 注入的端到端示例,与本方案 RAG 增强模式直接对应
基于 RRF 实现全文 + 向量混合检索最佳实践Hybrid Search 最佳实践,对应本方案全文召回 + 向量重排序加权融合模式
Lakehouse 索引最佳实践指南分区裁剪 + BloomFilter + 全文 + 向量索引组合使用策略

分区表

文档说明
分区与分桶分区表设计概念,含
PARTITIONED BY (DAYS(...))
PARTITIONED BY (DAYS(...))
时间分区用法
分区表使用指南分区表规范,含复合主键必须包含分区键的约束(本方案各表
collected_at
collected_at
/
event_at
event_at
在主键首位的依据)

窗口函数

文档说明
窗口函数概述窗口函数语法,含
OVER (PARTITION BY ... ORDER BY ...)
OVER (PARTITION BY ... ORDER BY ...)
用法
lag滞后函数,本方案人员行为检测中用于比对前后位置判断是否静止

JSON 与正则函数

文档说明
get_json_object从 JSON 字符串提取指定路径字段,本方案用于解析
AI_CLASSIFY
AI_CLASSIFY
AI_COMPLETE
AI_COMPLETE
返回的
$.label
$.label
$.advice
$.advice
等字段
regexp_extract正则提取函数,本方案用 DOTALL 模式
(?s)\{.*\}
(?s)\{.*\}
剥离 LLM 响应中可能的 markdown code fence

Studio 任务与调度

文档说明
Studio 任务开发与运维Studio Task 创建、部署、Cron 调度,本方案用于实时链路 1 分钟窗口触发和离线知识库每日批量更新
Studio 任务开发与运维实践Studio Task 最佳实践,含 DAG 依赖编排、运行监控告警配置
联系我们
预约咨询
微信咨询
电话咨询
邮件咨询