-- FHIR Patient resource
CREATE TABLE IF NOT EXISTS best_practice_fhir_clinical.doc_fhir_patient (
patient_id STRING,
resource_json STRING
);
-- FHIR Encounter resource(就诊事件)
CREATE TABLE IF NOT EXISTS best_practice_fhir_clinical.doc_fhir_encounter (
encounter_id STRING,
patient_id STRING,
resource_json STRING
);
-- FHIR Observation resource(检验/生命体征)
CREATE TABLE IF NOT EXISTS best_practice_fhir_clinical.doc_fhir_observation (
obs_id STRING,
patient_id STRING,
resource_json STRING
);
-- FHIR MedicationRequest resource(用药医嘱)
CREATE TABLE IF NOT EXISTS best_practice_fhir_clinical.doc_fhir_medication_request (
req_id STRING,
patient_id STRING,
resource_json STRING
);
写入模拟数据
生产环境通过 COPY INTO + Volume 批量导入 FHIR JSON 文件。
从本地 CSV 导入数据(推荐):
-- 第一步:通过 SQL PUT 将本地 CSV 文件上传到 User Volume
PUT '/path/to/your/data.csv' TO USER VOLUME FILE 'data.csv';
-- 第二步:从 User Volume COPY INTO 表
COPY INTO best_practice_fhir_clinical.doc_fhir_patient
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('data.csv');
SELECT COUNT(*) AS patient_count FROM best_practice_fhir_clinical.doc_fhir_patient;
SELECT COUNT(*) AS encounter_count FROM best_practice_fhir_clinical.doc_fhir_encounter;
SELECT COUNT(*) AS obs_count FROM best_practice_fhir_clinical.doc_fhir_observation;
SELECT COUNT(*) AS med_count FROM best_practice_fhir_clinical.doc_fhir_medication_request;
SELECT
patient_id,
get_json_object(resource_json, '$.name[0].family') AS family_name,
get_json_object(resource_json, '$.name[0].given[0]') AS given_name,
get_json_object(resource_json, '$.gender') AS gender,
get_json_object(resource_json, '$.birthDate') AS birth_date,
get_json_object(resource_json, '$.address[0].city') AS city
FROM best_practice_fhir_clinical.doc_fhir_patient;
patient_id | family_name | given_name | gender | birth_date | city
-----------+-------------+------------+--------+------------+-----------
P001 | Zhang | Wei | male | 1980-05-15 | Shanghai
P002 | Li | Fang | female | 1972-11-23 | Beijing
P003 | Wang | Jun | male | 1955-03-08 | Guangzhou
P004 | Chen | Mei | female | 1990-07-30 | Shenzhen
P005 | Liu | Yang | male | 1968-01-12 | Chengdu
验证 Encounter resource 中 ICD-10 码和就诊时长提取:
SELECT
encounter_id,
patient_id,
get_json_object(resource_json, '$.status') AS enc_status,
get_json_object(resource_json, '$.class.code') AS enc_class,
get_json_object(resource_json, '$.period.start') AS period_start,
get_json_object(resource_json, '$.period.end') AS period_end,
get_json_object(resource_json, '$.reasonCode[0].coding[0].code') AS icd_code,
get_json_object(resource_json, '$.reasonCode[0].coding[0].display') AS diagnosis,
get_json_object(resource_json, '$.serviceProvider.display') AS department
FROM best_practice_fhir_clinical.doc_fhir_encounter;
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fhir_clinical.dwd_patient_dim
AS
SELECT
p.patient_id,
get_json_object(p.resource_json, '$.name[0].family') AS family_name,
get_json_object(p.resource_json, '$.name[0].given[0]') AS given_name,
get_json_object(p.resource_json, '$.gender') AS gender,
get_json_object(p.resource_json, '$.birthDate') AS birth_date,
get_json_object(p.resource_json, '$.address[0].city') AS city,
DATEDIFF(CURRENT_DATE(),
CAST(get_json_object(p.resource_json, '$.birthDate') AS DATE)) / 365 AS age_years
FROM best_practice_fhir_clinical.doc_fhir_patient p;
dwd_encounter_fact
dwd_encounter_fact
:就诊事实表,提取 ICD-10 码、住院天数、科室,并对 ICD 章节分类:
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fhir_clinical.dwd_encounter_fact
AS
SELECT
e.encounter_id,
e.patient_id,
get_json_object(e.resource_json, '$.status') AS enc_status,
get_json_object(e.resource_json, '$.class.code') AS enc_class,
CAST(get_json_object(e.resource_json, '$.period.start') AS TIMESTAMP) AS admit_time,
CAST(get_json_object(e.resource_json, '$.period.end') AS TIMESTAMP) AS discharge_time,
DATEDIFF(
CAST(get_json_object(e.resource_json, '$.period.end') AS TIMESTAMP),
CAST(get_json_object(e.resource_json, '$.period.start') AS TIMESTAMP)
) AS los_days,
get_json_object(e.resource_json, '$.reasonCode[0].coding[0].code') AS icd_code,
get_json_object(e.resource_json, '$.reasonCode[0].coding[0].display') AS primary_diagnosis,
get_json_object(e.resource_json, '$.serviceProvider.display') AS department,
CASE
WHEN UPPER(get_json_object(e.resource_json, '$.reasonCode[0].coding[0].code')) LIKE 'I%' THEN 'Cardiology'
WHEN UPPER(get_json_object(e.resource_json, '$.reasonCode[0].coding[0].code')) LIKE 'E%' THEN 'Endocrinology'
WHEN UPPER(get_json_object(e.resource_json, '$.reasonCode[0].coding[0].code')) LIKE 'J%' THEN 'Pulmonology'
WHEN UPPER(get_json_object(e.resource_json, '$.reasonCode[0].coding[0].code')) LIKE 'N%' THEN 'Nephrology'
ELSE 'Other'
END AS icd_chapter
FROM best_practice_fhir_clinical.doc_fhir_encounter e;
dwd_observation_fact
dwd_observation_fact
:检验/生命体征事实表,提取 LOINC 码、检验值和单位:
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fhir_clinical.dwd_observation_fact
AS
SELECT
o.obs_id,
o.patient_id,
get_json_object(o.resource_json, '$.category[0].coding[0].code') AS obs_category,
get_json_object(o.resource_json, '$.code.coding[0].code') AS loinc_code,
get_json_object(o.resource_json, '$.code.coding[0].display') AS obs_name,
CAST(get_json_object(o.resource_json, '$.valueQuantity.value') AS DOUBLE) AS obs_value,
get_json_object(o.resource_json, '$.valueQuantity.unit') AS obs_unit,
CAST(get_json_object(o.resource_json, '$.effectiveDateTime') AS TIMESTAMP) AS obs_time
FROM best_practice_fhir_clinical.doc_fhir_observation o;
dwd_medication_fact
dwd_medication_fact
:用药医嘱事实表,提取 RxNorm 码、药名、剂量和给药途径:
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fhir_clinical.dwd_medication_fact
AS
SELECT
m.req_id,
m.patient_id,
get_json_object(m.resource_json, '$.status') AS req_status,
get_json_object(m.resource_json, '$.medicationCodeableConcept.coding[0].code') AS rxnorm_code,
get_json_object(m.resource_json, '$.medicationCodeableConcept.coding[0].display') AS medication_name,
CAST(get_json_object(m.resource_json, '$.dosageInstruction[0].doseAndRate[0].doseQuantity.value') AS DOUBLE) AS dose_value,
get_json_object(m.resource_json, '$.dosageInstruction[0].doseAndRate[0].doseQuantity.unit') AS dose_unit,
get_json_object(m.resource_json, '$.dosageInstruction[0].route.coding[0].code') AS route,
CAST(get_json_object(m.resource_json, '$.authoredOn') AS TIMESTAMP) AS authored_on,
REPLACE(get_json_object(m.resource_json, '$.encounter.reference'), 'Encounter/', '') AS encounter_id
FROM best_practice_fhir_clinical.doc_fhir_medication_request m;
⚠️ 注意:Dynamic Table 的 DDL 不写
REFRESH INTERVAL
REFRESH INTERVAL
。定期刷新通过 Studio Task 管理(见"调度配置"章节),便于在同一任务上附加告警和数据质控规则。
手动触发首次刷新,验证数据:
REFRESH DYNAMIC TABLE best_practice_fhir_clinical.dwd_patient_dim;
REFRESH DYNAMIC TABLE best_practice_fhir_clinical.dwd_encounter_fact;
REFRESH DYNAMIC TABLE best_practice_fhir_clinical.dwd_observation_fact;
REFRESH DYNAMIC TABLE best_practice_fhir_clinical.dwd_medication_fact;
SELECT patient_id, family_name, gender, birth_date, city, ROUND(age_years, 1) AS age
FROM best_practice_fhir_clinical.dwd_patient_dim
ORDER BY patient_id;
patient_id | family_name | gender | birth_date | city | age
-----------+-------------+--------+------------+-----------+-----
P001 | Zhang | male | 1980-05-15 | Shanghai | 46.1
P002 | Li | female | 1972-11-23 | Beijing | 53.6
P003 | Wang | male | 1955-03-08 | Guangzhou | 71.3
P004 | Chen | female | 1990-07-30 | Shenzhen | 35.9
P005 | Liu | male | 1968-01-12 | Chengdu | 58.4
就诊事实表验证(关注 los_days 计算和 icd_chapter 分类):
SELECT encounter_id, patient_id, enc_class, icd_code, primary_diagnosis,
department, los_days, icd_chapter
FROM best_practice_fhir_clinical.dwd_encounter_fact
ORDER BY encounter_id;
对 ICD-10 首字母分类,E005(脑梗 I63.9)按 I 系列归入 Cardiology 章节,这是 ICD-10 标准分类方式。若业务上需要按神经系统单独统计,可在 CASE 中增加
I6%
I6%
特殊处理。
检验事实表(含 LOINC 码和量化值):
SELECT obs_id, patient_id, obs_category, loinc_code, obs_name, obs_value, obs_unit
FROM best_practice_fhir_clinical.dwd_observation_fact
ORDER BY obs_id;
-- 创建脱敏函数:管理员看原始姓名,其他用户看掩码
CREATE OR REPLACE FUNCTION best_practice_fhir_clinical.mask_phi_name(name STRING)
RETURNS STRING
AS CASE
WHEN current_user() IN ('privileged_user') THEN name -- 替换为实际获授权的用户名
ELSE CONCAT(LEFT(name, 1), REPEAT('*', LENGTH(name) - 1))
END;
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fhir_clinical.dws_department_cost
AS
SELECT
e.department,
e.icd_chapter,
COUNT(DISTINCT e.encounter_id) AS total_encounters,
COUNT(DISTINCT e.patient_id) AS total_patients,
SUM(CASE WHEN e.enc_class = 'IMP' THEN 1 ELSE 0 END) AS inpatient_count,
SUM(CASE WHEN e.enc_class = 'AMB' THEN 1 ELSE 0 END) AS outpatient_count,
SUM(CASE WHEN e.enc_class = 'EMER' THEN 1 ELSE 0 END) AS emergency_count,
ROUND(AVG(CAST(e.los_days AS DOUBLE)), 2) AS avg_los_days,
MAX(e.los_days) AS max_los_days,
COUNT(DISTINCT m.req_id) AS total_prescriptions,
COUNT(DISTINCT m.rxnorm_code) AS distinct_medications
FROM best_practice_fhir_clinical.dwd_encounter_fact e
LEFT JOIN best_practice_fhir_clinical.dwd_medication_fact m ON e.encounter_id = m.encounter_id
GROUP BY e.department, e.icd_chapter;
REFRESH DYNAMIC TABLE best_practice_fhir_clinical.dws_department_cost;
SELECT department, icd_chapter, total_encounters, avg_los_days,
total_prescriptions, distinct_medications
FROM best_practice_fhir_clinical.dws_department_cost
ORDER BY total_encounters DESC;
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fhir_clinical.dws_patient_obs_summary
AS
SELECT
o.patient_id,
o.loinc_code,
o.obs_name,
o.obs_unit,
COUNT(*) AS obs_count,
ROUND(MIN(o.obs_value), 2) AS min_value,
ROUND(MAX(o.obs_value), 2) AS max_value,
ROUND(AVG(o.obs_value), 2) AS avg_value,
MIN(o.obs_time) AS first_obs_time,
MAX(o.obs_time) AS last_obs_time
FROM best_practice_fhir_clinical.dwd_observation_fact o
GROUP BY o.patient_id, o.loinc_code, o.obs_name, o.obs_unit;
REFRESH DYNAMIC TABLE best_practice_fhir_clinical.dws_patient_obs_summary;
SELECT patient_id, loinc_code, obs_name, obs_unit, obs_count, min_value, max_value, avg_value
FROM best_practice_fhir_clinical.dws_patient_obs_summary
ORDER BY patient_id, loinc_code;
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fhir_clinical.ads_clinical_quality_metrics
AS
SELECT
p.patient_id,
p.family_name,
p.gender,
FLOOR(p.age_years) AS age,
e.encounter_id,
e.department,
e.icd_chapter,
e.icd_code,
e.primary_diagnosis,
e.enc_class,
e.los_days,
e.enc_status,
COUNT(DISTINCT m.req_id) AS prescription_count,
COUNT(DISTINCT o.obs_id) AS lab_count,
MAX(CASE WHEN o.loinc_code = '4548-4' THEN 1 ELSE 0 END) AS hba1c_tested,
MAX(CASE WHEN o.loinc_code = '2160-0' THEN 1 ELSE 0 END) AS creatinine_tested,
MAX(CASE WHEN o.loinc_code = '59408-5' THEN 1 ELSE 0 END) AS spo2_monitored,
MAX(CASE WHEN o.loinc_code = '4548-4' THEN o.obs_value END) AS hba1c_value,
MAX(CASE WHEN o.loinc_code = '2160-0' THEN o.obs_value END) AS creatinine_value,
MAX(CASE WHEN o.loinc_code = '59408-5' THEN o.obs_value END) AS spo2_value
FROM best_practice_fhir_clinical.dwd_patient_dim p
JOIN best_practice_fhir_clinical.dwd_encounter_fact e ON p.patient_id = e.patient_id
LEFT JOIN best_practice_fhir_clinical.dwd_medication_fact m ON e.encounter_id = m.encounter_id
LEFT JOIN best_practice_fhir_clinical.dwd_observation_fact o ON p.patient_id = o.patient_id
GROUP BY
p.patient_id, p.family_name, p.gender, FLOOR(p.age_years),
e.encounter_id, e.department, e.icd_chapter, e.icd_code,
e.primary_diagnosis, e.enc_class, e.los_days, e.enc_status;
REFRESH DYNAMIC TABLE best_practice_fhir_clinical.ads_clinical_quality_metrics;
SELECT patient_id, family_name, age, department, primary_diagnosis,
enc_class, los_days, prescription_count, lab_count,
hba1c_tested, creatinine_tested, spo2_monitored,
hba1c_value, creatinine_value, spo2_value
FROM best_practice_fhir_clinical.ads_clinical_quality_metrics
ORDER BY patient_id;
SELECT
e.icd_chapter AS disease_group,
COUNT(DISTINCT a.encounter_id) AS encounter_count,
ROUND(AVG(CAST(a.los_days AS DOUBLE)), 1) AS avg_los,
SUM(a.prescription_count) AS total_prescriptions,
ROUND(SUM(a.hba1c_tested) * 100.0
/ NULLIF(SUM(CASE WHEN a.icd_code LIKE 'E%' THEN 1 ELSE 0 END), 0), 1) AS diabetes_hba1c_rate_pct,
ROUND(SUM(a.creatinine_tested) * 100.0
/ NULLIF(SUM(CASE WHEN a.icd_code LIKE 'N%' THEN 1 ELSE 0 END), 0), 1) AS ckd_creatinine_rate_pct,
ROUND(SUM(a.spo2_monitored) * 100.0
/ NULLIF(SUM(CASE WHEN a.icd_code LIKE 'J%' THEN 1 ELSE 0 END), 0), 1) AS copd_spo2_rate_pct
FROM best_practice_fhir_clinical.ads_clinical_quality_metrics a
JOIN best_practice_fhir_clinical.dwd_encounter_fact e ON a.encounter_id = e.encounter_id
GROUP BY e.icd_chapter
ORDER BY encounter_count DESC;
-- 查询 2026-06-06 23:38 时刻的就诊数据快照(保险对账用)
SELECT COUNT(*) AS encounter_count
FROM best_practice_fhir_clinical.doc_fhir_encounter
TIMESTAMP AS OF '2026-06-06 23:38:00';
encounter_count
---------------
5
-- 对比当前数据(验证差量)
SELECT COUNT(*) AS current_count FROM best_practice_fhir_clinical.doc_fhir_encounter;
⚠️ 注意:
TIMESTAMP AS OF
TIMESTAMP AS OF
要求使用字面量常量,不支持
NOW() - INTERVAL '1' MONTH
NOW() - INTERVAL '1' MONTH
等表达式。时间戳使用 UTC+8 时区,
DESC HISTORY
DESC HISTORY
返回的时间也是 UTC,查询时注意时区差。
Time Travel 数据保留期默认 7 天,超出保留期的历史版本不可查询。对于月度保险对账场景,建议定期将月末快照