Migrating a Real-Time ETL Pipeline from Snowflake to 云器Lakehouse

Highlights

This article shows how to quickly migrate Snowflake's real-time ETL pipeline to 云器Lakehouse, and finds that the 云器Lakehouse-based solution offers the following distinct advantages:

  • Global multi-cloud support. In this solution both 云器Lakehouse and Snowflake run on AWS, so the cloud experience stays consistent; the same approach also works for a migration on GCP. Beyond the major overseas cloud providers, 云器Lakehouse also supports Alibaba Cloud, Tencent Cloud, and Huawei Cloud.
  • Low migration cost.
    • 云器Lakehouse offers product concepts very similar to Snowflake's, so users familiar with Snowflake find it easy to understand and pick up.
    • 云器Lakehouse SQL is highly compatible with Snowflake SQL; only minor changes are needed to migrate the code.
  • Low operations cost.
    • 云器Lakehouse provides a built-in Python runtime with unified scheduling, operations, and monitoring across Python, SQL, and other task types. No separate Airflow service or Python environment is needed, which greatly simplifies the architecture and lowers operational difficulty and cost.

Introduction to the Real-Time Insurance Data ETL Pipeline with Snowflake project

Project overview

If you have hands-on experience with Snowflake, the Real-Time Insurance Data ETL Pipeline with Snowflake project on GitHub will look very familiar. The project builds a real-time ETL (extract, transform, load) pipeline in Snowflake that processes insurance data from Kaggle and loads it into Snowflake. The pipeline is orchestrated with Apache Airflow and involves several steps to clean, normalize, and transform the data before loading it into Snowflake for further processing. AWS S3 serves as the data lake for storing both the raw and the transformed data.

Once the project is built, the Airflow DAG can be scheduled to run as needed, ensuring the ETL process executes at the desired frequency. The cleaned, normalized, and transformed data is then available for real-time visualization in Tableau, providing up-to-date insights and reports. The project architecture diagram is shown below:

Tools and services used

  • Python: for scripting and data processing.
  • Pandas: for data manipulation and analysis.
  • AWS S3: as the data lake storing raw and transformed data.
  • Snowflake: for data modeling and storage.
  • Apache Airflow: for orchestrating the ETL workflow.
  • EC2: for hosting the Airflow environment.
  • Kaggle API: for extracting data from Kaggle.
  • Tableau: for data visualization.

ETL workflow

  1. Data extraction

    • Create an Airflow DAG script that uses the Kaggle API to pull a random sample of rows from the Kaggle insurance dataset.

    • Clean, normalize, and transform the data into four tables:

      • policy_data
      • customers_data
      • vehicles_data
      • claims_data
  2. Data storage

    • Store the transformed data in an S3 bucket, organized into separate folders for each type of normalized data (policy_data, customers_data, vehicles_data, claims_data).
  3. Data processing in Snowflake

    • Create a Snowflake SQL worksheet to define the database schema.
    • Create staging tables in Snowflake for each type of normalized data.
    • Define Snowpipes to automatically ingest the data from the S3 bucket into the staging tables.
    • Create stream objects on each staging table to capture changes.
    • Create final tables that merge in new data from the stream objects, ensuring only distinct or new rows are inserted.
  4. Change data capture with Snowflake streams and tasks

    • Create tasks in Snowflake to automate change data capture (a sketch of this pattern is shown after this list).
    • Each task is triggered when new data is available in its stream object and loads that data into the final table.
  5. Airflow DAG tasks

    • Task 1: check that the Kaggle API is available.
    • Task 2: upload the transformed data to the S3 bucket.
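
For reference, step 4 relies on Snowflake's stream-plus-task pattern. The sketch below shows what that pattern typically looks like in Snowflake SQL; it is illustrative only, not the project's exact code, and the warehouse name and schedule are assumptions:

-- Illustrative Snowflake CDC pattern for step 4 (warehouse name and schedule are assumed)
CREATE OR REPLACE STREAM stream_policy ON TABLE staging_policy APPEND_ONLY = TRUE;

CREATE OR REPLACE TASK policy_cdc_task
  WAREHOUSE = etl_wh                               -- assumed warehouse name
  SCHEDULE = '1 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('STREAM_POLICY')     -- fire only when the stream has new rows
AS
  MERGE INTO policy AS a
  USING stream_policy AS b
    ON a.policy_id = b.policy_id
  WHEN NOT MATCHED AND b.METADATA$ACTION = 'INSERT' THEN
    INSERT (policy_id, subscription_length, region_code, segment)
    VALUES (b.policy_id, b.subscription_length, b.region_code, b.segment);

ALTER TASK policy_cdc_task RESUME;                 -- tasks are created suspended and must be resumed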

Migration plan

What you need

Object concept mapping between 云器Lakehouse and Snowflake

| 云器Lakehouse concept | Snowflake concept |
| WORKSPACE | DATABASE |
| SCHEMA | SCHEMA |
| VCLUSTER | WAREHOUSE |
| STORAGE CONNECTION | STORAGE INTEGRATION |
| VOLUME | STAGE |
| TABLE | TABLE |
| PIPE | SNOWPIPE |
| TABLE STREAM | STREAM |
| STUDIO task | TASK |
| Lakehouse SQL | Snowflake SQL |
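
To make the mapping concrete, here is a minimal sketch of how the data-lake objects correspond. The Snowflake side uses typical STORAGE INTEGRATION and STAGE syntax with placeholder credentials, not this project's exact code; the 云器Lakehouse side is a shortened form of the DDL that appears in full in 00_Setup_Env and 02_Stages_or_Volumes below:

-- Snowflake: STORAGE INTEGRATION + STAGE (typical syntax, placeholders)
CREATE STORAGE INTEGRATION s3_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<your-role-arn>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://insurance-data-clickzetta-etl-project/');
CREATE STAGE policy_data_stage
  URL = 's3://insurance-data-clickzetta-etl-project/policy'
  STORAGE_INTEGRATION = s3_int;

-- 云器Lakehouse: STORAGE CONNECTION + VOLUME (full versions appear later in this article)
CREATE STORAGE CONNECTION IF NOT EXISTS aws_s3_connection_demo
  TYPE S3
  ENDPOINT = 's3.us-east-1.amazonaws.com'
  REGION = 'us-east-1'
  ACCESS_KEY = '<your-access-key>'
  SECRET_KEY = '<your-secret-key>';
CREATE EXTERNAL VOLUME IF NOT EXISTS policy_data_stage
  LOCATION 's3://insurance-data-clickzetta-etl-project/policy'
  USING CONNECTION aws_s3_connection_demo
  DIRECTORY = (enable = TRUE);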

Architecture based on 云器Lakehouse

Tools and services in the 云器Lakehouse solution compared with the Snowflake solution

| Tool/service in the 云器Lakehouse solution | Purpose | Tool/service in the Snowflake solution | Purpose |
| Python | Scripting and data processing | Python | Scripting and data processing |
| Pandas | Data manipulation and analysis | Pandas | Data manipulation and analysis |
| AWS S3 | Data lake for raw and transformed data | AWS S3 | Data lake for raw and transformed data |
| 云器Lakehouse | Data modeling and storage | Snowflake | Data modeling and storage |
| 云器Lakehouse Studio IDE | Orchestrating the ETL workflow | Apache Airflow | Orchestrating the ETL workflow |
| 云器Lakehouse Studio Python tasks | Hosted runtime for the Python workload (no separate Airflow host needed) | EC2 | Hosting the Airflow environment |
| 云器Lakehouse JDBC Driver | Connecting Tableau | Tableau Snowflake connector | Connecting Tableau |
| Kaggle API | Extracting data from Kaggle | Kaggle API | Extracting data from Kaggle |
| Tableau | Data visualization | Tableau | Data visualization |

Syntax differences encountered during migration

| Feature | 云器Lakehouse | Snowflake |
| Code comments | -- | -- or // |
| Stream metadata | __change_type column | METADATA$ACTION |
| Object creation DDL | Some objects do not support CREATE OR REPLACE; use CREATE IF NOT EXISTS plus ALTER instead | CREATE OR REPLACE |
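
These differences show up directly in the CDC code. A minimal sketch of the same two spots written for each platform (table and column names are taken from this project):

-- Object creation: Snowflake usually recreates in place; 云器Lakehouse creates if absent
-- Snowflake:
CREATE OR REPLACE TABLE staging_claims (claim_id VARCHAR(20), claim_status INT);
-- 云器Lakehouse:
CREATE TABLE IF NOT EXISTS staging_claims (claim_id VARCHAR(20), claim_status INT);

-- Stream metadata column in the MERGE condition
-- Snowflake:      WHEN NOT MATCHED AND b.METADATA$ACTION = 'INSERT' THEN ...
-- 云器Lakehouse:  WHEN NOT MATCHED AND b.__change_type = 'INSERT' THEN ...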

Migration steps

Task development

Task tree

Navigate to Development -> Tasks in Lakehouse Studio and click "+" to create the following folder:

  • Quickstarts_RealTime_Insurance_claims_Data_ETL_Pipeline

Click "+" to create the following SQL tasks:

  • 00_Setup_Env
  • 02_Stages_or_Volumes
  • 03_Tables
  • 04_Pipes
  • 05_Streams

After developing the tasks above, click "Run" to create the objects.

Click "+" to create the following Python task and task folder:

  • 01_Data_Generate
  • 06_Tasks (folder)
    • claims_cdc_task
    • customers_cdc_task
    • policy_cdc_task
    • vehicles_cdc_task

After developing the tasks above, click "Run" to test them first (for the remaining steps, see the scheduling and publishing guidance later in this document).

Copy the code below into the corresponding tasks, or download the files from GitHub and copy their contents into the corresponding tasks.

Parameter settings in the tasks

The tasks 00_Setup_Env and 01_Data_Generate contain parameters. For each of these tasks, click "Schedule", then click "Load parameters from code" and fill in the actual values:
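
For reference, the ${...} placeholders used by these two tasks (taken from the code later in this article) are:

  • 00_Setup_Env: ACCESS_KEY, SECRET_KEY
  • 01_Data_Generate: kaggle_username, kaggel_key, aws_access_key_id, aws_secret_access_key, aws_region_name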

Set up the 云器Lakehouse environment

00_Setup_Env:

-- RealTime_Insurance_ETL_VC virtual cluster
CREATE VCLUSTER IF NOT EXISTS RealTime_Insurance_ETL_VC
  VCLUSTER_SIZE = XSMALL
  VCLUSTER_TYPE = GENERAL
  AUTO_SUSPEND_IN_SECOND = 60
  AUTO_RESUME = TRUE
  COMMENT 'RealTime_Insurance_ETL_VC VCLUSTER for test';

-- Use our VCLUSTER for the warehouse-side work
USE VCLUSTER RealTime_Insurance_ETL_VC;

-- Create and use the SCHEMA
CREATE SCHEMA IF NOT EXISTS RealTime_Insurance_ETL_SCH;
USE SCHEMA RealTime_Insurance_ETL_SCH;

-- External connection to the data lake
CREATE STORAGE CONNECTION IF NOT EXISTS aws_s3_connection_demo
  TYPE S3
  ENDPOINT = 's3.us-east-1.amazonaws.com'
  REGION = 'us-east-1'
  ACCESS_KEY = ${ACCESS_KEY}
  SECRET_KEY = ${SECRET_KEY}
  COMMENTS = 'us-east-1 storage connection for etl demo';

Migrate SNOWFLAKE_ETL.py to the built-in 云器Lakehouse Python task 01_Data_Generate

云器Lakehouse provides a managed Python runtime. Python tasks developed in 云器Lakehouse Studio can be run directly, or run periodically by configuring scheduling information.

01_Data_Generate:

import subprocess
import sys
import warnings
import contextlib
import os

# Suppress warnings
warnings.filterwarnings("ignore", message="A value is trying to be set on a copy of a slice from a DataFrame")

# Suppress stderr
@contextlib.contextmanager
def suppress_stderr():
    with open(os.devnull, 'w') as devnull:
        old_stderr = sys.stderr
        sys.stderr = devnull
        try:
            yield
        finally:
            sys.stderr = old_stderr

with suppress_stderr():
    # Install kaggle into the task's local package directory
    subprocess.run([sys.executable, "-m", "pip", "install", "kaggle", "--target", "/home/system_normal",
                    "-i", "https://pypi.tuna.tsinghua.edu.cn/simple"], stderr=subprocess.DEVNULL)
    sys.path.append('/home/system_normal')

import pandas as pd
import boto3
import random
import os, json, io
import zipfile
from datetime import datetime

def load_random_sample(csv_file, sample_size):
    # Count total rows in the CSV file
    total_rows = sum(1 for line in open(csv_file, encoding='utf-8')) - 1  # Subtract header row
    # Calculate indices of rows to skip (non-selected)
    skip_indices = random.sample(range(1, total_rows + 1), total_rows - sample_size)
    # Load DataFrame with a random sample of rows
    df = pd.read_csv(csv_file, skiprows=skip_indices)

    policy_table = df[['policy_id', 'subscription_length', 'region_code', 'segment']].copy()
    vehicles_table = df[['policy_id', 'vehicle_age', 'fuel_type', 'is_parking_sensors', 'is_parking_camera',
                         'rear_brakes_type', 'displacement', 'transmission_type', 'steering_type',
                         'turning_radius', 'gross_weight', 'is_front_fog_lights', 'is_rear_window_wiper',
                         'is_rear_window_washer', 'is_rear_window_defogger', 'is_brake_assist',
                         'is_central_locking', 'is_power_steering', 'is_day_night_rear_view_mirror',
                         'is_speed_alert', 'ncap_rating']].copy()
    customers_table = df[['policy_id', 'customer_age', 'region_density']].copy()
    claims_table = df[['policy_id', 'claim_status']].copy()

    vehicles_table.rename(columns={'policy_id': 'vehicle_id'}, inplace=True)
    customers_table.rename(columns={'policy_id': 'customer_id'}, inplace=True)
    claims_table.rename(columns={'policy_id': 'claim_id'}, inplace=True)

    return policy_table, vehicles_table, customers_table, claims_table

def upload_df_to_s3():
    try:
        with suppress_stderr():
            # Setup Kaggle API
            # Ensure the directory exists
            config_dir = '/home/system_normal/tempdata/.config/kaggle'
            if not os.path.exists(config_dir):
                os.makedirs(config_dir)

            # Create the kaggle.json file with the given credentials
            kaggle_json = {
                "username": ${kaggle_username},
                "key": ${kaggel_key}
            }
            with open(os.path.join(config_dir, 'kaggle.json'), 'w') as f:
                json.dump(kaggle_json, f)

            # Set the environment variable to the directory containing kaggle.json
            os.environ['KAGGLE_CONFIG_DIR'] = config_dir

            from kaggle.api.kaggle_api_extended import KaggleApi

            # Authenticate the Kaggle API
            api = KaggleApi()
            api.authenticate()

            # Define the dataset and the CSV file name
            dataset = 'litvinenko630/insurance-claims'
            csv_file = 'Insurance claims data.csv'

            # Download the entire dataset as a zip file
            api.dataset_download_files(dataset, path='/home/system_normal/tempdata')

            # Extract the CSV file from the downloaded zip file
            with zipfile.ZipFile('/home/system_normal/tempdata/insurance-claims.zip', 'r') as zip_ref:
                zip_ref.extract(csv_file, path='/home/system_normal/tempdata')

            policy_data, vehicles_data, customers_data, claims_data = load_random_sample(
                f'/home/system_normal/tempdata/{csv_file}', 20)

            # Convert each DataFrame to a CSV string
            policy = policy_data.to_csv(index=False)
            vehicles = vehicles_data.to_csv(index=False)
            customers = customers_data.to_csv(index=False)
            claims = claims_data.to_csv(index=False)

            current_datetime = datetime.now().strftime("%Y%m%d_%H%M%S")

            # Ensure you have set your AWS credentials in environment variables or replace the following with your credentials
            s3_client = boto3.client(
                's3',
                aws_access_key_id=${aws_access_key_id},
                aws_secret_access_key=${aws_secret_access_key},
                region_name=${aws_region_name}
            )

            # Define S3 bucket and keys with the current date and time
            s3_bucket = 'insurance-data-clickzetta-etl-project'
            s3_key_policy = f'policy/policy_{current_datetime}.csv'
            s3_key_vehicles = f'vehicles/vehicles_{current_datetime}.csv'
            s3_key_customers = f'customers/customers_{current_datetime}.csv'
            s3_key_claims = f'claims/claims_{current_datetime}.csv'

            # Upload to S3
            s3_client.put_object(Bucket=s3_bucket, Key=s3_key_policy, Body=policy)
            s3_client.put_object(Bucket=s3_bucket, Key=s3_key_vehicles, Body=vehicles)
            s3_client.put_object(Bucket=s3_bucket, Key=s3_key_customers, Body=customers)
            s3_client.put_object(Bucket=s3_bucket, Key=s3_key_claims, Body=claims)

            print(f"upload_df_to_s3 done: {s3_key_policy}, {s3_key_vehicles}, {s3_key_customers}, {s3_key_claims}")
    except Exception as e:
        pass  # Ignore errors

# Run the upload function
upload_df_to_s3()

Create 云器Lakehouse Volumes (the counterpart of Snowflake Stages)

02_Stages_or_Volumes:

In LOCATION 's3://insurance-data-clickzetta-etl-project/policy', change the bucket name insurance-data-clickzetta-etl-project to your own bucket name.

-- Create a volume: the location on the data lake where files are stored
CREATE EXTERNAL VOLUME IF NOT EXISTS policy_data_stage
  LOCATION 's3://insurance-data-clickzetta-etl-project/policy'
  USING connection aws_s3_connection_demo  -- storage connection
  DIRECTORY = ( enable = TRUE )
  recursive = TRUE;
-- Sync the volume's directory on the data lake to Lakehouse
ALTER volume policy_data_stage refresh;
-- List the files on the 云器Lakehouse data lake volume
SELECT * from directory(volume policy_data_stage);
--********************************************************************--
-- Create a volume: the location on the data lake where files are stored
CREATE EXTERNAL VOLUME IF NOT EXISTS vehicles_data_stage
  LOCATION 's3://insurance-data-clickzetta-etl-project/vehicles'
  USING connection aws_s3_connection_demo  -- storage connection
  DIRECTORY = ( enable = TRUE )
  recursive = TRUE;
-- Sync the volume's directory on the data lake to Lakehouse
ALTER volume vehicles_data_stage refresh;
-- List the files on the 云器Lakehouse data lake volume
SELECT * from directory(volume vehicles_data_stage);
--********************************************************************--
-- Create a volume: the location on the data lake where files are stored
CREATE EXTERNAL VOLUME IF NOT EXISTS customers_data_stage
  LOCATION 's3://insurance-data-clickzetta-etl-project/customers'
  USING connection aws_s3_connection_demo  -- storage connection
  DIRECTORY = ( enable = TRUE )
  recursive = TRUE;
-- Sync the volume's directory on the data lake to Lakehouse
ALTER volume customers_data_stage refresh;
-- List the files on the 云器Lakehouse data lake volume
SELECT * from directory(volume customers_data_stage);
--********************************************************************--
-- Create a volume: the location on the data lake where files are stored
CREATE EXTERNAL VOLUME IF NOT EXISTS claims_data_stage
  LOCATION 's3://insurance-data-clickzetta-etl-project/claims'
  USING connection aws_s3_connection_demo  -- storage connection
  DIRECTORY = ( enable = TRUE )
  recursive = TRUE;
-- Sync the volume's directory on the data lake to Lakehouse
ALTER volume claims_data_stage refresh;
-- List the files on the 云器Lakehouse data lake volume
SELECT * from directory(volume claims_data_stage);

Create 云器Lakehouse Tables

03_Tables:

--- STAGING TABLES ---
--creating staging tables for each normalized tables created by data pipeline
CREATE TABLE IF NOT EXISTS staging_policy(
  policy_id VARCHAR(15),
  subscription_length FLOAT,
  region_code VARCHAR(5),
  segment VARCHAR(10));

CREATE TABLE IF NOT EXISTS staging_vehicles(
  vehicle_id VARCHAR(15),
  vehicle_age FLOAT,
  fuel_type VARCHAR(10),
  parking_sensors VARCHAR(5),
  parking_camera VARCHAR(5),
  rear_brakes_type VARCHAR(10),
  displacement INT,
  trasmission_type VARCHAR(20),
  steering_type VARCHAR(15),
  turning_radius FLOAT,
  gross_weight INT,
  front_fog_lights VARCHAR(5),
  rear_window_wiper VARCHAR(5),
  rear_window_washer VARCHAR(5),
  rear_window_defogger VARCHAR(5),
  brake_assist VARCHAR(5),
  central_locking VARCHAR(5),
  power_steering VARCHAR(5),
  day_night_rear_view_mirror VARCHAR(5),
  is_speed_alert VARCHAR(5),
  ncap_rating INT);

CREATE TABLE IF NOT EXISTS staging_customers(
  customer_id VARCHAR(20),
  customer_age INT,
  region_density INT);

CREATE TABLE IF NOT EXISTS staging_claims(
  claim_id VARCHAR(20),
  claim_status INT);

--- FINAL TABLES ---
--creating final table to store transformed data captured by stream objects
CREATE TABLE IF NOT EXISTS policy(
  policy_id VARCHAR(15),
  subscription_length FLOAT,
  region_code VARCHAR(5),
  segment VARCHAR(10));

SELECT * FROM policy;
TRUNCATE TABLE policy;

CREATE TABLE IF NOT EXISTS vehicles(
  vehicle_id VARCHAR(15),
  vehicle_age FLOAT,
  fuel_type VARCHAR(10),
  parking_sensors VARCHAR(5),
  parking_camera VARCHAR(5),
  rear_brakes_type VARCHAR(10),
  displacement INT,
  trasmission_type VARCHAR(20),
  steering_type VARCHAR(15),
  turning_radius FLOAT,
  gross_weight INT,
  front_fog_lights VARCHAR(5),
  rear_window_wiper VARCHAR(5),
  rear_window_washer VARCHAR(5),
  rear_window_defogger VARCHAR(5),
  brake_assist VARCHAR(5),
  central_locking VARCHAR(5),
  power_steering VARCHAR(5),
  day_night_rear_view_mirror VARCHAR(5),
  is_speed_alert VARCHAR(5),
  ncap_rating INT);

CREATE TABLE IF NOT EXISTS customers(
  customer_id VARCHAR(20),
  customer_age INT,
  region_density INT);

CREATE TABLE IF NOT EXISTS claims(
  claim_id VARCHAR(20),
  claim_status INT);

Create 云器Lakehouse Pipes

04_Pipes:

create pipe policy_pipe
  VIRTUAL_CLUSTER = 'RealTime_Insurance_ETL_VC'
  -- Use file-listing (scan) mode to pick up the latest files
  INGEST_MODE = 'LIST_PURGE'
as
copy into staging_policy
from volume policy_data_stage(
  policy_id VARCHAR(15),
  subscription_length FLOAT,
  region_code VARCHAR(5),
  segment VARCHAR(10))
using csv OPTIONS( 'header'='true' )
-- purge is required so that files are deleted after a successful load
purge=true;
--********************************************************************--
create pipe vehicles_pipe
  VIRTUAL_CLUSTER = 'RealTime_Insurance_ETL_VC'
  -- Use file-listing (scan) mode to pick up the latest files
  INGEST_MODE = 'LIST_PURGE'
as
copy into staging_vehicles
from volume vehicles_data_stage(
  vehicle_id VARCHAR(15),
  vehicle_age FLOAT,
  fuel_type VARCHAR(10),
  parking_sensors VARCHAR(5),
  parking_camera VARCHAR(5),
  rear_brakes_type VARCHAR(10),
  displacement INT,
  trasmission_type VARCHAR(20),
  steering_type VARCHAR(15),
  turning_radius FLOAT,
  gross_weight INT,
  front_fog_lights VARCHAR(5),
  rear_window_wiper VARCHAR(5),
  rear_window_washer VARCHAR(5),
  rear_window_defogger VARCHAR(5),
  brake_assist VARCHAR(5),
  central_locking VARCHAR(5),
  power_steering VARCHAR(5),
  day_night_rear_view_mirror VARCHAR(5),
  is_speed_alert VARCHAR(5),
  ncap_rating INT)
using csv OPTIONS( 'header'='true' )
-- purge is required so that files are deleted after a successful load
purge=true;
--********************************************************************--
create pipe customers_pipe
  VIRTUAL_CLUSTER = 'RealTime_Insurance_ETL_VC'
  -- Use file-listing (scan) mode to pick up the latest files
  INGEST_MODE = 'LIST_PURGE'
as
copy into staging_customers
from volume customers_data_stage(
  customer_id VARCHAR(20),
  customer_age INT,
  region_density INT)
using csv OPTIONS( 'header'='true' )
-- purge is required so that files are deleted after a successful load
purge=true;
--********************************************************************--
create pipe claims_pipe
  VIRTUAL_CLUSTER = 'RealTime_Insurance_ETL_VC'
  -- Use file-listing (scan) mode to pick up the latest files
  INGEST_MODE = 'LIST_PURGE'
as
copy into staging_claims
from volume claims_data_stage(
  claim_id VARCHAR(20),
  claim_status INT)
using csv OPTIONS( 'header'='true' )
-- purge is required so that files are deleted after a successful load
purge=true;

Create 云器Lakehouse Table Streams (the counterpart of Snowflake Streams)

05_Streams:

--********************************************************************--
--- CREATING TABLE STREAM OBJECTS FOR EACH STAGING TABLE TO CAPTURE NEW DATA
-- creating TABLE STREAM objects for staging tables
CREATE TABLE STREAM IF NOT EXISTS STREAM_policy
  ON TABLE staging_policy
  WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');
--********************************************************************--
CREATE TABLE STREAM IF NOT EXISTS STREAM_vehicles
  ON TABLE staging_vehicles
  WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');
--********************************************************************--
CREATE TABLE STREAM IF NOT EXISTS STREAM_customers
  ON TABLE staging_customers
  WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');
--********************************************************************--
CREATE TABLE STREAM IF NOT EXISTS STREAM_claims
  ON TABLE staging_claims
  WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');
--********************************************************************--
-- check total streams
SHOW TABLE STREAMS;

Check the newly created objects

Use the following commands to check whether the objects created via SQL were created successfully, and to check the status of the pipes.

--Lake Objects
SHOW CONNECTIONS;
SHOW VOLUMES;
--House Objects
SHOW TABLES;
SHOW PIPES;
SHOW TABLE STREAMS;


Create the transform tasks

06_Tasks:claims_cdc_task

MERGE INTO claims AS a
USING stream_claims AS b
  ON a.claim_id = b.claim_id
WHEN NOT MATCHED AND b.__change_type = 'INSERT' THEN
  INSERT (claim_id, claim_status)
  VALUES (b.claim_id, b.claim_status);

06_Tasks:customers_cdc_task

MERGE INTO customers AS a
USING stream_customers AS b
  ON a.customer_id = b.customer_id
WHEN NOT MATCHED AND b.__change_type = 'INSERT' THEN
  INSERT (customer_id, customer_age, region_density)
  VALUES (b.customer_id, b.customer_age, b.region_density);

06_Tasks:policy_cdc_task

MERGE INTO policy AS a
USING stream_policy AS b
  ON a.policy_id = b.policy_id
WHEN NOT MATCHED AND b.__change_type = 'INSERT' THEN
  INSERT (policy_id, subscription_length, region_code, segment)
  VALUES (b.policy_id, b.subscription_length, b.region_code, b.segment);

06_Tasks:vehicles_cdc_task

MERGE INTO vehicles AS a
USING stream_vehicles AS b
  ON a.vehicle_id = b.vehicle_id
WHEN NOT MATCHED AND b.__change_type = 'INSERT' THEN
  INSERT (vehicle_id, vehicle_age, fuel_type, parking_sensors, parking_camera, rear_brakes_type,
          displacement, trasmission_type, steering_type, turning_radius, gross_weight,
          front_fog_lights, rear_window_wiper, rear_window_washer, rear_window_defogger,
          brake_assist, central_locking, power_steering, day_night_rear_view_mirror,
          is_speed_alert, ncap_rating)
  VALUES (b.vehicle_id, b.vehicle_age, b.fuel_type, b.parking_sensors, b.parking_camera, b.rear_brakes_type,
          b.displacement, b.trasmission_type, b.steering_type, b.turning_radius, b.gross_weight,
          b.front_fog_lights, b.rear_window_wiper, b.rear_window_washer, b.rear_window_defogger,
          b.brake_assist, b.central_locking, b.power_steering, b.day_night_rear_view_mirror,
          b.is_speed_alert, b.ncap_rating);

Production run

Job scheduling

Navigate to Development -> Tasks in Lakehouse Studio, configure the scheduling parameters for the following tasks, and submit them to run in the production environment.

The tasks that need scheduling configured and to be published are:

  • 01_Data_Generate
  • claims_cdc_task
  • customers_cdc_task
  • policy_cdc_task
  • vehicles_cdc_task

Configure the scheduling parameters as follows:

Streams and pipes run automatically once created and do not need to be scheduled.

Analyze the data in 云器Lakehouse with Tableau

Refer to this article to connect Tableau to 云器Lakehouse via JDBC, explore and analyze the data in the Lakehouse, and build BI reports.
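
As a simple illustration of the kind of query such a report might run against the final tables (a sketch only; it assumes claim_status is a 0/1 flag as in this dataset, and that claim_id reuses policy_id as in the data-generation script above):

-- Claims filed per region, based on the final tables created in this article
SELECT p.region_code,
       COUNT(*)            AS total_policies,
       SUM(c.claim_status) AS claims_filed
FROM policy p
JOIN claims c
  ON c.claim_id = p.policy_id   -- claim_id reuses policy_id in this dataset
GROUP BY p.region_code
ORDER BY claims_filed DESC;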

Operating the recurring tasks

Navigate to Operations & Monitoring -> Task Operations -> Recurring Tasks in Lakehouse Studio to check the run status of each scheduled task:

Task instance management for recurring tasks

Pipe operations

-- Pause and resume a PIPE
-- Pause
ALTER pipe policy_pipe SET PIPE_EXECUTION_PAUSED = true;
-- Resume
ALTER pipe policy_pipe SET PIPE_EXECUTION_PAUSED = false;

-- Check how the pipe's COPY jobs have executed
-- the query tag format is pipe.<workspace>.<schema>.<pipe_name>
-- within the last seven days, with roughly a half-hour delay
SELECT * FROM INFORMATION_SCHEMA.JOB_HISTORY
WHERE QUERY_TAG="pipe.qiliang_ws.realtime_insurance_etl_sch.policy_pipe";
-- in real time
SHOW JOBS IN VCLUSTER RealTime_Insurance_ETL_VC
WHERE QUERY_TAG="pipe.qiliang_ws.realtime_insurance_etl_sch.policy_pipe";
SHOW JOBS where length(QUERY_TAG)>10;

-- Check the history of files loaded by the COPY jobs
select * from load_history('RealTime_Insurance_ETL_SCH.staging_policy');
select * from load_history('RealTime_Insurance_ETL_SCH.staging_vehicles');
select * from load_history('RealTime_Insurance_ETL_SCH.staging_customers');
select * from load_history('RealTime_Insurance_ETL_SCH.staging_claims');

Check the data in the staging tables, table streams, and final tables

SELECT '01staging' AS table_type, 'staging_policy' AS table_name, COUNT(*) AS row_count FROM staging_policy
UNION ALL
SELECT '01staging' AS table_type, 'staging_vehicles' AS table_name, COUNT(*) AS row_count FROM staging_vehicles
UNION ALL
SELECT '01staging' AS table_type, 'staging_customers' AS table_name, COUNT(*) AS row_count FROM staging_customers
UNION ALL
SELECT '01staging' AS table_type, 'staging_claims' AS table_name, COUNT(*) AS row_count FROM staging_claims
UNION ALL
SELECT '02stream' AS table_type, 'STREAM_policy' AS table_name, COUNT(*) AS row_count FROM STREAM_policy
UNION ALL
SELECT '02stream' AS table_type, 'STREAM_vehicles' AS table_name, COUNT(*) AS row_count FROM STREAM_vehicles
UNION ALL
SELECT '02stream' AS table_type, 'STREAM_customers' AS table_name, COUNT(*) AS row_count FROM STREAM_customers
UNION ALL
SELECT '02stream' AS table_type, 'STREAM_claims' AS table_name, COUNT(*) AS row_count FROM STREAM_claims
UNION ALL
SELECT '03final' AS table_type, 'policy' AS table_name, COUNT(*) AS row_count FROM policy
UNION ALL
SELECT '03final' AS table_type, 'vehicles' AS table_name, COUNT(*) AS row_count FROM vehicles
UNION ALL
SELECT '03final' AS table_type, 'customers' AS table_name, COUNT(*) AS row_count FROM customers
UNION ALL
SELECT '03final' AS table_type, 'claims' AS table_name, COUNT(*) AS row_count FROM claims
ORDER BY table_type;

References

云器Lakehouse basic concepts

云器Lakehouse JDBC Driver

云器Lakehouse Python tasks

云器Lakehouse scheduling and operations
