智能驾驶全闭环数据平台解决方案


一、方案背景

智能驾驶行业的数据挑战

自动驾驶技术的核心竞争力在于数据——谁能更快地将海量行驶数据转化为高质量训练数据并迭代模型,谁就能在竞争中领先。然而,现实中智能驾驶企业面临的数据挑战极为复杂:

数据来源多样、规模巨大

  • 单辆路测车每小时产生数 GB 数据(摄像头标注 + LiDAR 点云 + CAN 总线)
  • 量产车队规模达百万辆,高频遥测 QPS 峰值 10 万~100 万 msg/s(参考吉利 AutoMQ 案例)
  • 数据格式异构:结构化时序(CAN 信号)、半结构化(JSON 事件)、大文件(Parquet 标注)

数据闭环路径长、链路断裂 从路测采集到模型更新上车,中间需要经历标注、仿真扩增、训练集构建、离线评测、OTA 灰度、遥测回流等多个环节,每个环节往往由不同团队使用不同工具完成,数据在孤岛之间传递,效率低下。

长尾场景难以覆盖 极端天气、稀有障碍物、特殊交通场景(NEAR_MISS、接管)在真实采集中出现概率极低,但对安全性验证至关重要。靠人工收集长尾数据成本极高、周期极长。

合规与安全要求严格 监管机构对自动驾驶的安全评估、OTA 升级、故障追溯有明确要求,需要完整的数据证据链和风险事件记录。


二、传统方案的痛点

架构碎片化,工具链臃肿

环节传统工具痛点
批量数据入湖Spark / Flink + HDFS需要独立集群,运维成本高,小文件问题严重
实时事件接入Flink + Kafka与离线链路割裂,数据一致性难保证
标注数据管理自研数据库 + Python版本管理混乱,难以追溯
AI 推理集成Python 微服务调用 LLM工程链路长,与数据处理耦合困难
指标聚合Hive / ClickHouse需要独立部署,查询延迟高
数据质量监控自研脚本 + 定时任务缺乏实时感知,问题发现滞后

以一个典型的智驾数据团队为例,维护上述工具链需要 Spark 集群 + Flink 集群 + 消息队列 + 多个数据库 + LLM 服务,基础设施成本和运维负担极重,数据工程师的大量精力耗费在管道维护而非业务价值上。

数据闭环周期长

传统架构下,从量产车上传一个接管事件到该事件进入下一轮训练集,往往需要 2~4 周

接管事件上报 ↓(T+1 批量处理) 数仓入库 ↓(人工排期) 标注任务分配 ↓(3~5 天) 标注完成审核 ↓(人工构建) 训练集版本打包 ↓(提交集群) 模型训练评估 ↓(部署审批) OTA 上线

国内头部车企新势力已将数据闭环周期压缩到数周以内,传统架构已成为瓶颈。

长尾场景补充能力弱

仿真系统与数据平台相互独立,真实采集的边缘场景无法自动注入仿真场景库,导致长尾场景覆盖率低,模型在极端情况下表现差。


三、ClickZetta Lakehouse 方案

方案架构

本方案基于 ClickZetta Lakehouse 构建一体化智能驾驶数据平台,覆盖研发、测试、量产运营三个阶段,通过 Data Flywheel 将量产车行驶数据持续转化为训练数据,驱动模型迭代形成正向飞轮。

ClickZetta Lakehouse 智能驾驶方案架构

十大功能模块

模块核心能力业务价值
M1 多模态数据采集COPY INTO 批量入湖,自动去重幂等路测 Parquet 自动化入湖,无需人工干预
M2 数据标注AI_COMPLETE 预标注 + HITL 人工审核预标注覆盖率 100%,人工工作量减少 60%+
M3 仿真与合成数据INVERTED INDEX 场景检索 + AI 场景分类长尾场景自动补充,打破真实数据瓶颈
M4 训练数据准备版本化训练集 + Window Function 特征工程真实/合成数据比例可控,全流程可追溯
M5 路测回放与影子模式divergence_score 评测,30min 自动刷新新算法无干预评测,上线决策有量化依据
M6 量产遥测与故障诊断AI_COMPLETE DTC 诊断 + Flywheel 注入DTC 故障自动描述,边缘场景持续积累
M7 OTA 灰度追踪15min 刷新,双指标驱动扩量决策OTA 健康全程可视,异常自动预警
M8 安全与合规多模块风险汇聚,L1-L4 严重分级证据链完整,满足监管合规要求
M9 端到端演示全链路一键运行,Flywheel 闭环验证快速 POC,降低方案验证成本
M10 车机端 Kafka 实时上报4 个 PIPE,分钟级入湖,秒级告警量产车数据实时接入,补齐路测盲区

Data Flywheel:两条并行路径

批量路径(M6):量产车每日路测数据经 COPY INTO 批量入湖,MERGE INTO 清洗后,通过

06_flywheel_scene_extract.sql
06_flywheel_scene_extract.sql
将 HARD_BRAKE / TAKEOVER / ANOMALY 事件自动注入仿真场景库。

实时路径(M10):量产车实时安全事件经 Kafka PIPE 秒级入湖,Dynamic Table 1 分钟刷新后,通过

04_flywheel_bridge.sql
04_flywheel_bridge.sql
实时注入场景库,同时触发安全告警写入 M8 合规链路。

两条路径汇入统一的

sim_ods_scenarios
sim_ods_scenarios
,场景类型自动分类(LONG_TAIL / CORNER_CASE),INVERTED INDEX 支持语义检索,供仿真平台按需提取。


四、ClickZetta Lakehouse 技术优势

1. SQL-first,零额外依赖

全部处理逻辑基于标准 SQL,无需 Python / Spark / Flink 环境。AI 推理(DTC 诊断、场景分类、标注质量评分)通过

AI_COMPLETE()
AI_COMPLETE()
内嵌在 SQL 中,消除了微服务调用的工程复杂度:

-- 在 SQL 中直接调用 LLM,逐行完成 DTC 故障诊断 UPDATE fleet_dwd_dtc_records SET ai_diagnosis = AI_COMPLETE( 'conn_dashscope:deepseek-v3', '诊断以下故障码,返回根因和建议:' || dtc_code ) WHERE ai_diagnosis IS NULL;

2. Dynamic Table 替代 Spark Streaming

传统方案需要独立维护 Flink/Spark Streaming 集群处理增量数据。Lakehouse Dynamic Table 用一条 SQL 声明即实现增量自动刷新:

CREATE DYNAMIC TABLE veh_dws_realtime_alert REFRESH INTERVAL 1 MINUTE -- 1 分钟级实时告警 AS SELECT vehicle_id, COUNT(*) AS event_count, MAX(risk_level) AS max_risk FROM veh_dwd_safety_events_clean WHERE msg_timestamp >= CURRENT_TIMESTAMP() - INTERVAL 5 MINUTE GROUP BY vehicle_id HAVING MAX(risk_level) IN ('L3','L4');

无需部署 Streaming 集群,无需管理 checkpoint,刷新间隔从 1 分钟到 6 小时灵活配置。

3. Kafka PIPE 实现准实时接入

原生支持 Kafka 持续消费,4 个 Topic 对应 4 个独立 PIPE,安全事件 PIPE 批次 10 秒,确保告警链路 <30 秒端到端延迟:

CREATE PIPE pipe_vehicle_safety_events BATCH_INTERVAL_IN_SECONDS = '10' -- 安全事件低延迟 AS COPY INTO veh_ods_safety_events FROM read_kafka(...);

4. INVERTED INDEX 支撑场景语义检索

仿真场景库百万级规模下,通过全文倒排索引实现毫秒级语义检索,无需独立的搜索引擎(Elasticsearch 等):

CREATE INVERTED INDEX idx_scenario_desc ON TABLE sim_ods_scenarios (description, tags) PROPERTIES ('analyzer' = 'keyword'); -- 按语义检索场景 SELECT * FROM sim_ods_scenarios WHERE description LIKE '%雨天%' AND tags LIKE '%NEAR_MISS%';

5. 存算分离,按需扩缩

Lakehouse 存算分离架构,路测批量处理期间弹性扩展计算资源,空闲时自动缩减,相比固定集群方案节省 40%~70% 基础设施成本。

6. 统一治理,消除数据孤岛

ODS / DWD / DWS 三层全在一个 Lakehouse 实例中管理,标注数据、遥测数据、训练数据、评测数据共享统一的权限体系、数据血缘和 Time Travel(历史版本回溯)能力。


五、客户价值

效率提升

指标传统方案Lakehouse 方案提升
数据闭环周期2~4 周数天(批量)/ 分钟级(实时)缩短 80%+
标注人工工作量100% 人工AI 预标注 + 人工复核减少 60%+
长尾场景补充人工整理,周期长量产事件自动注入场景库自动化 100%
OTA 决策依据经验判断success_rate + divergence 双指标量化可追溯

成本降低

  • 基础设施:消除 Spark 集群 + Flink 集群 + 多套数据库的独立部署,降低 40%~60% 基础设施成本
  • 运维人力:SQL-first 无需 Spark/Flink 工程师,数据工程师直接维护
  • 模型迭代:数据闭环加速,每个模型迭代周期缩短,间接降低 GPU 训练成本

安全与合规

  • L1-L4 风险事件全链路记录,证据链完整,满足 GB/T 40429 等自动驾驶相关法规要求
  • OTA 灰度决策留有完整日志,可回溯每次升级的评测依据
  • 安全事件与仿真场景双向关联,可追溯"该场景来自哪次量产事件"

竞争力提升

数据闭环周期从数周缩短到数天,意味着同等时间内可完成更多次模型迭代,在 NOA 城区功能等快速演进的赛道中建立数据飞轮优势。


六、方案注意事项

6.1 Lakehouse 建表兼容性

以下问题在实测中已全部遇到并解决,部署前需严格遵守:

规则说明违反后果
ODS 层含 JSON 列的表不加分区、不加 PKJSON 列 + 分区 + PK 三者共存报错CZLH-67000
分区表 PRIMARY KEY 必须包含分区列
PRIMARY KEY(event_id, trigger_ts)
PRIMARY KEY(event_id, trigger_ts)
建表失败
INVERTED INDEX 每列独立创建不支持多列联合索引
(col1, col2)
(col1, col2)
语法错误
change_tracking
change_tracking
建表后单独 ALTER
不支持 CREATE TABLE 内联声明属性不生效
MERGE INTO 源侧必须先去重同批含重复 PK 写入报错CZLH-71001
嵌套分区函数需用冗余列替代
DAYS(TIMESTAMP_MILLIS(ts_ms))
DAYS(TIMESTAMP_MILLIS(ts_ms))
不支持
建表失败
JSON 字段访问用
col['key']
col['key']
冒号语法
col:key
col:key
不支持
CZLH-42000

6.2 Kafka PIPE 网络配置

最关键的前置条件

read_kafka
read_kafka
仅支持
SASL_PLAINTEXT
SASL_PLAINTEXT
,不支持 SASL_SSL。

部署前必须确认:

✅ Lakehouse VCluster 与 Kafka 在同一 VPC,或已打通 VPC 对等连接 ✅ Kafka 安全组已放通 Lakehouse 节点 IP 段 → TCP 9092 入站 ✅ Kafka 提供了 SASL_PLAINTEXT 接入点(阿里云 Serverless 需手动添加 VPC 接入点) ✅ 用 read_kafka SELECT 连通性验证 SQL 测试通过(10 秒内返回数据)

如网络暂时无法打通,可使用 OSS 中转方案:

Kafka → 阿里云函数计算 FC(同 VPC)→ OSS → Lakehouse COPY INTO

6.3 数据规模与性能规划

数据类型规模参考建议策略
路测标注 Parquet单车每小时数 GB,日增 TB 级COPY INTO 按天分区,MERGE INTO 用分区裁剪
量产车高频遥测~86,400 条/天/车,百万辆 QPS 峰值 100 万Kafka 侧先降采样(10s 聚合),再入湖
仿真场景库百万级规模INVERTED INDEX 支撑检索,定期清理低价值场景
Dynamic TableDWS 层最快 1min 刷新高频刷新表仅放聚合结果,避免全表扫描

6.4 AI_COMPLETE 成本控制

AI_COMPLETE
AI_COMPLETE
按 token 计费,高吞吐场景需注意:

  • 按需触发:只对
    WHERE ai_diagnosis IS NULL
    WHERE ai_diagnosis IS NULL
    的新记录调用,不重复推理
  • 分级调用:规则层(Window Function)先筛选异常,只对命中规则的记录触发 AI,如交易异常检测仅对 L2+ 风险调用
  • Prompt 简洁:封闭式输出(只返回标签名)比开放式生成 token 消耗少 5~10 倍
  • 批量处理:Dynamic Table 批次刷新天然实现批量推理,而非逐条实时调用

6.5 Studio Task 调度建议

  • M1 批量入湖建议设置
    AUTO_SUSPEND_IN_SECOND = 600
    AUTO_SUSPEND_IN_SECOND = 600
    ,避免 VCluster 频繁暂停导致 CZLH-60011
  • Studio Task 中固化 Session Flag(
    set cz.sql.cast.string.to.json.as.parse=true
    set cz.sql.cast.string.to.json.as.parse=true
    等),每次连接都需重新设置
  • PIPE 的
    RESET_KAFKA_GROUP_OFFSETS
    RESET_KAFKA_GROUP_OFFSETS
    仅在首次创建时生效,重建 PIPE 前记录当前 offset

七、验证状态(2026-06-05 实测)

验证项状态说明
建表 27 张(M1-M8,ODS+DWD+DWS)✅ 通过
00_create_schema.sql
00_create_schema.sql
执行成功,兼容性问题全部修复
M10 实时链路建表(9 张)✅ 通过
10-vehicle-kafka-ingest/
10-vehicle-kafka-ingest/
下三个文件执行成功,共 36 张
Data Flywheel 闭环✅ 通过fleet 事件自动注入 sim_ods_scenarios(FLEET_EVENT×4)
OTA 灰度决策逻辑✅ 通过success_rate=66.7% → RECOMMEND_PAUSE
影子模式评测✅ 通过avg_divergence=0.267 < 0.3 → PASS
标注质量看板✅ 通过avg_confidence=0.855,ai_prelabel_rate=100%
Kafka 生产者(Python)✅ 通过54 条消息成功发送到阿里云 Kafka(SASL_SSL/PLAIN)
Kafka PIPE 入湖✅ 通过Kafka PIPE 数据链路验证通过
离线 Pipeline M2→M6→Flywheel✅ 通过端到端全链路验证,10 张表行数符合预期
Studio Task 部署✅ 通过8 个 Task 已部署到
ads_full_loop
ads_full_loop
文件夹,含 DDL + 实时链路 + 离线 Pipeline

八、快速部署步骤

前置条件

  • ClickZetta Lakehouse workspace 已就绪
  • 如使用 AI_COMPLETE,需预先
    CREATE CONNECTION
    CREATE CONNECTION
    (conn_dashscope 等)
  • 如使用 Kafka PIPE,需确保 Lakehouse VCluster 与 Kafka 在同一 VPC(详见 §6.2)

Step 1:建表(选 A 或 B)

方式 A — 直接执行 SQL

# M1-M8 全部 27 张表(含 8 张 DWS Dynamic Table) run 00-setup/00_create_schema.sql # M10 实时链路(9 张表 + 4 个 Kafka PIPE 定义) run 10-vehicle-kafka-ingest/01_ods_tables.sql run 10-vehicle-kafka-ingest/02_dwd_tables.sql run 10-vehicle-kafka-ingest/03_dws_tables.sql

方式 B — 通过 Studio Task 执行

在 Lakehouse Studio 的

ads_full_loop
ads_full_loop
文件夹下,按顺序执行:

  1. ads_01_ddl_ods
    ads_01_ddl_ods
    — ODS 层 10 张表
  2. ads_02_ddl_dwd
    ads_02_ddl_dwd
    — DWD 层 15 张表
  3. ads_03_ddl_dws
    ads_03_ddl_dws
    — DWS 层 8 张 Dynamic Table
  4. ads_04_rt_ods_pipe
    ads_04_rt_ods_pipe
    — M10 实时链路 ODS + PIPE(需 Kafka VPC 连通)
  5. ads_05_rt_dwd
    ads_05_rt_dwd
    — M10 DWD Dynamic Table
  6. ads_06_rt_dws
    ads_06_rt_dws
    — M10 DWS Dynamic Table

Step 2:写入演示数据

run 09-full-loop-demo/01_sample_data_gen.sql

Step 3:运行全链路 Pipeline

run 09-full-loop-demo/02_run_full_loop.sql

Step 4:验证 Data Flywheel

run 09-full-loop-demo/03_validate_flywheel.sql

Step 5:部署定时调度

在 Studio 中执行

ads_offline_pipeline
ads_offline_pipeline
任务(已配置 cron
0 2 * * *
0 2 * * *
),或手动触发一次测试运行。


九、Studio Task 清单

Task 名称文件夹内容调度
ads_01_ddl_ods
ads_01_ddl_ods
ads_full_loopODS 层建表(10张)手动
ads_02_ddl_dwd
ads_02_ddl_dwd
ads_full_loopDWD 层建表(15张)手动
ads_03_ddl_dws
ads_03_ddl_dws
ads_full_loopDWS Dynamic Table(8张)手动
ads_04_rt_ods_pipe
ads_04_rt_ods_pipe
ads_full_loopM10 ODS + Kafka PIPE手动
ads_05_rt_dwd
ads_05_rt_dwd
ads_full_loopM10 DWD Dynamic Table手动
ads_06_rt_dws
ads_06_rt_dws
ads_full_loopM10 DWS Dynamic Table手动
ads_07_rt_flywheel
ads_07_rt_flywheel
ads_full_loopFlywheel 桥接 SQL手动
ads_offline_pipeline
ads_offline_pipeline
ads_full_loop离线 Pipeline(M2→M6→DWS)每天 02:00

相关文档

核心数据处理

技术点文档
批量数据入湖Lakehouse 文件批量导入导出指南(COPY INTO)
Upsert / 增量写入Lakehouse Upsert 操作指南(MERGE INTO)
增量自动刷新Lakehouse 动态表开发入门指南
Kafka 持续消费Lakehouse 持续数据导入指南(Pipe)
变更数据捕获Lakehouse CDC 变更数据捕获指南(Table Stream)
历史版本回溯Lakehouse 历史数据回溯指南(Time Travel)
窗口函数 / 特征工程窗口函数(Window Function)

查询加速与检索

技术点文档
倒排索引 / 场景检索Lakehouse 查询加速索引指南
全文搜索全文搜索与文本分析实战指南

AI 与 Python

技术点文档
SQL 内嵌 AI 推理AI_COMPLETE 函数参考
AI Functions 概述Lakehouse AI Functions 概述
Python DataFrame APIZettaPark Python SDK

安全与治理

技术点文档
行级权限控制行级安全(Row Filter)
列级动态脱敏Lakehouse 列级安全(动态脱敏)使用文档
数据质量检查数据质量检查(DQC):SQL 驱动的自动化验证

任务调度与运维

技术点文档
Studio 任务调度Studio 任务开发与运维
联系我们
预约咨询
微信咨询
电话咨询
邮件咨询