Python Connector Advanced Usage

This article covers the advanced features of `clickzetta-connector`. For basic usage, see Python Connector SDK.


Prerequisite: Establish a Connection

```python
from clickzetta import connect

# Replace the placeholder values with your own account details
conn = connect(
    username='your_username',
    password='your_password',
    service='cn-shanghai-alicloud.api.clickzetta.com',
    instance='your_instance',
    workspace='your_workspace',
    schema='public',
    vcluster='default'
)
cursor = conn.cursor()
```


fetch_pandas: Return a DataFrame Directly

`fetch_pandas()` converts the query result straight into a pandas DataFrame, so no manual conversion is needed:

```python
cursor.execute("""
    SELECT user_id, SUM(amount) AS total
    FROM orders
    WHERE status = 'paid'
    GROUP BY user_id
""")
df = cursor.fetch_pandas()  # result set as a pandas DataFrame

print(df.dtypes)
print(df.sort_values('total', ascending=False))

high_value = df[df['total'] > 10000]
print(f"High-value users: {len(high_value)}")
```
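For comparison, this is roughly the work a `fetch_pandas`-style helper has to do on a plain DB-API cursor: read the column names from `cursor.description` and pivot the rows into columns. It is a minimal sketch, not the connector's implementation; `rows_to_columns` is a hypothetical helper, and the standard-library `sqlite3` module stands in for ClickZetta so the example runs locally. The resulting dict can be passed to `pandas.DataFrame`.

```python
import sqlite3


def rows_to_columns(cursor):
    """Hypothetical helper: turn the current result set into {column: [values]}."""
    # cursor.description has one entry per column; item 0 is the column name
    names = [col[0] for col in cursor.description]
    columns = {name: [] for name in names}
    for row in cursor.fetchall():
        for name, value in zip(names, row):
            columns[name].append(value)
    return columns


# Stand-in data in an in-memory SQLite database (not ClickZetta)
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE orders (user_id INTEGER, amount REAL, status TEXT)")
cur.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, 8000.0, "paid"), (1, 5000.0, "paid"), (2, 300.0, "paid"), (3, 999.0, "pending")],
)
cur.execute(
    "SELECT user_id, SUM(amount) AS total FROM orders "
    "WHERE status = 'paid' GROUP BY user_id ORDER BY user_id"
)
result = rows_to_columns(cur)
print(result)  # pandas.DataFrame(result) would turn this into a DataFrame
conn.close()
```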


arraysize: Control the Default Batch Size of fetchmany

`cursor.arraysize` sets how many rows `fetchmany()` returns per call when no `size` argument is passed. The default is 1:

```python
cursor.arraysize = 1000  # 1000 rows per batch
cursor.execute("SELECT * FROM large_table ORDER BY id")

batch_num = 0
while True:
    batch = cursor.fetchmany()  # returns up to arraysize rows
    if not batch:
        break
    batch_num += 1
    print(f"Processing batch {batch_num}: {len(batch)} rows")
    # your processing logic here
```
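Because `arraysize` and `fetchmany()` are standard DB-API 2.0 cursor features, the loop above can be wrapped in a reusable generator. A minimal sketch, assuming only standard DB-API behavior; `iter_batches` is a hypothetical helper, and `sqlite3` stands in for ClickZetta so it runs locally.

```python
import sqlite3


def iter_batches(cursor, batch_size):
    """Hypothetical helper: yield lists of rows until the result set is exhausted."""
    cursor.arraysize = batch_size  # fetchmany() with no argument uses this value
    while True:
        batch = cursor.fetchmany()
        if not batch:  # an empty list means no rows are left
            break
        yield batch


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE large_table (id INTEGER PRIMARY KEY)")
cur.executemany("INSERT INTO large_table (id) VALUES (?)", [(i,) for i in range(2500)])
cur.execute("SELECT * FROM large_table ORDER BY id")

batch_sizes = [len(batch) for batch in iter_batches(cur, 1000)]
print(batch_sizes)
conn.close()
```

Setting `arraysize` once keeps the loop body free of size arguments; calling `fetchmany(1000)` explicitly on each iteration would be equivalent.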


get_job_id: Get the Job ID of a Query

After each SQL statement runs, you can retrieve its Job ID for later tracking or cancellation:

```python
cursor.execute("SELECT COUNT(*) FROM orders")
result = cursor.fetchall()
job_id = cursor.get_job_id()
print(f"Job ID: {job_id}")
```

Example output:

```
2026052700200276167224955
```
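When a script runs many statements, it can help to record each Job ID next to its SQL so slow or failed jobs can be looked up later. A minimal sketch of that pattern; `execute_tracked` is hypothetical, and `StubCursor` only imitates `execute()` and `get_job_id()` with made-up IDs so the example runs without a ClickZetta instance.

```python
import logging

logger = logging.getLogger("etl")


def execute_tracked(cursor, sql, job_log):
    """Hypothetical helper: run sql, then record (job_id, sql) for later tracking."""
    cursor.execute(sql)
    job_id = cursor.get_job_id()
    job_log.append((job_id, sql))
    logger.info("job %s: %s", job_id, sql)
    return job_id


class StubCursor:
    """Stand-in for a connector cursor; hands out sequential fake Job IDs."""

    def __init__(self):
        self._counter = 0
        self._last_job_id = None

    def execute(self, sql):
        self._counter += 1
        self._last_job_id = f"job-{self._counter}"

    def get_job_id(self):
        return self._last_job_id


cur = StubCursor()
jobs = []
execute_tracked(cur, "SELECT COUNT(*) FROM orders", jobs)
execute_tracked(cur, "SELECT MAX(amount) FROM orders", jobs)
print(jobs)
```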


cancel: Cancel a Running Query

An asynchronous query can be cancelled while it is still running:

```python
cursor.execute_async("SELECT * FROM very_large_table")
job_id = cursor.get_job_id()

cursor.cancel(job_id)
print(f"Cancelled job: {job_id}")
```
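In practice a query is usually cancelled only when it runs too long. The sketch below combines `is_job_finished()` (used in the ETL example later in this article) with `cancel(job_id)` into a poll-with-deadline helper. `wait_or_cancel` is hypothetical, and `FakeAsyncCursor` is a stub that imitates only those two methods so the example runs standalone.

```python
import time


def wait_or_cancel(cursor, job_id, timeout_s, poll_s=1.0):
    """Hypothetical helper: poll an async job and cancel it once timeout_s elapses.

    Returns True if the job finished on its own, False if it was cancelled.
    """
    deadline = time.monotonic() + timeout_s
    while not cursor.is_job_finished():
        if time.monotonic() >= deadline:
            cursor.cancel(job_id)
            return False
        time.sleep(poll_s)
    return True


class FakeAsyncCursor:
    """Stub for illustration only: reports 'finished' after polls_needed checks."""

    def __init__(self, polls_needed):
        self.polls_needed = polls_needed
        self.polls = 0
        self.cancelled = []

    def is_job_finished(self):
        self.polls += 1
        return self.polls > self.polls_needed

    def cancel(self, job_id):
        self.cancelled.append(job_id)


fast = FakeAsyncCursor(polls_needed=2)
fast_done = wait_or_cancel(fast, "job-1", timeout_s=5.0, poll_s=0.01)

slow = FakeAsyncCursor(polls_needed=10_000)
slow_done = wait_or_cancel(slow, "job-2", timeout_s=0.05, poll_s=0.01)

print(fast_done, fast.cancelled)  # finished without cancellation
print(slow_done, slow.cancelled)  # cancelled after the deadline
```

With a real cursor, the helper would be called right after `execute_async(...)` and `get_job_id()`, for example `wait_or_cancel(cursor, job_id, timeout_s=600)`.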


get_job_profile: Query Performance Profiling

After a query finishes, retrieve its detailed performance metrics to troubleshoot slow queries:

```python
cursor.execute("""
    SELECT status, COUNT(*) AS cnt, SUM(amount) AS total
    FROM orders
    GROUP BY status
""")
cursor.fetchall()
job_id = cursor.get_job_id()

profile = conn.get_job_profile(job_id)
stats = profile.get('jobSummary', {}).get('stats', {}).get('inputOutputStats', {})

print(f"Rows scanned: {stats.get('inputRowCount')}")
print(f"Rows output: {stats.get('outputRowCount')}")
print(f"Bytes read: {stats.get('inputBytes')}")
print(f"Bytes read from cache: {stats.get('inputCacheBytes')}")
print(f"Bytes read from disk: {stats.get('inputDiskBytes')}")
```

Cache hit ratio = `inputCacheBytes / inputBytes`. A higher ratio means more of the scanned data was already cached locally on the compute cluster, so the query runs faster.
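The ratio is easy to get wrong when `inputBytes` is missing or zero. A defensive sketch, assuming the profile has the `jobSummary.stats.inputOutputStats` layout used in the code above; the values are converted with `int()` as in the ETL example later in this article, in case they arrive as strings. `cache_hit_ratio` is a hypothetical helper, and the sample dict is invented illustrative data, not real connector output.

```python
def cache_hit_ratio(profile):
    """Hypothetical helper: return inputCacheBytes / inputBytes as a fraction."""
    stats = (
        profile.get("jobSummary", {})
        .get("stats", {})
        .get("inputOutputStats", {})
    )
    # Missing or None values count as 0; strings such as "4000" are converted
    input_bytes = int(stats.get("inputBytes", 0) or 0)
    cache_bytes = int(stats.get("inputCacheBytes", 0) or 0)
    if input_bytes <= 0:
        return 0.0  # nothing was read, so there is no meaningful ratio
    return cache_bytes / input_bytes


# Illustrative profile shape only; the numbers are made up
sample = {"jobSummary": {"stats": {"inputOutputStats": {
    "inputBytes": "4000", "inputCacheBytes": "3000"}}}}
print(f"Cache hit ratio: {cache_hit_ratio(sample):.1%}")
```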


get_full_sql_with_params: Debug Parameter Substitution

Inspect the complete SQL after the parameters are bound, which is useful for debugging:

```python
sql = "SELECT * FROM orders WHERE status = ? AND amount > ?"
params = ('paid', 1000)

full_sql = conn.get_full_sql_with_params(sql, params)
print(full_sql)
```

Output:

```
SELECT * FROM orders WHERE status=paid AND amount>1000
```
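To illustrate what "binding" means for `?` placeholders, here is a naive renderer that substitutes each value in order. It is a hypothetical debugging sketch, not how `get_full_sql_with_params` is implemented, and its formatting differs from the output shown above (it quotes strings). Never execute SQL built this way; pass parameters to `execute()` instead.

```python
def render_qmark_sql(sql, params):
    """Hypothetical debugging helper: replace each ? with a literal, in order.

    Does not handle '?' inside string literals or comments; for display only.
    """
    parts = sql.split("?")
    if len(parts) - 1 != len(params):
        raise ValueError(f"expected {len(parts) - 1} parameters, got {len(params)}")
    out = [parts[0]]
    for value, tail in zip(params, parts[1:]):
        if value is None:
            literal = "NULL"
        elif isinstance(value, str):
            literal = "'" + value.replace("'", "''") + "'"  # double embedded quotes
        else:
            literal = str(value)
        out.append(literal)
        out.append(tail)
    return "".join(out)


sql = "SELECT * FROM orders WHERE status = ? AND amount > ?"
rendered = render_qmark_sql(sql, ("paid", 1000))
print(rendered)
```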


Switching Schemas Dynamically

Switch the default schema within the same connection so that subsequent queries do not need the fully qualified `schema.table` name:

```python
cursor.execute("USE SCHEMA ods")
cursor.execute("SELECT * FROM raw_orders LIMIT 10")     # equivalent to ods.raw_orders

cursor.execute("USE SCHEMA ads")
cursor.execute("SELECT * FROM order_summary LIMIT 10")  # equivalent to ads.order_summary
```


rowcount: Get the Number of Affected Rows

After a DML statement (INSERT / UPDATE / DELETE) runs, read `rowcount` to get the number of affected rows:

```python
cursor.execute("UPDATE orders SET status = 'shipped' WHERE status = 'paid'")
print(f"Updated {cursor.rowcount} rows")

cursor.execute("DELETE FROM orders WHERE status = 'cancelled'")
print(f"Deleted {cursor.rowcount} rows")
```
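`rowcount` is part of the Python DB-API 2.0 (PEP 249), so its behavior can be tried locally with the standard-library `sqlite3` module before running against ClickZetta. A minimal sketch with made-up table data. Per PEP 249, `rowcount` is -1 when no statement has run yet or the count cannot be determined, so check for that before relying on the value.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE orders (order_id INTEGER, status TEXT)")
cur.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "paid"), (2, "paid"), (3, "cancelled"), (4, "pending")],
)

cur.execute("UPDATE orders SET status = 'shipped' WHERE status = 'paid'")
updated = cur.rowcount  # rows affected by the UPDATE
cur.execute("DELETE FROM orders WHERE status = 'cancelled'")
deleted = cur.rowcount  # rows removed by the DELETE
conn.commit()

print(f"Updated {updated} rows, deleted {deleted} rows")
conn.close()
```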


Complete Example: An ETL Data Processing Pipeline

The following example combines the advanced features above into a complete ETL workflow:

```python
import datetime
import time

from clickzetta import connect

conn = connect(
    username='your_username',
    password='your_password',
    service='cn-shanghai-alicloud.api.clickzetta.com',
    instance='your_instance',
    workspace='your_workspace',
    schema='public',
    vcluster='default'
)
cursor = conn.cursor()

# Step 1: load raw data into the ods schema
print("Step 1: Writing raw data...")
cursor.execute("USE SCHEMA ods")
raw_data = [
    (1001, 101, 9999.00, 'paid', datetime.date(2024, 1, 15)),
    (1002, 102, 4999.00, 'paid', datetime.date(2024, 1, 15)),
    (1003, 103, 299.00, 'pending', datetime.date(2024, 1, 15)),
]
cursor.executemany(
    'INSERT INTO raw_orders (order_id, user_id, amount, status, created_at) VALUES (?,?,?,?,?)',
    raw_data
)
print(f"  Wrote {len(raw_data)} raw rows")

# Step 2: aggregate asynchronously into the dws schema
print("Step 2: Running the aggregation asynchronously...")
cursor.execute("USE SCHEMA dws")
cursor.execute_async("""
    INSERT INTO order_stats
    SELECT created_at AS stat_date,
           COUNT(*) AS order_count,
           SUM(amount) AS total_amount
    FROM ods.raw_orders
    WHERE status = 'paid'
    GROUP BY created_at
""")
job_id = cursor.get_job_id()
while not cursor.is_job_finished():
    time.sleep(1)  # poll once per second until the job completes
print(f"  Aggregation finished, Job ID: {job_id}")

# Step 3: check the cache hit ratio of the aggregation job
print("Step 3: Analyzing job performance...")
profile = conn.get_job_profile(job_id)
stats = profile.get('jobSummary', {}).get('stats', {}).get('inputOutputStats', {})
input_bytes = int(stats.get('inputBytes', 0))
cache_bytes = int(stats.get('inputCacheBytes', 0))
cache_ratio = cache_bytes / input_bytes * 100 if input_bytes > 0 else 0
print(f"  Cache hit ratio: {cache_ratio:.1f}%")

# Step 4: read back the 7 most recent days (still in the dws schema)
cursor.execute("SELECT * FROM order_stats ORDER BY stat_date DESC LIMIT 7")
df = cursor.fetch_pandas()
print(f"\nStep 4: Summary of the last 7 days:\n{df}")

cursor.close()
conn.close()
```


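Related Documents

| Document | Description |
|---|---|
| Python Connector SDK | Installation, connection parameters, basic API |
| Python Connector Usage Examples | Examples for common business scenarios |
| Zettapark | Python DataFrame API with pandas-like operations |
| BulkLoad Batch Import | High-speed writes at the million-row scale |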