Shell 任务运行在 Studio 托管的 Linux Pod 中,每次执行启动一个新 Pod,执行完毕后销毁。
项目
说明
操作系统
Linux x86_64(内核 5.10)
运行用户
system_normal
system_normal
Python 版本
Python 3.10.0
预装 CLI 工具
python3
python3
、
curl
curl
、
wget
wget
、
awk
awk
、
sed
sed
、
grep
grep
、
find
find
、
tar
tar
、
gzip
gzip
预装 Python 包
clickzetta
clickzetta
、
clickzetta_dbutils
clickzetta_dbutils
、
pandas
pandas
、
requests
requests
、
boto3
boto3
、
oss2
oss2
💡 提示:Pod 销毁后环境不保留。如需安装额外包,在脚本开头用
pip install --target /home/system_normal <pkg>
pip install --target /home/system_normal <pkg>
安装,并在 Python 代码里
sys.path.append('/home/system_normal')
sys.path.append('/home/system_normal')
。
连接 Lakehouse:通过
clickzetta_dbutils
clickzetta_dbutils
获取连接,不需要硬编码凭据:
from clickzetta_dbutils import get_active_lakehouse_engine
from sqlalchemy import text
engine = get_active_lakehouse_engine(schema="your_schema")
with engine.connect() as conn:
conn.execute(text("SELECT 1"))
场景:已有 shell 脚本接入调度
典型场景:团队有一批用
awk
awk
/
sed
sed
处理日志或 CSV 的 shell 脚本,想直接接入 Studio 调度体系,处理完后把结果写入 Lakehouse。
以下示例模拟一个常见模式:下载 CSV 文件 → 用
awk
awk
过滤清洗 → 用
python3
python3
写入 Lakehouse。
完整脚本
#!/bin/bash
# 任务参数:biz_date = $[yyyy-MM-dd, -1d]
BIZ_DATE='${biz_date}'
echo "处理日期:$BIZ_DATE"
# ── 1. 下载原始数据文件 ──────────────────────────────────────────────────
wget -q "https://jsonplaceholder.typicode.com/posts" -O /tmp/posts.json
echo "下载完成:$(wc -c < /tmp/posts.json) bytes"
# ── 2. 用 python3 解析 JSON + awk 过滤(userId <= 3 的帖子) ─────────────
python3 -c "
import json
posts = json.load(open('/tmp/posts.json'))
for p in posts:
print(f\"{p['id']},{p['userId']},{p['title'][:30].replace(',','')}\")
" | awk -F, '$2 <= 3 {print}' > /tmp/posts_filtered.csv
echo "过滤后行数:$(wc -l < /tmp/posts_filtered.csv)"
# ── 3. 用 python3 把结果写入 Lakehouse ──────────────────────────────────
python3 - << PYEOF
from clickzetta_dbutils import get_active_lakehouse_engine
from sqlalchemy import text
biz_date = '$BIZ_DATE'
engine = get_active_lakehouse_engine(schema="doc_connector_demo")
with engine.connect() as conn:
conn.execute(text("CREATE SCHEMA IF NOT EXISTS doc_connector_demo"))
conn.execute(text("""
CREATE TABLE IF NOT EXISTS doc_connector_demo.doc_shell_posts (
post_id INT,
user_id INT,
title STRING,
load_date STRING
)
"""))
conn.execute(text(f"DELETE FROM doc_connector_demo.doc_shell_posts WHERE load_date = '{biz_date}'"))
rows = 0
with open('/tmp/posts_filtered.csv') as f:
for line in f:
parts = line.strip().split(',', 2)
if len(parts) == 3:
post_id, user_id, title = parts
title = title.replace("'", "''")
conn.execute(text(
f"INSERT INTO doc_connector_demo.doc_shell_posts VALUES "
f"({post_id}, {user_id}, '{title}', '{biz_date}')"
))
rows += 1
print(f"写入 {rows} 行,load_date={biz_date}")
with engine.connect() as conn:
result = conn.execute(text(
f"SELECT COUNT(*) as cnt, COUNT(DISTINCT user_id) as users "
f"FROM doc_connector_demo.doc_shell_posts WHERE load_date = '{biz_date}'"
))
row = result.fetchone()
print(f"验证:{row[0]} 条记录,{row[1]} 个用户")
PYEOF