-- Landing table for raw Kafka messages: the pipe writes each message as a
-- JSON string into the single STRING column.
CREATE TABLE IF NOT EXISTS iot_health.kafka_raw_events (
    value STRING
);
-- Kafka pipe: copies the messages returned by READ_KAFKA into
-- iot_health.kafka_raw_events, casting each message value to STRING.
-- NOTE(review): the positional READ_KAFKA arguments without a trailing comment
-- are passed as empty/default values; confirm their meaning against the
-- READ_KAFKA reference before changing them.
CREATE PIPE IF NOT EXISTS iot_health.pipe_sensor_events
    VIRTUAL_CLUSTER = 'DEFAULT'
    BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO iot_health.kafka_raw_events
FROM (
    SELECT
        CAST(value AS STRING) AS value
    FROM READ_KAFKA(
        '<kafka-broker>:9092',  -- replace with the real broker address
        'iot_sensor_events',    -- topic name
        '',
        'cz_iot_consumer',      -- consumer group ID
        '', '', '', '',
        'raw', 'raw',
        0,
        map()
    )
);
-- Step 1: upload the local CSV file to the User Volume with SQL PUT.
PUT '/path/to/your/bronze_sensor_events.csv'
    TO USER VOLUME
    FILE 'bronze_sensor_events.csv';

-- Step 2: COPY the uploaded file from the User Volume into the table.
COPY INTO iot_health.bronze_sensor_events
FROM USER VOLUME
USING csv
OPTIONS (
    'header' = 'true',
    'sep' = ',',
    'nullValue' = ''
)
FILES ('bronze_sensor_events.csv');
也可直接内联插入小批量测试数据(不需要 CSV 文件):
本文使用直接 INSERT 模拟 Kafka 消息已解析写入的效果:
-- Inline sample rows for the Bronze table (alternative to the CSV load).
INSERT INTO iot_health.bronze_sensor_events (
    event_id,
    device_id,
    device_type,
    temperature,
    vibration,
    pressure,
    humidity,
    fault_label,
    error_code,
    event_time
)
VALUES
    ('EVT001', 'DEV001', 'pump',       72.3, 3.2,  98.5, 45.2, 0, NULL,   CAST('2026-06-01 08:00:00' AS TIMESTAMP)),
    ('EVT003', 'DEV003', 'compressor', 91.5, 4.1,  88.3, 38.7, 1, 'E001', CAST('2026-06-01 08:02:00' AS TIMESTAMP)),
    ('EVT005', 'DEV005', 'motor',      55.2, 8.9, 101.2, 50.0, 1, 'E002', CAST('2026-06-01 08:04:00' AS TIMESTAMP)),
    ('EVT006', 'DEV006', 'valve',      63.4, 1.5, 130.0, 41.5, 1, 'E003', CAST('2026-06-01 08:05:00' AS TIMESTAMP))
    -- ... the full set has 100 rows; the rest are omitted here
;
验证 Bronze 层行数:
-- Row count of the Bronze table.
SELECT
    COUNT(*) AS bronze_row_count
FROM iot_health.bronze_sensor_events;
bronze_row_count
----------------
100
设备主数据表与 Column Masking
建表
-- Device master data: descriptive attributes per device, joined to sensor
-- events on device_id. The two location columns are treated as sensitive.
-- NOTE(review): no key constraint is declared; device_id is presumably
-- unique per row — confirm.
CREATE TABLE IF NOT EXISTS iot_health.device_master (
    device_id     STRING,
    device_name   STRING,
    device_type   STRING,
    location_lat  DOUBLE,  -- sensitive field: latitude
    location_lon  DOUBLE,  -- sensitive field: longitude
    install_date  DATE,
    manufacturer  STRING,
    model         STRING,
    status        STRING
);
写入设备主数据
从本地 CSV 导入(推荐)
-- Step 1: upload the local CSV file to the User Volume with SQL PUT.
PUT '/path/to/your/device_master.csv'
    TO USER VOLUME
    FILE 'device_master.csv';

-- Step 2: COPY the uploaded file from the User Volume into the table.
COPY INTO iot_health.device_master
FROM USER VOLUME
USING csv
OPTIONS (
    'header' = 'true',
    'sep' = ',',
    'nullValue' = ''
)
FILES ('device_master.csv');
也可直接内联插入小批量测试数据(不需要 CSV 文件):
-- Inline sample rows for the device master table (alternative to the CSV load).
-- The column list is spelled out so the statement does not depend on the
-- table's physical column order.
INSERT INTO iot_health.device_master (
    device_id,
    device_name,
    device_type,
    location_lat,
    location_lon,
    install_date,
    manufacturer,
    model,
    status
)
VALUES
    ('DEV001', 'Pump-Alpha-01',       'pump',       31.2304, 121.4737, CAST('2022-03-15' AS DATE), 'SiemensCN', 'P300', 'active'),
    ('DEV002', 'Motor-Beta-01',       'motor',      31.2310, 121.4740, CAST('2022-04-20' AS DATE), 'ABB',       'M500', 'active'),
    ('DEV003', 'Compressor-Gamma-01', 'compressor', 31.2315, 121.4745, CAST('2021-11-10' AS DATE), 'Atlas',     'C200', 'active')
    -- ... the full set has 20 rows; the rest are omitted here
;
-- Masking function for a location coordinate.
--   coord   : the stored coordinate value
--   returns : coord unchanged when the current user is in the allow-list,
--             otherwise coord rounded to one decimal place
CREATE OR REPLACE FUNCTION iot_health.mask_location_coord(coord DOUBLE)
RETURNS DOUBLE
AS
    CASE
        WHEN current_user() IN ('privileged_user')  -- replace with the real authorised user name
            THEN coord
        ELSE ROUND(coord, 1)
    END;
-- Attach the masking function to location_lat.
ALTER TABLE iot_health.device_master
    CHANGE COLUMN location_lat SET MASK iot_health.mask_location_coord;

-- Attach the same masking function to location_lon.
ALTER TABLE iot_health.device_master
    CHANGE COLUMN location_lon SET MASK iot_health.mask_location_coord;
-- Health score: starts at 100, subtracts one penalty per input, and clamps
-- the result to the range [0, 100].
--   temperature : penalty = temperature / 100 * 30
--   vibration   : penalty = vibration / 10 * 30
--   pressure    : penalty = pressure / 200 * 20
--   fault_label : penalty = fault_label * 20
CREATE OR REPLACE FUNCTION iot_health.calc_health_score(
    temperature DOUBLE,
    vibration   DOUBLE,
    pressure    DOUBLE,
    fault_label INT
)
RETURNS DOUBLE
AS
    GREATEST(
        0.0,
        LEAST(
            100.0,
            100.0
                - (temperature / 100.0 * 30.0)
                - (vibration / 10.0 * 30.0)
                - (pressure / 200.0 * 20.0)
                - (fault_label * 20.0)
        )
    );
验证函数:
-- Sample reading: temperature 75, vibration 3.5, pressure 99, no fault flag.
SELECT
    iot_health.calc_health_score(75.0, 3.5, 99.0, 0) AS sample_score;
sample_score
------------
57.1
💡 提示:健康分 57.1 低于 60,落在 RED 区间(≥ 80 为 GREEN,60–80 为 YELLOW,60 分以下触发 RED 告警),说明温度和振动指标对分数影响显著,可按需调整权重系数。
Silver 层 Dynamic Table:清洗与异常打标
Silver 层在 Bronze 原始事件基础上做两件事:
- LEFT JOIN `device_master`,为每条事件关联设备名称、厂商、安装日期等维度字段
- 打异常标记(`is_anomaly`)和计算风险分(`risk_score`),方便 Gold 层直接聚合
-- Silver layer: every Bronze event, enriched with device master attributes
-- (LEFT JOIN, so events without a matching device are kept) plus an anomaly
-- flag and a risk score.
CREATE DYNAMIC TABLE IF NOT EXISTS iot_health.silver_device_events
    REFRESH INTERVAL 1 MINUTE VCLUSTER DEFAULT
AS
SELECT
    evt.event_id,
    evt.device_id,
    evt.device_type,
    evt.temperature,
    evt.vibration,
    evt.pressure,
    evt.humidity,
    evt.fault_label,
    evt.error_code,
    evt.event_time,
    evt.ingest_time,
    dev.device_name,
    dev.manufacturer,
    dev.model,
    dev.install_date,
    dev.status AS device_status,
    -- Anomaly flag: temperature > 90 OR vibration > 8 OR pressure > 120.
    CASE
        WHEN evt.temperature > 90
          OR evt.vibration > 8
          OR evt.pressure > 120
            THEN 1
        ELSE 0
    END AS is_anomaly,
    -- Weighted risk score (higher means more dangerous).
    ROUND(
        evt.temperature / 100.0 + evt.vibration / 10.0 + evt.pressure / 200.0,
        4
    ) AS risk_score
FROM iot_health.bronze_sensor_events AS evt
LEFT JOIN iot_health.device_master AS dev
    ON evt.device_id = dev.device_id;
异常判定阈值说明:
| 指标 | 阈值 | 依据 |
| --- | --- | --- |
| `temperature` | > 90 °C | 设备过热临界点,持续超限会损坏绝缘材料 |
| `vibration` | > 8 mm/s | ISO 10816 标准中 B 类机器的告警阈值 |
| `pressure` | > 120 bar | 典型工业管路设计压力上限 |
手动触发首次刷新:
-- Trigger the first refresh manually, then check the Silver row count.
REFRESH DYNAMIC TABLE iot_health.silver_device_events;

SELECT
    COUNT(*) AS silver_count
FROM iot_health.silver_device_events;
silver_count
------------
100
Gold 层 Dynamic Table:设备级聚合与告警
Gold 层以 `device_id` + 小时窗口为粒度聚合 Silver 层数据,调用 `calc_health_score` UDF 计算健康分,并输出三档告警等级。
-- Gold layer: one row per device (with its descriptive attributes) and hour
-- window, carrying averaged sensor readings, an anomaly count, a health score
-- and a three-level alert.
CREATE DYNAMIC TABLE IF NOT EXISTS iot_health.gold_device_health
    REFRESH INTERVAL 1 MINUTE VCLUSTER DEFAULT
AS
SELECT
    agg.device_id,
    agg.device_name,
    agg.manufacturer,
    agg.model,
    agg.device_status,
    agg.hour_window,
    agg.event_count,
    agg.avg_temperature,
    agg.avg_vibration,
    agg.avg_pressure,
    agg.anomaly_count,
    ROUND(agg.raw_health_score, 2) AS health_score,
    -- The alert level is derived from the unrounded score.
    CASE
        WHEN agg.raw_health_score >= 80 THEN 'GREEN'
        WHEN agg.raw_health_score >= 60 THEN 'YELLOW'
        ELSE 'RED'
    END AS alert_level
FROM (
    -- Hourly aggregation; the health score is computed once per group from
    -- the unrounded averages and the maximum fault_label in the window.
    SELECT
        device_id,
        device_name,
        manufacturer,
        model,
        device_status,
        DATE_TRUNC('hour', event_time) AS hour_window,
        COUNT(*) AS event_count,
        ROUND(AVG(temperature), 2) AS avg_temperature,
        ROUND(AVG(vibration), 2) AS avg_vibration,
        ROUND(AVG(pressure), 2) AS avg_pressure,
        SUM(is_anomaly) AS anomaly_count,
        iot_health.calc_health_score(
            AVG(temperature),
            AVG(vibration),
            AVG(pressure),
            CAST(MAX(fault_label) AS INT)
        ) AS raw_health_score
    FROM iot_health.silver_device_events
    GROUP BY
        device_id,
        device_name,
        manufacturer,
        model,
        device_status,
        DATE_TRUNC('hour', event_time)
) AS agg;
`MAX(fault_label)` 取窗口内最严重的故障状态(任意一条事件有故障则 MAX = 1),避免平均值掩盖瞬时故障。
手动触发首次刷新并查看结果:
-- Trigger the first refresh manually, then list the 10 lowest-scoring
-- device/hour rows.
REFRESH DYNAMIC TABLE iot_health.gold_device_health;

SELECT
    device_id,
    device_name,
    hour_window,
    avg_temperature,
    avg_vibration,
    avg_pressure,
    anomaly_count,
    health_score,
    alert_level
FROM iot_health.gold_device_health
-- device_id and hour_window break ties so that LIMIT returns the same rows
-- on every run when several rows share a health_score.
ORDER BY
    health_score ASC,
    device_id ASC,
    hour_window ASC
LIMIT 10;