Studio Python 任务开发指南(Python Connector)
Studio Python 任务内置
clickzetta-connector
clickzetta-connector
,通过
get_active_lakehouse_engine().raw_connection()
get_active_lakehouse_engine().raw_connection()
即可拿到符合 PEP 249 规范的 connection 对象,直接使用 cursor/executemany 执行 SQL 和批量写入。
本文以 用户行为事件写入 + 漏斗分析 为例,演示完整开发流程,包含任务参数的使用方式。脚本自包含,直接粘贴到 Studio 任务里即可运行,无需额外准备。所有代码经过 cz-cli task execute 实测验证。
核心机制
Python 任务里获取 connector connection 只需两行:
from clickzetta_dbutils import get_active_lakehouse_engine
engine = get_active_lakehouse_engine(schema="your_schema")
conn = engine.raw_connection() # 底层 clickzetta connector connection
cursor = conn.cursor()
get_active_lakehouse_engine()
get_active_lakehouse_engine()
从 Studio 运行时注入的连接信息中自动构建 engine,不需要硬编码用户名密码。
raw_connection()
raw_connection()
返回底层的 clickzetta connector connection,支持完整的 PEP 249 接口:
execute()
execute()
、
executemany()
executemany()
、
fetchall()
fetchall()
、
fetchmany()
fetchmany()
、
cursor.description
cursor.description
等。
⚠️ 注意:ClickZetta 不支持事务,
commit()
commit()
和
rollback()
rollback()
接口无效,每条 SQL 自动提交。
任务参数:Studio 支持在脚本里用
'${param_name}'
'${param_name}'
引用参数,运行时自动替换为实际值。字符串参数加引号,数值参数不加引号:
biz_date = '${biz_date}' # 字符串,运行时替换为 2024-12-01
limit = ${limit} # 数值,运行时替换为 100
配置参数取值有两种方式:
- Studio UI:在任务编辑器里点击右侧 参数 按钮,系统自动识别
${biz_date}
${biz_date}
,为它赋值(如 $[yyyy-MM-dd, -1d]
$[yyyy-MM-dd, -1d]
)
- cz-cli:在
save-content
save-content
时通过 --params
--params
传入 JSON:
cz-cli task save-content my_task --file task.py \
--params '{"biz_date": "$[yyyy-MM-dd, -1d]"}' \
--profile <your-profile>
运行时参数替换规则:
- 调度运行:系统按配置的表达式自动计算并替换,如
$[yyyy-MM-dd, -1d]
$[yyyy-MM-dd, -1d]
替换为昨天日期
- 临时执行:配置的表达式不生效,必须通过
--param
--param
手动指定值,或在 Studio UI 弹窗里输入
# 临时执行时手动指定参数值
cz-cli task execute my_task --param "biz_date=2024-12-01" --profile <your-profile>
场景:用户行为漏斗分析(含任务参数)
每天凌晨处理前一天的用户行为数据,通过任务参数
biz_date
biz_date
控制处理哪天的数据,结果写入汇总表。
完整脚本
import datetime
from clickzetta_dbutils import get_active_lakehouse_engine
# ── 任务参数 ─────────────────────────────────────────────────────────────
# Studio 调度配置:biz_date = $[yyyy-MM-dd, -1d] (每次取昨天日期)
# cz-cli 临时执行:--param "biz_date=2024-12-01"
biz_date = '${biz_date}'
print(f"处理日期:{biz_date}")
# ── 1. 获取连接 ──────────────────────────────────────────────────────────
engine = get_active_lakehouse_engine(schema="doc_connector_demo")
conn = engine.raw_connection()
cursor = conn.cursor()
print("连接成功")
# ── 2. 建表(幂等,首次运行自动创建) ────────────────────────────────────
cursor.execute("CREATE SCHEMA IF NOT EXISTS doc_connector_demo")
cursor.execute("""
CREATE TABLE IF NOT EXISTS doc_connector_demo.doc_events (
event_id BIGINT,
user_id BIGINT,
event_type STRING,
page STRING,
duration INT,
event_time TIMESTAMP
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS doc_connector_demo.doc_funnel_daily (
biz_date STRING,
step1_view BIGINT,
step2_cart BIGINT,
step3_checkout BIGINT,
run_time TIMESTAMP
)
""")
print("建表完成")
# ── 3. 写入当天事件数据(模拟上游推送) ──────────────────────────────────
events = [
(1, 101, 'view', 'home', 30, datetime.datetime(2024, 12, 1, 10, 0, 0)),
(2, 101, 'click', 'product', 5, datetime.datetime(2024, 12, 1, 10, 0, 35)),
(3, 102, 'view', 'home', 45, datetime.datetime(2024, 12, 1, 10, 1, 0)),
(4, 102, 'view', 'product', 120, datetime.datetime(2024, 12, 1, 10, 2, 0)),
(5, 102, 'click', 'cart', 8, datetime.datetime(2024, 12, 1, 10, 4, 0)),
(6, 103, 'view', 'home', 15, datetime.datetime(2024, 12, 1, 10, 5, 0)),
(7, 103, 'view', 'product', 200, datetime.datetime(2024, 12, 1, 10, 5, 20)),
(8, 103, 'click', 'cart', 6, datetime.datetime(2024, 12, 1, 10, 8, 0)),
(9, 103, 'click', 'checkout', 12, datetime.datetime(2024, 12, 1, 10, 8, 10)),
(10, 104, 'view', 'home', 10, datetime.datetime(2024, 12, 1, 10, 9, 0)),
]
cursor.executemany(
"INSERT INTO doc_connector_demo.doc_events VALUES (?, ?, ?, ?, ?, ?)",
events
)
print(f"写入 {len(events)} 条事件记录")
# ── 4. 查询指定日期的事件统计 ────────────────────────────────────────────
cursor.execute(f"""
SELECT event_type,
COUNT(*) AS cnt,
AVG(duration) AS avg_duration
FROM doc_connector_demo.doc_events
WHERE DATE(event_time) = DATE '{biz_date}'
GROUP BY event_type
ORDER BY cnt DESC
""")
col_names = [col[0] for col in cursor.description]
rows = cursor.fetchall()
print(f"\n{biz_date} 事件统计(列:{col_names}):")
for row in rows:
print(f" {row[0]:10s} 次数={row[1]} 平均时长={row[2]:.1f}s")
# ── 5. 漏斗分析 ──────────────────────────────────────────────────────────
cursor.execute(f"""
SELECT
COUNT(DISTINCT CASE WHEN event_type = 'view' THEN user_id END) AS step1_view,
COUNT(DISTINCT CASE WHEN event_type = 'click' AND page = 'cart' THEN user_id END) AS step2_cart,
COUNT(DISTINCT CASE WHEN event_type = 'click' AND page = 'checkout' THEN user_id END) AS step3_checkout
FROM doc_connector_demo.doc_events
WHERE DATE(event_time) = DATE '{biz_date}'
""")
row = cursor.fetchone()
view, cart, checkout = row[0], row[1], row[2]
print(f"\n漏斗分析({biz_date}):")
print(f" 浏览首页: {view} 人")
if view > 0:
print(f" 加入购物车: {cart} 人 (转化率 {cart/view*100:.0f}%)")
print(f" 进入结算: {checkout} 人 (转化率 {checkout/view*100:.0f}%)")
# ── 6. fetchmany 分批读取 ────────────────────────────────────────────────
cursor.execute(f"""
SELECT event_id, user_id, event_type, page, duration
FROM doc_connector_demo.doc_events
WHERE DATE(event_time) = DATE '{biz_date}'
ORDER BY event_id
""")
print(f"\nfetchmany 分批读取(每批 3 行):")
while True:
batch = cursor.fetchmany(3)
if not batch:
break
print(f" 本批 {len(batch)} 行: event_id {batch[0][0]} ~ {batch[-1][0]}")
# ── 7. 写入汇总结果 ──────────────────────────────────────────────────────
cursor.executemany(
"INSERT INTO doc_connector_demo.doc_funnel_daily VALUES (?, ?, ?, ?, ?)",
[(biz_date, view, cart, checkout, datetime.datetime.now())]
)
print(f"\n汇总结果已写入 doc_funnel_daily")
cursor.close()
conn.close()
print("完成")
创建并执行任务
Studio UI
- 进入 数据开发 → 新建任务,选择 Python 类型,填写任务名称
- 将上方脚本粘贴到编辑器
- 点击右侧 参数 按钮,系统自动识别
${biz_date}
${biz_date}
,赋值为 $[yyyy-MM-dd, -1d]
$[yyyy-MM-dd, -1d]
(取昨天日期)
- 点击 调度 按钮,配置 VCluster(选通用型
DEFAULT
DEFAULT
)和 Cron 表达式(如 0 3 * * *
0 3 * * *
)
- 点击 发布,再点击 运行 → 在弹窗里输入
biz_date=2024-12-01
biz_date=2024-12-01
验证
cz-cli(适合 CI/CD 或批量管理场景,详见 Studio 任务开发与运维)
# 创建任务
cz-cli task create connector_funnel --type python --profile <your-profile>
# 上传脚本并配置参数(biz_date 取昨天日期)
cz-cli task save-content connector_funnel --file connector_funnel.py \
--params '{"biz_date": "$[yyyy-MM-dd, -1d]"}' \
--profile <your-profile>
# 配置调度(每天凌晨 3 点执行)
cz-cli task save-config connector_funnel --vcluster default --retry-count 1 --profile <your-profile>
cz-cli task save-cron connector_funnel --cron "0 3 * * *" --profile <your-profile>
# 发布并临时执行验证
cz-cli task online connector_funnel -y --profile <your-profile>
cz-cli task execute connector_funnel --param "biz_date=2024-12-01" --profile <your-profile>
执行结果
处理日期:2024-12-01
连接成功
建表完成
写入 10 条事件记录
2024-12-01 事件统计(列:['event_type', 'cnt', 'avg_duration']):
view 次数=6 平均时长=70.0s
click 次数=4 平均时长=7.8s
漏斗分析(2024-12-01):
浏览首页: 4 人
加入购物车: 2 人 (转化率 50%)
进入结算: 1 人 (转化率 25%)
fetchmany 分批读取(每批 3 行):
本批 3 行: event_id 1 ~ 3
本批 3 行: event_id 4 ~ 6
本批 3 行: event_id 7 ~ 9
本批 1 行: event_id 10 ~ 10
汇总结果已写入 doc_funnel_daily
完成
验证写入结果:
SELECT biz_date, step1_view, step2_cart, step3_checkout
FROM doc_connector_demo.doc_funnel_daily
ORDER BY biz_date DESC
LIMIT 5;
biz_date step1_view step2_cart step3_checkout
2024-12-01 4 2 1
常用接口速查
| 接口 | 说明 |
|---|
cursor.execute(sql)
cursor.execute(sql) | 执行单条 SQL |
cursor.executemany(sql, data)
cursor.executemany(sql, data) | 批量写入,data
data 为 list of tuple |
cursor.fetchall()
cursor.fetchall() | 获取全部结果行 |
cursor.fetchone()
cursor.fetchone() | 获取单行 |
cursor.fetchmany(n)
cursor.fetchmany(n) | 分批获取,每次 n 行,结果耗尽返回 []
[] |
cursor.description
cursor.description | 列元数据,col[0]
col[0] 为列名 |
executemany
executemany
占位符:用
?
?
表示参数位置,对应 tuple 中的值按顺序绑定。TIMESTAMP 列传
datetime.datetime
datetime.datetime
对象,DATE 列传
datetime.date
datetime.date
对象。
与 ZettaPark 的选型对比
| Python Connector | ZettaPark |
|---|
| 接口风格 | PEP 249 cursor/SQL | DataFrame 链式操作 |
| 适合场景 | 批量写入、精确 SQL 控制 | 数据处理、聚合、pandas 集成 |
| 写入方式 | executemany()
executemany() | create_dataframe().write.save_as_table()
create_dataframe().write.save_as_table() |
| 读取方式 | fetchall()
fetchall() / fetchmany()
fetchmany() | to_pandas()
to_pandas() / show()
show() |
| 依赖 | clickzetta-connector
clickzetta-connector (内置) | clickzetta-zettapark-python
clickzetta-zettapark-python (内置) |
两者可以在同一个 Python 任务里混用:用 connector 写入原始数据,用 ZettaPark 做聚合分析。
相关文档