-- District dimension table (doc_dim_district) in the best_practice_smart_city schema.
-- No keys or constraints are declared in this statement.
CREATE TABLE IF NOT EXISTS best_practice_smart_city.doc_dim_district (
    district_code  STRING,
    district_name  STRING,
    borough        STRING,
    city           STRING,
    population     INT,
    area_sqkm      DOUBLE,  -- presumably square kilometres, going by the name; confirm
    district_type  STRING
);
通过 COPY INTO + Volume 批量导入 CSV 文件
各部门定期将数据以 CSV 文件形式上报,存放在各自专属的对象存储目录下。你先创建指向部门上报目录的 Volume,再用 COPY INTO 批量导入。
第一步:创建 Storage Connection 和 Volume(以阿里云 OSS 为例):
-- Create the storage connection that carries the OSS credentials and endpoint.
-- The <your-access-id> / <your-access-key> values are placeholders to be replaced.
CREATE STORAGE CONNECTION IF NOT EXISTS conn_city_data
    TYPE       = OSS
    ACCESS_ID  = '<your-access-id>'
    ACCESS_KEY = '<your-access-key>'
    ENDPOINT   = 'oss-cn-hangzhou.aliyuncs.com';
-- Create the external volume for the 311 complaint data.
-- It points at the smart-city/311_complaints/ path of the placeholder bucket
-- and is accessed through the conn_city_data connection.
CREATE EXTERNAL VOLUME IF NOT EXISTS best_practice_smart_city.vol_311_complaints
    TYPE       = OSS
    BUCKET     = '<your-bucket>'
    PATH       = 'smart-city/311_complaints/'
    CONNECTION = conn_city_data;
第二步:执行 COPY INTO 加载:
-- Bulk-load the CSV files on the 311-complaints volume into the ODS table.
-- CSV options: header row present, comma separator, empty string as the null marker.
-- ON_ERROR = CONTINUE skips malformed rows and keeps loading.
COPY INTO best_practice_smart_city.doc_ods_311_complaints
FROM VOLUME best_practice_smart_city.vol_311_complaints
USING csv
OPTIONS (
    'header'    = 'true',
    'sep'       = ',',
    'nullValue' = ''
)
ON_ERROR = CONTINUE;
⚠️ 注意:`ON_ERROR = CONTINUE` 会跳过格式错误的行并继续加载;生产环境建议改为 `ON_ERROR = ABORT`,并在加载后用 `SELECT * FROM load_history('doc_ods_311_complaints')` 查看跳过的文件列表,防止静默丢数据。
COPY INTO 会自动记录已加载文件的元数据,再次执行同一 COPY 语句不会重复导入相同文件——这对部门定期上报场景非常有用:只需定期触发 COPY INTO,系统自动只处理新增文件。
查看加载历史:
-- Show up to five load-history records for the complaints table.
-- No ORDER BY is given, so which five rows come back is not fixed.
SELECT *
FROM load_history('doc_ods_311_complaints')
LIMIT 5;
-- Federated query: Hive historical data + data newly added in the Yunqi Lakehouse.
-- The date condition lives in the ON clause (not WHERE), so districts with no
-- complaints since 2026-01-01 are still returned with recent_complaints = 0.
SELECT
    pop.district_code,
    pop.population_2020,
    COUNT(cmp.complaint_id) AS recent_complaints
FROM hive_civil_affairs.district_population AS pop
LEFT JOIN best_practice_smart_city.doc_ods_311_complaints AS cmp
    ON  pop.zip_code = cmp.incident_zip
    AND cmp.created_date >= CAST('2026-01-01' AS TIMESTAMP)
GROUP BY
    pop.district_code,
    pop.population_2020;
在诉求工单表上创建 STANDARD 模式的 Stream,捕获所有 INSERT / UPDATE / DELETE 操作:
-- Table stream over the complaints ODS table, created in STANDARD mode.
CREATE TABLE STREAM IF NOT EXISTS best_practice_smart_city.doc_stream_complaint_changes
    ON TABLE best_practice_smart_city.doc_ods_311_complaints
    WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');
-- Keep only UPDATE_AFTER change rows from the stream and write them to the audit log.
-- No target column list is given, so the SELECT columns are matched to the
-- audit table by position; keep this order in sync with the table definition.
INSERT INTO best_practice_smart_city.doc_dwd_complaint_audit_log
SELECT
    complaint_id,
    __change_type       AS change_type,
    __commit_version    AS change_version,
    __commit_timestamp  AS change_time,
    status              AS new_status,
    closed_date,
    resolution_desc,
    agency,
    borough,
    complaint_type
FROM best_practice_smart_city.doc_stream_complaint_changes
WHERE __change_type = 'UPDATE_AFTER';
消费后验证偏移量已推进——Stream 变为空:
-- Count the change rows still readable from the stream.
SELECT
    COUNT(*) AS remaining_changes
FROM best_practice_smart_city.doc_stream_complaint_changes;
remaining_changes
-----------------
0
查看审计日志:
-- List the audit-log entries ordered by change time.
SELECT
    complaint_id,
    change_type,
    change_version,
    change_time,
    new_status,
    borough,
    complaint_type
FROM best_practice_smart_city.doc_dwd_complaint_audit_log
ORDER BY change_time;
-- Unified city-event table (DWD layer): 311 complaints and traffic accidents
-- are projected onto one shared column layout and combined with UNION ALL.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_smart_city.doc_dwd_city_events
AS
-- Branch 1: 311 complaints that have a creation time.
SELECT
    complaint_id                        AS event_id,
    'COMPLAINT'                         AS event_category,
    complaint_type                      AS event_type,
    agency                              AS dept_code,
    agency_name                         AS dept_name,
    borough,
    incident_zip                        AS zip_code,
    latitude,
    longitude,
    created_date                        AS event_time,
    -- 'Closed' -> RESOLVED, 'Open' -> OPEN, anything else (including NULL) -> IN_PROGRESS.
    CASE
        WHEN status = 'Closed' THEN 'RESOLVED'
        WHEN status = 'Open'   THEN 'OPEN'
        ELSE 'IN_PROGRESS'
    END                                 AS event_status,
    -- Day difference between the closed and created dates; NULL when closed_date is NULL.
    DATEDIFF(CAST(closed_date AS DATE), CAST(created_date AS DATE)) AS resolution_days,
    CONCAT(borough, '-', incident_zip)  AS geo_key
FROM best_practice_smart_city.doc_ods_311_complaints
WHERE created_date IS NOT NULL

UNION ALL

-- Branch 2: traffic accidents that have a crash date.
-- NOTE(review): accidents are hard-coded as RESOLVED with 0 resolution days, so any
-- downstream AVG(resolution_days) that keeps values >= 0 includes these zeros; confirm intended.
SELECT
    accident_id                         AS event_id,
    'TRAFFIC_ACCIDENT'                  AS event_category,
    contributing_factor_1               AS event_type,
    'DOT'                               AS dept_code,
    'Department of Transportation'      AS dept_name,
    borough,
    zip_code,
    latitude,
    longitude,
    CAST(crash_date AS TIMESTAMP)       AS event_time,
    'RESOLVED'                          AS event_status,
    0                                   AS resolution_days,
    CONCAT(borough, '-', zip_code)      AS geo_key
FROM best_practice_smart_city.doc_ods_traffic_accidents
WHERE crash_date IS NOT NULL;
手动触发首次刷新并验证:
-- Trigger a refresh of the DWD event table, then count rows per event category.
REFRESH DYNAMIC TABLE best_practice_smart_city.doc_dwd_city_events;

SELECT
    event_category,
    COUNT(*) AS cnt
FROM best_practice_smart_city.doc_dwd_city_events
GROUP BY event_category
ORDER BY cnt DESC;
-- Daily summary (DWS layer): one row per borough, day and event category.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_smart_city.doc_dws_borough_daily
AS
SELECT
    borough,
    DATE_TRUNC('day', event_time)                                   AS stat_date,
    event_category,
    COUNT(*)                                                        AS event_count,
    SUM(CASE WHEN event_status = 'RESOLVED' THEN 1 ELSE 0 END)      AS resolved_count,
    SUM(CASE WHEN event_status = 'OPEN' THEN 1 ELSE 0 END)          AS open_count,
    -- Average only over rows with a non-NULL, non-negative resolution_days;
    -- other rows fall through the CASE as NULL and are ignored by AVG.
    ROUND(
        AVG(CASE WHEN resolution_days IS NOT NULL
                  AND resolution_days >= 0 THEN resolution_days END),
        2
    )                                                               AS avg_resolution_days
FROM best_practice_smart_city.doc_dwd_city_events
WHERE event_time IS NOT NULL
GROUP BY
    borough,
    DATE_TRUNC('day', event_time),
    event_category;
-- Refresh the daily summary, then preview its first ten rows in date/borough/category order.
REFRESH DYNAMIC TABLE best_practice_smart_city.doc_dws_borough_daily;

SELECT
    borough,
    stat_date,
    event_category,
    event_count,
    resolved_count,
    avg_resolution_days
FROM best_practice_smart_city.doc_dws_borough_daily
ORDER BY
    stat_date,
    borough,
    event_category
LIMIT 10;
-- Roll the daily summary up to one row per borough and event category.
SELECT
    borough,
    event_category,
    SUM(event_count)                                AS total_events,
    -- Overall resolution rate: resolved / total across all days; NULLIF guards a zero total.
    ROUND(
        100.0 * SUM(resolved_count) / NULLIF(SUM(event_count), 0),
        1
    )                                               AS resolution_rate_pct,
    -- Unweighted mean of the per-day averages: every day counts equally,
    -- regardless of how many events it had.
    ROUND(AVG(avg_resolution_days), 2)              AS avg_days_to_resolve
FROM best_practice_smart_city.doc_dws_borough_daily
GROUP BY
    borough,
    event_category
ORDER BY total_events DESC;
-- City operations index (ADS layer): one row per borough and day,
-- aggregated across all event categories.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_smart_city.doc_ads_city_ops_index
AS
SELECT
    borough,
    DATE_TRUNC('day', event_time)                                       AS stat_date,
    COUNT(*)                                                            AS total_events,
    SUM(CASE WHEN event_category = 'COMPLAINT' THEN 1 ELSE 0 END)       AS complaint_count,
    SUM(CASE WHEN event_category = 'TRAFFIC_ACCIDENT' THEN 1 ELSE 0 END) AS accident_count,
    SUM(CASE WHEN event_status = 'RESOLVED' THEN 1 ELSE 0 END)          AS resolved_count,
    -- Share of the day's events that are RESOLVED, as a percentage.
    ROUND(
        100.0 * SUM(CASE WHEN event_status = 'RESOLVED' THEN 1 ELSE 0 END)
            / NULLIF(COUNT(*), 0),
        1
    )                                                                   AS resolution_rate_pct,
    -- Mean resolution time over rows with a non-NULL, non-negative resolution_days.
    ROUND(
        AVG(CASE WHEN resolution_days IS NOT NULL
                  AND resolution_days >= 0 THEN resolution_days END),
        2
    )                                                                   AS avg_resolution_days,
    -- City operations score: backlog and handling time each carry a penalty.
    -- Penalty = 10 per OPEN event + 5 per average resolution day (0 if no average),
    -- capped at 100, so the score stays within 0..100.
    ROUND(
        100.0 - LEAST(
            100.0,
            SUM(CASE WHEN event_status = 'OPEN' THEN 1 ELSE 0 END) * 10.0
                + COALESCE(
                      AVG(CASE WHEN resolution_days IS NOT NULL
                                AND resolution_days >= 0 THEN resolution_days END),
                      0
                  ) * 5.0
        ),
        1
    )                                                                   AS city_ops_score
FROM best_practice_smart_city.doc_dwd_city_events
WHERE event_time IS NOT NULL
GROUP BY
    borough,
    DATE_TRUNC('day', event_time);
-- Refresh the ADS index, then roll it up to one row per borough,
-- ranked by average daily operations score.
REFRESH DYNAMIC TABLE best_practice_smart_city.doc_ads_city_ops_index;

SELECT
    idx.borough,
    ROUND(AVG(idx.city_ops_score), 1)   AS avg_ops_score,
    SUM(idx.total_events)               AS total_events,
    SUM(idx.complaint_count)            AS complaints,
    SUM(idx.accident_count)             AS accidents,
    -- Overall rate = resolved / total across all days. Averaging the daily
    -- percentages instead would give a one-event day the same weight as a busy day.
    -- Columns are qualified with idx so they cannot be confused with the
    -- total_events output alias above.
    ROUND(
        100.0 * SUM(idx.resolved_count) / NULLIF(SUM(idx.total_events), 0),
        1
    )                                   AS resolution_rate_pct
FROM best_practice_smart_city.doc_ads_city_ops_index AS idx
GROUP BY idx.borough
ORDER BY avg_ops_score DESC;
-- Per-agency complaint statistics straight from the ODS table.
SELECT
    agency_name,
    COUNT(*)                                                                AS total_complaints,
    SUM(CASE WHEN status = 'Closed' THEN 1 ELSE 0 END)                      AS resolved,
    SUM(CASE WHEN status IN ('Open', 'In Progress') THEN 1 ELSE 0 END)      AS pending,
    -- COUNT(*) is at least 1 for every group, so the division cannot hit zero.
    ROUND(
        100.0 * SUM(CASE WHEN status = 'Closed' THEN 1 ELSE 0 END) / COUNT(*),
        1
    )                                                                       AS resolution_rate_pct
FROM best_practice_smart_city.doc_ods_311_complaints
GROUP BY agency_name
ORDER BY total_complaints DESC;
agency_name | total_complaints | resolved | pending | resolution_rate_pct
--------------------------------------------------+-----------------+----------+---------+--------------------
New York City Police Department | 4 | 3 | 1 | 75.0
Department of Transportation | 4 | 3 | 1 | 75.0
Department of Sanitation New York | 4 | 4 | 0 | 100.0
Department of Housing Preservation and Development| 4 | 4 | 0 | 100.0
Department of Environmental Protection | 4 | 2 | 2 | 50.0
-- Read-only access to the public ADS-layer index
-- (lets every department view overall city operations).
CREATE ROLE IF NOT EXISTS smart_city_viewer;

-- Department-specific analyst (an NYPD analyst in this example).
CREATE ROLE IF NOT EXISTS dept_nypd_analyst;

-- City operations platform administrator.
CREATE ROLE IF NOT EXISTS city_ops_admin;
按层次授权
-- smart_city_viewer: read-only, intended for the ADS and DWS summary layers (all tables).
-- NOTE(review): as written this grants SELECT on every table in the schema, which
-- also covers the doc_ods_* / doc_dwd_* tables; confirm that this breadth is intended.
GRANT SELECT
    ON ALL TABLES IN SCHEMA best_practice_smart_city
    TO ROLE smart_city_viewer;

-- city_ops_admin: access to all layers.
GRANT SELECT
    ON ALL TABLES IN SCHEMA best_practice_smart_city
    TO ROLE city_ops_admin;
-- Masking function for a geographic coordinate.
--   coord:   the raw coordinate value (DOUBLE).
--   returns: coord unchanged when CURRENT_USER() is in the privileged list,
--            otherwise coord rounded to 1 decimal place.
CREATE OR REPLACE FUNCTION best_practice_smart_city.mask_geo_coord(coord DOUBLE)
RETURNS DOUBLE
AS
CASE
    -- Replace 'privileged_user' with the actual authorized user name(s).
    WHEN CURRENT_USER() IN ('privileged_user') THEN coord
    -- Non-privileged users get precision reduced to 1 decimal place (roughly an 11 km range).
    ELSE ROUND(coord, 1)
END;
💡 提示:将 `'privileged_user'` 替换为实际需要查看明文数据的用户名。Column Masking 通过 `current_user()` 函数匹配当前连接的用户名,需将所有授权用户名显式列在 `IN()` 列表中。
第二步:将脱敏函数绑定到敏感列:
-- Attach the mask_geo_coord masking function to both coordinate columns
-- of the complaints ODS table.
ALTER TABLE best_practice_smart_city.doc_ods_311_complaints
    CHANGE COLUMN latitude
    SET MASK best_practice_smart_city.mask_geo_coord;

ALTER TABLE best_practice_smart_city.doc_ods_311_complaints
    CHANGE COLUMN longitude
    SET MASK best_practice_smart_city.mask_geo_coord;