-- Landing table for raw Kafka messages.
-- A single STRING column holds each message value verbatim; it is filled by
-- the pipe best_practice_energy_ts.pipe_energy_load.
CREATE TABLE IF NOT EXISTS best_practice_energy_ts.doc_kafka_raw_load (
    kafka_value STRING
);
-- Continuous ingestion pipe: on the 'DEFAULT' virtual cluster, batching every
-- BATCH_INTERVAL_IN_SECONDS (60 s), it reads messages from the Kafka topic
-- below via READ_KAFKA and copies each message value, cast to STRING, into
-- doc_kafka_raw_load.kafka_value.
-- NOTE(review): READ_KAFKA arguments are positional. The meanings given below
-- for the empty-string / 'raw' / 0 / map() arguments are assumptions -- confirm
-- against the Lakehouse READ_KAFKA reference before changing any of them.
CREATE PIPE IF NOT EXISTS best_practice_energy_ts.pipe_energy_load
VIRTUAL_CLUSTER = 'DEFAULT'
BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO best_practice_energy_ts.doc_kafka_raw_load
FROM (
SELECT CAST(value AS STRING) AS kafka_value
FROM READ_KAFKA(
'<kafka-broker>:9092', -- replace with the actual broker address (host:port)
'energy.load.realtime', -- topic name
'', -- presumably topic pattern, unused when a topic name is given -- confirm
'cz_energy_consumer', -- consumer group ID
'','','','', -- presumably start/end offsets and timestamps; empty = defaults -- confirm
'raw', 'raw', -- presumably key format and value format -- confirm
0, -- presumably max tolerated errors -- confirm
map() -- presumably extra Kafka client properties (none here) -- confirm
)
);
-- Step 1: upload the local CSV file to the User Volume with a SQL PUT.
PUT '/path/to/your/doc_pjme_load_raw.csv' TO USER VOLUME FILE 'doc_pjme_load_raw.csv';
-- Step 2: COPY the uploaded file from the User Volume into the raw table.
-- The CSV has a header row and comma separators; empty fields load as NULL.
COPY INTO best_practice_energy_ts.doc_pjme_load_raw
FROM USER VOLUME
USING csv
OPTIONS(
    'header'='true',
    'sep'=',',
    'nullValue'=''
)
FILES ('doc_pjme_load_raw.csv');
也可直接内联插入小批量测试数据(无需 CSV 文件)。注意:下面的示例只列出了首尾几行,中间行已省略;完整数据共 96 条,补全后才能得到下方 96 行的校验结果:
-- Inline alternative to the CSV load: insert hourly test readings
-- (event_time, load_mw) directly into the raw table.
-- NOTE: this example is abbreviated. Only 3 rows are written out, so running
-- it as-is inserts 3 rows, not the 96 that the row-count check expects.
-- Paste the omitted rows at the marked position before running.
INSERT INTO best_practice_energy_ts.doc_pjme_load_raw (event_time, load_mw) VALUES
(CAST('2018-01-01 00:00:00' AS TIMESTAMP), 39928.0),
(CAST('2018-01-01 01:00:00' AS TIMESTAMP), 38925.0),
-- ... 96 rows in total (24 hourly rows per day), covering 2018-01-01, 2018-01-15, 2018-07-01, 2018-07-15
(CAST('2018-07-15 23:00:00' AS TIMESTAMP), 37301.0);
验证数据加载结果:
-- Sanity check: number of rows now present in the raw load table.
SELECT
    COUNT(*) AS total_rows
FROM best_practice_energy_ts.doc_pjme_load_raw;
total_rows
----------
96
写入电表元数据:
从本地 CSV 导入数据(推荐):
-- Step 1: upload the local meter-metadata CSV to the User Volume with a SQL PUT.
PUT '/path/to/your/doc_meter_metadata.csv' TO USER VOLUME FILE 'doc_meter_metadata.csv';
-- Step 2: COPY the uploaded file from the User Volume into the metadata table.
-- The CSV has a header row and comma separators; empty fields load as NULL.
COPY INTO best_practice_energy_ts.doc_meter_metadata
FROM USER VOLUME
USING csv
OPTIONS(
    'header'='true',
    'sep'=',',
    'nullValue'=''
)
FILES ('doc_meter_metadata.csv');
-- Silver layer: hourly load aggregates built from the raw readings in
-- doc_pjme_load_raw. One row per clock hour (hour_ts), containing:
--   * the calendar date and hour split out;
--   * avg/max/min load rounded to 0.1;
--   * the number of raw readings in that hour;
--   * a tariff label: hours 9-21 inclusive are 'peak', all others 'valley'.
-- Rows with a NULL event_time are excluded. They cannot be bucketed into an
-- hour and would otherwise collapse into a single NULL-keyed row labelled
-- 'valley'. (The CSV load maps empty fields to NULL, so this can happen.)
-- NULL or non-positive load readings are dropped as invalid.
-- No REFRESH INTERVAL clause: refresh is triggered externally
-- (REFRESH DYNAMIC TABLE or a scheduled task).
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_energy_ts.doc_silver_hourly_load
AS
SELECT
    DATE_TRUNC('hour', event_time) AS hour_ts,
    DATE(event_time)               AS load_date,
    HOUR(event_time)               AS load_hour,
    ROUND(AVG(load_mw), 1)         AS avg_load_mw,
    ROUND(MAX(load_mw), 1)         AS max_load_mw,
    ROUND(MIN(load_mw), 1)         AS min_load_mw,
    COUNT(*)                       AS data_points,
    CASE
        WHEN HOUR(event_time) BETWEEN 9 AND 21 THEN 'peak'
        ELSE 'valley'
    END                            AS tariff_period
FROM best_practice_energy_ts.doc_pjme_load_raw
WHERE event_time IS NOT NULL
  AND load_mw IS NOT NULL
  AND load_mw > 0
GROUP BY
    DATE_TRUNC('hour', event_time),
    DATE(event_time),
    HOUR(event_time);
⚠️ 注意:Dynamic Table 的 DDL 中不写 `REFRESH INTERVAL`,刷新调度通过 Studio Task 管理,参见下方"配置刷新调度任务"小节。
手动触发首次刷新:
-- Run the first refresh of the Silver dynamic table by hand,
-- then check how many hourly rows it produced.
REFRESH DYNAMIC TABLE best_practice_energy_ts.doc_silver_hourly_load;
SELECT
    COUNT(*) AS silver_count
FROM best_practice_energy_ts.doc_silver_hourly_load;
-- Hour-over-hour view for 2018-07-01. Each hour's average load is shown with
-- the previous and next hour's value (within the same day) and the change
-- from the previous hour, rounded to 0.1.
-- The first hour of the day has NULL prev_hour_mw and hour_delta_mw;
-- the last hour has NULL next_hour_mw.
WITH neighbours AS (
    SELECT
        load_date,
        load_hour,
        avg_load_mw,
        LAG(avg_load_mw, 1)  OVER (PARTITION BY load_date ORDER BY load_hour) AS prev_hour_mw,
        LEAD(avg_load_mw, 1) OVER (PARTITION BY load_date ORDER BY load_hour) AS next_hour_mw
    FROM best_practice_energy_ts.doc_silver_hourly_load
    WHERE load_date = CAST('2018-07-01' AS DATE)
)
SELECT
    load_date,
    load_hour,
    avg_load_mw,
    prev_hour_mw,
    next_hour_mw,
    ROUND(avg_load_mw - prev_hour_mw, 1) AS hour_delta_mw
FROM neighbours
ORDER BY load_hour;
-- Trailing 3-row moving average of hourly load for 2018-07-01.
-- Each hour is averaged with up to two preceding rows of the same day (fewer
-- at the start of the day). The frame counts rows, not clock hours, so a
-- missing hour would widen the effective time window.
WITH day_load AS (
    SELECT
        load_date,
        load_hour,
        avg_load_mw
    FROM best_practice_energy_ts.doc_silver_hourly_load
    WHERE load_date = CAST('2018-07-01' AS DATE)
)
SELECT
    load_date,
    load_hour,
    avg_load_mw,
    ROUND(
        AVG(avg_load_mw) OVER (
            PARTITION BY load_date
            ORDER BY load_hour
            ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
        ),
        1
    ) AS rolling_3h_avg_mw
FROM day_load
ORDER BY load_hour;
-- For each hour of 2018-07-01, show:
--   * the hour's average load and tariff period;
--   * the day's highest and lowest hourly average;
--   * the hour's load as a percentage of that daily peak, rounded to 0.1.
WITH day_load AS (
    SELECT
        load_date,
        load_hour,
        avg_load_mw,
        tariff_period,
        MAX(avg_load_mw) OVER (PARTITION BY load_date) AS daily_peak_mw,
        MIN(avg_load_mw) OVER (PARTITION BY load_date) AS daily_valley_mw
    FROM best_practice_energy_ts.doc_silver_hourly_load
    WHERE load_date = CAST('2018-07-01' AS DATE)
)
SELECT
    load_date,
    load_hour,
    avg_load_mw,
    tariff_period,
    daily_peak_mw,
    daily_valley_mw,
    ROUND(100.0 * avg_load_mw / daily_peak_mw, 1) AS pct_of_peak
FROM day_load
ORDER BY load_hour;
Gold 层以日为粒度,从 Silver 层聚合输出峰谷负荷差(MW)、峰谷差率(峰谷差占日峰值的百分比)以及峰/谷时段的平均负荷,供 BI 看板和定价系统消费。
-- Gold layer: one row per load_date, aggregated from the hourly Silver table.
--   hours_recorded            number of Silver hourly rows for the day
--   daily_avg_mw              mean of the hourly averages
--   daily_peak_mw             highest hourly max load
--   daily_valley_mw           lowest hourly min load
--   peak_valley_spread_mw     daily_peak_mw - daily_valley_mw
--   peak_valley_ratio_pct     spread as a percentage of the daily peak
--                             (NULL if the peak is 0)
--   peak_period_avg_mw        mean hourly load over 'peak' hours
--   valley_period_avg_mw      mean hourly load over 'valley' hours
--                             (each NULL when the day has no hours of that period)
--   season                    from the month: Jun-Aug summer, Dec-Feb winter,
--                             Mar-May spring, anything else autumn
-- All numeric outputs are rounded to 0.1.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_energy_ts.doc_gold_daily_load_profile
AS
SELECT
    load_date,
    COUNT(*)                                      AS hours_recorded,
    ROUND(AVG(avg_load_mw), 1)                    AS daily_avg_mw,
    ROUND(MAX(max_load_mw), 1)                    AS daily_peak_mw,
    ROUND(MIN(min_load_mw), 1)                    AS daily_valley_mw,
    ROUND(MAX(max_load_mw) - MIN(min_load_mw), 1) AS peak_valley_spread_mw,
    -- NULLIF guards the division, consistent with the other ratios in this table.
    ROUND(
        100.0 * (MAX(max_load_mw) - MIN(min_load_mw)) / NULLIF(MAX(max_load_mw), 0),
        1
    ) AS peak_valley_ratio_pct,
    -- CASE without ELSE yields NULL for non-matching hours. AVG ignores NULLs,
    -- so each value is the mean over that tariff period's hours only.
    ROUND(AVG(CASE WHEN tariff_period = 'peak'   THEN avg_load_mw END), 1) AS peak_period_avg_mw,
    ROUND(AVG(CASE WHEN tariff_period = 'valley' THEN avg_load_mw END), 1) AS valley_period_avg_mw,
    CASE
        WHEN MONTH(load_date) IN (6, 7, 8)  THEN 'summer'
        WHEN MONTH(load_date) IN (12, 1, 2) THEN 'winter'
        WHEN MONTH(load_date) IN (3, 4, 5)  THEN 'spring'
        ELSE 'autumn'
    END AS season
FROM best_practice_energy_ts.doc_silver_hourly_load
GROUP BY load_date;
手动触发刷新并查看结果:
-- Refresh the Gold dynamic table by hand, then list the daily load profile
-- in date order.
REFRESH DYNAMIC TABLE best_practice_energy_ts.doc_gold_daily_load_profile;
SELECT
    load_date,
    daily_avg_mw,
    daily_peak_mw,
    daily_valley_mw,
    peak_valley_spread_mw,
    peak_valley_ratio_pct,
    peak_period_avg_mw,
    valley_period_avg_mw,
    season
FROM best_practice_energy_ts.doc_gold_daily_load_profile
ORDER BY load_date;
-- Summer vs winter average load for each hour of the day.
-- Seasons use the same month mapping as the Gold layer (summer = Jun-Aug,
-- winter = Dec-Feb) rather than a single hard-coded month. The comparison
-- therefore stays correct when more months of data are loaded.
-- Spring/autumn rows contribute to neither average. summer_vs_winter_delta
-- is NULL for an hour that lacks data in either season.
WITH hourly_by_season AS (
    SELECT
        load_hour,
        avg_load_mw,
        CASE
            WHEN MONTH(load_date) IN (6, 7, 8)  THEN 'summer'
            WHEN MONTH(load_date) IN (12, 1, 2) THEN 'winter'
        END AS season
    FROM best_practice_energy_ts.doc_silver_hourly_load
)
SELECT
    load_hour,
    ROUND(AVG(CASE WHEN season = 'summer' THEN avg_load_mw END), 1) AS summer_avg_mw,
    ROUND(AVG(CASE WHEN season = 'winter' THEN avg_load_mw END), 1) AS winter_avg_mw,
    ROUND(
        AVG(CASE WHEN season = 'summer' THEN avg_load_mw END)
        - AVG(CASE WHEN season = 'winter' THEN avg_load_mw END),
        1
    ) AS summer_vs_winter_delta
FROM hourly_by_season
GROUP BY load_hour
ORDER BY load_hour;
-- Intraday z-score anomaly check for 2018-07-01.
-- Each hour's average load is standardised against that day's mean and
-- STDDEV of hourly averages. Hours with |z| > 2.0 are flagged 'anomaly'.
-- A zero stddev gives a NULL z-score, which is flagged 'normal'.
-- z_score is rounded to 2 places for display; the flag uses the unrounded value.
WITH day_hours AS (
    SELECT
        load_date,
        load_hour,
        avg_load_mw
    FROM best_practice_energy_ts.doc_silver_hourly_load
    WHERE load_date = CAST('2018-07-01' AS DATE)
),
day_stats AS (
    SELECT
        load_date,
        AVG(avg_load_mw)    AS mean_mw,
        STDDEV(avg_load_mw) AS std_mw
    FROM day_hours
    GROUP BY load_date
),
scored AS (
    SELECT
        d.load_date,
        d.load_hour,
        d.avg_load_mw,
        (d.avg_load_mw - s.mean_mw) / NULLIF(s.std_mw, 0) AS raw_z
    FROM day_hours AS d
    INNER JOIN day_stats AS s
        ON d.load_date = s.load_date
)
SELECT
    load_date,
    load_hour,
    avg_load_mw,
    ROUND(raw_z, 2) AS z_score,
    CASE
        WHEN ABS(raw_z) > 2.0 THEN 'anomaly'
        ELSE 'normal'
    END AS anomaly_flag
FROM scored
ORDER BY load_hour;
前 10 行结果(节选):
load_date | load_hour | avg_load_mw | z_score | anomaly_flag
-----------+-----------+-------------+---------+-------------
2018-07-01 | 0 | 37751 | -0.32 | normal
2018-07-01 | 1 | 34716 | -0.66 | normal
2018-07-01 | 2 | 32345 | -0.93 | normal
2018-07-01 | 3 | 30546 | -1.14 | normal
2018-07-01 | 4 | 29300 | -1.28 | normal
2018-07-01 | 5 | 28511 | -1.37 | normal
2018-07-01 | 6 | 27992 | -1.43 | normal
2018-07-01 | 7 | 28211 | -1.4 | normal
2018-07-01 | 8 | 30337 | -1.16 | normal
2018-07-01 | 9 | 33759 | -0.77 | normal
日内标准差统计:
-- Per-day mean and standard deviation of the hourly averages, plus the
-- mean +/- 2 sigma band. Bounds are computed from unrounded statistics and
-- rounded to 0.1 only at output.
WITH daily_stats AS (
    SELECT
        load_date,
        AVG(avg_load_mw)    AS mean_raw,
        STDDEV(avg_load_mw) AS std_raw
    FROM best_practice_energy_ts.doc_silver_hourly_load
    GROUP BY load_date
)
SELECT
    load_date,
    ROUND(mean_raw, 1)               AS mean_mw,
    ROUND(std_raw, 1)                AS stddev_mw,
    ROUND(mean_raw + 2 * std_raw, 1) AS upper_2sigma,
    ROUND(mean_raw - 2 * std_raw, 1) AS lower_2sigma
FROM daily_stats
ORDER BY load_date;