mirror of
https://github.com/hotyue/IP-Sentinel.git
synced 2026-06-05 08:49:46 +08:00
refactor(scripts): 重构活体新闻融合引擎,彻底解除硬编码字典,实现多语种自适应
This commit is contained in:
@@ -3,9 +3,10 @@ import xml.etree.ElementTree as ET
|
||||
import os
|
||||
import json
|
||||
import random
|
||||
import re
|
||||
|
||||
# ====================================================
|
||||
# 脚本功能: 5+25 混合流量注入矩阵 (双栈 IPv4+IPv6 防风控版)
|
||||
# 脚本功能: 5+25 混合流量注入矩阵 (自适应解耦版)
|
||||
# 核心指标: 5条基石 + 25条活体新闻 = 30条确保双栈机型完全可达的白名单
|
||||
# ====================================================
|
||||
|
||||
@@ -65,94 +66,98 @@ RSS_FEEDS = {
|
||||
}
|
||||
|
||||
def is_dual_stack_safe(url):
|
||||
"""网络可达性过滤: 阻断任何不支持双栈(IPv4+IPv6)的域名"""
|
||||
# 强制白名单:已被确认具有完备双栈 Anycast 防线的顶级骨干站群
|
||||
"""网络可达性过滤: 阻断任何不支持双栈(IPv4+IPv6)的动态新闻域名"""
|
||||
DUAL_STACK_SAFE_DOMAINS = [
|
||||
"google.com", "wikipedia.org", "apple.com", "microsoft.com",
|
||||
"wikimedia.org", "blogspot.com", "yahoo.com"
|
||||
]
|
||||
return any(domain in url for domain in DUAL_STACK_SAFE_DOMAINS)
|
||||
|
||||
def fetch_rss_links(region_code, max_items=25):
|
||||
"""拉取 25 条支持双栈访问的活体新闻链接"""
|
||||
feeds = RSS_FEEDS.get(region_code, RSS_FEEDS["US"])
|
||||
def fetch_rss_links(lang_params, region_name, max_items=25):
|
||||
"""根据 JSON 提供的语言参数,动态拼接对应国家的 Google 新闻源"""
|
||||
# [核心解耦]:直接使用 json 文件中的 lang_params,彻底淘汰 RSS_FEEDS 字典
|
||||
url = f"https://news.google.com/rss?{lang_params}"
|
||||
links = []
|
||||
|
||||
for url in feeds:
|
||||
try:
|
||||
# 动态抽取随机UA防风控
|
||||
dynamic_headers = {'User-Agent': random.choice(USER_AGENTS)}
|
||||
req = urllib.request.Request(url, headers=dynamic_headers)
|
||||
with urllib.request.urlopen(req, timeout=10) as response:
|
||||
xml_data = response.read()
|
||||
root = ET.fromstring(xml_data)
|
||||
for item in root.findall('.//item'):
|
||||
link = item.find('link')
|
||||
if link is not None and link.text:
|
||||
clean_link = link.text.strip()
|
||||
# 注入防断网熔断判定: 必须以 http 开头且具备双栈安全性
|
||||
if clean_link.startswith('http') and is_dual_stack_safe(clean_link):
|
||||
links.append(clean_link)
|
||||
except Exception as e:
|
||||
print(f"⚠️ [{region_code}] RSS 抓取异常 ({url}): {e}")
|
||||
|
||||
# 高度去重并随机打乱,精准截取 max_items
|
||||
try:
|
||||
dynamic_headers = {'User-Agent': random.choice(USER_AGENTS)}
|
||||
req = urllib.request.Request(url, headers=dynamic_headers)
|
||||
with urllib.request.urlopen(req, timeout=10) as response:
|
||||
xml_data = response.read()
|
||||
root = ET.fromstring(xml_data)
|
||||
for item in root.findall('.//item'):
|
||||
link = item.find('link')
|
||||
if link is not None and link.text:
|
||||
clean_link = link.text.strip()
|
||||
# 仅对动态获取的不可控新闻链接应用双栈过滤
|
||||
if clean_link.startswith('http') and is_dual_stack_safe(clean_link):
|
||||
links.append(clean_link)
|
||||
except Exception as e:
|
||||
print(f"⚠️ [{region_name}] 本土 RSS 抓取异常 ({url}): {e}")
|
||||
|
||||
unique_links = list(set(links))
|
||||
random.shuffle(unique_links)
|
||||
return unique_links[:max_items]
|
||||
|
||||
def process_json_file(file_path, region_code):
|
||||
"""融合静态基石与动态新闻"""
|
||||
def process_json_file(file_path):
|
||||
"""解析单节点配置并执行流量注入"""
|
||||
try:
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
trust_mod = data.get("trust_module", {})
|
||||
if not trust_mod:
|
||||
google_mod = data.get("google_module", {})
|
||||
region_name = data.get("region_name", "Unknown")
|
||||
|
||||
if not trust_mod or not google_mod:
|
||||
return
|
||||
|
||||
# 1. 基石处理:提取静态配额,确保双栈安全后精准截取 5 条
|
||||
# 1. 提取语言参数 (如 hl=en-AE&gl=AE)
|
||||
lang_params = google_mod.get("lang_params", "hl=en-US&gl=US")
|
||||
|
||||
# 2. 智能提取语种前缀,用于维基百科的本土化 (如 hl=ja 提取出 ja)
|
||||
hl_match = re.search(r'hl=([a-zA-Z]+)', lang_params)
|
||||
# 如果带有地区后缀(如 zh-TW, en-GB),则只取前面代表语种的字母
|
||||
lang_prefix = hl_match.group(1).split('-')[0].lower() if hl_match else 'en'
|
||||
|
||||
# 3. 基石处理:【修复点】直接提取静态基石,不做双栈硬编码过滤,保护本地化域名!
|
||||
static_urls = trust_mod.get("static_urls", [])
|
||||
static_urls = [u for u in static_urls if is_dual_stack_safe(u)]
|
||||
|
||||
if len(static_urls) < 5:
|
||||
# 基础数据残缺时,进行高权重锚点动态扩容补齐
|
||||
static_urls += ["https://en.wikipedia.org/wiki/Special:Random", "https://www.apple.com/", "https://www.microsoft.com/"]
|
||||
# 基础数据残缺时,采用本土化的 Wikipedia 域名垫底
|
||||
static_urls += [f"https://{lang_prefix}.wikipedia.org/wiki/Special:Random", "https://www.apple.com/", "https://www.microsoft.com/"]
|
||||
random.shuffle(static_urls)
|
||||
final_static = list(set(static_urls))[:5]
|
||||
|
||||
# 2. 活体处理:拉取 25 条双栈活体新闻
|
||||
final_news = fetch_rss_links(region_code, max_items=25)
|
||||
# 4. 将本地语言参数传给 RSS 爬虫,拉取活体新闻
|
||||
final_news = fetch_rss_links(lang_params, region_name, max_items=25)
|
||||
|
||||
# 3. 战术混合:5条基石 + 25条活体 = 30条
|
||||
combined_urls = list(set(final_static + final_news))
|
||||
|
||||
# 如果依然不够 30 条,用完美支持双栈的维基百科随机锚点疯狂垫高充能
|
||||
# 5. 采用完全本土化的维基百科执行最终充能
|
||||
while len(combined_urls) < 30:
|
||||
combined_urls.append(f"https://en.wikipedia.org/wiki/Special:Random?r={random.randint(1,100000)}")
|
||||
combined_urls.append(f"https://{lang_prefix}.wikipedia.org/wiki/Special:Random?r={random.randint(1,100000)}")
|
||||
combined_urls = list(set(combined_urls))
|
||||
|
||||
# 精准切片为 30 条,彻底打碎机械顺序特征
|
||||
final_white_list = combined_urls[:30]
|
||||
random.shuffle(final_white_list)
|
||||
|
||||
# 覆写固化回 white_urls 字段
|
||||
trust_mod["white_urls"] = final_white_list
|
||||
data["trust_module"] = trust_mod
|
||||
|
||||
with open(file_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||
|
||||
print(f"✅ [信用融合] {os.path.basename(file_path)}: 固化基石 {len(final_static)} 条 + 活体新闻 {len(final_news)} 条 = 统合满编 {len(final_white_list)} 条")
|
||||
print(f"✅ [信用融合] {os.path.basename(file_path)} (语系: {lang_prefix}): 固化基石 {len(final_static)} 条 + 活体新闻 {len(final_news)} 条 = 统合满编 {len(final_white_list)} 条")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ [处理失败] {file_path}: {e}")
|
||||
|
||||
if __name__ == '__main__':
|
||||
print("========== 启动 IP-Sentinel 活体新闻流融合引擎 (30轮大容量双栈版) ==========")
|
||||
print("========== 启动 IP-Sentinel 活体新闻流融合引擎 (自适应架构版) ==========")
|
||||
for root_dir, _, files in os.walk(REGIONS_DIR):
|
||||
for file in files:
|
||||
if file.endswith(".json"):
|
||||
file_path = os.path.join(root_dir, file)
|
||||
region_code = os.path.relpath(file_path, REGIONS_DIR).split(os.sep)[0]
|
||||
process_json_file(file_path, region_code)
|
||||
process_json_file(file_path)
|
||||
print("========== 融合引擎执行完毕 ==========")
|
||||
Reference in New Issue
Block a user