feat(activity_log): enhance activity logging with query tool and context handling

This commit is contained in:
jxxghp
2026-06-19 20:39:12 +08:00
parent a9534d2422
commit 38c3dcc76b
11 changed files with 1110 additions and 70 deletions

View File

@@ -534,7 +534,7 @@ class MoviePilotAgent:
@property
def is_background(self) -> bool:
    """
    Whether this agent runs as a background task whose captured output need not be sent back.

    True when the channel or the source is missing and ``output_callback``
    is not callable.
    """
    # A callable output_callback keeps the session out of background mode even
    # when no channel/source is attached.
    return (not self.channel or not self.source) and not callable(self.output_callback)
@@ -555,6 +555,13 @@ class MoviePilotAgent:
"""
return self.session_id.startswith(HEARTBEAT_SESSION_PREFIX)
@property
def has_message_context(self) -> bool:
    """
    Whether a real message-channel context is attached to this agent.

    True only when both ``channel`` and ``source`` are truthy.
    """
    if not self.channel:
        return False
    return bool(self.source)
async def _is_system_admin_context(self) -> bool:
"""
判断当前 Agent 会话是否应按系统管理员上下文运行工具。
@@ -1042,7 +1049,7 @@ class MoviePilotAgent:
UsageMiddleware(on_usage=self._record_usage),
]
if not self.is_heartbeat_session:
if self.has_message_context:
middlewares.insert(
4,
ActivityLogMiddleware(

View File

@@ -3,13 +3,14 @@
按日期存储在 CONFIG_PATH/agent/activity/YYYY-MM-DD.md 中,
每次 Agent 执行完毕后自动调用 LLM 对本轮对话生成简洁的活动摘要,
并在每次 Agent 启动时加载近几天的活动日志注入系统提示词
并在每次 Agent 启动时注入轻量索引,完整日志由工具按需查询
"""
import re
from collections.abc import Awaitable, Callable
from datetime import datetime, timedelta
from typing import Annotated, Any, NotRequired, TypedDict
from pathlib import Path
from typing import Annotated, Any, NotRequired, Optional, TypedDict
from anyio import Path as AsyncPath
from langchain.agents.middleware.types import (
@@ -30,29 +31,210 @@ from app.log import logger
# 活动日志保留天数
DEFAULT_RETENTION_DAYS = 7
# 注入系统提示词时加载的天数
# 注入系统提示词时索引的天数
PROMPT_LOAD_DAYS = 3
# 工具默认查询的天数
DEFAULT_QUERY_DAYS = 7
# 工具单次返回的最大条数
DEFAULT_QUERY_LIMIT = 20
MAX_QUERY_LIMIT = 50
# 每日日志文件最大大小 (256KB)
MAX_LOG_FILE_SIZE = 256 * 1024
# 提取本轮对话上下文的最大字符数(避免过长的对话消耗太多 token
MAX_CONTEXT_FOR_SUMMARY = 4000
TRIVIAL_USER_TEXT_PATTERN = re.compile(
r"^\s*(你好|您好|hi|hello|hey|谢谢|谢了|多谢|ok|好的|收到|嗯|嗯嗯|是的|对|可以|行|好)\s*[。.!]?\s*$",
re.IGNORECASE,
)
SUMMARY_SKIP_MARKER = "SKIP"
# LLM 总结的提示词
SUMMARY_PROMPT = """根据以下 AI 助手与用户的对话记录生成一条简洁的活动摘要中文一句话不超过80字
摘要应包含:用户的需求是什么、助手做了什么、结果如何。
只输出摘要内容,不要加任何前缀、标点序号或解释。
SUMMARY_PROMPT = """判断以下 AI 助手与用户的对话是否值得写入 MoviePilot 活动日志
如果本轮只是问候、寒暄、感谢、确认、闲聊、没有实际任务、没有工具动作、任务没有推进、纯粹的格式纠正或无意义空转请只输出SKIP
如果值得记录,请输出一条中文单行活动摘要,要求:
- 40 到 160 个汉字左右,信息密度高,不要写成泛泛一句话。
- 只输出摘要正文不要标题、编号、Markdown、JSON 或解释。
- 尽量包含:用户目标、关键对象(影片/剧集/站点/路径/任务/设置)、助手采取的关键动作或工具、结果状态、失败原因或下一步。
- 如果有明确 ID、路径、站点名、任务状态、成功/失败数量,请保留关键值。
- 不要记录 API Key、Cookie、Token、密码等敏感信息如出现请写成“敏感信息已省略”。
推荐格式示例:
用户要求整理 `/downloads/Show`助手识别为《示例剧》TMDB 12345并提交 transfer_file 整理,结果成功。
用户排查下载失败,助手查询 qBittorrent 任务和站点状态,发现 tracker 超时,建议更换站点或重试。
对话记录:
{conversation}"""
ACTIVITY_ENTRY_PATTERN = re.compile(r"^-\s+\*\*(?P<time>\d{2}:\d{2})\*\*\s+(?P<summary>.+)$")
def _coerce_query_limit(limit: Optional[int]) -> int:
    """Normalize a requested entry count for activity log queries.

    ``None`` and values that ``int()`` rejects fall back to
    ``DEFAULT_QUERY_LIMIT``; everything else is raised to at least 1 and then
    capped at ``MAX_QUERY_LIMIT``.
    """
    if limit is None:
        return DEFAULT_QUERY_LIMIT
    try:
        requested = int(limit)
    except (TypeError, ValueError):
        return DEFAULT_QUERY_LIMIT
    clamped = requested if requested > 1 else 1
    if clamped > MAX_QUERY_LIMIT:
        clamped = MAX_QUERY_LIMIT
    return clamped
def _build_log_path(activity_dir: str, date_str: str) -> Path:
"""构建指定日期的活动日志路径。"""
return Path(activity_dir) / f"{date_str}.md"
def _iter_recent_dates(days: int) -> list[str]:
"""返回从今天开始向前的日期字符串列表。"""
normalized_days = max(1, int(days or 1))
today = datetime.now().date()
return [
(today - timedelta(days=index)).strftime("%Y-%m-%d")
for index in range(normalized_days)
]
def _parse_activity_entries(date_str: str, content: str) -> list[dict[str, str]]:
"""从单日活动日志 Markdown 中解析活动条目。"""
entries: list[dict[str, str]] = []
for line in content.splitlines():
match = ACTIVITY_ENTRY_PATTERN.match(line.strip())
if not match:
continue
entries.append(
{
"date": date_str,
"time": match.group("time"),
"summary": match.group("summary").strip(),
}
)
return entries
def _activity_summary_matches_keyword(
summary: str,
keyword: str,
regex_pattern: Optional[re.Pattern[str]],
) -> bool:
"""判断活动摘要是否命中普通关键词或正则表达式。"""
if regex_pattern:
return bool(regex_pattern.search(summary))
return keyword.lower() in summary.lower()
def load_activity_log_index(activity_dir: str, days: int = PROMPT_LOAD_DAYS) -> dict[str, str]:
    """Build a lightweight index of recent activity logs.

    For each of the last ``days`` dates whose log file exists, the file is read
    and its entries are counted. Only the count is kept, never the entry text.
    Dates without a file, with an unreadable file, or with zero entries are
    left out.

    :param activity_dir: directory holding the per-day log files
    :param days: number of recent days to inspect
    :return: mapping of date string to a short entry-count description
    """
    counts_by_date: dict[str, str] = {}
    for day in _iter_recent_dates(days):
        day_file = _build_log_path(activity_dir, day)
        if day_file.is_file():
            try:
                text = day_file.read_text(encoding="utf-8")
            except Exception as e:
                # Best effort: an unreadable day is reported and skipped.
                logger.warning(f"读取活动日志索引失败 {day_file}: {e}")
            else:
                total = len(_parse_activity_entries(day, text))
                if total:
                    counts_by_date[day] = f"{total} 条活动记录"
    return counts_by_date
def query_activity_logs(
    activity_dir: str,
    *,
    keyword: Optional[str] = None,
    use_regex: bool = False,
    date: Optional[str] = None,
    days: int = DEFAULT_QUERY_DAYS,
    limit: Optional[int] = DEFAULT_QUERY_LIMIT,
) -> dict[str, Any]:
    """
    Query activity log entries.

    :param activity_dir: directory holding the per-day log files
    :param keyword: optional filter applied to each entry summary
    :param use_regex: treat ``keyword`` as a case-insensitive regular expression
    :param date: optional exact date in ``YYYY-MM-DD`` form; blank means "not given"
    :param days: number of recent days to search when no date is given
    :param limit: maximum number of entries to return
    :return: result payload; ``success`` is False with a ``message`` when the
        regular expression or the date is invalid
    """
    normalized_limit = _coerce_query_limit(limit)
    normalized_keyword = str(keyword or "").strip()
    normalized_use_regex = bool(use_regex)
    requested_date = str(date or "").strip()

    def _failure(message: str) -> dict[str, Any]:
        # Single definition of the structured error payload for all validation failures.
        return {
            "success": False,
            "message": message,
            "activity_dir": activity_dir,
            "keyword": normalized_keyword,
            "use_regex": normalized_use_regex,
            "date": date,
            "days": days if not date else None,
            "searched_dates": [],
            "total_count": 0,
            "returned_count": 0,
            "truncated": False,
            "entries": [],
        }

    regex_pattern: Optional[re.Pattern[str]] = None
    if normalized_keyword and normalized_use_regex:
        try:
            regex_pattern = re.compile(normalized_keyword, re.IGNORECASE)
        except re.error as err:
            return _failure(f"无效的活动日志正则表达式: {err}")

    # The date ends up in a file name, so it must be a real calendar date.
    # Parsing and re-serializing it rejects path fragments such as "../x" and
    # normalizes variants like "2026-6-1" to the on-disk "2026-06-01" form.
    normalized_date: Optional[str] = None
    if requested_date:
        try:
            normalized_date = (
                datetime.strptime(requested_date, "%Y-%m-%d").date().isoformat()
            )
        except ValueError:
            return _failure(
                f"无效的活动日志日期,应为 YYYY-MM-DD 格式: {requested_date}"
            )

    date_candidates = (
        [normalized_date] if normalized_date else _iter_recent_dates(days)
    )
    entries: list[dict[str, str]] = []
    searched_dates: list[str] = []

    for date_str in date_candidates:
        searched_dates.append(date_str)
        log_path = _build_log_path(activity_dir, date_str)
        if not log_path.is_file():
            continue
        try:
            content = log_path.read_text(encoding="utf-8")
        except Exception as e:
            # Best effort: one unreadable day must not fail the whole query.
            logger.warning(f"读取活动日志失败 {log_path}: {e}")
            continue
        for entry in _parse_activity_entries(date_str, content):
            if normalized_keyword and not _activity_summary_matches_keyword(
                entry["summary"], normalized_keyword, regex_pattern
            ):
                continue
            entries.append(entry)

    # Newest first; both keys are zero-padded strings, so text order is time order.
    entries.sort(key=lambda item: (item["date"], item["time"]), reverse=True)
    total_count = len(entries)
    return {
        "success": True,
        "activity_dir": activity_dir,
        "keyword": normalized_keyword or None,
        "use_regex": normalized_use_regex,
        "date": normalized_date,
        "days": days if not normalized_date else None,
        "searched_dates": searched_dates,
        "total_count": total_count,
        "returned_count": min(total_count, normalized_limit),
        "truncated": total_count > normalized_limit,
        "entries": entries[:normalized_limit],
    }
class ActivityLogState(AgentState):
"""ActivityLogMiddleware 的状态模型。"""
activity_log_contents: NotRequired[Annotated[dict[str, str], PrivateStateAttr]]
"""将日期字符串映射到日志内容的字典。标记为私有,不包含在最终代理状态中。"""
"""将日期字符串映射到日志索引摘要的字典。标记为私有,不包含在最终代理状态中。"""
class ActivityLogStateUpdate(TypedDict):
@@ -61,7 +243,7 @@ class ActivityLogStateUpdate(TypedDict):
activity_log_contents: dict[str, str]
def _extract_last_round(messages: list) -> list | None:
def _extract_last_round(messages: list) -> Optional[list]:
"""从完整消息列表中提取最后一轮交互。
从最后一条 HumanMessage 到消息末尾即为本轮交互。
@@ -148,7 +330,31 @@ def _format_conversation_for_summary(round_messages: list) -> str:
return "\n".join(lines)
async def _summarize_with_llm(conversation_text: str) -> str | None:
def _should_skip_activity_summary(round_messages: list) -> bool:
"""判断本轮交互是否无需生成活动日志。"""
if not round_messages:
return True
has_tool_activity = any(
isinstance(msg, ToolMessage)
or (isinstance(msg, AIMessage) and bool(getattr(msg, "tool_calls", None)))
for msg in round_messages
)
if has_tool_activity:
return False
user_text_parts = []
for msg in round_messages:
if not isinstance(msg, HumanMessage):
continue
content = msg.content if isinstance(msg.content, str) else str(msg.content)
if content:
user_text_parts.append(content)
user_text = " ".join(user_text_parts).strip()
return bool(user_text and TRIVIAL_USER_TEXT_PATTERN.match(user_text))
async def _summarize_with_llm(conversation_text: str) -> Optional[str]:
"""调用 LLM 对对话文本生成活动摘要。
参数:
@@ -166,50 +372,47 @@ async def _summarize_with_llm(conversation_text: str) -> str | None:
summary = response.content.strip()
# 清理模型可能输出的前缀(如 "摘要:" "总结:"
summary = re.sub(r"^(摘要|总结|活动记录)[:]\s*", "", summary)
if summary.strip().upper() == SUMMARY_SKIP_MARKER:
return None
return summary if summary else None
except Exception as e:
logger.debug("LLM summarization failed: %s", e)
logger.debug(f"LLM 活动摘要生成失败: {e}")
return None
ACTIVITY_LOG_SYSTEM_PROMPT = """<activity_log>
{activity_log}
</activity_log>
<activity_log_index>
{activity_log_index}
</activity_log_index>
<activity_log_guidelines>
The above <activity_log> contains a record of your recent interactions with the user, automatically maintained by the system.
Activity logs are automatically maintained by the system and are available for continuity, but full log contents are not injected into context by default.
**How to use this information:**
- Reference past activities when relevant to provide continuity (e.g., "之前帮你订阅了《XXX》现在有更新了")
- Use activity history to understand ongoing tasks and user patterns
- When the user asks "你之前帮我做了什么" or similar questions, refer to this log
- Activity logs are automatically recorded after each interaction - you do NOT need to manually update them
**What is automatically logged:**
- Each user interaction: what was asked, which tools were used, and the outcome
- Timestamps for all activities
- The log is organized by date for easy reference
**Important:**
- Activity logs are READ-ONLY from your perspective - the system manages them automatically
- Do not attempt to edit or write to activity log files
- For long-term preferences and knowledge, continue to use MEMORY.md
- Activity logs are retained for {retention_days} days and then automatically cleaned up
- The <activity_log_index> above only lists which recent dates have activity records and how many entries exist.
- Use `query_activity_log` when the user asks about previous work, asks to continue a prior task, or when recent activity is clearly relevant to the current request.
- To find related logs, start with a broad search: use the exact date if known; otherwise query recent days with a short keyword, or omit `keyword` and inspect the latest entries. If there are no matches, retry with a larger `days` value or a shorter object/path fragment.
- `query_activity_log.keyword` is a plain substring by default. Set `use_regex=true` only when matching alternatives or patterns such as multiple titles, paths, or task IDs.
- Do not query activity logs for routine standalone tasks such as file organization, media recognition, downloads, subscriptions, or diagnostics unless the user explicitly references prior activity.
- Activity logs are read-only from your perspective. Do not attempt to edit or write to activity log files.
- For long-term preferences and knowledge, continue to use MEMORY.md.
- Activity logs are retained for {retention_days} days and then automatically cleaned up.
</activity_log_guidelines>
</activity_log>
"""
class ActivityLogMiddleware(AgentMiddleware[ActivityLogState, ContextT, ResponseT]): # noqa
"""自动记录和加载 Agent 活动日志的中间件。
"""自动记录 Agent 活动日志并注入轻量索引的中间件。
- abefore_agent: 加载近几天的活动日志
- awrap_model_call: 将活动日志注入系统提示词
- abefore_agent: 加载近几天的活动日志索引
- awrap_model_call: 将活动日志索引和检索规则注入系统提示词
- aafter_agent: 从本次对话中提取摘要并追加到当日日志文件
参数:
activity_dir: 活动日志存储目录路径。
retention_days: 日志保留天数(默认 7 天)。
prompt_load_days: 注入系统提示词时加载的天数(默认 3 天)。
prompt_load_days: 注入系统提示词时索引的天数(默认 3 天)。
"""
state_schema = ActivityLogState
@@ -230,10 +433,10 @@ class ActivityLogMiddleware(AgentMiddleware[ActivityLogState, ContextT, Response
return AsyncPath(self.activity_dir) / f"{date_str}.md"
def _format_activity_log(self, contents: dict[str, str]) -> str:
"""格式化活动日志用于系统提示词注入。"""
"""格式化活动日志索引用于系统提示词注入。"""
if not contents:
return ACTIVITY_LOG_SYSTEM_PROMPT.format(
activity_log="(暂无活动记录)",
activity_log_index="(近期暂无活动日志索引。需要历史上下文时可调用 query_activity_log。)",
retention_days=self.retention_days,
)
@@ -247,35 +450,22 @@ class ActivityLogMiddleware(AgentMiddleware[ActivityLogState, ContextT, Response
if not sections:
return ACTIVITY_LOG_SYSTEM_PROMPT.format(
activity_log="(暂无活动记录)",
activity_log_index="(近期暂无活动日志索引。需要历史上下文时可调用 query_activity_log。)",
retention_days=self.retention_days,
)
log_body = "\n\n".join(sections)
log_body = "\n".join(sections)
return ACTIVITY_LOG_SYSTEM_PROMPT.format(
activity_log=log_body,
activity_log_index=log_body,
retention_days=self.retention_days,
)
async def _load_recent_logs(self) -> dict[str, str]:
    """Load the index of recent activity logs (date -> entry-count summary).

    ``load_activity_log_index`` reads files synchronously, so it runs in a
    worker thread instead of blocking the event loop.
    """
    import asyncio  # function-scope import keeps the module's import block untouched

    return await asyncio.to_thread(
        load_activity_log_index,
        activity_dir=self.activity_dir,
        days=self.prompt_load_days,
    )
async def _append_activity(self, summary: str) -> None:
"""将一条活动记录追加到当日日志文件。"""
@@ -340,7 +530,7 @@ class ActivityLogMiddleware(AgentMiddleware[ActivityLogState, ContextT, Response
async def abefore_agent(
self, state: ActivityLogState, runtime: Runtime
) -> ActivityLogStateUpdate | None:
) -> Optional[ActivityLogStateUpdate]:
"""在 Agent 执行前加载近期活动日志。"""
# 如果已经加载则跳过
if "activity_log_contents" in state:
@@ -376,7 +566,7 @@ class ActivityLogMiddleware(AgentMiddleware[ActivityLogState, ContextT, Response
async def aafter_agent(
self, state: ActivityLogState, runtime: Runtime
) -> dict[str, Any] | None:
) -> Optional[dict[str, Any]]:
"""Agent 执行完毕后,调用 LLM 对本轮对话生成摘要并追加到当日活动日志。"""
try:
messages = state.get("messages", [])
@@ -387,6 +577,8 @@ class ActivityLogMiddleware(AgentMiddleware[ActivityLogState, ContextT, Response
round_messages = _extract_last_round(messages)
if not round_messages:
return None
if _should_skip_activity_summary(round_messages):
return None
# 格式化对话文本
conversation_text = _format_conversation_for_summary(round_messages)
@@ -403,4 +595,8 @@ class ActivityLogMiddleware(AgentMiddleware[ActivityLogState, ContextT, Response
return None
__all__ = ["ActivityLogMiddleware"]
__all__ = [
"ActivityLogMiddleware",
"load_activity_log_index",
"query_activity_logs",
]

View File

@@ -26,6 +26,130 @@ from typing_extensions import TypedDict # noqa
from app.log import logger
# Selections smaller than this are topped up from the matching tool-chain
# groups (further bounded by the selector's own tool limit).
MIN_SELECTED_TOOL_COUNT = 4
# Guidance text appended to the tool-selection system prompt. Its first
# non-empty line doubles as the marker used to avoid appending it twice.
MOVIEPILOT_TOOL_SELECTION_HINT = """
MoviePilot tool-chain hints:
- For media search and download tasks, keep related steps together when relevant:
search_media, search_torrents, get_search_results, add_download_tasks, query_download_tasks.
- For file organization and library transfer tasks, keep related steps together when relevant:
list_directory, query_directory_settings, recognize_media, query_library_exists, transfer_file, query_transfer_history, scrape_metadata.
- For subscription tasks, keep related steps together when relevant:
search_subscribe, add_subscribe, query_subscribes, update_subscribe, query_subscribe_history, query_popular_subscribes.
- For download management tasks, keep related steps together when relevant:
query_download_tasks, update_download_tasks, delete_download_tasks, query_downloaders.
- For site diagnostics or maintenance tasks, keep related steps together when relevant:
query_sites, query_site_userdata, test_site, update_site, update_site_cookie.
- For scheduler and workflow tasks, keep related steps together when relevant:
query_schedulers, run_scheduler, query_workflows, run_workflow, query_episode_schedule.
- For plugin tasks, keep related steps together when relevant:
query_installed_plugins, query_market_plugins, query_plugin_capabilities, query_plugin_config, update_plugin_config, query_plugin_data, install_plugin, uninstall_plugin, reload_plugin.
- For rule, identifier, or system setting tasks, keep related steps together when relevant:
query_rule_groups, query_builtin_filter_rules, query_custom_filter_rules, add_custom_filter_rule, update_custom_filter_rule, delete_custom_filter_rule, add_rule_group, update_rule_group, delete_rule_group, query_custom_identifiers, update_custom_identifiers, query_system_settings, update_system_settings.
- Prefer including the likely next-step tools in the same workflow instead of selecting only the first tool.
"""
# Tool-chain groups as (group name, ordered tool names) pairs. When the model
# selects too few tools, groups containing an already-selected tool are walked
# in this order to add their remaining tools. A tool may belong to several
# groups (e.g. query_download_tasks, query_downloaders).
TOOL_CHAIN_GROUPS = (
    (
        "media_download",
        (
            "search_media",
            "search_torrents",
            "get_search_results",
            "add_download_tasks",
            "query_download_tasks",
            "query_downloaders",
        ),
    ),
    (
        "library_transfer",
        (
            "list_directory",
            "query_directory_settings",
            "recognize_media",
            "query_library_exists",
            "transfer_file",
            "query_transfer_history",
            "scrape_metadata",
        ),
    ),
    (
        "subscription",
        (
            "search_subscribe",
            "add_subscribe",
            "query_subscribes",
            "update_subscribe",
            "delete_subscribe",
            "query_subscribe_history",
            "query_popular_subscribes",
            "query_subscribe_shares",
        ),
    ),
    (
        "download_management",
        (
            "query_download_tasks",
            "update_download_tasks",
            "delete_download_tasks",
            "query_downloaders",
        ),
    ),
    (
        "site_management",
        (
            "query_sites",
            "query_site_userdata",
            "test_site",
            "update_site",
            "update_site_cookie",
        ),
    ),
    (
        "workflow_scheduler",
        (
            "query_schedulers",
            "run_scheduler",
            "query_workflows",
            "run_workflow",
            "query_episode_schedule",
        ),
    ),
    (
        "plugin_management",
        (
            "query_installed_plugins",
            "query_market_plugins",
            "query_plugin_capabilities",
            "query_plugin_config",
            "update_plugin_config",
            "query_plugin_data",
            "install_plugin",
            "uninstall_plugin",
            "reload_plugin",
        ),
    ),
    (
        "rule_settings",
        (
            "query_rule_groups",
            "query_builtin_filter_rules",
            "query_custom_filter_rules",
            "add_custom_filter_rule",
            "update_custom_filter_rule",
            "delete_custom_filter_rule",
            "add_rule_group",
            "update_rule_group",
            "delete_rule_group",
            "query_custom_identifiers",
            "update_custom_identifiers",
            "query_system_settings",
            "update_system_settings",
        ),
    ),
)
class ToolSelectionState(AgentState):
"""工具筛选中间件私有状态。"""
@@ -73,12 +197,63 @@ class ToolSelectorMiddleware(LLMToolSelectorMiddleware):
) -> None:
super().__init__(
model=model,
system_prompt=system_prompt,
system_prompt=self._append_tool_selection_hint(system_prompt),
max_tools=max_tools,
always_include=always_include,
)
self.selection_tools = selection_tools or []
@staticmethod
def _append_tool_selection_hint(system_prompt: str) -> str:
"""追加 MoviePilot 工具组选择提示,避免复杂链路只选中首个工具。"""
if "MoviePilot tool-chain hints:" in system_prompt:
return system_prompt
return f"{system_prompt.rstrip()}{MOVIEPILOT_TOOL_SELECTION_HINT}"
def _get_tool_selection_limit(self, valid_tool_names: list[str]) -> int:
"""计算补齐筛选结果时允许使用的工具数量上限。"""
if self.max_tools:
return min(self.max_tools, len(valid_tool_names))
return len(valid_tool_names)
def _complete_low_count_selection(
    self,
    selected_tool_names: list[str],
    valid_tool_names: list[str],
) -> list[str]:
    """
    Top up a too-small tool selection with neighbours from the same tool chain.

    Only groups that already contain a selected tool are expanded, so the
    selection is never widened to unrelated groups. Names that are not valid
    are dropped, and repeated names are collapsed first, so duplicates
    returned by the model cannot make a one-tool selection look large enough.

    :param selected_tool_names: tool names chosen by the model, in order
    :param valid_tool_names: names of all tools that may be selected
    :return: de-duplicated selection, truncated to the selection limit
    """
    limit = self._get_tool_selection_limit(valid_tool_names)
    target_count = min(MIN_SELECTED_TOOL_COUNT, limit)
    valid_tool_set = set(valid_tool_names)

    # dict.fromkeys removes duplicates while keeping the model's ordering.
    completed_names = [
        tool_name
        for tool_name in dict.fromkeys(selected_tool_names)
        if tool_name in valid_tool_set
    ]
    if len(completed_names) >= target_count:
        return completed_names[:limit]

    selected_set = set(completed_names)
    for _, group_tool_names in TOOL_CHAIN_GROUPS:
        # selected_set grows as tools are added, so a later group can be
        # reached through a tool contributed by an earlier one.
        if selected_set.isdisjoint(group_tool_names):
            continue
        for tool_name in group_tool_names:
            if tool_name in selected_set or tool_name not in valid_tool_set:
                continue
            completed_names.append(tool_name)
            selected_set.add(tool_name)
            if len(completed_names) >= target_count:
                return completed_names[:limit]
    return completed_names[:limit]
def _process_selection_response(
self,
response: dict[str, Any],
@@ -103,6 +278,14 @@ class ToolSelectorMiddleware(LLMToolSelectorMiddleware):
tools=[*available_tools, *always_included_tools, *provider_tools]
)
response["tools"] = self._complete_low_count_selection(
selected_tool_names=[
tool_name
for tool_name in response.get("tools", [])
if isinstance(tool_name, str)
],
valid_tool_names=valid_tool_names,
)
return super()._process_selection_response(
response,
available_tools,

View File

@@ -75,6 +75,7 @@ from app.agent.tools.impl.uninstall_plugin import UninstallPluginTool
from app.agent.tools.impl.run_slash_command import RunSlashCommandTool
from app.agent.tools.impl.list_slash_commands import ListSlashCommandsTool
from app.agent.tools.impl.query_custom_identifiers import QueryCustomIdentifiersTool
from app.agent.tools.impl.query_activity_log import QueryActivityLogTool
from app.agent.tools.impl.query_doctor_report import QueryDoctorReportTool
from app.agent.tools.impl.update_custom_identifiers import UpdateCustomIdentifiersTool
from app.agent.tools.impl.query_system_settings import QuerySystemSettingsTool
@@ -93,7 +94,7 @@ class MoviePilotToolFactory:
"""
# 这些通用工具需要始终保留,避免大工具集裁剪后让 Agent 丢失基础的
# 文件系统、命令执行、主动消息发送或交互确认能力。AskUserChoiceTool 仅在支持按钮
# 文件系统、命令执行、历史检索或交互确认能力。AskUserChoiceTool 仅在支持按钮
# 的渠道中才会实际注入,因此后续会再按已加载工具做一次求交集。
TOOL_SELECTOR_ALWAYS_INCLUDE_NAMES = (
"list_directory",
@@ -101,8 +102,7 @@ class MoviePilotToolFactory:
"read_file",
"edit_file",
"execute_command",
"query_doctor_report",
"send_message",
"query_activity_log",
"ask_user_choice",
)
@@ -224,6 +224,7 @@ class MoviePilotToolFactory:
UninstallPluginTool,
RunSlashCommandTool,
ListSlashCommandsTool,
QueryActivityLogTool,
QueryDoctorReportTool,
QueryCustomIdentifiersTool,
UpdateCustomIdentifiersTool,

View File

@@ -0,0 +1,120 @@
"""查询 Agent 活动日志工具。"""
import json
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.middleware.activity_log import (
DEFAULT_QUERY_DAYS,
DEFAULT_QUERY_LIMIT,
query_activity_logs,
)
from app.agent.runtime import agent_runtime_manager
from app.agent.tools.base import MoviePilotTool
from app.agent.tools.tags import ToolTag
from app.log import logger
class QueryActivityLogInput(BaseModel):
    """Input parameter model for the activity log query tool."""

    # Free-text rationale supplied by the caller; not used for filtering.
    explanation: Optional[str] = Field(
        None,
        description="Clear explanation of why this tool is being used in the current context",
    )
    # Summary filter; plain substring unless use_regex is enabled.
    keyword: Optional[str] = Field(
        None,
        description=(
            "Optional plain-text keyword to filter activity summaries. Use short title, path, site, task, "
            "or status fragments; omit it to inspect latest entries."
        ),
    )
    use_regex: Optional[bool] = Field(
        False,
        description=(
            "Whether to treat keyword as a regular expression. Defaults to false; enable only for "
            "alternative or pattern matching."
        ),
    )
    # Exact day to query; when given, the recent-days window is not used.
    date: Optional[str] = Field(
        None,
        description="Optional exact date in YYYY-MM-DD format. If omitted, recent days are searched.",
    )
    days: Optional[int] = Field(
        DEFAULT_QUERY_DAYS,
        description="Number of recent days to search when date is not specified.",
    )
    limit: Optional[int] = Field(
        DEFAULT_QUERY_LIMIT,
        description="Maximum number of activity entries to return.",
    )
class QueryActivityLogTool(MoviePilotTool):
    """
    Read-only tool for querying the agent's activity logs on demand.
    """

    name: str = "query_activity_log"
    tags: list[str] = [ToolTag.Read, ToolTag.System]
    description: str = (
        "Query recent MoviePilot Agent activity logs on demand. Use this when the user asks what was done before, "
        "asks to continue a previous task, or explicitly references recent agent activity. Supports keyword, date, "
        "recent-day window, limit, and optional regex filters. If a keyword search returns no results, retry with "
        "a shorter keyword, a larger days window, or no keyword to inspect recent entries."
    )
    args_schema: Type[BaseModel] = QueryActivityLogInput

    def get_tool_message(self, **kwargs) -> Optional[str]:
        """Build a short progress message describing the requested query."""
        keyword = kwargs.get("keyword")
        date = kwargs.get("date")
        if not date:
            return f"搜索活动日志: {keyword}" if keyword else "查询近期活动日志"
        if keyword:
            return f"查询活动日志: {date} / {keyword}"
        return f"查询活动日志: {date}"

    async def run(
        self,
        keyword: Optional[str] = None,
        use_regex: Optional[bool] = False,
        date: Optional[str] = None,
        days: Optional[int] = DEFAULT_QUERY_DAYS,
        limit: Optional[int] = DEFAULT_QUERY_LIMIT,
        **kwargs,
    ) -> str:
        """
        Run the activity log query and return the result as a JSON string.

        The query itself is handed to ``run_blocking``; any exception is logged
        and reported as a JSON failure payload.
        """
        logger.info(
            f"执行工具: {self.name}, keyword={keyword}, use_regex={use_regex}, date={date}, "
            f"days={days}, limit={limit}"
        )
        try:
            query_options = {
                "keyword": keyword,
                "use_regex": bool(use_regex),
                "date": date,
                "days": days or DEFAULT_QUERY_DAYS,
                "limit": limit,
            }
            payload = await self.run_blocking(
                "default",
                query_activity_logs,
                str(agent_runtime_manager.activity_dir),
                **query_options,
            )
            return json.dumps(payload, ensure_ascii=False, indent=2)
        except Exception as err:
            logger.error(f"查询活动日志失败: {err}", exc_info=True)
            failure = {
                "success": False,
                "message": f"查询活动日志时发生错误: {str(err)}",
            }
            return json.dumps(failure, ensure_ascii=False)

View File

@@ -66,6 +66,19 @@ class TransferFileTool(MoviePilotTool):
args_schema: Type[BaseModel] = TransferFileInput
require_admin: bool = True
@staticmethod
def _get_fileitem_type(file_path: str, storage: Optional[str] = "local") -> str:
"""
判断待整理路径的文件类型。
:param file_path: 已规范化的源文件或目录路径
:param storage: 源存储类型
:return: ``dir`` 或 ``file``
"""
if (storage or "local") == "local" and Path(file_path).is_dir():
return "dir"
return "dir" if file_path.endswith("/") else "file"
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据整理参数生成友好的提示消息"""
file_path = kwargs.get("file_path", "")
@@ -119,7 +132,7 @@ class TransferFileTool(MoviePilotTool):
fileitem = FileItem(
storage=storage or "local",
path=file_path,
type="dir" if file_path.endswith("/") else "file",
type=TransferFileTool._get_fileitem_type(file_path, storage),
)
target_path_obj = Path(target_path) if target_path else None

View File

@@ -596,7 +596,7 @@ class ConfigModel(BaseModel):
# AI推荐条目数量限制
AI_RECOMMEND_MAX_ITEMS: int = 50
# LLM工具选择中间件最大工具数量0为不启用工具选择中间件
LLM_MAX_TOOLS: int = 0
LLM_MAX_TOOLS: int = 20
# AI智能体定时任务检查间隔小时0为不启用默认24小时
AI_AGENT_JOB_INTERVAL: int = 0
# AI智能体啰嗦模式开启后会回复工具调用过程

View File

@@ -0,0 +1,286 @@
import asyncio
import json
from datetime import datetime
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from app.agent.middleware.activity_log import (
ActivityLogMiddleware,
_summarize_with_llm,
load_activity_log_index,
query_activity_logs,
)
from app.agent.tools.factory import MoviePilotToolFactory
from app.agent.tools.impl.query_activity_log import QueryActivityLogTool
from app.agent.tools.manager import MoviePilotToolsManager
def _write_activity_log(activity_dir, date_str: str, lines: list[str]) -> None:
"""写入测试用活动日志。"""
activity_dir.mkdir(parents=True, exist_ok=True)
body = "\n".join(lines)
(activity_dir / f"{date_str}.md").write_text(
f"# {date_str} 活动日志\n\n{body}\n",
encoding="utf-8",
)
def test_activity_log_index_counts_entries_without_body(tmp_path):
    """The index exposes only per-day entry counts, never the summary text."""
    today = datetime.now().strftime("%Y-%m-%d")
    log_lines = [
        "- **10:00** 帮用户整理了电影文件",
        "- **11:00** 查询了下载任务状态",
    ]
    _write_activity_log(tmp_path, today, log_lines)

    index = load_activity_log_index(str(tmp_path), days=1)

    assert index == {today: "2 条活动记录"}
    serialized = json.dumps(index, ensure_ascii=False)
    assert "整理了电影文件" not in serialized
def test_activity_log_prompt_injects_index_not_full_log(tmp_path):
    """The system prompt built by ActivityLogMiddleware must not carry the full log body."""
    date_str = datetime.now().strftime("%Y-%m-%d")
    _write_activity_log(
        tmp_path,
        date_str,
        ["- **10:00** 这是一条不应默认进入上下文的活动正文"],
    )
    middleware = ActivityLogMiddleware(activity_dir=str(tmp_path), prompt_load_days=1)
    state_update = asyncio.run(middleware.abefore_agent({}, runtime=None))
    # Minimal stand-in for a model request: it carries the loaded state and an
    # ``override`` that returns a new object with the replaced system message.
    request = SimpleNamespace(
        state=state_update,
        system_message=SystemMessage(content="SYSTEM"),
        override=lambda **kwargs: SimpleNamespace(
            state=state_update,
            system_message=kwargs.get("system_message", SystemMessage(content="SYSTEM")),
        ),
    )
    modified = middleware.modify_request(request)
    system_text = str(modified.system_message.content)
    # The count and the tool name are present; the entry text is not.
    assert "1 条活动记录" in system_text
    assert "这是一条不应默认进入上下文的活动正文" not in system_text
    assert "query_activity_log" in system_text
def test_activity_log_skips_trivial_greeting_without_llm(tmp_path):
    """A greeting with no real task must neither call the LLM nor write a log entry."""
    middleware = ActivityLogMiddleware(activity_dir=str(tmp_path))
    summarize_mock = AsyncMock(return_value="不应写入")
    append_mock = AsyncMock()
    # Both the summarizer and the file append are replaced so any call is observable.
    with (
        patch(
            "app.agent.middleware.activity_log._summarize_with_llm",
            new=summarize_mock,
        ),
        patch.object(middleware, "_append_activity", new=append_mock),
    ):
        asyncio.run(
            middleware.aafter_agent(
                {
                    "messages": [
                        HumanMessage(content="你好"),
                        AIMessage(content="你好,有什么可以帮你?"),
                    ],
                },
                runtime=None,
            )
        )
    summarize_mock.assert_not_awaited()
    append_mock.assert_not_awaited()
    assert not list(tmp_path.glob("*.md"))
def test_summarize_with_llm_ignores_skip_marker():
    """An LLM reply of SKIP means nothing should be logged, so the summary is None."""
    # Fake LLM whose async ``ainvoke`` returns an object with ``content == "SKIP"``.
    llm = SimpleNamespace(
        ainvoke=AsyncMock(return_value=SimpleNamespace(content="SKIP"))
    )
    with patch(
        "app.agent.llm.LLMHelper.get_llm",
        new=AsyncMock(return_value=llm),
    ):
        summary = asyncio.run(_summarize_with_llm("用户: 你好"))
    assert summary is None
    llm.ainvoke.assert_awaited_once()
def test_activity_log_records_detailed_summary(tmp_path):
    """A round with real tool activity must be written to the log with its full summary."""
    middleware = ActivityLogMiddleware(activity_dir=str(tmp_path))
    summary = (
        "用户要求整理 `/downloads/Show`,助手调用 transfer_file 识别并转移剧集,"
        "结果成功写入目标媒体库。"
    )
    # Only the summarizer is mocked; the append to disk runs for real.
    with patch(
        "app.agent.middleware.activity_log._summarize_with_llm",
        new=AsyncMock(return_value=summary),
    ):
        asyncio.run(
            middleware.aafter_agent(
                {
                    "messages": [
                        HumanMessage(content="帮我整理 /downloads/Show"),
                        AIMessage(
                            content="",
                            tool_calls=[
                                {
                                    "name": "transfer_file",
                                    "args": {"path": "/downloads/Show"},
                                    "id": "call_1",
                                }
                            ],
                        ),
                        ToolMessage(
                            content='{"success": true, "target": "/media/Show"}',
                            tool_call_id="call_1",
                        ),
                    ],
                },
                runtime=None,
            )
        )
    log_files = list(tmp_path.glob("*.md"))
    assert len(log_files) == 1
    content = log_files[0].read_text(encoding="utf-8")
    assert summary in content
    # Entries are expected in the "- **HH:MM** ..." bullet form.
    assert "- **" in content
def test_query_activity_logs_filters_by_keyword_and_date(tmp_path):
    """Date and keyword filters combine: only the matching entry of the requested day is returned."""
    logs_by_date = {
        "2026-06-18": [
            "- **10:00** 帮用户整理了电影 A",
            "- **10:30** 查询了站点状态",
        ],
        "2026-06-17": ["- **09:00** 帮用户整理了电影 B"],
    }
    for log_date, log_lines in logs_by_date.items():
        _write_activity_log(tmp_path, log_date, log_lines)

    result = query_activity_logs(
        str(tmp_path), keyword="整理", date="2026-06-18", limit=10
    )

    assert result["success"] is True
    assert result["total_count"] == 1
    first_entry = result["entries"][0]
    assert first_entry["date"] == "2026-06-18"
    assert first_entry["time"] == "10:00"
    assert first_entry["summary"] == "帮用户整理了电影 A"
def test_query_activity_logs_supports_optional_regex(tmp_path):
    """With ``use_regex`` enabled the keyword is matched as a regular expression."""
    log_lines = [
        "- **10:00** 帮用户整理了剧集 A",
        "- **10:30** 查询了站点状态",
    ]
    _write_activity_log(tmp_path, "2026-06-18", log_lines)

    result = query_activity_logs(
        str(tmp_path),
        keyword="整理|站点",
        use_regex=True,
        date="2026-06-18",
        limit=10,
    )

    assert result["success"] is True
    assert result["use_regex"] is True
    assert result["total_count"] == 2
def test_query_activity_logs_reports_invalid_regex(tmp_path):
    """An invalid regular expression yields a structured failure payload."""
    result = query_activity_logs(
        str(tmp_path), keyword="[", use_regex=True, date="2026-06-18"
    )

    assert result["success"] is False
    assert "无效的活动日志正则表达式" in result["message"]
    assert result["entries"] == []
def test_query_activity_log_tool_returns_json_payload(tmp_path):
    """The query_activity_log tool must return the query result as structured JSON."""
    _write_activity_log(
        tmp_path,
        "2026-06-18",
        ["- **10:00** 帮用户整理了电影 A"],
    )
    tool = QueryActivityLogTool(session_id="activity-session", user_id="10001")
    # Point the tool at the temporary directory instead of the real activity directory.
    with patch(
        "app.agent.tools.impl.query_activity_log.agent_runtime_manager.activity_dir",
        tmp_path,
    ):
        result = asyncio.run(
            tool.run(keyword="整理", date="2026-06-18", limit=5)
        )
    payload = json.loads(result)
    assert payload["success"] is True
    assert payload["returned_count"] == 1
    assert payload["entries"][0]["summary"] == "帮用户整理了电影 A"
def test_factory_registers_activity_log_tool():
    """The tool factory should register the activity log query tool."""
    plugin_tools_target = (
        "app.agent.tools.factory.PluginManager.get_plugin_agent_tools"
    )

    # Plugin-provided tools are stubbed out to an empty list.
    with patch(plugin_tools_target, return_value=[]):
        created_tools = MoviePilotToolFactory.create_tools(
            session_id="activity-session", user_id="10001"
        )

    registered_names = {created.name for created in created_tools}
    assert "query_activity_log" in registered_names
def test_mcp_tool_manager_exposes_activity_log_tool():
    """The MCP tool manager should expose the activity log query tool."""
    activity_tool = QueryActivityLogTool(
        session_id="activity-session", user_id="10001"
    )
    factory_target = "app.agent.tools.manager.MoviePilotToolFactory.create_tools"

    # The factory is patched to hand back only the activity log tool.
    with patch(factory_target, return_value=[activity_tool]):
        manager = MoviePilotToolsManager(is_admin=True)
        tool_definitions = manager.list_tools()

    exposed_names = [definition.name for definition in tool_definitions]
    assert exposed_names == ["query_activity_log"]

    schema = tool_definitions[0].input_schema
    for parameter in ("keyword", "use_regex", "date"):
        assert parameter in schema["properties"]

View File

@@ -330,14 +330,62 @@ class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase):
created["middleware"],
)
def test_send_message_tool_is_always_included_by_tool_selector(self):
async def test_create_agent_excludes_activity_log_without_message_context(self):
    """A background capture task with no channel information should not get the activity log."""

    def _stub(label):
        """Build a side effect that ignores its arguments and returns ``label``."""
        return lambda *args, **kwargs: label

    # No channel/source is passed; only an output callback is supplied.
    capture_agent = MoviePilotAgent(
        session_id="background-capture-session",
        user_id="system",
        output_callback=lambda _text: None,
    )
    capture_agent._initialize_tools = lambda: []
    capture_agent._initialize_subagent_tools = lambda: []
    fake_llm_loader = AsyncMock(return_value=object())

    with (
        patch.object(settings, "LLM_MAX_TOOLS", 0),
        patch.object(capture_agent, "_initialize_llm", new=fake_llm_loader),
        patch("app.agent.prompt_manager.get_agent_prompt", return_value="PROMPT"),
        patch("app.agent.create_subagent_middlewares", return_value=([], [])),
        patch(
            "app.agent.MoviePilotToolFactory.get_tool_selector_always_include_names",
            return_value=[],
        ),
        # Each middleware class is replaced by a stub returning a short label,
        # so the resulting middleware list can be compared as plain strings.
        patch("app.agent.SkillsMiddleware", side_effect=_stub("skills")),
        patch("app.agent.JobsMiddleware", side_effect=_stub("jobs")),
        patch("app.agent.RuntimeConfigMiddleware", side_effect=_stub("runtime")),
        patch("app.agent.MemoryMiddleware", side_effect=_stub("memory")),
        patch("app.agent.ActivityLogMiddleware", side_effect=_stub("activity")),
        patch("app.agent.SummarizationMiddleware", side_effect=_stub("summary")),
        patch("app.agent.PatchToolCallsMiddleware", side_effect=_stub("patch")),
        patch("app.agent.UsageMiddleware", side_effect=_stub("usage")),
        patch("app.agent.InMemorySaver", return_value="checkpointer"),
        # create_agent echoes its keyword arguments back for inspection.
        patch("app.agent.create_agent", side_effect=lambda **kwargs: kwargs),
    ):
        created = await capture_agent._create_agent(streaming=False)

    # "activity" must be absent from the assembled middleware list.
    expected_middlewares = [
        "skills",
        "jobs",
        "runtime",
        "memory",
        "summary",
        "patch",
        "usage",
    ]
    self.assertEqual(expected_middlewares, created["middleware"])
def test_message_tool_is_not_always_included_by_tool_selector(self):
    """The message-sending tool should not bypass tool selection."""
    send_message_tool = SimpleNamespace(name="send_message")

    always_include = MoviePilotToolFactory.get_tool_selector_always_include_names(
        [send_message_tool]
    )

    # "send_message" must stay out of the always-include names so that it
    # remains subject to tool selection like any other tool.
    self.assertNotIn("send_message", always_include)
def test_activity_log_tool_is_always_included_by_tool_selector(self):
    """The activity log query tool should bypass tool selection."""
    candidate_tools = [SimpleNamespace(name="query_activity_log")]

    pinned_names = MoviePilotToolFactory.get_tool_selector_always_include_names(
        candidate_tools
    )

    self.assertIn("query_activity_log", pinned_names)
async def test_create_agent_always_includes_subagent_tools(self):
"""工具筛选开启时应保留同步和异步子代理入口。"""
@@ -386,7 +434,12 @@ class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase):
self.assertIn(SUBAGENT_CONTROL_TOOL_NAME, captured["always_include"])
async def test_create_agent_keeps_activity_log_for_normal_session(self):
agent = MoviePilotAgent(session_id="normal-session", user_id="system")
agent = MoviePilotAgent(
session_id="normal-session",
user_id="system",
channel="Web",
source="openai",
)
agent._initialize_tools = lambda: []
agent._initialize_subagent_tools = lambda: []

View File

@@ -103,6 +103,7 @@ class ToolSelectorMiddlewareTest(unittest.TestCase):
self.assertIn("Return the answer in JSON only.", prompt)
self.assertIn('- search: Search for information', prompt)
self.assertIn('- calendar: Manage events', prompt)
self.assertIn("MoviePilot tool-chain hints:", prompt)
self.assertEqual(len(handled_requests), 1)
def test_awrap_model_call_reuses_first_selection_for_later_model_rounds(self):
@@ -221,3 +222,112 @@ class ToolSelectorMiddlewareTest(unittest.TestCase):
normalized = middleware._normalize_selection_response(response)
self.assertEqual(normalized, {"tools": ["search"]})
def test_process_selection_response_completes_low_count_tool_chain(self):
    """When too few tools are selected, adjacent tools from the matched chain should be filled in."""
    tool_specs = [
        ("search_media", "Search media"),
        ("search_torrents", "Search torrents"),
        ("get_search_results", "Get results"),
        ("add_download_tasks", "Add downloads"),
        ("query_download_tasks", "Query downloads"),
    ]
    tools = [
        SimpleNamespace(name=tool_name, description=tool_description)
        for tool_name, tool_description in tool_specs
    ]
    middleware = tool_selector_module.ToolSelectorMiddleware(
        max_tools=4,
        selection_tools=tools,
    )
    request = _FakeRequest(
        tools=tools,
        messages=[HumanMessage(content="帮我下载流浪地球")],
        model=_FakeModel(),
    )

    # Only one tool is selected up front; the expectation below is that the
    # result grows to four tools, in the order they were declared.
    selection = {"tools": ["search_media"]}
    known_names = [candidate.name for candidate in tools]
    result = middleware._process_selection_response(
        selection,
        available_tools=tools,
        valid_tool_names=known_names,
        request=request,
    )

    selected_names = [chosen.name for chosen in result.tools]
    expected_names = [
        "search_media",
        "search_torrents",
        "get_search_results",
        "add_download_tasks",
    ]
    self.assertEqual(selected_names, expected_names)
def test_process_selection_response_keeps_high_count_selection(self):
    """When the selection is already large enough, no extra tools should be added."""
    tool_specs = [
        ("search_media", "Search media"),
        ("search_torrents", "Search torrents"),
        ("get_search_results", "Get results"),
        ("query_sites", "Query sites"),
    ]
    tools = [
        SimpleNamespace(name=tool_name, description=tool_description)
        for tool_name, tool_description in tool_specs
    ]
    middleware = tool_selector_module.ToolSelectorMiddleware(
        max_tools=4,
        selection_tools=tools,
    )
    request = _FakeRequest(
        tools=tools,
        messages=[HumanMessage(content="帮我下载流浪地球")],
        model=_FakeModel(),
    )

    # The selection already names four tools, matching max_tools.
    chosen_names = [
        "search_media",
        "search_torrents",
        "get_search_results",
        "query_sites",
    ]
    known_names = [candidate.name for candidate in tools]
    result = middleware._process_selection_response(
        {"tools": chosen_names},
        available_tools=tools,
        valid_tool_names=known_names,
        request=request,
    )

    selected_names = [chosen.name for chosen in result.tools]
    self.assertEqual(
        selected_names,
        [
            "search_media",
            "search_torrents",
            "get_search_results",
            "query_sites",
        ],
    )
def test_process_selection_response_respects_max_tools_when_completing(self):
    """Tool-chain completion must not exceed the max_tools limit."""
    tool_specs = [
        ("list_directory", "List directory"),
        ("query_directory_settings", "Query settings"),
        ("recognize_media", "Recognize media"),
        ("transfer_file", "Transfer file"),
    ]
    tools = [
        SimpleNamespace(name=tool_name, description=tool_description)
        for tool_name, tool_description in tool_specs
    ]
    middleware = tool_selector_module.ToolSelectorMiddleware(
        max_tools=2,
        selection_tools=tools,
    )
    request = _FakeRequest(
        tools=tools,
        messages=[HumanMessage(content="帮我整理这个目录")],
        model=_FakeModel(),
    )

    known_names = [candidate.name for candidate in tools]
    result = middleware._process_selection_response(
        {"tools": ["transfer_file"]},
        available_tools=tools,
        valid_tool_names=known_names,
        request=request,
    )

    # The count is capped at max_tools=2; order is not asserted, only
    # which two tools end up in the result.
    self.assertEqual(len(result.tools), 2)
    selected_names = {chosen.name for chosen in result.tools}
    self.assertEqual(selected_names, {"transfer_file", "list_directory"})

View File

@@ -0,0 +1,71 @@
from pathlib import Path
from unittest.mock import patch
from app.agent.tools.impl.transfer_file import TransferFileTool
def test_transfer_file_local_directory_without_trailing_slash_uses_dir(tmp_path):
    """A local directory path should be handed to the transfer chain as a directory even without a trailing slash."""
    source_dir = tmp_path / "Movie Folder"
    source_dir.mkdir()
    recorded = {}

    def _record_transfer(self, **kwargs):
        """Record the keyword arguments the tool passes to the transfer chain."""
        recorded.update(kwargs)
        return True, None

    transfer_target = "app.chain.transfer.TransferChain.manual_transfer"
    with patch(transfer_target, new=_record_transfer):
        result = TransferFileTool._transfer_file_sync(str(source_dir))

    fileitem = recorded["fileitem"]
    assert result == f"整理成功:{source_dir}"
    assert fileitem.type == "dir"
    assert fileitem.path == str(source_dir)
def test_transfer_file_local_file_uses_file(tmp_path):
    """A local file path should still be handed to the transfer chain as a file."""
    source_file = tmp_path / "Movie.mkv"
    source_file.write_text("fake media", encoding="utf-8")
    recorded = {}

    def _record_transfer(self, **kwargs):
        """Record the keyword arguments the tool passes to the transfer chain."""
        recorded.update(kwargs)
        return True, None

    transfer_target = "app.chain.transfer.TransferChain.manual_transfer"
    with patch(transfer_target, new=_record_transfer):
        result = TransferFileTool._transfer_file_sync(str(source_file))

    fileitem = recorded["fileitem"]
    assert result == f"整理成功:{source_file}"
    assert fileitem.type == "file"
    assert fileitem.path == str(source_file)
def test_transfer_file_remote_directory_still_uses_trailing_slash():
    """When remote storage cannot be checked with a local stat, the trailing slash still identifies a directory."""
    recorded = {}

    def _record_transfer(self, **kwargs):
        """Record the keyword arguments the tool passes to the transfer chain."""
        recorded.update(kwargs)
        return True, None

    transfer_target = "app.chain.transfer.TransferChain.manual_transfer"
    with patch(transfer_target, new=_record_transfer):
        # The input path has no leading slash; the assertions below expect
        # the reported and recorded paths to carry one.
        result = TransferFileTool._transfer_file_sync(
            "downloads/Show/",
            storage="alist",
        )

    fileitem = recorded["fileitem"]
    assert result == "整理成功:/downloads/Show/"
    assert fileitem.type == "dir"
    assert fileitem.path == "/downloads/Show/"