Zettapark 常用函数参考
本文列出 Zettapark
functions使用前导入:
from clickzetta.zettapark import functions as F from clickzetta.zettapark.session import Session session = Session.builder.configs({ "username": "your_username", "password": "your_password", "service": "cn-shanghai-alicloud.api.clickzetta.com", "instance": "your_instance", "workspace": "your_workspace", "schema": "public", "vcluster": "default" }).create()
各章节示例使用
create_dataframe字符串函数
创建示例数据
df = session.create_dataframe( [(1, "Alice"), (2, " hello world "), (3, "bob")], schema=["id", "name"] ) F.upper(F.col("name")) # 转大写 F.lower(F.col("name")) # 转小写 F.trim(F.col("name")) # 去除首尾空格 F.length(F.col("name")) # 字符串长度 F.substr(F.col("name"), 1, 3) # 截取子串(起始位置从1开始) F.replace(F.col("name"), "old", "new") # 替换字符串 F.concat(F.lit("Hello, "), F.col("name")) # 拼接字符串 F.concat_ws("-", F.col("a"), F.col("b")) # 用分隔符拼接 F.regexp_replace(F.col("name"), r"\s+", "_") # 正则替换 F.lpad(F.col("name"), 10, " ") # 左填充 F.rpad(F.col("name"), 10, " ") # 右填充 F.ltrim(F.col("name")) # 去除左侧空格 F.rtrim(F.col("name")) # 去除右侧空格 F.initcap(F.col("name")) # 首字母大写 F.reverse(F.col("name")) # 反转字符串 F.repeat(F.col("name"), 3) # 重复 N 次 F.startswith(F.col("name"), "A") # 是否以指定字符串开头 F.endswith(F.col("name"), "z") # 是否以指定字符串结尾 F.contains(F.col("name"), "abc") # 是否包含子串 F.charindex("abc", F.col("name")) # 子串位置(从1开始,不存在返回0) F.md5(F.col("name")) # MD5 哈希 F.sha1(F.col("name")) # SHA1 哈希
示例:
data = [(1, " hello world "), (2, "Alice"), (3, "bob")] df = session.create_dataframe(data, schema=["id", "name"]) df.select( F.col("id"), F.upper(F.col("name")).alias("upper"), F.trim(F.col("name")).alias("trim"), F.length(F.trim(F.col("name"))).alias("len"), F.replace(F.col("name"), "world", "Lakehouse").alias("replaced"), F.regexp_replace(F.col("name"), r"\s+", "_").alias("no_space"), ).show()
+---+---------------+------------+---+------------------+---------------+ | id| upper| trim|len| replaced| no_space| +---+---------------+------------+---+------------------+---------------+ | 1| HELLO WORLD | hello world| 11| hello Lakehouse |_hello_world_ | | 2| ALICE| Alice| 5| Alice| Alice| | 3| BOB| bob| 3| bob| bob| +---+---------------+------------+---+------------------+---------------+
数值函数
F.round(F.col("price"), 2) # 四舍五入,保留2位小数 F.ceil(F.col("price")) # 向上取整 F.floor(F.col("price")) # 向下取整 F.abs(F.col("amount")) # 绝对值 F.sqrt(F.col("value")) # 平方根 F.pow(F.col("base"), F.col("exp")) # 幂运算 F.log(F.lit(10.0), F.col("value")) # 对数 F.exp(F.col("value")) # e 的幂 F.sign(F.col("amount")) # 符号(-1/0/1) F.greatest(F.col("a"), F.col("b")) # 多列最大值 F.least(F.col("a"), F.col("b")) # 多列最小值 F.div0(F.col("a"), F.col("b")) # 除法(除数为0时返回0,不报错)
示例:
data = [(1, 3.14159, 100, 200), (2, -2.718, 50, 300)] df = session.create_dataframe(data, schema=["id", "num", "a", "b"]) df.select( F.round(F.col("num"), 2).alias("round"), F.ceil(F.col("num")).alias("ceil"), F.floor(F.col("num")).alias("floor"), F.abs(F.col("num")).alias("abs"), F.greatest(F.col("a"), F.col("b")).alias("max_ab"), F.least(F.col("a"), F.col("b")).alias("min_ab"), ).show()
+-----+----+-----+-------+------+------+ |round|ceil|floor| abs|max_ab|min_ab| +-----+----+-----+-------+------+------+ | 3.14| 4| 3|3.14159| 200| 100| |-2.72| -2| -3| 2.718| 300| 50| +-----+----+-----+-------+------+------+
日期和时间函数
F.current_date() # 当前日期 F.current_timestamp() # 当前时间戳 F.to_date(F.col("dt_str")) # 字符串转日期 F.to_timestamp(F.col("ts_str")) # 字符串转时间戳 F.year(F.col("dt")) # 提取年 F.month(F.col("dt")) # 提取月 F.dayofmonth(F.col("dt")) # 提取日 F.dayofweek(F.col("dt")) # 星期几(1=周日) F.hour(F.col("ts")) # 提取小时 F.minute(F.col("ts")) # 提取分钟 F.second(F.col("ts")) # 提取秒 F.date_add(F.col("dt"), 7) # 日期加 N 天 F.date_sub(F.col("dt"), 7) # 日期减 N 天 F.add_months(F.col("dt"), 1) # 日期加 N 月 F.datediff(F.col("end_dt"), F.col("start_dt")) # 日期差(天数) F.date_format(F.col("dt"), "yyyy/MM/dd") # 日期格式化 F.last_day(F.col("dt")) # 当月最后一天 F.months_between(F.col("dt1"), F.col("dt2")) # 月份差 F.unix_timestamp(F.col("ts")) # 转 Unix 时间戳(秒) F.from_unixtime(F.col("ts_sec")) # Unix 时间戳转字符串
示例:
data = [("2024-01-15",), ("2024-03-31",)] df = session.create_dataframe(data, schema=["dt_str"]) df.select( F.to_date(F.col("dt_str")).alias("date"), F.year(F.to_date(F.col("dt_str"))).alias("year"), F.month(F.to_date(F.col("dt_str"))).alias("month"), F.date_add(F.to_date(F.col("dt_str")), 7).alias("plus_7d"), F.date_format(F.to_date(F.col("dt_str")), "yyyy/MM/dd").alias("formatted"), F.last_day(F.to_date(F.col("dt_str"))).alias("last_day"), F.add_months(F.to_date(F.col("dt_str")), 1).alias("next_month"), ).show()
+----------+----+-----+----------+-----------+----------+----------+ | date|year|month| plus_7d| formatted| last_day|next_month| +----------+----+-----+----------+-----------+----------+----------+ |2024-01-15|2024| 1|2024-01-22|2024/01/15 |2024-01-31|2024-02-15| |2024-03-31|2024| 3|2024-04-07|2024/03/31 |2024-03-31|2024-04-30| +----------+----+-----+----------+-----------+----------+----------+
条件函数
when / otherwise — 多条件分支(类似 CASE WHEN)
F.when(F.col("score") >= 90, "A") \ .when(F.col("score") >= 80, "B") \ .when(F.col("score") >= 60, "C") \ .otherwise("F")
iff — 简单二元条件(类似三元运算符)
F.iff(F.col("amount") > 0, "正数", "非正数")
coalesce — 返回第一个非 NULL 值
F.coalesce(F.col("value"), F.col("default_value"), F.lit(0))
is_null / is_not_null
F.is_null(F.col("name"))
示例:
data = [(1, 95), (2, 82), (3, 67), (4, 45)] df = session.create_dataframe(data, schema=["id", "score"]) df.select( F.col("id"), F.col("score"), F.when(F.col("score") >= 90, "A") .when(F.col("score") >= 80, "B") .when(F.col("score") >= 60, "C") .otherwise("F").alias("grade"), F.iff(F.col("score") >= 60, "pass", "fail").alias("result"), ).show()
+---+-----+-----+------+ | id|score|grade|result| +---+-----+-----+------+ | 1| 95| A| pass| | 2| 82| B| pass| | 3| 67| C| pass| | 4| 45| F| fail| +---+-----+-----+------+
聚合函数
F.count(F.col("id")) # 计数(不含 NULL) F.count(F.lit(1)) # 总行数 F.count_distinct(F.col("user_id")) # 去重计数 F.sum(F.col("amount")) # 求和 F.avg(F.col("amount")) # 平均值 F.max(F.col("amount")) # 最大值 F.min(F.col("amount")) # 最小值 F.stddev(F.col("amount")) # 标准差 F.variance(F.col("amount")) # 方差 F.median(F.col("amount")) # 中位数 F.approx_count_distinct(F.col("id")) # 近似去重计数(大数据量时更快) F.listagg(F.col("name"), ",") # 字符串聚合(类似 GROUP_CONCAT) F.any_value(F.col("name")) # 返回组内任意一个值
示例:
data = [(1,"A",100),(2,"A",200),(3,"B",300),(4,"B",150),(5,"A",50)] df = session.create_dataframe(data, schema=["id","category","amount"]) df.group_by("category").agg( F.count(F.col("id")).alias("cnt"), F.sum(F.col("amount")).alias("total"), F.avg(F.col("amount")).alias("avg"), F.max(F.col("amount")).alias("max"), F.min(F.col("amount")).alias("min"), ).show()
+--------+---+-----+-----+---+---+ |category|cnt|total| avg|max|min| +--------+---+-----+-----+---+---+ | A| 3| 350|116.7|200| 50| | B| 2| 450|225.0|300|150| +--------+---+-----+-----+---+---+
JSON 函数
F.get_json_object(F.col("data"), "$.name") # 提取 JSON 字段 F.get_json_object(F.col("data"), "$.addr.city") # 提取嵌套字段 F.parse_json(F.col("json_str")) # 解析 JSON 字符串 F.to_json(F.col("struct_col")) # 转为 JSON 字符串
示例:
data = [(1, '{"name":"Alice","age":30,"addr":{"city":"Beijing"}}')] df = session.create_dataframe(data, schema=["id", "data"]) df.select( F.get_json_object(F.col("data"), "$.name").alias("name"), F.get_json_object(F.col("data"), "$.age").alias("age"), F.get_json_object(F.col("data"), "$.addr.city").alias("city"), ).show()
+-----+---+-------+ | name|age| city| +-----+---+-------+ |Alice| 30|Beijing| +-----+---+-------+
其他常用函数
F.lit(42) # 字面量 F.col("name") # 引用列 F.expr("amount * 1.13") # 原生 SQL 表达式 F.cast(F.col("str_col"), "int") # 类型转换 F.try_cast(F.col("str_col"), "int") # 安全类型转换(失败返回 NULL) F.hash(F.col("id")) # 哈希值 F.random() # 随机数(0~1) F.monotonically_increasing_id() # 单调递增 ID(不保证连续) F.typeof(F.col("value")) # 返回列的数据类型名称
注意事项
- 始终用
F.col("列名")F.col("列名")引用列,避免直接传字符串(某些函数传字符串会被当作列值而非列名) F.split()F.split()在当前版本(0.1.5)有已知问题,建议用session.sql()session.sql()替代F.date_trunc()F.date_trunc()在当前版本有已知问题,建议用F.date_format()F.date_format()+F.to_date()F.to_date()组合替代
相关文档
| 文档 | 说明 |
|---|---|
| Zettapark DataFrame API 指南 | DataFrame 操作完整指南 |
| Zettapark 快速上手 | 安装和基础示例 |
| SQL 函数参考 | Lakehouse 内置 SQL 函数完整列表 |
