商品评论情感分析


方案背景

电商平台每天产生海量用户评论。用户提交的星级评分和文字评论,是商品运营、质量管理和客服团队最直接的用户反馈来源。随着平台 SKU 数量和评论量的持续增长,人工逐条阅读已不可行。

核心诉求是将非结构化的评论文本自动转化为可查询、可告警、可聚合的结构化数据,并与现有的 Kafka 消息队列架构无缝对接。

典型业务数据链路:

用户提交评论 → 业务系统写入 → Kafka 推送事件 → 分析平台处理 → 运营/客服消费结果


传统方案痛点

技术栈复杂,维护成本高

典型实现需要多系统协作:Kafka 消费用 Flink/Spark Streaming,情感分析用独立 Python 服务(HuggingFace/OpenAI),结果写回用 Delta Lake,BI 工具对外展示。每个环节都有独立运维负担,故障排查链路长、耗时。

AI 推理与数据处理割裂

情感分析模型运行在独立 Python 微服务中,通过 HTTP API 调用。数据在数仓和 AI 服务之间来回传输,移动成本高、延迟难压缩,同时需要专门的 MLOps 能力维护模型服务的上线、扩缩容和版本管理。

增量处理难,重复计算浪费

Spark 批处理通常按时间窗口全量重跑,大量已处理评论被重复推理,既浪费算力也浪费 LLM token。实现真正的增量处理需要额外的状态管理逻辑,开发成本高。

数据工程与 AI 工程双栈割裂

数据工程师写 SQL,AI 工程师写 Python,两个团队之间协作摩擦大,需求迭代周期长,难以快速响应业务变化。


ClickZetta Lakehouse 方案优势

本方案将整个链路收敛到 Lakehouse 一个平台内,用纯 SQL 完成从 Kafka 消费到 AI 推理再到结果聚合的全流程。

能力传统方案Lakehouse 方案
Kafka 消费Flink / Spark Streaming 集群
CREATE PIPE
CREATE PIPE
原生对接,无需外部集群
增量处理手动管理 checkpoint / 状态
CREATE DYNAMIC TABLE
CREATE DYNAMIC TABLE
自动增量刷新
AI 推理独立 Python 微服务 + HTTP 调用
AI_SENTIMENT
AI_SENTIMENT
/
AI_COMPLETE
AI_COMPLETE
内嵌 SQL
数据移动数仓 ↔ AI 服务来回传输数据不离开 Lakehouse
运维监控多系统独立监控PIPE lag / Dynamic Table refresh 一体化
开发语言Python + SQL 双栈纯 SQL
上线周期周级(多系统联调)天级(单一平台,SQL 即部署)

客户价值

商品运营团队

通过

product_review_summary
product_review_summary
视图实时掌握各 SKU 的好评率、均分和情感分布,新品上架后 10 分钟内即可看到用户口碑走势。
key_aspects
key_aspects
字段的枚举约束(价格/质量/物流/客服/包装)让聚合有意义,可直接回答"差评主要集中在哪个维度",支撑商品详情页优化和定价决策。

质量/售后团队

cons
cons
字段高频聚类可快速识别批次性质量问题。当多条评论的 cons 集中在同一描述(如"断连""键帽破损")时,可触发供应商反馈或主动换货预案,将被动投诉转为主动管控,降低退货率和负面口碑扩散。

客服团队

sentiment IN ('negative', 'mixed')
sentiment IN ('negative', 'mixed')
的评论可作为主动回访优先级队列。与邮件客服自动分流方案(03-email-customer-support)组合后,负评评论可自动生成客服工单,显著缩短响应时间。

IT/数据团队

整个方案只需 SQL,不依赖 Python 环境或外部 AI 服务,数据团队可独立维护、快速迭代。Kafka PIPE 的 lag 监控内置在

DESC PIPE EXTENDED
DESC PIPE EXTENDED
中,端到端延迟通过
__kafka_timestamp__
__kafka_timestamp__
字段直接查询,无需额外搭建监控体系。


方案架构

架构图

[业务系统] │ JSON 消息推送 ▼ Kafka topic: product_reviews │ CREATE PIPE(60s 一批,consumer group 持久化 offset) ▼ ods_review_events (ODS 落地表,含 Kafka 元数据列) │ VIEW product_reviews (屏蔽 Kafka 元数据,接口解耦) │ change_tracking = true ▼ review_staging (Dynamic Table,REFRESH 10 MIN,清洗过滤) ▼ review_sentiment (Dynamic Table,REFRESH 10 MIN,AI_SENTIMENT 情感分类) ▼ review_detail_raw (Dynamic Table,REFRESH 10 MIN,AI_COMPLETE 摘要提取,仅非 neutral) ▼ product_review_analysis_dt (Dynamic Table,REFRESH 10 MIN,LEFT JOIN 合并结果) ▼ product_review_summary (视图,产品维度聚合输出)


技术亮点

1. 双 AI 函数分工,成本与灵活性兼顾

函数职责设计原因
AI_SENTIMENT('conn:model', text)
AI_SENTIMENT('conn:model', text)
情感分类 → positive / negative / neutral / mixed专用函数,内置 prompt,输出枚举固定,不需要解析 JSON,token 消耗低
AI_COMPLETE('conn:model', prompt)
AI_COMPLETE('conn:model', prompt)
pros / cons / key_aspects 结构化提取灵活提取,prompt 约束 key_aspects 枚举值保证下游可聚合

neutral 评论跳过

AI_COMPLETE
AI_COMPLETE
,实测减少约 35% 的 token 消耗:

CASE WHEN sentiment = 'neutral' THEN NULL ELSE AI_COMPLETE('cz_bailian:qwen3.5-plus', prompt || review_text) END AS ai_detail_raw

2. ODS 层保留 Kafka 元数据,支持端到端监控

__kafka_partition__
__kafka_partition__
__kafka_offset__
__kafka_offset__
__kafka_timestamp__
__kafka_timestamp__
写入 ODS,无需外部工具即可监控消费延迟:

SELECT MAX(DATEDIFF('second', __kafka_timestamp__, CURRENT_TIMESTAMP())) AS max_delay_s FROM ods_review_events WHERE __kafka_timestamp__ >= CURRENT_TIMESTAMP() - INTERVAL 1 HOUR;

3. VIEW 屏蔽层解耦 ODS 与业务逻辑

product_reviews
product_reviews
视图屏蔽 Kafka 元数据列,下游 Dynamic Table 面向视图开发,ODS 物理结构变更不影响业务逻辑。

4. mixed 情感的正确统计

positive_rate
positive_rate
分子使用
sentiment IN ('positive', 'mixed')
sentiment IN ('positive', 'mixed')
。"音质好但续航差"类 mixed 评论也计入正向参考,比只统计 positive 更贴近真实用户满意度。

5. 四层 Dynamic Table 链路,AI 阶段独立可 debug

review_staging(清洗) → review_sentiment(AI_SENTIMENT,全量) → review_detail_raw(AI_COMPLETE,conditional) → product_review_analysis_dt(LEFT JOIN 合并)

两个 AI 层独立成表,互不干扰,可以单独 REFRESH 验证中间结果,故障定位精准。


Kafka 消息格式

PIPE 期望 topic 中每条消息的 value 为 JSON 字符串:

{ "review_id": "r001", "product_id": "p001", "product_name": "无线蓝牙耳机 Pro", "rating": 5, "review_text": "音质非常好,降噪效果一流", "reviewer": "用户A", "review_date": "2026-05-01T10:00:00" }

AI 输出示例

AI_SENTIMENT
AI_SENTIMENT
直接返回枚举标签:

positive

AI_COMPLETE
AI_COMPLETE
返回 JSON(neutral 评论返回 NULL):

{ "pros": "音质好,降噪一流", "cons": "", "key_aspects": "质量,物流" }


运行步骤

# 1. 初始化(建表、建视图) run setup.sql # 2a. 生产环境:修改 pipeline.sql 中的 broker/topic 后执行,PIPE 自动启动 run pipeline.sql # 2b. 本地测试:跳过 PIPE,直接写测试数据 run test_data.sql # 手动触发刷新: # REFRESH DYNAMIC TABLE review_staging; # REFRESH DYNAMIC TABLE review_sentiment; # REFRESH DYNAMIC TABLE review_detail_raw; # REFRESH DYNAMIC TABLE product_review_analysis_dt; # 3. 查看结果 SELECT * FROM product_review_summary ORDER BY review_count DESC; # 4. 清理(可选) run teardown.sql


核心查询

-- 各产品好评率排名(positive + mixed 均计入) SELECT product_name, avg_rating, positive_rate, review_count FROM product_review_summary ORDER BY positive_rate DESC; -- 情感分布 SELECT sentiment, COUNT(*) AS cnt, ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 1) AS pct FROM product_review_analysis_dt GROUP BY sentiment ORDER BY cnt DESC; -- 高频负评关键词 SELECT product_name, cons, COUNT(*) AS cnt FROM product_review_analysis_dt WHERE sentiment IN ('negative', 'mixed') AND cons IS NOT NULL AND cons != '' GROUP BY product_name, cons ORDER BY cnt DESC;


Pipe 运维

-- 查看消费状态和 lag DESC PIPE EXTENDED pipe_product_reviews; -- 暂停 / 恢复 ALTER PIPE pipe_product_reviews SET PIPE_EXECUTION_PAUSED = true; ALTER PIPE pipe_product_reviews SET PIPE_EXECUTION_PAUSED = false;


注意事项

Kafka PIPE — 网络连通性(重要)

Lakehouse PIPE 的

read_kafka
read_kafka
仅支持
PLAINTEXT
PLAINTEXT
SASL_PLAINTEXT
SASL_PLAINTEXT
,不支持
SASL_SSL
SASL_SSL

阿里云 MSK Serverless 默认仅开放 9093 端口(SASL_SSL),生产部署前必须先解决网络连通性问题:

部署场景解决方案
Lakehouse 与 MSK 在同一 VPC直接使用 MSK VPC 内网地址(9092,SASL_PLAINTEXT)
Lakehouse 与 MSK 跨 VPC通过 VPC PeeringPrivateLink 打通,再使用内网 9092 端口
MSK 仅有公网地址联系阿里云开启公网 SASL_PLAINTEXT 端口,并配置 Lakehouse vcluster 出口白名单

Kafka PIPE — 使用规范

  • CREATE PIPE
    CREATE PIPE
    不支持
    CREATE OR REPLACE
    CREATE OR REPLACE
    ,修改消费逻辑需
    DROP PIPE
    DROP PIPE
    后重建;重建时不设置
    RESET_KAFKA_GROUP_OFFSETS
    RESET_KAFKA_GROUP_OFFSETS
    即可从上次位点续传,避免重复消费
  • RESET_KAFKA_GROUP_OFFSETS = 'earliest'
    RESET_KAFKA_GROUP_OFFSETS = 'earliest'
    仅用于初始化或数据修复,生产环境建议用
    'latest'
    'latest'
  • SASL 密码以明文写在 SQL 中,建议通过
    CREATE CONNECTION
    CREATE CONNECTION
    集中管理凭据,不要硬编码在 pipeline.sql 里
  • PIPE 创建后自动启动,删除前建议先
    PAUSE
    PAUSE
    ,确认无排队 job 后再
    DROP
    DROP

Dynamic Table

  • 不支持 DML(INSERT/UPDATE/DELETE),数据修正只能在上游源表操作
  • change_tracking = true
    change_tracking = true
    必须在
    ods_review_events
    ods_review_events
    上开启(
    ALTER TABLE SET TBLPROPERTIES
    ALTER TABLE SET TBLPROPERTIES
    ),Dynamic Table 增量感知依赖此属性,建表后不能遗漏
  • 分区表的 PRIMARY KEY 必须包含分区键(本方案为
    review_date
    review_date
    ),否则建表报错

AI 函数

  • AI_SENTIMENT
    AI_SENTIMENT
    AI_COMPLETE
    AI_COMPLETE
    第一个参数格式为
    'connection_name:model_name'
    'connection_name:model_name'
    ,只传 connection 名会报错(UAT 验证确认,正确格式:
    'cz_bailian:qwen3.5-plus'
    'cz_bailian:qwen3.5-plus'
  • AI_COMPLETE
    AI_COMPLETE
    的 prompt 必须明确约束"返回 JSON 不要多余文字",否则 LLM 可能返回 markdown 代码块包裹的 JSON,导致
    GET_JSON_OBJECT
    GET_JSON_OBJECT
    解析失败
  • Dynamic Table REFRESH 是异步操作,手动
    REFRESH DYNAMIC TABLE
    REFRESH DYNAMIC TABLE
    后需等 job 完成才能查到最新数据

Kafka 消息格式

  • review_date
    review_date
    字段建议统一使用 ISO 8601 格式(
    2026-05-01T10:00:00
    2026-05-01T10:00:00
    ),确保与
    TIMESTAMP_NTZ
    TIMESTAMP_NTZ
    解析兼容
  • PIPE 只支持 flat JSON(顶层 key-value),嵌套 JSON 需额外的
    parse_json
    parse_json
    嵌套调用处理

扩展方向

  • 告警集成
    sentiment = 'negative' AND key_aspects LIKE '%质量%'
    sentiment = 'negative' AND key_aspects LIKE '%质量%'
    触发钉钉/企微通知,实现批次质量问题实时预警
  • 客服联动:与 邮件客服自动分流 组合,负评评论自动生成客服工单
  • 商品下钻:加入
    product_id
    product_id
    维度细化分析,识别同一 SKU 的批次性质量问题
  • 实时告警:缩短 Dynamic Table REFRESH INTERVAL(如 1 MIN),配合 Studio Task 定时推送异常报表
联系我们
预约咨询
微信咨询
电话咨询
邮件咨询