From df75f42753470d04ab7c9ed6630d7681d6a7a5b7 Mon Sep 17 00:00:00 2001 From: DDSRem <73049927+DDSRem@users.noreply.github.com> Date: Thu, 4 Jun 2026 06:55:03 +0800 Subject: [PATCH] fix: retry stale keep-alive requests (#5893) --- app/testing/bootstrap.py | 2 +- app/utils/http.py | 37 ++++++++++++- tests/test_request_utils.py | 100 ++++++++++++++++++++++++++++++++++++ 3 files changed, 137 insertions(+), 2 deletions(-) create mode 100644 tests/test_request_utils.py diff --git a/app/testing/bootstrap.py b/app/testing/bootstrap.py index 79dc81cd..79e830fc 100644 --- a/app/testing/bootstrap.py +++ b/app/testing/bootstrap.py @@ -81,7 +81,7 @@ def ensure_sites_stub() -> None: return try: import app.helper.sites # noqa: F401 本地已拉取时用真实模块 - except ModuleNotFoundError: + except (ModuleNotFoundError, ImportError): from types import ModuleType stub = ModuleType("app.helper.sites") stub.SitesHelper = object diff --git a/app/utils/http.py b/app/utils/http.py index c2ce5323..f9a9a14a 100644 --- a/app/utils/http.py +++ b/app/utils/http.py @@ -70,6 +70,8 @@ _DEFAULT_MAX_KEEPALIVE_CONNECTIONS = 20 _DEFAULT_MAX_CONNECTIONS = 40 # 默认的 keep-alive 连接过期时间(秒) _DEFAULT_KEEPALIVE_EXPIRY = 30 +# 同步 requests.Session 复用连接时,遇到对端或代理关闭 keep-alive 后允许重试的方法 +_REQUESTS_RETRY_IDEMPOTENT_METHODS = ("GET", "HEAD", "OPTIONS") # 持有 LRU 淘汰后正在异步关闭的 transport task,避免 fire-and-forget 被 GC 警告 _pending_eviction_tasks: set[asyncio.Task] = set() @@ -344,14 +346,47 @@ class RequestUtils: kwargs.setdefault("timeout", self._timeout) kwargs.setdefault("verify", False) kwargs.setdefault("stream", False) + method_upper = method.upper() try: return req_method(method, url, **kwargs) + except ( + requests.exceptions.ConnectionError, + requests.exceptions.ChunkedEncodingError, + requests.exceptions.ReadTimeout, + ) as e: + if ( + self._session is not None + and method_upper in _REQUESTS_RETRY_IDEMPOTENT_METHODS + ): + logger.debug(f"keep-alive 连接已失效,同步幂等请求重试一次: {e!r}") + try: + self._session.close() + return req_method(method, url, **kwargs) + except requests.exceptions.RequestException as retry_error: + error_msg = ( + str(retry_error) + if str(retry_error) + else f"未知网络错误 (URL: {url}, Method: {method_upper})" + ) + logger.debug(f"重试后同步请求仍失败: {error_msg}") + if raise_exception: + raise + return None + error_msg = ( + str(e) + if str(e) + else f"未知网络错误 (URL: {url}, Method: {method_upper})" + ) + logger.debug(f"同步请求失败(不重试): {error_msg}") + if raise_exception: + raise + return None except requests.exceptions.RequestException as e: # 获取更详细的错误信息 error_msg = ( str(e) if str(e) - else f"未知网络错误 (URL: {url}, Method: {method.upper()})" + else f"未知网络错误 (URL: {url}, Method: {method_upper})" ) logger.debug(f"请求失败: {error_msg}") if raise_exception: diff --git a/tests/test_request_utils.py b/tests/test_request_utils.py new file mode 100644 index 00000000..6db406b6 --- /dev/null +++ b/tests/test_request_utils.py @@ -0,0 +1,100 @@ +import requests + +from app.utils.http import RequestUtils + + +class _FakeSession: + """ + 测试用 requests.Session 替身,记录请求次数与连接池关闭行为。 + """ + + def __init__(self, side_effects): + """ + 初始化请求结果序列。 + + :param side_effects: 每次 request 调用要返回或抛出的对象 + """ + self.side_effects = list(side_effects) + self.calls = [] + self.close_count = 0 + + def request(self, method, url, **kwargs): + """ + 模拟 requests.Session.request。 + """ + self.calls.append((method, url, kwargs)) + effect = self.side_effects.pop(0) + if isinstance(effect, Exception): + raise effect + return effect + + def close(self): + """ + 模拟清空 session 连接池。 + """ + self.close_count += 1 + + +def _make_response(status_code: int = 200) -> requests.Response: + response = requests.Response() + response.status_code = status_code + return response + + +def test_request_utils_retries_idempotent_session_connection_error(): + """ + 同步幂等请求遇到失效 session 连接时应清理连接池并重试一次。 + """ + response = _make_response() + session = _FakeSession( + [ + requests.exceptions.ConnectionError("stale keep-alive"), + response, + ] + ) + request_utils = RequestUtils(session=session) + + result = request_utils.get_res("https://example.com/data") + + assert result is response + assert len(session.calls) == 2 + assert session.close_count == 1 + + +def test_request_utils_does_not_retry_non_idempotent_connection_error(): + """ + 非幂等请求连接异常时不应自动重试,避免重复提交副作用。 + """ + session = _FakeSession( + [ + requests.exceptions.ConnectionError("connection failed"), + _make_response(), + ] + ) + request_utils = RequestUtils(session=session) + + result = request_utils.post_res("https://example.com/data", data={"name": "demo"}) + + assert result is None + assert len(session.calls) == 1 + assert session.close_count == 0 + + +def test_request_utils_raises_retry_error_when_retry_still_fails(): + """ + 开启 raise_exception 后,重试仍失败时应抛出重试阶段的异常。 + """ + first_error = requests.exceptions.ConnectionError("stale keep-alive") + retry_error = requests.exceptions.ConnectionError("proxy still unavailable") + session = _FakeSession([first_error, retry_error]) + request_utils = RequestUtils(session=session) + + try: + request_utils.get_res("https://example.com/data", raise_exception=True) + except requests.exceptions.ConnectionError as err: + assert err is retry_error + else: + raise AssertionError("请求重试失败时应抛出异常") + + assert len(session.calls) == 2 + assert session.close_count == 1