-- Create a test table.
CREATE TABLE test_table (
    id   INT,
    name VARCHAR,
    age  INT
);
-- Change tracking must be enabled on the base table in order to create a table stream on it.
ALTER TABLE test_table SET PROPERTIES ('change_tracking' = 'true');
-- Create an append-only table stream on the test table.
CREATE TABLE STREAM test_stream ON TABLE test_table
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');
-- Insert some rows into the test table.
-- The column list is spelled out so the example keeps working if the table's
-- column order changes or new columns are added.
INSERT INTO test_table (id, name, age) VALUES
    (1, 'Alice', 20),
    (2, 'Bob', 25),
    (3, 'Charlie', 30),
    (4, 'David', 35),
    (5, 'Eve', 40);
-- Query the stream; it should return the rows that were just inserted.
-- SELECT * is intentional: it shows the stream's metadata columns as well.
SELECT * FROM test_stream;
+---------------+------------------+-------------------------+----+---------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+-------------------------+----+---------+-----+
| INSERT | 3 | 2025-04-28 17:37:25.785 | 1 | Alice | 20 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 2 | Bob | 25 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 3 | Charlie | 30 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 4 | David | 35 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 5 | Eve | 40 |
+---------------+------------------+-------------------------+----+---------+-----+
-- Update some rows in the test table.
UPDATE test_table
SET age = age + 5
WHERE id IN (1, 3);
-- Query the stream again: the append-only stream still returns only the
-- originally inserted rows; the UPDATE is not recorded.
SELECT *
FROM test_stream;
+---------------+------------------+-------------------------+----+---------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+-------------------------+----+---------+-----+
| INSERT | 3 | 2025-04-28 17:37:25.785 | 1 | Alice | 20 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 2 | Bob | 25 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 3 | Charlie | 30 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 4 | David | 35 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 5 | Eve | 40 |
+---------------+------------------+-------------------------+----+---------+-----+
-- Delete some rows from the test table.
DELETE FROM test_table
WHERE id IN (2, 4);
-- Query the stream again: the append-only stream still returns only the
-- originally inserted rows; the DELETE is not recorded.
SELECT *
FROM test_stream;
+---------------+------------------+-------------------------+----+---------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+-------------------------+----+---------+-----+
| INSERT | 3 | 2025-04-28 17:37:25.785 | 1 | Alice | 20 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 2 | Bob | 25 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 3 | Charlie | 30 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 4 | David | 35 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 5 | Eve | 40 |
+---------------+------------------+-------------------------+----+---------+-----+
-- Delete every row from the base table (the table itself is not dropped).
-- The missing WHERE clause is deliberate: the whole table is emptied.
DELETE FROM test_table;
-- Query the stream: it still returns only the originally inserted rows; the DELETE is not recorded.
SELECT * FROM test_stream;
+---------------+------------------+-------------------------+----+---------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+-------------------------+----+---------+-----+
| INSERT | 3 | 2025-04-28 17:37:25.785 | 1 | Alice | 20 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 2 | Bob | 25 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 3 | Charlie | 30 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 4 | David | 35 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 5 | Eve | 40 |
+---------------+------------------+-------------------------+----+---------+-----+
注意
创建 Table Stream 之前,必须先在基表上执行以下语句,开启 change_tracking:
ALTER TABLE table_name set PROPERTIES ('change_tracking' = 'true');
STANDARD模式案例
-- Create a test table.
CREATE TABLE test_table_offset (
    id   INT,
    name VARCHAR,
    age  INT
);
-- Change tracking must be enabled on the base table in order to create a table stream on it.
ALTER TABLE test_table_offset SET PROPERTIES ('change_tracking' = 'true');
-- Create a table stream in STANDARD mode on the test table.
CREATE TABLE STREAM test_table_offset_stream ON TABLE test_table_offset
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');
-- Insert some rows into the test table.
-- The column list is spelled out so the example keeps working if the table's
-- column order changes or new columns are added.
INSERT INTO test_table_offset (id, name, age) VALUES
    (1, 'Alice', 20),
    (2, 'Bob', 25),
    (3, 'Charlie', 30),
    (4, 'David', 35),
    (5, 'Eve', 40);
-- Create the target table that will receive the changes read from the stream.
CREATE TABLE test_table_offset_consume (id INT, name VARCHAR, age INT);
-- Copy the rows that were just inserted into the target table so both tables match.
-- Only the data columns are selected; the stream's metadata columns are left out.
INSERT INTO test_table_offset_consume (id, name, age)
SELECT id, name, age
FROM test_table_offset_stream;
-- Check whether the stream still has data; after the INSERT above has read
-- from it, the result shown below is empty.
SELECT * FROM test_table_offset_stream;
+---------------+------------------+--------------------+----+------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+--------------------+----+------+-----+
-- Update some rows in the test table.
UPDATE test_table_offset
SET age = age + 5
WHERE id IN (1, 3);
-- Query the stream: it should return the updated data, with each updated row
-- appearing twice, once as UPDATE_BEFORE and once as UPDATE_AFTER.
SELECT *
FROM test_table_offset_stream;
+---------------+------------------+-------------------------+----+---------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+-------------------------+----+---------+-----+
| UPDATE_AFTER | 4 | 2025-04-28 17:41:18.507 | 1 | Alice | 25 |
| UPDATE_AFTER | 4 | 2025-04-28 17:41:18.507 | 3 | Charlie | 35 |
| UPDATE_BEFORE | 3 | 2025-04-28 17:40:54.626 | 1 | Alice | 20 |
| UPDATE_BEFORE | 3 | 2025-04-28 17:40:54.626 | 3 | Charlie | 30 |
+---------------+------------------+-------------------------+----+---------+-----+
-- Consume the stream and apply its changes to the target table:
--   * UPDATE_AFTER rows update the age of the matching target row,
--   * DELETE rows remove the matching target row,
--   * INSERT rows with no matching target row are added.
-- The inserted values must come from source_stream: in a NOT MATCHED branch
-- there is no target row, so target.* carries no data to insert.
MERGE INTO test_table_offset_consume target
USING test_table_offset_stream source_stream
    ON target.id = source_stream.id
WHEN MATCHED AND source_stream.__change_type = 'UPDATE_AFTER' THEN
    UPDATE SET target.age = source_stream.age
WHEN MATCHED AND source_stream.__change_type = 'DELETE' THEN
    DELETE
WHEN NOT MATCHED AND source_stream.__change_type = 'INSERT' THEN
    INSERT VALUES (source_stream.id, source_stream.name, source_stream.age);
-- Check that test_table_offset_consume now holds the expected data.
SELECT * FROM test_table_offset_consume;
+----+---------+-----+
| id | name | age |
+----+---------+-----+
| 1 | Alice | 25 |
| 3 | Charlie | 35 |
| 2 | Bob | 25 |
| 4 | David | 35 |
| 5 | Eve | 40 |
+----+---------+-----+
-- Check whether the table stream still has data; all of it has been consumed.
SELECT *
FROM test_table_offset_stream;
+---------------+------------------+--------------------+----+------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+--------------------+----+------+-----+
-- Delete some rows from the test table.
DELETE FROM test_table_offset
WHERE id IN (2, 4);
-- Inspect the table stream.
SELECT *
FROM test_table_offset_stream;
+---------------+------------------+-------------------------+----+-------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+-------------------------+----+-------+-----+
| DELETE | 3 | 2025-04-28 17:40:54.626 | 2 | Bob | 25 |
| DELETE | 3 | 2025-04-28 17:40:54.626 | 4 | David | 35 |
+---------------+------------------+-------------------------+----+-------+-----+
-- Consume the deletions from the table stream and apply them to the target table:
--   * UPDATE_AFTER rows update the age of the matching target row,
--   * DELETE rows remove the matching target row,
--   * INSERT rows with no matching target row are added.
-- The inserted values must come from source_stream: in a NOT MATCHED branch
-- there is no target row, so target.* carries no data to insert.
MERGE INTO test_table_offset_consume target
USING test_table_offset_stream source_stream
    ON target.id = source_stream.id
WHEN MATCHED AND source_stream.__change_type = 'UPDATE_AFTER' THEN
    UPDATE SET target.age = source_stream.age
WHEN MATCHED AND source_stream.__change_type = 'DELETE' THEN
    DELETE
WHEN NOT MATCHED AND source_stream.__change_type = 'INSERT' THEN
    INSERT VALUES (source_stream.id, source_stream.name, source_stream.age);
-- Check that test_table_offset_consume now holds the expected data.
SELECT * FROM test_table_offset_consume;
+----+---------+-----+
| id | name | age |
+----+---------+-----+
| 1 | Alice | 25 |
| 3 | Charlie | 35 |
| 5 | Eve | 40 |
+----+---------+-----+
-- Check whether the table stream still has data; all of it has been consumed.
SELECT *
FROM test_table_offset_stream;
+---------------+------------------+--------------------+----+------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+--------------------+----+------+-----+
-- Incremental recognition step: handles only INSERT (new image) and
-- UPDATE_AFTER (replaced image) rows from the stream; the stream offset
-- advances automatically.
INSERT INTO dish_recognition_results
SELECT
    relative_path                                    AS file_path,
    url                                              AS file_url,
    size                                             AS file_size,
    last_modified_time,
    public.fc_image_to_text('dish_recognition', url) AS recognized_content,
    __change_type                                    AS change_type,
    __commit_version                                 AS commit_version,
    CURRENT_TIMESTAMP()                              AS processed_at
FROM str_dish_images
WHERE
    -- Keep only files with an image extension (case-insensitive match).
    (
        LOWER(relative_path) LIKE '%.jpg'
        OR LOWER(relative_path) LIKE '%.jpeg'
        OR LOWER(relative_path) LIKE '%.png'
        OR LOWER(relative_path) LIKE '%.webp'
    )
    -- Process additions and replacements only; skip DELETE / UPDATE_BEFORE.
    AND __change_type IN ('INSERT', 'UPDATE_AFTER');
5. 查询识别结果
-- Show the 20 most recently processed recognition results.
-- file_path is a tie-breaker: rows written by the same INSERT share one
-- processed_at value, so without it the rows kept by LIMIT (and their order)
-- would not be deterministic.
SELECT
    file_path,
    recognized_content,
    change_type,
    processed_at
FROM dish_recognition_results
ORDER BY
    processed_at DESC,
    file_path
LIMIT 20;
-- Count how many images were recognized as each dish, most frequent first.
SELECT
    recognized_content,
    COUNT(*) AS cnt
FROM
    dish_recognition_results
GROUP BY
    recognized_content
ORDER BY
    cnt DESC;