From 970f2cf1ca7c2933eef0a88778984886d87bc11c Mon Sep 17 00:00:00 2001
From: jxxghp
Date: Sat, 20 Jun 2026 15:20:15 +0800
Subject: [PATCH] fix: clean up feishu websocket shutdown

---
Review notes (free text after the "---" marker; not part of the commit):

 * _run_ws_client now replaces loop.create_task with _create_tracked_task,
   which records a task in self._ws_tasks only when the coroutine's
   __qualname__ is Client._ping_loop, Client._receive_message_loop or
   Client._handle_message; every other task is created untracked.
 * _shutdown_ws_client disables auto reconnect, cancels and awaits the
   tracked tasks (excluding the current task), then closes the connection
   through _disconnect_ws_client_quietly, which resets _conn, _conn_url,
   _conn_id and _service_id while holding ws_client._lock.
 * stop() schedules _shutdown_ws_client() on the websocket loop instead of
   ws_client._disconnect().
 * NOTE(review): in the finally block of _run_ws_client the shutdown
   coroutine runs before _select / loop / create_task are restored; an
   exception escaping run_until_complete would skip that restore - confirm
   this is acceptable.
 * NOTE(review): the __qualname__ match relies on private lark_oapi method
   names - verify them whenever the SDK is upgraded.

 app/modules/feishu/feishu.py | 79 ++++++++++++++++++++++++--
 tests/test_feishu.py | 7 ++-
 tests/test_feishu_ws_lifecycle.py | 94 +++++++++++++++++++++++++++++++
 3 files changed, 175 insertions(+), 5 deletions(-)
 create mode 100644 tests/test_feishu_ws_lifecycle.py

diff --git a/app/modules/feishu/feishu.py b/app/modules/feishu/feishu.py
index 6da10e05..8996a951 100644
--- a/app/modules/feishu/feishu.py
+++ b/app/modules/feishu/feishu.py
@@ -5,7 +5,7 @@ import tempfile
 import threading
 import uuid
 from pathlib import Path
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Set, Tuple, Union
 from urllib.parse import urlparse
 
 import lark_oapi as lark
@@ -96,6 +96,7 @@ class Feishu:
         self._stop_event = threading.Event()
         self._ws_thread: Optional[threading.Thread] = None
         self._ws_loop: Optional[asyncio.AbstractEventLoop] = None
+        self._ws_tasks: Set[asyncio.Task] = set()
         self._user_chat_mapping: Dict[str, str] = {}
         self._user_receive_id_type_mapping: Dict[str, str] = {}
         self._chat_open_mapping: Dict[str, str] = {}
@@ -161,15 +162,31 @@ class Feishu:
         original_select = lark_ws_client_module._select
         original_loop = lark_ws_client_module.loop
         loop = asyncio.new_event_loop()
+        original_create_task = loop.create_task
         self._ws_loop = loop
         asyncio.set_event_loop(loop)
         lark_ws_client_module.loop = loop
 
         async def _wait_for_stop() -> None:
+            """Wait for the stop signal so the SDK's blocking select is governed by the local lifecycle."""
             while not self._stop_event.is_set():
                 await asyncio.sleep(1)
 
+        def _create_tracked_task(coro, *args, **kwargs) -> asyncio.Task:
+            """Track SDK background tasks so shutdown leaves no unretrieved task exceptions."""
+            task = original_create_task(coro, *args, **kwargs)
+            coro_name = getattr(coro, "__qualname__", "")
+            if coro_name in {
+                "Client._ping_loop",
+                "Client._receive_message_loop",
+                "Client._handle_message",
+            }:
+                self._ws_tasks.add(task)
+                task.add_done_callback(self._consume_ws_task_result)
+            return task
+
         lark_ws_client_module._select = _wait_for_stop
+        loop.create_task = _create_tracked_task
         try:
             self._ws_client = lark.ws.Client(
                 self._app_id,
@@ -187,8 +204,11 @@ class Feishu:
             if not self._stop_event.is_set():
                 logger.error(f"飞书长连接服务启动失败:{err}")
         finally:
+            if not loop.is_closed():
+                loop.run_until_complete(self._shutdown_ws_client())
             lark_ws_client_module._select = original_select
             lark_ws_client_module.loop = original_loop
+            loop.create_task = original_create_task
             pending_tasks = [
                 task
                 for task in asyncio.all_tasks(loop)
@@ -204,6 +224,57 @@ class Feishu:
             asyncio.set_event_loop(None)
             self._ws_loop = None
 
+    def _consume_ws_task_result(self, task: asyncio.Task) -> None:
+        """Retrieve the result of a Feishu SDK background task so asyncio does not report unconsumed exceptions at shutdown."""
+        self._ws_tasks.discard(task)
+        if task.cancelled():
+            return
+        try:
+            err = task.exception()
+        except asyncio.CancelledError:
+            return
+        if not err:
+            return
+        if self._stop_event.is_set():
+            logger.debug(f"飞书长连接后台任务已随停止退出:{err}")
+            return
+        logger.error(f"飞书长连接后台任务异常:{err}")
+
+    async def _shutdown_ws_client(self) -> None:
+        """Cancel background tasks in order and close the WebSocket from within the Feishu long-connection thread."""
+        ws_client = self._ws_client
+        if ws_client:
+            ws_client._auto_reconnect = False
+        current_task = asyncio.current_task()
+        running_tasks = [
+            task
+            for task in list(self._ws_tasks)
+            if task is not current_task and not task.done()
+        ]
+        for task in running_tasks:
+            task.cancel()
+        if running_tasks:
+            await asyncio.gather(*running_tasks, return_exceptions=True)
+        if ws_client:
+            try:
+                await self._disconnect_ws_client_quietly(ws_client)
+            except Exception as err:
+                logger.debug(f"关闭飞书长连接失败:{err}")
+
+    @staticmethod
+    async def _disconnect_ws_client_quietly(ws_client: lark.ws.Client) -> None:
+        """Quietly close the Feishu WebSocket so the SDK does not print the connection URL with sensitive parameters at shutdown."""
+        await ws_client._lock.acquire()
+        try:
+            if ws_client._conn is not None:
+                await ws_client._conn.close()
+        finally:
+            ws_client._conn = None
+            ws_client._conn_url = ""
+            ws_client._conn_id = ""
+            ws_client._service_id = ""
+            ws_client._lock.release()
+
     def _forward_to_message_chain(self, payload: dict) -> None:
         """将飞书入站消息转发到统一消息入口,复用现有交互主链。"""
 
@@ -559,11 +630,11 @@ class Feishu:
             try:
                 ws_client._auto_reconnect = False
                 if ws_loop and ws_loop.is_running():
-                    disconnect_future = asyncio.run_coroutine_threadsafe(
-                        ws_client._disconnect(),
+                    shutdown_future = asyncio.run_coroutine_threadsafe(
+                        self._shutdown_ws_client(),
                         ws_loop,
                     )
-                    disconnect_future.result(timeout=5)
+                    shutdown_future.result(timeout=5)
             except Exception as err:
                 logger.debug(f"停止飞书客户端失败:{err}")
         if self._ws_thread and self._ws_thread.is_alive():
diff --git a/tests/test_feishu.py b/tests/test_feishu.py
index 14f5eec0..dcbcb8e5 100644
--- a/tests/test_feishu.py
+++ b/tests/test_feishu.py
@@ -1205,9 +1205,14 @@ class TestFeishu(unittest.TestCase):
         future = MagicMock()
         future.result.return_value = None
 
+        def _run_threadsafe(coro, loop):
+            """Simulate thread-safe scheduling and close the test coroutine to avoid a never-awaited warning."""
+            coro.close()
+            return future
+
         with patch(
             "app.modules.feishu.feishu.asyncio.run_coroutine_threadsafe",
-            return_value=future,
+            side_effect=_run_threadsafe,
         ) as runner:
             client.stop()
 
diff --git a/tests/test_feishu_ws_lifecycle.py b/tests/test_feishu_ws_lifecycle.py
new file mode 100644
index 00000000..b4983de7
--- /dev/null
+++ b/tests/test_feishu_ws_lifecycle.py
@@ -0,0 +1,94 @@
+import asyncio
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+from app.testing.bootstrap import ensure_optional_stub
+
+# Optional third-party dependencies may be missing in CI / fresh environments; stub them so importing app.modules.feishu does not fail
+ensure_optional_stub("psutil")
+ensure_optional_stub("dateparser")
+ensure_optional_stub("Pinyin2Hanzi", is_pinyin=lambda value: False)
+
+from app.modules.feishu.feishu import Feishu
+
+
+def _build_feishu_client() -> Feishu:
+    """Build a test client that does not start a real Feishu long connection."""
+    with (
+        patch.object(Feishu, "_build_api_client", return_value=MagicMock()),
+        patch.object(Feishu, "_start_ws_client"),
+    ):
+        return Feishu(
+            FEISHU_APP_ID="cli_test_app_id",
+            FEISHU_APP_SECRET="cli_test_app_secret",
+            name="feishu-test",
+        )
+
+
+async def _wait_forever() -> None:
+    """Simulate a long-lived background task created by the Feishu SDK."""
+    await asyncio.Future()
+
+
+def test_shutdown_ws_client_cancels_sdk_tasks_before_quiet_disconnect():
+    """Feishu shutdown cleanup should consume background tasks first, then quietly close the WebSocket connection."""
+    client = _build_feishu_client()
+    loop = asyncio.new_event_loop()
+    closed = False
+
+    async def _close_conn() -> None:
+        """Record that the test connection was closed."""
+        nonlocal closed
+        closed = True
+
+    try:
+        asyncio.set_event_loop(loop)
+        task = loop.create_task(_wait_forever())
+        task.add_done_callback(client._consume_ws_task_result)
+        client._ws_tasks.add(task)
+        ws_client = SimpleNamespace(
+            _auto_reconnect=True,
+            _conn=SimpleNamespace(close=_close_conn),
+            _conn_url="wss://msg-frontier.feishu.cn/ws/v2?access_key=secret&ticket=secret",
+            _conn_id="conn_test",
+            _service_id="service_test",
+            _lock=asyncio.Lock(),
+        )
+        client._ws_client = ws_client
+
+        loop.run_until_complete(client._shutdown_ws_client())
+    finally:
+        loop.close()
+        asyncio.set_event_loop(None)
+
+    assert task.cancelled()
+    assert closed
+    assert not ws_client._auto_reconnect
+    assert ws_client._conn is None
+    assert ws_client._conn_url == ""
+    assert ws_client._conn_id == ""
+    assert ws_client._service_id == ""
+    assert client._ws_tasks == set()
+
+
+def test_consume_ws_task_result_suppresses_stop_exception():
+    """Exceptions from Feishu SDK background tasks during stop should be retrieved and downgraded to debug logs."""
+    client = _build_feishu_client()
+    loop = asyncio.new_event_loop()
+    try:
+        future = loop.create_future()
+        future.set_exception(RuntimeError("normal shutdown"))
+        client._ws_tasks.add(future)
+        client._stop_event.set()
+
+        with (
+            patch("app.modules.feishu.feishu.logger.debug") as debug_logger,
+            patch("app.modules.feishu.feishu.logger.error") as error_logger,
+        ):
+            client._consume_ws_task_result(future)
+    finally:
+        loop.close()
+
+    debug_logger.assert_called_once()
+    error_logger.assert_not_called()
+    assert future not in client._ws_tasks