是高基数字符串列,安全分析师频繁以特定 IP 为条件点查,适合 Bloomfilter Index。
CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_source_ip
ON TABLE doc_raw_logs (source_ip);
CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_dest_ip
ON TABLE doc_raw_logs (dest_ip);
⚠️ 注意:
CREATE BLOOMFILTER INDEX
CREATE BLOOMFILTER INDEX
要求索引与目标表在同一 Schema 上下文,执行前先
USE SCHEMA best_practice_soc_log
USE SCHEMA best_practice_soc_log
或通过
-s best_practice_soc_log
-s best_practice_soc_log
参数切换,否则报"index and table must in the same schema"错误。
创建 Inverted Index
攻击工具通常在
user_agent
user_agent
中留下特征字符串(如
SQLMap
SQLMap
、
Nmap
Nmap
、
Metasploit
Metasploit
),
request_path
request_path
包含注入载荷(如
/admin?id=1 OR 1=1
/admin?id=1 OR 1=1
)。两列建立 Inverted Index 支持全文检索:
CREATE INVERTED INDEX IF NOT EXISTS idx_inv_user_agent
ON TABLE doc_raw_logs (user_agent)
PROPERTIES('analyzer'='english');
CREATE INVERTED INDEX IF NOT EXISTS idx_inv_request_path
ON TABLE doc_raw_logs (request_path)
PROPERTIES('analyzer'='keyword');
user_agent
user_agent
使用
english
english
分词器,按单词切分;
request_path
request_path
使用
keyword
keyword
,整体匹配不切分,保留 URL 语义。
验证索引创建结果:
SHOW INDEXES FROM best_practice_soc_log.doc_raw_logs;
ALTER PIPE pipe_raw_logs SET INGEST_MODE = LIST_PURGE
ALTER PIPE pipe_raw_logs SET INGEST_MODE = LIST_PURGE
。
方式二:INSERT 模拟(无 OSS 环境时)
若暂未配置 OSS,可通过
INSERT INTO
INSERT INTO
直接写入,模拟 OSS PIPE 已解析写入的效果:
从本地 CSV 导入数据(推荐):
-- 第一步:通过 SQL PUT 将本地 CSV 文件上传到 User Volume
PUT '/path/to/your/data.csv' TO USER VOLUME FILE 'data.csv';
-- 第二步:从 User Volume COPY INTO 表
COPY INTO best_practice_soc_log.doc_raw_logs
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('data.csv');
SELECT threat_label, log_type, COUNT(*) AS cnt
FROM best_practice_soc_log.doc_raw_logs
GROUP BY threat_label, log_type
ORDER BY threat_label, cnt DESC;
Bloomfilter Index 在 block 级别快速排除不含该 IP 的数据块,只扫描命中块。
IP 风险分级 UDF
将 IP 风险判定逻辑封装为 SQL UDF,Silver 和 Gold 层均可复用:
CREATE OR REPLACE FUNCTION best_practice_soc_log.classify_ip_risk(
ip STRING,
threat_label STRING,
is_attack_tool INT
)
RETURNS STRING
AS CASE
WHEN threat_label = 'malicious' THEN 'HIGH'
WHEN threat_label = 'suspicious' AND is_attack_tool = 1 THEN 'HIGH'
WHEN threat_label = 'suspicious' THEN 'MEDIUM'
WHEN is_attack_tool = 1 THEN 'MEDIUM'
ELSE 'LOW'
END;
SELECT source_ip, threat_label, is_attack_tool,
best_practice_soc_log.classify_ip_risk(source_ip, threat_label, is_attack_tool) AS ip_risk_level
FROM best_practice_soc_log.doc_silver_normalized_logs
WHERE threat_label != 'benign'
ORDER BY ip_risk_level
LIMIT 8;
source_ip | threat_label | is_attack_tool | ip_risk_level
-----------------|--------------|----------------|---------------
103.22.200.174 | suspicious | 1 | HIGH
45.142.213.99 | suspicious | 1 | HIGH
185.220.101.33 | malicious | 1 | HIGH
198.199.119.1 | malicious | 0 | HIGH
91.240.118.172 | malicious | 1 | HIGH
104.21.58.152 | malicious | 0 | HIGH
45.95.147.236 | malicious | 1 | HIGH
177.52.183.80 | suspicious | 0 | MEDIUM
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_soc_log.doc_silver_normalized_logs
AS
SELECT
log_id,
log_timestamp,
DATE(log_timestamp) AS log_date,
HOUR(log_timestamp) AS log_hour,
source_ip,
dest_ip,
protocol,
action,
threat_label,
log_type,
bytes_transferred,
user_agent,
request_path,
ingest_time,
-- 是否内网源 IP(RFC 1918 地址段)
CASE
WHEN source_ip LIKE '192.168.%'
OR source_ip LIKE '10.%'
OR source_ip LIKE '172.16.%'
THEN 1 ELSE 0
END AS is_internal_src,
-- 威胁等级数字化
CASE threat_label
WHEN 'malicious' THEN 3
WHEN 'suspicious' THEN 2
WHEN 'benign' THEN 1
ELSE 0
END AS threat_level,
-- 攻击工具指纹
CASE WHEN user_agent IN (
'SQLMap/1.6-dev','Nmap Scripting Engine',
'Metasploit v6.3','Cobalt Strike','Havoc/0.7')
THEN 1 ELSE 0
END AS is_attack_tool
FROM best_practice_soc_log.doc_raw_logs;
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_soc_log.doc_gold_threat_summary
AS
SELECT
source_ip,
log_date,
log_type,
COUNT(*) AS total_events,
SUM(CASE WHEN threat_label = 'malicious' THEN 1 ELSE 0 END) AS malicious_cnt,
SUM(CASE WHEN threat_label = 'suspicious' THEN 1 ELSE 0 END) AS suspicious_cnt,
SUM(CASE WHEN action = 'blocked' THEN 1 ELSE 0 END) AS blocked_cnt,
SUM(is_attack_tool) AS attack_tool_cnt,
MAX(threat_level) AS max_threat_level,
SUM(bytes_transferred) AS total_bytes,
COUNT(DISTINCT dest_ip) AS unique_dest_count
FROM best_practice_soc_log.doc_silver_normalized_logs
GROUP BY source_ip, log_date, log_type;
高风险外网 IP 排行
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_soc_log.doc_gold_high_risk_ips
AS
SELECT
source_ip,
COUNT(DISTINCT log_date) AS active_days,
COUNT(*) AS total_events,
SUM(CASE WHEN threat_label IN ('malicious','suspicious') THEN 1 ELSE 0 END) AS threat_events,
SUM(is_attack_tool) AS attack_tool_hits,
ROUND(
SUM(CASE WHEN threat_label IN ('malicious','suspicious') THEN 1.0 ELSE 0.0 END)
/ COUNT(*) * 100, 2
) AS threat_rate_pct,
MAX(log_timestamp) AS last_seen,
COUNT(DISTINCT dest_ip) AS targets_count
FROM best_practice_soc_log.doc_silver_normalized_logs
WHERE is_internal_src = 0
GROUP BY source_ip
HAVING threat_events > 0;
-- 回溯 2026-06-06 23:37:30 时刻的数据状态
SELECT COUNT(*) AS row_count
FROM best_practice_soc_log.doc_raw_logs
TIMESTAMP AS OF '2026-06-06 23:37:30';
row_count
---------
30
溯源攻击路径
针对已知恶意 IP,回溯其在攻击时间窗口内的所有操作记录:
SELECT log_timestamp, source_ip, dest_ip, protocol, action, request_path
FROM best_practice_soc_log.doc_raw_logs
TIMESTAMP AS OF '2026-06-06 23:39:07'
WHERE source_ip IN ('185.220.101.33', '198.199.119.1', '45.95.147.236')
ORDER BY log_timestamp;
Time Travel 保留周期默认 7 天,在此窗口内可随时回溯任意历史版本,无需额外备份。
攻击路径分析
结合 Gold 层聚合数据,快速定位高频攻击路径:
SELECT
request_path,
COUNT(*) AS total_hits,
SUM(CASE WHEN threat_label IN ('malicious','suspicious') THEN 1 ELSE 0 END) AS threat_hits
FROM best_practice_soc_log.doc_raw_logs
GROUP BY request_path
ORDER BY threat_hits DESC, total_hits DESC
LIMIT 8;