Industrial IoT Device Health Monitoring: A Data Warehouse Walkthrough

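This article builds a multi-layer data warehouse on real-time sensor data from a production line and outputs device health scores and predictive-maintenance alerts. Using a dataset of 20 industrial devices and 100 sensor events, it walks end to end through Kafka PIPE → Bronze → Silver → Gold, and shows how three platform capabilities are applied in practice: Bloomfilter Index, Column Masking, and SQL UDF.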


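Overview

The typical data path for IoT device health monitoring is: sensor reporting → real-time ingestion → raw storage (Bronze) → cleansing and labeling (Silver) → metric aggregation and alerting (Gold).

云器 Lakehouse addresses the core problems with the following combination:

Problem                                                                   | Solution
--------------------------------------------------------------------------+--------------------------------------------------------------------------------
Real-time ingestion of sensor data written at millisecond-level frequency | Kafka PIPE for continuous ingestion; no hand-written consumer needed
Automatic incremental computation across Bronze → Silver → Gold           | Dynamic Table: declarative SQL, with the system scheduling the dependency chain
Sensitive fields such as device location coordinates need masking         | Column Masking, bound to the column and transparent to non-privileged users
device_id is a high-cardinality column with frequent point lookups        | Bloomfilter Index for fast filtering on demand
Anomaly-scoring logic should be reusable                                  | SQL UDF that encapsulates the weighted health-score formula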

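SQL commands used

Command / function                         | Purpose                                                        | Notes
-------------------------------------------+----------------------------------------------------------------+------------------------------------------------------------------
CREATE TABLE                               | Create the Bronze raw event table and the device master table | Regular tables that act as upstream sources for Dynamic Tables
CREATE BLOOMFILTER INDEX                   | Create a Bloomfilter index on the device_id column             | Suited to point-lookup filtering on high-cardinality columns
CREATE PIPE                                | Create a continuous Kafka ingestion pipe                       | Bound to a Bronze-layer target table
CREATE FUNCTION                            | Create the SQL UDF calc_health_score                           | Encapsulates the weighted health-score formula
ALTER TABLE ... CHANGE COLUMN ... SET MASK | Bind a Column Masking policy                                   | Masks the sensitive latitude/longitude columns
CREATE DYNAMIC TABLE                       | Create the Silver / Gold incremental tables                    | The system detects upstream changes and refreshes incrementally
REFRESH DYNAMIC TABLE                      | Trigger one refresh manually                                   | Used for the initial build or when debugging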

Prerequisites

All examples in this article run under the iot_health schema.

CREATE SCHEMA IF NOT EXISTS iot_health;


Bronze layer: raw sensor event table

Create the table

CREATE TABLE IF NOT EXISTS iot_health.bronze_sensor_events (
  event_id     STRING,
  device_id    STRING,
  device_type  STRING,
  temperature  DOUBLE,
  vibration    DOUBLE,
  pressure     DOUBLE,
  humidity     DOUBLE,
  fault_label  INT,
  error_code   STRING,
  event_time   TIMESTAMP,
  ingest_time  TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

ingest_time uses DEFAULT CURRENT_TIMESTAMP(), so the column is populated automatically at write time and does not have to be carried in the Kafka message body.

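Create the Bloomfilter Index

Both the Silver and Gold layers later filter by device_id. The cardinality of this column scales with the number of devices (20 in this dataset, far more in a production fleet), which makes it a good candidate for a Bloomfilter Index.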

CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_device_id
  ON TABLE iot_health.bronze_sensor_events (device_id);
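To illustrate the access pattern a Bloomfilter Index is meant to help, here is a minimal point-lookup sketch against the Bronze table. The device ID 'DEV012' is only an example value taken from the Gold results later in this article, and whether the optimizer actually uses the index for a given query depends on the platform, so treat this as an illustration rather than a benchmark.

```sql
-- Point lookup on the indexed column: an equality predicate on device_id
-- is the kind of filter a Bloom filter can answer with a membership test.
SELECT event_id, device_id, temperature, vibration, pressure, event_time
FROM iot_health.bronze_sensor_events
WHERE device_id = 'DEV012'
ORDER BY event_time;
```

A Bloom filter only answers "is this value possibly present", so range predicates such as device_id > 'DEV010' generally do not benefit from it.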

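Configure the Kafka PIPE

A Kafka PIPE tries to connect to the Kafka broker at DDL time to validate the subscription. In a real environment, replace the <kafka-broker> address and the topic name in the statement below before creating the pipe.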

-- Create a raw string landing table first; the pipe writes JSON strings into it
CREATE TABLE IF NOT EXISTS iot_health.kafka_raw_events (value STRING);

-- Create the Kafka PIPE
CREATE PIPE IF NOT EXISTS iot_health.pipe_sensor_events
  VIRTUAL_CLUSTER = 'DEFAULT'
  BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO iot_health.kafka_raw_events
FROM (
  SELECT CAST(value AS STRING) AS value
  FROM READ_KAFKA(
    '<kafka-broker>:9092',   -- replace with the actual broker address
    'iot_sensor_events',     -- topic name
    '',
    'cz_iot_consumer',       -- consumer group ID
    '', '', '', '',
    'raw',
    'raw',
    0,
    map()
  )
);

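Once created successfully, the PIPE is running by default and consumes in batches every BATCH_INTERVAL_IN_SECONDS seconds (60 in the statement above). Note that this pipe lands raw JSON strings in kafka_raw_events; the step that parses them into bronze_sensor_events is not shown in this article, which loads the Bronze table directly instead.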
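A quick way to confirm that the pipe is delivering data is to look at the landing table after at least one batch interval has passed. This is a minimal sketch using only the table defined above; the alias raw_message_count is an illustrative name.

```sql
-- How many raw messages have landed so far
SELECT COUNT(*) AS raw_message_count
FROM iot_health.kafka_raw_events;

-- Peek at a few raw payloads (each row holds one Kafka message value as a string)
SELECT value
FROM iot_health.kafka_raw_events
LIMIT 5;
```

If the count stays at zero, the broker address, topic name, and consumer group in the PIPE definition are the first things to check.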

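Load sample data

Import from a local CSV (recommended)

-- Step 1: upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/your/bronze_sensor_events.csv' TO USER VOLUME FILE 'bronze_sensor_events.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO iot_health.bronze_sensor_events
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('bronze_sensor_events.csv');

Alternatively, insert a small batch of test data inline (no CSV file needed). This article uses direct INSERT statements to simulate the result of Kafka messages that have already been parsed and written: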

INSERT INTO iot_health.bronze_sensor_events
  (event_id, device_id, device_type, temperature, vibration, pressure, humidity, fault_label, error_code, event_time)
VALUES
  ('EVT001','DEV001','pump',       72.3, 3.2,  98.5, 45.2, 0, NULL,   CAST('2026-06-01 08:00:00' AS TIMESTAMP)),
  ('EVT003','DEV003','compressor', 91.5, 4.1,  88.3, 38.7, 1, 'E001', CAST('2026-06-01 08:02:00' AS TIMESTAMP)),
  ('EVT005','DEV005','motor',      55.2, 8.9, 101.2, 50.0, 1, 'E002', CAST('2026-06-01 08:04:00' AS TIMESTAMP)),
  ('EVT006','DEV006','valve',      63.4, 1.5, 130.0, 41.5, 1, 'E003', CAST('2026-06-01 08:05:00' AS TIMESTAMP))
  -- ... 100 rows in total; the rest are omitted here
;

Verify the Bronze row count:

SELECT COUNT(*) AS bronze_row_count FROM iot_health.bronze_sensor_events;

bronze_row_count
----------------
100


Device master table and Column Masking

Create the table

CREATE TABLE IF NOT EXISTS iot_health.device_master (
  device_id     STRING,
  device_name   STRING,
  device_type   STRING,
  location_lat  DOUBLE,  -- sensitive field: latitude
  location_lon  DOUBLE,  -- sensitive field: longitude
  install_date  DATE,
  manufacturer  STRING,
  model         STRING,
  status        STRING
);

Load the device master data

Import from a local CSV (recommended)

-- Step 1: upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/your/device_master.csv' TO USER VOLUME FILE 'device_master.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO iot_health.device_master
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('device_master.csv');

Alternatively, insert a small batch of test data inline (no CSV file needed):

INSERT INTO iot_health.device_master VALUES
  ('DEV001','Pump-Alpha-01',      'pump',       31.2304, 121.4737, CAST('2022-03-15' AS DATE), 'SiemensCN', 'P300', 'active'),
  ('DEV002','Motor-Beta-01',      'motor',      31.2310, 121.4740, CAST('2022-04-20' AS DATE), 'ABB',       'M500', 'active'),
  ('DEV003','Compressor-Gamma-01','compressor', 31.2315, 121.4745, CAST('2021-11-10' AS DATE), 'Atlas',     'C200', 'active')
  -- ... 20 rows in total; the rest are omitted here
;

Create the masking function and bind it to the latitude/longitude columns

location_lat and location_lon record where a device is installed and are sensitive. The approach: authorized accounts (user names listed in the masking function) see full precision, while all other users see values rounded to one decimal place.

-- Create the masking function
CREATE OR REPLACE FUNCTION iot_health.mask_location_coord(coord DOUBLE)
RETURNS DOUBLE
AS
  CASE
    WHEN current_user() IN ('privileged_user') THEN coord  -- replace with the actual authorized user names
    ELSE ROUND(coord, 1)
  END;

-- Bind to location_lat
ALTER TABLE iot_health.device_master
  CHANGE COLUMN location_lat SET MASK iot_health.mask_location_coord;

-- Bind to location_lon
ALTER TABLE iot_health.device_master
  CHANGE COLUMN location_lon SET MASK iot_health.mask_location_coord;

Verify the binding:

-- Queried as a privileged account (sees full precision)
SELECT device_id, location_lat, location_lon
FROM iot_health.device_master
LIMIT 3;

device_id | location_lat | location_lon
----------+--------------+-------------
DEV001    | 31.2304      | 121.4737
DEV002    | 31.231       | 121.474
DEV003    | 31.2315      | 121.4745
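For comparison, a non-privileged user falls into the ELSE branch of mask_location_coord. The sketch below applies the same ROUND(coord, 1) expression to DEV001's coordinates as literals, so it can be run from any account to preview the masked values; the aliases masked_lat and masked_lon are illustrative.

```sql
-- Same expression as the ELSE branch of mask_location_coord,
-- applied to DEV001's coordinates from the device master data.
SELECT
  ROUND(31.2304, 1)  AS masked_lat,  -- 31.2
  ROUND(121.4737, 1) AS masked_lon;  -- 121.5
```

One decimal place of latitude corresponds to roughly 11 km, so the masked value still places a device in the right area but not at a specific site.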


Health score UDF

The anomaly-scoring logic is wrapped in a SQL UDF so that both the Silver and Gold layers can reuse it.

Scoring formula: 100 - (temperature/100 × 30 + vibration/10 × 30 + pressure/200 × 20 + fault_label × 20), clamped to the range 0 to 100.

CREATE OR REPLACE FUNCTION iot_health.calc_health_score(
  temperature DOUBLE,
  vibration   DOUBLE,
  pressure    DOUBLE,
  fault_label INT
)
RETURNS DOUBLE
AS
  GREATEST(0.0, LEAST(100.0,
    100.0
    - (temperature / 100.0 * 30.0)
    - (vibration / 10.0 * 30.0)
    - (pressure / 200.0 * 20.0)
    - (fault_label * 20.0)
  ));

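Verify the function:

-- Device with no threshold breach: temperature 75, vibration 3.5, pressure 99, no fault
SELECT iot_health.calc_health_score(75.0, 3.5, 99.0, 0) AS sample_score;

sample_score
------------
57.1

Note that 57.1 is below 60, so even this reading maps to RED under the Gold-layer alert thresholds: the formula subtracts a penalty for every non-zero reading, not only for readings above the anomaly thresholds.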
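Working the sample call through the formula by hand: 75/100 × 30 = 22.5, 3.5/10 × 30 = 10.5, 99/200 × 20 = 9.9, and 0 × 20 = 0, so the score is 100 - 42.9 = 57.1. The following sketch adds a few boundary inputs to exercise the clamping; the input values and column aliases are made up for illustration.

```sql
-- All-zero readings: no penalty, so the score stays at the upper bound (100)
SELECT iot_health.calc_health_score(0.0, 0.0, 0.0, 0) AS best_case;

-- Penalties sum to exactly 100 (30 + 30 + 20 + 20): the score reaches the lower bound (0)
SELECT iot_health.calc_health_score(100.0, 10.0, 200.0, 1) AS worst_case;

-- Penalties sum to 180 (60 + 60 + 40 + 20): the raw value is -80, and GREATEST clamps it to 0
SELECT iot_health.calc_health_score(200.0, 20.0, 400.0, 1) AS clamped;

-- Same readings as the sample call, but with a fault: about 57.1 - 20 = 37.1
SELECT iot_health.calc_health_score(75.0, 3.5, 99.0, 1) AS sample_with_fault;
```

The fault flag alone is worth 20 points, the same weight as the whole pressure term at 200 bar.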


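Silver layer Dynamic Table: cleansing and anomaly labeling

The Silver layer does two things on top of the raw Bronze events:

  1. LEFT JOIN device_master to attach dimension fields such as device name, manufacturer, and install date to each event
  2. Set an anomaly flag (is_anomaly) and compute a risk score (risk_score) so that the Gold layer can aggregate directly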

CREATE DYNAMIC TABLE IF NOT EXISTS iot_health.silver_device_events
  REFRESH INTERVAL 1 MINUTE VCLUSTER DEFAULT
AS
SELECT
  e.event_id,
  e.device_id,
  e.device_type,
  e.temperature,
  e.vibration,
  e.pressure,
  e.humidity,
  e.fault_label,
  e.error_code,
  e.event_time,
  e.ingest_time,
  d.device_name,
  d.manufacturer,
  d.model,
  d.install_date,
  d.status AS device_status,
  -- Anomaly flag: temperature > 90 or vibration > 8 or pressure > 120
  CASE
    WHEN e.temperature > 90 OR e.vibration > 8 OR e.pressure > 120 THEN 1
    ELSE 0
  END AS is_anomaly,
  -- Risk score: sum of the normalized readings (higher means more dangerous)
  ROUND(e.temperature / 100.0 + e.vibration / 10.0 + e.pressure / 200.0, 4) AS risk_score
FROM iot_health.bronze_sensor_events e
LEFT JOIN iot_health.device_master d
  ON e.device_id = d.device_id;

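Anomaly thresholds

Metric      | Threshold | Rationale
------------+-----------+--------------------------------------------------------------------------
temperature | > 90 °C   | Overheating point; sustained excursions damage insulation materials
vibration   | > 8 mm/s  | Alert threshold adopted here with reference to ISO 10816 (class B machines)
pressure    | > 120 bar | Typical upper design pressure for industrial piping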
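Because is_anomaly collapses three conditions into one flag, it can help while debugging to see which threshold an event actually crossed. The query below is a sketch that splits the same conditions into separate columns; the aliases temp_breach, vibration_breach, and pressure_breach are illustrative names and are not part of the Silver table.

```sql
-- Break the is_anomaly condition into one flag per metric.
-- The thresholds are the same ones used in silver_device_events.
SELECT
  event_id,
  device_id,
  CASE WHEN temperature > 90  THEN 1 ELSE 0 END AS temp_breach,
  CASE WHEN vibration   > 8   THEN 1 ELSE 0 END AS vibration_breach,
  CASE WHEN pressure    > 120 THEN 1 ELSE 0 END AS pressure_breach
FROM iot_health.bronze_sensor_events
WHERE temperature > 90 OR vibration > 8 OR pressure > 120
ORDER BY event_id;
```

Among the sample rows shown in the INSERT statement, EVT003 would be flagged for temperature (91.5), EVT005 for vibration (8.9), and EVT006 for pressure (130.0), while EVT001 crosses no threshold and is filtered out.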

Trigger the first refresh manually:

REFRESH DYNAMIC TABLE iot_health.silver_device_events;

SELECT COUNT(*) AS silver_count FROM iot_health.silver_device_events;

silver_count
------------
100


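Gold layer Dynamic Table: device-level aggregation and alerting

The Gold layer aggregates Silver data at the granularity of device_id plus an hourly window, calls the calc_health_score UDF to compute the health score, and outputs one of three alert levels.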

CREATE DYNAMIC TABLE IF NOT EXISTS iot_health.gold_device_health
  REFRESH INTERVAL 1 MINUTE VCLUSTER DEFAULT
AS
SELECT
  device_id,
  device_name,
  manufacturer,
  model,
  device_status,
  DATE_TRUNC('hour', event_time) AS hour_window,
  COUNT(*)                       AS event_count,
  ROUND(AVG(temperature), 2)     AS avg_temperature,
  ROUND(AVG(vibration), 2)       AS avg_vibration,
  ROUND(AVG(pressure), 2)        AS avg_pressure,
  SUM(is_anomaly)                AS anomaly_count,
  ROUND(iot_health.calc_health_score(
    AVG(temperature), AVG(vibration), AVG(pressure), CAST(MAX(fault_label) AS INT)
  ), 2) AS health_score,
  CASE
    WHEN iot_health.calc_health_score(
      AVG(temperature), AVG(vibration), AVG(pressure), CAST(MAX(fault_label) AS INT)
    ) >= 80 THEN 'GREEN'
    WHEN iot_health.calc_health_score(
      AVG(temperature), AVG(vibration), AVG(pressure), CAST(MAX(fault_label) AS INT)
    ) >= 60 THEN 'YELLOW'
    ELSE 'RED'
  END AS alert_level
FROM iot_health.silver_device_events
GROUP BY
  device_id,
  device_name,
  manufacturer,
  model,
  device_status,
  DATE_TRUNC('hour', event_time);

MAX(fault_label) takes the most severe fault state within the window (if any single event has a fault, MAX = 1), so that averaging does not hide a transient fault.
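The effect of choosing MAX over a majority rule can be inspected side by side before changing the Gold definition. The query below is a sketch against the Silver table; any_fault and majority_fault are illustrative aliases.

```sql
-- Compare two ways of collapsing fault_label per device-hour.
SELECT
  device_id,
  DATE_TRUNC('hour', event_time) AS hour_window,
  COUNT(*)                       AS event_count,
  -- 1 if at least one event in the window has a fault
  MAX(fault_label)               AS any_fault,
  -- 1 only if more than half of the events in the window have a fault
  CASE WHEN AVG(fault_label) > 0.5 THEN 1 ELSE 0 END AS majority_fault
FROM iot_health.silver_device_events
GROUP BY device_id, DATE_TRUNC('hour', event_time)
ORDER BY device_id, hour_window;
```

In this article's dataset the Gold table has 100 device-hour rows for 100 events, so every window holds a single event and the two columns agree. The difference only shows up with denser data, where a window mixes faulty and healthy events.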

Trigger the first refresh manually and look at the results:

REFRESH DYNAMIC TABLE iot_health.gold_device_health;

SELECT device_id, device_name, hour_window,
       avg_temperature, avg_vibration, avg_pressure,
       anomaly_count, health_score, alert_level
FROM iot_health.gold_device_health
ORDER BY health_score ASC
LIMIT 10;

device_id | device_name   | hour_window         | avg_temperature | avg_vibration | avg_pressure | anomaly_count | health_score | alert_level
----------+---------------+---------------------+-----------------+---------------+--------------+---------------+--------------+------------
DEV012    | Motor-Beta-04 | 2026-06-01T08:00:00 | 88.7            | 7.5           | 108.0        | 0             | 20.09        | RED
DEV012    | Motor-Beta-04 | 2026-06-01T09:00:00 | 87.9            | 7.2           | 109.3        | 0             | 21.10        | RED
DEV012    | Motor-Beta-04 | 2026-06-01T10:00:00 | 89.1            | 7.0           | 110.5        | 0             | 21.22        | RED
DEV012    | Motor-Beta-04 | 2026-06-01T11:00:00 | 90.2            | 6.8           | 111.7        | 1             | 21.37        | RED
DEV019    | Motor-Beta-06 | 2026-06-01T12:00:00 | 60.8            | 10.4          | 90.4         | 1             | 21.52        | RED
DEV012    | Motor-Beta-04 | 2026-06-01T12:00:00 | 91.4            | 6.5           | 112.8        | 1             | 21.80        | RED
DEV019    | Motor-Beta-06 | 2026-06-01T11:00:00 | 59.9            | 10.1          | 89.7         | 1             | 22.76        | RED
DEV019    | Motor-Beta-06 | 2026-06-01T10:00:00 | 59.0            | 9.8           | 89.0         | 1             | 24.00        | RED
DEV015    | Motor-Beta-05 | 2026-06-01T12:00:00 | 96.8            | 5.6           | 92.3         | 1             | 24.93        | RED
DEV019    | Motor-Beta-06 | 2026-06-01T09:00:00 | 58.1            | 9.5           | 88.3         | 1             | 25.24        | RED

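Reading the results

  • DEV012 (Motor-Beta-04) is RED in every observed hour. Temperature (88 to 91 °C) and vibration (6.5 to 7.5 mm/s) are both elevated. In the 08:00 to 10:00 windows neither metric crosses its threshold (anomaly_count = 0), and the average temperature only passes 90 °C in the 11:00 and 12:00 windows, yet the weighted health score stays at roughly 20 to 22 throughout. The scores shown are consistent with the 20-point fault_label deduction also being applied. Prioritize checking motor cooling and bearing wear.
  • DEV019 (Motor-Beta-06) has vibration persistently above the 8 mm/s alert threshold, which is the direct cause of its RED status; fault_label=1 takes off another 20 points. Check shaft alignment immediately.
  • In the current dataset, 83 device-hour records are RED and 17 are YELLOW, with no GREEN, which indicates that the simulated dataset contains many high-load scenarios.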
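To see how much each metric contributes to a low score, the health score can be broken back into its penalty terms. The query below is a sketch that recomputes the three sensor penalties from the Gold averages for one device; the penalty aliases are illustrative names, and because the Gold averages are already rounded, the terms are approximate.

```sql
-- Decompose health_score into its sensor penalty terms for one device.
-- Weights match calc_health_score: 30 / 30 / 20 for temperature / vibration / pressure.
SELECT
  device_id,
  hour_window,
  ROUND(avg_temperature / 100.0 * 30.0, 2) AS temp_penalty,
  ROUND(avg_vibration   / 10.0  * 30.0, 2) AS vibration_penalty,
  ROUND(avg_pressure    / 200.0 * 20.0, 2) AS pressure_penalty,
  health_score
FROM iot_health.gold_device_health
WHERE device_id = 'DEV012'
ORDER BY hour_window;
```

For the 08:00 row of DEV012 this gives about 26.61 + 22.5 + 10.8 = 59.91 in sensor penalties, which would leave 40.09; the remaining gap to the reported 20.09 is the 20-point fault deduction.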

Check the distribution of alert levels:

SELECT alert_level, COUNT(*) AS device_hour_count
FROM iot_health.gold_device_health
GROUP BY alert_level
ORDER BY alert_level;

alert_level | device_hour_count
------------+------------------
RED         | 83
YELLOW      | 17


Warehouse object overview

After everything is built, the objects under the iot_health schema are:

SHOW TABLES IN iot_health;

schema_name | table_name           | is_dynamic
------------+----------------------+-----------
iot_health  | bronze_sensor_events | false
iot_health  | device_master        | false
iot_health  | kafka_raw_events     | false
iot_health  | silver_device_events | true
iot_health  | gold_device_health   | true

Architecture:

Kafka (real time)
    │
    ▼  pipe_sensor_events (Kafka PIPE)
kafka_raw_events

bronze_sensor_events
    │  ← INSERT (simulated / production writes)
    │  Bloomfilter Index (device_id)
    │
device_master (device master data)
    │  Column Masking (location_lat / location_lon)
    │
    ▼  REFRESH INTERVAL 1 MINUTE
silver_device_events (Dynamic Table)
    │  is_anomaly / risk_score / dimension join
    │
    ▼  REFRESH INTERVAL 1 MINUTE
gold_device_health (Dynamic Table)
       health_score (calc_health_score UDF)
       alert_level (GREEN / YELLOW / RED)


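Notes

  • Bloomfilter Index does not automatically cover existing data

    CREATE BLOOMFILTER INDEX only takes effect for data written after the index is created. If the table already holds a large amount of data, the index does little to speed up filtering on it. BUILD INDEX, which backfills existing data for other index types, is not supported for the BLOOMFILTER type, so covering existing data requires rebuilding the table. This is why the index is created before any data is loaded in this article.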

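  • Incremental refresh of a Dynamic Table relies on change tracking on the Bronze layer: the first REFRESH computes a full snapshot; later incremental refreshes only process rows added or changed in Bronze since the last refresh point. Writing to Bronze with INSERT OVERWRITE causes the Dynamic Table to fall back to a full refresh.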

  • Semantics of MAX(fault_label) passed to calc_health_score: the Gold layer uses MAX(fault_label) rather than AVG(fault_label) so that a single fault event applies the full 20-point deduction to the whole hour window. If the business rule is "alert only when more than 50% of events are faults", use CASE WHEN AVG(fault_label) > 0.5 THEN 1 ELSE 0 END instead.

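  • Column Masking applies transparently to Dynamic Tables: when the Silver layer reads device_master, a non-privileged user gets low-precision masked values for location_lat / location_lon, and the values stored in Silver/Gold are the masked ones as well. The silver_device_events definition in this article does not select these two columns, so this only matters if you add them. If you need high-precision coordinates for spatial analysis, query device_master separately under an authorized account.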

