# Total the paid order amounts per user and load the result as a DataFrame.
paid_totals_sql = """
SELECT user_id, SUM(amount) AS total
FROM orders
WHERE status = 'paid'
GROUP BY user_id
"""
cursor.execute(paid_totals_sql)
df = cursor.fetch_pandas()
print(df.dtypes)  # inspect the column dtypes produced by the connector
ranked = df.sort_values('total', ascending=False)
print(ranked)
# Users whose paid total is strictly greater than 10000.
high_value = df.loc[df['total'] > 10000]
print(f"高价值用户数: {len(high_value)}")
⚠️ `fetch_pandas()` 依赖 pandas,使用前需先安装:
# fetch_pandas() requires pandas
pip install pandas
TIMESTAMP 列返回 `datetime64[us, UTC]` 类型(UTC 时区),处理时注意时区转换,例如用 `df['created_at'].dt.tz_convert('Asia/Shanghai')` 转换为北京时间。
arraysize — 控制 fetchmany 默认批次大小
`cursor.arraysize` 控制不传 `size` 参数时 `fetchmany()` 每次返回的行数,默认值为 1:
# With arraysize set, fetchmany() called without a size argument returns
# up to 1000 rows per call.
cursor.arraysize = 1000
cursor.execute("SELECT * FROM large_table ORDER BY id")
batch_num = 0
while True:
    batch = cursor.fetchmany()  # up to arraysize rows; empty once the result set is exhausted
    if not batch:
        break
    batch_num += 1
    print(f"处理第 {batch_num} 批,{len(batch)} 行")
    # your per-batch processing logic goes here
get_job_id — 获取查询的 Job ID
每次执行 SQL 后,可以获取对应的 Job ID,用于后续追踪或取消:
# Run a query, drain its results, then ask the cursor for the ID of the job
# it just executed (useful for tracking or cancelling it later).
cursor.execute("SELECT COUNT(*) FROM orders")
rows = cursor.fetchall()
job_id = cursor.get_job_id()
print(f"Job ID: {job_id}")
# rowcount: after a DML statement (UPDATE / DELETE), cursor.rowcount holds
# the number of rows that statement affected.
cursor.execute("UPDATE orders SET status = 'shipped' WHERE status = 'paid'")
updated = cursor.rowcount
print(f"更新了 {updated} 行")

cursor.execute("DELETE FROM orders WHERE status = 'cancelled'")
deleted = cursor.rowcount
print(f"删除了 {deleted} 行")
完整示例:ETL 数据处理流水线
结合以上高级功能,实现一个完整的 ETL 流程:
import datetime
import time

from clickzetta import connect

# Placeholder credentials: replace with your own account details. In real
# pipelines, load secrets from environment variables or a secrets store
# instead of hard-coding them in source.
conn = connect(
    username='your_username',
    password='your_password',
    service='cn-shanghai-alicloud.api.clickzetta.com',
    instance='your_instance',
    workspace='your_workspace',
    schema='public',
    vcluster='default',
)
cursor = conn.cursor()
# Step 1: load a few raw order rows into ods.raw_orders.
print("Step 1: 写入原始数据...")
cursor.execute("USE SCHEMA ods")
# Each tuple is (order_id, user_id, amount, status, created_at), matching the
# column list of the INSERT statement below.
raw_data = [
    (1001, 101, 9999.00, 'paid', datetime.date(2024, 1, 15)),
    (1002, 102, 4999.00, 'paid', datetime.date(2024, 1, 15)),
    (1003, 103, 299.00, 'pending', datetime.date(2024, 1, 15)),
]
insert_sql = 'INSERT INTO raw_orders (order_id, user_id, amount, status, created_at) VALUES (?,?,?,?,?)'
cursor.executemany(insert_sql, raw_data)  # one parameter tuple per row
print(f" 写入 {len(raw_data)} 条原始数据")
# Step 2: submit the daily aggregation asynchronously, then poll until the
# job reports completion.
print("Step 2: 异步聚合计算...")
cursor.execute("USE SCHEMA dws")
cursor.execute_async("""
INSERT INTO order_stats
SELECT
created_at AS stat_date,
COUNT(*) AS order_count,
SUM(amount) AS total_amount
FROM ods.raw_orders
WHERE status = 'paid'
GROUP BY created_at
""")
job_id = cursor.get_job_id()
# Bound the wait so a stuck job cannot hang the pipeline forever.
# time.monotonic() is used because it is unaffected by wall-clock changes.
POLL_TIMEOUT_S = 600
deadline = time.monotonic() + POLL_TIMEOUT_S
while not cursor.is_job_finished():
    if time.monotonic() > deadline:
        raise TimeoutError(f"聚合作业 {job_id} 在 {POLL_TIMEOUT_S} 秒内未完成")
    time.sleep(1)
# NOTE(review): it is not shown here whether is_job_finished() also returns
# True for failed jobs; confirm the job succeeded before trusting results.
print(f" 聚合完成,Job ID: {job_id}")
# Step 3: read the finished job's profile and report its input cache hit ratio.
print("Step 3: 查看作业缓存命中率...")
profile = conn.get_job_profile(job_id)
# The profile is treated as a nested dict; any missing level falls back to {}.
stats = (
    profile.get('jobSummary', {})
    .get('stats', {})
    .get('inputOutputStats', {})
)
# Counters are coerced with int(); `or 0` also covers keys that are present
# but None/empty, which would otherwise make int() raise.
input_bytes = int(stats.get('inputBytes') or 0)
cache_bytes = int(stats.get('inputCacheBytes') or 0)
# Guard against division by zero when the job read no input at all.
cache_ratio = cache_bytes / input_bytes * 100 if input_bytes > 0 else 0.0
print(f" 缓存命中率: {cache_ratio:.1f}%")
# Step 4: read the aggregated rows back as a DataFrame.
# Re-select dws so this step does not rely on the schema chosen in Step 2.
cursor.execute("USE SCHEMA dws")
# ORDER BY stat_date DESC LIMIT 7 yields the 7 rows with the latest stat_date,
# i.e. the most recent dates that have data, not necessarily 7 consecutive
# calendar days.
cursor.execute("SELECT * FROM order_stats ORDER BY stat_date DESC LIMIT 7")
df = cursor.fetch_pandas()
print(f"\nStep 4: 最近 7 天汇总:\n{df}")

# Release resources: the cursor first, then the connection that owns it.
for handle in (cursor, conn):
    handle.close()