mirror of
https://github.com/snailyp/gemini-balance.git
synced 2026-05-06 20:32:47 +08:00
219 lines
7.2 KiB
Python
219 lines
7.2 KiB
Python
"""
|
|
Proxy detection service module
|
|
"""
|
|
import asyncio
|
|
import time
|
|
from typing import Dict, List, Optional
|
|
from urllib.parse import urlparse
|
|
|
|
import httpx
|
|
from pydantic import BaseModel
|
|
|
|
from app.log.logger import get_config_routes_logger
|
|
|
|
logger = get_config_routes_logger()
|
|
|
|
|
|
class ProxyCheckResult(BaseModel):
|
|
"""Proxy check result model"""
|
|
proxy: str
|
|
is_available: bool
|
|
response_time: Optional[float] = None
|
|
error_message: Optional[str] = None
|
|
checked_at: float
|
|
|
|
|
|
class ProxyCheckService:
|
|
"""Proxy detection service class"""
|
|
|
|
# Target URL for checking
|
|
CHECK_URL = "https://www.google.com"
|
|
# Timeout in seconds
|
|
TIMEOUT_SECONDS = 10
|
|
# Cache duration in seconds
|
|
CACHE_DURATION = 10 # 10s
|
|
|
|
def __init__(self):
|
|
self._cache: Dict[str, ProxyCheckResult] = {}
|
|
|
|
def _is_valid_proxy_format(self, proxy: str) -> bool:
|
|
"""Validate proxy format"""
|
|
try:
|
|
parsed = urlparse(proxy)
|
|
return parsed.scheme in ['http', 'https', 'socks5'] and parsed.hostname
|
|
except Exception:
|
|
return False
|
|
|
|
def _get_cached_result(self, proxy: str) -> Optional[ProxyCheckResult]:
|
|
"""Get cached check result"""
|
|
if proxy in self._cache:
|
|
result = self._cache[proxy]
|
|
# Check if cache is expired
|
|
if time.time() - result.checked_at < self.CACHE_DURATION:
|
|
logger.debug(f"Using cached proxy check result: {proxy}")
|
|
return result
|
|
else:
|
|
# Remove expired cache
|
|
del self._cache[proxy]
|
|
return None
|
|
|
|
def _cache_result(self, result: ProxyCheckResult) -> None:
|
|
"""Cache check result"""
|
|
self._cache[result.proxy] = result
|
|
|
|
async def check_single_proxy(self, proxy: str, use_cache: bool = True) -> ProxyCheckResult:
|
|
"""
|
|
Check if a single proxy is available
|
|
|
|
Args:
|
|
proxy: Proxy address in format like http://host:port or socks5://host:port
|
|
use_cache: Whether to use cached results
|
|
|
|
Returns:
|
|
ProxyCheckResult: Check result
|
|
"""
|
|
# Check cache first
|
|
if use_cache:
|
|
cached = self._get_cached_result(proxy)
|
|
if cached:
|
|
return cached
|
|
|
|
# Validate proxy format
|
|
if not self._is_valid_proxy_format(proxy):
|
|
result = ProxyCheckResult(
|
|
proxy=proxy,
|
|
is_available=False,
|
|
error_message="Invalid proxy format",
|
|
checked_at=time.time()
|
|
)
|
|
self._cache_result(result)
|
|
return result
|
|
|
|
# Perform check
|
|
start_time = time.time()
|
|
try:
|
|
logger.info(f"Starting proxy check: {proxy}")
|
|
|
|
timeout = httpx.Timeout(self.TIMEOUT_SECONDS, read=self.TIMEOUT_SECONDS)
|
|
async with httpx.AsyncClient(timeout=timeout, proxy=proxy) as client:
|
|
response = await client.head(self.CHECK_URL)
|
|
|
|
response_time = time.time() - start_time
|
|
|
|
# Check response status
|
|
is_available = response.status_code in [200, 204, 301, 302, 307, 308]
|
|
|
|
result = ProxyCheckResult(
|
|
proxy=proxy,
|
|
is_available=is_available,
|
|
response_time=round(response_time, 3),
|
|
error_message=None if is_available else f"HTTP {response.status_code}",
|
|
checked_at=time.time()
|
|
)
|
|
|
|
logger.info(f"Proxy check completed: {proxy}, available: {is_available}, response_time: {response_time:.3f}s")
|
|
|
|
except asyncio.TimeoutError:
|
|
result = ProxyCheckResult(
|
|
proxy=proxy,
|
|
is_available=False,
|
|
error_message="Connection timeout",
|
|
checked_at=time.time()
|
|
)
|
|
logger.warning(f"Proxy check timeout: {proxy}")
|
|
|
|
except Exception as e:
|
|
result = ProxyCheckResult(
|
|
proxy=proxy,
|
|
is_available=False,
|
|
error_message=str(e),
|
|
checked_at=time.time()
|
|
)
|
|
logger.error(f"Proxy check failed: {proxy}, error: {str(e)}")
|
|
|
|
# Cache result
|
|
self._cache_result(result)
|
|
return result
|
|
|
|
async def check_multiple_proxies(
|
|
self,
|
|
proxies: List[str],
|
|
use_cache: bool = True,
|
|
max_concurrent: int = 5
|
|
) -> List[ProxyCheckResult]:
|
|
"""
|
|
Check multiple proxies concurrently
|
|
|
|
Args:
|
|
proxies: List of proxy addresses
|
|
use_cache: Whether to use cached results
|
|
max_concurrent: Maximum concurrent check count
|
|
|
|
Returns:
|
|
List[ProxyCheckResult]: List of check results
|
|
"""
|
|
if not proxies:
|
|
return []
|
|
|
|
logger.info(f"Starting batch proxy check for {len(proxies)} proxies")
|
|
|
|
# Use semaphore to limit concurrency
|
|
semaphore = asyncio.Semaphore(max_concurrent)
|
|
|
|
async def check_with_semaphore(proxy: str) -> ProxyCheckResult:
|
|
async with semaphore:
|
|
return await self.check_single_proxy(proxy, use_cache)
|
|
|
|
# Execute checks concurrently
|
|
tasks = [check_with_semaphore(proxy) for proxy in proxies]
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
# Handle exception results
|
|
final_results = []
|
|
for i, result in enumerate(results):
|
|
if isinstance(result, Exception):
|
|
logger.error(f"Proxy check task exception: {proxies[i]}, error: {str(result)}")
|
|
final_results.append(ProxyCheckResult(
|
|
proxy=proxies[i],
|
|
is_available=False,
|
|
error_message=f"Check task exception: {str(result)}",
|
|
checked_at=time.time()
|
|
))
|
|
else:
|
|
final_results.append(result)
|
|
|
|
available_count = sum(1 for r in final_results if r.is_available)
|
|
logger.info(f"Batch proxy check completed: {available_count}/{len(proxies)} proxies available")
|
|
|
|
return final_results
|
|
|
|
def get_cache_stats(self) -> Dict[str, int]:
|
|
"""Get cache statistics"""
|
|
current_time = time.time()
|
|
valid_cache_count = sum(
|
|
1 for result in self._cache.values()
|
|
if current_time - result.checked_at < self.CACHE_DURATION
|
|
)
|
|
|
|
return {
|
|
"total_cached": len(self._cache),
|
|
"valid_cached": valid_cache_count,
|
|
"expired_cached": len(self._cache) - valid_cache_count
|
|
}
|
|
|
|
def clear_cache(self) -> None:
|
|
"""Clear all cache"""
|
|
self._cache.clear()
|
|
logger.info("Proxy check cache cleared")
|
|
|
|
|
|
# Global instance
|
|
_proxy_check_service: Optional[ProxyCheckService] = None
|
|
|
|
|
|
def get_proxy_check_service() -> ProxyCheckService:
|
|
"""Get proxy check service instance"""
|
|
global _proxy_check_service
|
|
if _proxy_check_service is None:
|
|
_proxy_check_service = ProxyCheckService()
|
|
return _proxy_check_service |