-- Hourly rollup of raw sensor readings, refreshed every 10 minutes.
-- Produces one row per (equipment_id, equipment_type) covering readings whose
-- record_time falls within the trailing hour at refresh time.
CREATE DYNAMIC TABLE equipment_metrics_1h
REFRESH INTERVAL 10 MINUTE
AS
SELECT
    equipment_id,
    equipment_type,
    MAX(record_time)            AS last_record_time,  -- most recent reading in the window
    ROUND(AVG(temperature), 2)  AS avg_temp,
    ROUND(MAX(temperature), 2)  AS max_temp,
    ROUND(AVG(vibration), 2)    AS avg_vib,
    ROUND(MAX(vibration), 2)    AS max_vib,
    ROUND(AVG(pressure), 2)     AS avg_pres,
    ROUND(AVG(power_load), 2)   AS avg_load,
    COUNT(*)                    AS reading_cnt        -- number of readings aggregated
FROM equipment_sensors
WHERE record_time >= CURRENT_TIMESTAMP() - INTERVAL 1 HOUR
GROUP BY
    equipment_id,
    equipment_type;
第二层:多维阈值过滤与风险分级(`equipment_anomaly_candidates`)
两阶段过滤:先用宽松阈值预筛(`WHERE` 子句),再用严格阈值精确分级(`CASE WHEN`),大幅减少下游 AI 调用量。
-- Anomaly candidates derived from equipment_metrics_1h, refreshed every 10 minutes.
-- Two-stage filtering: the WHERE clause pre-screens rows with loose thresholds,
-- then the CASE expression grades the surviving rows with stricter thresholds,
-- so only rows graded medium or high need further processing downstream.
CREATE DYNAMIC TABLE equipment_anomaly_candidates
REFRESH INTERVAL 10 MINUTE
AS
SELECT
    equipment_id,
    equipment_type,
    last_record_time,
    avg_temp,
    max_temp,
    avg_vib,
    max_vib,
    avg_pres,
    avg_load,
    -- Comma-separated description of every metric that exceeds its medium-risk
    -- threshold; a CASE without ELSE yields NULL for metrics that do not.
    CONCAT_WS(',',
        CASE WHEN max_temp > 90 THEN CONCAT('高温:', CAST(max_temp AS STRING), '°C') END,
        CASE WHEN max_vib > 8.0 THEN CONCAT('高振动:', CAST(max_vib AS STRING), 'mm/s') END,
        CASE WHEN avg_pres > 12 THEN CONCAT('高压力:', CAST(avg_pres AS STRING), 'Bar') END,
        CASE WHEN avg_load > 95 THEN CONCAT('高负载:', CAST(avg_load AS STRING), '%') END
    ) AS anomaly_metrics,
    -- Risk grading: any metric above its high-risk threshold -> '高' (high);
    -- any metric above its medium-risk threshold -> '中' (medium); otherwise '低' (low).
    CASE
        WHEN max_temp > 100 OR max_vib > 12 OR avg_load > 98 THEN '高'
        -- avg_load > 95 is included so that every metric reported in
        -- anomaly_metrics also raises the risk level; without it a load-only
        -- anomaly would be described as '高负载' yet graded '低'.
        WHEN max_temp > 90 OR max_vib > 8 OR avg_pres > 12 OR avg_load > 95 THEN '中'
        ELSE '低'
    END AS risk_level
FROM equipment_metrics_1h
WHERE max_temp > 85 OR max_vib > 6 OR avg_pres > 10 OR avg_load > 90;  -- loose pre-screen
检测阈值参考:

| 指标 | 宽松预筛 | 中风险 | 高风险 |
| --- | --- | --- | --- |
| 温度 | > 85°C | > 90°C | > 100°C |
| 振动 | > 6 mm/s | > 8 mm/s | > 12 mm/s |
| 功率负载 | > 90% | > 95% | > 98% |
| 压力 | > 10 Bar | > 12 Bar | — |
第三层:AI 生成维护建议,写入告警表
只对 `risk_level IN ('高', '中')` 的设备触发 `AI_COMPLETE`,将设备类型、异常指标、当前均值作为上下文传入大模型,解析 JSON 响应中的 `advice` 和 `predicted_failure_hours` 字段后直接写入告警表。
-- Generate maintenance advice with the LLM for medium/high-risk equipment and
-- append one alert row per device to equipment_alerts.
--
-- The model is invoked once per row in the inner query and both output fields
-- are parsed from that single response. Calling AI_COMPLETE separately for each
-- field doubles the number of model calls and allows the advice and the
-- predicted hours to come from two different answers.
-- NOTE(review): confirm the engine does not inline the derived table and
-- re-evaluate AI_COMPLETE for each reference to ai_json.
-- NOTE(review): the insert is positional; the select list must match the column
-- order of equipment_alerts, whose definition is not visible here.
INSERT INTO equipment_alerts
SELECT
    -- alert_id = equipment id + insertion time at minute precision
    CONCAT(equipment_id, '_', DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyyyMMddHHmm')) AS alert_id,
    equipment_id,
    equipment_type,
    CURRENT_TIMESTAMP() AS alert_time,
    anomaly_metrics,
    risk_level,
    -- Maintenance advice text taken from the JSON response
    GET_JSON_OBJECT(ai_json, '$.advice') AS maintenance_advice,
    -- Predicted hours until failure taken from the same JSON response
    CAST(GET_JSON_OBJECT(ai_json, '$.predicted_failure_hours') AS INT) AS predicted_failure_hours
FROM (
    SELECT
        equipment_id,
        equipment_type,
        anomaly_metrics,
        risk_level,
        -- REGEXP_EXTRACT keeps only the JSON object, in case the model wraps
        -- its answer in a markdown code fence.
        REGEXP_EXTRACT(
            AI_COMPLETE(
                'conn_dashscope:deepseek-v3',
                '你是工厂设备维护工程师,根据以下传感器异常数据给出简洁维护建议,'
                || '返回 JSON:{"advice":"具体维护操作,不超过50字","predicted_failure_hours":数字}'
                || '设备类型:' || COALESCE(equipment_type, '')
                -- COALESCE each input: a NULL operand would otherwise turn the
                -- whole concatenated prompt into NULL.
                || ',异常指标:' || COALESCE(anomaly_metrics, '')
                || ',当前均温:' || COALESCE(CAST(avg_temp AS STRING), 'N/A') || '°C'
                || ',最大振动:' || COALESCE(CAST(max_vib AS STRING), 'N/A') || 'mm/s'
            ),
            '(?s)\{.*\}', 0
        ) AS ai_json
    FROM equipment_anomaly_candidates
    WHERE risk_level IN ('高', '中')
) AS ai_responses;
-- Scenario 1: fetch all raw sensor rows of one device for the past 30 days
-- (fault back-tracing).
SELECT *
FROM equipment_sensors
WHERE record_time >= CURRENT_TIMESTAMP() - INTERVAL 30 DAY
  AND equipment_id = 'EQ_CNC_01';
-- Scenario 2: pull the last hour of data for several devices in one query
-- (dashboard refresh).
SELECT *
FROM equipment_sensors
WHERE record_time >= CURRENT_TIMESTAMP() - INTERVAL 1 HOUR
  AND equipment_id IN ('EQ_CNC_01', 'EQ_INJ_02', 'EQ_CONV_01');
-- Scenario 3: alert history of one device since 2026-01-01.
-- The lookup key is equipment_id (plus a time lower bound), not the alert ID.
SELECT *
FROM equipment_alerts
WHERE equipment_id = 'EQ_CNC_01'
  -- Explicit cast instead of comparing the timestamp column to a bare string.
  AND alert_time >= CAST('2026-01-01' AS TIMESTAMP);
-- Bloom filter indexes on the equality-lookup columns, using ClickZetta's
-- CREATE BLOOMFILTER INDEX syntax (stated by the author as verified on alicloud).
-- Note: the PROPERTIES('fpp'=...) parameter is not supported, so the platform's
-- default false-positive rate applies.
CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_equipment_id
    ON TABLE equipment_sensors(equipment_id);

CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_sensor_id
    ON TABLE equipment_sensors(sensor_id);

CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_alert_equipment_id
    ON TABLE equipment_alerts(equipment_id);
使用场景与适用边界
| 查询模式 | 是否适合 BloomFilter | 说明 |
| --- | --- | --- |
| `WHERE equipment_id = 'EQ_CNC_01'` | 是 | 等值查询,最优场景 |
| `WHERE equipment_id IN (...)` | 是 | 多值等值,逐个探测 |
| `WHERE equipment_id LIKE 'EQ_%'` | 否 | 模糊查询无法利用 |
| `WHERE temperature > 90` | 否 | 范围查询需用 Min/Max 统计,不用 BloomFilter |
| `WHERE risk_level = '高'` | 否 | 低基数列(只有高/中/低),BloomFilter 收益极低 |
BloomFilter 专为高基数列的等值查询设计。低基数列(如 `risk_level`、`equipment_type`)用 BloomFilter 反而浪费空间,不建议添加。范围查询和 LIKE 查询应分别依赖分区裁剪和 Inverted Index。
IoT 平台 + 时序数据库 + Flink 集群 + ML 平台 + AI 推理服务 + BI 工具
3–6 个月
高(6+ 个系统)
本方案
ClickZetta Lakehouse(一个平台)
数天
低(纯 SQL)
六、快速上手
前置依赖:在 ClickZetta Studio 中创建名为 `conn_dashscope` 的 API Connection:
-- API connection referenced by the 'conn_dashscope:...' model identifiers,
-- written with ClickZetta's CREATE API CONNECTION syntax (stated by the author
-- as verified). Replace the API_KEY placeholder with a real key before running.
CREATE API CONNECTION IF NOT EXISTS conn_dashscope
    TYPE ai_function
    PROPERTIES (
        'BASE_URL' = 'https://dashscope.aliyuncs.com/compatible-mode/v1',
        'API_KEY' = '<your-dashscope-api-key>'
    );
-- Inspect the 10 most recent alerts to confirm that the AI advice and the
-- predicted time to failure were populated.
SELECT
    equipment_id,
    equipment_type,
    risk_level,
    anomaly_metrics,
    maintenance_advice,
    predicted_failure_hours
FROM equipment_alerts
ORDER BY alert_time DESC
LIMIT 10;
清理环境(可选):
-- teardown.sql: drop every object in dependency order (downstream first).
-- equipment_anomaly_candidates and equipment_metrics_1h were created with
-- CREATE DYNAMIC TABLE, so they are removed with DROP DYNAMIC TABLE; the two
-- base tables use plain DROP TABLE.
DROP DYNAMIC TABLE IF EXISTS equipment_anomaly_candidates;
DROP DYNAMIC TABLE IF EXISTS equipment_metrics_1h;
DROP TABLE IF EXISTS equipment_alerts;
DROP TABLE IF EXISTS equipment_sensors;