From 0c51d79be74695917bb5d7c9a0c2e6629e183ea3 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 9 Apr 2026 07:16:56 +0800 Subject: [PATCH] =?UTF-8?q?feat(agent):=20=E5=90=88=E5=B9=B6=E5=90=8C?= =?UTF-8?q?=E6=89=B9=E6=AC=A1=E6=95=B4=E7=90=86=E5=A4=B1=E8=B4=A5=E7=9A=84?= =?UTF-8?q?agent=E9=87=8D=E8=AF=95=E8=B0=83=E7=94=A8=EF=BC=8C=E9=81=BF?= =?UTF-8?q?=E5=85=8D=E9=87=8D=E5=A4=8D=E6=B5=AA=E8=B4=B9token?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 同一download_hash或同一源目录下的失败记录在5分钟缓冲期内合并为一次agent调用, 批量处理时只识别一次媒体信息后复用到所有文件。 --- app/agent/__init__.py | 149 ++++++++++++++++++++------ app/chain/transfer.py | 22 +++- skills/transfer-failed-retry/SKILL.md | 68 ++++++++++-- 3 files changed, 197 insertions(+), 42 deletions(-) diff --git a/app/agent/__init__.py b/app/agent/__init__.py index dfbab507..2342e43a 100644 --- a/app/agent/__init__.py +++ b/app/agent/__init__.py @@ -68,7 +68,7 @@ class _ThinkTagStripper: on_output(self.buffer[:start_idx]) emitted = True self.in_think_tag = True - self.buffer = self.buffer[start_idx + 7:] + self.buffer = self.buffer[start_idx + 7 :] else: # 检查是否以 的不完整前缀结尾 partial_match = False @@ -88,7 +88,7 @@ class _ThinkTagStripper: end_idx = self.buffer.find("") if end_idx != -1: self.in_think_tag = False - self.buffer = self.buffer[end_idx + 8:] + self.buffer = self.buffer[end_idx + 8 :] else: # 检查是否以 的不完整前缀结尾 partial_match = False @@ -506,12 +506,21 @@ class AgentManager: 同一会话的消息按顺序排队处理,不同会话之间互不影响。 """ + # 批量重试整理的等待时间(秒),同一批次内的失败记录会合并为一次agent调用 + RETRY_TRANSFER_DEBOUNCE_SECONDS = 300 + def __init__(self): self.active_agents: Dict[str, MoviePilotAgent] = {} # 每个会话的消息队列 self._session_queues: Dict[str, asyncio.Queue] = {} # 每个会话的worker任务 self._session_workers: Dict[str, asyncio.Task] = {} + # 重试整理的 debounce 缓冲区: group_key -> List[history_id] + self._retry_transfer_buffer: Dict[str, List[int]] = {} + # 重试整理的 debounce 定时器: group_key -> asyncio.TimerHandle + self._retry_transfer_timers: Dict[str, asyncio.TimerHandle] = {} + # 重试整理缓冲区锁 + self._retry_transfer_lock = asyncio.Lock() @staticmethod async def initialize(): @@ -525,6 +534,11 @@ class AgentManager: 关闭管理器 """ await memory_manager.close() + # 取消所有重试整理的延迟定时器 + for timer in self._retry_transfer_timers.values(): + timer.cancel() + self._retry_transfer_timers.clear() + self._retry_transfer_buffer.clear() # 取消所有会话worker for task in self._session_workers.values(): task.cancel() @@ -779,41 +793,112 @@ class AgentManager: except Exception as e: logger.error(f"智能体心跳唤醒失败: {e}") - async def retry_failed_transfer(self, history_id: int): + async def retry_failed_transfer(self, history_id: int, group_key: str = ""): """ 触发智能体重新整理失败的历史记录。 - 由文件整理模块在检测到整理失败后调用,使用独立会话执行。 + 由文件整理模块在检测到整理失败后调用。 + 同一 group_key 的失败记录会在缓冲期内合并为一次agent调用,避免重复浪费token。 :param history_id: 失败的整理历史记录ID + :param group_key: 分组键,相同key的记录会被合并处理(如download_hash、源目录等) """ + if not group_key: + group_key = f"_default_{history_id}" + + async with self._retry_transfer_lock: + # 将 history_id 加入缓冲区 + if group_key not in self._retry_transfer_buffer: + self._retry_transfer_buffer[group_key] = [] + if history_id not in self._retry_transfer_buffer[group_key]: + self._retry_transfer_buffer[group_key].append(history_id) + logger.info( + f"智能体重试整理:记录 ID={history_id} 已加入缓冲区 " + f"(group={group_key}, 当前{len(self._retry_transfer_buffer[group_key])}条)" + ) + + # 取消该分组的旧定时器 + if group_key in self._retry_transfer_timers: + self._retry_transfer_timers[group_key].cancel() + + # 设置新的延迟定时器 + loop = asyncio.get_running_loop() + self._retry_transfer_timers[group_key] = loop.call_later( + self.RETRY_TRANSFER_DEBOUNCE_SECONDS, + lambda gk=group_key: asyncio.ensure_future( + self._flush_retry_transfer(gk) + ), + ) + + async def _flush_retry_transfer(self, group_key: str): + """ + 延迟定时器到期后,取出该分组的所有 history_id 并合并为一次agent调用。 + """ + async with self._retry_transfer_lock: + history_ids = self._retry_transfer_buffer.pop(group_key, []) + self._retry_transfer_timers.pop(group_key, None) + + if not history_ids: + return + try: - # 每次使用唯一的 session_id,避免共享上下文 - session_id = f"__agent_retry_transfer_{history_id}_{uuid.uuid4().hex[:8]}__" + session_id = f"__agent_retry_transfer_batch_{uuid.uuid4().hex[:8]}__" user_id = "system" - logger.info(f"智能体重试整理:开始处理失败记录 ID={history_id} ...") - - # 英文提示词,便于大模型理解 - retry_message = ( - f"[System Task - Transfer Failed Retry] A file transfer/organization has failed. " - f"Please use the 'transfer-failed-retry' skill to retry the failed transfer.\n\n" - f"Failed transfer history record ID: {history_id}\n\n" - f"Follow these steps:\n" - f"1. Use `query_transfer_history` with status='failed' to find the record with id={history_id} " - f"and understand the failure details (source path, error message, media info)\n" - f"2. Analyze the error message to determine the best retry strategy\n" - f"3. If the source file no longer exists, skip this retry and report that the file is missing\n" - f"4. Delete the failed history record using `delete_transfer_history` with history_id={history_id}\n" - f"5. Re-identify the media using `recognize_media` with the source file path\n" - f"6. If recognition fails, try `search_media` with keywords from the filename\n" - f"7. Re-transfer using `transfer_file` with the source path and any identified media info (tmdbid, media_type)\n" - f"8. Report the final result\n\n" - f"IMPORTANT: This is a background system task, NOT a user conversation. " - f"Your final response will be broadcast as a notification. " - f"Only output a brief result summary. " - f"Do NOT include greetings, explanations, or conversational text. " - f"Respond in Chinese (中文)." + ids_str = ", ".join(str(i) for i in history_ids) + logger.info( + f"智能体重试整理:开始批量处理失败记录 IDs=[{ids_str}] (group={group_key})" ) + if len(history_ids) == 1: + # 单条记录,使用原有逻辑 + retry_message = ( + f"[System Task - Transfer Failed Retry] A file transfer/organization has failed. " + f"Please use the 'transfer-failed-retry' skill to retry the failed transfer.\n\n" + f"Failed transfer history record ID: {history_ids[0]}\n\n" + f"Follow these steps:\n" + f"1. Use `query_transfer_history` with status='failed' to find the record with id={history_ids[0]} " + f"and understand the failure details (source path, error message, media info)\n" + f"2. Analyze the error message to determine the best retry strategy\n" + f"3. If the source file no longer exists, skip this retry and report that the file is missing\n" + f"4. Delete the failed history record using `delete_transfer_history` with history_id={history_ids[0]}\n" + f"5. Re-identify the media using `recognize_media` with the source file path\n" + f"6. If recognition fails, try `search_media` with keywords from the filename\n" + f"7. Re-transfer using `transfer_file` with the source path and any identified media info (tmdbid, media_type)\n" + f"8. Report the final result\n\n" + f"IMPORTANT: This is a background system task, NOT a user conversation. " + f"Your final response will be broadcast as a notification. " + f"Only output a brief result summary. " + f"Do NOT include greetings, explanations, or conversational text. " + f"Respond in Chinese (中文)." + ) + else: + # 多条记录,使用批量处理逻辑 + retry_message = ( + f"[System Task - Batch Transfer Failed Retry] Multiple file transfers from the same source " + f"have failed. These files likely belong to the SAME media (e.g., multiple episodes of the same TV show). " + f"Please use the 'transfer-failed-retry' skill to retry them efficiently.\n\n" + f"Failed transfer history record IDs: {ids_str}\n" + f"Total failed records: {len(history_ids)}\n\n" + f"Follow these steps:\n" + f"1. Use `query_transfer_history` with status='failed' to find ALL records with these IDs " + f"and understand the failure details\n" + f"2. Since these files are likely from the same media, analyze the FIRST record to determine " + f"the media identity and the best retry strategy. The root cause is usually the same for all files.\n" + f"3. If the error is about media recognition (e.g., '未识别到媒体信息'), identify the media ONCE " + f"using `recognize_media` or `search_media`, then reuse that result (tmdbid, media_type) for all files\n" + f"4. For EACH failed record:\n" + f" a. Delete the failed history record using `delete_transfer_history`\n" + f" b. Re-transfer using `transfer_file` with the source path and the identified media info\n" + f"5. Report a summary of results (how many succeeded, how many failed)\n\n" + f"IMPORTANT OPTIMIZATION: These files share the same media identity. " + f"Do NOT call `recognize_media` or `search_media` repeatedly for each file. " + f"Identify the media ONCE, then apply to all files.\n\n" + f"IMPORTANT: This is a background system task, NOT a user conversation. " + f"Your final response will be broadcast as a notification. " + f"Only output a brief result summary. " + f"Do NOT include greetings, explanations, or conversational text. " + f"Respond in Chinese (中文)." + ) + await self.process_message( session_id=session_id, user_id=user_id, @@ -834,13 +919,17 @@ class AgentManager: except asyncio.CancelledError: pass - logger.info(f"智能体重试整理:记录 ID={history_id} 处理完成") + logger.info( + f"智能体重试整理:批量处理完成 IDs=[{ids_str}] (group={group_key})" + ) # 用完即弃,清理资源 await self.clear_session(session_id, user_id) except Exception as e: - logger.error(f"智能体重试整理失败 (ID={history_id}): {e}") + logger.error( + f"智能体重试整理失败 (IDs=[{ids_str}], group={group_key}): {e}" + ) # 全局智能体管理器实例 diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 48fce582..5a46d3d1 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -799,8 +799,17 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): try: from app.agent import agent_manager + # 使用 download_hash 或源文件父目录作为分组键, + # 同一批次(如同一个种子)的失败记录会被合并为一次agent调用 + group_key = ( + task.download_hash or str(task.fileitem.path).rsplit("/", 1)[0] + if task.fileitem + else "" + ) asyncio.run_coroutine_threadsafe( - agent_manager.retry_failed_transfer(history.id), + agent_manager.retry_failed_transfer( + history.id, group_key=group_key + ), global_vars.loop, ) logger.info(f"已触发AI智能体重试整理历史记录 #{history.id}") @@ -1127,8 +1136,17 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): try: from app.agent import agent_manager + # 使用 download_hash 或源文件父目录作为分组键 + group_key = ( + task.download_hash + or str(task.fileitem.path).rsplit("/", 1)[0] + if task.fileitem + else "" + ) asyncio.run_coroutine_threadsafe( - agent_manager.retry_failed_transfer(his.id), + agent_manager.retry_failed_transfer( + his.id, group_key=group_key + ), global_vars.loop, ) logger.info(f"已触发AI智能体重试整理历史记录 #{his.id}") diff --git a/skills/transfer-failed-retry/SKILL.md b/skills/transfer-failed-retry/SKILL.md index 3cd84b11..475ae929 100644 --- a/skills/transfer-failed-retry/SKILL.md +++ b/skills/transfer-failed-retry/SKILL.md @@ -1,12 +1,13 @@ --- name: transfer-failed-retry -description: Use this skill when you need to retry a failed file transfer/organization. Given a failed transfer history record ID, this skill guides you through querying the failure details, deleting the old record, and re-identifying and re-organizing the file. This skill is automatically triggered when the system detects a transfer failure and the AI agent retry feature is enabled. +version: 1 +description: Use this skill when you need to retry failed file transfers/organizations. Given one or more failed transfer history record IDs, this skill guides you through querying the failure details, deleting the old records, and re-identifying and re-organizing the files. Supports batch processing of multiple files from the same media (e.g., multiple episodes of a TV show). This skill is automatically triggered when the system detects transfer failures and the AI agent retry feature is enabled. allowed-tools: query_transfer_history delete_transfer_history recognize_media transfer_file search_media --- # Transfer Failed Retry (整理失败重试) -This skill handles retrying failed file transfers/organizations. When a file transfer fails, you can use this skill to analyze the failure, remove the stale history record, and attempt to re-identify and re-organize the file. +This skill handles retrying failed file transfers/organizations. When file transfers fail, you can use this skill to analyze the failures, remove stale history records, and attempt to re-identify and re-organize the files. It supports both single-file and batch retry scenarios. ## Prerequisites @@ -21,15 +22,15 @@ You need the following tools: ### Step 1: Query the Failed Transfer History -Use `query_transfer_history` to get details about the failed record. Filter by status `failed` to find the specific record. +Use `query_transfer_history` to get details about the failed record(s). Filter by status `failed` to find the specific records. -If you are given a specific history record ID, query with that ID to understand the failure context: +If you are given a specific history record ID (or multiple IDs), query with those IDs to understand the failure context: ``` query_transfer_history(status="failed") ``` -From the record, extract the following key information: +From each record, extract the following key information: - **id**: The history record ID - **src**: Source file path - **title**: The recognized title (may be incorrect) @@ -53,9 +54,9 @@ Common failure reasons and how to handle them: | 未找到有效的集数信息 | Episode number not recognized | Use `recognize_media` with the file path to get better metadata, or specify season/episode in `transfer_file` | | 未获取到转移目录设置 | No transfer directory configured for this media type | Cannot auto-fix - notify user about directory configuration | -### Step 3: Delete the Failed History Record +### Step 3: Delete the Failed History Record(s) -Before retrying, you **must** delete the old failed history record. The system skips files that already have a transfer history entry (even failed ones). +Before retrying, you **must** delete the old failed history record(s). The system skips files that already have a transfer history entry (even failed ones). ``` delete_transfer_history(history_id=) @@ -101,8 +102,54 @@ For TV shows where episode info couldn't be determined: ### Step 5: Report Result After the retry attempt, report the result: -- If successful: Confirm the file has been organized correctly +- If successful: Confirm the file(s) have been organized correctly - If failed again: Report the new error and suggest manual intervention +- For batch operations: Report a summary (e.g., "成功 8/10,失败 2/10") + +## Batch Processing (批量处理) + +When multiple files from the same source fail simultaneously (e.g., 10 episodes of the same TV show all fail with the same error), the system groups them and triggers a single batch retry. + +### Key Optimization Rules for Batch Processing: + +1. **Identify media ONCE, apply to ALL files**: Since batch files typically belong to the same media, perform media recognition (`recognize_media`) or search (`search_media`) only ONCE using the first file, then reuse the result (tmdbid, media_type) for all subsequent files. + +2. **Process each file individually for delete + transfer**: Even though the media identity is shared, you must still: + - Delete each failed history record individually + - Transfer each file individually (they have different source paths) + +3. **Stop early if root cause is unfixable**: If the first file fails due to an unfixable issue (e.g., missing directory configuration), skip all remaining files with the same error rather than retrying each one. + +4. **Process in order**: Handle files sequentially to avoid race conditions. + +### Batch Example Flow: + +``` +# Given failed records: IDs = [42, 43, 44, 45] (4 episodes of the same show) +# All have errmsg="未识别到媒体信息" + +# 1. Query all failed records +query_transfer_history(status="failed") + +# 2. Identify media ONCE using the first file +recognize_media(path="/downloads/Show.Name.S01E01.1080p.mkv") +# Found: tmdb_id=789, media_type="tv" + +# 3. For each record: delete history, then re-transfer +delete_transfer_history(history_id=42) +transfer_file(file_path="/downloads/Show.Name.S01E01.1080p.mkv", tmdbid=789, media_type="tv") + +delete_transfer_history(history_id=43) +transfer_file(file_path="/downloads/Show.Name.S01E02.1080p.mkv", tmdbid=789, media_type="tv") + +delete_transfer_history(history_id=44) +transfer_file(file_path="/downloads/Show.Name.S01E03.1080p.mkv", tmdbid=789, media_type="tv") + +delete_transfer_history(history_id=45) +transfer_file(file_path="/downloads/Show.Name.S01E04.1080p.mkv", tmdbid=789, media_type="tv") + +# 4. Report summary: "重试完成:4/4 成功" +``` ## Important Notes @@ -111,9 +158,10 @@ After the retry attempt, report the result: - **Do not retry** if the error is about missing directory configuration - this requires user intervention. - **For unrecognized media**, always try `recognize_media` with the file path first before falling back to `search_media`. - **Be cautious with TV shows** - ensure the correct season and episode information is used. -- When this skill is triggered automatically by the system, it provides the `history_id` directly. Start from Step 1 with that specific ID. +- **For batch processing**, always reuse media identification results across all files to save time and resources. +- When this skill is triggered automatically by the system, it provides the `history_id`(s) directly. Start from Step 1 with those specific IDs. -## Example: Complete Retry Flow +## Example: Single File Retry Flow ``` # 1. Query the failed record