外部数据源增量同步实践

本文演示一个完整的两阶段增量同步管道:Shell 任务负责从外部 HTTP API 拉取原始数据写入 Lakehouse raw 层,Python 任务负责读取 raw 层做清洗和标准化,写入 clean 层。两个任务通过依赖关系串联,每天自动执行。

GitHub Releases API 为数据源,同步 dbt-clickzetta 的版本发布记录为例。


管道架构

外部 HTTP API(GitHub Releases) ↓ Shell 任务(每天拉取,写入 raw 层) doc_github_raw(原始 JSON 字段,保留全量) ↓ Python 任务(去重、版本号解析,写入 clean 层) doc_github_clean(标准化字段,可直接查询分析)

为什么分两个任务而不是一个?

  • raw 层保留原始数据,出问题可以重跑清洗任务而不需要重新拉 API
  • Shell 和 Python 各司其职:Shell 擅长调用外部命令和落文件,Python 擅长数据处理逻辑
  • 两个任务可以独立调试和重跑

第一步:Shell 任务——拉取原始数据

脚本

#!/bin/bash # 任务参数:biz_date = $[yyyy-MM-dd] BIZ_DATE='${biz_date}' echo "拉取日期:$BIZ_DATE" python3 - << PYEOF import urllib.request, json from clickzetta_dbutils import get_active_lakehouse_engine from sqlalchemy import text biz_date = '$BIZ_DATE' # ── 1. 建表(幂等) ────────────────────────────────────────────────────── engine = get_active_lakehouse_engine(schema="doc_connector_demo") with engine.connect() as conn: conn.execute(text("CREATE SCHEMA IF NOT EXISTS doc_connector_demo")) conn.execute(text(""" CREATE TABLE IF NOT EXISTS doc_connector_demo.doc_github_raw ( load_date STRING, repo STRING, tag_name STRING, name STRING, published_at STRING, body STRING, raw_json STRING ) """)) # ── 2. 拉取 GitHub Releases API ────────────────────────────────────────── url = "https://api.github.com/repos/clickzetta/dbt-clickzetta/releases?per_page=10" req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) with urllib.request.urlopen(req, timeout=15) as r: releases = json.loads(r.read()) print(f"API 返回 {len(releases)} 条记录") # ── 3. 幂等写入 raw 层 ─────────────────────────────────────────────────── with engine.connect() as conn: conn.execute(text(f"DELETE FROM doc_connector_demo.doc_github_raw WHERE load_date = '{biz_date}'")) for rel in releases: tag = rel['tag_name'].replace("'", "''") name = rel['name'].replace("'", "''")[:100] pub = rel['published_at'][:10] body = (rel.get('body') or '').replace("'", "''")[:200].replace('\n', ' ') raw = json.dumps({'id': rel['id'], 'tag': rel['tag_name']}).replace("'", "''") conn.execute(text( f"INSERT INTO doc_connector_demo.doc_github_raw VALUES " f"('{biz_date}', 'clickzetta/dbt-clickzetta', '{tag}', '{name}', '{pub}', '{body}', '{raw}')" )) print(f"写入 {len(releases)} 条原始记录,load_date={biz_date}") # ── 4. 验证 ────────────────────────────────────────────────────────────── with engine.connect() as conn: result = conn.execute(text( f"SELECT tag_name, published_at FROM doc_connector_demo.doc_github_raw " f"WHERE load_date = '{biz_date}' ORDER BY published_at DESC LIMIT 3" )) for row in result: print(f" {row[0]:15s} {row[1]}") PYEOF

创建并执行任务

Studio UI

  1. 进入 数据开发 → 新建任务,选择 Shell 类型,命名为
    github_raw_fetch
    github_raw_fetch
  2. 粘贴上方脚本
  3. 点击 参数 按钮,
    biz_date
    biz_date
    赋值为
    $[yyyy-MM-dd]
    $[yyyy-MM-dd]
  4. 点击 调度,配置 Cron
    0 1 * * *
    0 1 * * *
    (每天凌晨 1 点)
  5. 点击 发布,点击 运行 输入
    biz_date=2024-12-01
    biz_date=2024-12-01
    验证

cz-cli(详见 Studio 任务开发与运维

cz-cli task create github_raw_fetch --type shell --profile <your-profile> cz-cli task save-content github_raw_fetch --file github_raw_fetch.sh \ --params '{"biz_date": "$[yyyy-MM-dd]"}' --profile <your-profile> cz-cli task save-config github_raw_fetch --vcluster default --retry-count 1 --profile <your-profile> cz-cli task save-cron github_raw_fetch --cron "0 1 * * *" --profile <your-profile> cz-cli task online github_raw_fetch -y --profile <your-profile> cz-cli task execute github_raw_fetch --param "biz_date=2024-12-01" --profile <your-profile>

执行结果

拉取日期:2024-12-01 API 返回 10 条记录 写入 10 条原始记录,load_date=2024-12-01 v1.7.10 2026-05-31 v1.7.9 2026-05-31 v1.7.8 2026-05-31


第二步:Python 任务——清洗和标准化

脚本

from clickzetta_dbutils import get_active_lakehouse_engine from clickzetta.zettapark.session import Session from clickzetta.zettapark import functions as F from urllib.parse import urlparse, parse_qs import re # ── 任务参数 ───────────────────────────────────────────────────────────── # biz_date = $[yyyy-MM-dd] biz_date = '${biz_date}' print(f"清洗日期:{biz_date}") # ── 1. 创建 ZettaPark Session ──────────────────────────────────────────── engine = get_active_lakehouse_engine(schema="doc_connector_demo") url_str = str(engine.url) parsed = urlparse(url_str.replace('clickzetta://', 'https://')) params = parse_qs(parsed.query) parts = parsed.hostname.split('.', 1) session = Session.builder.configs({ "service": parts[1], "instance": parts[0], "magic_token": params['magic_token'][0], "workspace": parsed.path.lstrip('/'), "schema": params.get('schema', ['public'])[0], "vcluster": params.get('virtualcluster', ['default'])[0], }).getOrCreate() print(f"Session 就绪:{session.get_current_catalog()}.{session.get_current_schema()}") # ── 2. 读取 raw 层 ──────────────────────────────────────────────────────── raw = session.table("doc_connector_demo.doc_github_raw").filter( F.col("load_date") == biz_date ) print(f"raw 层记录数:{raw.count()}") # ── 3. 转为 pandas 做清洗 ──────────────────────────────────────────────── df = raw.to_pandas() # 去重:同一 tag_name 只保留一条 df = df.drop_duplicates(subset=['tag_name'], keep='first') print(f"去重后:{len(df)} 条") # 解析版本号:v1.7.5 → major=1, minor=7, patch=5 def parse_version(tag): m = re.match(r'v?(\d+)\.(\d+)\.(\d+)', tag) if m: return int(m.group(1)), int(m.group(2)), int(m.group(3)) return None, None, None df[['major', 'minor', 'patch']] = df['tag_name'].apply( lambda t: parse_version(t) ).apply(lambda x: x if x else (None, None, None)).tolist() # 过滤无效版本(tag 格式不符合 vX.Y.Z 的记录) df = df.dropna(subset=['major']) df['major'] = df['major'].astype(int) df['minor'] = df['minor'].astype(int) df['patch'] = df['patch'].astype(int) print(f"有效版本:{len(df)} 条") # ── 4. 建表并写回 clean 层 ──────────────────────────────────────────────── session.sql(""" CREATE TABLE IF NOT EXISTS doc_connector_demo.doc_github_clean ( load_date STRING, repo STRING, tag_name STRING, name STRING, published_at STRING, major INT, minor INT, patch INT ) """).collect() session.sql(f"DELETE FROM doc_connector_demo.doc_github_clean WHERE load_date = '{biz_date}'").collect() result_df = session.create_dataframe( df[['load_date', 'repo', 'tag_name', 'name', 'published_at', 'major', 'minor', 'patch']] ) result_df.write.mode("append").save_as_table("doc_connector_demo.doc_github_clean") print(f"写入 clean 层:{len(df)} 条,load_date={biz_date}") # ── 5. 汇总输出 ────────────────────────────────────────────────────────── summary = ( session.table("doc_connector_demo.doc_github_clean") .filter(F.col("load_date") == biz_date) .sort(F.col("published_at").desc(), F.col("patch").desc()) .select("tag_name", "published_at", "major", "minor", "patch") ) print("\n最新 5 个版本:") summary.show(5) session.close()

创建并执行任务

Studio UI

  1. 进入 数据开发 → 新建任务,选择 Python 类型,命名为
    github_clean
    github_clean
  2. 粘贴上方脚本
  3. 点击 参数 按钮,
    biz_date
    biz_date
    赋值为
    $[yyyy-MM-dd]
    $[yyyy-MM-dd]
  4. 点击 调度,配置 Cron
    0 2 * * *
    0 2 * * *
    (凌晨 2 点,在 Shell 任务之后)
  5. 依赖 里添加上游任务
    github_raw_fetch
    github_raw_fetch
    ,确保 Shell 任务完成后再执行
  6. 点击 发布,点击 运行 输入
    biz_date=2024-12-01
    biz_date=2024-12-01
    验证

cz-cli

# 先获取 github_raw_fetch 的 task_id(用于配置依赖) cz-cli task list --profile <your-profile> cz-cli task create github_clean --type python --profile <your-profile> cz-cli task save-content github_clean --file github_clean.py \ --params '{"biz_date": "$[yyyy-MM-dd]"}' --profile <your-profile> cz-cli task save-config github_clean --vcluster default --retry-count 1 \ --deps replace \ --dep-tasks '[{"taskId": <github_raw_fetch_task_id>, "taskName": "github_raw_fetch"}]' \ --profile <your-profile> cz-cli task save-cron github_clean --cron "0 2 * * *" --profile <your-profile> cz-cli task online github_clean -y --profile <your-profile> cz-cli task execute github_clean --param "biz_date=2024-12-01" --profile <your-profile>

执行结果

清洗日期:2024-12-01 Session 就绪:quick_start.doc_connector_demo raw 层记录数:10 去重后:10 条 有效版本:10 条 写入 clean 层:10 条,load_date=2024-12-01 最新 5 个版本: -------------------------------------------------------------- |"TAG_NAME"|"PUBLISHED_AT"|"MAJOR"|"MINOR"|"PATCH"| -------------------------------------------------------------- |v1.7.10 |2026-05-31 |1 |7 |10 | |v1.7.9 |2026-05-31 |1 |7 |9 | |v1.7.8 |2026-05-31 |1 |7 |8 | |v1.7.7 |2026-05-30 |1 |7 |7 | |v1.7.6 |2026-05-30 |1 |7 |6 | --------------------------------------------------------------

验证 clean 层:

SELECT tag_name, published_at, major, minor, patch FROM doc_connector_demo.doc_github_clean WHERE load_date = '2024-12-01' ORDER BY published_at DESC, patch DESC LIMIT 5;

tag_name published_at major minor patch v1.7.10 2026-05-31 1 7 10 v1.7.9 2026-05-31 1 7 9 v1.7.8 2026-05-31 1 7 8 v1.7.7 2026-05-30 1 7 7 v1.7.6 2026-05-30 1 7 6


关键设计原则

幂等写入:每次执行前先

DELETE WHERE load_date = '${biz_date}'
DELETE WHERE load_date = '${biz_date}'
,再插入。重跑不会产生重复数据。

raw 层保留原始数据:raw 层不做任何转换,只落原始字段。清洗逻辑出问题时,只需重跑 Python 任务,不需要重新调用 API。

任务依赖:Python 任务配置依赖 Shell 任务,调度系统保证 Shell 任务成功后才触发 Python 任务。如果 Shell 任务失败,Python 任务不会执行。

参数一致:两个任务使用相同的

biz_date
biz_date
参数,确保 raw 层和 clean 层处理的是同一天的数据。


相关文档

联系我们
预约咨询
微信咨询
电话咨询