refactor: optimize tunnel management, add log cleanup throttling, and improve proxy connection reliability

This commit is contained in:
baoweise-bot
2026-05-29 23:41:05 +08:00
parent 88d241f595
commit 531beca616
3 changed files with 94 additions and 74 deletions

52
.gitignore vendored
View File

@@ -1,37 +1,67 @@
# Nuitka compiler intermediate folders
# ============================================
# AimiliVPN .gitignore
# ============================================
# ---- Nuitka 编译产物 ----
vpngate_manager.build/
vpngate_manager.dist/
*.build/
*.dist/
aimilivpn
# Debian building artifacts
# ---- Debian 打包产物 ----
*.deb
aimilivpn_*
# Python caches and bytecode
# ---- Python 缓存 ----
__pycache__/
*.pyc
*.pyo
*.pyd
*.egg-info/
dist/
build/
*.egg
# Local runtime data folders
# ---- 运行时数据 ----
vpngate_data/
logs/
*.log
*.tmp
*.pid
# IDE settings
# ---- IDE 和编辑器 ----
.vscode/
.idea/
*.swp
*.swo
*~
.project
.classpath
.settings/
# ---- AI 编码助手 ----
.gemini/
.cursor/
.cursorrules
.aider*
.codeium/
.continue/
.copilot/
# Scratch and test files containing credentials
scratch/
*连接配置*
# Local analysis and doc files
# ---- 本地开发文档(不上传 GitHub ----
AI开发指令.md
功能文档.md
AimiliVPN_Analysis_ZH.md
# ---- 临时和测试文件 ----
scratch/
*连接配置*
test_*.py
*.bak
*.orig
# ---- 系统文件 ----
.DS_Store
Thumbs.db
Desktop.ini

View File

@@ -183,7 +183,7 @@ def get_physical_interface() -> str | None:
except (ValueError, IndexError):
continue
if routes:
routes.sort(key=lambda x: x[2], reverse=True)
routes.sort(key=lambda x: x[2])
for gw, dev, metric in routes:
if not dev.startswith(("tun", "tap", "wg", "ppp")):
return dev

View File

@@ -155,9 +155,15 @@ def get_session_token(password: str, username: str = "admin") -> str:
salt = "aimilivpn_secure_salt_2026"
return hashlib.sha256((username + ":" + password + salt).encode("utf-8")).hexdigest()
_last_cleanup_time = 0.0
def cleanup_old_logs(logs_dir: Path) -> None:
global _last_cleanup_time
now = time.time()
if now - _last_cleanup_time < 3600:
return
_last_cleanup_time = now
try:
now = time.time()
three_days_sec = 3 * 24 * 60 * 60
for path in logs_dir.glob("*.json"):
match = re.match(r"^(\d{4}-\d{2}-\d{2})\.json$", path.name)
@@ -706,8 +712,12 @@ def test_multiple_nodes(node_ids: list[str]) -> list[dict[str, Any]]:
pass
latency = vpn_utils.ping_latency_ms(h, p, fallback_ping)
dev_name = f"tun{idx + 1}"
ok, message, _ = run_openvpn_until_ready(config_file, keep_alive=False, route_nopull=True, timeout=12, dev=dev_name)
tun_idx = get_free_test_index()
dev_name = f"tun{tun_idx}"
try:
ok, message, _ = run_openvpn_until_ready(config_file, keep_alive=False, route_nopull=True, timeout=12, dev=dev_name)
finally:
release_test_index(tun_idx)
try:
if temp_path.exists():
@@ -3292,12 +3302,16 @@ def check_proxy_health() -> dict[str, Any]:
s.settimeout(1.5)
try:
s.connect(("127.0.0.1", LOCAL_PROXY_PORT))
s.close()
except Exception as e:
return {
"ok": False,
"error": f"代理服务未运行 (端口 {LOCAL_PROXY_PORT} 连接失败,原因: {e})"
}
finally:
try:
s.close()
except Exception:
pass
# 2. 检测虚拟网卡 tun0 是否存在 (Linux 下)
tun_path = Path("/sys/class/net/tun0")
@@ -3308,46 +3322,43 @@ def check_proxy_health() -> dict[str, Any]:
}
# 3. 使用 curl 通过本地 SOCKS5 代理接口测试 IP 与实际延迟
cmd = [
"curl", "-4", "-s",
"-w", "\n%{time_total} %{http_code}",
"-x", f"socks5h://127.0.0.1:{LOCAL_PROXY_PORT}",
"http://ip.sb",
"--max-time", "5"
]
def _curl_check_ip(url: str) -> dict[str, Any] | None:
cmd = [
"curl", "-4", "-s",
"-w", "\n%{time_total} %{http_code}",
"-x", f"socks5h://127.0.0.1:{LOCAL_PROXY_PORT}",
url,
"--max-time", "5"
]
try:
res = subprocess.run(cmd, capture_output=True, text=True, timeout=6)
if res.returncode == 0:
lines = res.stdout.strip().splitlines()
if len(lines) >= 2:
ip = lines[0].strip()
time_info = lines[1].strip().split()
if len(time_info) == 2:
total_time_str, http_code = time_info
if http_code == "200" and ip:
latency_ms = int(float(total_time_str) * 1000)
return {"ok": True, "ip": ip, "latency_ms": latency_ms}
except Exception:
pass
return None
try:
res = subprocess.run(cmd, capture_output=True, text=True, timeout=6)
if res.returncode == 0:
lines = res.stdout.strip().splitlines()
if len(lines) >= 2:
ip = lines[0].strip()
time_info = lines[1].strip().split()
if len(time_info) == 2:
total_time_str, http_code = time_info
if http_code == "200" and ip:
latency_ms = int(float(total_time_str) * 1000)
return {"ok": True, "ip": ip, "latency_ms": latency_ms}
# 如果 ip.sb 失败,使用备用地址 http://api.ipify.org
cmd[7] = "http://api.ipify.org"
res = subprocess.run(cmd, capture_output=True, text=True, timeout=6)
if res.returncode == 0:
lines = res.stdout.strip().splitlines()
if len(lines) >= 2:
ip = lines[0].strip()
time_info = lines[1].strip().split()
if len(time_info) == 2:
total_time_str, http_code = time_info
if http_code == "200" and ip:
latency_ms = int(float(total_time_str) * 1000)
return {"ok": True, "ip": ip, "latency_ms": latency_ms}
return {"ok": False, "error": f"出口连接测试失败 (curl 返回码: {res.returncode}, stderr: {res.stderr.strip()})"}
result = _curl_check_ip("http://ip.sb")
if result:
return result
result = _curl_check_ip("http://api.ipify.org")
if result:
return result
return {"ok": False, "error": "出口连接测试失败 (ip.sb 和 api.ipify.org 均无法连通)"}
except Exception as e:
return {"ok": False, "error": f"出口连接测试异常: {e}"}
def background_proxy_checker() -> None:
time.sleep(2)
time.sleep(30)
while True:
try:
if is_connecting:
@@ -3392,7 +3403,6 @@ def background_proxy_checker() -> None:
time.sleep(30)
def active_node_pinger() -> None:
global active_openvpn_node_id, is_connecting
while True:
try:
if active_openvpn_running() and active_openvpn_node_id:
@@ -3423,28 +3433,8 @@ def active_node_pinger() -> None:
class Handler(BaseHTTPRequestHandler):
def get_secret_path(self) -> str:
auth_file = DATA_DIR / "ui_auth.json"
if not auth_file.exists():
try:
DATA_DIR.mkdir(exist_ok=True)
auth_file.write_text(json.dumps({"secret_path": "EJsW2EeBo9lY"}), encoding="utf-8")
except Exception:
pass
return "EJsW2EeBo9lY"
try:
creds = json.loads(auth_file.read_text(encoding="utf-8"))
if "secret_path" in creds:
return creds["secret_path"]
elif "password" in creds:
secret_path = creds["password"]
try:
auth_file.write_text(json.dumps({"secret_path": secret_path}), encoding="utf-8")
except Exception:
pass
return secret_path
return "EJsW2EeBo9lY"
except Exception:
return "EJsW2EeBo9lY"
ui_cfg = load_ui_config()
return ui_cfg.get("secret_path", "EJsW2EeBo9lY")
def is_authorized(self) -> bool:
ui_cfg = load_ui_config()