feat(agent): 合并同批次整理失败的agent重试调用,避免重复浪费token

同一download_hash或同一源目录下的失败记录在5分钟缓冲期内合并为一次agent调用,
批量处理时只识别一次媒体信息后复用到所有文件。
This commit is contained in:
jxxghp
2026-04-09 07:16:56 +08:00
parent 1b489ba581
commit 0c51d79be7
3 changed files with 197 additions and 42 deletions

View File

@@ -68,7 +68,7 @@ class _ThinkTagStripper:
on_output(self.buffer[:start_idx])
emitted = True
self.in_think_tag = True
self.buffer = self.buffer[start_idx + 7:]
self.buffer = self.buffer[start_idx + 7 :]
else:
# 检查是否以 <think> 的不完整前缀结尾
partial_match = False
@@ -88,7 +88,7 @@ class _ThinkTagStripper:
end_idx = self.buffer.find("</think>")
if end_idx != -1:
self.in_think_tag = False
self.buffer = self.buffer[end_idx + 8:]
self.buffer = self.buffer[end_idx + 8 :]
else:
# 检查是否以 </think> 的不完整前缀结尾
partial_match = False
@@ -506,12 +506,21 @@ class AgentManager:
同一会话的消息按顺序排队处理,不同会话之间互不影响。
"""
# 批量重试整理的等待时间:同一批次内的失败记录会合并为一次agent调用
RETRY_TRANSFER_DEBOUNCE_SECONDS = 300
def __init__(self):
"""Initialize per-session message queues/workers and the retry-transfer debounce state."""
self.active_agents: Dict[str, MoviePilotAgent] = {}
# Message queue for each session
self._session_queues: Dict[str, asyncio.Queue] = {}
# Worker task for each session
self._session_workers: Dict[str, asyncio.Task] = {}
# Debounce buffer for retry-transfer requests: group_key -> List[history_id]
self._retry_transfer_buffer: Dict[str, List[int]] = {}
# Debounce timer per group: group_key -> asyncio.TimerHandle
self._retry_transfer_timers: Dict[str, asyncio.TimerHandle] = {}
# Lock guarding the retry-transfer buffer and timers
self._retry_transfer_lock = asyncio.Lock()
@staticmethod
async def initialize():
@@ -525,6 +534,11 @@ class AgentManager:
关闭管理器
"""
await memory_manager.close()
# 取消所有重试整理的延迟定时器
for timer in self._retry_transfer_timers.values():
timer.cancel()
self._retry_transfer_timers.clear()
self._retry_transfer_buffer.clear()
# 取消所有会话worker
for task in self._session_workers.values():
task.cancel()
@@ -779,41 +793,112 @@ class AgentManager:
except Exception as e:
logger.error(f"智能体心跳唤醒失败: {e}")
async def retry_failed_transfer(self, history_id: int):
async def retry_failed_transfer(self, history_id: int, group_key: str = ""):
"""
触发智能体重新整理失败的历史记录。
由文件整理模块在检测到整理失败后调用,使用独立会话执行
由文件整理模块在检测到整理失败后调用。
同一 group_key 的失败记录会在缓冲期内合并为一次agent调用避免重复浪费token。
:param history_id: 失败的整理历史记录ID
:param group_key: 分组键相同key的记录会被合并处理如download_hash、源目录等
"""
if not group_key:
group_key = f"_default_{history_id}"
async with self._retry_transfer_lock:
# 将 history_id 加入缓冲区
if group_key not in self._retry_transfer_buffer:
self._retry_transfer_buffer[group_key] = []
if history_id not in self._retry_transfer_buffer[group_key]:
self._retry_transfer_buffer[group_key].append(history_id)
logger.info(
f"智能体重试整理:记录 ID={history_id} 已加入缓冲区 "
f"(group={group_key}, 当前{len(self._retry_transfer_buffer[group_key])}条)"
)
# 取消该分组的旧定时器
if group_key in self._retry_transfer_timers:
self._retry_transfer_timers[group_key].cancel()
# 设置新的延迟定时器
loop = asyncio.get_running_loop()
self._retry_transfer_timers[group_key] = loop.call_later(
self.RETRY_TRANSFER_DEBOUNCE_SECONDS,
lambda gk=group_key: asyncio.ensure_future(
self._flush_retry_transfer(gk)
),
)
async def _flush_retry_transfer(self, group_key: str):
"""
延迟定时器到期后,取出该分组的所有 history_id 并合并为一次agent调用。
"""
async with self._retry_transfer_lock:
history_ids = self._retry_transfer_buffer.pop(group_key, [])
self._retry_transfer_timers.pop(group_key, None)
if not history_ids:
return
try:
# 每次使用唯一的 session_id避免共享上下文
session_id = f"__agent_retry_transfer_{history_id}_{uuid.uuid4().hex[:8]}__"
session_id = f"__agent_retry_transfer_batch_{uuid.uuid4().hex[:8]}__"
user_id = "system"
logger.info(f"智能体重试整理:开始处理失败记录 ID={history_id} ...")
# 英文提示词,便于大模型理解
retry_message = (
f"[System Task - Transfer Failed Retry] A file transfer/organization has failed. "
f"Please use the 'transfer-failed-retry' skill to retry the failed transfer.\n\n"
f"Failed transfer history record ID: {history_id}\n\n"
f"Follow these steps:\n"
f"1. Use `query_transfer_history` with status='failed' to find the record with id={history_id} "
f"and understand the failure details (source path, error message, media info)\n"
f"2. Analyze the error message to determine the best retry strategy\n"
f"3. If the source file no longer exists, skip this retry and report that the file is missing\n"
f"4. Delete the failed history record using `delete_transfer_history` with history_id={history_id}\n"
f"5. Re-identify the media using `recognize_media` with the source file path\n"
f"6. If recognition fails, try `search_media` with keywords from the filename\n"
f"7. Re-transfer using `transfer_file` with the source path and any identified media info (tmdbid, media_type)\n"
f"8. Report the final result\n\n"
f"IMPORTANT: This is a background system task, NOT a user conversation. "
f"Your final response will be broadcast as a notification. "
f"Only output a brief result summary. "
f"Do NOT include greetings, explanations, or conversational text. "
f"Respond in Chinese (中文)."
ids_str = ", ".join(str(i) for i in history_ids)
logger.info(
f"智能体重试整理:开始批量处理失败记录 IDs=[{ids_str}] (group={group_key})"
)
if len(history_ids) == 1:
# 单条记录,使用原有逻辑
retry_message = (
f"[System Task - Transfer Failed Retry] A file transfer/organization has failed. "
f"Please use the 'transfer-failed-retry' skill to retry the failed transfer.\n\n"
f"Failed transfer history record ID: {history_ids[0]}\n\n"
f"Follow these steps:\n"
f"1. Use `query_transfer_history` with status='failed' to find the record with id={history_ids[0]} "
f"and understand the failure details (source path, error message, media info)\n"
f"2. Analyze the error message to determine the best retry strategy\n"
f"3. If the source file no longer exists, skip this retry and report that the file is missing\n"
f"4. Delete the failed history record using `delete_transfer_history` with history_id={history_ids[0]}\n"
f"5. Re-identify the media using `recognize_media` with the source file path\n"
f"6. If recognition fails, try `search_media` with keywords from the filename\n"
f"7. Re-transfer using `transfer_file` with the source path and any identified media info (tmdbid, media_type)\n"
f"8. Report the final result\n\n"
f"IMPORTANT: This is a background system task, NOT a user conversation. "
f"Your final response will be broadcast as a notification. "
f"Only output a brief result summary. "
f"Do NOT include greetings, explanations, or conversational text. "
f"Respond in Chinese (中文)."
)
else:
# 多条记录,使用批量处理逻辑
retry_message = (
f"[System Task - Batch Transfer Failed Retry] Multiple file transfers from the same source "
f"have failed. These files likely belong to the SAME media (e.g., multiple episodes of the same TV show). "
f"Please use the 'transfer-failed-retry' skill to retry them efficiently.\n\n"
f"Failed transfer history record IDs: {ids_str}\n"
f"Total failed records: {len(history_ids)}\n\n"
f"Follow these steps:\n"
f"1. Use `query_transfer_history` with status='failed' to find ALL records with these IDs "
f"and understand the failure details\n"
f"2. Since these files are likely from the same media, analyze the FIRST record to determine "
f"the media identity and the best retry strategy. The root cause is usually the same for all files.\n"
f"3. If the error is about media recognition (e.g., '未识别到媒体信息'), identify the media ONCE "
f"using `recognize_media` or `search_media`, then reuse that result (tmdbid, media_type) for all files\n"
f"4. For EACH failed record:\n"
f" a. Delete the failed history record using `delete_transfer_history`\n"
f" b. Re-transfer using `transfer_file` with the source path and the identified media info\n"
f"5. Report a summary of results (how many succeeded, how many failed)\n\n"
f"IMPORTANT OPTIMIZATION: These files share the same media identity. "
f"Do NOT call `recognize_media` or `search_media` repeatedly for each file. "
f"Identify the media ONCE, then apply to all files.\n\n"
f"IMPORTANT: This is a background system task, NOT a user conversation. "
f"Your final response will be broadcast as a notification. "
f"Only output a brief result summary. "
f"Do NOT include greetings, explanations, or conversational text. "
f"Respond in Chinese (中文)."
)
await self.process_message(
session_id=session_id,
user_id=user_id,
@@ -834,13 +919,17 @@ class AgentManager:
except asyncio.CancelledError:
pass
logger.info(f"智能体重试整理:记录 ID={history_id} 处理完成")
logger.info(
f"智能体重试整理:批量处理完成 IDs=[{ids_str}] (group={group_key})"
)
# 用完即弃,清理资源
await self.clear_session(session_id, user_id)
except Exception as e:
logger.error(f"智能体重试整理失败 (ID={history_id}): {e}")
logger.error(
f"智能体重试整理失败 (IDs=[{ids_str}], group={group_key}): {e}"
)
# 全局智能体管理器实例

View File

@@ -799,8 +799,17 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
try:
from app.agent import agent_manager
# 使用 download_hash 或源文件父目录作为分组键,
# 同一批次如同一个种子的失败记录会被合并为一次agent调用
# Precedence fix: a conditional expression binds looser than `or`, so the
# previous form `a or b if cond else ""` parsed as `(a or b) if cond else ""`
# and discarded download_hash whenever task.fileitem was missing. Parenthesize
# so the intended fallback chain runs: download_hash, else parent dir, else "".
group_key = (
    task.download_hash
    or (str(task.fileitem.path).rsplit("/", 1)[0] if task.fileitem else "")
)
asyncio.run_coroutine_threadsafe(
agent_manager.retry_failed_transfer(history.id),
agent_manager.retry_failed_transfer(
history.id, group_key=group_key
),
global_vars.loop,
)
logger.info(f"已触发AI智能体重试整理历史记录 #{history.id}")
@@ -1127,8 +1136,17 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
try:
from app.agent import agent_manager
# 使用 download_hash 或源文件父目录作为分组键
# Precedence fix: `or` binds tighter than the conditional expression, so the
# previous form parsed as `(download_hash or parent_dir) if fileitem else ""`,
# yielding "" even when download_hash was set but task.fileitem was missing.
# Parenthesize the fallback so download_hash always wins when present.
group_key = (
    task.download_hash
    or (str(task.fileitem.path).rsplit("/", 1)[0] if task.fileitem else "")
)
asyncio.run_coroutine_threadsafe(
agent_manager.retry_failed_transfer(his.id),
agent_manager.retry_failed_transfer(
his.id, group_key=group_key
),
global_vars.loop,
)
logger.info(f"已触发AI智能体重试整理历史记录 #{his.id}")

View File

@@ -1,12 +1,13 @@
---
name: transfer-failed-retry
description: Use this skill when you need to retry a failed file transfer/organization. Given a failed transfer history record ID, this skill guides you through querying the failure details, deleting the old record, and re-identifying and re-organizing the file. This skill is automatically triggered when the system detects a transfer failure and the AI agent retry feature is enabled.
version: 1
description: Use this skill when you need to retry failed file transfers/organizations. Given one or more failed transfer history record IDs, this skill guides you through querying the failure details, deleting the old records, and re-identifying and re-organizing the files. Supports batch processing of multiple files from the same media (e.g., multiple episodes of a TV show). This skill is automatically triggered when the system detects transfer failures and the AI agent retry feature is enabled.
allowed-tools: query_transfer_history delete_transfer_history recognize_media transfer_file search_media
---
# Transfer Failed Retry (整理失败重试)
This skill handles retrying failed file transfers/organizations. When a file transfer fails, you can use this skill to analyze the failure, remove the stale history record, and attempt to re-identify and re-organize the file.
This skill handles retrying failed file transfers/organizations. When file transfers fail, you can use this skill to analyze the failures, remove stale history records, and attempt to re-identify and re-organize the files. It supports both single-file and batch retry scenarios.
## Prerequisites
@@ -21,15 +22,15 @@ You need the following tools:
### Step 1: Query the Failed Transfer History
Use `query_transfer_history` to get details about the failed record. Filter by status `failed` to find the specific record.
Use `query_transfer_history` to get details about the failed record(s). Filter by status `failed` to find the specific records.
If you are given a specific history record ID, query with that ID to understand the failure context:
If you are given a specific history record ID (or multiple IDs), query with those IDs to understand the failure context:
```
query_transfer_history(status="failed")
```
From the record, extract the following key information:
From each record, extract the following key information:
- **id**: The history record ID
- **src**: Source file path
- **title**: The recognized title (may be incorrect)
@@ -53,9 +54,9 @@ Common failure reasons and how to handle them:
| 未找到有效的集数信息 | Episode number not recognized | Use `recognize_media` with the file path to get better metadata, or specify season/episode in `transfer_file` |
| 未获取到转移目录设置 | No transfer directory configured for this media type | Cannot auto-fix - notify user about directory configuration |
### Step 3: Delete the Failed History Record
### Step 3: Delete the Failed History Record(s)
Before retrying, you **must** delete the old failed history record. The system skips files that already have a transfer history entry (even failed ones).
Before retrying, you **must** delete the old failed history record(s). The system skips files that already have a transfer history entry (even failed ones).
```
delete_transfer_history(history_id=<record_id>)
@@ -101,8 +102,54 @@ For TV shows where episode info couldn't be determined:
### Step 5: Report Result
After the retry attempt, report the result:
- If successful: Confirm the file has been organized correctly
- If successful: Confirm the file(s) have been organized correctly
- If failed again: Report the new error and suggest manual intervention
- For batch operations: Report a summary (e.g., "成功 8/10,失败 2/10")
## Batch Processing (批量处理)
When multiple files from the same source fail simultaneously (e.g., 10 episodes of the same TV show all fail with the same error), the system groups them and triggers a single batch retry.
### Key Optimization Rules for Batch Processing:
1. **Identify media ONCE, apply to ALL files**: Since batch files typically belong to the same media, perform media recognition (`recognize_media`) or search (`search_media`) only ONCE using the first file, then reuse the result (tmdbid, media_type) for all subsequent files.
2. **Process each file individually for delete + transfer**: Even though the media identity is shared, you must still:
- Delete each failed history record individually
- Transfer each file individually (they have different source paths)
3. **Stop early if root cause is unfixable**: If the first file fails due to an unfixable issue (e.g., missing directory configuration), skip all remaining files with the same error rather than retrying each one.
4. **Process in order**: Handle files sequentially to avoid race conditions.
### Batch Example Flow:
```
# Given failed records: IDs = [42, 43, 44, 45] (4 episodes of the same show)
# All have errmsg="未识别到媒体信息"
# 1. Query all failed records
query_transfer_history(status="failed")
# 2. Identify media ONCE using the first file
recognize_media(path="/downloads/Show.Name.S01E01.1080p.mkv")
# Found: tmdb_id=789, media_type="tv"
# 3. For each record: delete history, then re-transfer
delete_transfer_history(history_id=42)
transfer_file(file_path="/downloads/Show.Name.S01E01.1080p.mkv", tmdbid=789, media_type="tv")
delete_transfer_history(history_id=43)
transfer_file(file_path="/downloads/Show.Name.S01E02.1080p.mkv", tmdbid=789, media_type="tv")
delete_transfer_history(history_id=44)
transfer_file(file_path="/downloads/Show.Name.S01E03.1080p.mkv", tmdbid=789, media_type="tv")
delete_transfer_history(history_id=45)
transfer_file(file_path="/downloads/Show.Name.S01E04.1080p.mkv", tmdbid=789, media_type="tv")
# 4. Report summary: "重试完成:4/4 成功"
```
## Important Notes
@@ -111,9 +158,10 @@ After the retry attempt, report the result:
- **Do not retry** if the error is about missing directory configuration - this requires user intervention.
- **For unrecognized media**, always try `recognize_media` with the file path first before falling back to `search_media`.
- **Be cautious with TV shows** - ensure the correct season and episode information is used.
- When this skill is triggered automatically by the system, it provides the `history_id` directly. Start from Step 1 with that specific ID.
- **For batch processing**, always reuse media identification results across all files to save time and resources.
- When this skill is triggered automatically by the system, it provides the `history_id`(s) directly. Start from Step 1 with those specific IDs.
## Example: Complete Retry Flow
## Example: Single File Retry Flow
```
# 1. Query the failed record