-- Schema that holds every table, index, dynamic table and the scoring function of this fraud-graph example.
CREATE SCHEMA IF NOT EXISTS best_practice_fraud_graph;
ODS 层:原始节点与边表
建表
-- ODS account node: one row per account with its registration attributes
-- and a known-fraud label.
CREATE TABLE IF NOT EXISTS best_practice_fraud_graph.doc_account_node (
    account_id       STRING,     -- join key for the transaction / device edges
    register_time    TIMESTAMP,
    register_ip      STRING,     -- joined to doc_ip_node.ip_addr by the ADS blacklist
    phone_tail       STRING,
    id_cert_hash     STRING,
    account_age_days INT,
    is_verified      INT,        -- 0 is scored as "not verified" by calc_gang_risk_score
    risk_label       INT         -- 0: normal, 1: known fraud
);
-- ODS device node: basic per-device attributes.
CREATE TABLE IF NOT EXISTS best_practice_fraud_graph.doc_device_node (
    device_id     STRING,
    device_type   STRING,
    os_type       STRING,
    first_seen    TIMESTAMP,
    account_count INT        -- NOTE(review): presumably number of accounts seen on the device; confirm who maintains it
);
-- ODS IP node: basic IP attributes plus a risk score.
CREATE TABLE IF NOT EXISTS best_practice_fraud_graph.doc_ip_node (
    ip_addr       STRING,    -- matched against doc_account_node.register_ip
    isp           STRING,
    city          STRING,    -- carries a keyword inverted index
    risk_score    DOUBLE,    -- fed to calc_gang_risk_score as ip_risk_score (weight 20)
    account_count INT
);
-- ODS transaction edge: a funds transfer from one account to another.
CREATE TABLE IF NOT EXISTS best_practice_fraud_graph.doc_transaction_edge (
    txn_id         STRING,
    src_account_id STRING,     -- sending account
    dst_account_id STRING,     -- receiving account
    amount         DOUBLE,
    txn_time       TIMESTAMP,
    channel        STRING,
    status         STRING,
    is_suspicious  INT         -- 1 marks a suspicious transaction
);
-- ODS account-device edge: an account logged in on a device.
-- Maintained incrementally by MERGE INTO, keyed on (account_id, device_id).
CREATE TABLE IF NOT EXISTS best_practice_fraud_graph.doc_account_device_edge (
    account_id  STRING,
    device_id   STRING,
    first_seen  TIMESTAMP,
    last_seen   TIMESTAMP,
    login_count INT
);
创建 Bloomfilter Index 和 Inverted Index
`doc_transaction_edge.src_account_id` 和 `doc_account_device_edge.device_id` 均为高基数列,点查频繁,适合 Bloomfilter Index;`doc_ip_node.city` 则使用 `keyword` 分词器的 Inverted Index 做精确匹配。
-- Bloom filter indexes on high-cardinality columns used for point lookups,
-- plus an inverted index on doc_ip_node.city.
-- Transaction edges: exact filtering by sending account.
CREATE BLOOMFILTER INDEX IF NOT EXISTS best_practice_fraud_graph.idx_bf_txn_src
ON TABLE best_practice_fraud_graph.doc_transaction_edge (src_account_id);
-- Account-device edges: exact filtering by device ID.
CREATE BLOOMFILTER INDEX IF NOT EXISTS best_practice_fraud_graph.idx_bf_device_id
ON TABLE best_practice_fraud_graph.doc_account_device_edge (device_id);
-- IP nodes: exact keyword match on city (the 'keyword' analyzer presumably indexes
-- the whole value as one term; confirm against the engine's analyzer docs).
CREATE INVERTED INDEX IF NOT EXISTS best_practice_fraud_graph.idx_inv_city
ON TABLE best_practice_fraud_graph.doc_ip_node (city)
PROPERTIES ('analyzer'='keyword');
-- Step 1: upload the local CSV file to the current user's volume with SQL PUT.
-- Replace the path with the real location of the file.
PUT '/path/to/your/doc_account_node.csv' TO USER VOLUME FILE 'doc_account_node.csv';
-- Step 2: COPY INTO the table from the user volume.
-- Options: first line is a header, fields are comma-separated, and nullValue=''
-- presumably loads empty fields as NULL.
-- NOTE(review): only doc_account_node is loaded here; the other node/edge tables
-- are presumably loaded the same way (their row counts are queried later on).
COPY INTO best_practice_fraud_graph.doc_account_node
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_account_node.csv');
-- Sanity check: number of rows loaded into the account node table.
SELECT
    COUNT(*) AS account_count
FROM best_practice_fraud_graph.doc_account_node;
account_count
-------------
20
-- Sanity checks: row counts of the transaction edge table ...
SELECT
    COUNT(*) AS txn_count
FROM best_practice_fraud_graph.doc_transaction_edge;
-- ... and of the account-device edge table.
SELECT
    COUNT(*) AS edge_count
FROM best_practice_fraud_graph.doc_account_device_edge;
txn_count
---------
25
edge_count
----------
20
数据结构说明:模拟数据中,A001/A002/A003 三个账户在注册时均使用了同一 IP `192.168.10.1`,且通过同一台设备 D001 登录;A001、A002、A003 之间存在多笔快速转账交易(`is_suspicious=1`)。这构成一个典型的欺诈团伙模式。
通过 MERGE INTO 增量更新边表
在生产环境中,新的登录事件持续产生,账户与设备的绑定关系需要增量更新而非全量替换。MERGE INTO 可以在发现已有 `(account_id, device_id)` 组合时更新 `last_seen` 和 `login_count`,首次出现时插入新行:
-- Incrementally upsert account-device edges from a batch of login events.
--   Existing (account_id, device_id): widen the [first_seen, last_seen] window to
--     include the event time (never move last_seen backwards, so late or
--     out-of-order CDC events cannot regress it) and add the new logins.
--   New (account_id, device_id): insert with first_seen = last_seen = event time.
-- NOTE: the source should hold at most one row per (account_id, device_id);
-- MERGE implementations generally reject several source rows matching one
-- target row, so pre-aggregate a real event stream before merging.
MERGE INTO best_practice_fraud_graph.doc_account_device_edge AS t
USING (
    -- Sample event batch; in production this is the parsed login event stream.
    SELECT
        'A001' AS account_id,
        'D001' AS device_id,
        CAST('2025-06-01 10:00:00' AS TIMESTAMP) AS last_seen,
        1 AS new_logins
) AS s
    ON t.account_id = s.account_id
   AND t.device_id = s.device_id
WHEN MATCHED THEN
    UPDATE SET
        first_seen = CASE
                         WHEN t.first_seen IS NULL OR s.last_seen < t.first_seen THEN s.last_seen
                         ELSE t.first_seen
                     END,
        last_seen = CASE
                        WHEN t.last_seen IS NULL OR s.last_seen > t.last_seen THEN s.last_seen
                        ELSE t.last_seen
                    END,
        -- A NULL stored count would otherwise turn the sum into NULL and drop the new logins.
        login_count = COALESCE(t.login_count, 0) + s.new_logins
WHEN NOT MATCHED THEN
    INSERT (account_id, device_id, first_seen, last_seen, login_count)
    VALUES (s.account_id, s.device_id, s.last_seen, s.last_seen, s.new_logins);
💡 提示:在 Kafka CDC 接入场景下,只需将 `USING` 子查询替换为从 Kafka Topic 实时解析的登录事件流;Dynamic Table 的上游表结构保持不变,MERGE INTO 逻辑同样适用。
-- Gang risk score: four additive signals, clamped to [0, 100].
--   suspicious_rate     * 40   (presumably a 0..1 fraction, as produced by dws_account_txn_risk)
--   shared_device_pairs : >= 2 -> 30, exactly 1 -> 15, otherwise 0
--   ip_risk_score       * 20   (presumably 0..1; confirm against doc_ip_node data)
--   is_verified = 0     -> +10
-- A NULL argument makes the raw sum NULL; callers COALESCE their inputs.
CREATE OR REPLACE FUNCTION best_practice_fraud_graph.calc_gang_risk_score(
    suspicious_rate     DOUBLE,
    shared_device_pairs INT,
    ip_risk_score       DOUBLE,
    is_verified         INT
)
RETURNS DOUBLE
AS GREATEST(
    0.0,
    LEAST(
        100.0,
        suspicious_rate * 40.0
            + CASE
                  WHEN shared_device_pairs >= 2 THEN 30.0
                  WHEN shared_device_pairs = 1  THEN 15.0
                  ELSE 0.0
              END
            + ip_risk_score * 20.0
            + CASE
                  WHEN is_verified = 0 THEN 10.0
                  ELSE 0.0
              END
    )
);
验证函数——高风险账户(可疑率 100%、共用 2 个设备对、高风险 IP、未实名):
-- Expected: 1.0 * 40 + 30 (two pairs) + 0.85 * 20 + 10 (not verified) = 97.
SELECT
    best_practice_fraud_graph.calc_gang_risk_score(
        1.0,   -- suspicious_rate
        2,     -- shared_device_pairs
        0.85,  -- ip_risk_score
        0      -- is_verified
    ) AS sample_score;
sample_score
------------
97
该账户评分 97 分(40 + 30 + 0.85 × 20 + 10 = 97),达到 HIGH 风险阈值(≥ 80),系统可直接拦截。
DWD 层 Dynamic Table:关系图谱边表
DWD 层做两件事:第一,将账户-设备关联边 SELF JOIN,找出"共设备账户对";第二,将交易边与账户节点 JOIN,为交易双方补充注册 IP 和风险标签。
共设备账户对
-- DWD: shared-device account pairs.
-- Self-joins the account-device edges on device_id. The strict account_id
-- ordering keeps each unordered pair exactly once and drops self-pairs.
-- last_shared_time is the earlier of the two accounts' last_seen on that device.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fraud_graph.dwd_shared_device_pairs
AS
SELECT
    lhs.account_id                      AS account_id_1,
    rhs.account_id                      AS account_id_2,
    lhs.device_id                       AS shared_device_id,
    lhs.login_count                     AS login_count_1,
    rhs.login_count                     AS login_count_2,
    LEAST(lhs.last_seen, rhs.last_seen) AS last_shared_time
FROM best_practice_fraud_graph.doc_account_device_edge AS lhs
INNER JOIN best_practice_fraud_graph.doc_account_device_edge AS rhs
    ON  rhs.device_id = lhs.device_id
    AND lhs.account_id < rhs.account_id;
交易图谱边(含风险标签)
-- DWD: transaction graph edges enriched with both endpoints' registration IP
-- and risk label. Because both joins are INNER, a transaction is kept only when
-- its sender and receiver both exist in doc_account_node.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fraud_graph.dwd_txn_graph_edge
AS
SELECT
    txn.txn_id,
    txn.src_account_id,
    txn.dst_account_id,
    txn.amount,
    txn.txn_time,
    txn.channel,
    txn.status,
    txn.is_suspicious,
    sender.register_ip   AS src_register_ip,
    receiver.register_ip AS dst_register_ip,
    sender.risk_label    AS src_risk_label,
    receiver.risk_label  AS dst_risk_label
FROM best_practice_fraud_graph.doc_transaction_edge AS txn
INNER JOIN best_practice_fraud_graph.doc_account_node AS sender
    ON sender.account_id = txn.src_account_id
INNER JOIN best_practice_fraud_graph.doc_account_node AS receiver
    ON receiver.account_id = txn.dst_account_id;
手动触发首次刷新:
-- Trigger the first refresh of both DWD dynamic tables manually.
REFRESH DYNAMIC TABLE best_practice_fraud_graph.dwd_shared_device_pairs;
REFRESH DYNAMIC TABLE best_practice_fraud_graph.dwd_txn_graph_edge;
-- Sanity checks: row counts after the refresh.
SELECT COUNT(*) AS pair_count FROM best_practice_fraud_graph.dwd_shared_device_pairs;
SELECT COUNT(*) AS edge_count FROM best_practice_fraud_graph.dwd_txn_graph_edge;
pair_count
----------
11
edge_count
----------
25
查看 D001 设备下的账户对——这是最典型的团伙特征:
-- Account pairs that share device D001.
SELECT
    p.account_id_1,
    p.account_id_2,
    p.shared_device_id,
    p.login_count_1,
    p.login_count_2
FROM best_practice_fraud_graph.dwd_shared_device_pairs AS p
WHERE p.shared_device_id = 'D001';
-- DWS: per-device cluster statistics derived from dwd_shared_device_pairs.
-- A device shared by k accounts yields k*(k-1)/2 pair rows, and every account
-- appears in k-1 of them. Summing per pair would count each account's logins
-- k-1 times, and adding the distinct counts of both pair sides over-counts
-- accounts. Account-level figures are therefore computed from a de-duplicated
-- (device, account) member list.
-- approx_account_count keeps its name for compatibility, but it is now the
-- exact number of distinct accounts on the device.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fraud_graph.dws_device_cluster_stats
AS
SELECT
    ps.device_id,
    mem.account_count AS approx_account_count,
    mem.login_count   AS total_login_count,
    ps.earliest_shared,
    ps.latest_shared,
    ps.pair_count
FROM (
    -- Pair-level statistics per device.
    SELECT
        shared_device_id      AS device_id,
        MIN(last_shared_time) AS earliest_shared,
        MAX(last_shared_time) AS latest_shared,
        COUNT(*)              AS pair_count
    FROM best_practice_fraud_graph.dwd_shared_device_pairs
    GROUP BY shared_device_id
) AS ps
INNER JOIN (
    -- Account-level statistics per device, built from one row per (device, account).
    SELECT
        acct.device_id,
        COUNT(*)              AS account_count,
        SUM(acct.login_count) AS login_count
    FROM (
        -- Both sides of every pair, collapsed to one row per (device, account).
        -- MAX keeps a single login count even if the source edge table ever held
        -- duplicate (account, device) rows.
        SELECT
            side.device_id,
            side.account_id,
            MAX(side.login_count) AS login_count
        FROM (
            SELECT shared_device_id AS device_id, account_id_1 AS account_id, login_count_1 AS login_count
            FROM best_practice_fraud_graph.dwd_shared_device_pairs
            UNION ALL
            SELECT shared_device_id AS device_id, account_id_2 AS account_id, login_count_2 AS login_count
            FROM best_practice_fraud_graph.dwd_shared_device_pairs
        ) AS side
        GROUP BY side.device_id, side.account_id
    ) AS acct
    GROUP BY acct.device_id
) AS mem
    ON ps.device_id = mem.device_id;
账户交易风险统计
-- DWS: per-sender transaction risk profile, one row per src_account_id.
-- suspicious_rate = suspicious_count / total_txn_count, rounded to 4 decimals;
-- monetary aggregates are rounded to 2 decimals.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fraud_graph.dws_account_txn_risk
AS
SELECT
    e.src_account_id                                     AS account_id,
    COUNT(*)                                             AS total_txn_count,
    SUM(CASE WHEN e.is_suspicious = 1 THEN 1 ELSE 0 END) AS suspicious_count,
    ROUND(SUM(e.amount), 2)                              AS total_amount,
    ROUND(AVG(e.amount), 2)                              AS avg_amount,
    COUNT(DISTINCT e.dst_account_id)                     AS unique_dst_count,
    COUNT(DISTINCT e.channel)                            AS channel_diversity,
    ROUND(
        SUM(CASE WHEN e.is_suspicious = 1 THEN 1 ELSE 0 END) * 1.0 / COUNT(*),
        4
    )                                                    AS suspicious_rate
FROM best_practice_fraud_graph.dwd_txn_graph_edge AS e
GROUP BY e.src_account_id;
-- Devices ranked by the number of shared-account pairs they carry.
SELECT
    dcs.device_id,
    dcs.approx_account_count,
    dcs.total_login_count,
    dcs.pair_count
FROM best_practice_fraud_graph.dws_device_cluster_stats AS dcs
ORDER BY dcs.pair_count DESC;
-- ADS: gang risk blacklist, one row per account that sent at least one
-- transaction (driven by dws_account_txn_risk).
-- The score combines transaction, shared-device, registration-IP and verification
-- signals via calc_gang_risk_score, which is evaluated once per account.
-- risk_level buckets the unrounded score: >= 80 HIGH, >= 50 MEDIUM, else LOW.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fraud_graph.ads_high_risk_account_blacklist
AS
SELECT
    s.account_id,
    s.total_txn_count,
    s.suspicious_count,
    s.suspicious_rate,
    s.total_amount,
    s.shared_device_pair_count,
    s.register_ip_risk,
    s.is_verified,
    s.original_risk_label,
    ROUND(s.raw_score, 2) AS gang_risk_score,
    CASE
        WHEN s.raw_score >= 80 THEN 'HIGH'
        WHEN s.raw_score >= 50 THEN 'MEDIUM'
        ELSE 'LOW'
    END AS risk_level,
    CURRENT_TIMESTAMP() AS score_time
FROM (
    SELECT
        r.account_id,
        r.total_txn_count,
        r.suspicious_count,
        r.suspicious_rate,
        r.total_amount,
        COALESCE(dc.pair_count, 0)   AS shared_device_pair_count,
        COALESCE(ip.risk_score, 0.0) AS register_ip_risk,
        an.is_verified,
        an.risk_label AS original_risk_label,
        best_practice_fraud_graph.calc_gang_risk_score(
            r.suspicious_rate,
            CAST(COALESCE(dc.pair_count, 0) AS INT),
            COALESCE(ip.risk_score, 0.0),
            an.is_verified
        ) AS raw_score
    FROM best_practice_fraud_graph.dws_account_txn_risk AS r
    INNER JOIN best_practice_fraud_graph.doc_account_node AS an
        ON r.account_id = an.account_id
    -- No join to doc_account_device_edge here: none of its columns are needed,
    -- and joining it would repeat every account once per bound device.
    LEFT JOIN (
        -- An account can sit on either side of a pair, so count both sides and
        -- then re-aggregate to exactly one row (total pairs) per account.
        SELECT
            side.account_id,
            SUM(side.pair_count) AS pair_count
        FROM (
            SELECT account_id_1 AS account_id, COUNT(*) AS pair_count
            FROM best_practice_fraud_graph.dwd_shared_device_pairs
            GROUP BY account_id_1
            UNION ALL
            SELECT account_id_2 AS account_id, COUNT(*) AS pair_count
            FROM best_practice_fraud_graph.dwd_shared_device_pairs
            GROUP BY account_id_2
        ) AS side
        GROUP BY side.account_id
    ) AS dc
        ON r.account_id = dc.account_id
    -- NOTE(review): assumes ip_addr is unique in doc_ip_node; duplicates would fan out rows.
    LEFT JOIN best_practice_fraud_graph.doc_ip_node AS ip
        ON an.register_ip = ip.ip_addr
) AS s;
-- HIGH-risk accounts, highest score first; DISTINCT collapses any repeated
-- identical rows for the same account.
SELECT DISTINCT
    account_id,
    gang_risk_score,
    risk_level,
    suspicious_rate,
    shared_device_pair_count
FROM best_practice_fraud_graph.ads_high_risk_account_blacklist
WHERE risk_level = 'HIGH'
ORDER BY gang_risk_score DESC;
-- Number of distinct accounts per risk level, largest bucket first.
SELECT
    bl.risk_level,
    COUNT(DISTINCT bl.account_id) AS account_count
FROM best_practice_fraud_graph.ads_high_risk_account_blacklist AS bl
GROUP BY bl.risk_level
ORDER BY account_count DESC;
risk_level | account_count
-----------+--------------
MEDIUM | 14
HIGH | 3
LOW | 3