fix(tests): stabilize messaging shutdown (#5979)

This commit is contained in:
InfinityPacer
2026-06-21 07:37:47 +08:00
committed by GitHub
parent b395d820d8
commit e02cebe16c
15 changed files with 275 additions and 40 deletions

27
.coveragerc Normal file
View File

@@ -0,0 +1,27 @@
[run]
branch = True
source = app
omit =
app/plugins/*/*
app/testing/*
app/helper/sites.py
[report]
show_missing = True
skip_empty = True
precision = 2
exclude_lines =
pragma: no cover
if TYPE_CHECKING:
if __name__ == .__main__.:
raise NotImplementedError
pass
[html]
directory = htmlcov
[xml]
output = coverage.xml
[json]
output = coverage.json

View File

@@ -22,19 +22,20 @@ jobs:
pytest:
runs-on: ubuntu-latest
name: Unit Tests
timeout-minutes: 20
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v5
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: '3.12'
cache: 'pip'
- name: Cache pip dependencies
uses: actions/cache@v4
uses: actions/cache@v5
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.in', '**/requirements.txt') }}
@@ -44,12 +45,27 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip setuptools wheel
# 用 requirements.in 还原 CI / 全新环境(含 pytest~=8.4 与 moviepilot-rust 等可选扩展),
# 用 requirements.in 还原 CI / 全新环境(含 pytest 与 moviepilot-rust 等可选扩展),
# 与本地"干净 venv 复现"一致;测试运行器 pytest 已在 requirements.in 中声明。
pip install -r requirements.in
- name: Run tests
timeout-minutes: 10
run: |
# tests/run.py 以 pytest 跑 tests 全量tests/conftest.py 在收集前把 CONFIG_DIR
# 指向临时库并建表,测试杜绝真实网络/外部服务(详见 docs/testing.md
python tests/run.py
# 指向临时库并建表CI 额外生成覆盖率报告,便于后续补测和回归分析
python -m coverage erase
python -m coverage run tests/run.py
python -m coverage report
python -m coverage json
python -m coverage xml
- name: Upload coverage report
if: always()
uses: actions/upload-artifact@v6
with:
name: coverage-report
path: |
coverage.xml
coverage.json
retention-days: 7

3
.gitignore vendored
View File

@@ -31,6 +31,9 @@ public/
*.pyc
*.log
.coverage
coverage.xml
coverage.json
htmlcov/
.vscode
venv

View File

@@ -115,6 +115,16 @@ def _get_blocking_executor(bucket: str) -> ThreadPoolExecutor:
return executor
def shutdown_blocking_executors(*, wait: bool = True, cancel_futures: bool = False) -> None:
"""关闭 Agent 工具阻塞线程池,释放长期运行进程或测试环境中的 worker。"""
with _blocking_executor_lock:
executors = list(_blocking_executors.values())
_blocking_executors.clear()
for executor in executors:
executor.shutdown(wait=wait, cancel_futures=cancel_futures)
class ToolExecutionTimeoutError(TimeoutError):
"""Agent 工具执行超时异常。"""

View File

@@ -264,6 +264,11 @@ class Feishu:
@staticmethod
async def _disconnect_ws_client_quietly(ws_client: lark.ws.Client) -> None:
"""静默关闭飞书 WebSocket避免 SDK 在关机时打印带敏感参数的连接地址。"""
if ws_client._conn is None:
ws_client._conn_url = ""
ws_client._conn_id = ""
ws_client._service_id = ""
return
await ws_client._lock.acquire()
try:
if ws_client._conn is not None:

View File

@@ -1,6 +1,6 @@
import threading
from pyparsing import Forward, Literal, Word, alphas, infixNotation, opAssoc, alphanums, Combine, nums, ParseResults
from pyparsing import Forward, Literal, Word, alphas, infix_notation, opAssoc, alphanums, Combine, nums, ParseResults
from app.utils import rust_accel
@@ -21,16 +21,16 @@ class RuleParser:
# 原子
atom: Combine = Combine(Word(alphas, alphanums) | (Word(nums) + Word(alphas, alphanums)))
# 逻辑非操作符
operator_not: Literal = Literal('!').setParseAction(lambda t: 'not')
operator_not: Literal = Literal('!').set_parse_action(lambda t: 'not')
# 逻辑或操作符
operator_or: Literal = Literal('|').setParseAction(lambda t: 'or')
operator_or: Literal = Literal('|').set_parse_action(lambda t: 'or')
# 逻辑与操作符
operator_and: Literal = Literal('&').setParseAction(lambda t: 'and')
operator_and: Literal = Literal('&').set_parse_action(lambda t: 'and')
# 定义表达式的语法规则
expr <<= (operator_not + expr) | atom | ('(' + expr + ')')
# 运算符优先级
self.expr = infixNotation(expr,
self.expr = infix_notation(expr,
[(operator_not, 1, opAssoc.RIGHT),
(operator_and, 2, opAssoc.LEFT),
(operator_or, 2, opAssoc.LEFT)])
@@ -53,7 +53,7 @@ class RuleParser:
rust_result = rust_accel.parse_filter_rule(expression)
if rust_result is not None:
return _RustParseResults(rust_result)
return self.expr.parseString(expression)
return self.expr.parse_string(expression)
class _RustParseResults(list):

View File

@@ -14,8 +14,8 @@ from telebot.types import (
InlineKeyboardButton,
InputMediaPhoto,
)
from telegramify_markdown import standardize, telegramify # noqa
from telegramify_markdown.type import ContentTypes, SentType
from telegramify_markdown import entities_to_markdownv2, standardize, telegramify # noqa
from telegramify_markdown.content import ContentTypes, File, Photo, Text
from app.core.config import settings
from app.core.context import MediaInfo, Context
@@ -242,6 +242,18 @@ class Telegram:
logger.error(f"下载Telegram文件失败: {e}")
return None
@staticmethod
def _telegramify_item_text(item: Text) -> str:
"""将 telegramify 文本片段转换为 Telegram MarkdownV2 字符串。"""
return entities_to_markdownv2(item.text, item.entities)
@staticmethod
def _telegramify_item_caption(item: Text | File | Photo) -> str:
"""将 telegramify 文本或媒体片段转换为 Telegram MarkdownV2 caption。"""
if isinstance(item, Text):
return Telegram._telegramify_item_text(item)
return entities_to_markdownv2(item.caption_text, item.caption_entities)
@staticmethod
def _serialize_update_payload(message: Any) -> Optional[dict]:
"""
@@ -1138,7 +1150,7 @@ class Telegram:
reply_markup = kwargs.pop("reply_markup", None)
boxs: SentType = (
boxs: list[Text | File | Photo] = (
ThreadHelper()
.submit(lambda x: asyncio.run(telegramify(x)), caption)
.result()
@@ -1158,7 +1170,9 @@ class Telegram:
if disable_web_page_preview is not None:
msg_kwargs["disable_web_page_preview"] = disable_web_page_preview
ret = self._bot.send_message(
**msg_kwargs, text=item.content, reply_markup=current_reply_markup
**msg_kwargs,
text=self._telegramify_item_text(item),
reply_markup=current_reply_markup,
)
elif item.content_type == ContentTypes.PHOTO or (image and i == 0):
@@ -1168,7 +1182,7 @@ class Telegram:
getattr(item, "file_name", ""),
getattr(item, "file_data", image),
),
caption=getattr(item, "caption", item.content),
caption=self._telegramify_item_caption(item),
reply_markup=current_reply_markup,
)
@@ -1176,7 +1190,7 @@ class Telegram:
ret = self._bot.send_document(
**kwargs,
document=(item.file_name, item.file_data),
caption=item.caption,
caption=self._telegramify_item_caption(item),
reply_markup=current_reply_markup,
)

View File

@@ -1,6 +1,8 @@
[pytest]
testpaths =
tests
timeout = 120
timeout_method = thread
# 仅对「无法在本仓修复根因」的已知上游/三方弃用告警做精确忽略,保持测试输出干净、
# 让本仓自身的新告警更醒目。本仓代码引发的告警一律不在此忽略,应在源码/用例处修复。
filterwarnings =

View File

@@ -92,3 +92,5 @@ ddgs~=9.14.4
websocket-client~=1.9.0
lark-oapi~=1.6.8
pytest~=9.0.3
pytest-cov~=7.1.0
pytest-timeout~=2.4.0

View File

@@ -3,6 +3,8 @@
引导与网络守卫均复用 ``app/testing`` 的共享 harness与插件仓 conftest 同源),
引导逻辑只在 ``app/testing`` 维护一处。
"""
import sys
# 必须早于首个 import app.db其在 import 期即按 CONFIG_PATH 连库prepare_backend 内部
# 先隔离 CONFIG_DIR、补 app.helper.sites 垫片再建表。app/testing 仅依赖标准库、import 不连库,
# 故此处先 import 再调用是安全的。
@@ -12,3 +14,35 @@ prepare_backend()
# 复用共享 autouse 网络守卫;同一实现亦供各插件仓 conftest import 复用,避免逐仓维护
from app.testing.network_guard import block_real_network # noqa: E402,F401
def _report_session_cleanup_error(name: str, err: Exception) -> None:
"""测试收尾清理失败只记录诊断,不覆盖原始 pytest 退出状态。"""
sys.stderr.write(f"\npytest session cleanup failed: {name}: {err!r}\n")
def pytest_sessionfinish(session, exitstatus):
"""释放测试过程中按需创建的全局后台资源,避免解释器退出时等待非 daemon worker。"""
try:
from app.agent.tools.base import shutdown_blocking_executors
shutdown_blocking_executors(cancel_futures=True)
except Exception as err:
_report_session_cleanup_error("agent blocking executors", err)
try:
from app.helper.thread import ThreadHelper
from app.utils.singleton import Singleton
helper = Singleton._instances.get((ThreadHelper, (), frozenset()))
if helper:
helper.shutdown()
except Exception as err:
_report_session_cleanup_error("thread helper", err)
try:
from app.log import LoggerManager
LoggerManager.shutdown()
except Exception as err:
_report_session_cleanup_error("logger manager", err)

View File

@@ -4,7 +4,7 @@ from unittest.mock import patch
import pytest
from app.agent.tools.base import MoviePilotTool
from app.agent.tools.base import MoviePilotTool, _blocking_executors, shutdown_blocking_executors
from app.agent.tools.manager import MoviePilotToolsManager
@@ -96,6 +96,55 @@ def test_run_blocking_keeps_bucket_slot_until_worker_finishes():
asyncio.run(_run_scenario())
def test_shutdown_blocking_executors_clears_agent_tool_workers():
"""测试结束清理应关闭 Agent 工具阻塞线程池,避免全量测试退出时等待 worker。"""
async def _create_worker():
await MoviePilotTool.run_blocking("web", lambda: "done")
asyncio.run(_create_worker())
assert "web" in _blocking_executors
shutdown_blocking_executors()
assert _blocking_executors == {}
def test_shutdown_blocking_executors_cancels_queued_workers_and_is_idempotent():
"""收尾清理应取消尚未开始的排队任务,并允许重复调用。"""
shutdown_blocking_executors(wait=False, cancel_futures=True)
started = [threading.Event(), threading.Event()]
release = threading.Event()
queued_ran = threading.Event()
def _blocking_call(index: int) -> str:
started[index].set()
release.wait()
return f"done-{index}"
async def _run_scenario():
tasks = [
asyncio.create_task(MoviePilotTool.run_blocking("web", _blocking_call, index))
for index in range(2)
]
for event in started:
assert await asyncio.wait_for(asyncio.to_thread(event.wait), timeout=1)
executor = _blocking_executors["web"]
queued_future = executor.submit(queued_ran.set)
shutdown_blocking_executors(wait=False, cancel_futures=True)
shutdown_blocking_executors(wait=False, cancel_futures=True)
release.set()
assert await asyncio.wait_for(asyncio.gather(*tasks), timeout=1) == ["done-0", "done-1"]
return queued_future
queued_future = asyncio.run(_run_scenario())
assert _blocking_executors == {}
assert queued_future.cancelled()
assert not queued_ran.is_set()
def test_create_agent_config_uses_llm_max_iterations():
"""Agent 执行配置应把 LLM_MAX_ITERATIONS 传给 LangGraph recursion_limit。"""
from app.agent import MoviePilotAgent

View File

@@ -1158,15 +1158,30 @@ class TestFeishu(unittest.TestCase):
def test_run_ws_client_binds_thread_local_event_loop(self):
client = self._build_client()
original_loop = object()
fake_ws_client = MagicMock()
created_loops = []
real_new_event_loop = asyncio.new_event_loop
class _FakeWsClient:
"""显式模拟飞书 SDK 长连接客户端,避免 MagicMock 在线程清理路径污染全局 mock 锁。"""
def __init__(self):
self._auto_reconnect = True
self._conn = None
self._conn_url = "wss://msg-frontier.feishu.cn/ws/v2?access_key=secret&ticket=secret"
self._conn_id = "conn_test"
self._service_id = "service_test"
self._lock = asyncio.Lock()
self.started = False
def start(self):
self.started = True
def _new_loop():
loop = real_new_event_loop()
created_loops.append(loop)
return loop
fake_ws_client = _FakeWsClient()
with (
patch(
"app.modules.feishu.feishu.lark_ws_client_module.loop", original_loop
@@ -1182,14 +1197,11 @@ class TestFeishu(unittest.TestCase):
patch(
"app.modules.feishu.feishu.lark.ws.Client", return_value=fake_ws_client
),
patch.object(
fake_ws_client, "start", side_effect=lambda: None
) as mock_start,
):
client._run_ws_client()
self.assertIsNone(client._ws_loop)
mock_start.assert_called_once()
self.assertTrue(fake_ws_client.started)
self.assertEqual(len(created_loops), 1)
self.assertTrue(created_loops[0].is_closed())

View File

@@ -71,6 +71,35 @@ def test_shutdown_ws_client_cancels_sdk_tasks_before_quiet_disconnect():
assert client._ws_tasks == set()
def test_shutdown_ws_client_skips_disconnect_when_sdk_lock_is_busy_and_connection_gone():
"""SDK 已无连接对象时,关机清理不应等待可能长期占用的内部锁。"""
client = _build_feishu_client()
async def _run_shutdown() -> SimpleNamespace:
lock = asyncio.Lock()
await lock.acquire()
ws_client = SimpleNamespace(
_auto_reconnect=True,
_conn=None,
_conn_url="wss://msg-frontier.feishu.cn/ws/v2?access_key=secret&ticket=secret",
_conn_id="conn_test",
_service_id="service_test",
_lock=lock,
)
client._ws_client = ws_client
await asyncio.wait_for(client._shutdown_ws_client(), timeout=0.2)
return ws_client
ws_client = asyncio.run(_run_shutdown())
assert not ws_client._auto_reconnect
assert ws_client._conn is None
assert ws_client._conn_url == ""
assert ws_client._conn_id == ""
assert ws_client._service_id == ""
def test_consume_ws_task_result_suppresses_stop_exception():
"""停止过程中飞书 SDK 后台任务的异常应被取回并降为调试日志。"""
client = _build_feishu_client()

View File

@@ -25,9 +25,14 @@ def telegram():
bot_instance = MagicMock()
# get_me 用于初始化 bot 用户名,需返回带 username 的对象
bot_instance.get_me.return_value = MagicMock(username="test_bot")
# polling/stop 使用普通函数,避免后台线程执行 MagicMock 时在退出阶段产生锁竞争。
bot_instance.infinity_polling = lambda *args, **kwargs: None
bot_instance.stop_polling = lambda *args, **kwargs: None
mock_telebot_cls.return_value = bot_instance
mock_image_cls.return_value.fetch_image.return_value = b"fake-image-bytes"
yield Telegram(TELEGRAM_TOKEN="fake_token", TELEGRAM_CHAT_ID="fake_chat_id")
telegram = Telegram(TELEGRAM_TOKEN="fake_token", TELEGRAM_CHAT_ID="fake_chat_id")
yield telegram
telegram.stop()
def test_send_msg_success(telegram):

View File

@@ -1,4 +1,5 @@
import asyncio
import threading
import time
import unittest
from types import SimpleNamespace
@@ -13,6 +14,28 @@ from app.modules.telegram.telegram import Telegram
from app.schemas.types import MessageChannel
def _wait_until(predicate, timeout: float = 1.0) -> bool:
"""等待后台线程完成目标状态,避免用例依赖固定 sleep 时长。"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if predicate():
return True
time.sleep(0.01)
return predicate()
class _FakeTelegramBot:
"""记录 typing 调用的轻量 bot避免后台线程与 Mock 内部锁交互。"""
def __init__(self):
self.chat_actions = []
self.action_event = threading.Event()
def send_chat_action(self, chat_id, action):
self.chat_actions.append((chat_id, action))
self.action_event.set()
class TestTelegramTypingLifecycle(unittest.TestCase):
def setUp(self):
self._cleanup_typing_tasks()
@@ -32,7 +55,7 @@ class TestTelegramTypingLifecycle(unittest.TestCase):
@staticmethod
def _telegram_client() -> Telegram:
telegram = Telegram.__new__(Telegram)
telegram._bot = Mock()
telegram._bot = _FakeTelegramBot()
telegram._telegram_token = "token"
telegram._telegram_chat_id = "default-chat"
# 缩短测试中的等待时间,不改变生产默认续发间隔。
@@ -48,10 +71,9 @@ class TestTelegramTypingLifecycle(unittest.TestCase):
max_duration_seconds=1,
initial_delay_seconds=0,
)
time.sleep(0.03)
self.assertIn("chat-1", Telegram._typing_tasks)
self.assertTrue(telegram._bot.send_chat_action.called)
self.assertTrue(telegram._bot.action_event.wait(1.0))
self.assertTrue(telegram.stop_typing(chat_id="chat-1"))
self.assertNotIn("chat-1", Telegram._typing_tasks)
@@ -77,8 +99,8 @@ class TestTelegramTypingLifecycle(unittest.TestCase):
max_duration_seconds=0.02,
initial_delay_seconds=0,
)
time.sleep(0.08)
self.assertTrue(_wait_until(lambda: "chat-3" not in Telegram._typing_tasks))
self.assertNotIn("chat-3", Telegram._typing_tasks)
def test_short_typing_task_can_stop_before_first_chat_action(self):
@@ -95,7 +117,7 @@ class TestTelegramTypingLifecycle(unittest.TestCase):
telegram.stop_typing(chat_id="chat-4")
time.sleep(0.08)
telegram._bot.send_chat_action.assert_not_called()
self.assertEqual(telegram._bot.chat_actions, [])
self.assertNotIn("chat-4", Telegram._typing_tasks)
def test_agent_managed_send_msg_keeps_typing_for_worker_cleanup(self):
@@ -319,19 +341,24 @@ class TestTelegramTypingLifecycle(unittest.TestCase):
"chat_id": "-100",
"metadata": {"kind": "typing"},
}
calls = []
with patch("app.agent.AgentChain") as chain_cls:
chain_cls.return_value.start_message_processing_status.return_value = status
class FakeAgentChain:
def start_message_processing_status(self, **kwargs):
calls.append(kwargs)
return status
with patch("app.agent.AgentChain", FakeAgentChain):
result = await _async_start_processing_status(task)
chain_cls.return_value.start_message_processing_status.assert_called_once_with(
channel=MessageChannel.Telegram,
source="telegram-test",
userid="10001",
message_id="10",
chat_id="-100",
text="第一条",
)
self.assertEqual(calls, [{
"channel": MessageChannel.Telegram,
"source": "telegram-test",
"userid": "10001",
"message_id": "10",
"chat_id": "-100",
"text": "第一条",
}])
self.assertEqual(result, status)
asyncio.run(_run())