-- Raw landing table for Kafka messages: each message payload is kept
-- as-is in a single STRING column named `value`.
CREATE TABLE IF NOT EXISTS best_practice_ride_hailing.doc_ods_kafka_raw (
    value STRING
);
-- Create a Kafka PIPE that loads messages from the 'nyc_trip_events' topic
-- into best_practice_ride_hailing.doc_ods_kafka_raw in batches.
CREATE PIPE IF NOT EXISTS best_practice_ride_hailing.pipe_trip_events
-- Virtual cluster the pipe runs on.
VIRTUAL_CLUSTER = 'DEFAULT'
-- Batch interval of 30 seconds (the property value is given as a string).
BATCH_INTERVAL_IN_SECONDS = '30'
AS
COPY INTO best_practice_ride_hailing.doc_ods_kafka_raw
FROM (
-- Keep only the message payload, cast to STRING to match the target column.
SELECT CAST(value AS STRING) AS value
-- NOTE(review): READ_KAFKA arguments are positional, so their order matters.
-- Only the broker, topic and consumer-group arguments are labelled below; the
-- meaning of the remaining '' / 'raw' / 0 / map() arguments should be confirmed
-- against the READ_KAFKA reference before changing any of them.
FROM READ_KAFKA(
'<kafka-broker>:9092', -- replace with the actual broker address
'nyc_trip_events', -- topic name
'',
'cz_ride_consumer', -- consumer group ID
'','','','',
'raw', 'raw',
0,
map()
)
);
-- Step 1: upload the local CSV file to the User Volume with a SQL PUT.
PUT '/path/to/nyc_trips_data.csv' TO USER VOLUME FILE 'nyc_trips_data.csv';

-- Step 2: load the uploaded file from the User Volume into the ODS table.
-- The CSV has a header row, is comma-separated, and uses '' as its nullValue.
COPY INTO best_practice_ride_hailing.doc_ods_trips
FROM USER VOLUME
USING csv
OPTIONS (
    'header' = 'true',
    'sep' = ',',
    'nullValue' = ''
)
FILES ('nyc_trips_data.csv');
验证 ODS 行数:
-- Row count of the ODS trips table after the load.
SELECT
    COUNT(*) AS ods_row_count
FROM best_practice_ride_hailing.doc_ods_trips;
ods_row_count
-------------
100
DWD 层 Dynamic Table:行程标准化与特征计算
DWD 层在 ODS 基础上做两件事:
1. 调用 SQL UDF `calc_trip_duration_min` 计算行程时长,避免在多处重复写时间差公式;
2. 打时段标签(`time_period`),并计算单位里程票价(`fare_per_mile`)和小费率(`tip_rate_pct`),方便 DWS 层直接聚合。
创建行程时长 UDF
-- Trip duration in minutes between two timestamps, rounded to 2 decimals.
-- The difference of the two Unix timestamps is divided by 60.0; the result is
-- negative when dropoff_ts precedes pickup_ts.
CREATE OR REPLACE FUNCTION best_practice_ride_hailing.calc_trip_duration_min(
    pickup_ts  TIMESTAMP,
    dropoff_ts TIMESTAMP
)
RETURNS DOUBLE
AS ROUND(
    (UNIX_TIMESTAMP(dropoff_ts) - UNIX_TIMESTAMP(pickup_ts)) / 60.0,
    2
);
验证函数(第一条数据:19:05:39 → 19:23:42,行程 18.05 分钟):
-- Sanity check: 19:05:39 -> 19:23:42 is 18 min 3 s, i.e. 18.05 minutes.
SELECT
    best_practice_ride_hailing.calc_trip_duration_min(
        CAST('2015-01-15 19:05:39' AS TIMESTAMP),
        CAST('2015-01-15 19:23:42' AS TIMESTAMP)
    ) AS duration_min;
duration_min
------------
18.05
建 DWD Dynamic Table
-- DWD layer: one standardized row per valid ODS trip, enriched with derived
-- features (duration, time-of-day bucket, fare per mile, tip rate) so the
-- DWS/ADS aggregations can use them directly.
-- No REFRESH INTERVAL is declared; refresh is scheduled outside the DDL.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ride_hailing.dwd_trip_events
AS
SELECT
    vendor_id,
    pickup_datetime,
    dropoff_datetime,
    passenger_count,
    trip_distance,
    pickup_longitude,
    pickup_latitude,
    dropoff_longitude,
    dropoff_latitude,
    rate_code_id,
    store_fwd_flag,
    payment_type,
    fare_amount,
    tip_amount,
    tolls_amount,
    total_amount,
    best_practice_ride_hailing.calc_trip_duration_min(pickup_datetime, dropoff_datetime) AS trip_duration_min,
    -- Time-of-day bucket from the pickup hour:
    -- 7-9 morning peak, 17-19 evening peak, 22-23 and 0-5 night, else off-peak.
    CASE
        WHEN HOUR(pickup_datetime) BETWEEN 7 AND 9 THEN 'morning_peak'
        WHEN HOUR(pickup_datetime) BETWEEN 17 AND 19 THEN 'evening_peak'
        WHEN HOUR(pickup_datetime) BETWEEN 22 AND 23
          OR HOUR(pickup_datetime) BETWEEN 0 AND 5 THEN 'night'
        ELSE 'offpeak'
    END AS time_period,
    -- Fare per mile, only for trips with positive distance and duration.
    -- The trip_distance > 0 guard already excludes a zero denominator, so the
    -- fare is divided by the real distance; adding an epsilon would bias short
    -- trips (a 0.01-mile trip would be treated as 0.011 miles).
    CASE
        WHEN trip_distance > 0
         AND best_practice_ride_hailing.calc_trip_duration_min(pickup_datetime, dropoff_datetime) > 0
        THEN ROUND(fare_amount / trip_distance, 2)
        ELSE NULL
    END AS fare_per_mile,
    -- Tip as a percentage of total_amount, for trips with positive duration.
    -- The WHERE clause keeps only total_amount > 0, so the exact denominator
    -- is safe to use without an epsilon.
    CASE
        WHEN best_practice_ride_hailing.calc_trip_duration_min(pickup_datetime, dropoff_datetime) > 0
        THEN ROUND(tip_amount / total_amount * 100, 2)
        ELSE NULL
    END AS tip_rate_pct,
    ingest_time
FROM best_practice_ride_hailing.doc_ods_trips
-- Keep only rows that can yield a duration and a non-degenerate tip ratio.
WHERE pickup_datetime IS NOT NULL
  AND dropoff_datetime IS NOT NULL
  AND trip_distance >= 0
  AND total_amount > 0;
⚠️ 注意:`CREATE DYNAMIC TABLE` 的 DDL 中不写 `REFRESH INTERVAL`。刷新调度通过 Studio Task 管理(见下文"Studio Task 调度"章节),这样可以在同一任务上附加数据质量检查和告警规则。
-- Row count of the DWD table (ODS rows that passed the DWD filters).
SELECT
    COUNT(*) AS dwd_count
FROM best_practice_ride_hailing.dwd_trip_events;
dwd_count
---------
100
查看晚高峰行程样例:
-- Top 5 evening-peak trips by total_amount, with their derived DWD features.
SELECT
    vendor_id,
    pickup_datetime,
    trip_distance,
    trip_duration_min,
    time_period,
    fare_per_mile,
    tip_rate_pct
FROM best_practice_ride_hailing.dwd_trip_events
WHERE time_period = 'evening_peak'
ORDER BY total_amount DESC
LIMIT 5;
-- Surge multiplier for a (trip_count, time_period) pair:
--   morning_peak / evening_peak: > 15 trips -> 1.8, > 10 trips -> 1.5, else 1.0
--   night:                       > 10 trips -> 1.3, else 1.0
--   any other period:            1.0
CREATE OR REPLACE FUNCTION best_practice_ride_hailing.calc_surge_factor(
    trip_count  INT,
    time_period STRING
)
RETURNS DOUBLE
AS CASE
    WHEN time_period IN ('morning_peak', 'evening_peak') THEN
        CASE
            WHEN trip_count > 15 THEN 1.8
            WHEN trip_count > 10 THEN 1.5
            ELSE 1.0
        END
    WHEN time_period = 'night' AND trip_count > 10 THEN 1.3
    ELSE 1.0
END;
验证:
-- Spot-check one input per branch; expected 1.8, 1.0 and 1.3 respectively.
SELECT
    best_practice_ride_hailing.calc_surge_factor(20, 'morning_peak') AS surge_peak,
    best_practice_ride_hailing.calc_surge_factor(8, 'offpeak')       AS surge_offpeak,
    best_practice_ride_hailing.calc_surge_factor(12, 'night')        AS surge_night;
-- DWS layer: hourly aggregates per time_period built from the DWD trips.
-- The table is partitioned by time_period, and the WHERE clause restricts the
-- input to the time_period given by the session config 'dt.args.time_period'.
-- NOTE(review): presumably that key is supplied per partition refresh by the
-- scheduler; confirm how it is set before running this DDL by hand.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ride_hailing.dws_hourly_stats (
    hour_window,
    time_period,
    trip_count,
    total_passengers,
    avg_distance_miles,
    avg_duration_min,
    avg_fare,
    avg_tip_rate_pct,
    total_revenue,
    avg_fare_per_mile,
    credit_card_trips,
    cash_trips
)
PARTITIONED BY (time_period)
AS
SELECT
    DATE_TRUNC('hour', pickup_datetime)                AS hour_window,
    time_period,
    COUNT(*)                                           AS trip_count,
    SUM(passenger_count)                               AS total_passengers,
    ROUND(AVG(trip_distance), 2)                       AS avg_distance_miles,
    ROUND(AVG(trip_duration_min), 2)                   AS avg_duration_min,
    ROUND(AVG(fare_amount), 2)                         AS avg_fare,
    ROUND(AVG(tip_rate_pct), 2)                        AS avg_tip_rate_pct,
    ROUND(SUM(total_amount), 2)                        AS total_revenue,
    ROUND(AVG(fare_per_mile), 2)                       AS avg_fare_per_mile,
    -- payment_type 1 is counted as a credit-card trip, 2 as a cash trip.
    SUM(CASE WHEN payment_type = 1 THEN 1 ELSE 0 END)  AS credit_card_trips,
    SUM(CASE WHEN payment_type = 2 THEN 1 ELSE 0 END)  AS cash_trips
FROM best_practice_ride_hailing.dwd_trip_events
WHERE time_period = SESSION_CONFIGS()['dt.args.time_period']
GROUP BY
    DATE_TRUNC('hour', pickup_datetime),
    time_period;
-- Inspect the hourly DWS rows in chronological order.
SELECT
    hour_window,
    time_period,
    trip_count,
    avg_distance_miles,
    avg_fare,
    total_revenue,
    credit_card_trips,
    cash_trips
FROM best_practice_ride_hailing.dws_hourly_stats
ORDER BY
    hour_window,
    time_period;
-- Roll the hourly DWS rows up to one row per time_period.
-- weighted_avg_fare weights each hour's avg_fare by that hour's trip_count, so
-- busy hours count in proportion to their trips; a plain AVG(avg_fare) would
-- give every hour equal weight. avg_fare is stored rounded to 2 decimals, so
-- the result may differ from an average over raw fares by rounding noise.
-- Every DWS row comes from a non-empty group (COUNT(*) >= 1), so
-- SUM(trip_count) is never zero here.
SELECT
    time_period,
    SUM(trip_count) AS total_trips,
    ROUND(SUM(avg_fare * trip_count) / SUM(trip_count), 2) AS weighted_avg_fare,
    ROUND(SUM(total_revenue), 2) AS total_revenue
FROM best_practice_ride_hailing.dws_hourly_stats
GROUP BY time_period
ORDER BY
    total_trips DESC,
    time_period;
-- ADS layer: daily trip-efficiency metrics per (time_period, payment_type).
-- distance_segment buckets each group by its average trip distance:
-- >= 5 -> 'long_haul', >= 2 -> 'medium', otherwise 'short'.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ride_hailing.ads_trip_efficiency
AS
SELECT
    DATE(pickup_datetime)             AS trip_date,
    time_period,
    payment_type,
    COUNT(*)                          AS trip_count,
    ROUND(AVG(trip_distance), 2)      AS avg_distance_miles,
    ROUND(AVG(trip_duration_min), 2)  AS avg_duration_min,
    ROUND(AVG(fare_per_mile), 2)      AS avg_fare_per_mile,
    ROUND(AVG(tip_rate_pct), 2)       AS avg_tip_rate_pct,
    ROUND(SUM(total_amount), 2)       AS total_revenue,
    ROUND(AVG(total_amount), 2)       AS avg_trip_revenue,
    CASE
        WHEN AVG(trip_distance) >= 5 THEN 'long_haul'
        WHEN AVG(trip_distance) >= 2 THEN 'medium'
        ELSE 'short'
    END                               AS distance_segment
FROM best_practice_ride_hailing.dwd_trip_events
GROUP BY
    DATE(pickup_datetime),
    time_period,
    payment_type;
-- Append-only table stream on the ODS trips table; it captures rows newly
-- written to doc_ods_trips so they can be consumed incrementally.
CREATE TABLE STREAM IF NOT EXISTS best_practice_ride_hailing.stream_new_trips
    ON TABLE best_practice_ride_hailing.doc_ods_trips
    WITH PROPERTIES (
        'TABLE_STREAM_MODE' = 'APPEND_ONLY'
    );
在 `doc_ods_trips` 有新行写入后,Stream 会捕获这些增量行:
-- Number of incremental rows currently exposed by the stream.
SELECT
    COUNT(*) AS stream_rows
FROM best_practice_ride_hailing.stream_new_trips;
stream_rows
-----------
10
-- Preview the 5 earliest incremental rows by pickup time.
SELECT
    vendor_id,
    pickup_datetime,
    trip_distance,
    total_amount,
    fare_amount,
    tip_amount
FROM best_practice_ride_hailing.stream_new_trips
ORDER BY pickup_datetime
LIMIT 5;
-- Aggregate the incremental trips from the stream into one incentive row per
-- (pickup date, vendor_id) and append them to the batch table.
-- Tier by new trip count: fewer than 3 -> bronze, 3-4 -> silver, 5+ -> gold.
INSERT INTO best_practice_ride_hailing.doc_driver_incentive_batch (
    batch_date,
    vendor_id,
    new_trip_count,
    new_revenue,
    avg_trip_value,
    incentive_tier
)
SELECT
    DATE(pickup_datetime)        AS batch_date,
    vendor_id,
    COUNT(*)                     AS new_trip_count,
    ROUND(SUM(total_amount), 2)  AS new_revenue,
    ROUND(AVG(total_amount), 2)  AS avg_trip_value,
    CASE
        WHEN COUNT(*) < 3 THEN 'bronze'
        WHEN COUNT(*) < 5 THEN 'silver'
        ELSE 'gold'
    END                          AS incentive_tier
FROM best_practice_ride_hailing.stream_new_trips
GROUP BY
    DATE(pickup_datetime),
    vendor_id;
-- Inspect the rows in the driver incentive batch table.
SELECT
    batch_date,
    vendor_id,
    new_trip_count,
    new_revenue,
    incentive_tier
FROM best_practice_ride_hailing.doc_driver_incentive_batch;