fix(agent): clean old media file before AI redo (#5980)

This commit is contained in:
InfinityPacer
2026-06-21 07:39:39 +08:00
committed by GitHub
parent e02cebe16c
commit f0368e359a
7 changed files with 542 additions and 116 deletions

View File

@@ -0,0 +1,87 @@
"""整理记录 AI 重新整理提示词构造。"""
from typing import Any
from app.agent.prompt import prompt_manager
def build_manual_redo_template_context(history: Any) -> dict[str, int | str]:
"""把整理历史对象映射成 System Tasks 需要的模板变量。"""
src_fileitem = history.src_fileitem or {}
dest_fileitem = history.dest_fileitem or {}
source_path = src_fileitem.get("path") if isinstance(src_fileitem, dict) else ""
source_storage = history.src_storage or "local"
if history.status and history.mode == "move":
dest_path = dest_fileitem.get("path") if isinstance(dest_fileitem, dict) else ""
if dest_path:
source_path = dest_path
source_storage = history.dest_storage or "local"
source_path = source_path or history.src or ""
season_episode = f"{history.seasons or ''}{history.episodes or ''}".strip()
return {
"history_id": history.id,
"current_status": "success" if history.status else "failed",
"recognized_title": history.title or "unknown",
"media_type": history.type or "unknown",
"category": history.category or "unknown",
"year": history.year or "unknown",
"season_episode": season_episode or "unknown",
"source_path": source_path or "unknown",
"source_storage": source_storage,
"destination_path": history.dest or "unknown",
"destination_storage": history.dest_storage or "unknown",
"transfer_mode": history.mode or "unknown",
"tmdbid": history.tmdbid or "none",
"doubanid": history.doubanid or "none",
"error_message": history.errmsg or "none",
}
def format_manual_redo_record_context(history: Any) -> str:
"""把单条整理记录格式化为批量任务可直接消费的上下文块。"""
context = build_manual_redo_template_context(history)
return "\n".join(
[
f"Record #{context['history_id']}:",
f"- Current status: {context['current_status']}",
f"- Current recognized title: {context['recognized_title']}",
f"- Media type: {context['media_type']}",
f"- Category: {context['category']}",
f"- Year: {context['year']}",
f"- Season/Episode: {context['season_episode']}",
f"- Source path: {context['source_path']}",
f"- Source storage: {context['source_storage']}",
f"- Destination path: {context['destination_path']}",
f"- Destination storage: {context['destination_storage']}",
f"- Transfer mode: {context['transfer_mode']}",
f"- Current TMDB ID: {context['tmdbid']}",
f"- Current Douban ID: {context['doubanid']}",
f"- Error message: {context['error_message']}",
]
)
def build_manual_redo_prompt(history: Any) -> str:
"""构建手动 AI 整理提示词。"""
return prompt_manager.render_system_task_message(
"manual_transfer_redo",
template_context=build_manual_redo_template_context(history),
)
def build_batch_manual_redo_template_context(histories: list[Any]) -> dict[str, int | str]:
"""把多条整理历史对象映射成批量 System Tasks 需要的模板变量。"""
return {
"history_ids_csv": ", ".join(str(history.id) for history in histories),
"history_count": len(histories),
"records_context": "\n\n".join(
format_manual_redo_record_context(history) for history in histories
),
}
def build_batch_manual_redo_prompt(histories: list[Any]) -> str:
"""构建批量手动 AI 整理提示词。"""
return prompt_manager.render_system_task_message(
"batch_manual_transfer_redo",
template_context=build_batch_manual_redo_template_context(histories),
)

View File

@@ -6,8 +6,10 @@ from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.agent.tools.tags import ToolTag
from app.chain.storage import StorageChain
from app.db.transferhistory_oper import TransferHistoryOper
from app.log import logger
from app.schemas import FileItem
class DeleteTransferHistoryInput(BaseModel):
@@ -27,7 +29,11 @@ class DeleteTransferHistoryTool(MoviePilotTool):
ToolTag.Transfer,
ToolTag.Admin,
]
description: str = "Delete a specific transfer history record by its ID. This is useful when you need to remove a failed transfer record before retrying the transfer, as the system skips files that already have transfer history."
description: str = (
"Delete a specific transfer history record by its ID. For non-successful-move records with an old "
"destination file, the tool removes that media-library file before deleting the history record. This is "
"useful before retrying or re-organizing because the system skips files that already have transfer history."
)
args_schema: Type[BaseModel] = DeleteTransferHistoryInput
require_admin: bool = True
@@ -48,10 +54,21 @@ class DeleteTransferHistoryTool(MoviePilotTool):
title = history.title or "未知"
src = history.src or "未知"
status = "成功" if history.status else "失败"
deleted_dest = False
if history.dest_fileitem and not (history.status and history.mode == "move"):
dest_fileitem = FileItem(**history.dest_fileitem)
storage_chain = StorageChain()
if storage_chain.exists(dest_fileitem):
if not storage_chain.delete_media_file(dest_fileitem):
return f"错误:旧媒体库文件删除失败,路径={dest_fileitem.path}"
deleted_dest = True
await transferhis.async_delete(history_id)
return (
message = (
f"已删除整理历史记录ID={history_id},标题={title},源路径={src},状态={status}"
)
if deleted_dest:
message += ",已删除旧媒体库文件"
return message
except Exception as e:
logger.error(f"删除整理历史记录失败: {e}", exc_info=True)
return f"删除整理历史记录时发生错误: {str(e)}"

View File

@@ -8,7 +8,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
from app import schemas
from app.agent import ReplyMode, prompt_manager, agent_manager
from app.agent import ReplyMode, agent_manager
from app.agent.prompt.transfer_redo import (
build_batch_manual_redo_prompt,
build_manual_redo_prompt,
)
from app.chain.storage import StorageChain
from app.core.config import settings, global_vars
from app.core.event import eventmanager
@@ -37,86 +41,6 @@ def normalize_history_ids(history_ids: list[int]) -> list[int]:
return normalized_ids
def build_manual_redo_template_context(
history: TransferHistory,
) -> dict[str, int | str]:
"""仅负责把整理历史对象映射成 System Tasks 需要的模板变量。"""
src_fileitem = history.src_fileitem or {}
source_path = src_fileitem.get("path") if isinstance(src_fileitem, dict) else ""
source_path = source_path or history.src or ""
season_episode = f"{history.seasons or ''}{history.episodes or ''}".strip()
return {
"history_id": history.id,
"current_status": "success" if history.status else "failed",
"recognized_title": history.title or "unknown",
"media_type": history.type or "unknown",
"category": history.category or "unknown",
"year": history.year or "unknown",
"season_episode": season_episode or "unknown",
"source_path": source_path or "unknown",
"source_storage": history.src_storage or "local",
"destination_path": history.dest or "unknown",
"destination_storage": history.dest_storage or "unknown",
"transfer_mode": history.mode or "unknown",
"tmdbid": history.tmdbid or "none",
"doubanid": history.doubanid or "none",
"error_message": history.errmsg or "none",
}
def format_manual_redo_record_context(history: Any) -> str:
"""把单条整理记录格式化为批量任务可直接消费的上下文块。"""
context = build_manual_redo_template_context(history)
return "\n".join(
[
f"Record #{context['history_id']}:",
f"- Current status: {context['current_status']}",
f"- Current recognized title: {context['recognized_title']}",
f"- Media type: {context['media_type']}",
f"- Category: {context['category']}",
f"- Year: {context['year']}",
f"- Season/Episode: {context['season_episode']}",
f"- Source path: {context['source_path']}",
f"- Source storage: {context['source_storage']}",
f"- Destination path: {context['destination_path']}",
f"- Destination storage: {context['destination_storage']}",
f"- Transfer mode: {context['transfer_mode']}",
f"- Current TMDB ID: {context['tmdbid']}",
f"- Current Douban ID: {context['doubanid']}",
f"- Error message: {context['error_message']}",
]
)
def build_manual_redo_prompt(history: Any) -> str:
"""构建手动 AI 整理提示词。"""
return prompt_manager.render_system_task_message(
"manual_transfer_redo",
template_context=build_manual_redo_template_context(history),
)
def build_batch_manual_redo_template_context(
histories: list[Any],
) -> dict[str, int | str]:
"""仅负责把多条整理历史对象映射成批量 System Tasks 需要的模板变量。"""
return {
"history_ids_csv": ", ".join(str(history.id) for history in histories),
"history_count": len(histories),
"records_context": "\n\n".join(
format_manual_redo_record_context(history) for history in histories
),
}
def build_batch_manual_redo_prompt(histories: list[Any]) -> str:
"""构建批量手动 AI 整理提示词。"""
return prompt_manager.render_system_task_message(
"batch_manual_transfer_redo",
template_context=build_batch_manual_redo_template_context(histories),
)
def _start_ai_redo_task(history_id: int, prompt: str, progress_key: str):
"""在后台线程中启动单条 AI 重新整理任务,并通过 ProgressHelper 实时更新进度。"""
progress = ProgressHelper(progress_key)

View File

@@ -11,8 +11,9 @@ from pathlib import Path
from typing import Any, Optional, Dict, Union, List, Tuple
from urllib.parse import unquote, urlparse
from app.agent import ReplyMode, agent_manager, prompt_manager
from app.agent import ReplyMode, agent_manager
from app.agent.llm import AgentCapabilityManager, LLMHelper
from app.agent.prompt.transfer_redo import build_manual_redo_prompt
from app.chain import ChainBase
from app.chain.download import DownloadChain
from app.chain.media import MediaChain
@@ -872,36 +873,6 @@ class MessageChain(ChainBase):
由智能助手接管一条失败的整理记录。
"""
def __build_manual_redo_prompt(his: TransferHistory) -> str:
"""构建手动 AI 整理提示词。"""
src_fileitem = his.src_fileitem or {}
source_path = src_fileitem.get("path") if isinstance(src_fileitem, dict) else ""
source_path = source_path or his.src or ""
season_episode = f"{his.seasons or ''}{his.episodes or ''}".strip()
# 键名必须与 System Tasks.yaml 中 manual_transfer_redo 模板的占位符一致
template_context = {
"history_id": his.id,
"current_status": "success" if his.status else "failed",
"recognized_title": his.title or "unknown",
"media_type": his.type or "unknown",
"category": his.category or "unknown",
"year": his.year or "unknown",
"season_episode": season_episode or "unknown",
"source_path": source_path or "unknown",
"source_storage": his.src_storage or "local",
"destination_path": his.dest or "unknown",
"destination_storage": his.dest_storage or "unknown",
"transfer_mode": his.mode or "unknown",
"tmdbid": his.tmdbid or "none",
"doubanid": his.doubanid or "none",
"error_message": his.errmsg or "none",
}
return prompt_manager.render_system_task_message(
"manual_transfer_redo",
template_context=template_context,
)
if not settings.AI_AGENT_ENABLE:
self.post_message(
Notification(
@@ -931,7 +902,7 @@ class MessageChain(ChainBase):
)
return
redo_prompt = __build_manual_redo_prompt(history)
redo_prompt = build_manual_redo_prompt(history)
self.post_message(
Notification(

View File

@@ -0,0 +1,355 @@
import asyncio
from types import SimpleNamespace
from app.agent.tools.impl.delete_transfer_history import DeleteTransferHistoryTool
from app.agent.prompt.transfer_redo import build_manual_redo_template_context
def test_delete_transfer_history_tool_removes_old_dest_file_before_history(monkeypatch):
"""AI 重新整理删除整理记录前,应按历史目标文件清理旧媒体库文件。"""
calls = []
history = SimpleNamespace(
id=7,
title="奔跑吧",
src="/downloads/Keep.Running.mkv",
status=True,
mode="link",
dest_fileitem={
"storage": "local",
"path": "/library/奔跑吧 (2014)/Keep.Running.mkv",
"name": "Keep.Running.mkv",
"type": "file",
},
)
class FakeTransferHistoryOper:
async def async_get(self, history_id):
calls.append(("get", history_id))
return history
async def async_delete(self, history_id):
calls.append(("delete_history", history_id))
class FakeStorageChain:
def exists(self, fileitem):
calls.append(("exists_dest", fileitem.path))
return True
def delete_media_file(self, fileitem):
calls.append(("delete_dest", fileitem.path))
return True
monkeypatch.setattr(
"app.agent.tools.impl.delete_transfer_history.TransferHistoryOper",
FakeTransferHistoryOper,
)
monkeypatch.setattr(
"app.agent.tools.impl.delete_transfer_history.StorageChain",
FakeStorageChain,
)
tool = DeleteTransferHistoryTool(session_id="redo-session", user_id="10001")
result = asyncio.run(tool.run(history_id=7))
assert "已删除整理历史记录" in result
assert calls == [
("get", 7),
("exists_dest", "/library/奔跑吧 (2014)/Keep.Running.mkv"),
("delete_dest", "/library/奔跑吧 (2014)/Keep.Running.mkv"),
("delete_history", 7),
]
def test_delete_transfer_history_tool_keeps_history_when_old_dest_delete_fails(monkeypatch):
"""旧媒体库文件删除失败时不得删除整理记录,避免重整链路丢失回滚依据。"""
calls = []
history = SimpleNamespace(
id=8,
title="奔跑吧",
src="/downloads/Keep.Running.mkv",
status=True,
mode="copy",
dest_fileitem={
"storage": "local",
"path": "/library/奔跑吧 (2014)/Keep.Running.mkv",
"name": "Keep.Running.mkv",
"type": "file",
},
)
class FakeTransferHistoryOper:
async def async_get(self, history_id):
calls.append(("get", history_id))
return history
async def async_delete(self, history_id):
calls.append(("delete_history", history_id))
class FakeStorageChain:
def exists(self, fileitem):
calls.append(("exists_dest", fileitem.path))
return True
def delete_media_file(self, fileitem):
calls.append(("delete_dest", fileitem.path))
return False
monkeypatch.setattr(
"app.agent.tools.impl.delete_transfer_history.TransferHistoryOper",
FakeTransferHistoryOper,
)
monkeypatch.setattr(
"app.agent.tools.impl.delete_transfer_history.StorageChain",
FakeStorageChain,
)
tool = DeleteTransferHistoryTool(session_id="redo-session", user_id="10001")
result = asyncio.run(tool.run(history_id=8))
assert "旧媒体库文件删除失败" in result
assert calls == [
("get", 8),
("exists_dest", "/library/奔跑吧 (2014)/Keep.Running.mkv"),
("delete_dest", "/library/奔跑吧 (2014)/Keep.Running.mkv"),
]
def test_delete_transfer_history_tool_deletes_history_when_old_dest_is_missing(monkeypatch):
"""旧媒体库文件已不存在时应视为已清理,继续删除整理记录。"""
calls = []
history = SimpleNamespace(
id=13,
title="奔跑吧",
src="/downloads/Keep.Running.mkv",
status=True,
mode="link",
dest_fileitem={
"storage": "local",
"path": "/library/奔跑吧 (2014)/Keep.Running.mkv",
"name": "Keep.Running.mkv",
"type": "file",
},
)
class FakeTransferHistoryOper:
async def async_get(self, history_id):
calls.append(("get", history_id))
return history
async def async_delete(self, history_id):
calls.append(("delete_history", history_id))
class FakeStorageChain:
def exists(self, fileitem):
calls.append(("exists_dest", fileitem.path))
return False
def delete_media_file(self, fileitem):
calls.append(("delete_dest", fileitem.path))
return False
monkeypatch.setattr(
"app.agent.tools.impl.delete_transfer_history.TransferHistoryOper",
FakeTransferHistoryOper,
)
monkeypatch.setattr(
"app.agent.tools.impl.delete_transfer_history.StorageChain",
FakeStorageChain,
)
tool = DeleteTransferHistoryTool(session_id="redo-session", user_id="10001")
result = asyncio.run(tool.run(history_id=13))
assert "已删除整理历史记录" in result
assert "已删除旧媒体库文件" not in result
assert calls == [
("get", 13),
("exists_dest", "/library/奔跑吧 (2014)/Keep.Running.mkv"),
("delete_history", 13),
]
def test_delete_transfer_history_tool_keeps_successful_move_dest_as_reorganize_source(monkeypatch):
"""成功 move 记录的目标文件是重新整理输入,不应在删除历史时先删除。"""
calls = []
history = SimpleNamespace(
id=9,
title="奔跑吧",
src="/downloads/Keep.Running.mkv",
status=True,
mode="move",
dest_fileitem={
"storage": "local",
"path": "/library/奔跑吧 (2014)/Keep.Running.mkv",
"name": "Keep.Running.mkv",
"type": "file",
},
)
class FakeTransferHistoryOper:
async def async_get(self, history_id):
calls.append(("get", history_id))
return history
async def async_delete(self, history_id):
calls.append(("delete_history", history_id))
class FakeStorageChain:
def exists(self, fileitem):
calls.append(("exists_dest", fileitem.path))
return True
def delete_media_file(self, fileitem):
calls.append(("delete_dest", fileitem.path))
return True
monkeypatch.setattr(
"app.agent.tools.impl.delete_transfer_history.TransferHistoryOper",
FakeTransferHistoryOper,
)
monkeypatch.setattr(
"app.agent.tools.impl.delete_transfer_history.StorageChain",
FakeStorageChain,
)
tool = DeleteTransferHistoryTool(session_id="redo-session", user_id="10001")
result = asyncio.run(tool.run(history_id=9))
assert "已删除整理历史记录" in result
assert calls == [
("get", 9),
("delete_history", 9),
]
def test_delete_transfer_history_tool_only_treats_exact_move_as_reorganize_source(monkeypatch):
"""整理方式必须精确等于 move其他模式仍应清理旧目标文件。"""
calls = []
history = SimpleNamespace(
id=11,
title="奔跑吧",
src="/downloads/Keep.Running.mkv",
status=True,
mode="not-move",
dest_fileitem={
"storage": "local",
"path": "/library/奔跑吧 (2014)/Keep.Running.mkv",
"name": "Keep.Running.mkv",
"type": "file",
},
)
class FakeTransferHistoryOper:
async def async_get(self, history_id):
calls.append(("get", history_id))
return history
async def async_delete(self, history_id):
calls.append(("delete_history", history_id))
class FakeStorageChain:
def exists(self, fileitem):
calls.append(("exists_dest", fileitem.path))
return True
def delete_media_file(self, fileitem):
calls.append(("delete_dest", fileitem.path))
return True
monkeypatch.setattr(
"app.agent.tools.impl.delete_transfer_history.TransferHistoryOper",
FakeTransferHistoryOper,
)
monkeypatch.setattr(
"app.agent.tools.impl.delete_transfer_history.StorageChain",
FakeStorageChain,
)
tool = DeleteTransferHistoryTool(session_id="redo-session", user_id="10001")
result = asyncio.run(tool.run(history_id=11))
assert "已删除旧媒体库文件" in result
assert calls == [
("get", 11),
("exists_dest", "/library/奔跑吧 (2014)/Keep.Running.mkv"),
("delete_dest", "/library/奔跑吧 (2014)/Keep.Running.mkv"),
("delete_history", 11),
]
def test_manual_redo_context_uses_dest_path_for_successful_move_record():
"""成功 move 记录重新整理时,旧目标文件才是可继续整理的输入路径。"""
history = SimpleNamespace(
id=10,
status=True,
title="奔跑吧",
type="电视剧",
category="综艺",
year="2014",
seasons="S01",
episodes="E01",
src="/downloads/Keep.Running.mkv",
src_storage="local",
src_fileitem={
"storage": "local",
"path": "/downloads/Keep.Running.mkv",
"name": "Keep.Running.mkv",
"type": "file",
},
dest="/library/奔跑吧 (2014)/Keep.Running.mkv",
dest_storage="local",
dest_fileitem={
"storage": "local",
"path": "/library/奔跑吧 (2014)/Keep.Running.mkv",
"name": "Keep.Running.mkv",
"type": "file",
},
mode="move",
tmdbid=100,
doubanid=None,
errmsg=None,
)
context = build_manual_redo_template_context(history)
assert context["source_path"] == "/library/奔跑吧 (2014)/Keep.Running.mkv"
assert context["source_storage"] == "local"
def test_manual_redo_context_only_treats_exact_move_as_dest_source():
"""非 move 整理方式即使名称包含 move也应继续使用原始来源。"""
history = SimpleNamespace(
id=12,
status=True,
title="奔跑吧",
type="电视剧",
category="综艺",
year="2014",
seasons="S01",
episodes="E01",
src="/downloads/Keep.Running.mkv",
src_storage="local",
src_fileitem={
"storage": "local",
"path": "/downloads/Keep.Running.mkv",
"name": "Keep.Running.mkv",
"type": "file",
},
dest="/library/奔跑吧 (2014)/Keep.Running.mkv",
dest_storage="local",
dest_fileitem={
"storage": "local",
"path": "/library/奔跑吧 (2014)/Keep.Running.mkv",
"name": "Keep.Running.mkv",
"type": "file",
},
mode="not-move",
tmdbid=100,
doubanid=None,
errmsg=None,
)
context = build_manual_redo_template_context(history)
assert context["source_path"] == "/downloads/Keep.Running.mkv"
assert context["source_storage"] == "local"

View File

@@ -1,7 +1,7 @@
from types import SimpleNamespace
from app.agent.prompt import prompt_manager
from app.api.endpoints.history import build_batch_manual_redo_prompt
from app.agent.prompt.transfer_redo import build_batch_manual_redo_prompt
def test_batch_manual_redo_prompt_requires_plain_text_result():
@@ -20,6 +20,7 @@ def test_batch_manual_redo_prompt_requires_plain_text_result():
src_storage="local",
dest="/media/a.mkv",
dest_storage="local",
dest_fileitem=None,
mode="copy",
tmdbid=123,
doubanid=None,

View File

@@ -1,4 +1,5 @@
import unittest
import asyncio
import sys
from types import ModuleType
from types import SimpleNamespace
@@ -89,6 +90,7 @@ class TestTransferFailedRetryButtons(unittest.TestCase):
src_fileitem={"path": "/downloads/Test.Show.S01E01.mkv"},
dest=None,
dest_storage=None,
dest_fileitem=None,
mode="copy",
tmdbid=123,
doubanid=None,
@@ -122,3 +124,72 @@ class TestTransferFailedRetryButtons(unittest.TestCase):
post_message.call_args_list[0].args[0].title,
"已将整理记录 #34 交给智能助手处理",
)
def test_transfer_ai_retry_callback_uses_successful_move_dest_as_source(self):
chain = MessageChain()
captured = {}
history = SimpleNamespace(
id=35,
status=True,
title="Test Show",
type="电视剧",
category=None,
year="2024",
seasons="S01",
episodes="E01",
src="/downloads/Test.Show.S01E01.mkv",
src_storage="local",
src_fileitem={"path": "/downloads/Test.Show.S01E01.mkv"},
dest="/library/Test Show (2024)/Season 1/Test.Show.S01E01.mkv",
dest_storage="local",
dest_fileitem={
"storage": "local",
"path": "/library/Test Show (2024)/Season 1/Test.Show.S01E01.mkv",
"name": "Test.Show.S01E01.mkv",
"type": "file",
},
mode="move",
tmdbid=123,
doubanid=None,
errmsg=None,
)
def _run_pending_coro(coro, *args, **kwargs):
asyncio.run(coro)
return SimpleNamespace()
async def fake_run_background_prompt(**kwargs):
captured["message"] = kwargs["message"]
output_callback = kwargs.get("output_callback")
if output_callback:
output_callback("ok")
async def fake_async_post_message(*args, **kwargs):
return None
with patch.object(settings, "AI_AGENT_ENABLE", True):
with patch(
"app.chain.message.TransferHistoryOper"
) as history_oper_cls, patch(
"app.chain.message.agent_manager.run_background_prompt",
side_effect=fake_run_background_prompt,
), patch(
"app.chain.message.asyncio.run_coroutine_threadsafe",
side_effect=_run_pending_coro,
):
history_oper_cls.return_value.get.return_value = history
with patch.object(chain, "post_message"), patch.object(
chain, "async_post_message", side_effect=fake_async_post_message
):
chain._handle_callback(
text="CALLBACK:transfer_ai_retry_35",
channel=MessageChannel.Telegram,
source="telegram-test",
userid="10001",
username="tester",
)
self.assertIn(
"- Source path: /library/Test Show (2024)/Season 1/Test.Show.S01E01.mkv",
captured["message"],
)