-- Landing table for the Kafka PIPE below: each consumed message value is
-- stored as a single STRING (the raw JSON payload) in the `value` column.
CREATE TABLE IF NOT EXISTS best_practice_education_dw.kafka_raw_vle (
    value STRING
);
-- Kafka ingestion PIPE: every BATCH_INTERVAL_IN_SECONDS (60 s) it consumes a
-- batch from topic edu_student_vle_events and copies each message value,
-- cast to STRING, into kafka_raw_vle.
-- NOTE(review): the per-argument labels on READ_KAFKA follow its positional
-- signature as understood here — confirm against the Lakehouse READ_KAFKA docs.
CREATE PIPE IF NOT EXISTS best_practice_education_dw.pipe_student_vle
    VIRTUAL_CLUSTER = 'DEFAULT'
    BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO best_practice_education_dw.kafka_raw_vle
FROM (
    SELECT
        CAST(value AS STRING) AS value
    FROM READ_KAFKA(
        '<kafka-broker>:9092',      -- bootstrap servers (placeholder)
        'edu_student_vle_events',   -- topic
        '',                         -- topic pattern (unused)
        'cz_edu_consumer',          -- consumer group id
        '', '', '', '',             -- start/end offsets and timestamps (left empty)
        'raw', 'raw',               -- key format, value format
        0,                          -- max errors
        map()                       -- extra Kafka client properties (none)
    )
);
💡 提示:PIPE 创建成功后默认处于运行状态,每隔 `BATCH_INTERVAL_IN_SECONDS` 秒批量消费一次。从 `kafka_raw_vle` 到 `doc_ods_student_vle` 的 JSON 解析可通过 Dynamic Table 完成。
方式二:INSERT 模拟(无 Kafka 环境时)
若暂未配置 Kafka,可先将数据保存为本地 CSV 文件,通过 cz-cli 等客户端执行下面的 `PUT` 命令上传到 User Volume,再用 `COPY INTO` 导入(推荐):
-- Step 1: upload the local CSV file into the current user's User Volume.
-- '/path/to/student_vle_data.csv' is a placeholder for the real local path.
PUT '/path/to/student_vle_data.csv' TO USER VOLUME FILE 'student_vle_data.csv';

-- Step 2: bulk-load the uploaded file from the User Volume into the ODS table.
-- The file has a header row and is comma-separated; 'nullValue'='' maps
-- empty fields to NULL.
-- NOTE(review): no target column list is given, so CSV columns presumably map
-- to doc_ods_student_vle by position — confirm the file's column order.
COPY INTO best_practice_education_dw.doc_ods_student_vle
FROM USER VOLUME
USING csv
OPTIONS (
    'header'='true',
    'sep'=',',
    'nullValue'=''
)
FILES ('student_vle_data.csv');
-- DWD layer: VLE click records from doc_ods_student_vle (the driving table),
-- enriched via LEFT JOINs with:
--   * student demographics and final result  (doc_ods_student_info, by course + student)
--   * the resource's activity_type            (doc_ods_vle, by id_site)
--   * the presentation length                 (doc_ods_courses, by course)
-- Click rows with no match keep NULL attributes. The output has one row per
-- click record only if each lookup matches at most one row.
-- NOTE(review): doc_ods_vle is matched on id_site alone; if id_site is not
-- unique in that table, click rows are duplicated and downstream click sums
-- inflate — confirm uniqueness (or also join on the course keys if present).
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_education_dw.dwd_learning_events
AS
SELECT
    -- click-event columns
    v.code_module,
    v.code_presentation,
    v.id_student,
    v.id_site,
    vl.activity_type,
    v.event_date,
    v.sum_click,
    v.ingest_time,
    -- student attributes
    s.gender,
    s.region,
    s.highest_education,
    s.age_band,
    s.final_result AS enrollment_result,
    -- course attributes
    c.module_presentation_length
FROM best_practice_education_dw.doc_ods_student_vle AS v
LEFT JOIN best_practice_education_dw.doc_ods_student_info AS s
    ON  s.code_module       = v.code_module
    AND s.code_presentation = v.code_presentation
    AND s.id_student        = v.id_student
LEFT JOIN best_practice_education_dw.doc_ods_vle AS vl
    ON vl.id_site = v.id_site
LEFT JOIN best_practice_education_dw.doc_ods_courses AS c
    ON  c.code_module       = v.code_module
    AND c.code_presentation = v.code_presentation;
⚠️ 注意:Dynamic Table DDL 中不写 `REFRESH INTERVAL`,刷新调度统一通过 Studio 任务管理(见后文"Studio 任务调度"章节)。
手动触发首次刷新:
-- Trigger the first DWD refresh by hand, then check how many rows landed.
REFRESH DYNAMIC TABLE best_practice_education_dw.dwd_learning_events;

SELECT
    COUNT(*) AS dwd_count
FROM best_practice_education_dw.dwd_learning_events;
示例输出:
dwd_count
---------
29
DWS 层 Dynamic Table:学生课程进度聚合
DWS 层以 `id_student` + `code_module` + `code_presentation` 为粒度聚合 DWD 层数据,输出每位学生在每门课的行为统计,作为 ADS 层评分的直接输入。
-- DWS layer: aggregates DWD learning events per (code_module, code_presentation,
-- id_student). Student and course attributes are carried through the GROUP BY
-- alongside those keys.
-- NOTE(review): the intended grain holds only if these attributes are constant
-- per key; otherwise one student/course may produce several rows — confirm.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_education_dw.dws_student_course_progress
AS
SELECT
    -- keys and carried-through attributes
    e.code_module,
    e.code_presentation,
    e.id_student,
    e.gender,
    e.region,
    e.highest_education,
    e.age_band,
    e.enrollment_result,
    e.module_presentation_length,
    -- activity metrics
    COUNT(DISTINCT e.event_date)                                             AS days_active,        -- distinct days with any recorded event
    SUM(e.sum_click)                                                         AS total_clicks,
    COUNT(DISTINCT e.id_site)                                                AS distinct_resources, -- distinct VLE resources touched
    SUM(CASE WHEN e.activity_type = 'quiz' THEN e.sum_click ELSE 0 END)      AS quiz_clicks,
    COUNT(DISTINCT CASE WHEN e.activity_type = 'quiz' THEN e.event_date END) AS quiz_days,
    SUM(CASE WHEN e.activity_type = 'oucontent' THEN e.sum_click ELSE 0 END) AS content_clicks,
    MAX(e.event_date)                                                        AS last_active_day
FROM best_practice_education_dw.dwd_learning_events AS e
GROUP BY
    e.code_module,
    e.code_presentation,
    e.id_student,
    e.gender,
    e.region,
    e.highest_education,
    e.age_band,
    e.enrollment_result,
    e.module_presentation_length;
手动触发首次刷新并查看结果:
-- Trigger the first DWS refresh by hand, then preview the eight
-- student/course rows with the highest total click count.
REFRESH DYNAMIC TABLE best_practice_education_dw.dws_student_course_progress;

SELECT
    code_module,
    code_presentation,
    id_student,
    days_active,
    total_clicks,
    distinct_resources,
    quiz_clicks,
    last_active_day
FROM best_practice_education_dw.dws_student_course_progress
-- Key tie-breakers keep the LIMIT 8 cut deterministic when click totals tie.
ORDER BY total_clicks DESC, code_module, code_presentation, id_student
LIMIT 8;
-- Active students per course (code_module + code_presentation).
-- GROUP_BITMAP aggregates the BIGINT-cast student ids into a bitmap; it
-- presumably returns that bitmap's cardinality, i.e. a distinct-student
-- count — confirm against the function reference.
SELECT
    code_module,
    code_presentation,
    GROUP_BITMAP(CAST(id_student AS BIGINT)) AS active_student_count
FROM best_practice_education_dw.dws_student_course_progress
GROUP BY code_module, code_presentation
-- code_presentation as a secondary key gives a stable order within a module.
ORDER BY code_module, code_presentation;
-- Per course: students present in the DWS table vs. high-engagement students.
-- DWS rows derive from doc_ods_student_vle click records, so total_enrolled
-- counts students with at least one recorded VLE event, not necessarily every
-- student in doc_ods_student_info.
-- High engagement = total_clicks > 20 (a tunable cut-off). The CASE yields NULL
-- for everyone else, which the bitmap aggregate presumably ignores — confirm.
SELECT
    a.code_module,
    a.code_presentation,
    GROUP_BITMAP(CAST(a.id_student AS BIGINT)) AS total_enrolled,
    GROUP_BITMAP(CASE WHEN a.total_clicks > 20 THEN CAST(a.id_student AS BIGINT) END) AS high_engagement
FROM best_practice_education_dw.dws_student_course_progress a
GROUP BY a.code_module, a.code_presentation
-- code_presentation as a secondary key gives a stable order within a module.
ORDER BY a.code_module, a.code_presentation;
-- ADS layer: per student/course learning score and risk label.
-- Behaviour metrics come from dws_student_course_progress. Assessment
-- statistics come from doc_ods_student_assessment:
--   * submission_count counts all rows
--   * avg_score / max_score use only is_banked = 0 rows
--   * all three default to 0 when the student has no assessment row
-- calc_learning_score is evaluated once per row in the inner query; risk_level
-- is derived from that single value:
--   >= 75 -> LOW_RISK, >= 50 -> MEDIUM_RISK, otherwise (including NULL) -> HIGH_RISK.
-- NOTE(review): assessments are aggregated per id_student only, so a student in
-- several courses gets identical assessment stats on every course row. Scoping
-- them per course needs the assessment -> course mapping (not visible here) — confirm.
-- NOTE(review): submission_count includes banked rows while the scores exclude
-- them — confirm this mix is intended.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_education_dw.ads_student_learning_score
AS
SELECT
    scored.code_module,
    scored.code_presentation,
    scored.id_student,
    scored.gender,
    scored.region,
    scored.highest_education,
    scored.age_band,
    scored.enrollment_result,
    scored.days_active,
    scored.total_clicks,
    scored.distinct_resources,
    scored.quiz_clicks,
    scored.last_active_day,
    scored.submission_count,
    scored.avg_score,
    scored.max_score,
    scored.learning_score,
    CASE
        WHEN scored.learning_score >= 75 THEN 'LOW_RISK'
        WHEN scored.learning_score >= 50 THEN 'MEDIUM_RISK'
        ELSE 'HIGH_RISK'
    END AS risk_level
FROM (
    SELECT
        p.code_module,
        p.code_presentation,
        p.id_student,
        p.gender,
        p.region,
        p.highest_education,
        p.age_band,
        p.enrollment_result,
        p.days_active,
        p.total_clicks,
        p.distinct_resources,
        p.quiz_clicks,
        p.last_active_day,
        COALESCE(a.submission_count, 0) AS submission_count,
        COALESCE(a.avg_score, 0.0) AS avg_score,
        COALESCE(a.max_score, 0.0) AS max_score,
        -- The UDF receives the raw a.avg_score, which is NULL when there are no
        -- non-banked scores; the avg_score output column above is COALESCEd to 0.0.
        best_practice_education_dw.calc_learning_score(
            CAST(p.total_clicks AS INT),
            a.avg_score,
            CAST(COALESCE(a.submission_count, 0) AS INT),
            CAST(p.days_active AS INT)
        ) AS learning_score
    FROM best_practice_education_dw.dws_student_course_progress p
    LEFT JOIN (
        SELECT
            sa.id_student,
            COUNT(*) AS submission_count,
            ROUND(AVG(CASE WHEN sa.is_banked = 0 THEN sa.score END), 2) AS avg_score,
            MAX(CASE WHEN sa.is_banked = 0 THEN sa.score END) AS max_score
        FROM best_practice_education_dw.doc_ods_student_assessment sa
        GROUP BY sa.id_student
    ) a
        ON p.id_student = CAST(a.id_student AS STRING)
) scored;
手动触发首次刷新并查看高风险预警结果:
-- Trigger the first ADS refresh by hand, then list the ten lowest-scoring
-- student/course rows. A lower learning_score maps to a higher risk level.
REFRESH DYNAMIC TABLE best_practice_education_dw.ads_student_learning_score;

SELECT
    code_module,
    id_student,
    days_active,
    total_clicks,
    avg_score,
    learning_score,
    risk_level
FROM best_practice_education_dw.ads_student_learning_score
-- Key tie-breakers keep the LIMIT 10 cut deterministic when scores tie.
ORDER BY learning_score ASC, code_module, code_presentation, id_student
LIMIT 10;