Studio Python 任务开发指南(ZettaPark)
Studio Python 任务让你在 Lakehouse 调度体系内运行 Python 脚本,通过 ZettaPark DataFrame API 读写 Lakehouse 数据。适合 SQL 难以表达的场景:分位数打分、复杂业务规则映射、多步骤数据处理流程。
本文以 RFM 客户分层 + 营销标签生成 为例,完整演示从建表、写脚本到调度执行的全流程,包含任务参数的使用方式。脚本自包含,直接粘贴到 Studio 任务里即可运行,无需额外准备。所有代码经过 cz-cli task execute 实测验证。
核心机制
Python Task 执行方式 :Studio 直接运行脚本的顶层代码,不会自动调用
main()main()
函数。脚本里的所有顶层语句按顺序执行,
print()print()
输出写入任务日志。
ZettaPark Session 创建方式 :Python 任务运行在 Studio 托管环境中,连接信息通过
clickzetta_dbutilsclickzetta_dbutils
注入,不需要硬编码用户名密码:
from clickzetta_dbutils import get_active_lakehouse_engine
from clickzetta.zettapark.session import Session
from urllib.parse import urlparse, parse_qs
engine = get_active_lakehouse_engine(schema="your_schema")
url_str = str(engine.url)
parsed = urlparse(url_str.replace('clickzetta://', 'https://'))
params = parse_qs(parsed.query)
parts = parsed.hostname.split('.', 1)
session = Session.builder.configs({
"service": parts[1], # cn-shanghai-alicloud.api.clickzetta.com
"instance": parts[0], # 实例 ID,从 URL 自动提取
"magic_token": params['magic_token'][0],
"workspace": parsed.path.lstrip('/'),
"schema": params.get('schema', ['public'])[0],
"vcluster": params.get('virtualcluster', ['default'])[0],
}).getOrCreate()
⚠️ 注意 :
SessionSession
对象没有
get_current_database()get_current_database()
方法,应使用
get_current_catalog()get_current_catalog()
和
get_current_schema()get_current_schema()
。
任务参数 :Studio 支持在脚本里用
'${param_name}''${param_name}'
引用参数,运行时自动替换为实际值:
base_date = '${base_date}' # 运行时替换为实际日期,如 2024-12-31
配置参数取值 有两种方式:
Studio UI :在任务编辑器里点击右侧 参数 按钮,系统自动识别 ${base_date}${base_date}
,为它赋值(如 $[yyyy-MM-dd]$[yyyy-MM-dd]
)
cz-cli :在 save-contentsave-content
时通过 --params--params
传入 JSON:
cz-cli task save-content my_task --file task.py \
--params '{"base_date": "$[yyyy-MM-dd]"}' \
--profile <your-profile>
运行时参数替换规则 :
调度运行 :系统按配置的表达式自动计算并替换,如 $[yyyy-MM-dd]$[yyyy-MM-dd]
替换为当天日期
临时执行 :配置的表达式不生效,必须通过 --param--param
手动指定值,或在 Studio UI 弹窗里输入
# 临时执行时手动指定参数值
cz-cli task execute my_task --param "base_date=2024-12-31" --profile <your-profile>
场景:RFM 客户分层 + 营销标签
RFM 是什么 :电商和零售行业最常用的客户价值分层模型,基于三个维度:
R(Recency) :最近一次购买距今天数,越小越好
F(Frequency) :购买次数,越多越好
M(Monetary) :累计消费金额,越高越好
为什么用 Python 而不是 SQL :分位数打分需要先计算全局分位数,再对每行打分,SQL 的
NTILENTILE
窗口函数可以做,但分层规则映射(Champions/Loyal/At Risk 等)和营销文案生成用 Python 更直观,也更容易维护。
完整脚本
from clickzetta_dbutils import get_active_lakehouse_engine
from clickzetta.zettapark.session import Session
from clickzetta.zettapark import functions as F
from urllib.parse import urlparse, parse_qs
import datetime
# ── 任务参数 ─────────────────────────────────────────────────────────────
# Studio 调度配置:base_date = $[yyyy-MM-dd] (每次取当天日期)
# cz-cli 临时执行:--param "base_date=2024-12-31"
base_date = '${base_date}'
print(f"RFM 基准日期:{base_date}")
# ── 1. 创建 ZettaPark Session ────────────────────────────────────────────
engine = get_active_lakehouse_engine(schema="zettapark_demo")
url_str = str(engine.url)
parsed = urlparse(url_str.replace('clickzetta://', 'https://'))
params = parse_qs(parsed.query)
parts = parsed.hostname.split('.', 1)
session = Session.builder.configs({
"service": parts[1],
"instance": parts[0],
"magic_token": params['magic_token'][0],
"workspace": parsed.path.lstrip('/'),
"schema": params.get('schema', ['public'])[0],
"vcluster": params.get('virtualcluster', ['default'])[0],
}).getOrCreate()
print(f"Session 就绪:{session.get_current_catalog()}.{session.get_current_schema()}")
# ── 2. 建表并写入测试数据(幂等,首次运行自动创建) ──────────────────────
session.sql("CREATE SCHEMA IF NOT EXISTS zettapark_demo").collect()
session.sql("""
CREATE TABLE IF NOT EXISTS zettapark_demo.doc_rfm_orders (
order_id BIGINT,
customer_id BIGINT,
amount DOUBLE,
order_date DATE
)
""").collect()
# 用 connector 写入行数据(ZettaPark 无行级写入 API)
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM zettapark_demo.doc_rfm_orders")
if cursor.fetchone()[0] == 0:
orders_data = [
(1, 101, 299.00, datetime.date(2024, 11, 1)),
(2, 101, 150.00, datetime.date(2024, 11, 15)),
(3, 101, 89.00, datetime.date(2024, 12, 1)),
(4, 101, 210.00, datetime.date(2024, 12, 20)),
(5, 102, 450.00, datetime.date(2024, 6, 10)),
(6, 102, 320.00, datetime.date(2024, 7, 5)),
(7, 103, 35.00, datetime.date(2024, 12, 28)),
(8, 104, 180.00, datetime.date(2024, 9, 1)),
(9, 104, 220.00, datetime.date(2024, 9, 15)),
(10, 104, 190.00, datetime.date(2024, 10, 1)),
(11, 104, 250.00, datetime.date(2024, 10, 20)),
(12, 104, 300.00, datetime.date(2024, 11, 5)),
(13, 105, 25.00, datetime.date(2024, 1, 10)),
(14, 106, 500.00, datetime.date(2024, 12, 25)),
(15, 106, 480.00, datetime.date(2024, 12, 26)),
(16, 107, 60.00, datetime.date(2024, 8, 1)),
(17, 107, 55.00, datetime.date(2024, 8, 20)),
(18, 108, 120.00, datetime.date(2024, 11, 10)),
(19, 109, 999.00, datetime.date(2024, 12, 30)),
(20, 110, 40.00, datetime.date(2024, 3, 15)),
]
cursor.executemany(
"INSERT INTO zettapark_demo.doc_rfm_orders VALUES (?, ?, ?, ?)",
orders_data
)
print(f"写入 {len(orders_data)} 条订单记录")
else:
print("订单数据已存在,跳过写入")
cursor.close()
conn.close()
# ── 3. 计算 RFM 原始指标 ─────────────────────────────────────────────────
orders = session.table("zettapark_demo.doc_rfm_orders")
print(f"订单总行数:{orders.count()}")
rfm_raw = (
orders
.group_by("customer_id")
.agg(
F.expr(f"DATEDIFF(DATE '{base_date}', MAX(order_date))").alias("recency"),
F.count("order_id").alias("frequency"),
F.sum("amount").alias("monetary"),
)
)
# 转为 pandas 做分位数打分
df = rfm_raw.to_pandas()
print(f"\n原始 RFM 指标({len(df)} 个客户):")
print(df.sort_values('customer_id').to_string(index=False))
# ── 4. 分位数打分(1-3 分,3 分最优)────────────────────────────────────
r_33 = df['recency'].quantile(0.33)
r_66 = df['recency'].quantile(0.66)
f_33 = df['frequency'].quantile(0.33)
f_66 = df['frequency'].quantile(0.66)
m_33 = df['monetary'].quantile(0.33)
m_66 = df['monetary'].quantile(0.66)
def score_r(v):
# Recency 越小越好,分位数倒序
if v <= r_33: return 3
if v <= r_66: return 2
return 1
def score_fm(v, p33, p66):
if v >= p66: return 3
if v >= p33: return 2
return 1
df['r_score'] = df['recency'].apply(score_r)
df['f_score'] = df['frequency'].apply(lambda v: score_fm(v, f_33, f_66))
df['m_score'] = df['monetary'].apply(lambda v: score_fm(v, m_33, m_66))
df['rfm_total'] = df['r_score'] + df['f_score'] + df['m_score']
# ── 5. 分层规则 + 营销标签 ───────────────────────────────────────────────
def classify(row):
r, f, m = row['r_score'], row['f_score'], row['m_score']
if r == 3 and f == 3 and m == 3:
return 'Champions', '高价值忠实客户,优先推送新品和会员专属权益'
if r >= 2 and f >= 2 and m >= 2:
return 'Loyal', '忠实客户,推送积分兑换和复购优惠'
if r == 3 and f <= 2:
return 'New Customers', '新客户,发送欢迎礼包和首购优惠券'
if r <= 1 and f >= 2 and m >= 2:
return 'At Risk', '高价值流失预警,发送召回优惠和专属折扣'
if r <= 1 and f <= 1:
return 'Lost', '已流失客户,低成本触达或放弃'
return 'Potential', '潜力客户,推送热销商品和限时活动'
df[['segment', 'action']] = df.apply(classify, axis=1, result_type='expand')
print("\nRFM 打分与分层结果:")
print(df[['customer_id', 'recency', 'frequency', 'monetary',
'r_score', 'f_score', 'm_score', 'segment', 'action']].to_string(index=False))
# ── 6. 写回 Lakehouse ────────────────────────────────────────────────────
result_df = session.create_dataframe(df[[
'customer_id', 'recency', 'frequency', 'monetary',
'r_score', 'f_score', 'm_score', 'rfm_total', 'segment', 'action'
]])
result_df.write.mode("overwrite").save_as_table("zettapark_demo.doc_rfm_result")
print(f"\n结果已写入 zettapark_demo.doc_rfm_result({len(df)} 行)")
# ── 7. 分层汇总 ──────────────────────────────────────────────────────────
summary = (
session.table("zettapark_demo.doc_rfm_result")
.group_by("segment")
.agg(
F.count("customer_id").alias("customer_count"),
F.round(F.avg("monetary"), 2).alias("avg_monetary"),
)
.sort(F.col("avg_monetary").desc())
)
print("\n各分层汇总:")
summary.show()
session.close()
创建并执行任务
Studio UI
进入 数据开发 → 新建任务 ,选择 Python 类型,填写任务名称
将上方脚本粘贴到编辑器
点击右侧 参数 按钮,系统自动识别 ${base_date}${base_date}
,赋值为 $[yyyy-MM-dd]$[yyyy-MM-dd]
(取当天日期)
点击 调度 按钮,配置 VCluster(选通用型 DEFAULTDEFAULT
)和 Cron 表达式(如 0 2 * * *0 2 * * *
)
点击 发布 ,再点击 运行 → 在弹窗里输入 base_date=2024-12-31base_date=2024-12-31
验证
cz-cli (适合 CI/CD 或批量管理场景,详见 Studio 任务开发与运维 )
# 创建任务
cz-cli task create rfm_segmentation --type python --profile <your-profile>
# 上传脚本并配置参数(base_date 取当天日期)
cz-cli task save-content rfm_segmentation --file rfm_task.py \
--params '{"base_date": "$[yyyy-MM-dd]"}' \
--profile <your-profile>
# 配置调度(每天凌晨 2 点执行)
cz-cli task save-config rfm_segmentation --vcluster default --retry-count 1 --profile <your-profile>
cz-cli task save-cron rfm_segmentation --cron "0 2 * * *" --profile <your-profile>
# 发布并临时执行验证
cz-cli task online rfm_segmentation -y --profile <your-profile>
cz-cli task execute rfm_segmentation --param "base_date=2024-12-31" --profile <your-profile>
执行结果
RFM 基准日期:2024-12-31
Session 就绪:quick_start.zettapark_demo
写入 20 条订单记录
订单总行数:20
原始 RFM 指标(10 个客户):
customer_id recency frequency monetary
101 11 4 748.0
102 179 2 770.0
103 3 1 35.0
104 56 5 1140.0
105 356 1 25.0
106 5 2 980.0
107 133 2 115.0
108 51 1 120.0
109 1 1 999.0
110 291 1 40.0
RFM 打分与分层结果:
customer_id recency frequency monetary r_score f_score m_score segment action
101 11 4 748.0 2 3 2 Loyal 忠实客户,推送积分兑换和复购优惠
102 179 2 770.0 1 3 3 At Risk 高价值流失预警,发送召回优惠和专属折扣
103 3 1 35.0 3 2 1 New Customers 新客户,发送欢迎礼包和首购优惠券
104 56 5 1140.0 2 3 3 Loyal 忠实客户,推送积分兑换和复购优惠
105 356 1 25.0 1 2 1 Potential 潜力客户,推送热销商品和限时活动
106 5 2 980.0 3 3 3 Champions 高价值忠实客户,优先推送新品和会员专属权益
107 133 2 115.0 1 3 2 At Risk 高价值流失预警,发送召回优惠和专属折扣
108 51 1 120.0 2 2 2 Loyal 忠实客户,推送积分兑换和复购优惠
109 1 1 999.0 3 2 3 Loyal 忠实客户,推送积分兑换和复购优惠
110 291 1 40.0 1 2 1 Potential 潜力客户,推送热销商品和限时活动
结果已写入 zettapark_demo.doc_rfm_result(10 行)
各分层汇总:
---------------------------------------------------------------------------
|"SEGMENT" |"CUSTOMER_COUNT"|"AVG_MONETARY"|
---------------------------------------------------------------------------
|Champions |1 |980.0 |
|Loyal |4 |751.75 |
|At Risk |2 |442.5 |
|New Customers |1 |35.0 |
|Potential |2 |32.5 |
---------------------------------------------------------------------------
验证写入结果:
SELECT segment, customer_count, avg_monetary
FROM (
SELECT segment,
COUNT(*) AS customer_count,
ROUND(AVG(monetary), 2) AS avg_monetary
FROM zettapark_demo.doc_rfm_result
GROUP BY segment
)
ORDER BY avg_monetary DESC;
segment customer_count avg_monetary
Champions 1 980.00
Loyal 4 751.75
At Risk 2 442.50
New Customers 1 35.00
Potential 2 32.50
常见问题
AttributeError: 'Session' object has no attribute 'get_current_database'AttributeError: 'Session' object has no attribute 'get_current_database'
SessionSession
没有
get_current_database()get_current_database()
方法,改用
get_current_catalog()get_current_catalog()
。
python process interrupted exceptionpython process interrupted exception
Python 任务运行时错误,错误信息在 Studio 任务实例的日志里,不会出现在
information_schema.job_historyinformation_schema.job_history
(Python 运行时错误不走 SQL 引擎)。在 Studio UI 打开任务实例详情查看完整 traceback。
to_pandas()to_pandas()
数据量大时内存不足
to_pandas()to_pandas()
把数据拉到 Python 进程内存。数据量大时(百万行以上),分位数打分改用 SQL
PERCENTILE_CONTPERCENTILE_CONT
或
NTILENTILE
窗口函数在 Lakehouse 侧计算,只把打分结果用
create_dataframe()create_dataframe()
写回。
相关文档