Add manual AI redo flow

This commit is contained in:
jxxghp
2026-04-15 17:10:18 +08:00
parent 65a4b7438c
commit 04c2a1eb18
2 changed files with 195 additions and 4 deletions

View File

@@ -28,6 +28,7 @@ from app.agent.prompt import prompt_manager
from app.agent.tools.factory import MoviePilotToolFactory
from app.chain import ChainBase
from app.core.config import settings
from app.db.transferhistory_oper import TransferHistoryOper
from app.helper.llm import LLMHelper
from app.log import logger
from app.schemas import Notification, NotificationType
@@ -132,6 +133,10 @@ class MoviePilotAgent:
self.username = username
self.reply_with_voice = False
self._tool_context: Dict[str, object] = {}
self.output_callback: Optional[Callable[[str], None]] = None
self.force_streaming = False
self.suppress_user_reply = False
self._streamed_output = ""
# 流式token管理
self.stream_handler = StreamingHandler()
@@ -153,9 +158,11 @@ class MoviePilotAgent:
- 其他情况不启用流式输出
"""
if self.is_background:
return False
return self.force_streaming or callable(self.output_callback)
if self.reply_with_voice:
return False
if self.force_streaming or callable(self.output_callback):
return True
# 啰嗦模式下始终需要流式输出来捕获工具调用前的 Agent 文字
if settings.AI_AGENT_VERBOSE:
return True
@@ -208,6 +215,20 @@ class MoviePilotAgent:
return "".join(text_parts)
return str(content)
def _emit_output(self, text: str):
"""
输出当前流式文本到外部回调。
"""
if not text:
return
self._streamed_output += text
if not callable(self.output_callback):
return
try:
self.output_callback(self._streamed_output)
except Exception as e:
logger.debug(f"智能体输出回调失败: {e}")
def _initialize_tools(self) -> List:
"""
初始化工具列表
@@ -301,6 +322,7 @@ class MoviePilotAgent:
"user_reply_sent": False,
"reply_mode": None,
}
self._streamed_output = ""
# 获取历史消息
messages = memory_manager.get_agent_messages(
@@ -332,6 +354,8 @@ class MoviePilotAgent:
except Exception as e:
error_message = f"处理消息时发生错误: {str(e)}"
logger.error(error_message)
if self.suppress_user_reply:
raise
await self.send_agent_message(error_message)
return error_message
@@ -432,7 +456,7 @@ class MoviePilotAgent:
agent=agent,
messages={"messages": messages},
config=agent_config,
on_token=self.stream_handler.emit,
on_token=lambda token: (self.stream_handler.emit(token), self._emit_output(token)),
)
# 停止流式输出,返回是否已通过流式编辑发送了所有内容及最终文本
@@ -445,7 +469,13 @@ class MoviePilotAgent:
# 流式输出未能发送全部内容(发送失败等)
# 通过常规方式发送剩余内容
remaining_text = await self.stream_handler.take()
if remaining_text and not self._tool_context.get("user_reply_sent"):
if remaining_text and not self._streamed_output:
self._emit_output(remaining_text)
if (
remaining_text
and not self.suppress_user_reply
and not self._tool_context.get("user_reply_sent")
):
await self.send_agent_message(remaining_text)
elif streamed_text:
# 流式输出已发送全部内容,但未记录到数据库,补充保存消息记录
@@ -475,7 +505,14 @@ class MoviePilotAgent:
final_text = text.strip()
break
if final_text and not self._tool_context.get("user_reply_sent"):
if final_text and not self._streamed_output:
self._emit_output(final_text)
if (
final_text
and not self.suppress_user_reply
and not self._tool_context.get("user_reply_sent")
):
if self.is_background:
# 后台任务仅广播最终回复,带标题
await self.send_agent_message(
@@ -1003,6 +1040,95 @@ class AgentManager:
f"智能体重试整理失败 (IDs=[{ids_str}], group={group_key}): {e}"
)
@staticmethod
def _build_manual_redo_prompt(history) -> str:
"""
构建手动 AI 整理提示词。
"""
src_fileitem = history.src_fileitem or {}
source_path = src_fileitem.get("path") if isinstance(src_fileitem, dict) else ""
source_path = source_path or history.src or ""
season_episode = f"{history.seasons or ''}{history.episodes or ''}".strip()
return "\n".join(
[
"[System Task - Manual Transfer Re-Organize]",
"A user manually triggered an AI re-organize task from the transfer history page.",
"Your goal is to directly fix ONE transfer history record by using MoviePilot tools to analyze, clean up the old history entry if necessary, and organize the source file again.",
"",
"IMPORTANT:",
"1. This is NOT a normal conversation. It is a background execution task.",
"2. Do NOT rely on previous chat context. Work only from the record below.",
"3. You should complete the re-organize by directly using tools such as `query_transfer_history`, `recognize_media`, `search_media`, `delete_transfer_history`, and `transfer_file`.",
"4. Your final response must be a brief Chinese result summary only.",
"",
"Transfer history record:",
f"- History ID: {history.id}",
f"- Current status: {'success' if history.status else 'failed'}",
f"- Current recognized title: {history.title or 'unknown'}",
f"- Media type: {history.type or 'unknown'}",
f"- Category: {history.category or 'unknown'}",
f"- Year: {history.year or 'unknown'}",
f"- Season/Episode: {season_episode or 'unknown'}",
f"- Source path: {source_path or 'unknown'}",
f"- Source storage: {history.src_storage or 'local'}",
f"- Destination path: {history.dest or 'unknown'}",
f"- Destination storage: {history.dest_storage or 'unknown'}",
f"- Transfer mode: {history.mode or 'unknown'}",
f"- Current TMDB ID: {history.tmdbid or 'none'}",
f"- Current Douban ID: {history.doubanid or 'none'}",
f"- Error message: {history.errmsg or 'none'}",
"",
"Required workflow:",
f"1. Use `query_transfer_history` to locate and inspect the record with id={history.id}, and verify the source path, status, media info, and failure context.",
"2. Decide whether the current recognition is trustworthy.",
"3. If the source file no longer exists or cannot be safely processed, stop and report the reason.",
"4. If the current recognition is wrong or the record should be reorganized, determine the correct media identity first.",
"5. Prefer `recognize_media` with the source path. If recognition is not reliable, use `search_media` with keywords from filename/title/year.",
"6. Only continue when you have high confidence in the target media.",
"7. Before re-organizing, delete the old transfer history record with `delete_transfer_history` so the system will not skip the source file.",
"8. Then use `transfer_file` to organize the source path directly.",
"9. When calling `transfer_file`, reuse known context when appropriate: source storage, target path, target storage, transfer mode, season, tmdbid/doubanid, and media_type.",
"10. If this record is already correct and no re-organize is needed, do not perform destructive actions; simply report that no change is necessary.",
"",
"Important execution rules:",
"- Do NOT reorganize blindly when media identity is uncertain.",
"- If the previous record was successful but obviously identified as the wrong media, still use the tool-based flow above instead of `/redo`.",
"- Keep the final response short, in Chinese, and focused on outcome.",
]
)
async def manual_redo_transfer(
self,
history_id: int,
output_callback: Optional[Callable[[str], None]] = None,
) -> None:
"""
手动触发单条历史记录的 AI 整理。
"""
session_id = f"__agent_manual_redo_{history_id}_{uuid.uuid4().hex[:8]}__"
user_id = "system"
agent = MoviePilotAgent(
session_id=session_id,
user_id=user_id,
channel=None,
source=None,
username=settings.SUPERUSER,
)
agent.output_callback = output_callback
agent.force_streaming = True
agent.suppress_user_reply = True
try:
history = TransferHistoryOper().get(history_id)
if not history:
raise ValueError(f"整理记录不存在: {history_id}")
await agent.process(self._build_manual_redo_prompt(history))
finally:
await agent.cleanup()
memory_manager.clear_memory(session_id, user_id)
# 全局智能体管理器实例
agent_manager = AgentManager()

View File

@@ -1,3 +1,5 @@
import asyncio
import time
from typing import List, Any, Optional
import jieba
@@ -8,6 +10,7 @@ from pathlib import Path
from app import schemas
from app.chain.storage import StorageChain
from app.core.config import settings, global_vars
from app.core.event import eventmanager
from app.core.security import verify_token
from app.db import get_async_db, get_db
@@ -15,11 +18,51 @@ from app.db.models import User
from app.db.models.downloadhistory import DownloadHistory, DownloadFiles
from app.db.models.transferhistory import TransferHistory
from app.db.user_oper import get_current_active_superuser_async, get_current_active_superuser
from app.helper.progress import ProgressHelper
from app.schemas.types import EventType
router = APIRouter()
def _start_ai_redo_task(history_id: int, progress_key: str):
from app.agent import agent_manager
progress = ProgressHelper(progress_key)
progress.start()
progress.update(
text=f"智能助正在准备整理记录 #{history_id} ...",
data={"history_id": history_id, "success": True},
)
def update_output(text: str):
progress.update(text=text, data={"history_id": history_id})
async def runner():
try:
await agent_manager.manual_redo_transfer(
history_id=history_id,
output_callback=update_output,
)
progress.update(
text="智能助手整理完成",
data={"history_id": history_id, "success": True, "completed": True},
)
except Exception as e:
progress.update(
text=f"智能助手整理失败:{str(e)}",
data={
"history_id": history_id,
"success": False,
"completed": True,
"error": str(e),
},
)
finally:
progress.end()
asyncio.run_coroutine_threadsafe(runner(), global_vars.loop)
@router.get("/download", summary="查询下载历史记录", response_model=List[schemas.DownloadHistory])
async def download_history(page: Optional[int] = 1,
count: Optional[int] = 30,
@@ -114,6 +157,28 @@ def delete_transfer_history(history_in: schemas.TransferHistory,
return schemas.Response(success=True)
@router.post("/transfer/{history_id}/ai-redo", summary="智能助手重新整理", response_model=schemas.Response)
def ai_redo_transfer_history(
history_id: int,
db: Session = Depends(get_db),
_: User = Depends(get_current_active_superuser),
) -> Any:
"""
手动触发单条历史记录的 AI 重新整理,并返回进度键。
"""
if not settings.AI_AGENT_ENABLE:
return schemas.Response(success=False, message="MoviePilot智能助手未启用")
history = TransferHistory.get(db, history_id)
if not history:
return schemas.Response(success=False, message="整理记录不存在")
progress_key = f"ai_redo_transfer_{history_id}_{int(time.time() * 1000)}"
_start_ai_redo_task(history_id=history_id, progress_key=progress_key)
return schemas.Response(success=True, data={"progress_key": progress_key})
@router.get("/empty/transfer", summary="清空整理记录", response_model=schemas.Response)
async def empty_transfer_history(db: AsyncSession = Depends(get_async_db),
_: User = Depends(get_current_active_superuser_async)) -> Any: