Ride-Hailing Operations: A Multi-City Supply and Demand Analytics Data Warehouse in Practice

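This article integrates passenger order events, driver GPS traces, and historical trip data from a ride-hailing platform into a city-level supply and demand analytics data warehouse that supports dynamic pricing and driver incentive calculations. Using NYC Yellow Taxi Trip Data as the dataset, it walks end to end through building the Kafka PIPE → ODS → DWD → DWS → ADS pipeline, covering these core capabilities: Kafka real-time ingestion, partitioned incremental aggregation with Dynamic Table, Table Stream with incentive batch processing, SQL UDFs, and Studio Task scheduling.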


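Overview

The typical challenge for a ride-hailing data warehouse is the following chain: high-frequency GPS events + orders sharded across cities → real-time supply/demand ratio → dynamic pricing signal → driver incentive settlement.

云器 Lakehouse addresses these core problems with the following combination:

Problem | Solution
--------+---------
Driver GPS positions are reported at high frequency and written within seconds | Kafka PIPE ingests continuously, with no hand-written consumer
The order system is spread across sharded MySQL databases in multiple cities | MySQL CDC mirrors whole databases; a single PIPE merges multiple sources
ODS → DWD → DWS needs automatic incremental computation | Dynamic Table with declarative SQL; the system maintains the refresh dependency chain
DWS must be partitioned by time period (morning/evening peak, night, off-peak) to speed up queries | Statically partitioned Dynamic Table, PARTITIONED BY (time_period)
Newly completed orders trigger driver incentive batch processing | Table Stream captures incremental trips; a ZettaPark Task consumes them
Reverse geocoding (coordinates → administrative district) | External Function calls a map API (this article shows an equivalent SQL UDF implementation)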

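SQL commands used

Command / function | Purpose | Notes
-------------------+---------+------
CREATE TABLE | Create the ODS raw trip table and the incentive result table | Regular tables, upstream of the Dynamic Tables
CREATE BLOOMFILTER INDEX | Create a filter index on pickup_longitude | Speeds up point lookups on a high-cardinality coordinate column
CREATE PIPE | Create the continuous Kafka ingestion pipeline | Real-time ingestion of GPS and order events
CREATE FUNCTION | Create SQL UDFs | calc_trip_duration_min, calc_surge_factor
CREATE DYNAMIC TABLE | Create the DWD / DWS / ADS incrementally computed tables | Declarative SQL; the system refreshes incrementally
CREATE TABLE STREAM | Create an APPEND_ONLY trip change stream | Captures newly completed orders to trigger incentive batch processing
REFRESH DYNAMIC TABLE | Trigger one refresh manually | Used for the initial build or for debugging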

Prerequisites

All examples in this article run under the best_practice_ride_hailing schema.

CREATE SCHEMA IF NOT EXISTS best_practice_ride_hailing;

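Download the dataset

kaggle datasets download -d elemento/nyc-yellow-taxi-trip-data \
  --unzip -p /tmp/ride_hailing/

Unzipping yields 4 CSV files (January 2015 and January through March 2016). This article uses the first 100 rows of yellow_tripdata_2015-01.csv as the demo dataset. Its 19 columns include pickup and dropoff times, location coordinates, trip distance, fare, and tip.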

Create the ODS table

CREATE TABLE IF NOT EXISTS best_practice_ride_hailing.doc_ods_trips (
  vendor_id             INT,
  pickup_datetime       TIMESTAMP,
  dropoff_datetime      TIMESTAMP,
  passenger_count       INT,
  trip_distance         DOUBLE,
  pickup_longitude      DOUBLE,
  pickup_latitude       DOUBLE,
  rate_code_id          INT,
  store_fwd_flag        STRING,
  dropoff_longitude     DOUBLE,
  dropoff_latitude      DOUBLE,
  payment_type          INT,
  fare_amount           DOUBLE,
  extra                 DOUBLE,
  mta_tax               DOUBLE,
  tip_amount            DOUBLE,
  tolls_amount          DOUBLE,
  improvement_surcharge DOUBLE,
  total_amount          DOUBLE,
  ingest_time           TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

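ingest_time is declared with DEFAULT CURRENT_TIMESTAMP(), so it is filled in automatically at write time (for example by the Kafka PIPE) and does not need to be carried in the message body.

Create a Bloomfilter Index

Geofence-style queries on pickup coordinates are a frequent operation on a ride-hailing platform, and pickup_longitude is a high-cardinality column, which makes it a good fit for Bloomfilter acceleration.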

CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_pickup_lon ON TABLE doc_ods_trips (pickup_longitude);
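
To illustrate why a Bloom filter suits lookups on a high-cardinality column such as pickup_longitude, here is a minimal Python sketch. It is a generic textbook Bloom filter, not the Lakehouse's internal implementation; the class name, bit-array size, hash count, and the third sample longitude are assumptions chosen for illustration.

```python
import hashlib


class BloomFilter:
    """Toy Bloom filter: may report false positives, never false negatives."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = [False] * num_bits

    def _positions(self, value: float):
        # Derive several bit positions from salted SHA-256 digests of the value.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{value!r}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, value: float) -> None:
        for pos in self._positions(value):
            self.bits[pos] = True

    def might_contain(self, value: float) -> bool:
        # False means "definitely absent", so the data block can be skipped.
        return all(self.bits[pos] for pos in self._positions(value))


# Hypothetical data block holding a few pickup longitudes.
block_filter = BloomFilter()
for lon in (-73.993896, -73.974784, -73.981842):
    block_filter.add(lon)

# A stored value is always reported as possibly present.
print(block_filter.might_contain(-73.993896))
```

A filter like this answers only equality probes. Range predicates, such as a bounding-box geofence, cannot be answered from it, so point lookups benefit the most.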


ODS layer: real-time ingestion and historical data import

Real-time ingestion with Kafka PIPE

In production, driver GPS positions and order status changes are reported in real time through Kafka. First create a table that receives the raw JSON, then create the PIPE:

-- Raw table that receives Kafka messages
CREATE TABLE IF NOT EXISTS best_practice_ride_hailing.doc_ods_kafka_raw (
  value STRING
);

-- Create the Kafka PIPE
CREATE PIPE IF NOT EXISTS best_practice_ride_hailing.pipe_trip_events
  VIRTUAL_CLUSTER = 'DEFAULT'
  BATCH_INTERVAL_IN_SECONDS = '30'
AS
COPY INTO best_practice_ride_hailing.doc_ods_kafka_raw
FROM (
  SELECT CAST(value AS STRING) AS value
  FROM READ_KAFKA(
    '<kafka-broker>:9092',  -- replace with the actual broker address
    'nyc_trip_events',      -- topic name
    '',
    'cz_ride_consumer',     -- consumer group ID
    '', '', '', '',
    'raw', 'raw',
    0,
    map()
  )
);

Option 1: Write through Kafka (recommended)

When a Kafka environment is available, sending messages to the nyc_trip_events topic triggers ingestion by the PIPE. The following kafka-python producer example builds one trip event message and sends it:

from kafka import KafkaProducer
import json

# Serialize each message dict as UTF-8 encoded JSON
producer = KafkaProducer(
    bootstrap_servers=['<kafka-broker>:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

trip_event = {
    "vendor_id": 2,
    "pickup_datetime": "2015-01-15 19:05:39",
    "dropoff_datetime": "2015-01-15 19:23:42",
    "passenger_count": 1,
    "trip_distance": 1.59,
    "pickup_longitude": -73.993896,
    "pickup_latitude": 40.750110,
    "dropoff_longitude": -73.974784,
    "dropoff_latitude": 40.750617,
    "payment_type": 1,
    "fare_amount": 12.0,
    "tip_amount": 3.25,
    "total_amount": 17.05
}

producer.send('nyc_trip_events', value=trip_event)
producer.flush()  # block until buffered messages are delivered
print(f"Sent trip event: {trip_event['pickup_datetime']}")

The PIPE consumes messages in batches every BATCH_INTERVAL_IN_SECONDS seconds (30 in this example). Messages are written to doc_ods_kafka_raw automatically and are then parsed by a downstream Dynamic Table.
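
The article does not show the parsing step itself, so the following Python sketch only illustrates the mapping it implies: one raw JSON string from the value column becomes one row with the doc_ods_trips columns. parse_trip_event is a hypothetical helper, not a Lakehouse API; in the pipeline this logic would live in the SQL of the downstream Dynamic Table.

```python
import json
from typing import Any, Dict, Optional

# Column names taken from the doc_ods_trips DDL (ingest_time is filled by its default).
ODS_COLUMNS = [
    "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count",
    "trip_distance", "pickup_longitude", "pickup_latitude", "rate_code_id",
    "store_fwd_flag", "dropoff_longitude", "dropoff_latitude", "payment_type",
    "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount",
    "improvement_surcharge", "total_amount",
]


def parse_trip_event(raw_value: str) -> Optional[Dict[str, Any]]:
    """Map one raw JSON string to ODS columns; return None for malformed input."""
    try:
        payload = json.loads(raw_value)
    except json.JSONDecodeError:
        return None
    if not isinstance(payload, dict):
        return None
    # Fields missing from the message become None (NULL in the table).
    return {col: payload.get(col) for col in ODS_COLUMNS}


raw = '{"vendor_id": 2, "pickup_datetime": "2015-01-15 19:05:39", "trip_distance": 1.59, "total_amount": 17.05}'
row = parse_trip_event(raw)
print(row["vendor_id"], row["trip_distance"], row["extra"])
```

Returning None for malformed messages keeps bad input out of the ODS table instead of failing the whole batch; whether to drop or quarantine such messages is a design choice left open here.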

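Option 2: Load from a local file (when no Kafka environment is available)

If Kafka is not configured yet, save the data as a local CSV file, upload it to the User Volume through cz-cli with the SQL PUT command, and then import it with COPY INTO. This is the recommended approach when Kafka is unavailable.

Import from a local CSV

-- Step 1: upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/nyc_trips_data.csv' TO USER VOLUME FILE 'nyc_trips_data.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_ride_hailing.doc_ods_trips
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('nyc_trips_data.csv');

Verify the ODS row count: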

SELECT COUNT(*) AS ods_row_count FROM best_practice_ride_hailing.doc_ods_trips;

ods_row_count
-------------
100


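DWD layer Dynamic Table: trip standardization and feature computation

On top of ODS, the DWD layer does two things:

  1. Calls the SQL UDF calc_trip_duration_min to compute trip duration, so the time-difference formula is not repeated in multiple places
  2. Adds a time-period tag (time_period) and computes fare per mile (fare_per_mile) and tip rate (tip_rate_pct), so the DWS layer can aggregate directly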
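
The time-period tagging in step 2 can be sketched in Python as follows. The hour boundaries follow the CASE expression in the dwd_trip_events definition in this article; the function name classify_time_period and the second and third sample timestamps are assumptions for illustration.

```python
from datetime import datetime


def classify_time_period(pickup: datetime) -> str:
    """Mirror of the CASE expression on HOUR(pickup_datetime)."""
    hour = pickup.hour
    if 7 <= hour <= 9:
        return "morning_peak"
    if 17 <= hour <= 19:
        return "evening_peak"
    if hour >= 22 or hour <= 5:
        return "night"
    return "offpeak"


# Sample timestamps (the last two are hypothetical).
for ts in ("2015-01-15 19:05:39", "2015-01-25 00:10:00", "2015-01-04 13:30:00"):
    parsed = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
    print(ts, classify_time_period(parsed))
```

Note that hours 6, 10 to 16, 20, and 21 all fall into offpeak, so a 20:00 pickup is not counted as evening peak.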

Create the trip duration UDF

CREATE OR REPLACE FUNCTION best_practice_ride_hailing.calc_trip_duration_min(
  pickup_ts  TIMESTAMP,
  dropoff_ts TIMESTAMP
)
RETURNS DOUBLE
AS ROUND((UNIX_TIMESTAMP(dropoff_ts) - UNIX_TIMESTAMP(pickup_ts)) / 60.0, 2);

Verify the function (first record: 19:05:39 → 19:23:42, an 18.05-minute trip):

SELECT best_practice_ride_hailing.calc_trip_duration_min( CAST('2015-01-15 19:05:39' AS TIMESTAMP), CAST('2015-01-15 19:23:42' AS TIMESTAMP) ) AS duration_min;

duration_min
------------
18.05
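
As a cross-check of the 18.05 result, the same formula can be written in Python. This is a sketch that mirrors the UDF's arithmetic (seconds difference divided by 60, rounded to two decimals); Python's round and SQL ROUND may treat exact halves differently, which does not matter for this input.

```python
from datetime import datetime


def calc_trip_duration_min(pickup_ts: datetime, dropoff_ts: datetime) -> float:
    """Trip duration in minutes, rounded to 2 decimals (same formula as the SQL UDF)."""
    seconds = (dropoff_ts - pickup_ts).total_seconds()
    return round(seconds / 60.0, 2)


fmt = "%Y-%m-%d %H:%M:%S"
duration = calc_trip_duration_min(
    datetime.strptime("2015-01-15 19:05:39", fmt),
    datetime.strptime("2015-01-15 19:23:42", fmt),
)
# 18 minutes 3 seconds = 1083 seconds = 18.05 minutes
print(duration)
```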

Create the DWD Dynamic Table

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ride_hailing.dwd_trip_events AS
SELECT
  vendor_id,
  pickup_datetime,
  dropoff_datetime,
  passenger_count,
  trip_distance,
  pickup_longitude,
  pickup_latitude,
  dropoff_longitude,
  dropoff_latitude,
  rate_code_id,
  store_fwd_flag,
  payment_type,
  fare_amount,
  tip_amount,
  tolls_amount,
  total_amount,
  best_practice_ride_hailing.calc_trip_duration_min(pickup_datetime, dropoff_datetime) AS trip_duration_min,
  -- Time-period tag derived from the pickup hour
  CASE
    WHEN HOUR(pickup_datetime) BETWEEN 7 AND 9 THEN 'morning_peak'
    WHEN HOUR(pickup_datetime) BETWEEN 17 AND 19 THEN 'evening_peak'
    WHEN HOUR(pickup_datetime) BETWEEN 22 AND 23
      OR HOUR(pickup_datetime) BETWEEN 0 AND 5 THEN 'night'
    ELSE 'offpeak'
  END AS time_period,
  -- Fare per mile; the 0.001 term guards against division by zero
  CASE
    WHEN trip_distance > 0
     AND best_practice_ride_hailing.calc_trip_duration_min(pickup_datetime, dropoff_datetime) > 0
    THEN ROUND(fare_amount / (trip_distance + 0.001), 2)
    ELSE NULL
  END AS fare_per_mile,
  -- Tip as a percentage of the total amount
  CASE
    WHEN best_practice_ride_hailing.calc_trip_duration_min(pickup_datetime, dropoff_datetime) > 0
    THEN ROUND(tip_amount / (total_amount + 0.001) * 100, 2)
    ELSE NULL
  END AS tip_rate_pct,
  ingest_time
FROM best_practice_ride_hailing.doc_ods_trips
WHERE pickup_datetime IS NOT NULL
  AND dropoff_datetime IS NOT NULL
  AND trip_distance >= 0
  AND total_amount > 0;

Trigger the first refresh manually:

REFRESH DYNAMIC TABLE best_practice_ride_hailing.dwd_trip_events;

SELECT COUNT(*) AS dwd_count FROM best_practice_ride_hailing.dwd_trip_events;

dwd_count
---------
100

View sample evening-peak trips:

SELECT
  vendor_id,
  pickup_datetime,
  trip_distance,
  trip_duration_min,
  time_period,
  fare_per_mile,
  tip_rate_pct
FROM best_practice_ride_hailing.dwd_trip_events
WHERE time_period = 'evening_peak'
ORDER BY total_amount DESC
LIMIT 5;

vendor_id | pickup_datetime     | trip_distance | trip_duration_min | time_period  | fare_per_mile | tip_rate_pct
----------+---------------------+---------------+-------------------+--------------+---------------+-------------
2         | 2015-01-15T19:05:42 | 18.06         | 43.42             | evening_peak | 2.88          | 9.36
1         | 2015-01-10T19:12:21 | 16.4          | 33.78             | evening_peak | 3.17          | 15.92
2         | 2015-01-15T19:05:40 | 8.33          | 22.63             | evening_peak | 3.12          | 19.61
2         | 2015-01-15T19:05:41 | 7.13          | 14.68             | evening_peak | 3.02          | 16.19
2         | 2015-01-15T19:05:43 | 0.01          | 0.02              | evening_peak | 5454.55       | 0

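Interpretation: the long evening-peak trip (18 miles, 43 minutes) costs about $2.88 per mile with a 9.4% tip rate. For the extreme short trip (0.01 miles), fare_per_mile is distorted by the tiny denominator; in real analysis, add a filter such as WHERE trip_distance > 0.5.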
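
The distortion is easy to reproduce outside SQL. The sketch below applies the same guard and the same 0.001 smoothing term as the DWD expression; the $52 fare is a hypothetical input, used only to show how a near-zero distance inflates the ratio.

```python
from typing import Optional


def fare_per_mile(fare_amount: float, trip_distance: float, duration_min: float) -> Optional[float]:
    """Same guard and 0.001 smoothing term as the DWD CASE expression."""
    if trip_distance > 0 and duration_min > 0:
        return round(fare_amount / (trip_distance + 0.001), 2)
    return None


# Hypothetical fare of $52 over a normal, a near-zero, and a zero distance.
print(fare_per_mile(52.0, 18.06, 43.42))
print(fare_per_mile(52.0, 0.01, 0.02))
print(fare_per_mile(52.0, 0.0, 0.02))
```

The smoothing term prevents division by zero but does not prevent inflation: with a 0.01-mile distance the denominator is only 0.011, so the result is three orders of magnitude larger than for a normal trip. Filtering on a minimum distance is what actually removes the outlier.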


DWS layer Dynamic Table: supply and demand aggregation by time period (static partitions)

The DWS layer is partitioned by time period (time_period). Morning peak, evening peak, night, and off-peak data are stored in separate partitions, so queries can prune irrelevant partitions and compute the supply/demand ratio faster.

Create the dynamic pricing surge factor UDF

CREATE OR REPLACE FUNCTION best_practice_ride_hailing.calc_surge_factor(
  trip_count  INT,
  time_period STRING
)
RETURNS DOUBLE
AS CASE
  WHEN time_period IN ('morning_peak', 'evening_peak') AND trip_count > 15 THEN 1.8
  WHEN time_period IN ('morning_peak', 'evening_peak') AND trip_count > 10 THEN 1.5
  WHEN time_period = 'night' AND trip_count > 10 THEN 1.3
  ELSE 1.0
END;

Verify:

SELECT
  best_practice_ride_hailing.calc_surge_factor(20, 'morning_peak') AS surge_peak,
  best_practice_ride_hailing.calc_surge_factor(8, 'offpeak')       AS surge_offpeak,
  best_practice_ride_hailing.calc_surge_factor(12, 'night')        AS surge_night;

surge_peak | surge_offpeak | surge_night
-----------+---------------+------------
1.8        | 1             | 1.3
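
For readers who want to test the tier boundaries without a SQL session, the UDF's CASE logic can be mirrored in Python. This is a sketch of the same rules, with the three verification inputs from the query above.

```python
def calc_surge_factor(trip_count: int, time_period: str) -> float:
    """Python mirror of the SQL UDF; thresholds are strictly greater-than."""
    is_peak = time_period in ("morning_peak", "evening_peak")
    if is_peak and trip_count > 15:
        return 1.8
    if is_peak and trip_count > 10:
        return 1.5
    if time_period == "night" and trip_count > 10:
        return 1.3
    return 1.0


print(
    calc_surge_factor(20, "morning_peak"),
    calc_surge_factor(8, "offpeak"),
    calc_surge_factor(12, "night"),
)
```

Because the comparisons are strict, exactly 15 peak trips yields 1.5 rather than 1.8, and exactly 10 yields 1.0.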

Create the partitioned DWS Dynamic Table

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ride_hailing.dws_hourly_stats (
  hour_window,
  time_period,
  trip_count,
  total_passengers,
  avg_distance_miles,
  avg_duration_min,
  avg_fare,
  avg_tip_rate_pct,
  total_revenue,
  avg_fare_per_mile,
  credit_card_trips,
  cash_trips
)
PARTITIONED BY (time_period)
AS
SELECT
  DATE_TRUNC('hour', pickup_datetime)                AS hour_window,
  time_period,
  COUNT(*)                                           AS trip_count,
  SUM(passenger_count)                               AS total_passengers,
  ROUND(AVG(trip_distance), 2)                       AS avg_distance_miles,
  ROUND(AVG(trip_duration_min), 2)                   AS avg_duration_min,
  ROUND(AVG(fare_amount), 2)                         AS avg_fare,
  ROUND(AVG(tip_rate_pct), 2)                        AS avg_tip_rate_pct,
  ROUND(SUM(total_amount), 2)                        AS total_revenue,
  ROUND(AVG(fare_per_mile), 2)                       AS avg_fare_per_mile,
  SUM(CASE WHEN payment_type = 1 THEN 1 ELSE 0 END)  AS credit_card_trips,
  SUM(CASE WHEN payment_type = 2 THEN 1 ELSE 0 END)  AS cash_trips
FROM best_practice_ride_hailing.dwd_trip_events
WHERE time_period = SESSION_CONFIGS()['dt.args.time_period']
GROUP BY DATE_TRUNC('hour', pickup_datetime), time_period;

Refresh the partition for each time period:

SET dt.args.time_period = 'morning_peak';
REFRESH DYNAMIC TABLE best_practice_ride_hailing.dws_hourly_stats PARTITION (time_period = 'morning_peak');

SET dt.args.time_period = 'evening_peak';
REFRESH DYNAMIC TABLE best_practice_ride_hailing.dws_hourly_stats PARTITION (time_period = 'evening_peak');

SET dt.args.time_period = 'night';
REFRESH DYNAMIC TABLE best_practice_ride_hailing.dws_hourly_stats PARTITION (time_period = 'night');

SET dt.args.time_period = 'offpeak';
REFRESH DYNAMIC TABLE best_practice_ride_hailing.dws_hourly_stats PARTITION (time_period = 'offpeak');
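
Each partition refresh is a SET followed by a REFRESH with the same value, so the script can be generated rather than written by hand. The sketch below only builds the SQL text; the helper name build_partition_refresh_sql is an assumption, and submitting the statements to the Lakehouse is left to whatever client is in use.

```python
TIME_PERIODS = ["morning_peak", "evening_peak", "night", "offpeak"]
TABLE = "best_practice_ride_hailing.dws_hourly_stats"


def build_partition_refresh_sql(time_periods, table=TABLE):
    """Return the SET + REFRESH statement pair for every partition value."""
    statements = []
    for period in time_periods:
        # The session parameter must match the partition being refreshed.
        statements.append(f"SET dt.args.time_period = '{period}';")
        statements.append(
            f"REFRESH DYNAMIC TABLE {table} PARTITION (time_period = '{period}');"
        )
    return statements


sql_script = "\n".join(build_partition_refresh_sql(TIME_PERIODS))
print(sql_script)
```

Generating both statements from one loop variable removes the risk of a SET value and a PARTITION value drifting apart when a new time period is added.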

View the supply and demand summary for each time period:

SELECT
  hour_window,
  time_period,
  trip_count,
  avg_distance_miles,
  avg_fare,
  total_revenue,
  credit_card_trips,
  cash_trips
FROM best_practice_ride_hailing.dws_hourly_stats
ORDER BY hour_window, time_period;

hour_window         | time_period  | trip_count | avg_distance_miles | avg_fare | total_revenue | credit_card_trips | cash_trips
--------------------+--------------+------------+--------------------+----------+---------------+-------------------+-----------
2015-01-04T13:00:00 | offpeak      | 17         | 2.16               | 9.21     | 198.26        | 10                | 7
2015-01-10T19:00:00 | evening_peak | 2          | 9.65               | 32.75    | 77.1          | 1                 | 1
2015-01-10T20:00:00 | offpeak      | 14         | 3.3                | 13.68    | 237.43        | 7                 | 7
2015-01-15T14:00:00 | offpeak      | 22         | 4.19               | 17.75    | 481.82        | 11                | 11
2015-01-15T19:00:00 | evening_peak | 22         | 3.22               | 15.84    | 447.35        | 19                | 3
2015-01-25T00:00:00 | night        | 21         | 2.65               | 11.76    | 308.95        | 15                | 6
2015-01-26T12:00:00 | offpeak      | 2          | 2.65               | 11.25    | 25.65         | 1                 | 1

Interpretation

  • On January 15, the evening peak (22 trips, average fare $15.84) and off-peak (22 trips, $17.75) have the same trip volume, but off-peak trips are longer (4.19 vs 3.22 miles) and bring in more total revenue ($481.82 vs $447.35).
  • Credit card payments dominate in the evening peak (19/22 = 86%), and night trips also lean toward credit cards (15/21 = 71%), which can inform payment-targeted promotions.

Supply and demand summary merged by time period (note that weighted_avg_fare here is the plain mean of the hourly avg_fare values and is not weighted by trip_count):

SELECT
  time_period,
  SUM(trip_count)               AS total_trips,
  ROUND(AVG(avg_fare), 2)       AS weighted_avg_fare,
  ROUND(SUM(total_revenue), 2)  AS total_revenue
FROM best_practice_ride_hailing.dws_hourly_stats
GROUP BY time_period
ORDER BY total_trips DESC;

time_period  | total_trips | weighted_avg_fare | total_revenue
-------------+-------------+-------------------+--------------
offpeak      | 55          | 12.97             | 943.16
evening_peak | 24          | 24.3              | 524.45
night        | 21          | 11.76             | 308.95
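
Averaging hourly averages gives every hour the same weight regardless of how many trips it contains. The Python sketch below contrasts that simple mean with a trip-count-weighted mean, using the trip_count and avg_fare values from the hourly result table in this section. Since those inputs are already rounded, the weighted figures are approximate.

```python
# (time_period, trip_count, avg_fare) copied from the hourly result table.
HOURLY_ROWS = [
    ("offpeak", 17, 9.21),
    ("evening_peak", 2, 32.75),
    ("offpeak", 14, 13.68),
    ("offpeak", 22, 17.75),
    ("evening_peak", 22, 15.84),
    ("night", 21, 11.76),
    ("offpeak", 2, 11.25),
]


def summarize(rows):
    """Return {time_period: (simple_mean, trip_weighted_mean)}, rounded to 2 decimals."""
    grouped = {}
    for period, trips, avg_fare in rows:
        grouped.setdefault(period, []).append((trips, avg_fare))
    result = {}
    for period, items in grouped.items():
        simple = sum(fare for _, fare in items) / len(items)
        total_trips = sum(trips for trips, _ in items)
        weighted = sum(trips * fare for trips, fare in items) / total_trips
        result[period] = (round(simple, 2), round(weighted, 2))
    return result


for period, (simple, weighted) in summarize(HOURLY_ROWS).items():
    print(period, simple, weighted)
```

For the evening peak the gap is large: the simple mean is about 24.3, while the weighted mean is roughly 17.25, because the 2-trip hour with a 32.75 average counts as much as the 22-trip hour in the simple mean. In SQL, the weighted form would be SUM(avg_fare * trip_count) / SUM(trip_count).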


ADS layer Dynamic Table: trip efficiency and driver incentive data mart

The ADS layer aggregates at day × time period × payment type granularity. It outputs trip efficiency profiles and distance segment labels that the dynamic pricing model and the driver incentive program can read directly.

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ride_hailing.ads_trip_efficiency AS
SELECT
  DATE(pickup_datetime)             AS trip_date,
  time_period,
  payment_type,
  COUNT(*)                          AS trip_count,
  ROUND(AVG(trip_distance), 2)      AS avg_distance_miles,
  ROUND(AVG(trip_duration_min), 2)  AS avg_duration_min,
  ROUND(AVG(fare_per_mile), 2)      AS avg_fare_per_mile,
  ROUND(AVG(tip_rate_pct), 2)       AS avg_tip_rate_pct,
  ROUND(SUM(total_amount), 2)       AS total_revenue,
  ROUND(AVG(total_amount), 2)       AS avg_trip_revenue,
  CASE
    WHEN AVG(trip_distance) >= 5 THEN 'long_haul'
    WHEN AVG(trip_distance) >= 2 THEN 'medium'
    ELSE 'short'
  END                               AS distance_segment
FROM best_practice_ride_hailing.dwd_trip_events
GROUP BY DATE(pickup_datetime), time_period, payment_type;

Trigger a refresh manually:

REFRESH DYNAMIC TABLE best_practice_ride_hailing.ads_trip_efficiency;

View the highest-revenue combinations of time period and trip type:

SELECT
  trip_date,
  time_period,
  trip_count,
  avg_distance_miles,
  avg_fare_per_mile,
  avg_tip_rate_pct,
  total_revenue,
  distance_segment
FROM best_practice_ride_hailing.ads_trip_efficiency
ORDER BY total_revenue DESC
LIMIT 8;

trip_date  | time_period  | trip_count | avg_distance_miles | avg_fare_per_mile | avg_tip_rate_pct | total_revenue | distance_segment
-----------+--------------+------------+--------------------+-------------------+------------------+---------------+-----------------
2015-01-15 | evening_peak | 19         | 3.31               | 291.86            | 14.43            | 402.95        | medium
2015-01-15 | offpeak      | 11         | 5.15               | 6.06              | 16.85            | 310.02        | long_haul
2015-01-25 | night        | 15         | 2.57               | 4.7               | 15.34            | 226.15        | medium
2015-01-15 | offpeak      | 11         | 3.23               | 5.97              | 0                | 171.8         | medium
2015-01-04 | offpeak      | 10         | 3.07               | 4.92              | 14.07            | 152.66        | medium
2015-01-10 | offpeak      | 7          | 2.73               | 6.29              | 14.4             | 120.5         | medium
2015-01-10 | offpeak      | 7          | 3.87               | 5.99              | 0                | 116.93        | medium
2015-01-25 | night        | 6          | 2.84               | 24.39             | 0                | 82.8          | medium

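Interpretation

  • Evening-peak credit card trips on January 15 (19 trips) have the highest total revenue ($402.95) with an average tip rate of 14.4%, making this the priority period for incentives.
  • avg_fare_per_mile in the first row shows an extreme value of 291.86 (caused by a very short 0.01-mile trip); in practice, add a filter such as WHERE trip_distance > 0.5.
  • Off-peak long-haul trips (5+ miles, $310.02) merit a separate mileage bonus pool in incentive allocation.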
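
The two ideas in this interpretation, the distance segment labels and the effect of a minimum-distance filter on an average, can be sketched in Python. The per-trip values in the list are hypothetical; only the last row is modeled on the 0.01-mile outlier seen in the DWD sample.

```python
from statistics import mean


def distance_segment(avg_distance_miles: float) -> str:
    """Same thresholds as the CASE expression in ads_trip_efficiency."""
    if avg_distance_miles >= 5:
        return "long_haul"
    if avg_distance_miles >= 2:
        return "medium"
    return "short"


def avg_fare_per_mile(rows, min_distance: float = 0.0):
    """Average fare_per_mile over trips longer than min_distance miles."""
    kept = [fpm for dist, fpm in rows if dist > min_distance]
    return round(mean(kept), 2) if kept else None


# Hypothetical (trip_distance, fare_per_mile) pairs; the last one is the outlier.
trips = [(3.2, 4.8), (1.6, 6.1), (7.1, 3.0), (0.01, 5454.55)]

print(distance_segment(5.15), distance_segment(3.31))
print(avg_fare_per_mile(trips))                    # outlier included
print(avg_fare_per_mile(trips, min_distance=0.5))  # outlier filtered out
```

A single near-zero-distance trip is enough to move the group average by orders of magnitude, which is the same effect behind the 291.86 value in the first row.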

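Table Stream + incentive batch processing

Driver incentive calculation on a ride-hailing platform follows this flow: each new batch of completed orders → count the driver's trips for the day → determine the incentive tier → write to the incentive result table. Table Stream combined with a ZettaPark Task matches this pattern well.

Create the Table Stream

CREATE TABLE STREAM IF NOT EXISTS best_practice_ride_hailing.stream_new_trips
ON TABLE best_practice_ride_hailing.doc_ods_trips
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');

After new rows are written to doc_ods_trips, the Stream captures these incremental rows: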

SELECT COUNT(*) AS stream_rows FROM best_practice_ride_hailing.stream_new_trips;

stream_rows
-----------
10

SELECT
  vendor_id,
  pickup_datetime,
  trip_distance,
  total_amount,
  fare_amount,
  tip_amount
FROM best_practice_ride_hailing.stream_new_trips
ORDER BY pickup_datetime
LIMIT 5;

vendor_id | pickup_datetime     | trip_distance | total_amount | fare_amount | tip_amount
----------+---------------------+---------------+--------------+-------------+-----------
1         | 2015-01-26T12:41:09 | 0.5           | 5.3          | 4.5         | 0
1         | 2015-01-26T12:41:09 | 0.8           | 5.8          | 5           | 0
1         | 2015-01-26T12:41:10 | 1.1           | 18.35        | 14.5        | 3.05
1         | 2015-01-26T12:41:10 | 2.9           | 14.8         | 14          | 0
1         | 2015-01-26T12:41:11 | 0.3           | 4.8          | 4           | 0

Create the incentive result table and consume the Stream

CREATE TABLE IF NOT EXISTS best_practice_ride_hailing.doc_driver_incentive_batch (
  batch_date      DATE,
  vendor_id       INT,
  new_trip_count  INT,
  new_revenue     DOUBLE,
  avg_trip_value  DOUBLE,
  incentive_tier  STRING,
  processed_time  TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

Consume the Stream and write the incentive results:

INSERT INTO best_practice_ride_hailing.doc_driver_incentive_batch
  (batch_date, vendor_id, new_trip_count, new_revenue, avg_trip_value, incentive_tier)
SELECT
  DATE(pickup_datetime)        AS batch_date,
  vendor_id,
  COUNT(*)                     AS new_trip_count,
  ROUND(SUM(total_amount), 2)  AS new_revenue,
  ROUND(AVG(total_amount), 2)  AS avg_trip_value,
  CASE
    WHEN COUNT(*) >= 5 THEN 'gold'
    WHEN COUNT(*) >= 3 THEN 'silver'
    ELSE 'bronze'
  END                          AS incentive_tier
FROM best_practice_ride_hailing.stream_new_trips
GROUP BY DATE(pickup_datetime), vendor_id;

SELECT batch_date, vendor_id, new_trip_count, new_revenue, incentive_tier FROM best_practice_ride_hailing.doc_driver_incentive_batch;

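batch_date | vendor_id | new_trip_count | new_revenue | incentive_tier
-----------+-----------+----------------+-------------+---------------
2015-01-26 | 1         | 10             | 108.86      | gold

Interpretation: vendor 1 added 10 new trips on the day with total revenue of $108.86, reaching the gold incentive tier (≥5 trips). After the Stream is consumed, its offset advances automatically, so the next INSERT processes only rows added afterward and no cursor needs to be managed by hand.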
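
The offset behavior can be pictured with a toy model in Python. This is not how the Lakehouse implements Table Stream; AppendOnlyStream and its consume method are hypothetical names, with consume standing in for an INSERT ... SELECT that reads from the stream. The tier rules are the ones from the INSERT statement in this section.

```python
from collections import defaultdict


class AppendOnlyStream:
    """Toy model of an append-only change stream with an auto-advancing offset."""

    def __init__(self, table_rows):
        self._rows = table_rows          # shared reference to the source table
        self._offset = len(table_rows)   # rows that already exist are not captured

    def consume(self):
        new_rows = self._rows[self._offset:]
        self._offset = len(self._rows)   # advance past everything just read
        return new_rows


def incentive_tier(trip_count: int) -> str:
    if trip_count >= 5:
        return "gold"
    if trip_count >= 3:
        return "silver"
    return "bronze"


def build_incentive_batch(rows):
    """Group new rows by (batch_date, vendor_id) and assign a tier per group."""
    counts = defaultdict(int)
    for row in rows:
        counts[(row["batch_date"], row["vendor_id"])] += 1
    return {key: (n, incentive_tier(n)) for key, n in counts.items()}


ods_trips = []
stream = AppendOnlyStream(ods_trips)
ods_trips.extend({"batch_date": "2015-01-26", "vendor_id": 1} for _ in range(10))

first = build_incentive_batch(stream.consume())
second = build_incentive_batch(stream.consume())
print(first)
print(second)
```

The second consumption returns nothing because the offset already sits at the end of the table. The same model shows why two downstream jobs should not share one stream object: whichever job consumes first would leave nothing for the other.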


Studio Task scheduling

Periodic refresh of Dynamic Tables is managed through Studio Tasks rather than by writing REFRESH INTERVAL in the DDL. This article creates three refresh tasks under the skill_test profile:

# 1. DWD trip standardization refresh task (every 15 minutes)
cz-cli task create refresh_dwd_trip_events --type SQL -p skill_test
# Returns: {"data":{"id":10354660,...}}
cz-cli task save-content 10354660 \
  --content "REFRESH DYNAMIC TABLE best_practice_ride_hailing.dwd_trip_events;" \
  -p skill_test
cz-cli task save-cron 10354660 --cron "0 */15 * * * ?" -p skill_test

# 2. DWS per-period partition refresh task (every 30 minutes)
cz-cli task create refresh_dws_hourly_stats --type SQL -p skill_test
# Returns: {"data":{"id":10354661,...}}
cz-cli task save-content 10354661 \
  --content "SET dt.args.time_period = 'morning_peak';
REFRESH DYNAMIC TABLE best_practice_ride_hailing.dws_hourly_stats PARTITION (time_period = 'morning_peak');
SET dt.args.time_period = 'evening_peak';
REFRESH DYNAMIC TABLE best_practice_ride_hailing.dws_hourly_stats PARTITION (time_period = 'evening_peak');
SET dt.args.time_period = 'night';
REFRESH DYNAMIC TABLE best_practice_ride_hailing.dws_hourly_stats PARTITION (time_period = 'night');
SET dt.args.time_period = 'offpeak';
REFRESH DYNAMIC TABLE best_practice_ride_hailing.dws_hourly_stats PARTITION (time_period = 'offpeak');" \
  -p skill_test
cz-cli task save-cron 10354661 --cron "0 */30 * * * ?" -p skill_test

# 3. ADS trip efficiency refresh task (daily at 1 a.m.)
cz-cli task create refresh_ads_trip_efficiency --type SQL -p skill_test
# Returns: {"data":{"id":10353704,...}}
cz-cli task save-content 10353704 \
  --content "REFRESH DYNAMIC TABLE best_practice_ride_hailing.ads_trip_efficiency;" \
  -p skill_test
cz-cli task save-cron 10353704 --cron "0 0 1 * * ?" -p skill_test
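
The three cron strings appear to use the six-field Quartz-style layout (second, minute, hour, day of month, month, day of week), judging by the trailing "?"; that reading is an assumption and should be confirmed against the cz-cli documentation. Under that assumption, the Python sketch below expands the minute and hour fields to show when each task fires. It handles only the "*", "*/N", and single-number forms used here.

```python
def expand_cron_field(field: str, low: int, high: int):
    """Expand '*', '*/N', or a single number into the list of matching values."""
    if field == "*":
        return list(range(low, high + 1))
    if field.startswith("*/"):
        step = int(field[2:])
        return list(range(low, high + 1, step))
    return [int(field)]


def describe(cron: str):
    """Return the firing minutes and hours, assuming Quartz-style field order."""
    _second, minute, hour, *_rest = cron.split()
    return {
        "minutes": expand_cron_field(minute, 0, 59),
        "hours": expand_cron_field(hour, 0, 23),
    }


for expr in ("0 */15 * * * ?", "0 */30 * * * ?", "0 0 1 * * ?"):
    info = describe(expr)
    print(expr, "-> minutes", info["minutes"], "| hours", len(info["hours"]), "value(s)")
```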


Data warehouse object overview

After everything is built, the objects under the best_practice_ride_hailing schema are:

SHOW TABLES IN best_practice_ride_hailing;

schema_name                | table_name                 | is_dynamic
---------------------------+----------------------------+-----------
best_practice_ride_hailing | doc_ods_trips              | false
best_practice_ride_hailing | doc_ods_kafka_raw          | false
best_practice_ride_hailing | doc_driver_incentive_batch | false
best_practice_ride_hailing | dwd_trip_events            | true
best_practice_ride_hailing | dws_hourly_stats           | true
best_practice_ride_hailing | ads_trip_efficiency        | true

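Data flow architecture:

pipe_trip_events (Kafka PIPE) → doc_ods_kafka_raw
doc_ods_trips → dwd_trip_events → dws_hourly_stats
doc_ods_trips → dwd_trip_events → ads_trip_efficiency
doc_ods_trips → stream_new_trips → doc_driver_incentive_batch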


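Notes

  • Bloomfilter Index does not apply to existing data automatically: CREATE BLOOMFILTER INDEX takes effect only for data written after the index is created. Existing trip data is not covered by the index, and the Bloomfilter type does not support rebuilding with BUILD INDEX. To cover existing data, the table must be rebuilt.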

  • Partitioned Dynamic Tables must use static partition declarations: dws_hourly_stats uses PARTITIONED BY (time_period) and must be refreshed partition by partition together with SESSION_CONFIGS()['dt.args.time_period']. REFRESH INTERVAL cannot be written directly in the DDL; scheduling is managed through Studio Tasks.

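  • Table Stream offsets advance automatically after consumption: every INSERT INTO ... SELECT FROM stream operation on stream_new_trips advances the consumption offset. If the same changes must feed multiple downstream consumers, each consumer needs its own Stream object; sharing one Stream object leads to consumers competing for the same rows.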

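  • calc_surge_factor thresholds are example values: the current thresholds (at peak, more than 15 trips triggers 1.8× and more than 10 trips triggers 1.5×) are set for the demo dataset. In production they should be calibrated dynamically from each city's historical supply and demand data.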

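  • The first refresh of a Dynamic Table is a full snapshot: the first REFRESH of dwd_trip_events performs a full scan of doc_ods_trips, and later incremental refreshes process only rows added or changed since the last refresh point. If the ODS layer is written with INSERT OVERWRITE, the Dynamic Table degrades to a full refresh.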

