Zettapark Volume 与文件操作

Zettapark 提供完整的 Volume 文件操作能力,支持上传、下载、读取和导出,将文件处理与 DataFrame 操作无缝衔接。


前置准备

from clickzetta.zettapark.session import Session from clickzetta.zettapark import functions as F session = Session.builder.configs({ "username": "your_username", "password": "your_password", "service": "cn-shanghai-alicloud.api.clickzetta.com", "instance": "your_instance", "workspace": "your_workspace", "schema": "public", "vcluster": "default" }).create()

Volume 路径格式

volume://volume_name/path/to/file
volume://volume_name/path/to/file

两种 Volume 的能力对比:

操作Named Volume(内部)External Volume(OSS/S3/COS)
session.file.put
session.file.put
上传
✅(上传后需 REFRESH)
session.file.get
session.file.get
下载
session.file.list_
session.file.list_
列目录
session.file.delete
session.file.delete
删除
session.read.csv/parquet/json
session.read.csv/parquet/json
✅(上传后需 REFRESH)
df.write.copy_into_volume
df.write.copy_into_volume
导出

本文示例使用 Named Volume(内部 Volume),无需挂载外部存储,直接创建即可使用:

创建 Named Volume(一次性操作)

session.sql("CREATE VOLUME IF NOT EXISTS my_named_vol").collect()


文件上传与下载

上传本地文件到 Volume

上传单个文件

result = session.file.put( "/local/path/data.csv", "volume://my_named_vol/data/data.csv", auto_compress=False ) print(result)

[PutResult(source='/local/path/data.csv', target='/data/data.csv', source_size=1024, target_size=1024)]

上传后刷新目录索引,session.read 才能读到新文件

session.sql("ALTER VOLUME my_named_vol REFRESH").collect()

下载 Volume 文件到本地

result = session.file.get( "volume://my_named_vol/data/data.csv", "/local/output/" ) print(result)

[GetResult(file='data.csv', size=1024)]

列出 Volume 目录

files = session.file.list_("volume://my_named_vol/data/") for f in files: print(f.file, f.size)

data/orders.csv 2048 data/users.csv 1024

删除 Volume 文件

删除单个文件

session.file.delete("volume://my_named_vol/data/old_file.csv")

删除目录(删除目录下所有文件)

session.file.delete("volume://my_named_vol/archive/")


从 Volume 读取数据

读取 CSV

df = session.read \ .option("header", True) \ .option("infer_schema", True) \ .csv("volume://my_named_vol/data/orders.csv") df.show() df.printSchema()

读取 Parquet

读取目录下所有 Parquet 文件

df = session.read.parquet("volume://my_named_vol/data/parquet_dir/") df.show()

读取 JSON(NDJSON 格式,每行一个 JSON 对象)

df = session.read.json("volume://my_named_vol/data/events.json") df.show()


将 DataFrame 导出到 Volume

Named Volume 和 External Volume 均支持导出。

导出为 CSV

df = session.table("orders") df.write.copy_into_volume( "volume://my_named_vol/export/orders/", file_format_type="csv", header=True )

导出为 Parquet

df.write.copy_into_volume( "volume://my_named_vol/export/orders_parquet/", file_format_type="parquet" )


完整示例:ETL 流水线

从 Volume 读取原始文件,处理后写入表,再将结果导出回 Volume:

Step 1:上传原始数据到 Volume

import csv raw_data = [ ["order_id", "user_id", "product", "amount", "status"], [1001, 101, "iPhone", 7999.0, "paid"], [1002, 102, "MacBook", 14999.0, "paid"], [1003, 101, "AirPods", 1799.0, "pending"], ] with open("/tmp/raw_orders.csv", "w", newline="") as f: csv.writer(f).writerows(raw_data) session.file.put( "/tmp/raw_orders.csv", "volume://my_named_vol/raw/orders.csv", auto_compress=False )

刷新目录索引,确保 session.read 能读到新上传的文件

session.sql("ALTER VOLUME my_named_vol REFRESH").collect()

Step 2:从 Volume 读取,做数据清洗

df = session.read \ .option("header", True) \ .option("infer_schema", True) \ .csv("volume://my_named_vol/raw/orders.csv")

过滤、转换

paid_df = df.filter(F.col("status") == "paid") \ .with_column("amount_with_tax", F.col("amount") * 1.13)

Step 3:写入 Lakehouse 表

paid_df.write.save_as_table("paid_orders", mode="overwrite") print(f"写入 {paid_df.count()} 条记录")

Step 4:将处理结果导出回 Volume

session.table("paid_orders") \ .write.copy_into_volume( "volume://my_named_vol/processed/paid_orders/", file_format_type="parquet" )

验证导出结果

files = session.file.list_("volume://my_named_vol/processed/paid_orders/") print(f"导出文件: {[f.file for f in files]}")


Named Volume 操作

Named Volume 是内部存储,无需外部依赖,适合临时存储和导出场景。

创建 Named Volume

session.sql("CREATE VOLUME IF NOT EXISTS my_named_vol").collect()

上传文件(使用 SQL PUT 命令)

session.sql("PUT '/local/path/file.csv' TO VOLUME my_named_vol FILE 'data/file.csv'").collect()

列出文件

files = session.file.list_("volume://my_named_vol/") for f in files: print(f.file, f.size)

下载文件

session.file.get("volume://my_named_vol/data/file.csv", "/local/output/")

删除文件

session.file.delete("volume://my_named_vol/data/file.csv")

导出 DataFrame 到 Named Volume

df.write.copy_into_volume( "volume://my_named_vol/export/", file_format_type="csv", header=True )


User Volume 操作

User Volume 是每个用户的个人存储空间,

session.file
session.file
方法不支持,需通过 SQL 命令操作:

上传到 User Volume

session.sql("PUT '/local/path/file.csv' TO USER VOLUME FILE 'subdir/file.csv'").collect()

查看 User Volume 文件列表

files = session.sql("SHOW USER VOLUME DIRECTORY").collect() for f in files: print(f["relative_path"], f["size"])

从 User Volume 读取(通过 SQL SELECT FROM VOLUME)

df = session.sql(""" SELECT * FROM USER VOLUME USING CSV OPTIONS('header'='true') FILES('subdir/file.csv') """) df.show()

下载 User Volume 文件

session.sql("GET USER VOLUME FILE 'subdir/file.csv' TO '/local/output/'").collect()


注意事项

  • 路径格式:Volume 路径使用
    volume://volume_name/path
    volume://volume_name/path
    ,不支持
    @vol_name
    @vol_name
    或相对路径
  • 上传后需 REFRESH
    session.file.put
    session.file.put
    上传文件后,需执行
    ALTER VOLUME name REFRESH
    ALTER VOLUME name REFRESH
    刷新目录索引,
    session.read
    session.read
    系列方法才能读到新文件
  • 新建 Volume 需等待:刚创建的 Named Volume 需等待约 1 分钟初始化完成后再上传和读取文件
  • Parquet 读取:传入目录路径(以
    /
    /
    结尾),会读取目录下所有 Parquet 文件
  • 导出路径
    copy_into_volume
    copy_into_volume
    目标路径建议以
    /
    /
    结尾,文件名由系统自动生成(如
    part00001.csv
    part00001.csv

相关文档

文档说明
外部 Volume挂载 OSS/S3/COS 创建 External Volume
内部 VolumeNamed Volume 和 User Volume 说明
Zettapark DataFrame API 指南DataFrame 操作完整参考
Zettapark 数据工程实战多表 join、窗口函数等场景
COPY INTOSQL 方式从 Volume 导入数据
联系我们
预约咨询
微信咨询
电话咨询