fix(tmdb): stabilize tmdb connection reuse

This commit is contained in:
jxxghp
2026-06-11 12:48:32 +08:00
parent c18e145b90
commit d26225b998
4 changed files with 353 additions and 187 deletions

View File

@@ -2222,4 +2222,17 @@ class TmdbApi:
"""
关闭连接
"""
self.tmdb.close()
for client in (
self.tmdb,
self.search,
self.movie,
self.tv,
self.season_obj,
self.episode_obj,
self.discover,
self.trending,
self.person,
self.collection,
):
if client:
client.close()

View File

@@ -32,16 +32,44 @@ class TMDb(object):
self._total_results = None
self._total_pages = None
if not self._session:
self._session = requests.Session()
self._req = RequestUtils(ua=settings.NORMAL_USER_AGENT, session=self._session, proxies=self.proxies)
self._owns_session = session is None
self._init_session(session=session)
self._async_req = AsyncRequestUtils(ua=settings.NORMAL_USER_AGENT, proxies=self.proxies)
# TMDB 在部分代理和运营商链路下的 HTTP/2 长连接偶发卡死,识别路径优先保证稳定性。
self._async_req = AsyncRequestUtils(
ua=settings.NORMAL_USER_AGENT,
proxies=self.proxies,
http2=False,
)
self._remaining = 40
self._reset = None
self._timeout = 15
def _init_session(self, session=None):
"""
初始化同步请求会话。
"""
self._session = session or requests.Session()
self._req = RequestUtils(
ua=settings.NORMAL_USER_AGENT,
session=self._session,
proxies=self.proxies,
)
def _reset_owned_session(self):
"""
重建当前实例自持有的同步请求会话。
"""
if not self._owns_session:
return
if self._session:
try:
self._session.close()
except Exception as err:
logger.debug(f"关闭TMDB同步会话失败{err!r}")
self._init_session()
@property
def page(self):
return self._page
@@ -110,14 +138,23 @@ class TMDb(object):
@cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta, skip_none=True)
def request(self, method, url, data, json, **kwargs):
if method == "GET":
req = self._req.get_res(url, params=data, json=json)
else:
req = self._req.post_res(url, data=data, json=json)
req = self._request_once(method, url, data, json)
if req is None and method == "GET" and self._owns_session:
logger.debug("TMDB同步请求失败重建会话后重试一次")
self._reset_owned_session()
req = self._request_once(method, url, data, json)
if req is None:
raise TMDbException("无法连接TheMovieDb请检查网络连接")
return self._snapshot_response(req)
def _request_once(self, method, url, data, json):
"""
执行一次TMDB同步请求调用方负责决定是否重建会话后重试。
"""
if method == "GET":
return self._req.get_res(url, params=data, json=json)
return self._req.post_res(url, data=data, json=json)
@cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta, skip_none=True)
async def async_request(self, method, url, data, json, **kwargs):
if method == "GET":

View File

@@ -1,14 +1,24 @@
import asyncio
import pickle
from threading import RLock
from unittest import TestCase
from unittest.mock import Mock, patch
from app.modules.themoviedb.tmdbv3api.tmdb import TMDb
import pytest
from app.modules.themoviedb.tmdbapi import TmdbApi
from app.modules.themoviedb.tmdbv3api.exceptions import TMDbException
from app.modules.themoviedb.tmdbv3api.tmdb import TMDb
class _FakeResponse:
"""
测试用响应对象,模拟 requests/httpx 响应的最小接口。
"""
def __init__(self, payload, headers: dict, status_code: int = 200, text: str = ""):
"""
初始化响应内容。
"""
self._payload = payload
self.headers = headers
self.status_code = status_code
@@ -16,6 +26,9 @@ class _FakeResponse:
self._lock = RLock()
def json(self):
"""
返回预置JSON内容。
"""
return self._payload
@@ -40,199 +53,302 @@ class _UnicodeDecodeErrorResponse:
raise UnicodeDecodeError("utf-8", b"\x8b", 1, 2, "invalid start byte")
class TmdbResponseCacheTest(TestCase):
def test_request_returns_pickleable_snapshot(self):
tmdb = TMDb()
response = _FakeResponse(
payload={"id": 1, "page": 2},
headers={"X-RateLimit-Remaining": "39", "X-RateLimit-Reset": "1234567890"},
)
tmdb._req.get_res = lambda *args, **kwargs: response
def test_request_returns_pickleable_snapshot():
"""
TMDB同步响应应转换为可序列化快照。
"""
tmdb = TMDb()
response = _FakeResponse(
payload={"id": 1, "page": 2},
headers={"X-RateLimit-Remaining": "39", "X-RateLimit-Reset": "1234567890"},
)
tmdb._req.get_res = lambda *args, **kwargs: response
result = TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None)
result = TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None)
self.assertTrue(result[TMDb._RESPONSE_SNAPSHOT_MARKER])
self.assertEqual(result["json"], {"id": 1, "page": 2})
self.assertEqual(result["headers"]["X-RateLimit-Remaining"], "39")
pickle.dumps(result)
assert result[TMDb._RESPONSE_SNAPSHOT_MARKER]
assert result["json"] == {"id": 1, "page": 2}
assert result["headers"]["X-RateLimit-Remaining"] == "39"
pickle.dumps(result)
def test_request_rejects_scalar_json_response(self):
def test_request_rebuilds_owned_session_after_connection_failure():
"""
自持有Session的TMDB同步请求失败后应重建会话并重试一次。
"""
tmdb = TMDb()
response = _FakeResponse(payload={"id": 1}, headers={})
request_results = [None, response]
sessions = []
requests = []
def _fake_init_session(session=None):
"""
标量JSON响应不应进入TMDB响应缓存避免后续按对象解析崩溃
构造可观测的假Session和RequestUtils
"""
tmdb = TMDb()
response = _FakeResponse(payload="upstream error", headers={})
tmdb._req.get_res = lambda *args, **kwargs: response
fake_session = Mock()
fake_request = Mock()
fake_request.get_res.side_effect = lambda *args, **kwargs: request_results.pop(0)
tmdb._session = fake_session
tmdb._req = fake_request
sessions.append(fake_session)
requests.append(fake_request)
with self.assertRaisesRegex(TMDbException, "返回数据格式异常"):
TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None)
tmdb._init_session = _fake_init_session
tmdb._init_session()
tmdb._request_once = lambda method, url, data, json: tmdb._req.get_res(url)
def test_request_rejects_invalid_json_response(self):
"""
非JSON响应应转换为TMDbException调用方可按连接异常统一处理。
"""
class _InvalidJsonResponse:
headers = {"Content-Type": "text/html"}
status_code = 502
text = "<html>bad gateway</html>"
result = TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None)
def json(self):
"""
模拟上游返回无法解析为JSON的响应体。
"""
raise ValueError("invalid json")
assert result["json"] == {"id": 1}
assert len(sessions) == 2
sessions[0].close.assert_called_once_with()
assert requests[0].get_res.call_count == 1
assert requests[1].get_res.call_count == 1
tmdb = TMDb()
tmdb._req.get_res = lambda *args, **kwargs: _InvalidJsonResponse()
with self.assertRaisesRegex(TMDbException, "不是有效JSON.*HTTP状态码502.*bad gateway"):
TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None)
def test_async_request_utils_disables_http2_for_tmdb():
"""
TMDB异步请求客户端应关闭HTTP/2避免共享h2长连接异常影响媒体识别。
"""
with patch("app.modules.themoviedb.tmdbv3api.tmdb.AsyncRequestUtils") as async_request_utils:
TMDb()
def test_request_rejects_unicode_decode_error_response(self):
"""
错误编码的响应体也应转换为TMDbException避免UnicodeDecodeError直接冒泡。
"""
tmdb = TMDb()
tmdb._req.get_res = lambda *args, **kwargs: _UnicodeDecodeErrorResponse(
text="乱码内容不应进入日志"
)
assert async_request_utils.call_args.kwargs["http2"] is False
with self.assertRaisesRegex(
TMDbException,
"不是有效JSON.*Content-Encodinggzip.*响应内容编码异常,已省略原始内容",
) as cm:
TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None)
self.assertNotIn("乱码内容", str(cm.exception))
def test_get_response_json_rejects_invalid_live_response(self):
"""
未缓存的实时响应解析失败时也应输出统一诊断信息
"""
class _InvalidJsonResponse:
headers = {}
status_code = 200
text = ""
def test_request_rejects_scalar_json_response():
"""
标量JSON响应不应进入TMDB响应缓存避免后续按对象解析崩溃
"""
tmdb = TMDb()
response = _FakeResponse(payload="upstream error", headers={})
tmdb._req.get_res = lambda *args, **kwargs: response
def json(self):
"""
模拟HTTP 200但响应体为空的情况。
"""
raise ValueError("empty")
with pytest.raises(TMDbException, match="返回数据格式异常"):
TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None)
with self.assertRaisesRegex(TMDbException, "不是有效JSON.*响应内容为空"):
TMDb._get_response_json(_InvalidJsonResponse())
def test_async_request_returns_pickleable_snapshot(self):
tmdb = TMDb()
response = _FakeResponse(
payload={"id": 2, "page": 3},
headers={"x-ratelimit-remaining": "38", "x-ratelimit-reset": "1234567891"},
)
def test_request_rejects_invalid_json_response():
"""
非JSON响应应转换为TMDbException调用方可按连接异常统一处理。
"""
async def _fake_get_res(*args, **kwargs):
return response
class _InvalidJsonResponse:
headers = {"Content-Type": "text/html"}
status_code = 502
text = "<html>bad gateway</html>"
tmdb._async_req.get_res = _fake_get_res
result = asyncio.run(
TMDb.async_request.__wrapped__(tmdb, "GET", "https://example.com", None, None)
)
self.assertTrue(result[TMDb._RESPONSE_SNAPSHOT_MARKER])
self.assertEqual(result["json"], {"id": 2, "page": 3})
self.assertEqual(result["headers"]["x-ratelimit-remaining"], "38")
pickle.dumps(result)
def test_handle_headers_accepts_snapshot_headers(self):
tmdb = TMDb()
tmdb._handle_headers({"x-ratelimit-remaining": "7", "x-ratelimit-reset": "99"})
self.assertEqual(tmdb._remaining, 7)
self.assertEqual(tmdb._reset, 99)
def test_get_response_json_returns_snapshot_copy(self):
snapshot = {
TMDb._RESPONSE_SNAPSHOT_MARKER: True,
"headers": {},
"json": {
"results": [
{"id": 1, "media_type": "movie"},
{"id": 2, "media_type": "tv"},
]
},
}
first_json = TMDb._get_response_json(snapshot)
first_json["results"][0]["media_type"] = "电影"
second_json = TMDb._get_response_json(snapshot)
self.assertEqual(second_json["results"][0]["media_type"], "movie")
self.assertIsNot(first_json, second_json)
self.assertIsNot(first_json["results"][0], second_json["results"][0])
def test_async_request_obj_returns_copied_key_from_snapshot(self):
tmdb = TMDb()
snapshot = {
TMDb._RESPONSE_SNAPSHOT_MARKER: True,
"headers": {"x-ratelimit-remaining": "39", "x-ratelimit-reset": "1234567890"},
"json": {
"page": 1,
"results": [
{"id": 1, "media_type": "movie"},
{"id": 2, "media_type": "tv"},
],
},
}
async def _fake_async_request(*args, **kwargs):
return snapshot
tmdb.async_request = _fake_async_request
first_results = asyncio.run(tmdb._async_request_obj("/search/multi", key="results"))
first_results[0]["media_type"] = "电影"
second_results = asyncio.run(tmdb._async_request_obj("/search/multi", key="results"))
self.assertEqual(second_results[0]["media_type"], "movie")
self.assertIsNot(first_results, second_results)
self.assertIsNot(first_results[0], second_results[0])
def test_request_obj_rejects_scalar_snapshot_before_key_lookup(self):
"""
旧缓存中的标量快照不应在读取results字段时触发AttributeError。
"""
tmdb = TMDb()
snapshot = {
TMDb._RESPONSE_SNAPSHOT_MARKER: True,
"headers": {"x-ratelimit-remaining": "39", "x-ratelimit-reset": "1234567890"},
"json": "upstream error",
}
tmdb.request = lambda *args, **kwargs: snapshot
with self.assertRaisesRegex(TMDbException, "返回数据格式异常"):
tmdb._request_obj("/search/movie", key="results")
def test_async_request_obj_rejects_scalar_snapshot_before_key_lookup(self):
"""
异步对象请求读取旧标量快照时也应走统一TMDB异常路径。
"""
tmdb = TMDb()
snapshot = {
TMDb._RESPONSE_SNAPSHOT_MARKER: True,
"headers": {"x-ratelimit-remaining": "39", "x-ratelimit-reset": "1234567890"},
"json": "upstream error",
}
async def _fake_async_request(*args, **kwargs):
def json(self):
"""
模拟异步请求命中已缓存的异常快照
模拟上游返回无法解析为JSON的响应体
"""
return snapshot
raise ValueError("invalid json")
tmdb.async_request = _fake_async_request
tmdb = TMDb()
tmdb._req.get_res = lambda *args, **kwargs: _InvalidJsonResponse()
with self.assertRaisesRegex(TMDbException, "返回数据格式异常"):
asyncio.run(tmdb._async_request_obj("/search/movie", key="results"))
with pytest.raises(TMDbException, match="不是有效JSON.*HTTP状态码502.*bad gateway"):
TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None)
def test_request_rejects_unicode_decode_error_response():
"""
错误编码的响应体也应转换为TMDbException避免UnicodeDecodeError直接冒泡。
"""
tmdb = TMDb()
tmdb._req.get_res = lambda *args, **kwargs: _UnicodeDecodeErrorResponse(
text="乱码内容不应进入日志"
)
with pytest.raises(
TMDbException,
match="不是有效JSON.*Content-Encodinggzip.*响应内容编码异常,已省略原始内容",
) as exc_info:
TMDb.request.__wrapped__(tmdb, "GET", "https://example.com", None, None)
assert "乱码内容" not in str(exc_info.value)
def test_get_response_json_rejects_invalid_live_response():
"""
未缓存的实时响应解析失败时也应输出统一诊断信息。
"""
class _InvalidJsonResponse:
headers = {}
status_code = 200
text = ""
def json(self):
"""
模拟HTTP 200但响应体为空的情况。
"""
raise ValueError("empty")
with pytest.raises(TMDbException, match="不是有效JSON.*响应内容为空"):
TMDb._get_response_json(_InvalidJsonResponse())
def test_async_request_returns_pickleable_snapshot():
"""
TMDB异步响应应转换为可序列化快照。
"""
tmdb = TMDb()
response = _FakeResponse(
payload={"id": 2, "page": 3},
headers={"x-ratelimit-remaining": "38", "x-ratelimit-reset": "1234567891"},
)
async def _fake_get_res(*args, **kwargs):
"""
返回预置异步响应。
"""
return response
tmdb._async_req.get_res = _fake_get_res
result = asyncio.run(
TMDb.async_request.__wrapped__(tmdb, "GET", "https://example.com", None, None)
)
assert result[TMDb._RESPONSE_SNAPSHOT_MARKER]
assert result["json"] == {"id": 2, "page": 3}
assert result["headers"]["x-ratelimit-remaining"] == "38"
pickle.dumps(result)
def test_handle_headers_accepts_snapshot_headers():
"""
快照响应头应能参与限流信息解析。
"""
tmdb = TMDb()
tmdb._handle_headers({"x-ratelimit-remaining": "7", "x-ratelimit-reset": "99"})
assert tmdb._remaining == 7
assert tmdb._reset == 99
def test_get_response_json_returns_snapshot_copy():
"""
缓存快照读取时应返回副本,避免调用方原地修改污染缓存。
"""
snapshot = {
TMDb._RESPONSE_SNAPSHOT_MARKER: True,
"headers": {},
"json": {
"results": [
{"id": 1, "media_type": "movie"},
{"id": 2, "media_type": "tv"},
]
},
}
first_json = TMDb._get_response_json(snapshot)
first_json["results"][0]["media_type"] = "电影"
second_json = TMDb._get_response_json(snapshot)
assert second_json["results"][0]["media_type"] == "movie"
assert first_json is not second_json
assert first_json["results"][0] is not second_json["results"][0]
def test_async_request_obj_returns_copied_key_from_snapshot():
"""
异步对象请求从快照取子字段时也应返回副本。
"""
tmdb = TMDb()
snapshot = {
TMDb._RESPONSE_SNAPSHOT_MARKER: True,
"headers": {"x-ratelimit-remaining": "39", "x-ratelimit-reset": "1234567890"},
"json": {
"page": 1,
"results": [
{"id": 1, "media_type": "movie"},
{"id": 2, "media_type": "tv"},
],
},
}
async def _fake_async_request(*args, **kwargs):
"""
返回预置异步快照。
"""
return snapshot
tmdb.async_request = _fake_async_request
first_results = asyncio.run(tmdb._async_request_obj("/search/multi", key="results"))
first_results[0]["media_type"] = "电影"
second_results = asyncio.run(tmdb._async_request_obj("/search/multi", key="results"))
assert second_results[0]["media_type"] == "movie"
assert first_results is not second_results
assert first_results[0] is not second_results[0]
def test_request_obj_rejects_scalar_snapshot_before_key_lookup():
"""
旧缓存中的标量快照不应在读取results字段时触发AttributeError。
"""
tmdb = TMDb()
snapshot = {
TMDb._RESPONSE_SNAPSHOT_MARKER: True,
"headers": {"x-ratelimit-remaining": "39", "x-ratelimit-reset": "1234567890"},
"json": "upstream error",
}
tmdb.request = lambda *args, **kwargs: snapshot
with pytest.raises(TMDbException, match="返回数据格式异常"):
tmdb._request_obj("/search/movie", key="results")
def test_async_request_obj_rejects_scalar_snapshot_before_key_lookup():
"""
异步对象请求读取旧标量快照时也应走统一TMDB异常路径。
"""
tmdb = TMDb()
snapshot = {
TMDb._RESPONSE_SNAPSHOT_MARKER: True,
"headers": {"x-ratelimit-remaining": "39", "x-ratelimit-reset": "1234567890"},
"json": "upstream error",
}
async def _fake_async_request(*args, **kwargs):
"""
模拟异步请求命中已缓存的异常快照。
"""
return snapshot
tmdb.async_request = _fake_async_request
with pytest.raises(TMDbException, match="返回数据格式异常"):
asyncio.run(tmdb._async_request_obj("/search/movie", key="results"))
def test_tmdb_api_close_closes_all_clients():
"""
TmdbApi关闭时应释放所有子客户端的同步连接池。
"""
tmdb_api = TmdbApi()
clients = [
tmdb_api.tmdb,
tmdb_api.search,
tmdb_api.movie,
tmdb_api.tv,
tmdb_api.season_obj,
tmdb_api.episode_obj,
tmdb_api.discover,
tmdb_api.trending,
tmdb_api.person,
tmdb_api.collection,
]
for client in clients:
client.close = Mock()
tmdb_api.close()
for client in clients:
client.close.assert_called_once_with()

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.13.6'
FRONTEND_VERSION = 'v2.13.6'
APP_VERSION = 'v2.13.7'
FRONTEND_VERSION = 'v2.13.7'