-- Landing table for raw Kafka messages: a single STRING column holding the JSON payload.
CREATE TABLE IF NOT EXISTS best_practice_financial_risk.kafka_txn_raw (
    value STRING
);
-- Kafka PIPE: copies messages read from the Kafka topic into kafka_txn_raw,
-- running on the 'DEFAULT' virtual cluster with a batch interval of 60 seconds.
-- NOTE(review): READ_KAFKA takes positional arguments; only the ones annotated
-- below are explained by this script — confirm the others against the function reference.
CREATE PIPE IF NOT EXISTS best_practice_financial_risk.pipe_txn_stream
    VIRTUAL_CLUSTER = 'DEFAULT'
    BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO best_practice_financial_risk.kafka_txn_raw
FROM (
    -- The message value is cast to STRING so it fits the landing table's column.
    SELECT CAST(value AS STRING) AS value
    FROM READ_KAFKA(
        '<kafka-broker>:9092',  -- replace with the actual broker address
        'bank_transactions',    -- Kafka topic name
        '',
        'cz_fraud_consumer',    -- consumer group ID
        '', '', '', '',
        'raw', 'raw',
        0,
        map()
    )
);
-- Step 1: upload the local CSV file to the User Volume with SQL PUT.
PUT '/path/to/your/data.csv' TO USER VOLUME FILE 'data.csv';

-- Step 2: COPY the uploaded file from the User Volume into the ODS customers table.
-- The file has a header row, is comma-separated, and empty strings are read as NULL.
COPY INTO best_practice_financial_risk.ods_customers
FROM USER VOLUME
USING csv
OPTIONS (
    'header' = 'true',
    'sep' = ',',
    'nullValue' = ''
)
FILES ('data.csv');
-- 也可直接内联插入小批量测试数据(不需要 CSV 文件):
-- Seed ten sample customer rows inline (no CSV file required).
-- NOTE(review): there is no column list, so values bind by position to the table's
-- declared column order — confirm against the ods_customers DDL, which is not shown here.
INSERT INTO best_practice_financial_risk.ods_customers VALUES
('4532117694074009','John','Smith','M','123 Oak St','Austin','TX','78701',30.2672,-97.7431,950000,'Software Engineer','1985-06-15'),
('4716058826889367','Mary','Johnson','F','456 Elm Ave','Dallas','TX','75201',32.7767,-96.7970,1343000,'Accountant','1990-03-22'),
('4929429090508220','Robert','Williams','M','789 Pine Rd','Houston','TX','77001',29.7604,-95.3698,2300000,'Doctor','1978-11-08'),
('4532117691234567','Linda','Brown','F','321 Maple Dr','San Antonio','TX','78205',29.4241,-98.4936,1434000,'Teacher','1995-07-30'),
('4716058821111222','James','Davis','M','654 Birch Ln','Phoenix','AZ','85001',33.4484,-112.0740,1600000,'Manager','1982-09-14'),
('4929429095555666','Patricia','Miller','F','987 Cedar St','Chicago','IL','60601',41.8781,-87.6298,2700000,'Nurse','1988-04-25'),
('4532117697654321','Michael','Wilson','M','147 Spruce Ave','Los Angeles','CA','90001',34.0522,-118.2437,3980000,'Engineer','1975-12-03'),
('4716058828888999','Jennifer','Moore','F','258 Walnut Rd','New York','NY','10001',40.7128,-74.0060,8336000,'Lawyer','1992-08-17'),
('4929429093333444','David','Taylor','M','369 Hickory Dr','Seattle','WA','98101',47.6062,-122.3321,724000,'Data Scientist','1986-02-28'),
('4532117692222333','Barbara','Anderson','F','741 Ash Blvd','Miami','FL','33101',25.7617,-80.1918,460000,'Marketing','1993-11-11');
-- Seed sample transaction rows inline; only a subset of the rows is listed here.
-- NOTE(review): there is no column list, so values bind by position to the table's
-- declared column order — confirm against the ods_transactions DDL, which is not shown here.
INSERT INTO best_practice_financial_risk.ods_transactions VALUES
('TXN001','4532117694074009','fraud_Kirlin and Sons','grocery_pos',9.36,'John','Smith','M','123 Oak St','Austin','TX','78701',30.2672,-97.7431,950000,'Software Engineer','1985-06-15','tx001',1325376018,30.4127,-97.8974,0,CAST('2020-01-01 00:00:18' AS TIMESTAMP)),
('TXN002','4716058826889367','fraud_Sporer-Keebler','entertainment',2529.0,'Mary','Johnson','F','456 Elm Ave','Dallas','TX','75201',32.7767,-96.7970,1343000,'Accountant','1990-03-22','tx002',1325376075,33.4897,-96.9132,1,CAST('2020-01-01 00:01:15' AS TIMESTAMP)),
('TXN007','4532117697654321','fraud_Olson, Becker and Koch','shopping_net',1987.40,'Michael','Wilson','M','147 Spruce Ave','Los Angeles','CA','90001',34.0522,-118.2437,3980000,'Engineer','1975-12-03','tx007',1325379440,34.1808,-118.4634,1,CAST('2020-01-01 00:57:20' AS TIMESTAMP)),
('TXN018','4716058828888999','fraud_Sauer-Kessler','entertainment',4500.00,'Jennifer','Moore','F','258 Walnut Rd','New York','NY','10001',40.7128,-74.0060,8336000,'Lawyer','1992-08-17','tx018',1325386200,40.9345,-74.1234,1,CAST('2020-01-01 02:50:00' AS TIMESTAMP))
-- ... the full set has 20 rows; the remainder is omitted here
;
-- 验证 ODS 层行数:
-- ODS sanity check: total transaction count, sum of the is_fraud flag,
-- and the fraud share as a percentage rounded to one decimal.
SELECT
    COUNT(*) AS total_txns,
    SUM(is_fraud) AS fraud_count,
    ROUND(SUM(is_fraud) * 100.0 / COUNT(*), 1) AS fraud_rate_pct
FROM best_practice_financial_risk.ods_transactions;
-- Masking function for card numbers.
--   cc_num : the card number as a STRING.
--   returns: cc_num unchanged when current_user() is in the allow-list below;
--            otherwise '****-****-****-' followed by the 4-character substring
--            starting at position LENGTH(cc_num) - 3 (the trailing four characters
--            for inputs of at least four characters).
CREATE OR REPLACE FUNCTION best_practice_financial_risk.mask_cc_num(cc_num STRING)
RETURNS STRING
AS
CASE
    WHEN current_user() IN ('privileged_user')  -- replace with the actual authorized user name(s)
        THEN cc_num
    ELSE
        CONCAT(
            '****-****-****-',
            SUBSTRING(cc_num, LENGTH(cc_num) - 3, 4)
        )
END;
-- Attach the masking function to the cc_num column of ods_transactions.
ALTER TABLE best_practice_financial_risk.ods_transactions
    CHANGE COLUMN cc_num
    SET MASK best_practice_financial_risk.mask_cc_num;
-- Preview the first five DWD transaction events in ascending txn_time order.
SELECT
    txn_id,
    cc_num,
    merchant,
    category,
    amt,
    is_fraud,
    dist_km,
    city,
    state,
    age
FROM best_practice_financial_risk.dwd_txn_events
ORDER BY txn_time
LIMIT 5;
-- DWS layer: per-card risk features aggregated from the DWD transaction events.
-- Declared as a dynamic table with a 1-minute refresh interval on the DEFAULT virtual cluster.
-- NOTE(review): the grouping key includes descriptive attributes (name, state, job, age).
-- If any of them can differ between transactions of the same card, this yields
-- several rows per cc_num — confirm against the dwd_txn_events definition.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_financial_risk.dws_user_risk_features
REFRESH INTERVAL 1 MINUTE VCLUSTER DEFAULT
AS
SELECT
    cc_num,
    first_name,
    last_name,
    state,
    job,
    age,

    -- Transaction statistics over the full history
    COUNT(*)                   AS txn_total,
    ROUND(SUM(amt), 2)         AS amt_total,
    ROUND(AVG(amt), 2)         AS amt_avg,
    ROUND(MAX(amt), 2)         AS amt_max,
    ROUND(STDDEV_POP(amt), 2)  AS amt_stddev,

    -- Historical fraud records
    SUM(is_fraud)              AS fraud_history_count,

    -- Transaction counts per category (rows not matching the CASE yield NULL and are not counted)
    COUNT(CASE WHEN category = 'shopping_net'  THEN 1 END) AS cat_shopping_net,
    COUNT(CASE WHEN category = 'entertainment' THEN 1 END) AS cat_entertainment,
    COUNT(CASE WHEN category = 'grocery_pos'   THEN 1 END) AS cat_grocery,
    COUNT(CASE WHEN category = 'food_dining'   THEN 1 END) AS cat_food_dining,

    -- Time of the most recent transaction
    MAX(txn_time)              AS last_txn_time,

    -- Number of high-amount transactions (single amount > 1000)
    COUNT(CASE WHEN amt > 1000 THEN 1 END) AS high_amt_txn_count
FROM best_practice_financial_risk.dwd_txn_events
GROUP BY
    cc_num,
    first_name,
    last_name,
    state,
    job,
    age;
-- ADS layer: per-transaction risk score and risk level.
-- Each DWD transaction is LEFT JOINed to the DWS features of its card (on cc_num), and
-- calc_txn_risk_score is evaluated exactly once per row in the inner query. The outer
-- query reuses that single value for the rounded risk_score and for the risk_level
-- buckets, so the argument list exists in one place only and the function is not
-- written out (and potentially executed) three times per row.
-- Output columns and their order are the same as before.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_financial_risk.ads_txn_risk_score
REFRESH INTERVAL 1 MINUTE VCLUSTER DEFAULT
AS
SELECT
    scored.txn_id,
    scored.cc_num,
    scored.txn_time,
    scored.merchant,
    scored.category,
    scored.amt,
    scored.dist_km,
    scored.city,
    scored.state,
    scored.is_fraud,
    scored.amt_avg,
    scored.amt_stddev,
    scored.fraud_history_count,
    scored.high_amt_txn_count,
    -- Risk score, rounded to two decimals
    ROUND(scored.raw_risk_score, 2) AS risk_score,
    -- Risk level, bucketed on the unrounded score: >= 60 HIGH, >= 30 MEDIUM, otherwise LOW.
    -- A NULL score fails both comparisons and therefore falls through to 'LOW'.
    CASE
        WHEN scored.raw_risk_score >= 60 THEN 'HIGH'
        WHEN scored.raw_risk_score >= 30 THEN 'MEDIUM'
        ELSE 'LOW'
    END AS risk_level
FROM (
    SELECT
        t.txn_id,
        t.cc_num,
        t.txn_time,
        t.merchant,
        t.category,
        t.amt,
        t.dist_km,
        t.city,
        t.state,
        t.is_fraud,
        u.amt_avg,
        u.amt_stddev,
        u.fraud_history_count,
        u.high_amt_txn_count,
        -- Unrounded score; the two count features are cast to DOUBLE before the call.
        best_practice_financial_risk.calc_txn_risk_score(
            t.amt,
            u.amt_avg,
            u.amt_stddev,
            t.dist_km,
            CAST(u.fraud_history_count AS DOUBLE),
            CAST(u.high_amt_txn_count AS DOUBLE)
        ) AS raw_risk_score
    FROM best_practice_financial_risk.dwd_txn_events t
    LEFT JOIN best_practice_financial_risk.dws_user_risk_features u
        ON t.cc_num = u.cc_num
) AS scored;
-- Per risk level: transaction count, fraud count, fraud rate in percent (one decimal),
-- and average risk score (one decimal); highest average score first.
SELECT
    risk_level,
    COUNT(*)                                    AS txn_count,
    SUM(is_fraud)                               AS fraud_in_bucket,
    ROUND(SUM(is_fraud) * 100.0 / COUNT(*), 1)  AS fraud_rate_pct,
    ROUND(AVG(risk_score), 1)                   AS avg_score
FROM best_practice_financial_risk.ads_txn_risk_score
GROUP BY risk_level
ORDER BY avg_score DESC;
-- MEDIUM 风险区间欺诈率达 100%,说明评分模型对高风险交易的识别效果显著。LOW 区间仍有 18.8% 的欺诈,原因是部分欺诈交易单笔金额相对较小,没有触发金额偏差因子,后续可结合序列行为特征进一步优化评分模型。
-- 按品类的欺诈率分析
-- Per merchant category: transaction count, fraud count, fraud rate in percent
-- (one decimal), and average / maximum amount (two decimals); highest fraud rate first.
SELECT
    category,
    COUNT(*)                                    AS txn_count,
    SUM(is_fraud)                               AS fraud_count,
    ROUND(SUM(is_fraud) * 100.0 / COUNT(*), 1)  AS fraud_rate_pct,
    ROUND(AVG(amt), 2)                          AS avg_amt,
    ROUND(MAX(amt), 2)                          AS max_amt
FROM best_practice_financial_risk.ads_txn_risk_score
GROUP BY category
ORDER BY fraud_rate_pct DESC;
-- Create the three roles that the GRANT statements in this script assign privileges to.
CREATE ROLE IF NOT EXISTS risk_analyst;
CREATE ROLE IF NOT EXISTS risk_interception;
CREATE ROLE IF NOT EXISTS audit_admin;
-- risk_analyst: may query the DWD and DWS layers
-- (dynamic tables are granted with the DYNAMIC TABLE keyword).
GRANT SELECT
    ON DYNAMIC TABLE best_practice_financial_risk.dwd_txn_events
    TO ROLE risk_analyst;
GRANT SELECT
    ON DYNAMIC TABLE best_practice_financial_risk.dws_user_risk_features
    TO ROLE risk_analyst;

-- risk_interception: may query only the ADS output (a dynamic table).
GRANT SELECT
    ON DYNAMIC TABLE best_practice_financial_risk.ads_txn_risk_score
    TO ROLE risk_interception;

-- audit_admin: read access across the ODS, DWD, DWS and ADS layers.
GRANT SELECT
    ON TABLE best_practice_financial_risk.ods_transactions
    TO ROLE audit_admin;
GRANT SELECT
    ON TABLE best_practice_financial_risk.ods_customers
    TO ROLE audit_admin;
GRANT SELECT
    ON DYNAMIC TABLE best_practice_financial_risk.dwd_txn_events
    TO ROLE audit_admin;
GRANT SELECT
    ON DYNAMIC TABLE best_practice_financial_risk.dws_user_risk_features
    TO ROLE audit_admin;
GRANT SELECT
    ON DYNAMIC TABLE best_practice_financial_risk.ads_txn_risk_score
    TO ROLE audit_admin;