Storage Connection + API Connection + External Function 组合实战
External Function 看起来很复杂——又要配 OSS Bucket、又要配函数计算、RAM 角色、还要写 Python、打包 zip、写 DDL……三个对象凑在一起,概念多、步骤多,容易劝退。
实际跑通之后你会发现,复杂度主要集中在一次性环境准备 上。配好之后,后续加新函数的成本极低:写 Python 逻辑 → 打包 → 一条 DDL,三分钟搞定。本文用四个渐进式场景带你跑通全流程,覆盖阿里云、腾讯云、AWS 三朵云。
读完你会意识到 :External Function 不复杂,它只是一次性配好"存代码的地方"和"跑代码的地方",之后每次加新函数就是写代码 + 一条 SQL。
完整代码见 GitHub:clickzetta_external_function
铁三角:三个对象如何配合
External Function 不只是一个对象,而是三个对象配合的结果。理解它们的分工,整个机制就清楚了:
对象 比喻 实际做的事 配几次 Storage Connection 停车场 认证对象存储(OSS/COS/S3),让 Lakehouse 能读写代码包 一个 Schema 配一次 API Connection 车间 认证云函数计算(FC/SCF/Lambda),定义代码在哪跑 一个 Region 配一次 External Volume 货架 把对象存储的 Bucket 挂载到 Schema,让 PUT 命令能上传 一个 Bucket 配一次 CREATE EXTERNAL FUNCTION 注册表 注册函数名 → 入口类 → zip 包的映射 每个函数配一次
前三者是一次性的——配好 Storage Connection + API Connection + External Volume,所有函数共享。后续加新函数只需一条
CREATE EXTERNAL FUNCTIONCREATE EXTERNAL FUNCTION
。
前置准备(一次性,四个场景共享)
四个场景共享同一套云环境配置。这一步只做一次 ,后面四个场景都能直接用。
第一步:选云,配 config.json
确认你的 Lakehouse 在哪朵云,外部函数的运行时(FC/SCF/Lambda)必须在同一朵云、同一地域:
cz-cli profile list
# service 列:
# alicloud.api.clickzetta.com → 阿里云
# tencentcloud.api.clickzetta.com → 腾讯云
# aws.api.clickzetta.com → AWS
git clone https://github.com/clickzetta/clickzetta_external_function.git
cd clickzetta_external_function
cp config.example.json config.json
打开
config.jsonconfig.json
,只改
platformplatform
字段:
"platform": "aliyun" // 或 "tencent" 或 "aws"
然后按 SETUP.md 完成对应云的环境准备(OSS/COS/S3 Bucket、FC/SCF/Lambda、RAM/CAM/IAM 角色、百炼 API Key),把信息填入 config.json。
第二步:安装 cz-cli 并验证
cz-cli profile use <your-profile>
cz-cli sql "SELECT current_schema()"
# 将输出的 schema 名称填入 config.json → schema 字段
第三步:四项目通用步骤
所有四个场景的执行流程完全一致:
填 config.json → check (校验配置) → package (打包代码) → render (生成 SQL) → deploy (执行部署) → verify (调用验证)
对应命令:
python ../1-check-config.py # ① 检查配置
python 2-package.py # ② 打包代码(AI 函数加 --deps)
python ../3-render-sql.py # ③ 占位符替换,生成 SQL
cz-cli sql -f dist/4-deploy_generated.sql --write # ④ 部署
场景一:Python External Function 快速入门
一个函数,零依赖,5 分钟跑通。看懂 Storage Connection + API Connection + External Function 三者如何配合。
部署
cd python_quickstart
python ../1-check-config.py
python 2-package.py
python ../3-render-sql.py
cz-cli sql -f dist/4-deploy_generated.sql --write
4-deploy_generated.sql4-deploy_generated.sql
做了什么:
-- 1. Storage Connection(OSS 认证)
CREATE STORAGE CONNECTION IF NOT EXISTS oss_sh_conn
TYPE OSS
access_id = '<your-id>' access_key = '<your-key>'
ENDPOINT = 'oss-cn-shanghai.aliyuncs.com';
-- 2. API Connection(函数计算 FC 认证)
CREATE API CONNECTION IF NOT EXISTS shanghai_func_conn
TYPE CLOUD_FUNCTION PROVIDER = 'aliyun'
REGION = 'cn-shanghai' ROLE_ARN = '<your-role-arn>'
CODE_BUCKET = '<your-bucket>';
-- 3. External Volume(挂载 Bucket)
CREATE EXTERNAL VOLUME IF NOT EXISTS external_functions_prod
LOCATION 'oss://<bucket>/' USING CONNECTION oss_sh_conn;
-- 4. 上传 zip
PUT '<project>/dist/my_upper.zip' TO VOLUME external_functions_prod FILE 'my_upper.zip';
-- 5. 注册函数
CREATE EXTERNAL FUNCTION IF NOT EXISTS <schema>.my_upper
AS 'my_upper.my_upper'
USING ARCHIVE 'volume://external_functions_prod/my_upper.zip'
CONNECTION shanghai_func_conn
WITH PROPERTIES ('remote.udf.api'='python3.mc.v0','remote.udf.protocol'='http.arrow.v0');
-- 6. 验证
SELECT <schema>.my_upper('hello'); -- 返回 HELLO
函数源码
src/my_upper.pysrc/my_upper.py
,一个类、一个
evaluateevaluate
方法:
@annotate("*->string")
class my_upper(object):
def evaluate(self, s):
return s.upper() if s else s
本地测试
FC 环境没有 stdout,也没有堆栈。每次部署前先在本地测通 :
python3 -c "
import sys; sys.path.insert(0, 'src')
from my_upper import my_upper
print(my_upper().evaluate('hello')) # HELLO
"
关键收获
remote.udf.api='python3.mc.v0'remote.udf.api='python3.mc.v0'
指定 FC 走 Python 3.10 运行时
调用函数必须加 schema 前缀 :SELECT <schema>.my_upper('hello')SELECT <schema>.my_upper('hello')
,不加报 function not foundfunction not found
第一次调用可能等 5-10 秒(FC 冷启动),后续调用正常
场景二:Python ML 函数 + 第三方依赖打包
5 个 ML/PII 函数,基于 scikit-learn + jieba。演示含 C 扩展的第三方依赖如何正确打包。
函数列表
函数 用到的库 用途 pii_maskpii_mask
re 手机号/邮箱/身份证脱敏 feature_normalizefeature_normalize
numpy + sklearn 数值列归一化(minmax/zscore) anomaly_detectanomaly_detect
numpy + sklearn 孤立森林异常检测 sentiment_scoresentiment_score
jieba 中文情感评分(0-1) tfidf_keywordstfidf_keywords
sklearn TF-IDF 关键词提取
核心问题:FC 是 Linux,macOS 打出来的包跑不了
FC 运行时是 Linux x86_64 + Python 3.10 。macOS 上
pip install scikit-learnpip install scikit-learn
拿到的
.dylib.dylib
在 FC 上加载不了。
解决:分两个 requirements 文件,两种安装方式。
文件 放什么 安装方式 requirements.txtrequirements.txt
含 C 扩展的包(scikit-learn, numpy) pip install --platform manylinux2014_x86_64 --only-binary :all:pip install --platform manylinux2014_x86_64 --only-binary :all:
requirements_pure.txtrequirements_pure.txt
纯 Python 包(jieba) 普通 pip installpip install
为什么不能放在一起? 把 jieba(纯 Python)放进
requirements.txtrequirements.txt
加
--only-binary :all:--only-binary :all:
,pip 会报
No matching distribution foundNo matching distribution found
——纯 Python 包没有 binary wheel。
部署
cd python_advanced
python 2-package.py # 双模式打包(~100 MB)
python ../1-check-config.py
python ../3-render-sql.py
cz-cli sql -f dist/4-deploy_generated.sql --write
本地测试
pip install -r requirements.txt -r requirements_pure.txt
python3 -c "
import sys; sys.path.insert(0, 'src')
from ml_toolkit import pii_mask, feature_normalize, sentiment_score
print(pii_mask().evaluate('我的手机13812345678'))
print(feature_normalize().evaluate('[1,2,3,4,5]', 'minmax'))
print(sentiment_score().evaluate('产品质量非常好'))
"
在 SQL 中使用
SELECT <schema>.pii_mask('手机13812345678,邮箱alice@example.com');
SELECT <schema>.feature_normalize('[10,20,30,40,50]', 'minmax');
SELECT <schema>.anomaly_detect('[1,2,3,4,100]');
SELECT <schema>.sentiment_score('产品质量非常好,物流很快,非常满意!');
SELECT <schema>.tfidf_keywords('["AI和机器学习是未来方向","深度学习在图像识别取得突破"]', 3);
关键收获
C 扩展包需要 Linux binary wheel ,macOS 的 .dylib.dylib
不能在 FC 上跑
纯 Python 包要拆出来 ,不能和 binary 包混在同一个 requirements 里
zip 大小决定冷启动时间 :scikit-learn + numpy 约 100MB,首次调用需等 5-10 秒
场景三:30 个 AI SQL 函数——一次打包,到处调用
30 个 AI 函数共享一个 zip。在 SQL 里直接做摘要、翻译、情感分析、OCR、向量相似度搜索。
设计要点
一个 zip,30 个函数 :30 个函数共享同一个
clickzetta_ai_functions_full.zipclickzetta_ai_functions_full.zip
,DDL 只有
AS 'ai_functions_complete.ai_xxx'AS 'ai_functions_complete.ai_xxx'
的类名不同。如果每个函数分别打包上传,就是 30 个 zip,管理复杂度会爆炸。
API Key 作为 SQL 参数 :百炼 API Key 不写死在代码里,也不打进 zip,而是作为函数参数传入:
SELECT <schema>.ai_text_summarize('人工智能正在改变世界。', '<your-api-key>');
如果 API Key 写死在 zip 里,zip 泄露 = Key 泄露。传参方式让调用方自己管理密钥。
函数分类
类别 数量 典型函数 文本处理 8 摘要、翻译、情感分析、实体提取、关键词、分类、清洗、标签 向量处理 5 向量化、相似度、聚类准备、相似搜索、文档搜索 多模态 8 图片描述、OCR、图片分析、图片向量化、图片相似度、视频摘要、图表分析、文档解析 业务场景 9 客户意图、销售评分、评论分析、风险检测、合同提取、简历解析、客户分群、产品描述、行业分类
部署
cd python_ai_function
pip install -r requirements.txt # 仅本地测试用
python 2-package.py --deps # 打包代码 + dashscope Linux 依赖
python 1-check-config.py # 独立校验(config 结构不同)
python 3-render-sql.py
cz-cli sql -f dist/4-deploy_generated.sql --write
本地测试
FC 没有日志也没有 stdout。先本地测通再部署,省很多时间:
python3 -c "
import sys; sys.path.insert(0, 'src')
from ai_functions_complete import ai_text_summarize, ai_text_translate
print(ai_text_summarize().evaluate('你好世界', '<your-api-key>'))
print(ai_text_translate().evaluate('Hello', '中文', '<your-api-key>'))
"
在 SQL 中使用
-- 摘要
SELECT <schema>.ai_text_summarize('人工智能正在改变世界……', '<key>');
-- 翻译
SELECT <schema>.ai_text_translate('Hello, how are you?', '中文', '<key>');
-- 情感分析
SELECT <schema>.ai_text_sentiment_analyze('产品质量非常好!', '<key>');
-- 向量化 + 相似度搜索
SELECT <schema>.ai_semantic_similarity('苹果好吃', '苹果是健康水果', '<key>');
-- 图像描述
SELECT <schema>.ai_image_describe('<image-url>', '<key>');
-- 合同提取
SELECT <schema>.ai_contract_extract('<合同文本>', '<key>');
返回 JSON,用
JSON_EXTRACTJSON_EXTRACT
取值:
SELECT JSON_EXTRACT(
<schema>.ai_text_summarize('人工智能正在改变世界……', '<key>'),
'$.summary'
);
关键收获
30 个函数共享一个 zip ,新增函数 = 加 20 行 prompt + 一条 DDL
API Key 不写死在代码里 ,作为 SQL 参数传入,zip 泄露不影响安全
config.json 双重身份 :构建时嵌入 zip(运行时读模型配置),渲染时替换 SQL 占位符
场景四:Java UDF/UDAF/UDTF
Java 外部函数支持三种类型:UDF(一行入一行出)、UDAF(多行入一行出)、UDTF(一行入多行出)。
三种类型速览
类型 基类 输入 → 输出 DDL 特殊属性 示例函数 UDF GenericUDFGenericUDF
1 行 → 1 行 — pii_maskpii_mask
PII 脱敏UDAF GenericUDAFResolver2GenericUDAFResolver2
多行 → 1 行 AGGREGATORAGGREGATOR
agg_statsagg_stats
SUM/AVG/MIN/MAX/COUNTUDTF GenericUDTFGenericUDTF
1 行 → N 行 TABLE_VALUEDTABLE_VALUED
log_explodelog_explode
日志拆行
和 Python External Function 的区别
Python Java 运行时 Python 3.10 Java 8 (不支持 Java 9+)DDL 属性 python3.mc.v0python3.mc.v0
java8.hive2.v0java8.hive2.v0
函数类型 UDF UDF / UDAF / UDTF 打包 zip Maven jar-with-dependenciesjar-with-dependencies
→ zip 依赖 pip --platform manylinux--platform manylinux
Maven scope=providedscope=provided
(Hive 运行时自带)
部署
cd java_udf
python 2-package.py # Maven 编译 + zip 打包
python ../1-check-config.py
python ../3-render-sql.py
cz-cli sql -f dist/4-deploy_generated.sql --write
UDF 示例
SELECT <schema>.pii_mask('我的手机13812345678,邮箱alice@example.com');
UDAF 示例
INSERT INTO <schema>.java_udf_test_scores VALUES (3.5), (4.2), (2.8), (5.0), (3.9);
SELECT <schema>.agg_stats(val) FROM <schema>.java_udf_test_scores;
-- → [sum, avg, min, max, count]
UDTF 示例
UDTF 必须用
LATERALLATERAL
语法,不能用
SELECT func(x)SELECT func(x)
:
SELECT t.ts, t.event
FROM (
SELECT '[2025-01-15 10:30:00] 用户登录
[2025-01-15 10:35:00] 查询订单' AS log
) s, LATERAL <schema>.log_explode(s.log) t;
-- → 每行日志展开为一行
关键收获
Java 版本必须是 8 ,FC 只有 Java 8 运行时
Hive 依赖 scope=provided :FC 自带 hive-exec.jarhive-exec.jar
,不打包进 zip 避免冲突
UDAF DDL 必须加 AGGREGATORAGGREGATOR
,UDTF 必须加 TABLE_VALUEDTABLE_VALUED
,漏了创建成功但调用报错
UDTF 必须用 LATERALLATERAL
语法,不能直接 SELECT func(x)SELECT func(x)
函数速查表
Python(quickstart + advanced + AI function)
函数 用途 来源 my_uppermy_upper
字符串转大写 quickstart pii_maskpii_mask
手机/邮箱/身份证脱敏 advanced feature_normalizefeature_normalize
数值列归一化 advanced anomaly_detectanomaly_detect
孤立森林异常检测 advanced sentiment_scoresentiment_score
中文情感评分 advanced tfidf_keywordstfidf_keywords
TF-IDF 关键词提取 advanced ai_text_summarizeai_text_summarize
文本摘要 AI function ai_text_translateai_text_translate
文本翻译 AI function ai_text_sentiment_analyzeai_text_sentiment_analyze
情感分析 AI function ai_text_extract_entitiesai_text_extract_entities
实体提取 AI function ai_text_extract_keywordsai_text_extract_keywords
关键词提取 AI function ai_text_classifyai_text_classify
文本分类 AI function ai_text_clean_normalizeai_text_clean_normalize
文本清洗 AI function ai_auto_tag_generateai_auto_tag_generate
自动标签 AI function ai_text_to_embeddingai_text_to_embedding
文本向量化 AI function ai_semantic_similarityai_semantic_similarity
语义相似度 AI function ai_text_clustering_prepareai_text_clustering_prepare
聚类准备 AI function ai_find_similar_textai_find_similar_text
相似文本搜索 AI function ai_document_searchai_document_search
文档搜索 AI function ai_image_describeai_image_describe
图片描述 AI function ai_image_ocrai_image_ocr
图片 OCR AI function ai_image_analyzeai_image_analyze
图片分析 AI function ai_image_to_embeddingai_image_to_embedding
图片向量化 AI function ai_image_similarityai_image_similarity
图片相似度 AI function ai_video_summarizeai_video_summarize
视频摘要 AI function ai_chart_analyzeai_chart_analyze
图表分析 AI function ai_document_parseai_document_parse
文档解析 AI function ai_customer_intent_analyzeai_customer_intent_analyze
客户意图分析 AI function ai_sales_lead_scoreai_sales_lead_score
销售评分 AI function ai_review_analyzeai_review_analyze
评论分析 AI function ai_risk_text_detectai_risk_text_detect
风险文本检测 AI function ai_contract_extractai_contract_extract
合同提取 AI function ai_resume_parseai_resume_parse
简历解析 AI function ai_customer_segmentai_customer_segment
客户分群 AI function ai_product_description_generateai_product_description_generate
产品描述生成 AI function ai_industry_classificationai_industry_classification
行业分类 AI function
Java
函数 类型 用途 pii_maskpii_mask
UDF PII 脱敏 agg_statsagg_stats
UDAF SUM/AVG/MIN/MAX/COUNT log_explodelog_explode
UDTF 日志拆行
故障排查
错误 原因 解决 function not foundfunction not found
没加 schema 前缀 调用时加 <schema>.<schema>.
前缀 HTTP_GENERAL_ERROR(640)HTTP_GENERAL_ERROR(640)
RAM 信任策略未配 / Bucket 跨地域 检查 RAM 角色信任策略(含 13843226919042831384322691904283
);确认 Bucket 和 FC 同地域 AccessDeniedAccessDenied
RAM 角色缺 OSS 权限 添加 AliyunOSSFullAccessAliyunOSSFullAccess
或自定义 OSS 策略 ImportError: No module named 'sklearn'ImportError: No module named 'sklearn'
zip 里没打包依赖 重跑 python 2-package.pypython 2-package.py
(advanced)/ python 2-package.py --depspython 2-package.py --deps
(AI function) OSError: cannot open shared object fileOSError: cannot open shared object file
打包了 macOS 的 .dylib.dylib
确认 --platform manylinux2014_x86_64--platform manylinux2014_x86_64
ClassNotFoundExceptionClassNotFoundException
Java 类名或包名写错 检查 ASAS
路径与 jar 内实际类名一致 UDAF 创建成功但调用报错 DDL 忘了加 AGGREGATORAGGREGATOR
检查 WITH PROPERTIES ('remote.udf.category'='AGGREGATOR')WITH PROPERTIES ('remote.udf.category'='AGGREGATOR')
UDTF 创建成功但 not a table functionnot a table function
DDL 忘了加 TABLE_VALUEDTABLE_VALUED
检查 WITH PROPERTIES ('remote.udf.category'='TABLE_VALUED')WITH PROPERTIES ('remote.udf.category'='TABLE_VALUED')
第一次调用很久没返回 FC 冷启动 等 5-10 秒,不是挂了 改完 config.jsonconfig.json
部署没变化 忘了重新渲染 重跑 python ../3-render-sql.pypython ../3-render-sql.py
相关文档