使用Java SDK批量上传数据

本文档主要介绍如何使用 Java SDK 的 BulkloadStream 批量将数据加载到 Lakehouse 中。它适合一次性大量数据导入,支持自定义数据源,提供了数据导入的灵活性。本次案例以读取本地文件为例。如果您的数据源在对象存储或 Lakehouse Studio 数据集成支持的范围内,推荐使用 COPY 命令或数据集成功能。

参考文档

Java SDK批量上传数据

应用场景

  • 适用于需要批量上传大量数据的业务场景。
  • 适合熟悉Java并需要自定义数据导入逻辑的开发人员。

使用限制

  • BulkloadStream不支持主键(pk)表的写入。
  • 不适用于时间间隔小于五分钟的频繁数据上传场景。

使用案例

本案例以读取本地 CSV 文件为例,使用的数据集是 巴西电子商务 公共数据集中的 olist_order_items_dataset 数据。若数据源位于对象存储或 Lakehouse Studio 数据集成功能支持的范围,推荐使用 COPY 命令或数据集成功能。

前置条件

  • 创建表

    • create table bulk_order_items ( order_id STRING, order_item_id INT, product_id STRING, seller_id STRING, shipping_limit_date STRING, price DOUBLE, freight_value DOUBLE );

  • 对目标表具有INSERT权限。

使用Java代码开发

Maven依赖

在项目的

pom.xml
pom.xml
文件中添加 Lakehouse 的 Maven 依赖。Lakehouse Maven 依赖的最新版本可以在 maven库 中找到。

<dependency> <groupId>com.clickzetta</groupId> <artifactId>clickzetta-java</artifactId> <version>1.3.1</version> </dependency>

编写Java代码

  1. 初始化 Lakehouse 客户端和 BulkloadStream:创建
    BulkloadFile
    BulkloadFile
    类,初始化 Lakehouse 连接和 BulkloadStream 对象。
  2. 读取本地 CSV 文件并写入 Lakehouse:使用 Java IO 流读取本地 CSV 文件,并将数据逐行写入 Lakehouse。

import com.clickzetta.client.BulkloadStream; import com.clickzetta.client.ClickZettaClient; import com.clickzetta.client.RowStream; import com.clickzetta.client.StreamState; import com.clickzetta.platform.client.api.Row; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.text.MessageFormat; public class BulkloadFile { private static ClickZettaClient client; private static final String password = ""; private static final String table = "bulk_order_items"; private static final String workspace = ""; private static final String schema = "public"; private static final String vc = "default"; private static final String user = ""; static BulkloadStream bulkloadStream; public static void main(String[] args) throws Exception { initialize(); File csvFile = new File("olist_order_items_dataset.csv"); BufferedReader reader = new BufferedReader(new FileReader(csvFile)); // Skip the header row reader.readLine(); // Skip the first line (header) // Insert data into the database String line; while ((line = reader.readLine()) != null) { String[] values = line.split(","); //类型转化保持和服务端类型一致 String orderId = values[0]; int orderItemId = Integer.parseInt(values[1]); // Convert order_item_id to int String productId = values[2]; String sellerId = values[3]; String shippingLimitDate = values[4]; double price = Double.parseDouble(values[5]); double freightValue = Double.parseDouble(values[6]); Row row = bulkloadStream.createRow(); // Set parameter values row.setValue(0, orderId); row.setValue(1, orderItemId); row.setValue(2, productId); row.setValue(3, sellerId); row.setValue(4, shippingLimitDate); row.setValue(5, price); row.setValue(6, freightValue); //必须调用该方法,否则无法将数据发送到服务端 bulkloadStream.apply(row); } // Close resources reader.close(); bulkloadStream.close(); waitForBulkloadCompletion(); client.close(); System.out.println("Data inserted successfully!"); } private static void initialize() throws Exception { String url = MessageFormat.format("jdbc:clickzetta://demo_instance.cn-shanghai-alicloud.api.clickzetta.com/{0}?" + "schema={1}&username={2}&password={3}&virtualcluster={4}&", workspace, schema, user, password, vc); client = ClickZettaClient.newBuilder().url(url).build(); bulkloadStream = client.newBulkloadStreamBuilder().schema(schema).table(table) .operate(RowStream.BulkLoadOperate.APPEND) .build(); } private static void waitForBulkloadCompletion() throws InterruptedException { while (bulkloadStream.getState() == StreamState.RUNNING) { Thread.sleep(1000); } if (bulkloadStream.getState() == StreamState.FAILED) { throw new ArithmeticException(bulkloadStream.getErrorMessage()); } } }

联系我们
预约咨询
微信咨询
电话咨询