Zettapark Volume 与文件操作
Zettapark 提供完整的 Volume 文件操作能力,支持上传、下载、读取和导出,将文件处理与 DataFrame 操作无缝衔接。
前置准备
from clickzetta.zettapark.session import Session
from clickzetta.zettapark import functions as F
session = Session.builder.configs({
"username": "your_username",
"password": "your_password",
"service": "cn-shanghai-alicloud.api.clickzetta.com",
"instance": "your_instance",
"workspace": "your_workspace",
"schema": "public",
"vcluster": "default"
}).create()
Volume 路径格式:
volume://volume_name/path/to/file
volume://volume_name/path/to/file
两种 Volume 的能力对比:
| 操作 | Named Volume(内部) | External Volume(OSS/S3/COS) |
|---|
session.file.put
session.file.put 上传 | ✅(上传后需 REFRESH) | ✅ |
session.file.get
session.file.get 下载 | ✅ | ✅ |
session.file.list_
session.file.list_ 列目录 | ✅ | ✅ |
session.file.delete
session.file.delete 删除 | ✅ | ✅ |
session.read.csv/parquet/json
session.read.csv/parquet/json | ✅(上传后需 REFRESH) | ✅ |
df.write.copy_into_volume
df.write.copy_into_volume 导出 | ✅ | ✅ |
本文示例使用 Named Volume(内部 Volume),无需挂载外部存储,直接创建即可使用:
创建 Named Volume(一次性操作)
session.sql("CREATE VOLUME IF NOT EXISTS my_named_vol").collect()
⚠️
session.file.put
session.file.put
上传文件后,需执行
ALTER VOLUME my_named_vol REFRESH
ALTER VOLUME my_named_vol REFRESH
刷新目录索引,
session.read
session.read
系列方法才能读到新文件。
💡 如需挂载已有的 OSS/S3/COS 对象存储,使用 External Volume,详见 外部 Volume。
文件上传与下载
上传本地文件到 Volume
上传单个文件
result = session.file.put(
"/local/path/data.csv",
"volume://my_named_vol/data/data.csv",
auto_compress=False
)
print(result)
[PutResult(source='/local/path/data.csv', target='/data/data.csv', source_size=1024, target_size=1024)]
上传后刷新目录索引,session.read 才能读到新文件
session.sql("ALTER VOLUME my_named_vol REFRESH").collect()
下载 Volume 文件到本地
result = session.file.get(
"volume://my_named_vol/data/data.csv",
"/local/output/"
)
print(result)
[GetResult(file='data.csv', size=1024)]
列出 Volume 目录
files = session.file.list_("volume://my_named_vol/data/")
for f in files:
print(f.file, f.size)
data/orders.csv 2048
data/users.csv 1024
删除 Volume 文件
删除单个文件
session.file.delete("volume://my_named_vol/data/old_file.csv")
删除目录(删除目录下所有文件)
session.file.delete("volume://my_named_vol/archive/")
从 Volume 读取数据
读取 CSV
df = session.read \
.option("header", True) \
.option("infer_schema", True) \
.csv("volume://my_named_vol/data/orders.csv")
df.show()
df.printSchema()
读取 Parquet
读取目录下所有 Parquet 文件
df = session.read.parquet("volume://my_named_vol/data/parquet_dir/")
df.show()
读取 JSON(NDJSON 格式,每行一个 JSON 对象)
df = session.read.json("volume://my_named_vol/data/events.json")
df.show()
将 DataFrame 导出到 Volume
Named Volume 和 External Volume 均支持导出。
导出为 CSV
df = session.table("orders")
df.write.copy_into_volume(
"volume://my_named_vol/export/orders/",
file_format_type="csv",
header=True
)
导出为 Parquet
df.write.copy_into_volume(
"volume://my_named_vol/export/orders_parquet/",
file_format_type="parquet"
)
完整示例:ETL 流水线
从 Volume 读取原始文件,处理后写入表,再将结果导出回 Volume:
Step 1:上传原始数据到 Volume
import csv
raw_data = [
["order_id", "user_id", "product", "amount", "status"],
[1001, 101, "iPhone", 7999.0, "paid"],
[1002, 102, "MacBook", 14999.0, "paid"],
[1003, 101, "AirPods", 1799.0, "pending"],
]
with open("/tmp/raw_orders.csv", "w", newline="") as f:
csv.writer(f).writerows(raw_data)
session.file.put(
"/tmp/raw_orders.csv",
"volume://my_named_vol/raw/orders.csv",
auto_compress=False
)
刷新目录索引,确保 session.read 能读到新上传的文件
session.sql("ALTER VOLUME my_named_vol REFRESH").collect()
Step 2:从 Volume 读取,做数据清洗
df = session.read \
.option("header", True) \
.option("infer_schema", True) \
.csv("volume://my_named_vol/raw/orders.csv")
过滤、转换
paid_df = df.filter(F.col("status") == "paid") \
.with_column("amount_with_tax", F.col("amount") * 1.13)
Step 3:写入 Lakehouse 表
paid_df.write.save_as_table("paid_orders", mode="overwrite")
print(f"写入 {paid_df.count()} 条记录")
Step 4:将处理结果导出回 Volume
session.table("paid_orders") \
.write.copy_into_volume(
"volume://my_named_vol/processed/paid_orders/",
file_format_type="parquet"
)
验证导出结果
files = session.file.list_("volume://my_named_vol/processed/paid_orders/")
print(f"导出文件: {[f.file for f in files]}")
Named Volume 操作
Named Volume 是内部存储,无需外部依赖,适合临时存储和导出场景。
创建 Named Volume
session.sql("CREATE VOLUME IF NOT EXISTS my_named_vol").collect()
上传文件(使用 SQL PUT 命令)
session.sql("PUT '/local/path/file.csv' TO VOLUME my_named_vol FILE 'data/file.csv'").collect()
列出文件
files = session.file.list_("volume://my_named_vol/")
for f in files:
print(f.file, f.size)
下载文件
session.file.get("volume://my_named_vol/data/file.csv", "/local/output/")
删除文件
session.file.delete("volume://my_named_vol/data/file.csv")
导出 DataFrame 到 Named Volume
df.write.copy_into_volume(
"volume://my_named_vol/export/",
file_format_type="csv",
header=True
)
User Volume 操作
User Volume 是每个用户的个人存储空间,
session.file
session.file
方法不支持,需通过 SQL 命令操作:
上传到 User Volume
session.sql("PUT '/local/path/file.csv' TO USER VOLUME FILE 'subdir/file.csv'").collect()
查看 User Volume 文件列表
files = session.sql("SHOW USER VOLUME DIRECTORY").collect()
for f in files:
print(f["relative_path"], f["size"])
从 User Volume 读取(通过 SQL SELECT FROM VOLUME)
df = session.sql("""
SELECT * FROM USER VOLUME
USING CSV
OPTIONS('header'='true')
FILES('subdir/file.csv')
""")
df.show()
下载 User Volume 文件
session.sql("GET USER VOLUME FILE 'subdir/file.csv' TO '/local/output/'").collect()
注意事项
- 路径格式:Volume 路径使用
volume://volume_name/path
volume://volume_name/path
,不支持 @vol_name
@vol_name
或相对路径
- 上传后需 REFRESH:
session.file.put
session.file.put
上传文件后,需执行 ALTER VOLUME name REFRESH
ALTER VOLUME name REFRESH
刷新目录索引,session.read
session.read
系列方法才能读到新文件
- 新建 Volume 需等待:刚创建的 Named Volume 需等待约 1 分钟初始化完成后再上传和读取文件
- Parquet 读取:传入目录路径(以
/
/
结尾),会读取目录下所有 Parquet 文件
- 导出路径:
copy_into_volume
copy_into_volume
目标路径建议以 /
/
结尾,文件名由系统自动生成(如 part00001.csv
part00001.csv
)
相关文档