fix(transfer): tighten queue cleanup edge cases

This commit is contained in:
InfinityPacer
2026-04-14 14:17:58 +08:00
committed by jxxghp
parent eda73e14f7
commit 81828948dd
2 changed files with 67 additions and 9 deletions

View File

@@ -192,7 +192,7 @@ class JobManager:
"""
将任务从 meta 作业迁移到 media 作业
"""
curr_task = self.remove_task(task.fileitem)
curr_task, source_job_id = self.__remove_task_with_job_id(task.fileitem)
if not self.add_task(task, state=curr_task.state if curr_task else "waiting"):
return False
if curr_task and task.mediainfo:
@@ -200,7 +200,7 @@ class JobManager:
meta=task.meta, season=task.meta.begin_season
)
mediaid = self.__get_id(task)
if mediaid != metaid:
if source_job_id == metaid and mediaid != metaid:
with job_lock:
self._meta_to_media_ids.setdefault(metaid, set()).add(mediaid)
return True
@@ -297,6 +297,15 @@ class JobManager:
"""
根据文件项移除任务
"""
task, _ = self.__remove_task_with_job_id(fileitem)
return task
def __remove_task_with_job_id(
self, fileitem: FileItem
) -> Tuple[Optional[TransferJobTask], Optional[Tuple]]:
"""
根据文件项移除任务并返回任务所在的作业ID
"""
with job_lock:
for mediaid in list(self._job_view):
job = self._job_view[mediaid]
@@ -312,8 +321,8 @@ class JobManager:
set(self._season_episodes[mediaid])
- set(task.meta.episode_list)
)
return task
return None
return task, mediaid
return None, None
def remove_job(self, task: TransferTask) -> Optional[TransferJob]:
"""
@@ -1020,6 +1029,17 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
return
self.jobview.remove_task(fileitem)
def __fail_transfer_task(self, task: TransferTask):
"""
标记异常整理任务失败并清理作业视图
"""
self.jobview.fail_unfinished_task(task)
if task.download_hash and self.jobview.is_torrent_done(task.download_hash):
self.transfer_completed(
hashs=task.download_hash, downloader=task.downloader
)
self.jobview.try_remove_job(task)
def __start_transfer(self):
"""
处理队列
@@ -1096,8 +1116,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
logger.error(
f"{fileitem.name} 整理任务处理出现错误:{e} - {traceback.format_exc()}"
)
self.jobview.fail_unfinished_task(task)
self.jobview.try_remove_job(task)
self.__fail_transfer_task(task)
with task_lock:
self._processed_num += 1
self._fail_num += 1
@@ -1832,8 +1851,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
f"{transfer_task.fileitem.name} 整理任务处理出现错误:"
f"{e} - {traceback.format_exc()}"
)
self.jobview.fail_unfinished_task(transfer_task)
self.jobview.try_remove_job(transfer_task)
self.__fail_transfer_task(transfer_task)
state, err_msg = False, str(e)
if not state:
all_success = False

View File

@@ -1,6 +1,6 @@
import unittest
from app.chain.transfer import JobManager
from app.chain.transfer import JobManager, TransferChain
from app.schemas import FileItem, TransferTask
from app.schemas.types import MediaType
@@ -167,6 +167,46 @@ class TransferJobManagerTest(unittest.TestCase):
self.assertEqual(1, len(jobs))
self.assertEqual(task2.fileitem, jobs[0].tasks[0].fileitem)
def test_pre_recognized_migrations_with_same_meta_do_not_link_jobs(self):
jobview = JobManager()
task1 = make_task(1)
task2 = make_task(2)
task1.mediainfo = FakeMedia(100)
task2.mediainfo = FakeMedia(200)
self.assertTrue(jobview.add_task(task1))
self.assertTrue(jobview.add_task(task2))
self.assertTrue(jobview.migrate_task(task1))
self.assertTrue(jobview.migrate_task(task2))
jobview.running_task(task1)
jobview.finish_task(task1)
jobview.try_remove_job(task1)
jobs = jobview.list_jobs()
self.assertEqual(1, len(jobs))
self.assertEqual(task2.fileitem, jobs[0].tasks[0].fileitem)
def test_exception_failure_marks_downloader_hash_completed_before_cleanup(self):
chain = object.__new__(TransferChain)
chain.jobview = JobManager()
completed = []
def fake_transfer_completed(hashs, downloader):
completed.append((hashs, downloader))
chain.transfer_completed = fake_transfer_completed
task = make_task(1)
task.downloader = "qbittorrent"
task.download_hash = "abc123"
self.assertTrue(chain.jobview.add_task(task))
chain.jobview.running_task(task)
chain._TransferChain__fail_transfer_task(task)
self.assertEqual([("abc123", "qbittorrent")], completed)
self.assertEqual([], chain.jobview.list_jobs())
if __name__ == "__main__":
unittest.main()