From e02cebe16ca7919a7b3a944d24a287f27b5d0e2e Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Sun, 21 Jun 2026 07:37:47 +0800 Subject: [PATCH] fix(tests): stabilize messaging shutdown (#5979) --- .coveragerc | 27 ++++++++++++ .github/workflows/test.yml | 28 +++++++++--- .gitignore | 3 ++ app/agent/tools/base.py | 10 +++++ app/modules/feishu/feishu.py | 5 +++ app/modules/filter/RuleParser.py | 12 +++--- app/modules/telegram/telegram.py | 26 ++++++++--- pytest.ini | 2 + requirements.in | 2 + tests/conftest.py | 34 +++++++++++++++ tests/test_agent_tool_timeouts.py | 51 +++++++++++++++++++++- tests/test_feishu.py | 22 +++++++--- tests/test_feishu_ws_lifecycle.py | 29 +++++++++++++ tests/test_telegram.py | 7 ++- tests/test_telegram_typing_lifecycle.py | 57 ++++++++++++++++++------- 15 files changed, 275 insertions(+), 40 deletions(-) create mode 100644 .coveragerc diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 00000000..0d35fe00 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,27 @@ +[run] +branch = True +source = app +omit = + app/plugins/*/* + app/testing/* + app/helper/sites.py + +[report] +show_missing = True +skip_empty = True +precision = 2 +exclude_lines = + pragma: no cover + if TYPE_CHECKING: + if __name__ == .__main__.: + raise NotImplementedError + pass + +[html] +directory = htmlcov + +[xml] +output = coverage.xml + +[json] +output = coverage.json diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index eb25f713..64e5dc3d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -22,19 +22,20 @@ jobs: pytest: runs-on: ubuntu-latest name: Unit Tests + timeout-minutes: 20 steps: - name: Checkout code - uses: actions/checkout@v4 + uses: actions/checkout@v5 - name: Set up Python - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: python-version: '3.12' cache: 'pip' - name: Cache pip dependencies - uses: actions/cache@v4 + uses: actions/cache@v5 with: path: ~/.cache/pip key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.in', '**/requirements.txt') }} @@ -44,12 +45,27 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip setuptools wheel - # 用 requirements.in 还原 CI / 全新环境(含 pytest~=8.4 与 moviepilot-rust 等可选扩展), + # 用 requirements.in 还原 CI / 全新环境(含 pytest 与 moviepilot-rust 等可选扩展), # 与本地"干净 venv 复现"一致;测试运行器 pytest 已在 requirements.in 中声明。 pip install -r requirements.in - name: Run tests + timeout-minutes: 10 run: | # tests/run.py 以 pytest 跑 tests 全量;tests/conftest.py 在收集前把 CONFIG_DIR - # 指向临时库并建表,测试杜绝真实网络/外部服务(详见 docs/testing.md)。 - python tests/run.py + # 指向临时库并建表;CI 额外生成覆盖率报告,便于后续补测和回归分析。 + python -m coverage erase + python -m coverage run tests/run.py + python -m coverage report + python -m coverage json + python -m coverage xml + + - name: Upload coverage report + if: always() + uses: actions/upload-artifact@v6 + with: + name: coverage-report + path: | + coverage.xml + coverage.json + retention-days: 7 diff --git a/.gitignore b/.gitignore index 37ee7810..94a55856 100644 --- a/.gitignore +++ b/.gitignore @@ -31,6 +31,9 @@ public/ *.pyc *.log .coverage +coverage.xml +coverage.json +htmlcov/ .vscode venv diff --git a/app/agent/tools/base.py b/app/agent/tools/base.py index efdb683c..36eebaf0 100644 --- a/app/agent/tools/base.py +++ b/app/agent/tools/base.py @@ -115,6 +115,16 @@ def _get_blocking_executor(bucket: str) -> ThreadPoolExecutor: return executor +def shutdown_blocking_executors(*, wait: bool = True, cancel_futures: bool = False) -> None: + """关闭 Agent 工具阻塞线程池,释放长期运行进程或测试环境中的 worker。""" + with _blocking_executor_lock: + executors = list(_blocking_executors.values()) + _blocking_executors.clear() + + for executor in executors: + executor.shutdown(wait=wait, cancel_futures=cancel_futures) + + class ToolExecutionTimeoutError(TimeoutError): """Agent 工具执行超时异常。""" diff --git a/app/modules/feishu/feishu.py b/app/modules/feishu/feishu.py index 8996a951..165487b8 100644 --- a/app/modules/feishu/feishu.py +++ b/app/modules/feishu/feishu.py @@ -264,6 +264,11 @@ class Feishu: @staticmethod async def _disconnect_ws_client_quietly(ws_client: lark.ws.Client) -> None: """静默关闭飞书 WebSocket,避免 SDK 在关机时打印带敏感参数的连接地址。""" + if ws_client._conn is None: + ws_client._conn_url = "" + ws_client._conn_id = "" + ws_client._service_id = "" + return await ws_client._lock.acquire() try: if ws_client._conn is not None: diff --git a/app/modules/filter/RuleParser.py b/app/modules/filter/RuleParser.py index 164a6980..7cb00e21 100644 --- a/app/modules/filter/RuleParser.py +++ b/app/modules/filter/RuleParser.py @@ -1,6 +1,6 @@ import threading -from pyparsing import Forward, Literal, Word, alphas, infixNotation, opAssoc, alphanums, Combine, nums, ParseResults +from pyparsing import Forward, Literal, Word, alphas, infix_notation, opAssoc, alphanums, Combine, nums, ParseResults from app.utils import rust_accel @@ -21,16 +21,16 @@ class RuleParser: # 原子 atom: Combine = Combine(Word(alphas, alphanums) | (Word(nums) + Word(alphas, alphanums))) # 逻辑非操作符 - operator_not: Literal = Literal('!').setParseAction(lambda t: 'not') + operator_not: Literal = Literal('!').set_parse_action(lambda t: 'not') # 逻辑或操作符 - operator_or: Literal = Literal('|').setParseAction(lambda t: 'or') + operator_or: Literal = Literal('|').set_parse_action(lambda t: 'or') # 逻辑与操作符 - operator_and: Literal = Literal('&').setParseAction(lambda t: 'and') + operator_and: Literal = Literal('&').set_parse_action(lambda t: 'and') # 定义表达式的语法规则 expr <<= (operator_not + expr) | atom | ('(' + expr + ')') # 运算符优先级 - self.expr = infixNotation(expr, + self.expr = infix_notation(expr, [(operator_not, 1, opAssoc.RIGHT), (operator_and, 2, opAssoc.LEFT), (operator_or, 2, opAssoc.LEFT)]) @@ -53,7 +53,7 @@ class RuleParser: rust_result = rust_accel.parse_filter_rule(expression) if rust_result is not None: return _RustParseResults(rust_result) - return self.expr.parseString(expression) + return self.expr.parse_string(expression) class _RustParseResults(list): diff --git a/app/modules/telegram/telegram.py b/app/modules/telegram/telegram.py index 689ca96c..1703d128 100644 --- a/app/modules/telegram/telegram.py +++ b/app/modules/telegram/telegram.py @@ -14,8 +14,8 @@ from telebot.types import ( InlineKeyboardButton, InputMediaPhoto, ) -from telegramify_markdown import standardize, telegramify # noqa -from telegramify_markdown.type import ContentTypes, SentType +from telegramify_markdown import entities_to_markdownv2, standardize, telegramify # noqa +from telegramify_markdown.content import ContentTypes, File, Photo, Text from app.core.config import settings from app.core.context import MediaInfo, Context @@ -242,6 +242,18 @@ class Telegram: logger.error(f"下载Telegram文件失败: {e}") return None + @staticmethod + def _telegramify_item_text(item: Text) -> str: + """将 telegramify 文本片段转换为 Telegram MarkdownV2 字符串。""" + return entities_to_markdownv2(item.text, item.entities) + + @staticmethod + def _telegramify_item_caption(item: Text | File | Photo) -> str: + """将 telegramify 文本或媒体片段转换为 Telegram MarkdownV2 caption。""" + if isinstance(item, Text): + return Telegram._telegramify_item_text(item) + return entities_to_markdownv2(item.caption_text, item.caption_entities) + @staticmethod def _serialize_update_payload(message: Any) -> Optional[dict]: """ @@ -1138,7 +1150,7 @@ class Telegram: reply_markup = kwargs.pop("reply_markup", None) - boxs: SentType = ( + boxs: list[Text | File | Photo] = ( ThreadHelper() .submit(lambda x: asyncio.run(telegramify(x)), caption) .result() @@ -1158,7 +1170,9 @@ class Telegram: if disable_web_page_preview is not None: msg_kwargs["disable_web_page_preview"] = disable_web_page_preview ret = self._bot.send_message( - **msg_kwargs, text=item.content, reply_markup=current_reply_markup + **msg_kwargs, + text=self._telegramify_item_text(item), + reply_markup=current_reply_markup, ) elif item.content_type == ContentTypes.PHOTO or (image and i == 0): @@ -1168,7 +1182,7 @@ class Telegram: getattr(item, "file_name", ""), getattr(item, "file_data", image), ), - caption=getattr(item, "caption", item.content), + caption=self._telegramify_item_caption(item), reply_markup=current_reply_markup, ) @@ -1176,7 +1190,7 @@ class Telegram: ret = self._bot.send_document( **kwargs, document=(item.file_name, item.file_data), - caption=item.caption, + caption=self._telegramify_item_caption(item), reply_markup=current_reply_markup, ) diff --git a/pytest.ini b/pytest.ini index f5b2d24a..85c4473a 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,6 +1,8 @@ [pytest] testpaths = tests +timeout = 120 +timeout_method = thread # 仅对「无法在本仓修复根因」的已知上游/三方弃用告警做精确忽略,保持测试输出干净、 # 让本仓自身的新告警更醒目。本仓代码引发的告警一律不在此忽略,应在源码/用例处修复。 filterwarnings = diff --git a/requirements.in b/requirements.in index 8ecbd4cb..80828459 100644 --- a/requirements.in +++ b/requirements.in @@ -92,3 +92,5 @@ ddgs~=9.14.4 websocket-client~=1.9.0 lark-oapi~=1.6.8 pytest~=9.0.3 +pytest-cov~=7.1.0 +pytest-timeout~=2.4.0 diff --git a/tests/conftest.py b/tests/conftest.py index 518b72ba..78a67f4d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,6 +3,8 @@ 引导与网络守卫均复用 ``app/testing`` 的共享 harness(与插件仓 conftest 同源), 引导逻辑只在 ``app/testing`` 维护一处。 """ +import sys + # 必须早于首个 import app.db(其在 import 期即按 CONFIG_PATH 连库):prepare_backend 内部 # 先隔离 CONFIG_DIR、补 app.helper.sites 垫片,再建表。app/testing 仅依赖标准库、import 不连库, # 故此处先 import 再调用是安全的。 @@ -12,3 +14,35 @@ prepare_backend() # 复用共享 autouse 网络守卫;同一实现亦供各插件仓 conftest import 复用,避免逐仓维护 from app.testing.network_guard import block_real_network # noqa: E402,F401 + + +def _report_session_cleanup_error(name: str, err: Exception) -> None: + """测试收尾清理失败只记录诊断,不覆盖原始 pytest 退出状态。""" + sys.stderr.write(f"\npytest session cleanup failed: {name}: {err!r}\n") + + +def pytest_sessionfinish(session, exitstatus): + """释放测试过程中按需创建的全局后台资源,避免解释器退出时等待非 daemon worker。""" + try: + from app.agent.tools.base import shutdown_blocking_executors + + shutdown_blocking_executors(cancel_futures=True) + except Exception as err: + _report_session_cleanup_error("agent blocking executors", err) + + try: + from app.helper.thread import ThreadHelper + from app.utils.singleton import Singleton + + helper = Singleton._instances.get((ThreadHelper, (), frozenset())) + if helper: + helper.shutdown() + except Exception as err: + _report_session_cleanup_error("thread helper", err) + + try: + from app.log import LoggerManager + + LoggerManager.shutdown() + except Exception as err: + _report_session_cleanup_error("logger manager", err) diff --git a/tests/test_agent_tool_timeouts.py b/tests/test_agent_tool_timeouts.py index 50e4af4b..fb675cbe 100644 --- a/tests/test_agent_tool_timeouts.py +++ b/tests/test_agent_tool_timeouts.py @@ -4,7 +4,7 @@ from unittest.mock import patch import pytest -from app.agent.tools.base import MoviePilotTool +from app.agent.tools.base import MoviePilotTool, _blocking_executors, shutdown_blocking_executors from app.agent.tools.manager import MoviePilotToolsManager @@ -96,6 +96,55 @@ def test_run_blocking_keeps_bucket_slot_until_worker_finishes(): asyncio.run(_run_scenario()) +def test_shutdown_blocking_executors_clears_agent_tool_workers(): + """测试结束清理应关闭 Agent 工具阻塞线程池,避免全量测试退出时等待 worker。""" + + async def _create_worker(): + await MoviePilotTool.run_blocking("web", lambda: "done") + + asyncio.run(_create_worker()) + assert "web" in _blocking_executors + + shutdown_blocking_executors() + + assert _blocking_executors == {} + + +def test_shutdown_blocking_executors_cancels_queued_workers_and_is_idempotent(): + """收尾清理应取消尚未开始的排队任务,并允许重复调用。""" + shutdown_blocking_executors(wait=False, cancel_futures=True) + started = [threading.Event(), threading.Event()] + release = threading.Event() + queued_ran = threading.Event() + + def _blocking_call(index: int) -> str: + started[index].set() + release.wait() + return f"done-{index}" + + async def _run_scenario(): + tasks = [ + asyncio.create_task(MoviePilotTool.run_blocking("web", _blocking_call, index)) + for index in range(2) + ] + for event in started: + assert await asyncio.wait_for(asyncio.to_thread(event.wait), timeout=1) + executor = _blocking_executors["web"] + queued_future = executor.submit(queued_ran.set) + shutdown_blocking_executors(wait=False, cancel_futures=True) + shutdown_blocking_executors(wait=False, cancel_futures=True) + release.set() + + assert await asyncio.wait_for(asyncio.gather(*tasks), timeout=1) == ["done-0", "done-1"] + return queued_future + + queued_future = asyncio.run(_run_scenario()) + + assert _blocking_executors == {} + assert queued_future.cancelled() + assert not queued_ran.is_set() + + def test_create_agent_config_uses_llm_max_iterations(): """Agent 执行配置应把 LLM_MAX_ITERATIONS 传给 LangGraph recursion_limit。""" from app.agent import MoviePilotAgent diff --git a/tests/test_feishu.py b/tests/test_feishu.py index dcbcb8e5..48c5dd2a 100644 --- a/tests/test_feishu.py +++ b/tests/test_feishu.py @@ -1158,15 +1158,30 @@ class TestFeishu(unittest.TestCase): def test_run_ws_client_binds_thread_local_event_loop(self): client = self._build_client() original_loop = object() - fake_ws_client = MagicMock() created_loops = [] real_new_event_loop = asyncio.new_event_loop + class _FakeWsClient: + """显式模拟飞书 SDK 长连接客户端,避免 MagicMock 在线程清理路径污染全局 mock 锁。""" + + def __init__(self): + self._auto_reconnect = True + self._conn = None + self._conn_url = "wss://msg-frontier.feishu.cn/ws/v2?access_key=secret&ticket=secret" + self._conn_id = "conn_test" + self._service_id = "service_test" + self._lock = asyncio.Lock() + self.started = False + + def start(self): + self.started = True + def _new_loop(): loop = real_new_event_loop() created_loops.append(loop) return loop + fake_ws_client = _FakeWsClient() with ( patch( "app.modules.feishu.feishu.lark_ws_client_module.loop", original_loop @@ -1182,14 +1197,11 @@ class TestFeishu(unittest.TestCase): patch( "app.modules.feishu.feishu.lark.ws.Client", return_value=fake_ws_client ), - patch.object( - fake_ws_client, "start", side_effect=lambda: None - ) as mock_start, ): client._run_ws_client() self.assertIsNone(client._ws_loop) - mock_start.assert_called_once() + self.assertTrue(fake_ws_client.started) self.assertEqual(len(created_loops), 1) self.assertTrue(created_loops[0].is_closed()) diff --git a/tests/test_feishu_ws_lifecycle.py b/tests/test_feishu_ws_lifecycle.py index b4983de7..7d1bb62d 100644 --- a/tests/test_feishu_ws_lifecycle.py +++ b/tests/test_feishu_ws_lifecycle.py @@ -71,6 +71,35 @@ def test_shutdown_ws_client_cancels_sdk_tasks_before_quiet_disconnect(): assert client._ws_tasks == set() +def test_shutdown_ws_client_skips_disconnect_when_sdk_lock_is_busy_and_connection_gone(): + """SDK 已无连接对象时,关机清理不应等待可能长期占用的内部锁。""" + client = _build_feishu_client() + + async def _run_shutdown() -> SimpleNamespace: + lock = asyncio.Lock() + await lock.acquire() + ws_client = SimpleNamespace( + _auto_reconnect=True, + _conn=None, + _conn_url="wss://msg-frontier.feishu.cn/ws/v2?access_key=secret&ticket=secret", + _conn_id="conn_test", + _service_id="service_test", + _lock=lock, + ) + client._ws_client = ws_client + + await asyncio.wait_for(client._shutdown_ws_client(), timeout=0.2) + return ws_client + + ws_client = asyncio.run(_run_shutdown()) + + assert not ws_client._auto_reconnect + assert ws_client._conn is None + assert ws_client._conn_url == "" + assert ws_client._conn_id == "" + assert ws_client._service_id == "" + + def test_consume_ws_task_result_suppresses_stop_exception(): """停止过程中飞书 SDK 后台任务的异常应被取回并降为调试日志。""" client = _build_feishu_client() diff --git a/tests/test_telegram.py b/tests/test_telegram.py index 4ebbd7a3..9f917811 100644 --- a/tests/test_telegram.py +++ b/tests/test_telegram.py @@ -25,9 +25,14 @@ def telegram(): bot_instance = MagicMock() # get_me 用于初始化 bot 用户名,需返回带 username 的对象 bot_instance.get_me.return_value = MagicMock(username="test_bot") + # polling/stop 使用普通函数,避免后台线程执行 MagicMock 时在退出阶段产生锁竞争。 + bot_instance.infinity_polling = lambda *args, **kwargs: None + bot_instance.stop_polling = lambda *args, **kwargs: None mock_telebot_cls.return_value = bot_instance mock_image_cls.return_value.fetch_image.return_value = b"fake-image-bytes" - yield Telegram(TELEGRAM_TOKEN="fake_token", TELEGRAM_CHAT_ID="fake_chat_id") + telegram = Telegram(TELEGRAM_TOKEN="fake_token", TELEGRAM_CHAT_ID="fake_chat_id") + yield telegram + telegram.stop() def test_send_msg_success(telegram): diff --git a/tests/test_telegram_typing_lifecycle.py b/tests/test_telegram_typing_lifecycle.py index 431a075a..be15fdba 100644 --- a/tests/test_telegram_typing_lifecycle.py +++ b/tests/test_telegram_typing_lifecycle.py @@ -1,4 +1,5 @@ import asyncio +import threading import time import unittest from types import SimpleNamespace @@ -13,6 +14,28 @@ from app.modules.telegram.telegram import Telegram from app.schemas.types import MessageChannel +def _wait_until(predicate, timeout: float = 1.0) -> bool: + """等待后台线程完成目标状态,避免用例依赖固定 sleep 时长。""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if predicate(): + return True + time.sleep(0.01) + return predicate() + + +class _FakeTelegramBot: + """记录 typing 调用的轻量 bot,避免后台线程与 Mock 内部锁交互。""" + + def __init__(self): + self.chat_actions = [] + self.action_event = threading.Event() + + def send_chat_action(self, chat_id, action): + self.chat_actions.append((chat_id, action)) + self.action_event.set() + + class TestTelegramTypingLifecycle(unittest.TestCase): def setUp(self): self._cleanup_typing_tasks() @@ -32,7 +55,7 @@ class TestTelegramTypingLifecycle(unittest.TestCase): @staticmethod def _telegram_client() -> Telegram: telegram = Telegram.__new__(Telegram) - telegram._bot = Mock() + telegram._bot = _FakeTelegramBot() telegram._telegram_token = "token" telegram._telegram_chat_id = "default-chat" # 缩短测试中的等待时间,不改变生产默认续发间隔。 @@ -48,10 +71,9 @@ class TestTelegramTypingLifecycle(unittest.TestCase): max_duration_seconds=1, initial_delay_seconds=0, ) - time.sleep(0.03) self.assertIn("chat-1", Telegram._typing_tasks) - self.assertTrue(telegram._bot.send_chat_action.called) + self.assertTrue(telegram._bot.action_event.wait(1.0)) self.assertTrue(telegram.stop_typing(chat_id="chat-1")) self.assertNotIn("chat-1", Telegram._typing_tasks) @@ -77,8 +99,8 @@ class TestTelegramTypingLifecycle(unittest.TestCase): max_duration_seconds=0.02, initial_delay_seconds=0, ) - time.sleep(0.08) + self.assertTrue(_wait_until(lambda: "chat-3" not in Telegram._typing_tasks)) self.assertNotIn("chat-3", Telegram._typing_tasks) def test_short_typing_task_can_stop_before_first_chat_action(self): @@ -95,7 +117,7 @@ class TestTelegramTypingLifecycle(unittest.TestCase): telegram.stop_typing(chat_id="chat-4") time.sleep(0.08) - telegram._bot.send_chat_action.assert_not_called() + self.assertEqual(telegram._bot.chat_actions, []) self.assertNotIn("chat-4", Telegram._typing_tasks) def test_agent_managed_send_msg_keeps_typing_for_worker_cleanup(self): @@ -319,19 +341,24 @@ class TestTelegramTypingLifecycle(unittest.TestCase): "chat_id": "-100", "metadata": {"kind": "typing"}, } + calls = [] - with patch("app.agent.AgentChain") as chain_cls: - chain_cls.return_value.start_message_processing_status.return_value = status + class FakeAgentChain: + def start_message_processing_status(self, **kwargs): + calls.append(kwargs) + return status + + with patch("app.agent.AgentChain", FakeAgentChain): result = await _async_start_processing_status(task) - chain_cls.return_value.start_message_processing_status.assert_called_once_with( - channel=MessageChannel.Telegram, - source="telegram-test", - userid="10001", - message_id="10", - chat_id="-100", - text="第一条", - ) + self.assertEqual(calls, [{ + "channel": MessageChannel.Telegram, + "source": "telegram-test", + "userid": "10001", + "message_id": "10", + "chat_id": "-100", + "text": "第一条", + }]) self.assertEqual(result, status) asyncio.run(_run())