#!/usr/bin/env python3 from __future__ import annotations import base64 import os import secrets import select import socket import threading import urllib.parse import time from typing import Any def parse_positive_int(value: str | None, default: int) -> int: try: return max(1, int(value or default)) except (TypeError, ValueError): return default MAX_PROXY_CONNECTIONS = parse_positive_int(os.environ.get("LOCAL_PROXY_MAX_CONNECTIONS"), 256) proxy_connection_sem = threading.BoundedSemaphore(MAX_PROXY_CONNECTIONS) def parse_int(value: Any) -> int: try: return int(value) except (TypeError, ValueError): return 0 def recv_exact(sock: socket.socket, size: int) -> bytes: data = b"" while len(data) < size: chunk = sock.recv(size - len(data)) if not chunk: raise ConnectionError("Unexpected disconnect.") data += chunk return data def parse_host_port(authority: str, default_port: int) -> tuple[str, int]: authority = authority.strip() if authority.startswith("["): host_part, sep, rest = authority.partition("]") host = host_part.lstrip("[") port = default_port if sep and rest.startswith(":"): port_text = rest[1:] port = parse_int(port_text) or default_port return host, port if authority.count(":") == 1: host, _, port_text = authority.rpartition(":") return host, parse_int(port_text) or default_port return authority, default_port def get_proxy_credentials() -> tuple[str | None, str | None]: user = os.environ.get("LOCAL_PROXY_USER") or os.environ.get("LOCAL_PROXY_USERNAME") password = os.environ.get("LOCAL_PROXY_PASS") or os.environ.get("LOCAL_PROXY_PASSWORD") if user is None and password is None: return None, None return user or "", password or "" def proxy_auth_enabled() -> bool: user, password = get_proxy_credentials() return user is not None and password is not None def parse_http_basic_auth(lines: list[str]) -> tuple[str | None, str | None]: for line in lines: name, sep, value = line.partition(":") if not sep or name.strip().lower() != "proxy-authorization": continue scheme, _, token = value.strip().partition(" ") if scheme.lower() != "basic" or not token: return None, None try: decoded = base64.b64decode(token, validate=True).decode("utf-8", errors="replace") except Exception: return None, None username, sep, password = decoded.partition(":") if not sep: return None, None return username, password return None, None def check_credentials(username: str | None, password: str | None) -> bool: expected_user, expected_pass = get_proxy_credentials() if expected_user is None or expected_pass is None: return True return secrets.compare_digest(username or "", expected_user) and secrets.compare_digest(password or "", expected_pass) def dns_query_over_tun0(host: str, qtype: int, dns_server: str, timeout: float) -> str | None: import random sock = None try: tx_id = random.getrandbits(16).to_bytes(2, "big") flags = b"\x01\x00" questions = b"\x00\x01" rrs = b"\x00\x00\x00\x00\x00\x00" qname = b"" for part in host.split("."): if not part: continue part_bytes = part.encode("idna") if len(part_bytes) > 63: return None qname += len(part_bytes).to_bytes(1, "big") + part_bytes qname += b"\x00" qtype_qclass = qtype.to_bytes(2, "big") + b"\x00\x01" packet = tx_id + flags + questions + rrs + qname + qtype_qclass sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(timeout) try: sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, b"tun0") except OSError as e: if "operation not permitted" in str(e).lower() or e.errno == 1: print("[DNS 绑定失败] [错误代码 3006] DNS 解析绑定 tun0 权限不足,请确保程序以 root 权限运行!", flush=True) elif "no such device" in str(e).lower() or e.errno == 19: print("[DNS 绑定失败] [错误代码 3004] DNS 解析绑定 tun0 失败,网卡设备不存在,请检查 VPN 连接!", flush=True) return None sock.sendto(packet, (dns_server, 53)) resp, _ = sock.recvfrom(4096) except Exception: return None finally: if sock is not None: try: sock.close() except Exception: pass try: if len(resp) < 12 or resp[:2] != tx_id: return None rcode = resp[3] & 0x0F if rcode != 0: return None offset = 12 while offset < len(resp): length = resp[offset] if length == 0: offset += 1 break if (length & 0xC0) == 0xC0: offset += 2 break offset += 1 + length offset += 4 answers_count = int.from_bytes(resp[6:8], "big") for _ in range(answers_count): if offset >= len(resp): break while offset < len(resp): length = resp[offset] if length == 0: offset += 1 break if (length & 0xC0) == 0xC0: offset += 2 break offset += 1 + length if offset + 10 > len(resp): break atype = int.from_bytes(resp[offset : offset + 2], "big") aclass = int.from_bytes(resp[offset + 2 : offset + 4], "big") rdlength = int.from_bytes(resp[offset + 8 : offset + 10], "big") offset += 10 if offset + rdlength > len(resp): break record = resp[offset : offset + rdlength] if atype == qtype and aclass == 1: if qtype == 1 and rdlength == 4: return socket.inet_ntoa(record) if qtype == 28 and rdlength == 16: return socket.inet_ntop(socket.AF_INET6, record) offset += rdlength except Exception: return None return None def resolve_dns_over_tun0(host: str, dns_server: str = "8.8.8.8", timeout: float = 3.0) -> str | None: try: socket.inet_aton(host) return host except OSError: pass try: socket.inet_pton(socket.AF_INET6, host) return host except OSError: pass return dns_query_over_tun0(host, 1, dns_server, timeout) or dns_query_over_tun0(host, 28, dns_server, timeout) def create_connection(address: tuple[str, int], timeout: float = 20) -> socket.socket: host, port = address resolved_ip = resolve_dns_over_tun0(host) if resolved_ip: host = resolved_ip err = None for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): af, socktype, proto, canonname, sa = res sock = None try: sock = socket.socket(af, socktype, proto) sock.settimeout(timeout) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, b"tun0") sock.connect(sa) return sock except OSError as e: err = e if "operation not permitted" in str(e).lower() or e.errno == 1: err = OSError(f"[错误代码 3006] [ERR_PROXY_BIND_TUN_PERM_DENIED] 绑定虚拟网卡 tun0 失败,权限不足!必须以 root 权限运行,或者进程缺少 CAP_NET_RAW 权限。") elif "no such device" in str(e).lower() or e.errno == 19: err = OSError(f"[错误代码 3004] [ERR_ROUTE_DEV_NOT_FOUND] 绑定虚拟网卡 tun0 失败,找不到设备!这通常是因为 OpenVPN 核心未能成功连接或已被异常终止。") if sock is not None: sock.close() if err is not None: raise err else: raise OSError("getaddrinfo returns empty list") def relay(left: socket.socket, right: socket.socket) -> None: sockets = [left, right] while True: readable, _, errored = select.select(sockets, [], sockets, 120) if errored or not readable: return for source in readable: target = right if source is left else left data = source.recv(65536) if not data: return target.sendall(data) def socks5_client(client: socket.socket, first_byte: bytes) -> None: upstream = None try: methods_count = recv_exact(client, 1)[0] methods = recv_exact(client, methods_count) if proxy_auth_enabled(): if 2 not in methods: client.sendall(b"\x05\xff") return client.sendall(b"\x05\x02") auth_version = recv_exact(client, 1)[0] if auth_version != 1: client.sendall(b"\x01\x01") return username = recv_exact(client, recv_exact(client, 1)[0]).decode("utf-8", errors="replace") password = recv_exact(client, recv_exact(client, 1)[0]).decode("utf-8", errors="replace") if not check_credentials(username, password): client.sendall(b"\x01\x01") return client.sendall(b"\x01\x00") else: client.sendall(b"\x05\x00") version, command, _, address_type = recv_exact(client, 4) if version != 5 or command != 1: client.sendall(b"\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00") return if address_type == 1: host = socket.inet_ntoa(recv_exact(client, 4)) elif address_type == 3: host = recv_exact(client, recv_exact(client, 1)[0]).decode("idna") elif address_type == 4: host = socket.inet_ntop(socket.AF_INET6, recv_exact(client, 16)) else: client.sendall(b"\x05\x08\x00\x01\x00\x00\x00\x00\x00\x00") return port = int.from_bytes(recv_exact(client, 2), "big") try: upstream = create_connection((host, port), timeout=20) except Exception as e: print(f"[SOCKS5 代理失败] 目标 {host}:{port} 连接失败: {e}", flush=True) try: client.sendall(b"\x05\x04\x00\x01\x00\x00\x00\x00\x00\x00") except OSError: pass raise client.sendall(b"\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00") relay(client, upstream) finally: client.close() if upstream: upstream.close() def read_http_header(client: socket.socket, first_byte: bytes) -> bytes: data = first_byte while b"\r\n\r\n" not in data and len(data) < 65536: chunk = client.recv(4096) if not chunk: break data += chunk return data def http_client(client: socket.socket, first_byte: bytes) -> None: upstream = None try: header = read_http_header(client, first_byte) if b"\r\n\r\n" not in header: client.sendall(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n") return head, rest = header.split(b"\r\n\r\n", 1) lines = head.decode("iso-8859-1", errors="replace").split("\r\n") try: method, target, version = lines[0].split(" ", 2) except ValueError: client.sendall(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n") return if not version.startswith("HTTP/"): client.sendall(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n") return if proxy_auth_enabled(): username, password = parse_http_basic_auth(lines[1:]) if not check_credentials(username, password): client.sendall( b"HTTP/1.1 407 Proxy Authentication Required\r\n" b"Proxy-Authenticate: Basic realm=\"AimiliVPN Proxy\"\r\n" b"Content-Length: 0\r\n\r\n" ) return if method.upper() == "CONNECT": host, port = parse_host_port(target, 443) upstream = create_connection((host, port), timeout=20) client.sendall(b"HTTP/1.1 200 Connection Established\r\n\r\n") if rest: upstream.sendall(rest) relay(client, upstream) return try: parsed = urllib.parse.urlsplit(target) except ValueError: client.sendall(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n") return hostname = parsed.hostname port = parsed.port scheme = parsed.scheme if not hostname: # Fallback to Host header for line in lines[1:]: if line.lower().startswith("host:"): host_val = line.split(":", 1)[1].strip() if "[" in host_val and "]" in host_val: host_part, _, port_part = host_val.rpartition("]") hostname = host_part.lstrip("[") if port_part.startswith(":"): p_val = port_part.lstrip(":") port = int(p_val) if p_val.isdigit() else None else: port = None else: hostname, parsed_port = parse_host_port(host_val, 0) port = parsed_port or None break if not hostname: client.sendall(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n") return port = port or (443 if scheme == "https" else 80) path = urllib.parse.urlunsplit(("", "", parsed.path or "/", parsed.query, "")) headers = [line for line in lines[1:] if not line.lower().startswith(("proxy-connection:", "connection:", "proxy-authorization:"))] request = f"{method} {path} {version}\r\n" + "\r\n".join(headers) + "\r\nConnection: close\r\n\r\n" upstream = create_connection((hostname, port), timeout=20) upstream.sendall(request.encode("iso-8859-1") + rest) relay(client, upstream) except Exception as e: print(f"[HTTP 代理失败] 代理请求目标连接失败: {e}", flush=True) try: client.sendall(b"HTTP/1.1 502 Bad Gateway\r\nContent-Length: 0\r\n\r\n") except OSError: pass finally: client.close() if upstream: upstream.close() def proxy_client(client: socket.socket, address: tuple[str, int]) -> None: try: client.settimeout(30) first = recv_exact(client, 1) if first == b"\x05": socks5_client(client, first) else: http_client(client, first) except Exception as e: err_msg = str(e) if "[错误代码" in err_msg: print(f"[代理客户端连接失败] 客户端 {address} 遭遇系统性阻碍: {err_msg}", flush=True) try: client.close() except OSError: pass def start_proxy_server(host: str, port: int) -> None: is_ipv6 = ":" in host or host == "" af = socket.AF_INET6 if is_ipv6 else socket.AF_INET server = None try: server = socket.socket(af, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if is_ipv6: try: server.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) except OSError: pass server.bind((host, port)) server.listen(256) print(f"HTTP/SOCKS5 proxy listening on {host}:{port}", flush=True) except Exception as e: if server is not None: try: server.close() except Exception: pass if is_ipv6 and host in ("::", ""): print(f"[警告] 绑定 IPv6 {host}:{port} 失败 ({e}),正在尝试回退至 IPv4 0.0.0.0 ...", flush=True) try: server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind(("0.0.0.0", port)) server.listen(256) print(f"HTTP/SOCKS5 proxy listening on 0.0.0.0:{port} (仅 IPv4)", flush=True) except Exception as ex: import vpn_utils diag = vpn_utils.diagnose_local_obstructions(port, host="0.0.0.0") diag_msg = diag[1] if diag else str(ex) print(f"[ERROR] Failed to start HTTP/SOCKS5 proxy on 0.0.0.0:{port}: {diag_msg}", flush=True) return elif is_ipv6 and host == "::1": print(f"[警告] 绑定 IPv6 {host}:{port} 失败 ({e}),正在尝试回退至 IPv4 127.0.0.1 ...", flush=True) try: server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind(("127.0.0.1", port)) server.listen(256) print(f"HTTP/SOCKS5 proxy listening on 127.0.0.1:{port} (仅 IPv4)", flush=True) except Exception as ex: import vpn_utils diag = vpn_utils.diagnose_local_obstructions(port, host="127.0.0.1") diag_msg = diag[1] if diag else str(ex) print(f"[ERROR] Failed to start HTTP/SOCKS5 proxy on 127.0.0.1:{port}: {diag_msg}", flush=True) return else: import vpn_utils diag = vpn_utils.diagnose_local_obstructions(port, host=host) diag_msg = diag[1] if diag else str(e) print(f"[ERROR] Failed to start HTTP/SOCKS5 proxy on {host}:{port}: {diag_msg}", flush=True) return while True: try: client, address = server.accept() if not proxy_connection_sem.acquire(blocking=False): print(f"[代理限流] 当前连接数已达到上限 {MAX_PROXY_CONNECTIONS},拒绝客户端 {address}", flush=True) try: client.close() except OSError: pass continue def run_client() -> None: try: proxy_client(client, address) finally: proxy_connection_sem.release() threading.Thread(target=run_client, daemon=True).start() except Exception as e: print(f"[ERROR] Proxy accept failed: {e}", flush=True) time.sleep(0.5)