Studio Python 任务开发指南(ZettaPark)

Studio Python 任务让你在 Lakehouse 调度体系内运行 Python 脚本,通过 ZettaPark DataFrame API 读写 Lakehouse 数据。适合 SQL 难以表达的场景:分位数打分、复杂业务规则映射、多步骤数据处理流程。

本文以 RFM 客户分层 + 营销标签生成 为例,完整演示从建表、写脚本到调度执行的全流程,包含任务参数的使用方式。脚本自包含,直接粘贴到 Studio 任务里即可运行,无需额外准备。所有代码经过 cz-cli task execute 实测验证。


核心机制

Python Task 执行方式:Studio 直接运行脚本的顶层代码,不会自动调用

main()
main()
函数。脚本里的所有顶层语句按顺序执行,
print()
print()
输出写入任务日志。

ZettaPark Session 创建方式:Python 任务运行在 Studio 托管环境中,连接信息通过

clickzetta_dbutils
clickzetta_dbutils
注入,不需要硬编码用户名密码:

from clickzetta_dbutils import get_active_lakehouse_engine from clickzetta.zettapark.session import Session from urllib.parse import urlparse, parse_qs engine = get_active_lakehouse_engine(schema="your_schema") url_str = str(engine.url) parsed = urlparse(url_str.replace('clickzetta://', 'https://')) params = parse_qs(parsed.query) parts = parsed.hostname.split('.', 1) session = Session.builder.configs({ "service": parts[1], # cn-shanghai-alicloud.api.clickzetta.com "instance": parts[0], # 实例 ID,从 URL 自动提取 "magic_token": params['magic_token'][0], "workspace": parsed.path.lstrip('/'), "schema": params.get('schema', ['public'])[0], "vcluster": params.get('virtualcluster', ['default'])[0], }).getOrCreate()

任务参数:Studio 支持在脚本里用

'${param_name}'
'${param_name}'
引用参数,运行时自动替换为实际值:

base_date = '${base_date}' # 运行时替换为实际日期,如 2024-12-31

配置参数取值有两种方式:

  • Studio UI:在任务编辑器里点击右侧 参数 按钮,系统自动识别
    ${base_date}
    ${base_date}
    ,为它赋值(如
    $[yyyy-MM-dd]
    $[yyyy-MM-dd]
  • cz-cli:在
    save-content
    save-content
    时通过
    --params
    --params
    传入 JSON:

cz-cli task save-content my_task --file task.py \ --params '{"base_date": "$[yyyy-MM-dd]"}' \ --profile <your-profile>

运行时参数替换规则

  • 调度运行:系统按配置的表达式自动计算并替换,如
    $[yyyy-MM-dd]
    $[yyyy-MM-dd]
    替换为当天日期
  • 临时执行:配置的表达式不生效,必须通过
    --param
    --param
    手动指定值,或在 Studio UI 弹窗里输入

# 临时执行时手动指定参数值 cz-cli task execute my_task --param "base_date=2024-12-31" --profile <your-profile>


场景:RFM 客户分层 + 营销标签

RFM 是什么:电商和零售行业最常用的客户价值分层模型,基于三个维度:

  • R(Recency):最近一次购买距今天数,越小越好
  • F(Frequency):购买次数,越多越好
  • M(Monetary):累计消费金额,越高越好

为什么用 Python 而不是 SQL:分位数打分需要先计算全局分位数,再对每行打分,SQL 的

NTILE
NTILE
窗口函数可以做,但分层规则映射(Champions/Loyal/At Risk 等)和营销文案生成用 Python 更直观,也更容易维护。

完整脚本

from clickzetta_dbutils import get_active_lakehouse_engine from clickzetta.zettapark.session import Session from clickzetta.zettapark import functions as F from urllib.parse import urlparse, parse_qs import datetime # ── 任务参数 ───────────────────────────────────────────────────────────── # Studio 调度配置:base_date = $[yyyy-MM-dd] (每次取当天日期) # cz-cli 临时执行:--param "base_date=2024-12-31" base_date = '${base_date}' print(f"RFM 基准日期:{base_date}") # ── 1. 创建 ZettaPark Session ──────────────────────────────────────────── engine = get_active_lakehouse_engine(schema="zettapark_demo") url_str = str(engine.url) parsed = urlparse(url_str.replace('clickzetta://', 'https://')) params = parse_qs(parsed.query) parts = parsed.hostname.split('.', 1) session = Session.builder.configs({ "service": parts[1], "instance": parts[0], "magic_token": params['magic_token'][0], "workspace": parsed.path.lstrip('/'), "schema": params.get('schema', ['public'])[0], "vcluster": params.get('virtualcluster', ['default'])[0], }).getOrCreate() print(f"Session 就绪:{session.get_current_catalog()}.{session.get_current_schema()}") # ── 2. 建表并写入测试数据(幂等,首次运行自动创建) ────────────────────── session.sql("CREATE SCHEMA IF NOT EXISTS zettapark_demo").collect() session.sql(""" CREATE TABLE IF NOT EXISTS zettapark_demo.doc_rfm_orders ( order_id BIGINT, customer_id BIGINT, amount DOUBLE, order_date DATE ) """).collect() # 用 connector 写入行数据(ZettaPark 无行级写入 API) conn = engine.raw_connection() cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM zettapark_demo.doc_rfm_orders") if cursor.fetchone()[0] == 0: orders_data = [ (1, 101, 299.00, datetime.date(2024, 11, 1)), (2, 101, 150.00, datetime.date(2024, 11, 15)), (3, 101, 89.00, datetime.date(2024, 12, 1)), (4, 101, 210.00, datetime.date(2024, 12, 20)), (5, 102, 450.00, datetime.date(2024, 6, 10)), (6, 102, 320.00, datetime.date(2024, 7, 5)), (7, 103, 35.00, datetime.date(2024, 12, 28)), (8, 104, 180.00, datetime.date(2024, 9, 1)), (9, 104, 220.00, datetime.date(2024, 9, 15)), (10, 104, 190.00, datetime.date(2024, 10, 1)), (11, 104, 250.00, datetime.date(2024, 10, 20)), (12, 104, 300.00, datetime.date(2024, 11, 5)), (13, 105, 25.00, datetime.date(2024, 1, 10)), (14, 106, 500.00, datetime.date(2024, 12, 25)), (15, 106, 480.00, datetime.date(2024, 12, 26)), (16, 107, 60.00, datetime.date(2024, 8, 1)), (17, 107, 55.00, datetime.date(2024, 8, 20)), (18, 108, 120.00, datetime.date(2024, 11, 10)), (19, 109, 999.00, datetime.date(2024, 12, 30)), (20, 110, 40.00, datetime.date(2024, 3, 15)), ] cursor.executemany( "INSERT INTO zettapark_demo.doc_rfm_orders VALUES (?, ?, ?, ?)", orders_data ) print(f"写入 {len(orders_data)} 条订单记录") else: print("订单数据已存在,跳过写入") cursor.close() conn.close() # ── 3. 计算 RFM 原始指标 ───────────────────────────────────────────────── orders = session.table("zettapark_demo.doc_rfm_orders") print(f"订单总行数:{orders.count()}") rfm_raw = ( orders .group_by("customer_id") .agg( F.expr(f"DATEDIFF(DATE '{base_date}', MAX(order_date))").alias("recency"), F.count("order_id").alias("frequency"), F.sum("amount").alias("monetary"), ) ) # 转为 pandas 做分位数打分 df = rfm_raw.to_pandas() print(f"\n原始 RFM 指标({len(df)} 个客户):") print(df.sort_values('customer_id').to_string(index=False)) # ── 4. 分位数打分(1-3 分,3 分最优)──────────────────────────────────── r_33 = df['recency'].quantile(0.33) r_66 = df['recency'].quantile(0.66) f_33 = df['frequency'].quantile(0.33) f_66 = df['frequency'].quantile(0.66) m_33 = df['monetary'].quantile(0.33) m_66 = df['monetary'].quantile(0.66) def score_r(v): # Recency 越小越好,分位数倒序 if v <= r_33: return 3 if v <= r_66: return 2 return 1 def score_fm(v, p33, p66): if v >= p66: return 3 if v >= p33: return 2 return 1 df['r_score'] = df['recency'].apply(score_r) df['f_score'] = df['frequency'].apply(lambda v: score_fm(v, f_33, f_66)) df['m_score'] = df['monetary'].apply(lambda v: score_fm(v, m_33, m_66)) df['rfm_total'] = df['r_score'] + df['f_score'] + df['m_score'] # ── 5. 分层规则 + 营销标签 ─────────────────────────────────────────────── def classify(row): r, f, m = row['r_score'], row['f_score'], row['m_score'] if r == 3 and f == 3 and m == 3: return 'Champions', '高价值忠实客户,优先推送新品和会员专属权益' if r >= 2 and f >= 2 and m >= 2: return 'Loyal', '忠实客户,推送积分兑换和复购优惠' if r == 3 and f <= 2: return 'New Customers', '新客户,发送欢迎礼包和首购优惠券' if r <= 1 and f >= 2 and m >= 2: return 'At Risk', '高价值流失预警,发送召回优惠和专属折扣' if r <= 1 and f <= 1: return 'Lost', '已流失客户,低成本触达或放弃' return 'Potential', '潜力客户,推送热销商品和限时活动' df[['segment', 'action']] = df.apply(classify, axis=1, result_type='expand') print("\nRFM 打分与分层结果:") print(df[['customer_id', 'recency', 'frequency', 'monetary', 'r_score', 'f_score', 'm_score', 'segment', 'action']].to_string(index=False)) # ── 6. 写回 Lakehouse ──────────────────────────────────────────────────── result_df = session.create_dataframe(df[[ 'customer_id', 'recency', 'frequency', 'monetary', 'r_score', 'f_score', 'm_score', 'rfm_total', 'segment', 'action' ]]) result_df.write.mode("overwrite").save_as_table("zettapark_demo.doc_rfm_result") print(f"\n结果已写入 zettapark_demo.doc_rfm_result({len(df)} 行)") # ── 7. 分层汇总 ────────────────────────────────────────────────────────── summary = ( session.table("zettapark_demo.doc_rfm_result") .group_by("segment") .agg( F.count("customer_id").alias("customer_count"), F.round(F.avg("monetary"), 2).alias("avg_monetary"), ) .sort(F.col("avg_monetary").desc()) ) print("\n各分层汇总:") summary.show() session.close()

创建并执行任务

Studio UI

  1. 进入 数据开发 → 新建任务,选择 Python 类型,填写任务名称
  2. 将上方脚本粘贴到编辑器
  3. 点击右侧 参数 按钮,系统自动识别
    ${base_date}
    ${base_date}
    ,赋值为
    $[yyyy-MM-dd]
    $[yyyy-MM-dd]
    (取当天日期)
  4. 点击 调度 按钮,配置 VCluster(选通用型
    DEFAULT
    DEFAULT
    )和 Cron 表达式(如
    0 2 * * *
    0 2 * * *
  5. 点击 发布,再点击 运行 → 在弹窗里输入
    base_date=2024-12-31
    base_date=2024-12-31
    验证

cz-cli(适合 CI/CD 或批量管理场景,详见 Studio 任务开发与运维

# 创建任务 cz-cli task create rfm_segmentation --type python --profile <your-profile> # 上传脚本并配置参数(base_date 取当天日期) cz-cli task save-content rfm_segmentation --file rfm_task.py \ --params '{"base_date": "$[yyyy-MM-dd]"}' \ --profile <your-profile> # 配置调度(每天凌晨 2 点执行) cz-cli task save-config rfm_segmentation --vcluster default --retry-count 1 --profile <your-profile> cz-cli task save-cron rfm_segmentation --cron "0 2 * * *" --profile <your-profile> # 发布并临时执行验证 cz-cli task online rfm_segmentation -y --profile <your-profile> cz-cli task execute rfm_segmentation --param "base_date=2024-12-31" --profile <your-profile>

执行结果

RFM 基准日期:2024-12-31 Session 就绪:quick_start.zettapark_demo 写入 20 条订单记录 订单总行数:20 原始 RFM 指标(10 个客户): customer_id recency frequency monetary 101 11 4 748.0 102 179 2 770.0 103 3 1 35.0 104 56 5 1140.0 105 356 1 25.0 106 5 2 980.0 107 133 2 115.0 108 51 1 120.0 109 1 1 999.0 110 291 1 40.0 RFM 打分与分层结果: customer_id recency frequency monetary r_score f_score m_score segment action 101 11 4 748.0 2 3 2 Loyal 忠实客户,推送积分兑换和复购优惠 102 179 2 770.0 1 3 3 At Risk 高价值流失预警,发送召回优惠和专属折扣 103 3 1 35.0 3 2 1 New Customers 新客户,发送欢迎礼包和首购优惠券 104 56 5 1140.0 2 3 3 Loyal 忠实客户,推送积分兑换和复购优惠 105 356 1 25.0 1 2 1 Potential 潜力客户,推送热销商品和限时活动 106 5 2 980.0 3 3 3 Champions 高价值忠实客户,优先推送新品和会员专属权益 107 133 2 115.0 1 3 2 At Risk 高价值流失预警,发送召回优惠和专属折扣 108 51 1 120.0 2 2 2 Loyal 忠实客户,推送积分兑换和复购优惠 109 1 1 999.0 3 2 3 Loyal 忠实客户,推送积分兑换和复购优惠 110 291 1 40.0 1 2 1 Potential 潜力客户,推送热销商品和限时活动 结果已写入 zettapark_demo.doc_rfm_result(10 行) 各分层汇总: --------------------------------------------------------------------------- |"SEGMENT" |"CUSTOMER_COUNT"|"AVG_MONETARY"| --------------------------------------------------------------------------- |Champions |1 |980.0 | |Loyal |4 |751.75 | |At Risk |2 |442.5 | |New Customers |1 |35.0 | |Potential |2 |32.5 | ---------------------------------------------------------------------------

验证写入结果:

SELECT segment, customer_count, avg_monetary FROM ( SELECT segment, COUNT(*) AS customer_count, ROUND(AVG(monetary), 2) AS avg_monetary FROM zettapark_demo.doc_rfm_result GROUP BY segment ) ORDER BY avg_monetary DESC;

segment customer_count avg_monetary Champions 1 980.00 Loyal 4 751.75 At Risk 2 442.50 New Customers 1 35.00 Potential 2 32.50


常见问题

AttributeError: 'Session' object has no attribute 'get_current_database'
AttributeError: 'Session' object has no attribute 'get_current_database'

Session
Session
没有
get_current_database()
get_current_database()
方法,改用
get_current_catalog()
get_current_catalog()

python process interrupted exception
python process interrupted exception

Python 任务运行时错误,错误信息在 Studio 任务实例的日志里,不会出现在

information_schema.job_history
information_schema.job_history
(Python 运行时错误不走 SQL 引擎)。在 Studio UI 打开任务实例详情查看完整 traceback。

to_pandas()
to_pandas()
数据量大时内存不足

to_pandas()
to_pandas()
把数据拉到 Python 进程内存。数据量大时(百万行以上),分位数打分改用 SQL
PERCENTILE_CONT
PERCENTILE_CONT
NTILE
NTILE
窗口函数在 Lakehouse 侧计算,只把打分结果用
create_dataframe()
create_dataframe()
写回。


相关文档

联系我们
预约咨询
微信咨询
电话咨询