Studio Python 任务开发指南(Python Connector)

Studio Python 任务内置

clickzetta-connector
clickzetta-connector
,通过
get_active_lakehouse_engine().raw_connection()
get_active_lakehouse_engine().raw_connection()
即可拿到符合 PEP 249 规范的 connection 对象,直接使用 cursor/executemany 执行 SQL 和批量写入。

本文以 用户行为事件写入 + 漏斗分析 为例,演示完整开发流程,包含任务参数的使用方式。脚本自包含,直接粘贴到 Studio 任务里即可运行,无需额外准备。所有代码经过 cz-cli task execute 实测验证。


核心机制

Python 任务里获取 connector connection 只需两行:

from clickzetta_dbutils import get_active_lakehouse_engine engine = get_active_lakehouse_engine(schema="your_schema") conn = engine.raw_connection() # 底层 clickzetta connector connection cursor = conn.cursor()

get_active_lakehouse_engine()
get_active_lakehouse_engine()
从 Studio 运行时注入的连接信息中自动构建 engine,不需要硬编码用户名密码。
raw_connection()
raw_connection()
返回底层的 clickzetta connector connection,支持完整的 PEP 249 接口:
execute()
execute()
executemany()
executemany()
fetchall()
fetchall()
fetchmany()
fetchmany()
cursor.description
cursor.description
等。

任务参数:Studio 支持在脚本里用

'${param_name}'
'${param_name}'
引用参数,运行时自动替换为实际值。字符串参数加引号,数值参数不加引号:

biz_date = '${biz_date}' # 字符串,运行时替换为 2024-12-01 limit = ${limit} # 数值,运行时替换为 100

配置参数取值有两种方式:

  • Studio UI:在任务编辑器里点击右侧 参数 按钮,系统自动识别
    ${biz_date}
    ${biz_date}
    ,为它赋值(如
    $[yyyy-MM-dd, -1d]
    $[yyyy-MM-dd, -1d]
  • cz-cli:在
    save-content
    save-content
    时通过
    --params
    --params
    传入 JSON:

cz-cli task save-content my_task --file task.py \ --params '{"biz_date": "$[yyyy-MM-dd, -1d]"}' \ --profile <your-profile>

运行时参数替换规则

  • 调度运行:系统按配置的表达式自动计算并替换,如
    $[yyyy-MM-dd, -1d]
    $[yyyy-MM-dd, -1d]
    替换为昨天日期
  • 临时执行:配置的表达式不生效,必须通过
    --param
    --param
    手动指定值,或在 Studio UI 弹窗里输入

# 临时执行时手动指定参数值 cz-cli task execute my_task --param "biz_date=2024-12-01" --profile <your-profile>


场景:用户行为漏斗分析(含任务参数)

每天凌晨处理前一天的用户行为数据,通过任务参数

biz_date
biz_date
控制处理哪天的数据,结果写入汇总表。

完整脚本

import datetime from clickzetta_dbutils import get_active_lakehouse_engine # ── 任务参数 ───────────────────────────────────────────────────────────── # Studio 调度配置:biz_date = $[yyyy-MM-dd, -1d] (每次取昨天日期) # cz-cli 临时执行:--param "biz_date=2024-12-01" biz_date = '${biz_date}' print(f"处理日期:{biz_date}") # ── 1. 获取连接 ────────────────────────────────────────────────────────── engine = get_active_lakehouse_engine(schema="doc_connector_demo") conn = engine.raw_connection() cursor = conn.cursor() print("连接成功") # ── 2. 建表(幂等,首次运行自动创建) ──────────────────────────────────── cursor.execute("CREATE SCHEMA IF NOT EXISTS doc_connector_demo") cursor.execute(""" CREATE TABLE IF NOT EXISTS doc_connector_demo.doc_events ( event_id BIGINT, user_id BIGINT, event_type STRING, page STRING, duration INT, event_time TIMESTAMP ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS doc_connector_demo.doc_funnel_daily ( biz_date STRING, step1_view BIGINT, step2_cart BIGINT, step3_checkout BIGINT, run_time TIMESTAMP ) """) print("建表完成") # ── 3. 写入当天事件数据(模拟上游推送) ────────────────────────────────── events = [ (1, 101, 'view', 'home', 30, datetime.datetime(2024, 12, 1, 10, 0, 0)), (2, 101, 'click', 'product', 5, datetime.datetime(2024, 12, 1, 10, 0, 35)), (3, 102, 'view', 'home', 45, datetime.datetime(2024, 12, 1, 10, 1, 0)), (4, 102, 'view', 'product', 120, datetime.datetime(2024, 12, 1, 10, 2, 0)), (5, 102, 'click', 'cart', 8, datetime.datetime(2024, 12, 1, 10, 4, 0)), (6, 103, 'view', 'home', 15, datetime.datetime(2024, 12, 1, 10, 5, 0)), (7, 103, 'view', 'product', 200, datetime.datetime(2024, 12, 1, 10, 5, 20)), (8, 103, 'click', 'cart', 6, datetime.datetime(2024, 12, 1, 10, 8, 0)), (9, 103, 'click', 'checkout', 12, datetime.datetime(2024, 12, 1, 10, 8, 10)), (10, 104, 'view', 'home', 10, datetime.datetime(2024, 12, 1, 10, 9, 0)), ] cursor.executemany( "INSERT INTO doc_connector_demo.doc_events VALUES (?, ?, ?, ?, ?, ?)", events ) print(f"写入 {len(events)} 条事件记录") # ── 4. 查询指定日期的事件统计 ──────────────────────────────────────────── cursor.execute(f""" SELECT event_type, COUNT(*) AS cnt, AVG(duration) AS avg_duration FROM doc_connector_demo.doc_events WHERE DATE(event_time) = DATE '{biz_date}' GROUP BY event_type ORDER BY cnt DESC """) col_names = [col[0] for col in cursor.description] rows = cursor.fetchall() print(f"\n{biz_date} 事件统计(列:{col_names}):") for row in rows: print(f" {row[0]:10s} 次数={row[1]} 平均时长={row[2]:.1f}s") # ── 5. 漏斗分析 ────────────────────────────────────────────────────────── cursor.execute(f""" SELECT COUNT(DISTINCT CASE WHEN event_type = 'view' THEN user_id END) AS step1_view, COUNT(DISTINCT CASE WHEN event_type = 'click' AND page = 'cart' THEN user_id END) AS step2_cart, COUNT(DISTINCT CASE WHEN event_type = 'click' AND page = 'checkout' THEN user_id END) AS step3_checkout FROM doc_connector_demo.doc_events WHERE DATE(event_time) = DATE '{biz_date}' """) row = cursor.fetchone() view, cart, checkout = row[0], row[1], row[2] print(f"\n漏斗分析({biz_date}):") print(f" 浏览首页: {view} 人") if view > 0: print(f" 加入购物车: {cart} 人 (转化率 {cart/view*100:.0f}%)") print(f" 进入结算: {checkout} 人 (转化率 {checkout/view*100:.0f}%)") # ── 6. fetchmany 分批读取 ──────────────────────────────────────────────── cursor.execute(f""" SELECT event_id, user_id, event_type, page, duration FROM doc_connector_demo.doc_events WHERE DATE(event_time) = DATE '{biz_date}' ORDER BY event_id """) print(f"\nfetchmany 分批读取(每批 3 行):") while True: batch = cursor.fetchmany(3) if not batch: break print(f" 本批 {len(batch)} 行: event_id {batch[0][0]} ~ {batch[-1][0]}") # ── 7. 写入汇总结果 ────────────────────────────────────────────────────── cursor.executemany( "INSERT INTO doc_connector_demo.doc_funnel_daily VALUES (?, ?, ?, ?, ?)", [(biz_date, view, cart, checkout, datetime.datetime.now())] ) print(f"\n汇总结果已写入 doc_funnel_daily") cursor.close() conn.close() print("完成")

创建并执行任务

Studio UI

  1. 进入 数据开发 → 新建任务,选择 Python 类型,填写任务名称
  2. 将上方脚本粘贴到编辑器
  3. 点击右侧 参数 按钮,系统自动识别
    ${biz_date}
    ${biz_date}
    ,赋值为
    $[yyyy-MM-dd, -1d]
    $[yyyy-MM-dd, -1d]
    (取昨天日期)
  4. 点击 调度 按钮,配置 VCluster(选通用型
    DEFAULT
    DEFAULT
    )和 Cron 表达式(如
    0 3 * * *
    0 3 * * *
  5. 点击 发布,再点击 运行 → 在弹窗里输入
    biz_date=2024-12-01
    biz_date=2024-12-01
    验证

cz-cli(适合 CI/CD 或批量管理场景,详见 Studio 任务开发与运维

# 创建任务 cz-cli task create connector_funnel --type python --profile <your-profile> # 上传脚本并配置参数(biz_date 取昨天日期) cz-cli task save-content connector_funnel --file connector_funnel.py \ --params '{"biz_date": "$[yyyy-MM-dd, -1d]"}' \ --profile <your-profile> # 配置调度(每天凌晨 3 点执行) cz-cli task save-config connector_funnel --vcluster default --retry-count 1 --profile <your-profile> cz-cli task save-cron connector_funnel --cron "0 3 * * *" --profile <your-profile> # 发布并临时执行验证 cz-cli task online connector_funnel -y --profile <your-profile> cz-cli task execute connector_funnel --param "biz_date=2024-12-01" --profile <your-profile>

执行结果

处理日期:2024-12-01 连接成功 建表完成 写入 10 条事件记录 2024-12-01 事件统计(列:['event_type', 'cnt', 'avg_duration']): view 次数=6 平均时长=70.0s click 次数=4 平均时长=7.8s 漏斗分析(2024-12-01): 浏览首页: 4 人 加入购物车: 2 人 (转化率 50%) 进入结算: 1 人 (转化率 25%) fetchmany 分批读取(每批 3 行): 本批 3 行: event_id 1 ~ 3 本批 3 行: event_id 4 ~ 6 本批 3 行: event_id 7 ~ 9 本批 1 行: event_id 10 ~ 10 汇总结果已写入 doc_funnel_daily 完成

验证写入结果:

SELECT biz_date, step1_view, step2_cart, step3_checkout FROM doc_connector_demo.doc_funnel_daily ORDER BY biz_date DESC LIMIT 5;

biz_date step1_view step2_cart step3_checkout 2024-12-01 4 2 1


常用接口速查

接口说明
cursor.execute(sql)
cursor.execute(sql)
执行单条 SQL
cursor.executemany(sql, data)
cursor.executemany(sql, data)
批量写入,
data
data
为 list of tuple
cursor.fetchall()
cursor.fetchall()
获取全部结果行
cursor.fetchone()
cursor.fetchone()
获取单行
cursor.fetchmany(n)
cursor.fetchmany(n)
分批获取,每次 n 行,结果耗尽返回
[]
[]
cursor.description
cursor.description
列元数据,
col[0]
col[0]
为列名

executemany
executemany
占位符:用
?
?
表示参数位置,对应 tuple 中的值按顺序绑定。TIMESTAMP 列传
datetime.datetime
datetime.datetime
对象,DATE 列传
datetime.date
datetime.date
对象。


与 ZettaPark 的选型对比

Python ConnectorZettaPark
接口风格PEP 249 cursor/SQLDataFrame 链式操作
适合场景批量写入、精确 SQL 控制数据处理、聚合、pandas 集成
写入方式
executemany()
executemany()
create_dataframe().write.save_as_table()
create_dataframe().write.save_as_table()
读取方式
fetchall()
fetchall()
/
fetchmany()
fetchmany()
to_pandas()
to_pandas()
/
show()
show()
依赖
clickzetta-connector
clickzetta-connector
(内置)
clickzetta-zettapark-python
clickzetta-zettapark-python
(内置)

两者可以在同一个 Python 任务里混用:用 connector 写入原始数据,用 ZettaPark 做聚合分析。


相关文档

联系我们
预约咨询
微信咨询
电话咨询