diff --git a/app/agent/__init__.py b/app/agent/__init__.py index 69233555..08c96f7b 100644 --- a/app/agent/__init__.py +++ b/app/agent/__init__.py @@ -22,7 +22,11 @@ from app.agent.callback import StreamingHandler from app.agent.llm import LLMHelper from app.agent.memory import memory_manager from app.agent.middleware.activity_log import ActivityLogMiddleware -from app.agent.middleware.jobs import JobsMiddleware +from app.agent.middleware.jobs import ( + JobsMiddleware, + filter_active_jobs, + load_jobs_metadata, +) from app.agent.middleware.memory import MemoryMiddleware from app.agent.middleware.patch_tool_calls import PatchToolCallsMiddleware from app.agent.middleware.runtime_config import RuntimeConfigMiddleware @@ -160,6 +164,9 @@ class ReplyMode(str, Enum): CAPTURE_ONLY = "capture_only" +HEARTBEAT_SESSION_PREFIX = "__agent_heartbeat_" + + class MoviePilotAgent: """ MoviePilot AI智能体(基于 LangChain v1 + LangGraph) @@ -288,6 +295,16 @@ class MoviePilotAgent: """ return self.reply_mode == ReplyMode.DISPATCH + @property + def is_heartbeat_session(self) -> bool: + """ + 是否为后台心跳会话。 + + 心跳场景只负责检查并执行待处理 job,不需要携带近期活动日志, + 否则会让这类高频后台调用持续带入无关动态上下文,影响缓存命中率。 + """ + return self.session_id.startswith(HEARTBEAT_SESSION_PREFIX) + def _should_stream(self) -> bool: """ 判断是否应启用流式输出: @@ -430,10 +447,6 @@ class MoviePilotAgent: RuntimeConfigMiddleware(), # 记忆管理 MemoryMiddleware(memory_dir=str(agent_runtime_manager.memory_dir)), - # 活动日志 - ActivityLogMiddleware( - activity_dir=str(agent_runtime_manager.activity_dir), - ), # 上下文压缩 SummarizationMiddleware( model=non_streaming_model, trigger=("fraction", 0.85) @@ -444,6 +457,14 @@ class MoviePilotAgent: UsageMiddleware(on_usage=self._record_usage), ] + if not self.is_heartbeat_session: + middlewares.insert( + 4, + ActivityLogMiddleware( + activity_dir=str(agent_runtime_manager.activity_dir), + ), + ) + # 工具选择 if max_tools > 0: middlewares.append( @@ -1085,8 +1106,17 @@ class AgentManager: 由定时调度器周期性调用,每次使用独立的会话避免上下文干扰。 """ try: + active_jobs = filter_active_jobs( + await load_jobs_metadata([str(agent_runtime_manager.jobs_dir)]) + ) + # 先在本地判断是否存在活跃任务。没有任务时直接短路,避免一次完整 + # 的后台 Agent/LLM 空调用。 + if not active_jobs: + logger.info("智能体心跳唤醒:没有活跃任务,跳过模型调用") + return + # 每次使用唯一的 session_id,避免共享上下文 - session_id = f"__agent_heartbeat_{uuid.uuid4().hex[:12]}__" + session_id = f"{HEARTBEAT_SESSION_PREFIX}{uuid.uuid4().hex[:12]}__" user_id = SYSTEM_INTERNAL_USER_ID logger.info("智能体心跳唤醒:开始检查待处理任务...") diff --git a/app/agent/middleware/jobs.py b/app/agent/middleware/jobs.py index ea12112f..2ceff3e2 100644 --- a/app/agent/middleware/jobs.py +++ b/app/agent/middleware/jobs.py @@ -21,6 +21,7 @@ from app.log import logger # JOB.md 文件最大限制为 1MB MAX_JOB_FILE_SIZE = 1 * 1024 * 1024 +ACTIVE_JOB_STATUSES = ("pending", "in_progress") class JobMetadata(TypedDict): @@ -143,6 +144,9 @@ async def _alist_jobs(source_path: AsyncPath) -> list[JobMetadata]: if not job_dirs: return [] + # 显式按目录名排序,避免文件系统返回顺序不稳定时破坏提示词缓存命中。 + job_dirs.sort(key=lambda p: p.name.casefold()) + # 解析 JOB.md for job_path in job_dirs: job_md_path = job_path / "JOB.md" @@ -161,6 +165,31 @@ async def _alist_jobs(source_path: AsyncPath) -> list[JobMetadata]: return jobs +def filter_active_jobs(jobs_metadata: list[JobMetadata]) -> list[JobMetadata]: + """筛选需要参与心跳检查的活跃任务。 + + 这里严格以任务状态为准,只保留 `pending` / `in_progress`。 + `recurring` 任务执行完成后按约定应回写为 `pending`,因此无需再额外放宽 + 到 `completed`,避免已结束任务被重复注入后台心跳。 + """ + return [ + job for job in jobs_metadata if job.get("status") in ACTIVE_JOB_STATUSES + ] + + +async def load_jobs_metadata(source_paths: list[str]) -> list[JobMetadata]: + """按顺序加载多个 jobs 目录下的任务元数据。""" + all_jobs: list[JobMetadata] = [] + for source_path_str in source_paths: + source_path = AsyncPath(source_path_str) + if not await source_path.exists(): + await source_path.mkdir(parents=True, exist_ok=True) + continue + source_jobs = await _alist_jobs(source_path) + all_jobs.extend(source_jobs) + return all_jobs + + JOBS_SYSTEM_PROMPT = """ You have a **scheduled jobs** system that allows you to track and execute long-running or recurring tasks. @@ -289,13 +318,8 @@ class JobsMiddleware(AgentMiddleware[JobsState, ContextT, ResponseT]): # noqa """将任务文档注入模型请求的系统消息中。""" jobs_metadata = request.state.get("jobs_metadata", []) # noqa - # 过滤:只展示活跃任务(pending / in_progress / recurring) - active_jobs = [ - j - for j in jobs_metadata - if j["status"] in ("pending", "in_progress") - or (j["schedule"] == "recurring" and j["status"] not in ("cancelled",)) - ] + # 仅注入真正活跃的任务,避免把已完成任务继续塞进心跳上下文。 + active_jobs = filter_active_jobs(jobs_metadata) jobs_list = self._format_jobs_list(active_jobs) jobs_location = self.sources[0] if self.sources else "" @@ -322,18 +346,9 @@ class JobsMiddleware(AgentMiddleware[JobsState, ContextT, ResponseT]): # noqa if "jobs_metadata" in state: return None - all_jobs: list[JobMetadata] = [] - - # 遍历源加载任务 - for source_path_str in self.sources: - source_path = AsyncPath(source_path_str) - if not await source_path.exists(): - await source_path.mkdir(parents=True, exist_ok=True) - continue - source_jobs = await _alist_jobs(source_path) - all_jobs.extend(source_jobs) - - return JobsStateUpdate(jobs_metadata=all_jobs) + return JobsStateUpdate( + jobs_metadata=await load_jobs_metadata(self.sources) + ) async def awrap_model_call( self, @@ -347,4 +362,10 @@ class JobsMiddleware(AgentMiddleware[JobsState, ContextT, ResponseT]): # noqa return await handler(modified_request) -__all__ = ["JobMetadata", "JobsMiddleware"] +__all__ = [ + "ACTIVE_JOB_STATUSES", + "JobMetadata", + "JobsMiddleware", + "filter_active_jobs", + "load_jobs_metadata", +] diff --git a/app/agent/middleware/skills.py b/app/agent/middleware/skills.py index 2a24325e..f1b684ce 100644 --- a/app/agent/middleware/skills.py +++ b/app/agent/middleware/skills.py @@ -227,6 +227,9 @@ async def _alist_skills(source_path: AsyncPath) -> list[SkillMetadata]: if not skill_dirs: return [] + # 显式按目录名排序,避免文件系统返回顺序不稳定时破坏提示词缓存命中。 + skill_dirs.sort(key=lambda p: p.name.casefold()) + # 解析已下载的 SKILL.md for skill_path in skill_dirs: skill_md_path = skill_path / "SKILL.md" diff --git a/app/agent/prompt/__init__.py b/app/agent/prompt/__init__.py index e90089b8..75161dd6 100644 --- a/app/agent/prompt/__init__.py +++ b/app/agent/prompt/__init__.py @@ -286,8 +286,10 @@ class PromptManager: f"{settings.DB_POSTGRESQL_TARGET}/{settings.DB_POSTGRESQL_DATABASE})" ) + # 保留日期用于提供“今天是哪天”的稳定上下文,但不再注入秒级时间, + # 避免每次请求都生成不同的 system prompt,影响 provider 侧 cache 命中率。 info_lines = [ - f"- 当前时间: {strftime('%Y-%m-%d %H:%M:%S')}", + f"- 当前日期: {strftime('%Y-%m-%d')}", f"- 运行环境: {SystemUtils.platform} {'docker' if SystemUtils.is_docker() else ''}", f"- 主机名: {hostname}", f"- IP地址: {ip_address}", @@ -426,11 +428,11 @@ class PromptManager: return text context = cls._normalize_template_context(template_context) - missing_fields = sorted(field for field in required_fields if field not in context) + missing_fields = sorted(f for f in required_fields if f not in context) if missing_fields: raise PromptConfigError( f"系统任务定义 `{task_type}` 的 `{field_name}` 缺少变量: " - + ", ".join(f"`{field}`" for field in missing_fields) + + ", ".join(f"`{f}`" for f in missing_fields) ) # 这里统一做字符串替换,让 YAML 成为后台任务文案的唯一行为来源。 diff --git a/tests/test_agent_background_output.py b/tests/test_agent_background_output.py index a32f7cb3..29d4bdb4 100644 --- a/tests/test_agent_background_output.py +++ b/tests/test_agent_background_output.py @@ -4,8 +4,14 @@ from unittest.mock import AsyncMock, patch from langchain_core.messages import AIMessage -from app.agent import MoviePilotAgent, AgentManager, ReplyMode +from app.agent import ( + HEARTBEAT_SESSION_PREFIX, + MoviePilotAgent, + AgentManager, + ReplyMode, +) from app.agent.memory import memory_manager +from app.core.config import settings from app.utils.identity import SYSTEM_INTERNAL_USER_ID @@ -38,8 +44,8 @@ class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase): stop_streaming=AsyncMock(return_value=(False, "")) ) agent._should_stream = lambda: False - agent._create_agent = lambda streaming=False: _FakeAgent( - [AIMessage(content="后台结果")] + agent._create_agent = AsyncMock( + return_value=_FakeAgent([AIMessage(content="后台结果")]) ) agent.send_agent_message = AsyncMock() agent._save_agent_message_to_db = AsyncMock() @@ -66,8 +72,8 @@ class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase): stop_streaming=AsyncMock(return_value=(False, "")) ) agent._should_stream = lambda: False - agent._create_agent = lambda streaming=False: _FakeAgent( - [AIMessage(content="后台结果")] + agent._create_agent = AsyncMock( + return_value=_FakeAgent([AIMessage(content="后台结果")]) ) agent.send_agent_message = AsyncMock() agent._save_agent_message_to_db = AsyncMock() @@ -94,8 +100,8 @@ class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase): stop_streaming=AsyncMock(return_value=(False, "")) ) agent._should_stream = lambda: False - agent._create_agent = lambda streaming=False: _FakeAgent( - [AIMessage(content="后台结果")] + agent._create_agent = AsyncMock( + return_value=_FakeAgent([AIMessage(content="后台结果")]) ) agent.send_agent_message = AsyncMock() agent._save_agent_message_to_db = AsyncMock() @@ -114,6 +120,15 @@ class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase): manager = AgentManager() with ( + patch("app.agent.load_jobs_metadata", new=AsyncMock(return_value=[{ + "id": "job-1", + "name": "测试任务", + "description": "desc", + "path": "/tmp/job-1/JOB.md", + "schedule": "once", + "status": "pending", + "last_run": None, + }])), patch.object(manager, "_build_heartbeat_prompt", return_value="HEARTBEAT"), patch.object(manager, "process_message", new=AsyncMock()) as process_message, ): @@ -125,6 +140,80 @@ class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase): process_message.await_args.kwargs["reply_mode"], ) + async def test_heartbeat_check_jobs_skips_when_no_active_jobs(self): + manager = AgentManager() + + with ( + patch("app.agent.load_jobs_metadata", new=AsyncMock(return_value=[])), + patch.object(manager, "process_message", new=AsyncMock()) as process_message, + ): + await manager.heartbeat_check_jobs() + + process_message.assert_not_awaited() + + async def test_create_agent_excludes_activity_log_for_heartbeat_session(self): + agent = MoviePilotAgent( + session_id=f"{HEARTBEAT_SESSION_PREFIX}test__", + user_id="system", + ) + agent._initialize_tools = lambda: [] + + with ( + patch.object(settings, "LLM_MAX_TOOLS", 0), + patch.object(agent, "_initialize_llm", new=AsyncMock(return_value=object())), + patch("app.agent.prompt_manager.get_agent_prompt", return_value="PROMPT"), + patch( + "app.agent.MoviePilotToolFactory.get_tool_selector_always_include_names", + return_value=[], + ), + patch("app.agent.SkillsMiddleware", side_effect=lambda *args, **kwargs: "skills"), + patch("app.agent.JobsMiddleware", side_effect=lambda *args, **kwargs: "jobs"), + patch("app.agent.RuntimeConfigMiddleware", side_effect=lambda *args, **kwargs: "runtime"), + patch("app.agent.MemoryMiddleware", side_effect=lambda *args, **kwargs: "memory"), + patch("app.agent.ActivityLogMiddleware", side_effect=lambda *args, **kwargs: "activity"), + patch("app.agent.SummarizationMiddleware", side_effect=lambda *args, **kwargs: "summary"), + patch("app.agent.PatchToolCallsMiddleware", side_effect=lambda *args, **kwargs: "patch"), + patch("app.agent.UsageMiddleware", side_effect=lambda *args, **kwargs: "usage"), + patch("app.agent.InMemorySaver", return_value="checkpointer"), + patch("app.agent.create_agent", side_effect=lambda **kwargs: kwargs), + ): + created = await agent._create_agent(streaming=False) + + self.assertEqual( + ["skills", "jobs", "runtime", "memory", "summary", "patch", "usage"], + created["middleware"], + ) + + async def test_create_agent_keeps_activity_log_for_normal_session(self): + agent = MoviePilotAgent(session_id="normal-session", user_id="system") + agent._initialize_tools = lambda: [] + + with ( + patch.object(settings, "LLM_MAX_TOOLS", 0), + patch.object(agent, "_initialize_llm", new=AsyncMock(return_value=object())), + patch("app.agent.prompt_manager.get_agent_prompt", return_value="PROMPT"), + patch( + "app.agent.MoviePilotToolFactory.get_tool_selector_always_include_names", + return_value=[], + ), + patch("app.agent.SkillsMiddleware", side_effect=lambda *args, **kwargs: "skills"), + patch("app.agent.JobsMiddleware", side_effect=lambda *args, **kwargs: "jobs"), + patch("app.agent.RuntimeConfigMiddleware", side_effect=lambda *args, **kwargs: "runtime"), + patch("app.agent.MemoryMiddleware", side_effect=lambda *args, **kwargs: "memory"), + patch("app.agent.ActivityLogMiddleware", side_effect=lambda *args, **kwargs: "activity"), + patch("app.agent.SummarizationMiddleware", side_effect=lambda *args, **kwargs: "summary"), + patch("app.agent.PatchToolCallsMiddleware", side_effect=lambda *args, **kwargs: "patch"), + patch("app.agent.UsageMiddleware", side_effect=lambda *args, **kwargs: "usage"), + patch("app.agent.InMemorySaver", return_value="checkpointer"), + patch("app.agent.create_agent", side_effect=lambda **kwargs: kwargs), + ): + created = await agent._create_agent(streaming=False) + + self.assertEqual( + ["skills", "jobs", "runtime", "memory", "activity", "summary", "patch", "usage"], + created["middleware"], + ) + async def test_run_background_prompt_forces_disable_message_tools_when_capture_only(self): captured = {} diff --git a/tests/test_agent_jobs_middleware.py b/tests/test_agent_jobs_middleware.py new file mode 100644 index 00000000..0450f6e5 --- /dev/null +++ b/tests/test_agent_jobs_middleware.py @@ -0,0 +1,82 @@ +import tempfile +import unittest +from pathlib import Path + +from anyio import Path as AsyncPath + +from app.agent.middleware.jobs import _alist_jobs, filter_active_jobs + + +class JobsMiddlewareTest(unittest.TestCase): + def test_filter_active_jobs_only_keeps_pending_and_in_progress(self): + jobs_metadata = [ + { + "id": "pending-job", + "name": "待执行任务", + "description": "desc", + "path": "/tmp/pending/JOB.md", + "schedule": "once", + "status": "pending", + "last_run": None, + }, + { + "id": "running-job", + "name": "执行中任务", + "description": "desc", + "path": "/tmp/running/JOB.md", + "schedule": "recurring", + "status": "in_progress", + "last_run": "2026-05-10 10:00", + }, + { + "id": "completed-recurring-job", + "name": "已完成循环任务", + "description": "desc", + "path": "/tmp/completed/JOB.md", + "schedule": "recurring", + "status": "completed", + "last_run": "2026-05-10 11:00", + }, + { + "id": "cancelled-job", + "name": "已取消任务", + "description": "desc", + "path": "/tmp/cancelled/JOB.md", + "schedule": "once", + "status": "cancelled", + "last_run": None, + }, + ] + + active_job_ids = [job["id"] for job in filter_active_jobs(jobs_metadata)] + + self.assertEqual(["pending-job", "running-job"], active_job_ids) + + +class JobsMiddlewareAsyncTest(unittest.IsolatedAsyncioTestCase): + async def test_alist_jobs_sorts_job_directories_by_name(self): + with tempfile.TemporaryDirectory() as tempdir: + root = Path(tempdir) + + for job_id in ("z-job", "a-job", "m-job"): + job_dir = root / job_id + job_dir.mkdir() + (job_dir / "JOB.md").write_text( + f"""--- +name: {job_id} +description: test +schedule: once +status: pending +--- +# {job_id} +""", + encoding="utf-8", + ) + + jobs = await _alist_jobs(AsyncPath(str(root))) + + self.assertEqual(["a-job", "m-job", "z-job"], [job["id"] for job in jobs]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_agent_prompt_style.py b/tests/test_agent_prompt_style.py index c1b1d0b0..fe0afa0a 100644 --- a/tests/test_agent_prompt_style.py +++ b/tests/test_agent_prompt_style.py @@ -35,6 +35,8 @@ class TestAgentPromptStyle(unittest.TestCase): "Do not let user memory or persona style override this core identity", prompt, ) + self.assertIn("当前日期", prompt) + self.assertNotIn("当前时间", prompt) def test_runtime_config_middleware_injects_persona_only(self): middleware = RuntimeConfigMiddleware() diff --git a/tests/test_agent_skills_middleware.py b/tests/test_agent_skills_middleware.py new file mode 100644 index 00000000..cd8761aa --- /dev/null +++ b/tests/test_agent_skills_middleware.py @@ -0,0 +1,37 @@ +import tempfile +import unittest +from pathlib import Path + +from anyio import Path as AsyncPath + +from app.agent.middleware.skills import _alist_skills + + +class SkillsMiddlewareAsyncTest(unittest.IsolatedAsyncioTestCase): + async def test_alist_skills_sorts_skill_directories_by_name(self): + with tempfile.TemporaryDirectory() as tempdir: + root = Path(tempdir) + + for skill_id in ("z-skill", "a-skill", "m-skill"): + skill_dir = root / skill_id + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + f"""--- +name: {skill_id} +description: test +--- +# {skill_id} +""", + encoding="utf-8", + ) + + skills = await _alist_skills(AsyncPath(str(root))) + + self.assertEqual( + ["a-skill", "m-skill", "z-skill"], + [skill["id"] for skill in skills], + ) + + +if __name__ == "__main__": + unittest.main()