fix: prevent repeated scans after history-based exits

Only mark downloader tasks as organized after a transfer history record exists, including existing-history skips and unrecognized media failures.
This commit is contained in:
jxxghp
2026-05-10 10:25:39 +08:00
parent 79eb128196
commit adb7aa6aa9
2 changed files with 169 additions and 10 deletions

View File

@@ -1075,10 +1075,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
__notify()
# 只要该种子的所有任务都已整理完成,则设置种子状态为已整理
if task.download_hash and self.jobview.is_torrent_done(task.download_hash):
self.transfer_completed(
hashs=task.download_hash, downloader=task.downloader
)
self.__mark_torrent_completed_if_done(task.download_hash, task.downloader)
# 移动模式,全部成功时删除空目录和种子文件
if transferinfo.transfer_type in ["move"]:
@@ -1136,6 +1133,22 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
"""
return self.jobview.add_task(task)
def __mark_torrent_completed_if_done(
self,
download_hash: Optional[str],
downloader: Optional[str],
history_exists: bool = True,
):
"""
当同一种子的任务都已结束时,回写下载器已整理标签。
"""
if (
history_exists
and download_hash
and self.jobview.is_torrent_done(download_hash)
):
self.transfer_completed(hashs=download_hash, downloader=downloader)
def remove_from_queue(self, fileitem: FileItem):
"""
从待整理队列移除
@@ -1149,10 +1162,6 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
标记异常整理任务失败并清理作业视图
"""
self.jobview.fail_unfinished_task(task)
if task.download_hash and self.jobview.is_torrent_done(task.download_hash):
self.transfer_completed(
hashs=task.download_hash, downloader=task.downloader
)
self.jobview.try_remove_job(task)
def __start_transfer(self):
@@ -1328,6 +1337,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
)
# 任务失败直接移除task
self.jobview.remove_task(task.fileitem)
self.__mark_torrent_completed_if_done(
task.download_hash, task.downloader
)
# AI智能体自动重试整理
if (
@@ -1953,10 +1965,13 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
return False, f"{fileitem.name} 没有找到可整理的媒体文件"
logger.info(f"正在计划整理 {len(file_items)} 个文件...")
planned_file_count = len(file_items)
logger.info(f"正在计划整理 {planned_file_count} 个文件...")
# 整理所有文件
transfer_tasks: List[TransferTask] = []
skipped_history_count = 0
skipped_torrents = set()
try:
for file_item, bluray_dir in file_items:
if global_vars.is_system_stopped:
@@ -1971,8 +1986,15 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
file_item.path, storage=file_item.storage
)
if transferd:
skipped_history_count += 1
if not transferd.status:
all_success = False
candidate_hash = download_hash or transferd.download_hash
candidate_downloader = downloader or transferd.downloader
if candidate_hash and candidate_downloader:
skipped_torrents.add(
(candidate_hash, candidate_downloader)
)
logger.info(
f"{file_item.path} 已整理过,如需重新处理,请删除整理记录。"
)
@@ -2139,6 +2161,18 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
progress.update(value=100, text=__end_msg, data={})
progress.end()
# 下载器任务在这一轮可能因为历史记录全部命中而没有进入整理队列,
# 这里补打一遍已整理标签,避免同一种子被重复扫描。
if (
skipped_history_count == planned_file_count
and skipped_torrents
):
for skipped_hash, skipped_downloader in skipped_torrents:
logger.info(f"补充设置下载任务已整理标签:{skipped_hash}")
self.__mark_torrent_completed_if_done(
skipped_hash, skipped_downloader
)
error_msg = "".join(err_msgs[:2]) + (
f",等{len(err_msgs)}个文件错误!" if len(err_msgs) > 2 else ""
)

View File

@@ -1,4 +1,6 @@
import unittest
from types import SimpleNamespace
from unittest.mock import patch
from app.chain.transfer import JobManager, TransferChain
from app.schemas import FileItem, TransferTask
@@ -187,7 +189,7 @@ class TransferJobManagerTest(unittest.TestCase):
self.assertEqual(1, len(jobs))
self.assertEqual(task2.fileitem, jobs[0].tasks[0].fileitem)
def test_exception_failure_marks_downloader_hash_completed_before_cleanup(self):
def test_exception_failure_does_not_mark_downloader_without_history(self):
chain = object.__new__(TransferChain)
chain.jobview = JobManager()
completed = []
@@ -204,6 +206,129 @@ class TransferJobManagerTest(unittest.TestCase):
chain._TransferChain__fail_transfer_task(task)
self.assertEqual([], completed)
self.assertEqual([], chain.jobview.list_jobs())
def test_successful_history_skip_marks_downloader_hash_completed(self):
chain = object.__new__(TransferChain)
chain.jobview = JobManager()
completed = []
def fake_transfer_completed(hashs, downloader):
completed.append((hashs, downloader))
chain.transfer_completed = fake_transfer_completed
chain._TransferChain__get_trans_fileitems = lambda fileitem, predicate: [
(fileitem, False)
]
fileitem = make_task(1).fileitem
history = SimpleNamespace(
status=True,
download_hash="abc123",
downloader="qbittorrent",
)
transfer_history_oper = SimpleNamespace(
get_by_src=lambda src, storage=None: history
)
system_config_oper = SimpleNamespace(get=lambda key: None)
with patch(
"app.chain.transfer.TransferHistoryOper",
return_value=transfer_history_oper,
), patch(
"app.chain.transfer.SystemConfigOper",
return_value=system_config_oper,
):
state, errmsg = TransferChain.do_transfer(
chain,
fileitem=fileitem,
downloader="qbittorrent",
download_hash="abc123",
background=False,
)
self.assertTrue(state)
self.assertEqual("Test.Show.S01E01.mkv 已整理过", errmsg)
self.assertEqual([("abc123", "qbittorrent")], completed)
def test_failed_history_skip_still_marks_downloader_hash_completed(self):
chain = object.__new__(TransferChain)
chain.jobview = JobManager()
completed = []
def fake_transfer_completed(hashs, downloader):
completed.append((hashs, downloader))
chain.transfer_completed = fake_transfer_completed
chain._TransferChain__get_trans_fileitems = lambda fileitem, predicate: [
(fileitem, False)
]
fileitem = make_task(1).fileitem
history = SimpleNamespace(
status=False,
download_hash="abc123",
downloader="qbittorrent",
)
transfer_history_oper = SimpleNamespace(
get_by_src=lambda src, storage=None: history
)
system_config_oper = SimpleNamespace(get=lambda key: None)
with patch(
"app.chain.transfer.TransferHistoryOper",
return_value=transfer_history_oper,
), patch(
"app.chain.transfer.SystemConfigOper",
return_value=system_config_oper,
):
state, errmsg = TransferChain.do_transfer(
chain,
fileitem=fileitem,
downloader="qbittorrent",
download_hash="abc123",
background=False,
)
self.assertFalse(state)
self.assertEqual("Test.Show.S01E01.mkv 已整理过", errmsg)
self.assertEqual([("abc123", "qbittorrent")], completed)
def test_unrecognized_task_marks_downloader_hash_completed(self):
chain = object.__new__(TransferChain)
chain.jobview = JobManager()
chain.post_message = lambda *_args, **_kwargs: None
completed = []
def fake_transfer_completed(hashs, downloader):
completed.append((hashs, downloader))
chain.transfer_completed = fake_transfer_completed
task = make_task(1)
task.downloader = "qbittorrent"
task.download_hash = "abc123"
self.assertTrue(chain.jobview.add_task(task))
transfer_history_oper = SimpleNamespace(
add_fail=lambda **kwargs: SimpleNamespace(id=1)
)
with patch(
"app.chain.transfer.TransferHistoryOper",
return_value=transfer_history_oper,
), patch(
"app.chain.transfer.MediaChain"
) as media_chain_cls, patch(
"app.chain.transfer.settings.AI_AGENT_ENABLE", False
), patch(
"app.chain.transfer.settings.AI_AGENT_RETRY_TRANSFER", False
):
media_chain_cls.return_value.recognize_by_meta.return_value = None
state, errmsg = chain._TransferChain__handle_transfer(task)
self.assertFalse(state)
self.assertEqual("未识别到媒体信息", errmsg)
self.assertEqual([("abc123", "qbittorrent")], completed)
self.assertEqual([], chain.jobview.list_jobs())