From 04c2a1eb18471ddac1cf62acc7180c7b065927db Mon Sep 17 00:00:00 2001 From: jxxghp Date: Wed, 15 Apr 2026 17:10:18 +0800 Subject: [PATCH] Add manual AI redo flow --- app/agent/__init__.py | 134 +++++++++++++++++++++++++++++++++-- app/api/endpoints/history.py | 65 +++++++++++++++++ 2 files changed, 195 insertions(+), 4 deletions(-) diff --git a/app/agent/__init__.py b/app/agent/__init__.py index 974d8285..b349875b 100644 --- a/app/agent/__init__.py +++ b/app/agent/__init__.py @@ -28,6 +28,7 @@ from app.agent.prompt import prompt_manager from app.agent.tools.factory import MoviePilotToolFactory from app.chain import ChainBase from app.core.config import settings +from app.db.transferhistory_oper import TransferHistoryOper from app.helper.llm import LLMHelper from app.log import logger from app.schemas import Notification, NotificationType @@ -132,6 +133,10 @@ class MoviePilotAgent: self.username = username self.reply_with_voice = False self._tool_context: Dict[str, object] = {} + self.output_callback: Optional[Callable[[str], None]] = None + self.force_streaming = False + self.suppress_user_reply = False + self._streamed_output = "" # 流式token管理 self.stream_handler = StreamingHandler() @@ -153,9 +158,11 @@ class MoviePilotAgent: - 其他情况不启用流式输出 """ if self.is_background: - return False + return self.force_streaming or callable(self.output_callback) if self.reply_with_voice: return False + if self.force_streaming or callable(self.output_callback): + return True # 啰嗦模式下始终需要流式输出来捕获工具调用前的 Agent 文字 if settings.AI_AGENT_VERBOSE: return True @@ -208,6 +215,20 @@ class MoviePilotAgent: return "".join(text_parts) return str(content) + def _emit_output(self, text: str): + """ + 输出当前流式文本到外部回调。 + """ + if not text: + return + self._streamed_output += text + if not callable(self.output_callback): + return + try: + self.output_callback(self._streamed_output) + except Exception as e: + logger.debug(f"智能体输出回调失败: {e}") + def _initialize_tools(self) -> List: """ 初始化工具列表 @@ -301,6 +322,7 @@ class MoviePilotAgent: "user_reply_sent": False, "reply_mode": None, } + self._streamed_output = "" # 获取历史消息 messages = memory_manager.get_agent_messages( @@ -332,6 +354,8 @@ class MoviePilotAgent: except Exception as e: error_message = f"处理消息时发生错误: {str(e)}" logger.error(error_message) + if self.suppress_user_reply: + raise await self.send_agent_message(error_message) return error_message @@ -432,7 +456,7 @@ class MoviePilotAgent: agent=agent, messages={"messages": messages}, config=agent_config, - on_token=self.stream_handler.emit, + on_token=lambda token: (self.stream_handler.emit(token), self._emit_output(token)), ) # 停止流式输出,返回是否已通过流式编辑发送了所有内容及最终文本 @@ -445,7 +469,13 @@ class MoviePilotAgent: # 流式输出未能发送全部内容(发送失败等) # 通过常规方式发送剩余内容 remaining_text = await self.stream_handler.take() - if remaining_text and not self._tool_context.get("user_reply_sent"): + if remaining_text and not self._streamed_output: + self._emit_output(remaining_text) + if ( + remaining_text + and not self.suppress_user_reply + and not self._tool_context.get("user_reply_sent") + ): await self.send_agent_message(remaining_text) elif streamed_text: # 流式输出已发送全部内容,但未记录到数据库,补充保存消息记录 @@ -475,7 +505,14 @@ class MoviePilotAgent: final_text = text.strip() break - if final_text and not self._tool_context.get("user_reply_sent"): + if final_text and not self._streamed_output: + self._emit_output(final_text) + + if ( + final_text + and not self.suppress_user_reply + and not self._tool_context.get("user_reply_sent") + ): if self.is_background: # 后台任务仅广播最终回复,带标题 await self.send_agent_message( @@ -1003,6 +1040,95 @@ class AgentManager: f"智能体重试整理失败 (IDs=[{ids_str}], group={group_key}): {e}" ) + @staticmethod + def _build_manual_redo_prompt(history) -> str: + """ + 构建手动 AI 整理提示词。 + """ + src_fileitem = history.src_fileitem or {} + source_path = src_fileitem.get("path") if isinstance(src_fileitem, dict) else "" + source_path = source_path or history.src or "" + season_episode = f"{history.seasons or ''}{history.episodes or ''}".strip() + + return "\n".join( + [ + "[System Task - Manual Transfer Re-Organize]", + "A user manually triggered an AI re-organize task from the transfer history page.", + "Your goal is to directly fix ONE transfer history record by using MoviePilot tools to analyze, clean up the old history entry if necessary, and organize the source file again.", + "", + "IMPORTANT:", + "1. This is NOT a normal conversation. It is a background execution task.", + "2. Do NOT rely on previous chat context. Work only from the record below.", + "3. You should complete the re-organize by directly using tools such as `query_transfer_history`, `recognize_media`, `search_media`, `delete_transfer_history`, and `transfer_file`.", + "4. Your final response must be a brief Chinese result summary only.", + "", + "Transfer history record:", + f"- History ID: {history.id}", + f"- Current status: {'success' if history.status else 'failed'}", + f"- Current recognized title: {history.title or 'unknown'}", + f"- Media type: {history.type or 'unknown'}", + f"- Category: {history.category or 'unknown'}", + f"- Year: {history.year or 'unknown'}", + f"- Season/Episode: {season_episode or 'unknown'}", + f"- Source path: {source_path or 'unknown'}", + f"- Source storage: {history.src_storage or 'local'}", + f"- Destination path: {history.dest or 'unknown'}", + f"- Destination storage: {history.dest_storage or 'unknown'}", + f"- Transfer mode: {history.mode or 'unknown'}", + f"- Current TMDB ID: {history.tmdbid or 'none'}", + f"- Current Douban ID: {history.doubanid or 'none'}", + f"- Error message: {history.errmsg or 'none'}", + "", + "Required workflow:", + f"1. Use `query_transfer_history` to locate and inspect the record with id={history.id}, and verify the source path, status, media info, and failure context.", + "2. Decide whether the current recognition is trustworthy.", + "3. If the source file no longer exists or cannot be safely processed, stop and report the reason.", + "4. If the current recognition is wrong or the record should be reorganized, determine the correct media identity first.", + "5. Prefer `recognize_media` with the source path. If recognition is not reliable, use `search_media` with keywords from filename/title/year.", + "6. Only continue when you have high confidence in the target media.", + "7. Before re-organizing, delete the old transfer history record with `delete_transfer_history` so the system will not skip the source file.", + "8. Then use `transfer_file` to organize the source path directly.", + "9. When calling `transfer_file`, reuse known context when appropriate: source storage, target path, target storage, transfer mode, season, tmdbid/doubanid, and media_type.", + "10. If this record is already correct and no re-organize is needed, do not perform destructive actions; simply report that no change is necessary.", + "", + "Important execution rules:", + "- Do NOT reorganize blindly when media identity is uncertain.", + "- If the previous record was successful but obviously identified as the wrong media, still use the tool-based flow above instead of `/redo`.", + "- Keep the final response short, in Chinese, and focused on outcome.", + ] + ) + + async def manual_redo_transfer( + self, + history_id: int, + output_callback: Optional[Callable[[str], None]] = None, + ) -> None: + """ + 手动触发单条历史记录的 AI 整理。 + """ + session_id = f"__agent_manual_redo_{history_id}_{uuid.uuid4().hex[:8]}__" + user_id = "system" + agent = MoviePilotAgent( + session_id=session_id, + user_id=user_id, + channel=None, + source=None, + username=settings.SUPERUSER, + ) + agent.output_callback = output_callback + agent.force_streaming = True + agent.suppress_user_reply = True + + try: + history = TransferHistoryOper().get(history_id) + if not history: + raise ValueError(f"整理记录不存在: {history_id}") + + await agent.process(self._build_manual_redo_prompt(history)) + finally: + await agent.cleanup() + memory_manager.clear_memory(session_id, user_id) + # 全局智能体管理器实例 agent_manager = AgentManager() diff --git a/app/api/endpoints/history.py b/app/api/endpoints/history.py index 4b0708e3..fa048c78 100644 --- a/app/api/endpoints/history.py +++ b/app/api/endpoints/history.py @@ -1,3 +1,5 @@ +import asyncio +import time from typing import List, Any, Optional import jieba @@ -8,6 +10,7 @@ from pathlib import Path from app import schemas from app.chain.storage import StorageChain +from app.core.config import settings, global_vars from app.core.event import eventmanager from app.core.security import verify_token from app.db import get_async_db, get_db @@ -15,11 +18,51 @@ from app.db.models import User from app.db.models.downloadhistory import DownloadHistory, DownloadFiles from app.db.models.transferhistory import TransferHistory from app.db.user_oper import get_current_active_superuser_async, get_current_active_superuser +from app.helper.progress import ProgressHelper from app.schemas.types import EventType router = APIRouter() +def _start_ai_redo_task(history_id: int, progress_key: str): + from app.agent import agent_manager + + progress = ProgressHelper(progress_key) + progress.start() + progress.update( + text=f"智能助正在准备整理记录 #{history_id} ...", + data={"history_id": history_id, "success": True}, + ) + + def update_output(text: str): + progress.update(text=text, data={"history_id": history_id}) + + async def runner(): + try: + await agent_manager.manual_redo_transfer( + history_id=history_id, + output_callback=update_output, + ) + progress.update( + text="智能助手整理完成", + data={"history_id": history_id, "success": True, "completed": True}, + ) + except Exception as e: + progress.update( + text=f"智能助手整理失败:{str(e)}", + data={ + "history_id": history_id, + "success": False, + "completed": True, + "error": str(e), + }, + ) + finally: + progress.end() + + asyncio.run_coroutine_threadsafe(runner(), global_vars.loop) + + @router.get("/download", summary="查询下载历史记录", response_model=List[schemas.DownloadHistory]) async def download_history(page: Optional[int] = 1, count: Optional[int] = 30, @@ -114,6 +157,28 @@ def delete_transfer_history(history_in: schemas.TransferHistory, return schemas.Response(success=True) +@router.post("/transfer/{history_id}/ai-redo", summary="智能助手重新整理", response_model=schemas.Response) +def ai_redo_transfer_history( + history_id: int, + db: Session = Depends(get_db), + _: User = Depends(get_current_active_superuser), +) -> Any: + """ + 手动触发单条历史记录的 AI 重新整理,并返回进度键。 + """ + if not settings.AI_AGENT_ENABLE: + return schemas.Response(success=False, message="MoviePilot智能助手未启用") + + history = TransferHistory.get(db, history_id) + if not history: + return schemas.Response(success=False, message="整理记录不存在") + + progress_key = f"ai_redo_transfer_{history_id}_{int(time.time() * 1000)}" + _start_ai_redo_task(history_id=history_id, progress_key=progress_key) + + return schemas.Response(success=True, data={"progress_key": progress_key}) + + @router.get("/empty/transfer", summary="清空整理记录", response_model=schemas.Response) async def empty_transfer_history(db: AsyncSession = Depends(get_async_db), _: User = Depends(get_current_active_superuser_async)) -> Any: