Python Connector 使用示例

本文提供

clickzetta-connector
clickzetta-connector
在常见业务场景下的完整可运行示例。

连接配置和基础用法见 ClickZetta Connector Python SDK


前置:建立连接

以下所有示例共用此连接:

from clickzetta import connect import datetime, csv conn = connect( username='your_username', password='your_password', service='cn-shanghai-alicloud.api.clickzetta.com', instance='your_instance', workspace='your_workspace', schema='public', vcluster='default' ) cursor = conn.cursor()


场景1:批量写入订单数据

建表:

cursor.execute(''' CREATE TABLE IF NOT EXISTS orders ( order_id BIGINT, user_id BIGINT, product STRING, amount DECIMAL(10, 2), status STRING, created_at TIMESTAMP ) ''')

批量插入:

data = [ (1001, 101, 'iPhone 15', 7999.00, 'paid', datetime.datetime(2024, 1, 15, 10, 30, 0)), (1002, 102, 'MacBook Pro', 14999.00, 'paid', datetime.datetime(2024, 1, 15, 11, 0, 0)), (1003, 101, 'AirPods Pro', 1799.00, 'pending', datetime.datetime(2024, 1, 15, 14, 20, 0)), (1004, 103, 'iPad Pro', 8999.00, 'paid', datetime.datetime(2024, 1, 16, 9, 0, 0)), (1005, 102, 'Apple Watch', 3299.00, 'cancelled', datetime.datetime(2024, 1, 16, 10, 0, 0)), ] cursor.executemany( 'INSERT INTO orders VALUES (?, ?, ?, ?, ?, ?)', data ) print(f"写入 {len(data)} 条记录")


场景2:条件查询与结果处理

cursor.execute(""" SELECT order_id, product, amount FROM orders WHERE status = 'paid' ORDER BY amount DESC """) rows = cursor.fetchall() for row in rows: print(f"订单 {row[0]}: {row[1]} - ¥{row[2]}")

输出:

订单 1002: MacBook Pro - ¥14999.00 订单 1004: iPad Pro - ¥8999.00 订单 1001: iPhone 15 - ¥7999.00

cursor.description
cursor.description
获取列名:

cursor.execute("SELECT * FROM orders LIMIT 1") col_names = [col[0] for col in cursor.description] print(col_names)

输出:

['order_id', 'user_id', 'product', 'amount', 'status', 'created_at']


场景3:聚合统计

cursor.execute(""" SELECT user_id, COUNT(*) AS order_count, SUM(amount) AS total_amount, MAX(amount) AS max_order FROM orders WHERE status = 'paid' GROUP BY user_id ORDER BY total_amount DESC """) rows = cursor.fetchall() for row in rows: print(f"用户 {row[0]}: {row[1]} 笔订单,合计 ¥{row[2]},最大单笔 ¥{row[3]}")


场景4:分批获取大结果集

查询结果较大时,用

fetchmany(size)
fetchmany(size)
分批处理,避免内存溢出:

cursor.execute("SELECT * FROM orders ORDER BY order_id") batch_size = 1000 while True: batch = cursor.fetchmany(batch_size) if not batch: break # 处理这批数据 print(f"处理 {len(batch)} 行...") for row in batch: pass # 你的处理逻辑


场景5:导出查询结果到 CSV

cursor.execute("SELECT * FROM orders WHERE status = 'paid'") rows = cursor.fetchall() col_names = [col[0] for col in cursor.description] with open('paid_orders.csv', 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(col_names) # 写表头 writer.writerows(rows) # 写数据 print(f"已导出 {len(rows)} 行到 paid_orders.csv")


场景6:UPDATE 和 DELETE

cursor.execute("UPDATE orders SET status = 'shipped' WHERE order_id = 1001") cursor.execute("DELETE FROM orders WHERE status = 'cancelled'") cursor.execute("SELECT COUNT(*) FROM orders") count = cursor.fetchone()[0] print(f"当前订单数: {count}") # 4


场景7:异步执行长查询

适合数据量大、执行时间长的查询,避免阻塞主线程:

import time cursor.execute_async(""" SELECT status, COUNT(*) AS cnt, SUM(amount) AS total FROM orders GROUP BY status """) while not cursor.is_job_finished(): print("查询执行中...") time.sleep(1) results = cursor.fetchall() for row in results: print(f"状态={row[0]}, 数量={row[1]}, 金额={row[2]}")


场景8:使用 SQL Hints 控制执行行为

设置查询超时(秒):

params = {'hints': {'sdk.job.timeout': 60}} cursor.execute('SELECT count(*) FROM large_table', parameters=params)

设置并发度:

params = {'hints': {'sdk.job.timeout': 120, 'cz.sql.shuffle.partitions': '200'}} cursor.execute('SELECT * FROM large_table GROUP BY category', parameters=params)

支持的 hints 参数见 参数管理


注意事项

  • TIMESTAMP 返回带时区:读取 TIMESTAMP 列时,返回值包含
    tzinfo=Asia/Shanghai
    tzinfo=Asia/Shanghai
    ,处理时注意时区转换
  • Decimal 类型:DECIMAL 列返回 Python
    Decimal
    Decimal
    对象,不是
    float
    float
    ,直接参与计算时注意精度
  • 不支持事务
    commit()
    commit()
    rollback()
    rollback()
    接口不支持,每条 SQL 自动提交
  • 大结果集:避免
    fetchall()
    fetchall()
    拉取超大结果集,改用
    fetchmany(size)
    fetchmany(size)
    分批处理

相关文档

文档说明
Python Connector SDK安装、连接参数、完整 API 参考
ZettaparkPython DataFrame API,类 pandas 操作
BulkLoad 批量导入百万行级高速写入
参数管理SQL hints 参数参考
联系我们
预约咨询
微信咨询
电话咨询