feat: 智能体增加定时任务(Jobs)管理和心跳唤醒机制

- 新增 JobsMiddleware 中间件,支持通过 JOB.md 文件管理长期/重复性任务
- 智能体可创建一次性(once)和重复性(recurring)任务,自动跟踪执行状态
- 新增心跳唤醒机制,定时调度器周期性唤醒智能体检查并执行待处理任务
- 新增 AI_AGENT_JOB_INTERVAL 配置项控制检查间隔,默认24小时
- 每次心跳使用独立会话,执行完毕后清理资源
This commit is contained in:
jxxghp
2026-03-25 13:02:20 +08:00
parent de4dbf283b
commit 94c75eb1c7
4 changed files with 558 additions and 136 deletions

View File

@@ -1,5 +1,6 @@
import asyncio
import traceback
import uuid
from time import strftime
from typing import Dict, List
@@ -16,6 +17,7 @@ from langgraph.checkpoint.memory import InMemorySaver
from app.agent.callback import StreamingHandler
from app.agent.memory import memory_manager
from app.agent.middleware.jobs import JobsMiddleware
from app.agent.middleware.memory import MemoryMiddleware
from app.agent.middleware.patch_tool_calls import PatchToolCallsMiddleware
from app.agent.middleware.skills import SkillsMiddleware
@@ -96,6 +98,10 @@ class MoviePilotAgent:
SkillsMiddleware(
sources=[str(settings.CONFIG_PATH / "agent" / "skills")],
),
# Jobs 任务管理
JobsMiddleware(
sources=[str(settings.CONFIG_PATH / "agent" / "jobs")],
),
# 记忆管理
MemoryMiddleware(
sources=[str(settings.CONFIG_PATH / "agent" / "MEMORY.md")]
@@ -340,6 +346,46 @@ class AgentManager:
memory_manager.clear_memory(session_id, user_id)
logger.info(f"会话 {session_id} 的记忆已清空")
async def heartbeat_check_jobs(self):
"""
心跳唤醒检查并执行待处理的定时任务Jobs
由定时调度器周期性调用,每次使用独立的会话避免上下文干扰。
"""
try:
# 每次使用唯一的 session_id避免共享上下文
session_id = f"__agent_heartbeat_{uuid.uuid4().hex[:12]}__"
user_id = settings.SUPERUSER
logger.info("智能体心跳唤醒:开始检查待处理任务...")
# 英文提示词,便于大模型理解
heartbeat_message = (
"[System Heartbeat Wake-up] Please check all jobs in your jobs directory and process pending tasks:\n"
"1. List all jobs with status 'pending' or 'in_progress'\n"
"2. For 'recurring' jobs, check the 'last_run' timestamp to determine if it's time to run again\n"
"3. For 'once' jobs with status 'pending', execute them now\n"
"4. After executing each job, update its status, 'last_run' time, and execution log in the JOB.md file\n"
"5. If there are no pending jobs, simply respond with a brief summary\n"
"IMPORTANT: Respond in Chinese (中文). Begin checking and processing jobs now."
)
await self.process_message(
session_id=session_id,
user_id=user_id,
message=heartbeat_message,
channel=None,
source=None,
username=settings.SUPERUSER,
)
logger.info("智能体心跳唤醒:任务检查完成")
# 心跳会话用完即弃,清理资源
await self.clear_session(session_id, user_id)
except Exception as e:
logger.error(f"智能体心跳唤醒失败: {e}")
# 全局智能体管理器实例
agent_manager = AgentManager()

View File

@@ -0,0 +1,350 @@
import re
from collections.abc import Awaitable, Callable
from typing import Annotated, NotRequired, TypedDict
import yaml # noqa
from anyio import Path as AsyncPath
from langchain.agents.middleware.types import (
AgentMiddleware,
AgentState,
ContextT,
ModelRequest,
ModelResponse,
PrivateStateAttr, # noqa
ResponseT,
)
from langchain_core.runnables import RunnableConfig
from langgraph.runtime import Runtime
from app.agent.middleware.utils import append_to_system_message
from app.log import logger
# JOB.md 文件最大限制为 1MB
MAX_JOB_FILE_SIZE = 1 * 1024 * 1024
class JobMetadata(TypedDict):
"""Job 元数据。"""
path: str
"""JOB.md 文件路径。"""
id: str
"""Job 标识符(目录名)。"""
name: str
"""Job 名称。"""
description: str
"""Job 描述。"""
schedule: str
"""调度类型: once一次性/ recurring重复性"""
status: str
"""当前状态: pending / in_progress / completed / cancelled。"""
last_run: str | None
"""上次执行时间。"""
class JobsState(AgentState):
"""jobs 中间件状态。"""
jobs_metadata: NotRequired[Annotated[list[JobMetadata], PrivateStateAttr]]
"""已加载的 job 元数据列表,不传播给父 agent。"""
class JobsStateUpdate(TypedDict):
"""jobs 中间件状态更新项。"""
jobs_metadata: list[JobMetadata]
"""待合并的 job 元数据列表。"""
def _parse_job_metadata(
content: str,
job_path: str,
job_id: str,
) -> JobMetadata | None:
"""从 JOB.md 内容中解析 YAML 前言并验证元数据。"""
if len(content) > MAX_JOB_FILE_SIZE:
logger.warning(
"Skipping %s: content too large (%d bytes)", job_path, len(content)
)
return None
# 匹配 --- 分隔的 YAML 前言
frontmatter_pattern = r"^---\s*\n(.*?)\n---\s*\n"
match = re.match(frontmatter_pattern, content, re.DOTALL)
if not match:
logger.warning("Skipping %s: no valid YAML frontmatter found", job_path)
return None
frontmatter_str = match.group(1)
# 解析 YAML
try:
frontmatter_data = yaml.safe_load(frontmatter_str)
except yaml.YAMLError as e:
logger.warning("Invalid YAML in %s: %s", job_path, e)
return None
if not isinstance(frontmatter_data, dict):
logger.warning("Skipping %s: frontmatter is not a mapping", job_path)
return None
# Job 名称和描述
name = str(frontmatter_data.get("name", "")).strip()
description = str(frontmatter_data.get("description", "")).strip()
if not name:
logger.warning("Skipping %s: missing required 'name'", job_path)
return None
# 调度类型
schedule = str(frontmatter_data.get("schedule", "once")).strip().lower()
if schedule not in ("once", "recurring"):
schedule = "once"
# 状态
status = str(frontmatter_data.get("status", "pending")).strip().lower()
if status not in ("pending", "in_progress", "completed", "cancelled"):
status = "pending"
# 上次执行时间
last_run = str(frontmatter_data.get("last_run", "")).strip() or None
return JobMetadata(
id=job_id,
name=name,
description=description,
path=job_path,
schedule=schedule,
status=status,
last_run=last_run,
)
async def _alist_jobs(source_path: AsyncPath) -> list[JobMetadata]:
"""异步列出指定路径下的所有任务。
扫描包含 JOB.md 的目录并解析其元数据。
"""
jobs: list[JobMetadata] = []
if not await source_path.exists():
return []
# 查找所有任务目录(包含 JOB.md 的目录)
job_dirs: list[AsyncPath] = []
async for path in source_path.iterdir():
if await path.is_dir() and await (path / "JOB.md").is_file():
job_dirs.append(path)
if not job_dirs:
return []
# 解析 JOB.md
for job_path in job_dirs:
job_md_path = job_path / "JOB.md"
job_content = await job_md_path.read_text(encoding="utf-8")
# 解析元数据
job_metadata = _parse_job_metadata(
content=job_content,
job_path=str(job_md_path),
job_id=job_path.name,
)
if job_metadata:
jobs.append(job_metadata)
return jobs
JOBS_SYSTEM_PROMPT = """
<jobs_system>
You have a **scheduled jobs** system that allows you to track and execute long-running or recurring tasks.
**Jobs Location:** `{jobs_location}`
**Current Jobs:**
{jobs_list}
**Job File Format:**
Each job is a directory containing a `JOB.md` file with YAML frontmatter followed by task details:
```markdown
---
name: 任务名称(简短中文描述)
description: 任务的详细描述,说明要做什么
schedule: once 或 recurring
status: pending / in_progress / completed / cancelled
last_run: "YYYY-MM-DD HH:MM"(上次执行时间,可选)
---
# 任务详情
## 目标
详细描述这个任务要完成的目标。
## 执行日志
记录每次执行的情况和结果。
- **2024-01-15 10:00** - 执行了XXX操作结果成功/失败
- **2024-01-16 10:00** - 继续执行XXX...
```
**Job Lifecycle Rules:**
1. **Creating a Job**: When a user asks you to do something periodically or at a later time:
- Create a new directory under the jobs location, directory name is the `job-id` (lowercase, hyphens, 1-64 chars)
- Write a `JOB.md` file with proper frontmatter and detailed task description
- Set `schedule: once` for one-time tasks, `schedule: recurring` for repeating tasks (e.g., daily sign-in, weekly checks)
- Set initial `status: pending`
2. **Executing a Job**: When you work on a job:
- Update `status: in_progress` in the frontmatter
- Execute the required actions using your tools
- Log the execution result in the "执行日志" section with timestamp
- Update `last_run` in frontmatter to current time
3. **Completing a Job**:
- For `schedule: once` tasks: set `status: completed` after successful execution
- For `schedule: recurring` tasks: keep `status: pending` after execution, only update `last_run` time. The job stays active for the next scheduled run.
- Set `status: cancelled` if the user explicitly asks to cancel/stop a task
4. **Heartbeat Check**: You will be periodically woken up to check pending jobs. When woken up:
- Read the jobs directory to find all active jobs (status: pending or in_progress)
- Skip jobs with `status: completed` or `status: cancelled`
- For `schedule: recurring` jobs, check `last_run` to determine if it's time to run again
- Execute pending jobs and update their status/logs accordingly
**Important Notes:**
- Each job MUST have its own separate directory and JOB.md file to avoid conflicts
- Always update the frontmatter fields (status, last_run) when executing a job
- Keep execution logs concise but informative
- For recurring jobs, maintain a rolling log (keep recent entries, you can summarize/remove old entries to keep the file manageable)
- When creating jobs, make the description detailed enough that you can understand and execute the task in future sessions without additional context
**When to Create Jobs:**
- User says "每天帮我..." / "定期..." / "定时..." / "提醒我..." / "以后每次..."
- User requests a task that should be done repeatedly
- User asks for monitoring or periodic checking of something
**When NOT to Create Jobs:**
- User asks for an immediate one-time action (just do it now)
- Simple questions or conversations
- Tasks that are already handled by MoviePilot's built-in scheduler services
</jobs_system>
"""
class JobsMiddleware(AgentMiddleware[JobsState, ContextT, ResponseT]): # noqa
"""加载并向系统提示词注入 Agent Jobs 的中间件。
扫描 jobs 目录下的 JOB.md 文件,解析元数据并注入到系统提示词中,
使智能体了解当前的长期任务及其状态。
"""
state_schema = JobsState
def __init__(self, *, sources: list[str]) -> None:
"""初始化 Jobs 中间件。"""
self.sources = sources
self.system_prompt_template = JOBS_SYSTEM_PROMPT
@staticmethod
def _format_jobs_list(jobs: list[JobMetadata]) -> str:
"""格式化任务元数据列表用于系统提示词。"""
if not jobs:
return "(No active jobs. You can create jobs when users request periodic or scheduled tasks.)"
lines = []
for job in jobs:
status_emoji = {
"pending": "",
"in_progress": "🔄",
"completed": "",
"cancelled": "",
}.get(job["status"], "")
schedule_label = (
"recurring (重复)"
if job["schedule"] == "recurring"
else "once (一次性)"
)
desc_line = (
f"- {status_emoji} **{job['id']}**: {job['name']}"
f" [{schedule_label}] - {job['description']}"
)
if job.get("last_run"):
desc_line += f" (上次执行: {job['last_run']})"
lines.append(desc_line)
lines.append(f" -> Read `{job['path']}` for full details")
return "\n".join(lines)
def modify_request(self, request: ModelRequest[ContextT]) -> ModelRequest[ContextT]:
"""将任务文档注入模型请求的系统消息中。"""
jobs_metadata = request.state.get("jobs_metadata", []) # noqa
# 过滤只展示活跃任务pending / in_progress / recurring
active_jobs = [
j
for j in jobs_metadata
if j["status"] in ("pending", "in_progress")
or (j["schedule"] == "recurring" and j["status"] not in ("cancelled",))
]
jobs_list = self._format_jobs_list(active_jobs)
jobs_location = self.sources[0] if self.sources else ""
jobs_section = self.system_prompt_template.format(
jobs_location=jobs_location,
jobs_list=jobs_list,
)
new_system_message = append_to_system_message(
request.system_message, jobs_section
)
return request.override(system_message=new_system_message)
async def abefore_agent( # noqa
self, state: JobsState, runtime: Runtime, config: RunnableConfig
) -> JobsStateUpdate | None:
"""在 Agent 执行前异步加载任务元数据。
每个会话仅加载一次。若 state 中已有则跳过。
"""
# 如果 state 中已存在元数据则跳过
if "jobs_metadata" in state:
return None
all_jobs: list[JobMetadata] = []
# 遍历源加载任务
for source_path_str in self.sources:
source_path = AsyncPath(source_path_str)
if not await source_path.exists():
await source_path.mkdir(parents=True, exist_ok=True)
continue
source_jobs = await _alist_jobs(source_path)
all_jobs.extend(source_jobs)
return JobsStateUpdate(jobs_metadata=all_jobs)
async def awrap_model_call(
self,
request: ModelRequest[ContextT],
handler: Callable[
[ModelRequest[ContextT]], Awaitable[ModelResponse[ResponseT]]
],
) -> ModelResponse[ResponseT]:
"""在模型调用时注入任务文档。"""
modified_request = self.modify_request(request)
return await handler(modified_request)
__all__ = ["JobMetadata", "JobsMiddleware"]

View File

@@ -529,6 +529,8 @@ class ConfigModel(BaseModel):
AI_RECOMMEND_MAX_ITEMS: int = 50
# LLM工具选择中间件最大工具数量0为不启用工具选择中间件
LLM_MAX_TOOLS: int = 0
# AI智能体定时任务检查间隔小时0为不启用默认24小时
AI_AGENT_JOB_INTERVAL: int = 24
class Settings(BaseSettings, ConfigModel, LogConfigModel):

View File

@@ -47,6 +47,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
"""
定时任务管理
"""
CONFIG_WATCH = {
"DEV",
"COOKIECLOUD_INTERVAL",
@@ -56,6 +57,8 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
"SUBSCRIBE_MODE",
"SUBSCRIBE_RSS_INTERVAL",
"SITEDATA_REFRESH_INTERVAL",
"AI_AGENT_ENABLE",
"AI_AGENT_JOB_INTERVAL",
}
def __init__(self):
@@ -98,133 +101,134 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
"cookiecloud": {
"name": "同步CookieCloud站点",
"func": SiteChain().sync_cookies,
"running": False
"running": False,
},
"mediaserver_sync": {
"name": "同步媒体服务器",
"func": MediaServerChain().sync,
"running": False
"running": False,
},
"subscribe_tmdb": {
"name": "订阅元数据更新",
"func": SubscribeChain().check,
"running": False
"running": False,
},
"subscribe_search": {
"name": "订阅搜索补全",
"func": SubscribeChain().search,
"running": False,
"kwargs": {
"state": "R"
}
"kwargs": {"state": "R"},
},
"new_subscribe_search": {
"name": "新增订阅搜索",
"func": SubscribeChain().search,
"running": False,
"kwargs": {
"state": "N"
}
"kwargs": {"state": "N"},
},
"subscribe_refresh": {
"name": "订阅刷新",
"func": SubscribeChain().refresh,
"running": False
"running": False,
},
"subscribe_follow": {
"name": "关注的订阅分享",
"func": SubscribeChain().follow,
"running": False
"running": False,
},
"transfer": {
"name": "下载文件整理",
"func": TransferChain().process,
"running": False
"running": False,
},
"clear_cache": {
"name": "缓存清理",
"func": self.clear_cache,
"running": False
"running": False,
},
"user_auth": {
"name": "用户认证检查",
"func": self.user_auth,
"running": False
"running": False,
},
"scheduler_job": {
"name": "公共定时服务",
"func": SchedulerChain().scheduler_job,
"running": False
"running": False,
},
"random_wallpager": {
"name": "壁纸缓存",
"func": WallpaperHelper().get_wallpapers,
"running": False
"running": False,
},
"sitedata_refresh": {
"name": "站点数据刷新",
"func": SiteChain().refresh_userdatas,
"running": False
"running": False,
},
"recommend_refresh": {
"name": "推荐缓存",
"func": RecommendChain().refresh_recommend,
"running": False
"running": False,
},
"plugin_market_refresh": {
"name": "插件市场缓存",
"func": PluginManager().async_get_online_plugins,
"running": False,
"kwargs": {
"force": True
}
"kwargs": {"force": True},
},
"subscribe_calendar_cache": {
"name": "订阅日历缓存",
"func": SubscribeChain().cache_calendar,
"running": False
"running": False,
},
"full_gc": {
"name": "主动内存回收",
"func": self.full_gc,
"running": False
}
"running": False,
},
"agent_heartbeat": {
"name": "智能体定时任务",
"func": self.agent_heartbeat,
"running": False,
},
}
# 创建定时服务
self._scheduler = BackgroundScheduler(timezone=settings.TZ,
executors={
'default': ThreadPoolExecutor(settings.CONF.scheduler)
})
self._scheduler = BackgroundScheduler(
timezone=settings.TZ,
executors={"default": ThreadPoolExecutor(settings.CONF.scheduler)},
)
# CookieCloud定时同步
if settings.COOKIECLOUD_INTERVAL \
and str(settings.COOKIECLOUD_INTERVAL).isdigit():
if (
settings.COOKIECLOUD_INTERVAL
and str(settings.COOKIECLOUD_INTERVAL).isdigit()
):
self._scheduler.add_job(
self.start,
"interval",
id="cookiecloud",
name="同步CookieCloud站点",
minutes=int(settings.COOKIECLOUD_INTERVAL),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5),
kwargs={
'job_id': 'cookiecloud'
}
next_run_time=datetime.now(pytz.timezone(settings.TZ))
+ timedelta(minutes=5),
kwargs={"job_id": "cookiecloud"},
)
# 媒体服务器同步
if settings.MEDIASERVER_SYNC_INTERVAL \
and str(settings.MEDIASERVER_SYNC_INTERVAL).isdigit():
if (
settings.MEDIASERVER_SYNC_INTERVAL
and str(settings.MEDIASERVER_SYNC_INTERVAL).isdigit()
):
self._scheduler.add_job(
self.start,
"interval",
id="mediaserver_sync",
name="同步媒体服务器",
hours=int(settings.MEDIASERVER_SYNC_INTERVAL),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=10),
kwargs={
'job_id': 'mediaserver_sync'
}
next_run_time=datetime.now(pytz.timezone(settings.TZ))
+ timedelta(minutes=10),
kwargs={"job_id": "mediaserver_sync"},
)
# 新增订阅时搜索5分钟检查一次
@@ -234,9 +238,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="new_subscribe_search",
name="新增订阅搜索",
minutes=5,
kwargs={
'job_id': 'new_subscribe_search'
}
kwargs={"job_id": "new_subscribe_search"},
)
# 检查更新订阅TMDB数据每隔6小时
@@ -246,9 +248,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="subscribe_tmdb",
name="订阅元数据更新",
hours=6,
kwargs={
'job_id': 'subscribe_tmdb'
}
kwargs={"job_id": "subscribe_tmdb"},
)
# 订阅状态每隔24小时搜索一次
@@ -259,9 +259,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="subscribe_search",
name="订阅搜索补全",
hours=settings.SUBSCRIBE_SEARCH_INTERVAL,
kwargs={
'job_id': 'subscribe_search'
}
kwargs={"job_id": "subscribe_search"},
)
if settings.SUBSCRIBE_MODE == "spider":
@@ -275,13 +273,14 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
name="订阅刷新",
hour=trigger.hour,
minute=trigger.minute,
kwargs={
'job_id': 'subscribe_refresh'
})
kwargs={"job_id": "subscribe_refresh"},
)
else:
# RSS订阅模式
if not settings.SUBSCRIBE_RSS_INTERVAL \
or not str(settings.SUBSCRIBE_RSS_INTERVAL).isdigit():
if (
not settings.SUBSCRIBE_RSS_INTERVAL
or not str(settings.SUBSCRIBE_RSS_INTERVAL).isdigit()
):
settings.SUBSCRIBE_RSS_INTERVAL = 30
elif int(settings.SUBSCRIBE_RSS_INTERVAL) < 5:
settings.SUBSCRIBE_RSS_INTERVAL = 5
@@ -291,9 +290,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="subscribe_refresh",
name="RSS订阅刷新",
minutes=int(settings.SUBSCRIBE_RSS_INTERVAL),
kwargs={
'job_id': 'subscribe_refresh'
}
kwargs={"job_id": "subscribe_refresh"},
)
# 关注订阅分享每1小时
@@ -303,9 +300,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="subscribe_follow",
name="关注的订阅分享",
hours=1,
kwargs={
'job_id': 'subscribe_follow'
}
kwargs={"job_id": "subscribe_follow"},
)
# 下载器文件转移每5分钟
@@ -315,9 +310,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="transfer",
name="下载文件整理",
minutes=5,
kwargs={
'job_id': 'transfer'
}
kwargs={"job_id": "transfer"},
)
# 后台刷新TMDB壁纸
@@ -327,10 +320,9 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="random_wallpager",
name="壁纸缓存",
minutes=30,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=1),
kwargs={
'job_id': 'random_wallpager'
}
next_run_time=datetime.now(pytz.timezone(settings.TZ))
+ timedelta(seconds=1),
kwargs={"job_id": "random_wallpager"},
)
# 公共定时服务
@@ -340,9 +332,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="scheduler_job",
name="公共定时服务",
minutes=10,
kwargs={
'job_id': 'scheduler_job'
}
kwargs={"job_id": "scheduler_job"},
)
# 缓存清理服务每隔24小时
@@ -352,9 +342,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="clear_cache",
name="缓存清理",
hours=settings.CONF.meta / 3600,
kwargs={
'job_id': 'clear_cache'
}
kwargs={"job_id": "clear_cache"},
)
# 定时检查用户认证每隔10分钟
@@ -364,9 +352,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="user_auth",
name="用户认证检查",
minutes=10,
kwargs={
'job_id': 'user_auth'
}
kwargs={"job_id": "user_auth"},
)
# 站点数据刷新
@@ -377,9 +363,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="sitedata_refresh",
name="站点数据刷新",
minutes=settings.SITEDATA_REFRESH_INTERVAL * 60,
kwargs={
'job_id': 'sitedata_refresh'
}
kwargs={"job_id": "sitedata_refresh"},
)
# 推荐缓存
@@ -389,10 +373,9 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="recommend_refresh",
name="推荐缓存",
hours=24,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=5),
kwargs={
'job_id': 'recommend_refresh'
}
next_run_time=datetime.now(pytz.timezone(settings.TZ))
+ timedelta(seconds=5),
kwargs={"job_id": "recommend_refresh"},
)
# 插件市场缓存
@@ -402,9 +385,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="plugin_market_refresh",
name="插件市场缓存",
minutes=30,
kwargs={
'job_id': 'plugin_market_refresh'
}
kwargs={"job_id": "plugin_market_refresh"},
)
# 订阅日历缓存
@@ -414,10 +395,9 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="subscribe_calendar_cache",
name="订阅日历缓存",
hours=6,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=2),
kwargs={
'job_id': 'subscribe_calendar_cache'
}
next_run_time=datetime.now(pytz.timezone(settings.TZ))
+ timedelta(minutes=2),
kwargs={"job_id": "subscribe_calendar_cache"},
)
# 主动内存回收
@@ -428,9 +408,18 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="full_gc",
name="主动内存回收",
minutes=settings.MEMORY_GC_INTERVAL,
kwargs={
'job_id': 'full_gc'
}
kwargs={"job_id": "full_gc"},
)
# 智能体定时任务检查
if settings.AI_AGENT_ENABLE and settings.AI_AGENT_JOB_INTERVAL:
self._scheduler.add_job(
self.start,
"interval",
id="agent_heartbeat",
name="智能体定时任务",
hours=settings.AI_AGENT_JOB_INTERVAL,
kwargs={"job_id": "agent_heartbeat"},
)
# 初始化工作流服务
@@ -502,19 +491,21 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
# 普通函数
job["func"](*args, **kwargs)
except Exception as e:
logger.error(f"定时任务 {job.get('name')} 执行失败:{str(e)} - {traceback.format_exc()}")
MessageHelper().put(title=f"{job.get('name')} 执行失败",
message=str(e),
role="system")
logger.error(
f"定时任务 {job.get('name')} 执行失败:{str(e)} - {traceback.format_exc()}"
)
MessageHelper().put(
title=f"{job.get('name')} 执行失败", message=str(e), role="system"
)
eventmanager.send_event(
EventType.SystemError,
{
"type": "scheduler",
"scheduler_id": job_id,
"scheduler_name": job.get('name'),
"scheduler_name": job.get("name"),
"error": str(e),
"traceback": traceback.format_exc()
}
"traceback": traceback.format_exc(),
},
)
# 运行结束
self.__finish_job(job_id)
@@ -559,9 +550,11 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
logger.info(f"移除工作流服务:{service.get('name')}")
except Exception as e:
logger.error(f"移除工作流服务失败:{str(e)} - {job_id}: {service}")
SchedulerChain().messagehelper.put(title=f"工作流 {workflow.name} 服务移除失败",
message=str(e),
role="system")
SchedulerChain().messagehelper.put(
title=f"工作流 {workflow.name} 服务移除失败",
message=str(e),
role="system",
)
def remove_plugin_job(self, pid: str, job_id: Optional[str] = None):
"""
@@ -581,7 +574,9 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
else:
# 移除插件的所有服务
jobs_to_remove = [
(job_id, service) for job_id, service in self._jobs.items() if service.get("pid") == pid
(job_id, service)
for job_id, service in self._jobs.items()
if service.get("pid") == pid
]
for job_id, _ in jobs_to_remove:
self._jobs.pop(job_id, None)
@@ -602,12 +597,16 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
except JobLookupError:
pass
if job_removed:
logger.info(f"移除插件服务({plugin_name}){service.get('name')}") # noqa
logger.info(
f"移除插件服务({plugin_name}){service.get('name')}"
) # noqa
except Exception as e:
logger.error(f"移除插件服务失败:{str(e)} - {job_id}: {service}")
SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务移除失败",
message=str(e),
role="system")
SchedulerChain().messagehelper.put(
title=f"插件 {plugin_name} 服务移除失败",
message=str(e),
role="system",
)
def update_workflow_job(self, workflow: Workflow):
"""
@@ -633,14 +632,16 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id=job_id,
name=workflow.name,
kwargs={"job_id": job_id, "workflow_id": workflow.id},
replace_existing=True
replace_existing=True,
)
logger.info(f"注册工作流服务:{workflow.name} - {workflow.timer}")
except Exception as e:
logger.error(f"注册工作流服务失败:{workflow.name} - {str(e)}")
SchedulerChain().messagehelper.put(title=f"工作流 {workflow.name} 服务注册失败",
message=str(e),
role="system")
SchedulerChain().messagehelper.put(
title=f"工作流 {workflow.name} 服务注册失败",
message=str(e),
role="system",
)
def update_plugin_job(self, pid: str):
"""
@@ -656,7 +657,9 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
try:
plugin_services = plugin_manager.get_plugin_services(pid=pid)
except Exception as e:
logger.error(f"运行插件 {pid} 服务失败:{str(e)} - {traceback.format_exc()}")
logger.error(
f"运行插件 {pid} 服务失败:{str(e)} - {traceback.format_exc()}"
)
return
# 获取插件名称
plugin_name = plugin_manager.get_plugin_attr(pid, "plugin_name")
@@ -681,14 +684,18 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
name=service["name"],
**(service.get("kwargs") or {}),
kwargs={"job_id": job_id},
replace_existing=True
replace_existing=True,
)
logger.info(
f"注册插件{plugin_name}服务:{service['name']} - {service['trigger']}"
)
logger.info(f"注册插件{plugin_name}服务:{service['name']} - {service['trigger']}")
except Exception as e:
logger.error(f"注册插件{plugin_name}服务失败:{str(e)} - {service}")
SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务注册失败",
message=str(e),
role="system")
SchedulerChain().messagehelper.put(
title=f"插件 {plugin_name} 服务注册失败",
message=str(e),
role="system",
)
def list(self) -> List[schemas.ScheduleInfo]:
"""
@@ -714,12 +721,14 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
if service.get("running") and name and provider_name:
if job_id not in added:
added.append(job_id)
schedulers.append(schemas.ScheduleInfo(
id=job_id,
name=name,
provider=provider_name,
status="正在运行",
))
schedulers.append(
schemas.ScheduleInfo(
id=job_id,
name=name,
provider=provider_name,
status="正在运行",
)
)
# 获取其他待执行任务
for job in jobs:
job_id = job.id.split("|")[0]
@@ -734,13 +743,15 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
status = "正在运行" if service.get("running") else "等待"
# 下次运行时间
next_run = TimerUtils.time_difference(job.next_run_time)
schedulers.append(schemas.ScheduleInfo(
id=job_id,
name=job.name,
provider=service.get("provider_name", "[系统]"),
status=status,
next_run=next_run
))
schedulers.append(
schemas.ScheduleInfo(
id=job_id,
name=job.name,
provider=service.get("provider_name", "[系统]"),
status=status,
next_run=next_run,
)
)
return schedulers
def stop(self):
@@ -776,7 +787,18 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
collected = gc.collect()
memory_after = get_memory_usage()
memory_freed = memory_before - memory_after
logger.info(f"主动内存回收完成,回收对象数: {collected},释放内存: {memory_freed:.2f} MB")
logger.info(
f"主动内存回收完成,回收对象数: {collected},释放内存: {memory_freed:.2f} MB"
)
@staticmethod
async def agent_heartbeat():
"""
智能体心跳唤醒:检查并执行待处理的定时任务
"""
from app.agent import agent_manager
await agent_manager.heartbeat_check_jobs()
def user_auth(self):
"""
@@ -788,9 +810,11 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
__max_try__ = 30
if self._auth_count > __max_try__:
if not self._auth_message:
SchedulerChain().messagehelper.put(title=f"用户认证失败",
message="用户认证失败次数过多,将不再尝试认证!",
role="system")
SchedulerChain().messagehelper.put(
title=f"用户认证失败",
message="用户认证失败次数过多,将不再尝试认证!",
role="system",
)
self._auth_message = True
return
logger.info("用户未认证,正在尝试认证...")
@@ -807,7 +831,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
mtype=NotificationType.Manual,
title="MoviePilot用户认证成功",
text=f"使用站点:{msg}如有插件使用异常请重启MoviePilot。",
link=settings.MP_DOMAIN('#/site')
link=settings.MP_DOMAIN("#/site"),
)
)
# 认证通过后重新初始化插件