diff --git a/app/modules/themoviedb/tmdbapi.py b/app/modules/themoviedb/tmdbapi.py index 5f78e5a8..101a0e94 100644 --- a/app/modules/themoviedb/tmdbapi.py +++ b/app/modules/themoviedb/tmdbapi.py @@ -2222,4 +2222,17 @@ class TmdbApi: """ 关闭连接 """ - self.tmdb.close() + for client in ( + self.tmdb, + self.search, + self.movie, + self.tv, + self.season_obj, + self.episode_obj, + self.discover, + self.trending, + self.person, + self.collection, + ): + if client: + client.close() diff --git a/app/modules/themoviedb/tmdbv3api/tmdb.py b/app/modules/themoviedb/tmdbv3api/tmdb.py index a632325c..078216c8 100644 --- a/app/modules/themoviedb/tmdbv3api/tmdb.py +++ b/app/modules/themoviedb/tmdbv3api/tmdb.py @@ -32,16 +32,44 @@ class TMDb(object): self._total_results = None self._total_pages = None - if not self._session: - self._session = requests.Session() - self._req = RequestUtils(ua=settings.NORMAL_USER_AGENT, session=self._session, proxies=self.proxies) + self._owns_session = session is None + self._init_session(session=session) - self._async_req = AsyncRequestUtils(ua=settings.NORMAL_USER_AGENT, proxies=self.proxies) + # TMDB 在部分代理和运营商链路下的 HTTP/2 长连接偶发卡死,识别路径优先保证稳定性。 + self._async_req = AsyncRequestUtils( + ua=settings.NORMAL_USER_AGENT, + proxies=self.proxies, + http2=False, + ) self._remaining = 40 self._reset = None self._timeout = 15 + def _init_session(self, session=None): + """ + 初始化同步请求会话。 + """ + self._session = session or requests.Session() + self._req = RequestUtils( + ua=settings.NORMAL_USER_AGENT, + session=self._session, + proxies=self.proxies, + ) + + def _reset_owned_session(self): + """ + 重建当前实例自持有的同步请求会话。 + """ + if not self._owns_session: + return + if self._session: + try: + self._session.close() + except Exception as err: + logger.debug(f"关闭TMDB同步会话失败:{err!r}") + self._init_session() + @property def page(self): return self._page @@ -110,14 +138,23 @@ class TMDb(object): @cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta, skip_none=True) def request(self, method, url, data, json, **kwargs): - if method == "GET": - req = self._req.get_res(url, params=data, json=json) - else: - req = self._req.post_res(url, data=data, json=json) + req = self._request_once(method, url, data, json) + if req is None and method == "GET" and self._owns_session: + logger.debug("TMDB同步请求失败,重建会话后重试一次") + self._reset_owned_session() + req = self._request_once(method, url, data, json) if req is None: raise TMDbException("无法连接TheMovieDb,请检查网络连接!") return self._snapshot_response(req) + def _request_once(self, method, url, data, json): + """ + 执行一次TMDB同步请求,调用方负责决定是否重建会话后重试。 + """ + if method == "GET": + return self._req.get_res(url, params=data, json=json) + return self._req.post_res(url, data=data, json=json) + @cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta, skip_none=True) async def async_request(self, method, url, data, json, **kwargs): if method == "GET": diff --git a/tests/test_tmdb_response_cache.py b/tests/test_tmdb_response_cache.py index 383adfe4..d316f082 100644 --- a/tests/test_tmdb_response_cache.py +++ b/tests/test_tmdb_response_cache.py @@ -1,14 +1,24 @@ import asyncio import pickle from threading import RLock -from unittest import TestCase +from unittest.mock import Mock, patch -from app.modules.themoviedb.tmdbv3api.tmdb import TMDb +import pytest + +from app.modules.themoviedb.tmdbapi import TmdbApi from app.modules.themoviedb.tmdbv3api.exceptions import TMDbException +from app.modules.themoviedb.tmdbv3api.tmdb import TMDb class _FakeResponse: + """ + 测试用响应对象,模拟 requests/httpx 响应的最小接口。 + """ + def __init__(self, payload, headers: dict, status_code: int = 200, text: str = ""): + """ + 初始化响应内容。 + """ self._payload = payload self.headers = headers self.status_code = status_code @@ -16,6 +26,9 @@ class _FakeResponse: self._lock = RLock() def json(self): + """ + 返回预置JSON内容。 + """ return self._payload @@ -40,199 +53,302 @@ class _UnicodeDecodeErrorResponse: raise UnicodeDecodeError("utf-8", b"\x8b", 1, 2, "invalid start byte") -class TmdbResponseCacheTest(TestCase): - def test_request_returns_pickleable_snapshot(self): - tmdb = TMDb() - response = _FakeResponse( - payload={"id": 1, "page": 2}, - headers={"X-RateLimit-Remaining": "39", "X-RateLimit-Reset": "1234567890"}, - ) - tmdb._req.get_res = lambda *args, **kwargs: response +def test_request_returns_pickleable_snapshot(): + """ + TMDB同步响应应转换为可序列化快照。 + """ + tmdb = TMDb() + response = _FakeResponse( + payload={"id": 1, "page": 2}, + headers={"X-RateLimit-Remaining": "39", "X-RateLimit-Reset": "1234567890"}, + ) + tmdb._req.get_res = lambda *args, **kwargs: response - result = TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None) + result = TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None) - self.assertTrue(result[TMDb._RESPONSE_SNAPSHOT_MARKER]) - self.assertEqual(result["json"], {"id": 1, "page": 2}) - self.assertEqual(result["headers"]["X-RateLimit-Remaining"], "39") - pickle.dumps(result) + assert result[TMDb._RESPONSE_SNAPSHOT_MARKER] + assert result["json"] == {"id": 1, "page": 2} + assert result["headers"]["X-RateLimit-Remaining"] == "39" + pickle.dumps(result) - def test_request_rejects_scalar_json_response(self): + +def test_request_rebuilds_owned_session_after_connection_failure(): + """ + 自持有Session的TMDB同步请求失败后,应重建会话并重试一次。 + """ + tmdb = TMDb() + response = _FakeResponse(payload={"id": 1}, headers={}) + request_results = [None, response] + sessions = [] + requests = [] + + def _fake_init_session(session=None): """ - 标量JSON响应不应进入TMDB响应缓存,避免后续按对象解析崩溃。 + 构造可观测的假Session和RequestUtils。 """ - tmdb = TMDb() - response = _FakeResponse(payload="upstream error", headers={}) - tmdb._req.get_res = lambda *args, **kwargs: response + fake_session = Mock() + fake_request = Mock() + fake_request.get_res.side_effect = lambda *args, **kwargs: request_results.pop(0) + tmdb._session = fake_session + tmdb._req = fake_request + sessions.append(fake_session) + requests.append(fake_request) - with self.assertRaisesRegex(TMDbException, "返回数据格式异常"): - TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None) + tmdb._init_session = _fake_init_session + tmdb._init_session() + tmdb._request_once = lambda method, url, data, json: tmdb._req.get_res(url) - def test_request_rejects_invalid_json_response(self): - """ - 非JSON响应应转换为TMDbException,调用方可按连接异常统一处理。 - """ - class _InvalidJsonResponse: - headers = {"Content-Type": "text/html"} - status_code = 502 - text = "bad gateway" + result = TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None) - def json(self): - """ - 模拟上游返回无法解析为JSON的响应体。 - """ - raise ValueError("invalid json") + assert result["json"] == {"id": 1} + assert len(sessions) == 2 + sessions[0].close.assert_called_once_with() + assert requests[0].get_res.call_count == 1 + assert requests[1].get_res.call_count == 1 - tmdb = TMDb() - tmdb._req.get_res = lambda *args, **kwargs: _InvalidJsonResponse() - with self.assertRaisesRegex(TMDbException, "不是有效JSON.*HTTP状态码:502.*bad gateway"): - TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None) +def test_async_request_utils_disables_http2_for_tmdb(): + """ + TMDB异步请求客户端应关闭HTTP/2,避免共享h2长连接异常影响媒体识别。 + """ + with patch("app.modules.themoviedb.tmdbv3api.tmdb.AsyncRequestUtils") as async_request_utils: + TMDb() - def test_request_rejects_unicode_decode_error_response(self): - """ - 错误编码的响应体也应转换为TMDbException,避免UnicodeDecodeError直接冒泡。 - """ - tmdb = TMDb() - tmdb._req.get_res = lambda *args, **kwargs: _UnicodeDecodeErrorResponse( - text="乱码内容不应进入日志" - ) + assert async_request_utils.call_args.kwargs["http2"] is False - with self.assertRaisesRegex( - TMDbException, - "不是有效JSON.*Content-Encoding:gzip.*响应内容编码异常,已省略原始内容", - ) as cm: - TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None) - self.assertNotIn("乱码内容", str(cm.exception)) - def test_get_response_json_rejects_invalid_live_response(self): - """ - 未缓存的实时响应解析失败时也应输出统一诊断信息。 - """ - class _InvalidJsonResponse: - headers = {} - status_code = 200 - text = "" +def test_request_rejects_scalar_json_response(): + """ + 标量JSON响应不应进入TMDB响应缓存,避免后续按对象解析崩溃。 + """ + tmdb = TMDb() + response = _FakeResponse(payload="upstream error", headers={}) + tmdb._req.get_res = lambda *args, **kwargs: response - def json(self): - """ - 模拟HTTP 200但响应体为空的情况。 - """ - raise ValueError("empty") + with pytest.raises(TMDbException, match="返回数据格式异常"): + TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None) - with self.assertRaisesRegex(TMDbException, "不是有效JSON.*响应内容为空"): - TMDb._get_response_json(_InvalidJsonResponse()) - def test_async_request_returns_pickleable_snapshot(self): - tmdb = TMDb() - response = _FakeResponse( - payload={"id": 2, "page": 3}, - headers={"x-ratelimit-remaining": "38", "x-ratelimit-reset": "1234567891"}, - ) +def test_request_rejects_invalid_json_response(): + """ + 非JSON响应应转换为TMDbException,调用方可按连接异常统一处理。 + """ - async def _fake_get_res(*args, **kwargs): - return response + class _InvalidJsonResponse: + headers = {"Content-Type": "text/html"} + status_code = 502 + text = "bad gateway" - tmdb._async_req.get_res = _fake_get_res - - result = asyncio.run( - TMDb.async_request.__wrapped__(tmdb, "GET", "https://example.com", None, None) - ) - - self.assertTrue(result[TMDb._RESPONSE_SNAPSHOT_MARKER]) - self.assertEqual(result["json"], {"id": 2, "page": 3}) - self.assertEqual(result["headers"]["x-ratelimit-remaining"], "38") - pickle.dumps(result) - - def test_handle_headers_accepts_snapshot_headers(self): - tmdb = TMDb() - - tmdb._handle_headers({"x-ratelimit-remaining": "7", "x-ratelimit-reset": "99"}) - - self.assertEqual(tmdb._remaining, 7) - self.assertEqual(tmdb._reset, 99) - - def test_get_response_json_returns_snapshot_copy(self): - snapshot = { - TMDb._RESPONSE_SNAPSHOT_MARKER: True, - "headers": {}, - "json": { - "results": [ - {"id": 1, "media_type": "movie"}, - {"id": 2, "media_type": "tv"}, - ] - }, - } - - first_json = TMDb._get_response_json(snapshot) - first_json["results"][0]["media_type"] = "电影" - - second_json = TMDb._get_response_json(snapshot) - - self.assertEqual(second_json["results"][0]["media_type"], "movie") - self.assertIsNot(first_json, second_json) - self.assertIsNot(first_json["results"][0], second_json["results"][0]) - - def test_async_request_obj_returns_copied_key_from_snapshot(self): - tmdb = TMDb() - snapshot = { - TMDb._RESPONSE_SNAPSHOT_MARKER: True, - "headers": {"x-ratelimit-remaining": "39", "x-ratelimit-reset": "1234567890"}, - "json": { - "page": 1, - "results": [ - {"id": 1, "media_type": "movie"}, - {"id": 2, "media_type": "tv"}, - ], - }, - } - - async def _fake_async_request(*args, **kwargs): - return snapshot - - tmdb.async_request = _fake_async_request - - first_results = asyncio.run(tmdb._async_request_obj("/search/multi", key="results")) - first_results[0]["media_type"] = "电影" - - second_results = asyncio.run(tmdb._async_request_obj("/search/multi", key="results")) - - self.assertEqual(second_results[0]["media_type"], "movie") - self.assertIsNot(first_results, second_results) - self.assertIsNot(first_results[0], second_results[0]) - - def test_request_obj_rejects_scalar_snapshot_before_key_lookup(self): - """ - 旧缓存中的标量快照不应在读取results字段时触发AttributeError。 - """ - tmdb = TMDb() - snapshot = { - TMDb._RESPONSE_SNAPSHOT_MARKER: True, - "headers": {"x-ratelimit-remaining": "39", "x-ratelimit-reset": "1234567890"}, - "json": "upstream error", - } - tmdb.request = lambda *args, **kwargs: snapshot - - with self.assertRaisesRegex(TMDbException, "返回数据格式异常"): - tmdb._request_obj("/search/movie", key="results") - - def test_async_request_obj_rejects_scalar_snapshot_before_key_lookup(self): - """ - 异步对象请求读取旧标量快照时也应走统一TMDB异常路径。 - """ - tmdb = TMDb() - snapshot = { - TMDb._RESPONSE_SNAPSHOT_MARKER: True, - "headers": {"x-ratelimit-remaining": "39", "x-ratelimit-reset": "1234567890"}, - "json": "upstream error", - } - - async def _fake_async_request(*args, **kwargs): + def json(self): """ - 模拟异步请求命中已缓存的异常快照。 + 模拟上游返回无法解析为JSON的响应体。 """ - return snapshot + raise ValueError("invalid json") - tmdb.async_request = _fake_async_request + tmdb = TMDb() + tmdb._req.get_res = lambda *args, **kwargs: _InvalidJsonResponse() - with self.assertRaisesRegex(TMDbException, "返回数据格式异常"): - asyncio.run(tmdb._async_request_obj("/search/movie", key="results")) + with pytest.raises(TMDbException, match="不是有效JSON.*HTTP状态码:502.*bad gateway"): + TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None) + + +def test_request_rejects_unicode_decode_error_response(): + """ + 错误编码的响应体也应转换为TMDbException,避免UnicodeDecodeError直接冒泡。 + """ + tmdb = TMDb() + tmdb._req.get_res = lambda *args, **kwargs: _UnicodeDecodeErrorResponse( + text="乱码内容不应进入日志" + ) + + with pytest.raises( + TMDbException, + match="不是有效JSON.*Content-Encoding:gzip.*响应内容编码异常,已省略原始内容", + ) as exc_info: + TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None) + assert "乱码内容" not in str(exc_info.value) + + +def test_get_response_json_rejects_invalid_live_response(): + """ + 未缓存的实时响应解析失败时也应输出统一诊断信息。 + """ + + class _InvalidJsonResponse: + headers = {} + status_code = 200 + text = "" + + def json(self): + """ + 模拟HTTP 200但响应体为空的情况。 + """ + raise ValueError("empty") + + with pytest.raises(TMDbException, match="不是有效JSON.*响应内容为空"): + TMDb._get_response_json(_InvalidJsonResponse()) + + +def test_async_request_returns_pickleable_snapshot(): + """ + TMDB异步响应应转换为可序列化快照。 + """ + tmdb = TMDb() + response = _FakeResponse( + payload={"id": 2, "page": 3}, + headers={"x-ratelimit-remaining": "38", "x-ratelimit-reset": "1234567891"}, + ) + + async def _fake_get_res(*args, **kwargs): + """ + 返回预置异步响应。 + """ + return response + + tmdb._async_req.get_res = _fake_get_res + + result = asyncio.run( + TMDb.async_request.__wrapped__(tmdb, "GET", "https://example.com", None, None) + ) + + assert result[TMDb._RESPONSE_SNAPSHOT_MARKER] + assert result["json"] == {"id": 2, "page": 3} + assert result["headers"]["x-ratelimit-remaining"] == "38" + pickle.dumps(result) + + +def test_handle_headers_accepts_snapshot_headers(): + """ + 快照响应头应能参与限流信息解析。 + """ + tmdb = TMDb() + + tmdb._handle_headers({"x-ratelimit-remaining": "7", "x-ratelimit-reset": "99"}) + + assert tmdb._remaining == 7 + assert tmdb._reset == 99 + + +def test_get_response_json_returns_snapshot_copy(): + """ + 缓存快照读取时应返回副本,避免调用方原地修改污染缓存。 + """ + snapshot = { + TMDb._RESPONSE_SNAPSHOT_MARKER: True, + "headers": {}, + "json": { + "results": [ + {"id": 1, "media_type": "movie"}, + {"id": 2, "media_type": "tv"}, + ] + }, + } + + first_json = TMDb._get_response_json(snapshot) + first_json["results"][0]["media_type"] = "电影" + + second_json = TMDb._get_response_json(snapshot) + + assert second_json["results"][0]["media_type"] == "movie" + assert first_json is not second_json + assert first_json["results"][0] is not second_json["results"][0] + + +def test_async_request_obj_returns_copied_key_from_snapshot(): + """ + 异步对象请求从快照取子字段时也应返回副本。 + """ + tmdb = TMDb() + snapshot = { + TMDb._RESPONSE_SNAPSHOT_MARKER: True, + "headers": {"x-ratelimit-remaining": "39", "x-ratelimit-reset": "1234567890"}, + "json": { + "page": 1, + "results": [ + {"id": 1, "media_type": "movie"}, + {"id": 2, "media_type": "tv"}, + ], + }, + } + + async def _fake_async_request(*args, **kwargs): + """ + 返回预置异步快照。 + """ + return snapshot + + tmdb.async_request = _fake_async_request + + first_results = asyncio.run(tmdb._async_request_obj("/search/multi", key="results")) + first_results[0]["media_type"] = "电影" + + second_results = asyncio.run(tmdb._async_request_obj("/search/multi", key="results")) + + assert second_results[0]["media_type"] == "movie" + assert first_results is not second_results + assert first_results[0] is not second_results[0] + + +def test_request_obj_rejects_scalar_snapshot_before_key_lookup(): + """ + 旧缓存中的标量快照不应在读取results字段时触发AttributeError。 + """ + tmdb = TMDb() + snapshot = { + TMDb._RESPONSE_SNAPSHOT_MARKER: True, + "headers": {"x-ratelimit-remaining": "39", "x-ratelimit-reset": "1234567890"}, + "json": "upstream error", + } + tmdb.request = lambda *args, **kwargs: snapshot + + with pytest.raises(TMDbException, match="返回数据格式异常"): + tmdb._request_obj("/search/movie", key="results") + + +def test_async_request_obj_rejects_scalar_snapshot_before_key_lookup(): + """ + 异步对象请求读取旧标量快照时也应走统一TMDB异常路径。 + """ + tmdb = TMDb() + snapshot = { + TMDb._RESPONSE_SNAPSHOT_MARKER: True, + "headers": {"x-ratelimit-remaining": "39", "x-ratelimit-reset": "1234567890"}, + "json": "upstream error", + } + + async def _fake_async_request(*args, **kwargs): + """ + 模拟异步请求命中已缓存的异常快照。 + """ + return snapshot + + tmdb.async_request = _fake_async_request + + with pytest.raises(TMDbException, match="返回数据格式异常"): + asyncio.run(tmdb._async_request_obj("/search/movie", key="results")) + + +def test_tmdb_api_close_closes_all_clients(): + """ + TmdbApi关闭时应释放所有子客户端的同步连接池。 + """ + tmdb_api = TmdbApi() + clients = [ + tmdb_api.tmdb, + tmdb_api.search, + tmdb_api.movie, + tmdb_api.tv, + tmdb_api.season_obj, + tmdb_api.episode_obj, + tmdb_api.discover, + tmdb_api.trending, + tmdb_api.person, + tmdb_api.collection, + ] + for client in clients: + client.close = Mock() + + tmdb_api.close() + + for client in clients: + client.close.assert_called_once_with() diff --git a/version.py b/version.py index 5242b50f..3a1dbe06 100644 --- a/version.py +++ b/version.py @@ -1,2 +1,2 @@ -APP_VERSION = 'v2.13.6' -FRONTEND_VERSION = 'v2.13.6' +APP_VERSION = 'v2.13.7' +FRONTEND_VERSION = 'v2.13.7'