From 94c75eb1c75a2134f05d47b21dd1a71df3ee18c1 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Wed, 25 Mar 2026 13:02:20 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=99=BA=E8=83=BD=E4=BD=93=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1(Jobs)=E7=AE=A1?= =?UTF-8?q?=E7=90=86=E5=92=8C=E5=BF=83=E8=B7=B3=E5=94=A4=E9=86=92=E6=9C=BA?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 JobsMiddleware 中间件,支持通过 JOB.md 文件管理长期/重复性任务 - 智能体可创建一次性(once)和重复性(recurring)任务,自动跟踪执行状态 - 新增心跳唤醒机制,定时调度器周期性唤醒智能体检查并执行待处理任务 - 新增 AI_AGENT_JOB_INTERVAL 配置项控制检查间隔,默认24小时 - 每次心跳使用独立会话,执行完毕后清理资源 --- app/agent/__init__.py | 46 +++++ app/agent/middleware/jobs.py | 350 +++++++++++++++++++++++++++++++++++ app/core/config.py | 2 + app/scheduler.py | 296 +++++++++++++++-------------- 4 files changed, 558 insertions(+), 136 deletions(-) create mode 100644 app/agent/middleware/jobs.py diff --git a/app/agent/__init__.py b/app/agent/__init__.py index d2369380..49bfe2ad 100644 --- a/app/agent/__init__.py +++ b/app/agent/__init__.py @@ -1,5 +1,6 @@ import asyncio import traceback +import uuid from time import strftime from typing import Dict, List @@ -16,6 +17,7 @@ from langgraph.checkpoint.memory import InMemorySaver from app.agent.callback import StreamingHandler from app.agent.memory import memory_manager +from app.agent.middleware.jobs import JobsMiddleware from app.agent.middleware.memory import MemoryMiddleware from app.agent.middleware.patch_tool_calls import PatchToolCallsMiddleware from app.agent.middleware.skills import SkillsMiddleware @@ -96,6 +98,10 @@ class MoviePilotAgent: SkillsMiddleware( sources=[str(settings.CONFIG_PATH / "agent" / "skills")], ), + # Jobs 任务管理 + JobsMiddleware( + sources=[str(settings.CONFIG_PATH / "agent" / "jobs")], + ), # 记忆管理 MemoryMiddleware( sources=[str(settings.CONFIG_PATH / "agent" / "MEMORY.md")] @@ -340,6 +346,46 @@ class AgentManager: memory_manager.clear_memory(session_id, user_id) logger.info(f"会话 {session_id} 的记忆已清空") + async def heartbeat_check_jobs(self): + """ + 心跳唤醒:检查并执行待处理的定时任务(Jobs)。 + 由定时调度器周期性调用,每次使用独立的会话避免上下文干扰。 + """ + try: + # 每次使用唯一的 session_id,避免共享上下文 + session_id = f"__agent_heartbeat_{uuid.uuid4().hex[:12]}__" + user_id = settings.SUPERUSER + + logger.info("智能体心跳唤醒:开始检查待处理任务...") + + # 英文提示词,便于大模型理解 + heartbeat_message = ( + "[System Heartbeat Wake-up] Please check all jobs in your jobs directory and process pending tasks:\n" + "1. List all jobs with status 'pending' or 'in_progress'\n" + "2. For 'recurring' jobs, check the 'last_run' timestamp to determine if it's time to run again\n" + "3. For 'once' jobs with status 'pending', execute them now\n" + "4. After executing each job, update its status, 'last_run' time, and execution log in the JOB.md file\n" + "5. If there are no pending jobs, simply respond with a brief summary\n" + "IMPORTANT: Respond in Chinese (中文). Begin checking and processing jobs now." + ) + + await self.process_message( + session_id=session_id, + user_id=user_id, + message=heartbeat_message, + channel=None, + source=None, + username=settings.SUPERUSER, + ) + + logger.info("智能体心跳唤醒:任务检查完成") + + # 心跳会话用完即弃,清理资源 + await self.clear_session(session_id, user_id) + + except Exception as e: + logger.error(f"智能体心跳唤醒失败: {e}") + # 全局智能体管理器实例 agent_manager = AgentManager() diff --git a/app/agent/middleware/jobs.py b/app/agent/middleware/jobs.py new file mode 100644 index 00000000..ea12112f --- /dev/null +++ b/app/agent/middleware/jobs.py @@ -0,0 +1,350 @@ +import re +from collections.abc import Awaitable, Callable +from typing import Annotated, NotRequired, TypedDict + +import yaml # noqa +from anyio import Path as AsyncPath +from langchain.agents.middleware.types import ( + AgentMiddleware, + AgentState, + ContextT, + ModelRequest, + ModelResponse, + PrivateStateAttr, # noqa + ResponseT, +) +from langchain_core.runnables import RunnableConfig +from langgraph.runtime import Runtime + +from app.agent.middleware.utils import append_to_system_message +from app.log import logger + +# JOB.md 文件最大限制为 1MB +MAX_JOB_FILE_SIZE = 1 * 1024 * 1024 + + +class JobMetadata(TypedDict): + """Job 元数据。""" + + path: str + """JOB.md 文件路径。""" + + id: str + """Job 标识符(目录名)。""" + + name: str + """Job 名称。""" + + description: str + """Job 描述。""" + + schedule: str + """调度类型: once(一次性)/ recurring(重复性)。""" + + status: str + """当前状态: pending / in_progress / completed / cancelled。""" + + last_run: str | None + """上次执行时间。""" + + +class JobsState(AgentState): + """jobs 中间件状态。""" + + jobs_metadata: NotRequired[Annotated[list[JobMetadata], PrivateStateAttr]] + """已加载的 job 元数据列表,不传播给父 agent。""" + + +class JobsStateUpdate(TypedDict): + """jobs 中间件状态更新项。""" + + jobs_metadata: list[JobMetadata] + """待合并的 job 元数据列表。""" + + +def _parse_job_metadata( + content: str, + job_path: str, + job_id: str, +) -> JobMetadata | None: + """从 JOB.md 内容中解析 YAML 前言并验证元数据。""" + if len(content) > MAX_JOB_FILE_SIZE: + logger.warning( + "Skipping %s: content too large (%d bytes)", job_path, len(content) + ) + return None + + # 匹配 --- 分隔的 YAML 前言 + frontmatter_pattern = r"^---\s*\n(.*?)\n---\s*\n" + match = re.match(frontmatter_pattern, content, re.DOTALL) + if not match: + logger.warning("Skipping %s: no valid YAML frontmatter found", job_path) + return None + frontmatter_str = match.group(1) + + # 解析 YAML + try: + frontmatter_data = yaml.safe_load(frontmatter_str) + except yaml.YAMLError as e: + logger.warning("Invalid YAML in %s: %s", job_path, e) + return None + + if not isinstance(frontmatter_data, dict): + logger.warning("Skipping %s: frontmatter is not a mapping", job_path) + return None + + # Job 名称和描述 + name = str(frontmatter_data.get("name", "")).strip() + description = str(frontmatter_data.get("description", "")).strip() + if not name: + logger.warning("Skipping %s: missing required 'name'", job_path) + return None + + # 调度类型 + schedule = str(frontmatter_data.get("schedule", "once")).strip().lower() + if schedule not in ("once", "recurring"): + schedule = "once" + + # 状态 + status = str(frontmatter_data.get("status", "pending")).strip().lower() + if status not in ("pending", "in_progress", "completed", "cancelled"): + status = "pending" + + # 上次执行时间 + last_run = str(frontmatter_data.get("last_run", "")).strip() or None + + return JobMetadata( + id=job_id, + name=name, + description=description, + path=job_path, + schedule=schedule, + status=status, + last_run=last_run, + ) + + +async def _alist_jobs(source_path: AsyncPath) -> list[JobMetadata]: + """异步列出指定路径下的所有任务。 + + 扫描包含 JOB.md 的目录并解析其元数据。 + """ + jobs: list[JobMetadata] = [] + + if not await source_path.exists(): + return [] + + # 查找所有任务目录(包含 JOB.md 的目录) + job_dirs: list[AsyncPath] = [] + async for path in source_path.iterdir(): + if await path.is_dir() and await (path / "JOB.md").is_file(): + job_dirs.append(path) + + if not job_dirs: + return [] + + # 解析 JOB.md + for job_path in job_dirs: + job_md_path = job_path / "JOB.md" + + job_content = await job_md_path.read_text(encoding="utf-8") + + # 解析元数据 + job_metadata = _parse_job_metadata( + content=job_content, + job_path=str(job_md_path), + job_id=job_path.name, + ) + if job_metadata: + jobs.append(job_metadata) + + return jobs + + +JOBS_SYSTEM_PROMPT = """ + +You have a **scheduled jobs** system that allows you to track and execute long-running or recurring tasks. + +**Jobs Location:** `{jobs_location}` + +**Current Jobs:** + +{jobs_list} + +**Job File Format:** + +Each job is a directory containing a `JOB.md` file with YAML frontmatter followed by task details: + +```markdown +--- +name: 任务名称(简短中文描述) +description: 任务的详细描述,说明要做什么 +schedule: once 或 recurring +status: pending / in_progress / completed / cancelled +last_run: "YYYY-MM-DD HH:MM"(上次执行时间,可选) +--- +# 任务详情 + +## 目标 +详细描述这个任务要完成的目标。 + +## 执行日志 +记录每次执行的情况和结果。 + +- **2024-01-15 10:00** - 执行了XXX操作,结果:成功/失败 +- **2024-01-16 10:00** - 继续执行XXX... +``` + +**Job Lifecycle Rules:** + +1. **Creating a Job**: When a user asks you to do something periodically or at a later time: + - Create a new directory under the jobs location, directory name is the `job-id` (lowercase, hyphens, 1-64 chars) + - Write a `JOB.md` file with proper frontmatter and detailed task description + - Set `schedule: once` for one-time tasks, `schedule: recurring` for repeating tasks (e.g., daily sign-in, weekly checks) + - Set initial `status: pending` + +2. **Executing a Job**: When you work on a job: + - Update `status: in_progress` in the frontmatter + - Execute the required actions using your tools + - Log the execution result in the "执行日志" section with timestamp + - Update `last_run` in frontmatter to current time + +3. **Completing a Job**: + - For `schedule: once` tasks: set `status: completed` after successful execution + - For `schedule: recurring` tasks: keep `status: pending` after execution, only update `last_run` time. The job stays active for the next scheduled run. + - Set `status: cancelled` if the user explicitly asks to cancel/stop a task + +4. **Heartbeat Check**: You will be periodically woken up to check pending jobs. When woken up: + - Read the jobs directory to find all active jobs (status: pending or in_progress) + - Skip jobs with `status: completed` or `status: cancelled` + - For `schedule: recurring` jobs, check `last_run` to determine if it's time to run again + - Execute pending jobs and update their status/logs accordingly + +**Important Notes:** +- Each job MUST have its own separate directory and JOB.md file to avoid conflicts +- Always update the frontmatter fields (status, last_run) when executing a job +- Keep execution logs concise but informative +- For recurring jobs, maintain a rolling log (keep recent entries, you can summarize/remove old entries to keep the file manageable) +- When creating jobs, make the description detailed enough that you can understand and execute the task in future sessions without additional context + +**When to Create Jobs:** +- User says "每天帮我..." / "定期..." / "定时..." / "提醒我..." / "以后每次..." +- User requests a task that should be done repeatedly +- User asks for monitoring or periodic checking of something + +**When NOT to Create Jobs:** +- User asks for an immediate one-time action (just do it now) +- Simple questions or conversations +- Tasks that are already handled by MoviePilot's built-in scheduler services + +""" + + +class JobsMiddleware(AgentMiddleware[JobsState, ContextT, ResponseT]): # noqa + """加载并向系统提示词注入 Agent Jobs 的中间件。 + + 扫描 jobs 目录下的 JOB.md 文件,解析元数据并注入到系统提示词中, + 使智能体了解当前的长期任务及其状态。 + """ + + state_schema = JobsState + + def __init__(self, *, sources: list[str]) -> None: + """初始化 Jobs 中间件。""" + self.sources = sources + self.system_prompt_template = JOBS_SYSTEM_PROMPT + + @staticmethod + def _format_jobs_list(jobs: list[JobMetadata]) -> str: + """格式化任务元数据列表用于系统提示词。""" + if not jobs: + return "(No active jobs. You can create jobs when users request periodic or scheduled tasks.)" + + lines = [] + for job in jobs: + status_emoji = { + "pending": "⏳", + "in_progress": "🔄", + "completed": "✅", + "cancelled": "❌", + }.get(job["status"], "❓") + + schedule_label = ( + "recurring (重复)" + if job["schedule"] == "recurring" + else "once (一次性)" + ) + desc_line = ( + f"- {status_emoji} **{job['id']}**: {job['name']}" + f" [{schedule_label}] - {job['description']}" + ) + if job.get("last_run"): + desc_line += f" (上次执行: {job['last_run']})" + lines.append(desc_line) + lines.append(f" -> Read `{job['path']}` for full details") + + return "\n".join(lines) + + def modify_request(self, request: ModelRequest[ContextT]) -> ModelRequest[ContextT]: + """将任务文档注入模型请求的系统消息中。""" + jobs_metadata = request.state.get("jobs_metadata", []) # noqa + + # 过滤:只展示活跃任务(pending / in_progress / recurring) + active_jobs = [ + j + for j in jobs_metadata + if j["status"] in ("pending", "in_progress") + or (j["schedule"] == "recurring" and j["status"] not in ("cancelled",)) + ] + + jobs_list = self._format_jobs_list(active_jobs) + jobs_location = self.sources[0] if self.sources else "" + + jobs_section = self.system_prompt_template.format( + jobs_location=jobs_location, + jobs_list=jobs_list, + ) + + new_system_message = append_to_system_message( + request.system_message, jobs_section + ) + + return request.override(system_message=new_system_message) + + async def abefore_agent( # noqa + self, state: JobsState, runtime: Runtime, config: RunnableConfig + ) -> JobsStateUpdate | None: + """在 Agent 执行前异步加载任务元数据。 + + 每个会话仅加载一次。若 state 中已有则跳过。 + """ + # 如果 state 中已存在元数据则跳过 + if "jobs_metadata" in state: + return None + + all_jobs: list[JobMetadata] = [] + + # 遍历源加载任务 + for source_path_str in self.sources: + source_path = AsyncPath(source_path_str) + if not await source_path.exists(): + await source_path.mkdir(parents=True, exist_ok=True) + continue + source_jobs = await _alist_jobs(source_path) + all_jobs.extend(source_jobs) + + return JobsStateUpdate(jobs_metadata=all_jobs) + + async def awrap_model_call( + self, + request: ModelRequest[ContextT], + handler: Callable[ + [ModelRequest[ContextT]], Awaitable[ModelResponse[ResponseT]] + ], + ) -> ModelResponse[ResponseT]: + """在模型调用时注入任务文档。""" + modified_request = self.modify_request(request) + return await handler(modified_request) + + +__all__ = ["JobMetadata", "JobsMiddleware"] diff --git a/app/core/config.py b/app/core/config.py index 0d6a237f..451552b5 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -529,6 +529,8 @@ class ConfigModel(BaseModel): AI_RECOMMEND_MAX_ITEMS: int = 50 # LLM工具选择中间件最大工具数量,0为不启用工具选择中间件 LLM_MAX_TOOLS: int = 0 + # AI智能体定时任务检查间隔(小时),0为不启用,默认24小时 + AI_AGENT_JOB_INTERVAL: int = 24 class Settings(BaseSettings, ConfigModel, LogConfigModel): diff --git a/app/scheduler.py b/app/scheduler.py index f6729f47..48e86461 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -47,6 +47,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): """ 定时任务管理 """ + CONFIG_WATCH = { "DEV", "COOKIECLOUD_INTERVAL", @@ -56,6 +57,8 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): "SUBSCRIBE_MODE", "SUBSCRIBE_RSS_INTERVAL", "SITEDATA_REFRESH_INTERVAL", + "AI_AGENT_ENABLE", + "AI_AGENT_JOB_INTERVAL", } def __init__(self): @@ -98,133 +101,134 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): "cookiecloud": { "name": "同步CookieCloud站点", "func": SiteChain().sync_cookies, - "running": False + "running": False, }, "mediaserver_sync": { "name": "同步媒体服务器", "func": MediaServerChain().sync, - "running": False + "running": False, }, "subscribe_tmdb": { "name": "订阅元数据更新", "func": SubscribeChain().check, - "running": False + "running": False, }, "subscribe_search": { "name": "订阅搜索补全", "func": SubscribeChain().search, "running": False, - "kwargs": { - "state": "R" - } + "kwargs": {"state": "R"}, }, "new_subscribe_search": { "name": "新增订阅搜索", "func": SubscribeChain().search, "running": False, - "kwargs": { - "state": "N" - } + "kwargs": {"state": "N"}, }, "subscribe_refresh": { "name": "订阅刷新", "func": SubscribeChain().refresh, - "running": False + "running": False, }, "subscribe_follow": { "name": "关注的订阅分享", "func": SubscribeChain().follow, - "running": False + "running": False, }, "transfer": { "name": "下载文件整理", "func": TransferChain().process, - "running": False + "running": False, }, "clear_cache": { "name": "缓存清理", "func": self.clear_cache, - "running": False + "running": False, }, "user_auth": { "name": "用户认证检查", "func": self.user_auth, - "running": False + "running": False, }, "scheduler_job": { "name": "公共定时服务", "func": SchedulerChain().scheduler_job, - "running": False + "running": False, }, "random_wallpager": { "name": "壁纸缓存", "func": WallpaperHelper().get_wallpapers, - "running": False + "running": False, }, "sitedata_refresh": { "name": "站点数据刷新", "func": SiteChain().refresh_userdatas, - "running": False + "running": False, }, "recommend_refresh": { "name": "推荐缓存", "func": RecommendChain().refresh_recommend, - "running": False + "running": False, }, "plugin_market_refresh": { "name": "插件市场缓存", "func": PluginManager().async_get_online_plugins, "running": False, - "kwargs": { - "force": True - } + "kwargs": {"force": True}, }, "subscribe_calendar_cache": { "name": "订阅日历缓存", "func": SubscribeChain().cache_calendar, - "running": False + "running": False, }, "full_gc": { "name": "主动内存回收", "func": self.full_gc, - "running": False - } + "running": False, + }, + "agent_heartbeat": { + "name": "智能体定时任务", + "func": self.agent_heartbeat, + "running": False, + }, } # 创建定时服务 - self._scheduler = BackgroundScheduler(timezone=settings.TZ, - executors={ - 'default': ThreadPoolExecutor(settings.CONF.scheduler) - }) + self._scheduler = BackgroundScheduler( + timezone=settings.TZ, + executors={"default": ThreadPoolExecutor(settings.CONF.scheduler)}, + ) # CookieCloud定时同步 - if settings.COOKIECLOUD_INTERVAL \ - and str(settings.COOKIECLOUD_INTERVAL).isdigit(): + if ( + settings.COOKIECLOUD_INTERVAL + and str(settings.COOKIECLOUD_INTERVAL).isdigit() + ): self._scheduler.add_job( self.start, "interval", id="cookiecloud", name="同步CookieCloud站点", minutes=int(settings.COOKIECLOUD_INTERVAL), - next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5), - kwargs={ - 'job_id': 'cookiecloud' - } + next_run_time=datetime.now(pytz.timezone(settings.TZ)) + + timedelta(minutes=5), + kwargs={"job_id": "cookiecloud"}, ) # 媒体服务器同步 - if settings.MEDIASERVER_SYNC_INTERVAL \ - and str(settings.MEDIASERVER_SYNC_INTERVAL).isdigit(): + if ( + settings.MEDIASERVER_SYNC_INTERVAL + and str(settings.MEDIASERVER_SYNC_INTERVAL).isdigit() + ): self._scheduler.add_job( self.start, "interval", id="mediaserver_sync", name="同步媒体服务器", hours=int(settings.MEDIASERVER_SYNC_INTERVAL), - next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=10), - kwargs={ - 'job_id': 'mediaserver_sync' - } + next_run_time=datetime.now(pytz.timezone(settings.TZ)) + + timedelta(minutes=10), + kwargs={"job_id": "mediaserver_sync"}, ) # 新增订阅时搜索(5分钟检查一次) @@ -234,9 +238,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="new_subscribe_search", name="新增订阅搜索", minutes=5, - kwargs={ - 'job_id': 'new_subscribe_search' - } + kwargs={"job_id": "new_subscribe_search"}, ) # 检查更新订阅TMDB数据(每隔6小时) @@ -246,9 +248,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="subscribe_tmdb", name="订阅元数据更新", hours=6, - kwargs={ - 'job_id': 'subscribe_tmdb' - } + kwargs={"job_id": "subscribe_tmdb"}, ) # 订阅状态每隔24小时搜索一次 @@ -259,9 +259,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="subscribe_search", name="订阅搜索补全", hours=settings.SUBSCRIBE_SEARCH_INTERVAL, - kwargs={ - 'job_id': 'subscribe_search' - } + kwargs={"job_id": "subscribe_search"}, ) if settings.SUBSCRIBE_MODE == "spider": @@ -275,13 +273,14 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): name="订阅刷新", hour=trigger.hour, minute=trigger.minute, - kwargs={ - 'job_id': 'subscribe_refresh' - }) + kwargs={"job_id": "subscribe_refresh"}, + ) else: # RSS订阅模式 - if not settings.SUBSCRIBE_RSS_INTERVAL \ - or not str(settings.SUBSCRIBE_RSS_INTERVAL).isdigit(): + if ( + not settings.SUBSCRIBE_RSS_INTERVAL + or not str(settings.SUBSCRIBE_RSS_INTERVAL).isdigit() + ): settings.SUBSCRIBE_RSS_INTERVAL = 30 elif int(settings.SUBSCRIBE_RSS_INTERVAL) < 5: settings.SUBSCRIBE_RSS_INTERVAL = 5 @@ -291,9 +290,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="subscribe_refresh", name="RSS订阅刷新", minutes=int(settings.SUBSCRIBE_RSS_INTERVAL), - kwargs={ - 'job_id': 'subscribe_refresh' - } + kwargs={"job_id": "subscribe_refresh"}, ) # 关注订阅分享(每1小时) @@ -303,9 +300,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="subscribe_follow", name="关注的订阅分享", hours=1, - kwargs={ - 'job_id': 'subscribe_follow' - } + kwargs={"job_id": "subscribe_follow"}, ) # 下载器文件转移(每5分钟) @@ -315,9 +310,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="transfer", name="下载文件整理", minutes=5, - kwargs={ - 'job_id': 'transfer' - } + kwargs={"job_id": "transfer"}, ) # 后台刷新TMDB壁纸 @@ -327,10 +320,9 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="random_wallpager", name="壁纸缓存", minutes=30, - next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=1), - kwargs={ - 'job_id': 'random_wallpager' - } + next_run_time=datetime.now(pytz.timezone(settings.TZ)) + + timedelta(seconds=1), + kwargs={"job_id": "random_wallpager"}, ) # 公共定时服务 @@ -340,9 +332,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="scheduler_job", name="公共定时服务", minutes=10, - kwargs={ - 'job_id': 'scheduler_job' - } + kwargs={"job_id": "scheduler_job"}, ) # 缓存清理服务,每隔24小时 @@ -352,9 +342,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="clear_cache", name="缓存清理", hours=settings.CONF.meta / 3600, - kwargs={ - 'job_id': 'clear_cache' - } + kwargs={"job_id": "clear_cache"}, ) # 定时检查用户认证,每隔10分钟 @@ -364,9 +352,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="user_auth", name="用户认证检查", minutes=10, - kwargs={ - 'job_id': 'user_auth' - } + kwargs={"job_id": "user_auth"}, ) # 站点数据刷新 @@ -377,9 +363,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="sitedata_refresh", name="站点数据刷新", minutes=settings.SITEDATA_REFRESH_INTERVAL * 60, - kwargs={ - 'job_id': 'sitedata_refresh' - } + kwargs={"job_id": "sitedata_refresh"}, ) # 推荐缓存 @@ -389,10 +373,9 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="recommend_refresh", name="推荐缓存", hours=24, - next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=5), - kwargs={ - 'job_id': 'recommend_refresh' - } + next_run_time=datetime.now(pytz.timezone(settings.TZ)) + + timedelta(seconds=5), + kwargs={"job_id": "recommend_refresh"}, ) # 插件市场缓存 @@ -402,9 +385,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="plugin_market_refresh", name="插件市场缓存", minutes=30, - kwargs={ - 'job_id': 'plugin_market_refresh' - } + kwargs={"job_id": "plugin_market_refresh"}, ) # 订阅日历缓存 @@ -414,10 +395,9 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="subscribe_calendar_cache", name="订阅日历缓存", hours=6, - next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=2), - kwargs={ - 'job_id': 'subscribe_calendar_cache' - } + next_run_time=datetime.now(pytz.timezone(settings.TZ)) + + timedelta(minutes=2), + kwargs={"job_id": "subscribe_calendar_cache"}, ) # 主动内存回收 @@ -428,9 +408,18 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="full_gc", name="主动内存回收", minutes=settings.MEMORY_GC_INTERVAL, - kwargs={ - 'job_id': 'full_gc' - } + kwargs={"job_id": "full_gc"}, + ) + + # 智能体定时任务检查 + if settings.AI_AGENT_ENABLE and settings.AI_AGENT_JOB_INTERVAL: + self._scheduler.add_job( + self.start, + "interval", + id="agent_heartbeat", + name="智能体定时任务", + hours=settings.AI_AGENT_JOB_INTERVAL, + kwargs={"job_id": "agent_heartbeat"}, ) # 初始化工作流服务 @@ -502,19 +491,21 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): # 普通函数 job["func"](*args, **kwargs) except Exception as e: - logger.error(f"定时任务 {job.get('name')} 执行失败:{str(e)} - {traceback.format_exc()}") - MessageHelper().put(title=f"{job.get('name')} 执行失败", - message=str(e), - role="system") + logger.error( + f"定时任务 {job.get('name')} 执行失败:{str(e)} - {traceback.format_exc()}" + ) + MessageHelper().put( + title=f"{job.get('name')} 执行失败", message=str(e), role="system" + ) eventmanager.send_event( EventType.SystemError, { "type": "scheduler", "scheduler_id": job_id, - "scheduler_name": job.get('name'), + "scheduler_name": job.get("name"), "error": str(e), - "traceback": traceback.format_exc() - } + "traceback": traceback.format_exc(), + }, ) # 运行结束 self.__finish_job(job_id) @@ -559,9 +550,11 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): logger.info(f"移除工作流服务:{service.get('name')}") except Exception as e: logger.error(f"移除工作流服务失败:{str(e)} - {job_id}: {service}") - SchedulerChain().messagehelper.put(title=f"工作流 {workflow.name} 服务移除失败", - message=str(e), - role="system") + SchedulerChain().messagehelper.put( + title=f"工作流 {workflow.name} 服务移除失败", + message=str(e), + role="system", + ) def remove_plugin_job(self, pid: str, job_id: Optional[str] = None): """ @@ -581,7 +574,9 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): else: # 移除插件的所有服务 jobs_to_remove = [ - (job_id, service) for job_id, service in self._jobs.items() if service.get("pid") == pid + (job_id, service) + for job_id, service in self._jobs.items() + if service.get("pid") == pid ] for job_id, _ in jobs_to_remove: self._jobs.pop(job_id, None) @@ -602,12 +597,16 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): except JobLookupError: pass if job_removed: - logger.info(f"移除插件服务({plugin_name}):{service.get('name')}") # noqa + logger.info( + f"移除插件服务({plugin_name}):{service.get('name')}" + ) # noqa except Exception as e: logger.error(f"移除插件服务失败:{str(e)} - {job_id}: {service}") - SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务移除失败", - message=str(e), - role="system") + SchedulerChain().messagehelper.put( + title=f"插件 {plugin_name} 服务移除失败", + message=str(e), + role="system", + ) def update_workflow_job(self, workflow: Workflow): """ @@ -633,14 +632,16 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id=job_id, name=workflow.name, kwargs={"job_id": job_id, "workflow_id": workflow.id}, - replace_existing=True + replace_existing=True, ) logger.info(f"注册工作流服务:{workflow.name} - {workflow.timer}") except Exception as e: logger.error(f"注册工作流服务失败:{workflow.name} - {str(e)}") - SchedulerChain().messagehelper.put(title=f"工作流 {workflow.name} 服务注册失败", - message=str(e), - role="system") + SchedulerChain().messagehelper.put( + title=f"工作流 {workflow.name} 服务注册失败", + message=str(e), + role="system", + ) def update_plugin_job(self, pid: str): """ @@ -656,7 +657,9 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): try: plugin_services = plugin_manager.get_plugin_services(pid=pid) except Exception as e: - logger.error(f"运行插件 {pid} 服务失败:{str(e)} - {traceback.format_exc()}") + logger.error( + f"运行插件 {pid} 服务失败:{str(e)} - {traceback.format_exc()}" + ) return # 获取插件名称 plugin_name = plugin_manager.get_plugin_attr(pid, "plugin_name") @@ -681,14 +684,18 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): name=service["name"], **(service.get("kwargs") or {}), kwargs={"job_id": job_id}, - replace_existing=True + replace_existing=True, + ) + logger.info( + f"注册插件{plugin_name}服务:{service['name']} - {service['trigger']}" ) - logger.info(f"注册插件{plugin_name}服务:{service['name']} - {service['trigger']}") except Exception as e: logger.error(f"注册插件{plugin_name}服务失败:{str(e)} - {service}") - SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务注册失败", - message=str(e), - role="system") + SchedulerChain().messagehelper.put( + title=f"插件 {plugin_name} 服务注册失败", + message=str(e), + role="system", + ) def list(self) -> List[schemas.ScheduleInfo]: """ @@ -714,12 +721,14 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): if service.get("running") and name and provider_name: if job_id not in added: added.append(job_id) - schedulers.append(schemas.ScheduleInfo( - id=job_id, - name=name, - provider=provider_name, - status="正在运行", - )) + schedulers.append( + schemas.ScheduleInfo( + id=job_id, + name=name, + provider=provider_name, + status="正在运行", + ) + ) # 获取其他待执行任务 for job in jobs: job_id = job.id.split("|")[0] @@ -734,13 +743,15 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): status = "正在运行" if service.get("running") else "等待" # 下次运行时间 next_run = TimerUtils.time_difference(job.next_run_time) - schedulers.append(schemas.ScheduleInfo( - id=job_id, - name=job.name, - provider=service.get("provider_name", "[系统]"), - status=status, - next_run=next_run - )) + schedulers.append( + schemas.ScheduleInfo( + id=job_id, + name=job.name, + provider=service.get("provider_name", "[系统]"), + status=status, + next_run=next_run, + ) + ) return schedulers def stop(self): @@ -776,7 +787,18 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): collected = gc.collect() memory_after = get_memory_usage() memory_freed = memory_before - memory_after - logger.info(f"主动内存回收完成,回收对象数: {collected},释放内存: {memory_freed:.2f} MB") + logger.info( + f"主动内存回收完成,回收对象数: {collected},释放内存: {memory_freed:.2f} MB" + ) + + @staticmethod + async def agent_heartbeat(): + """ + 智能体心跳唤醒:检查并执行待处理的定时任务 + """ + from app.agent import agent_manager + + await agent_manager.heartbeat_check_jobs() def user_auth(self): """ @@ -788,9 +810,11 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): __max_try__ = 30 if self._auth_count > __max_try__: if not self._auth_message: - SchedulerChain().messagehelper.put(title=f"用户认证失败", - message="用户认证失败次数过多,将不再尝试认证!", - role="system") + SchedulerChain().messagehelper.put( + title=f"用户认证失败", + message="用户认证失败次数过多,将不再尝试认证!", + role="system", + ) self._auth_message = True return logger.info("用户未认证,正在尝试认证...") @@ -807,7 +831,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): mtype=NotificationType.Manual, title="MoviePilot用户认证成功", text=f"使用站点:{msg},如有插件使用异常,请重启MoviePilot。", - link=settings.MP_DOMAIN('#/site') + link=settings.MP_DOMAIN("#/site"), ) ) # 认证通过后重新初始化插件