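Building a Smart City Data Platform for Government

Bring together open data from transportation, city management, civil affairs, public health, and other government departments into a four-layer data warehouse that supports city-operations KPI dashboards and cross-department analysis. Using three NYC Open Data datasets as samples (311 service requests, traffic accidents, and health inspections), this article walks end to end through the pipeline COPY INTO + Volume → ODS → DWD → DWS → ADS. It also shows five key capabilities in practice: External Schema (attaching an existing Hive data lake), RBAC (data isolation across departments), Dynamic Table (cross-department data fusion), Table Stream (capturing ticket status changes), and Column Masking (masking PII fields).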


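Overview

A government data platform has to cope with many data sources, inconsistent formats, and complicated permission boundaries, while still supporting cross-department analysis without compromising personal privacy.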

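Problem | Solution
--------+---------
Departments periodically submit CSV files that must be bulk-loaded in a uniform way | COPY INTO + Volume: declarative loading that automatically skips files already loaded
An existing Hive data lake holds historical data that is expensive to migrate | External Schema: attach the Hive metadata and run cross-source queries in place
Department data formats differ and must be standardized into city-level events | Dynamic Table: a CTE-style UNION ALL that fuses sources incrementally
Some fields (coordinates, names) are PII and need dynamic masking | Column Masking: bound to the column and transparent to queries
Department analysts may see only their own data; administrators see everything | RBAC: grant SELECT per role at the schema or table level
Ticket status changes must be captured promptly to drive handling statistics | Table Stream: captures UPDATE_BEFORE / UPDATE_AFTER records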

SQL commands used

Command / function | Purpose | Notes
-------------------+---------+------
CREATE TABLE | Create the raw per-department ODS tables | Regular tables; the targets of COPY INTO
COPY INTO | Bulk-import department CSV files from a Volume | Automatically skips files already loaded
CREATE EXTERNAL SCHEMA | Attach an existing Hive data lake | No migration needed; query it jointly in place
CREATE TABLE STREAM | Capture ticket status changes | STANDARD mode; supports INSERT / UPDATE / DELETE
CREATE DYNAMIC TABLE | Create the incrementally computed DWD / DWS / ADS tables | Declarative SQL; refreshes follow the dependency chain
REFRESH DYNAMIC TABLE | Trigger a single refresh manually | For the initial build or debugging
CREATE ROLE / GRANT / REVOKE | Create roles and manage grants | Data isolation across departments
ALTER TABLE ... CHANGE COLUMN ... SET MASK | Bind a Column Masking policy | Dynamic masking of PII fields

Prerequisites

All examples in this article run in the best_practice_smart_city schema.

CREATE SCHEMA IF NOT EXISTS best_practice_smart_city;


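ODS layer: raw per-department tables

The ODS layer stores data separately by source. This article demonstrates three kinds of data: NYC 311 service requests (the citizen service hotline), traffic accidents (DOT/NYPD), and health inspections (Dept of Health), all modeled on the actual structure of the NYC Open Data datasets.

Create the tables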

CREATE TABLE IF NOT EXISTS best_practice_smart_city.doc_ods_311_complaints (
  complaint_id      STRING,
  created_date      TIMESTAMP,
  closed_date       TIMESTAMP,
  agency            STRING,
  agency_name       STRING,
  complaint_type    STRING,
  descriptor        STRING,
  location_type     STRING,
  incident_zip      STRING,
  incident_address  STRING,
  city              STRING,
  borough           STRING,
  latitude          DOUBLE,
  longitude         DOUBLE,
  status            STRING,
  resolution_desc   STRING,
  community_board   STRING,
  bbl               STRING,
  open_data_channel STRING,
  load_time         TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

CREATE TABLE IF NOT EXISTS best_practice_smart_city.doc_ods_traffic_accidents (
  accident_id           STRING,
  crash_date            DATE,
  crash_time            STRING,
  borough               STRING,
  zip_code              STRING,
  latitude              DOUBLE,
  longitude             DOUBLE,
  on_street_name        STRING,
  cross_street_name     STRING,
  persons_injured       INT,
  persons_killed        INT,
  pedestrians_injured   INT,
  pedestrians_killed    INT,
  cyclists_injured      INT,
  cyclists_killed       INT,
  motorists_injured     INT,
  motorists_killed      INT,
  vehicle_type_1        STRING,
  vehicle_type_2        STRING,
  contributing_factor_1 STRING,
  contributing_factor_2 STRING,
  load_time             TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

CREATE TABLE IF NOT EXISTS best_practice_smart_city.doc_ods_health_inspections (
  inspection_id   STRING,
  facility_name   STRING,
  facility_type   STRING,
  borough         STRING,
  zip_code        STRING,
  inspection_date DATE,
  inspection_type STRING,
  violation_code  STRING,
  violation_desc  STRING,
  grade           STRING,
  grade_date      DATE,
  score           INT,
  latitude        DOUBLE,
  longitude       DOUBLE,
  load_time       TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

CREATE TABLE IF NOT EXISTS best_practice_smart_city.doc_dim_district (
  district_code STRING,
  district_name STRING,
  borough       STRING,
  city          STRING,
  population    INT,
  area_sqkm     DOUBLE,
  district_type STRING
);

Bulk-importing CSV files with COPY INTO + Volume

Each department periodically submits its data as CSV files, stored under its own object-storage directory. First create a Volume that points at the department's submission directory, then bulk-import with COPY INTO.

Step 1: create the Storage Connection and the Volume (Alibaba Cloud OSS in this example):

-- Create the Storage Connection
CREATE STORAGE CONNECTION IF NOT EXISTS conn_city_data
  TYPE = OSS
  ACCESS_ID = '<your-access-id>'
  ACCESS_KEY = '<your-access-key>'
  ENDPOINT = 'oss-cn-hangzhou.aliyuncs.com';

-- Create a Volume for the 311 service request data
CREATE EXTERNAL VOLUME IF NOT EXISTS best_practice_smart_city.vol_311_complaints
  TYPE = OSS
  BUCKET = '<your-bucket>'
  PATH = 'smart-city/311_complaints/'
  CONNECTION = conn_city_data;

Step 2: run COPY INTO to load the files:

COPY INTO best_practice_smart_city.doc_ods_311_complaints
FROM VOLUME best_practice_smart_city.vol_311_complaints
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
ON_ERROR = CONTINUE;

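COPY INTO records metadata for every file it has loaded, so running the same COPY statement again does not import the same files twice. This suits periodic departmental submissions well: trigger COPY INTO on a schedule and only newly added files are processed.

View the load history: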

SELECT * FROM load_history('doc_ods_311_complaints') LIMIT 5;
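The skip-already-loaded behavior described above can be pictured with a small Python model. This is only a sketch of the idea, assuming the bookkeeping is keyed by file path; the class `LoadHistory` and its method names are hypothetical and are not part of the product. It also shows why a file overwritten under the same name is not picked up again.

```python
from dataclasses import dataclass, field


@dataclass
class LoadHistory:
    """Toy model of load bookkeeping keyed by file path only (an assumption)."""
    loaded: set = field(default_factory=set)

    def copy_into(self, files: dict) -> list:
        """Load files whose path has not been seen; return the paths loaded in this run."""
        new_paths = sorted(path for path in files if path not in self.loaded)
        self.loaded.update(new_paths)
        return new_paths


history = LoadHistory()

# First scheduled run: one file is present and gets loaded.
first_run = history.copy_into({"2026-01-01/a.csv": "content v1"})

# Second run: a.csv was overwritten in place, b.csv is new.
# Only b.csv is loaded; the changed a.csv is skipped because its path is already recorded.
second_run = history.copy_into({
    "2026-01-01/a.csv": "content v2",
    "2026-01-02/b.csv": "content v1",
})
```

Writing each submission to a new path (for example one directory per date) avoids the silent skip in the second run.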

External Schema: connecting an existing Hive data lake

If a department already keeps historical data in a Hive data lake, an External Schema can attach it directly with no migration.

-- First create a Connection that points at the Hive Metastore
CREATE CATALOG CONNECTION IF NOT EXISTS conn_hive_metastore
  CATALOG_TYPE = HIVE
  METASTORE_URI = 'thrift://<hive-metastore-host>:9083'
  HDFS_DEFAULT_FS = 'hdfs://<namenode>:8020';

-- Attach the Hive database as an External Schema
CREATE EXTERNAL SCHEMA IF NOT EXISTS hive_civil_affairs
  CONNECTION = conn_hive_metastore
  DATABASE = 'civil_affairs_db';

Once attached, the Hive tables can be queried directly:

-- Cross-source query: historical Hive data + newly loaded data
SELECT
  ha.district_code,
  ha.population_2020,
  COUNT(c.complaint_id) AS recent_complaints
FROM hive_civil_affairs.district_population ha
LEFT JOIN best_practice_smart_city.doc_ods_311_complaints c
  ON ha.zip_code = c.incident_zip
  AND c.created_date >= CAST('2026-01-01' AS TIMESTAMP)
GROUP BY ha.district_code, ha.population_2020;


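Table Stream: capturing ticket status changes

A 311 ticket goes through several status changes while it is being handled (Open → In Progress → Closed), and each change must be written to an audit log for analyzing handling timeliness.

Create the Table Stream

Create a STANDARD-mode stream on the service request table to capture all INSERT / UPDATE / DELETE operations: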

CREATE TABLE STREAM IF NOT EXISTS best_practice_smart_city.doc_stream_complaint_changes
ON TABLE best_practice_smart_city.doc_ods_311_complaints
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');

Verify that the stream was created:

SHOW STREAMS IN best_practice_smart_city;

create_time             | schema_name              | name                         | table_name                                                  | mode     | comment
------------------------+--------------------------+------------------------------+-------------------------------------------------------------+----------+--------
2026-06-06T14:40:10.831 | best_practice_smart_city | doc_stream_complaint_changes | quick_start.best_practice_smart_city.doc_ods_311_complaints | STANDARD |

Simulate ticket status changes

-- Ticket CMP002: the Sewer Backup request has been resolved
UPDATE best_practice_smart_city.doc_ods_311_complaints
SET status = 'Closed',
    closed_date = CAST('2026-01-10 16:00:00' AS TIMESTAMP),
    resolution_desc = 'Issue resolved after second inspection.'
WHERE complaint_id = 'CMP002';

-- Ticket CMP015: the elevator repair is complete
UPDATE best_practice_smart_city.doc_ods_311_complaints
SET status = 'Closed',
    closed_date = CAST('2026-01-11 10:00:00' AS TIMESTAMP),
    resolution_desc = 'Elevator maintenance completed.'
WHERE complaint_id = 'CMP015';

Consume the stream: write the ticket audit log

The audit log table for ticket status changes:

CREATE TABLE IF NOT EXISTS best_practice_smart_city.doc_dwd_complaint_audit_log (
  complaint_id    STRING,
  change_type     STRING,
  change_version  BIGINT,
  change_time     TIMESTAMP,
  new_status      STRING,
  closed_date     TIMESTAMP,
  resolution_desc STRING,
  agency          STRING,
  borough         STRING,
  complaint_type  STRING
);

Inspect the change records in the stream (a SELECT preview is safe before consuming):

SELECT __change_type, complaint_id, status, closed_date, __commit_timestamp
FROM best_practice_smart_city.doc_stream_complaint_changes
LIMIT 10;

__change_type | complaint_id | status      | closed_date         | __commit_timestamp
--------------+--------------+-------------+---------------------+------------------------
UPDATE_AFTER  | CMP002       | Closed      | 2026-01-10T16:00:00 | 2026-06-06T14:42:56.449
UPDATE_AFTER  | CMP015       | Closed      | 2026-01-11T10:00:00 | 2026-06-06T14:43:02.979
UPDATE_BEFORE | CMP002       | Open        | null                | 2026-06-06T14:37:45.066
UPDATE_BEFORE | CMP015       | In Progress | null                | 2026-06-06T14:37:45.066

In STANDARD mode, every UPDATE produces one UPDATE_BEFORE record (the row before the change) and one UPDATE_AFTER record (the row after it). Consume the changes and advance the offset with INSERT INTO ... SELECT FROM stream:

-- Keep only UPDATE_AFTER records and write them to the audit log
INSERT INTO best_practice_smart_city.doc_dwd_complaint_audit_log
SELECT
  complaint_id,
  __change_type AS change_type,
  __commit_version AS change_version,
  __commit_timestamp AS change_time,
  status AS new_status,
  closed_date,
  resolution_desc,
  agency,
  borough,
  complaint_type
FROM best_practice_smart_city.doc_stream_complaint_changes
WHERE __change_type = 'UPDATE_AFTER';

After consuming, verify that the offset has advanced; the stream should now be empty:

SELECT COUNT(*) AS remaining_changes FROM best_practice_smart_city.doc_stream_complaint_changes;

remaining_changes
-----------------
0

Check the audit log:

SELECT complaint_id, change_type, change_version, change_time, new_status, borough, complaint_type
FROM best_practice_smart_city.doc_dwd_complaint_audit_log
ORDER BY change_time;

complaint_id | change_type  | change_version | change_time             | new_status | borough  | complaint_type
-------------+--------------+----------------+-------------------------+------------+----------+---------------
CMP002       | UPDATE_AFTER | 3              | 2026-06-06T14:42:56.449 | Closed     | BROOKLYN | Sewer
CMP015       | UPDATE_AFTER | 4              | 2026-06-06T14:43:02.979 | Closed     | QUEENS   | ELEVATOR

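change_version increases monotonically, which preserves the chronological integrity of the audit log. Create this INSERT INTO ... SELECT FROM stream statement as a scheduled task in Studio (an hourly schedule is suggested) to keep tracking ticket status changes continuously.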
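The preview-versus-consume behavior of the stream can be modeled in a few lines of Python. This is a conceptual sketch only: the class `TableStream` and its methods are hypothetical stand-ins, with `peek()` playing the role of a plain SELECT and `consume()` the role of INSERT INTO ... SELECT FROM stream.

```python
class TableStream:
    """Toy change stream: reading never moves the offset, consuming does."""

    def __init__(self):
        self._changes = []  # append-only change log
        self._offset = 0    # index of the first unconsumed change

    def record_update(self, key, before, after):
        # STANDARD mode emits a before/after pair for every UPDATE.
        self._changes.append(("UPDATE_BEFORE", key, before))
        self._changes.append(("UPDATE_AFTER", key, after))

    def peek(self):
        """Return pending changes without advancing the offset."""
        return list(self._changes[self._offset:])

    def consume(self):
        """Return pending changes and advance the offset past them."""
        rows = self.peek()
        self._offset = len(self._changes)
        return rows


stream = TableStream()
stream.record_update("CMP002", "Open", "Closed")
stream.record_update("CMP015", "In Progress", "Closed")

preview = stream.peek()  # four pending records, offset untouched
audit_log = [row for row in stream.consume() if row[0] == "UPDATE_AFTER"]
remaining = stream.peek()  # empty after consumption
```

Note that the consume step advances past all pending records, including the UPDATE_BEFORE rows that the filter discards, which matches the empty stream seen after the INSERT above.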


DWD layer: cross-department event standardization

The DWD layer unifies raw events from different departments into city-level events, resolving inconsistent column names and differing status codes across the source tables.

Create the table

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_smart_city.doc_dwd_city_events AS
SELECT
  complaint_id AS event_id,
  'COMPLAINT' AS event_category,
  complaint_type AS event_type,
  agency AS dept_code,
  agency_name AS dept_name,
  borough,
  incident_zip AS zip_code,
  latitude,
  longitude,
  created_date AS event_time,
  CASE
    WHEN status = 'Closed' THEN 'RESOLVED'
    WHEN status = 'Open' THEN 'OPEN'
    ELSE 'IN_PROGRESS'
  END AS event_status,
  DATEDIFF(CAST(closed_date AS DATE), CAST(created_date AS DATE)) AS resolution_days,
  CONCAT(borough, '-', incident_zip) AS geo_key
FROM best_practice_smart_city.doc_ods_311_complaints
WHERE created_date IS NOT NULL
UNION ALL
SELECT
  accident_id AS event_id,
  'TRAFFIC_ACCIDENT' AS event_category,
  contributing_factor_1 AS event_type,
  'DOT' AS dept_code,
  'Department of Transportation' AS dept_name,
  borough,
  zip_code,
  latitude,
  longitude,
  CAST(crash_date AS TIMESTAMP) AS event_time,
  'RESOLVED' AS event_status,
  0 AS resolution_days,
  CONCAT(borough, '-', zip_code) AS geo_key
FROM best_practice_smart_city.doc_ods_traffic_accidents
WHERE crash_date IS NOT NULL;

Trigger the first refresh manually and verify:

REFRESH DYNAMIC TABLE best_practice_smart_city.doc_dwd_city_events;

SELECT event_category, COUNT(*) AS cnt
FROM best_practice_smart_city.doc_dwd_city_events
GROUP BY event_category
ORDER BY cnt DESC;

event_category   | cnt
-----------------+----
COMPLAINT        | 20
TRAFFIC_ACCIDENT | 15

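The DWD layer merges the 311 requests (20 rows) and the traffic accidents (15 rows) under standard fields such as event_id, event_category, and geo_key. The DWS and ADS layers then query only this one Dynamic Table and never need to know how the departments' raw columns differ.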
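The per-row mapping applied to 311 requests in the DWD query can be restated as a plain Python function, which makes the status mapping and the resolution_days calculation easier to unit-test. This is a sketch that mirrors the SQL above, not a replacement for it; the function name `normalize_complaint` and the sample values (created date, ZIP code) are made up for illustration, and non-null borough and ZIP values are assumed.

```python
from datetime import datetime


def normalize_complaint(row: dict) -> dict:
    """Map one raw 311 row to the standardized event fields used in the DWD layer."""
    status_map = {"Closed": "RESOLVED", "Open": "OPEN"}
    created, closed = row["created_date"], row.get("closed_date")
    # Whole calendar days between creation and closure; None while the ticket is open.
    resolution_days = (closed.date() - created.date()).days if closed else None
    return {
        "event_id": row["complaint_id"],
        "event_category": "COMPLAINT",
        # Any status other than Closed/Open falls through to IN_PROGRESS.
        "event_status": status_map.get(row["status"], "IN_PROGRESS"),
        "resolution_days": resolution_days,
        "geo_key": f'{row["borough"]}-{row["incident_zip"]}',
    }


closed_event = normalize_complaint({
    "complaint_id": "CMP002",
    "created_date": datetime(2026, 1, 8, 9, 30),   # hypothetical creation time
    "closed_date": datetime(2026, 1, 10, 16, 0),
    "status": "Closed",
    "borough": "BROOKLYN",
    "incident_zip": "11201",                        # hypothetical ZIP code
})

open_event = normalize_complaint({
    "complaint_id": "CMP900",                       # hypothetical ticket
    "created_date": datetime(2026, 1, 9, 8, 0),
    "closed_date": None,
    "status": "Open",
    "borough": "QUEENS",
    "incident_zip": "11101",
})
```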


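DWS layer: street/district-level daily summary

The DWS layer aggregates at the granularity of borough × date × event category, so that district managers can see daily event volume and handling efficiency.

Create the table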

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_smart_city.doc_dws_borough_daily AS
SELECT
  borough,
  DATE_TRUNC('day', event_time) AS stat_date,
  event_category,
  COUNT(*) AS event_count,
  SUM(CASE WHEN event_status = 'RESOLVED' THEN 1 ELSE 0 END) AS resolved_count,
  SUM(CASE WHEN event_status = 'OPEN' THEN 1 ELSE 0 END) AS open_count,
  ROUND(AVG(CASE WHEN resolution_days IS NOT NULL AND resolution_days >= 0 THEN resolution_days END), 2) AS avg_resolution_days
FROM best_practice_smart_city.doc_dwd_city_events
WHERE event_time IS NOT NULL
GROUP BY borough, DATE_TRUNC('day', event_time), event_category;

REFRESH DYNAMIC TABLE best_practice_smart_city.doc_dws_borough_daily;

SELECT borough, stat_date, event_category, event_count, resolved_count, avg_resolution_days
FROM best_practice_smart_city.doc_dws_borough_daily
ORDER BY stat_date, borough, event_category
LIMIT 10;

borough       | stat_date           | event_category   | event_count | resolved_count | avg_resolution_days
--------------+---------------------+------------------+-------------+----------------+--------------------
BROOKLYN      | 2026-01-03T00:00:00 | COMPLAINT        | 1           | 0              | null
BROOKLYN      | 2026-01-03T00:00:00 | TRAFFIC_ACCIDENT | 1           | 1              | 0
MANHATTAN     | 2026-01-03T00:00:00 | COMPLAINT        | 2           | 2              | 1.5
MANHATTAN     | 2026-01-03T00:00:00 | TRAFFIC_ACCIDENT | 1           | 1              | 0
BRONX         | 2026-01-04T00:00:00 | COMPLAINT        | 1           | 1              | 2
BRONX         | 2026-01-04T00:00:00 | TRAFFIC_ACCIDENT | 1           | 1              | 0
QUEENS        | 2026-01-04T00:00:00 | TRAFFIC_ACCIDENT | 1           | 1              | 0
STATEN ISLAND | 2026-01-04T00:00:00 | COMPLAINT        | 1           | 0              | null
BROOKLYN      | 2026-01-05T00:00:00 | COMPLAINT        | 1           | 1              | 1
MANHATTAN     | 2026-01-05T00:00:00 | TRAFFIC_ACCIDENT | 1           | 1              | 0

A null avg_resolution_days means that none of that day's complaints has been closed yet (closed_date is empty). The resolution_days >= 0 condition inside the CASE WHEN drops negative values (bad data) and nulls so that they cannot drag the average down incorrectly.
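The filtering rule behind avg_resolution_days can be checked in isolation with a short Python equivalent. This is a minimal sketch of the same logic (ignore nulls and negatives, return null when nothing valid remains); the function name is chosen here for illustration.

```python
def avg_resolution_days(values):
    """Average of non-null, non-negative values rounded to 2 places; None if none qualify."""
    valid = [v for v in values if v is not None and v >= 0]
    return round(sum(valid) / len(valid), 2) if valid else None


# Two closed tickets, one still open (None), one bad record (-3).
mixed_day = avg_resolution_days([1, 2, None, -3])

# A day on which no ticket has been closed yet.
open_only_day = avg_resolution_days([None, None])
```

Here mixed_day evaluates to 1.5 and open_only_day to None, matching the null rows in the result above.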

Per-borough handling summary (aggregated across days):

SELECT
  borough,
  event_category,
  SUM(event_count) AS total_events,
  ROUND(100.0 * SUM(resolved_count) / NULLIF(SUM(event_count), 0), 1) AS resolution_rate_pct,
  ROUND(AVG(avg_resolution_days), 2) AS avg_days_to_resolve
FROM best_practice_smart_city.doc_dws_borough_daily
GROUP BY borough, event_category
ORDER BY total_events DESC;

borough       | event_category   | total_events | resolution_rate_pct | avg_days_to_resolve
--------------+------------------+--------------+---------------------+--------------------
MANHATTAN     | COMPLAINT        | 6            | 66.7                | 1.5
BROOKLYN      | COMPLAINT        | 4            | 50.0                | 1.5
BRONX         | COMPLAINT        | 4            | 100.0               | 2
QUEENS        | COMPLAINT        | 4            | 75.0                | 2
MANHATTAN     | TRAFFIC_ACCIDENT | 4            | 100.0               | 0
BROOKLYN      | TRAFFIC_ACCIDENT | 3            | 100.0               | 0
BRONX         | TRAFFIC_ACCIDENT | 3            | 100.0               | 0
QUEENS        | TRAFFIC_ACCIDENT | 3            | 100.0               | 0
STATEN ISLAND | COMPLAINT        | 2            | 50.0                | 1
STATEN ISLAND | TRAFFIC_ACCIDENT | 2            | 100.0               | 0

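Reading the results: the BRONX has the highest complaint resolution rate (100%), but its average handling time of 2 days is longer than MANHATTAN's 1.5 days. Every Bronx request gets resolved, but the response is slower. BROOKLYN and STATEN ISLAND have resolution rates of only 50%, which points to a ticket backlog that needs attention. Traffic accidents are already handled by the time they are recorded, so their resolution rate is 100% everywhere.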
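One detail worth keeping in mind when reading avg_days_to_resolve: the cross-day query takes AVG over the daily avg_resolution_days values, which weights every day equally regardless of how many tickets were closed on it. The sketch below contrasts that with a ticket-weighted mean. The numbers are hypothetical and the function names are made up for illustration; whether the difference matters depends on how uneven the daily volumes are.

```python
def mean_of_daily_means(days):
    """Unweighted mean of the daily averages, as AVG(avg_resolution_days) computes."""
    means = [avg for _, avg in days if avg is not None]
    return sum(means) / len(means) if means else None


def ticket_weighted_mean(days):
    """Mean weighted by the number of closed tickets behind each daily average."""
    closed = [(n, avg) for n, avg in days if avg is not None and n > 0]
    total = sum(n for n, _ in closed)
    return sum(n * avg for n, avg in closed) / total if total else None


# Hypothetical daily rows: (closed ticket count, that day's average resolution days)
days = [(1, 1.0), (3, 3.0), (0, None)]

unweighted = mean_of_daily_means(days)
weighted = ticket_weighted_mean(days)
```

With these sample rows the unweighted value is 2.0 and the weighted value is 2.5.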


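ADS layer: city operations index and KPI dashboard

The ADS layer aggregates multiple event types into a composite city operations score that BI tools and management dashboards consume directly.

Create the table

City operations score formula: 100 - (number of open events × backlog penalty + average resolution days × timeliness penalty). The SQL below uses a backlog penalty of 10 per open event and a timeliness penalty of 5 per day, and caps the total deduction at 100, so the score ranges from 0 to 100; a higher score means better operations.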

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_smart_city.doc_ads_city_ops_index AS
SELECT
  borough,
  DATE_TRUNC('day', event_time) AS stat_date,
  COUNT(*) AS total_events,
  SUM(CASE WHEN event_category = 'COMPLAINT' THEN 1 ELSE 0 END) AS complaint_count,
  SUM(CASE WHEN event_category = 'TRAFFIC_ACCIDENT' THEN 1 ELSE 0 END) AS accident_count,
  SUM(CASE WHEN event_status = 'RESOLVED' THEN 1 ELSE 0 END) AS resolved_count,
  ROUND(
    100.0 * SUM(CASE WHEN event_status = 'RESOLVED' THEN 1 ELSE 0 END) / NULLIF(COUNT(*), 0),
    1
  ) AS resolution_rate_pct,
  ROUND(
    AVG(CASE WHEN resolution_days IS NOT NULL AND resolution_days >= 0 THEN resolution_days END),
    2
  ) AS avg_resolution_days,
  -- City operations score: open events and handling time each carry a penalty
  ROUND(
    100.0 - LEAST(100.0,
      SUM(CASE WHEN event_status = 'OPEN' THEN 1 ELSE 0 END) * 10.0
      + COALESCE(AVG(CASE WHEN resolution_days IS NOT NULL AND resolution_days >= 0 THEN resolution_days END), 0) * 5.0
    ),
    1
  ) AS city_ops_score
FROM best_practice_smart_city.doc_dwd_city_events
WHERE event_time IS NOT NULL
GROUP BY borough, DATE_TRUNC('day', event_time);
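To make the city_ops_score arithmetic in the DDL above concrete, here is the same expression as a small Python function: each open event costs 10 points, each average resolution day costs 5 points, and the deduction is capped at 100. It is a sketch for checking the formula by hand; the function name is chosen here for illustration, and a missing average is treated as 0, as the COALESCE does.

```python
def city_ops_score(open_count, avg_resolution_days):
    """Score one borough-day: 100 minus a penalty capped at 100, rounded to 1 decimal."""
    penalty = open_count * 10.0 + (avg_resolution_days or 0.0) * 5.0
    return round(100.0 - min(100.0, penalty), 1)


no_issues = city_ops_score(0, None)     # nothing open, nothing to average
one_open = city_ops_score(1, 1.5)       # 10 + 7.5 points deducted
overloaded = city_ops_score(12, 2.0)    # penalty of 130 is capped at 100
```

These evaluate to 100.0, 82.5, and 0.0 respectively. Because the backlog term uses a raw count rather than a ratio, a busy borough with many open events loses points faster than a quiet one with the same backlog share.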

REFRESH DYNAMIC TABLE best_practice_smart_city.doc_ads_city_ops_index;

SELECT
  borough,
  ROUND(AVG(city_ops_score), 1) AS avg_ops_score,
  SUM(total_events) AS total_events,
  SUM(complaint_count) AS complaints,
  SUM(accident_count) AS accidents,
  ROUND(AVG(resolution_rate_pct), 1) AS resolution_rate_pct
FROM best_practice_smart_city.doc_ads_city_ops_index
GROUP BY borough
ORDER BY avg_ops_score DESC;

borough       | avg_ops_score | total_events | complaints | accidents | resolution_rate_pct
--------------+---------------+--------------+------------+-----------+--------------------
QUEENS        | 95.0          | 7            | 4          | 3         | 90.0
BRONX         | 93.8          | 7            | 4          | 3         | 100.0
BROOKLYN      | 79.2          | 7            | 4          | 3         | 75.0
MANHATTAN     | 78.8          | 10           | 6          | 4         | 75.0
STATEN ISLAND | 73.8          | 4            | 2          | 2         | 75.0

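Reading the results

  • QUEENS (95.0) and BRONX (93.8) are in the best operational shape; the BRONX's 100% resolution rate is the main source of its high score.
  • MANHATTAN (78.8) has the most events (10), but 2 backlogged (unclosed) complaints pull its score down. Its high absolute event volume puts the most pressure on citywide operations management.
  • STATEN ISLAND (73.8) has the fewest events (4), yet half of its complaints (1 of 2) are still unresolved, so it needs close follow-up.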

Handling efficiency by agency for complaint tickets (a cross-department view along a different dimension):

SELECT
  agency_name,
  COUNT(*) AS total_complaints,
  SUM(CASE WHEN status = 'Closed' THEN 1 ELSE 0 END) AS resolved,
  SUM(CASE WHEN status IN ('Open', 'In Progress') THEN 1 ELSE 0 END) AS pending,
  ROUND(100.0 * SUM(CASE WHEN status = 'Closed' THEN 1 ELSE 0 END) / COUNT(*), 1) AS resolution_rate_pct
FROM best_practice_smart_city.doc_ods_311_complaints
GROUP BY agency_name
ORDER BY total_complaints DESC;

agency_name                                        | total_complaints | resolved | pending | resolution_rate_pct
---------------------------------------------------+------------------+----------+---------+--------------------
New York City Police Department                    | 4                | 3        | 1       | 75.0
Department of Transportation                       | 4                | 3        | 1       | 75.0
Department of Sanitation New York                  | 4                | 4        | 0       | 100.0
Department of Housing Preservation and Development | 4                | 4        | 0       | 100.0
Department of Environmental Protection             | 4                | 2        | 2       | 50.0

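The Department of Environmental Protection (DEP) has a resolution rate of only 50%: 2 of its 4 requests are backlogged (water quality and noise requests take longer to process), which makes it the agency that currently needs the most attention.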


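Configuring scheduled refresh tasks in Studio

Scheduled refreshes of Dynamic Tables are managed by creating tasks in Studio, so that monitoring alerts and data quality rules can be configured on the same task.

Create one refresh task for each of the three Dynamic Tables, ordered by dependency level: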

  1. In Studio, go to Development → Tasks and open the path best_practices/smart_city/
  2. Create a "Refresh Dynamic Table" task for each Dynamic Table:

Task name                  | Target table           | Schedule       | Depends on
---------------------------+------------------------+----------------+----------------------------------------
refresh_dwd_city_events    | doc_dwd_city_events    | Daily at 02:00 | None (reads ODS directly)
refresh_dws_borough_daily  | doc_dws_borough_daily  | Daily at 03:00 | After refresh_dwd_city_events completes
refresh_ads_city_ops_index | doc_ads_city_ops_index | Daily at 04:00 | After refresh_dws_borough_daily completes

  3. Attach data quality rules to the refresh_ads_city_ops_index task: result row count > 0, and city_ops_score between 0 and 100

The same task page can also schedule the Table Stream consumption task (INSERT INTO doc_dwd_complaint_audit_log SELECT FROM stream); running it hourly is suggested to keep the audit log fresh.
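The two data quality rules attached to refresh_ads_city_ops_index (at least one row, score within 0 to 100) are simple enough to express as a check function, which can be handy when validating a refresh outside Studio. The sketch below is illustrative only: the function `check_ops_index` is hypothetical, it is not how Studio evaluates rules, and it assumes rows arrive as dictionaries carrying the table's borough and city_ops_score columns.

```python
def check_ops_index(rows):
    """Return a list of rule violations for refreshed doc_ads_city_ops_index rows."""
    problems = []
    if not rows:
        problems.append("row count is 0")
    for row in rows:
        score = row["city_ops_score"]
        if score is None or not 0 <= score <= 100:
            problems.append(f"score out of range for {row['borough']}: {score}")
    return problems


empty_refresh = check_ops_index([])
healthy = check_ops_index([{"borough": "QUEENS", "city_ops_score": 95.0}])
broken = check_ops_index([{"borough": "BRONX", "city_ops_score": 120.0}])
```

An empty list means both rules pass; any entries describe what to investigate before dashboards pick up the data.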


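RBAC: data isolation across departments

Government data platforms usually require strict data isolation: department analysts may see only their own department's ODS data, city operations management may see the DWS/ADS summaries, and platform administrators have access to every layer.

Create roles

-- Read-only access to the public ADS indexes (lets every department see citywide operations)
CREATE ROLE IF NOT EXISTS smart_city_viewer;

-- Analyst dedicated to one department (an NYPD analyst in this example)
CREATE ROLE IF NOT EXISTS dept_nypd_analyst;

-- City operations platform administrator
CREATE ROLE IF NOT EXISTS city_ops_admin;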

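Grant by layer

-- smart_city_viewer: read-only access. Note that this statement covers every table in the
-- schema, ODS included; to expose only the DWS/ADS summary layers, grant at the table level instead.
GRANT SELECT ON ALL TABLES IN SCHEMA best_practice_smart_city TO ROLE smart_city_viewer;

-- city_ops_admin: access to all layers
GRANT SELECT ON ALL TABLES IN SCHEMA best_practice_smart_city TO ROLE city_ops_admin;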

View a role's privileges:

SHOW GRANTS TO ROLE smart_city_viewer;

granted_type     | privilege     | granted_on | object_name                            | grantee_name
-----------------+---------------+------------+----------------------------------------+------------------
OBJECT_HIERARCHY | SELECT TABLE  | TABLE      | quick_start.best_practice_smart_city.* | smart_city_viewer
PRIVILEGE        | READ METADATA | SCHEMA     | quick_start.best_practice_smart_city   | smart_city_viewer

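A grant of type OBJECT_HIERARCHY automatically covers all tables in the schema, both existing ones and ones created later, so there is no need to re-grant after every new table.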
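The effect of the wildcard object name in the SHOW GRANTS output can be illustrated with a tiny Python model of pattern-based access checks. This is a conceptual sketch, not the platform's authorization logic: the `GRANTS` mapping and `can_select` function are hypothetical, and the pattern is copied from the output above.

```python
from fnmatch import fnmatchcase

# Role -> object-name patterns it may SELECT from (modeled on the SHOW GRANTS output).
GRANTS = {
    "smart_city_viewer": ["quick_start.best_practice_smart_city.*"],
    "city_ops_admin": ["quick_start.best_practice_smart_city.*"],
    "dept_nypd_analyst": [],  # no grant has been issued to this role yet
}


def can_select(role, table):
    """True if any pattern granted to the role matches the fully qualified table name."""
    return any(fnmatchcase(table, pattern) for pattern in GRANTS.get(role, []))


schema = "quick_start.best_practice_smart_city"
viewer_sees_ads = can_select("smart_city_viewer", f"{schema}.doc_ads_city_ops_index")
viewer_sees_ods = can_select("smart_city_viewer", f"{schema}.doc_ods_311_complaints")
viewer_sees_future = can_select("smart_city_viewer", f"{schema}.some_table_created_later")
analyst_sees_ods = can_select("dept_nypd_analyst", f"{schema}.doc_ods_311_complaints")
```

In this model the viewer role matches every table in the schema, raw ODS tables and future tables included, while the analyst role matches nothing until it receives a grant of its own.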

Revoke privileges

-- Run when staff leave or their permissions change
REVOKE SELECT ON ALL TABLES IN SCHEMA best_practice_smart_city FROM ROLE dept_nypd_analyst;


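Column Masking: dynamic masking of PII fields

The 311 ticket and health inspection data contain geographic coordinates (latitude and longitude), and in some scenarios also PII (personally identifiable information) such as the complainant's name and contact details. Column Masking binds a masking function at the column level so that non-privileged users automatically receive masked values.

Step 1: create the masking function (reducing coordinate precision in this example):

CREATE OR REPLACE FUNCTION best_practice_smart_city.mask_geo_coord(coord DOUBLE)
RETURNS DOUBLE
AS
CASE
  WHEN CURRENT_USER() IN ('privileged_user') THEN coord  -- replace with the actual authorized user names
  ELSE ROUND(coord, 1)  -- non-privileged users get 1 decimal place (roughly an 11 km range)
END;

Step 2: bind the masking function to the sensitive columns:

ALTER TABLE best_practice_smart_city.doc_ods_311_complaints
  CHANGE COLUMN latitude SET MASK best_practice_smart_city.mask_geo_coord;

ALTER TABLE best_practice_smart_city.doc_ods_311_complaints
  CHANGE COLUMN longitude SET MASK best_practice_smart_city.mask_geo_coord;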

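Once bound, ordinary analysts see only reduced-precision coordinates in their queries (for example 40.7 instead of 40.7484), while administrator accounts see full precision. The masking also applies transparently to the Dynamic Tables (DWD/DWS/ADS layers): the downstream tables store the already-masked values.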
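The masking rule itself is easy to try out locally. The Python sketch below mirrors the CASE expression in mask_geo_coord, with the current user passed in explicitly; the user names are placeholders, and the longitude value is a made-up sample.

```python
# Placeholder for the list of accounts allowed to see full precision.
PRIVILEGED_USERS = {"privileged_user"}


def mask_geo_coord(coord, current_user):
    """Return the raw coordinate for privileged users, otherwise round to 1 decimal place."""
    if current_user in PRIVILEGED_USERS:
        return coord
    return round(coord, 1)


analyst_lat = mask_geo_coord(40.7484, "some_analyst")    # reduced precision
analyst_lon = mask_geo_coord(-73.9857, "some_analyst")   # hypothetical longitude
admin_lat = mask_geo_coord(40.7484, "privileged_user")   # full precision
```

One tenth of a degree of latitude is about 11 km, which is where the range quoted in the function comment comes from.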


Warehouse object overview

After the build, the best_practice_smart_city schema contains these objects:

SHOW TABLES IN best_practice_smart_city;

schema_name              | table_name                  | is_dynamic
-------------------------+-----------------------------+-----------
best_practice_smart_city | doc_ads_city_ops_index      | true
best_practice_smart_city | doc_dim_district            | false
best_practice_smart_city | doc_dwd_city_events         | true
best_practice_smart_city | doc_dwd_complaint_audit_log | false
best_practice_smart_city | doc_dws_borough_daily       | true
best_practice_smart_city | doc_ods_311_complaints      | false
best_practice_smart_city | doc_ods_health_inspections  | false
best_practice_smart_city | doc_ods_traffic_accidents   | false

Warehouse layer structure:

Departmental CSV submissions (OSS / COS / S3)
        │
        ▼  COPY INTO + Volume (bulk import)
doc_ods_311_complaints      doc_ods_traffic_accidents      doc_ods_health_inspections
(citizen service hotline)   (traffic accidents)            (health inspections)
        │                           │                              │
        ├── Table Stream ───────────────────────────────────────────┤
        │   doc_stream_complaint_changes                            │
        │   (STANDARD mode: UPDATE_BEFORE / UPDATE_AFTER)           │
        │   → INSERT INTO doc_dwd_complaint_audit_log (Studio, hourly)
        │                                                           │
        └──────────────────────────────┬────────────────────────────┘
                                       ▼  REFRESH daily at 02:00 (Studio Task)
                       doc_dwd_city_events (Dynamic Table)
                       UNION ALL standardization · event_category · geo_key
                                       │
                                       ▼  REFRESH daily at 03:00 (Studio Task)
                       doc_dws_borough_daily (Dynamic Table)
                       borough × day × category · avg_resolution_days
                                       │
                                       ▼  REFRESH daily at 04:00 (Studio Task)
                       doc_ads_city_ops_index (Dynamic Table)
                       city_ops_score · resolution_rate_pct

External Schema (Hive data lake)
└── queried in place, no migration; can be joined across sources with ODS tables

Three roles (smart_city_viewer, dept_nypd_analyst, and city_ops_admin) were also created to control isolated data access across departments.


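Notes

  • COPY INTO is safe to re-run: COPY INTO records metadata for the files it has loaded, so the same file is never imported twice. However, if a file's content changes while its name stays the same (an overwrite), the system does not reload it; pair COPY INTO with a file naming convention for the Volume directory (for example, one directory per date).

  • Dynamic Tables omit REFRESH INTERVAL: none of the Dynamic Table DDL in this article includes a REFRESH INTERVAL parameter; scheduling is managed entirely by Studio tasks. This keeps monitoring alerts and data quality checks configured in one place and allows dependencies between tasks (for example, the DWS refresh fires only after the DWD refresh has finished).

  • How the Table Stream offset advances: the stream offset moves forward only after a DML statement (INSERT INTO ... SELECT FROM stream) has executed. A plain SELECT does not consume the offset, so no changes are lost. Once advanced, the offset cannot be rolled back, so a consuming DML statement never processes the same change twice.

  • External Schema is read-only: an attached Hive data lake can only be read; writes still have to go to the platform's own schemas. An External Schema cannot hold a Dynamic Table as a target; it can only serve as an upstream data source.

  • Row-level RBAC has no DDL: the row-level filter rules that restrict department analysts to their own department's data are configured in the Studio UI under Data Security → Row-Level Permissions. There is no corresponding SQL DDL, so do not write statements such as CREATE ROW ACCESS POLICY in code (that syntax does not exist).

  • Column Masking: masking applies transparently to all downstream queries (including the SELECT inside a Dynamic Table), so the DWD/DWS/ADS layers also store masked values. If spatial analysis needs high-precision coordinates, query the raw ODS table directly with a privileged administrator account.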

