From 2851f1639549aa5398d765966bc9e58af6c98644 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Sun, 2 Mar 2025 12:27:36 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9Aactions=E5=A2=9E=E5=8A=A0=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/actions/__init__.py | 39 +++++++++++++++++++++++++++++++--- app/actions/add_download.py | 11 ++++++++-- app/actions/add_subscribe.py | 11 ++++++++-- app/actions/fetch_downloads.py | 4 ++-- app/actions/fetch_medias.py | 4 ++-- app/actions/fetch_rss.py | 4 ++-- app/actions/fetch_torrents.py | 4 ++-- app/actions/filter_torrents.py | 4 ++-- app/actions/scan_file.py | 11 ++++++++-- app/actions/scrape_file.py | 11 ++++++++-- app/actions/send_message.py | 4 ++-- app/actions/transfer_file.py | 17 +++++++++++++-- app/api/endpoints/workflow.py | 30 +++++++++++++++++++++++++- app/chain/workflow.py | 5 ++++- app/core/workflow.py | 2 +- app/db/models/workflow.py | 2 +- 16 files changed, 134 insertions(+), 29 deletions(-) diff --git a/app/actions/__init__.py b/app/actions/__init__.py index ac46d4cf..259e2cec 100644 --- a/app/actions/__init__.py +++ b/app/actions/__init__.py @@ -1,6 +1,8 @@ from abc import ABC, abstractmethod +from typing import List, Any, Union from app.chain import ChainBase +from app.db.systemconfig_oper import SystemConfigOper from app.schemas import ActionContext, ActionParams @@ -13,27 +15,35 @@ class BaseAction(ABC): 工作流动作基类 """ + # 动作ID + _action_id = None # 完成标志 _done_flag = False # 执行信息 _message = "" + # 缓存键值 + _cache_key = "WorkflowCache-%s" + + def __init__(self, action_id: str): + self._action_id = action_id + self.systemconfigoper = SystemConfigOper() @classmethod @property @abstractmethod - def name(cls) -> str: # noqa + def name(cls) -> str: # noqa pass @classmethod @property @abstractmethod - def description(cls) -> str: # noqa + def description(cls) -> str: # noqa pass @classmethod @property @abstractmethod - def data(cls) -> dict: # noqa + def data(cls) -> dict: # noqa pass @property @@ -65,6 +75,29 @@ class BaseAction(ABC): self._message = message self._done_flag = True + def check_cache(self, workflow_id: int, key: str) -> bool: + """ + 检查是否处理过 + """ + workflow_key = self._cache_key % workflow_id + workflow_cache = self.systemconfigoper.get(workflow_key) or {} + action_cache = workflow_cache.get(self._action_id) or [] + return key in action_cache + + def save_cache(self, workflow_id: int, data: Union[list, str]): + """ + 保存缓存 + """ + workflow_key = self._cache_key % workflow_id + workflow_cache = self.systemconfigoper.get(workflow_key) or {} + action_cache = workflow_cache.get(self._action_id) or [] + if isinstance(data, list): + action_cache.extend(data) + else: + action_cache.append(data) + workflow_cache[self._action_id] = action_cache + self.systemconfigoper.set(workflow_key, workflow_cache) + @abstractmethod def execute(self, workflow_id: int, params: ActionParams, context: ActionContext) -> ActionContext: """ diff --git a/app/actions/add_download.py b/app/actions/add_download.py index 6a4f455f..87aa7f08 100644 --- a/app/actions/add_download.py +++ b/app/actions/add_download.py @@ -30,8 +30,8 @@ class AddDownloadAction(BaseAction): _added_downloads = [] _has_error = False - def __init__(self): - super().__init__() + def __init__(self, action_id: str): + super().__init__(action_id) self.downloadchain = DownloadChain() self.mediachain = MediaChain() @@ -62,6 +62,11 @@ class AddDownloadAction(BaseAction): for t in context.torrents: if global_vars.is_workflow_stopped(workflow_id): break + # 检查缓存 + cache_key = f"{t.torrent_info.site}-{t.torrent_info.title}" + if self.check_cache(workflow_id, cache_key): + logger.info(f"{t.title} 已添加过下载,跳过") + continue if not t.meta_info: t.meta_info = MetaInfo(title=t.title, subtitle=t.description) if not t.media_info: @@ -97,6 +102,8 @@ class AddDownloadAction(BaseAction): label=params.labels) if did: self._added_downloads.append(did) + # 保存缓存 + self.save_cache(workflow_id, cache_key) else: self._has_error = True diff --git a/app/actions/add_subscribe.py b/app/actions/add_subscribe.py index 97a92616..8900e05e 100644 --- a/app/actions/add_subscribe.py +++ b/app/actions/add_subscribe.py @@ -22,8 +22,8 @@ class AddSubscribeAction(BaseAction): _added_subscribes = [] _has_error = False - def __init__(self): - super().__init__() + def __init__(self, action_id: str): + super().__init__(action_id) self.subscribechain = SubscribeChain() self.subscribeoper = SubscribeOper() @@ -53,6 +53,11 @@ class AddSubscribeAction(BaseAction): for media in context.medias: if global_vars.is_workflow_stopped(workflow_id): break + # 检查缓存 + cache_key = f"{media.type}-{media.title}-{media.year}-{media.season}" + if self.check_cache(workflow_id, cache_key): + logger.info(f"{media.title} {media.year} 已添加过订阅,跳过") + continue mediainfo = MediaInfo() mediainfo.from_dict(media.dict()) if self.subscribechain.exists(mediainfo): @@ -69,6 +74,8 @@ class AddSubscribeAction(BaseAction): username=settings.SUPERUSER) if sid: self._added_subscribes.append(sid) + # 保存缓存 + self.save_cache(workflow_id, cache_key) else: self._has_error = True diff --git a/app/actions/fetch_downloads.py b/app/actions/fetch_downloads.py index eefe3502..79c0ed48 100644 --- a/app/actions/fetch_downloads.py +++ b/app/actions/fetch_downloads.py @@ -18,8 +18,8 @@ class FetchDownloadsAction(BaseAction): _downloads = [] - def __init__(self): - super().__init__() + def __init__(self, action_id: str): + super().__init__(action_id) self.chain = ActionChain() @classmethod diff --git a/app/actions/fetch_medias.py b/app/actions/fetch_medias.py index 3f1372d6..bc028b90 100644 --- a/app/actions/fetch_medias.py +++ b/app/actions/fetch_medias.py @@ -31,8 +31,8 @@ class FetchMediasAction(BaseAction): _medias = [] - def __init__(self): - super().__init__() + def __init__(self, action_id: str): + super().__init__(action_id) self.__inner_sources = [ { diff --git a/app/actions/fetch_rss.py b/app/actions/fetch_rss.py index 6f42ee92..d0abfef2 100644 --- a/app/actions/fetch_rss.py +++ b/app/actions/fetch_rss.py @@ -32,8 +32,8 @@ class FetchRssAction(BaseAction): _rss_torrents = [] _has_error = False - def __init__(self): - super().__init__() + def __init__(self, action_id: str): + super().__init__(action_id) self.rsshelper = RssHelper() self.chain = ActionChain() diff --git a/app/actions/fetch_torrents.py b/app/actions/fetch_torrents.py index 6ec46093..fb17eda8 100644 --- a/app/actions/fetch_torrents.py +++ b/app/actions/fetch_torrents.py @@ -31,8 +31,8 @@ class FetchTorrentsAction(BaseAction): _torrents = [] - def __init__(self): - super().__init__() + def __init__(self, action_id: str): + super().__init__(action_id) self.searchchain = SearchChain() @classmethod diff --git a/app/actions/filter_torrents.py b/app/actions/filter_torrents.py index ee350708..efc10fbf 100644 --- a/app/actions/filter_torrents.py +++ b/app/actions/filter_torrents.py @@ -29,8 +29,8 @@ class FilterTorrentsAction(BaseAction): _torrents = [] - def __init__(self): - super().__init__() + def __init__(self, action_id: str): + super().__init__(action_id) self.torrenthelper = TorrentHelper() self.chain = ActionChain() diff --git a/app/actions/scan_file.py b/app/actions/scan_file.py index 843de4ef..2a833955 100644 --- a/app/actions/scan_file.py +++ b/app/actions/scan_file.py @@ -27,8 +27,8 @@ class ScanFileAction(BaseAction): _fileitems = [] _has_error = False - def __init__(self): - super().__init__() + def __init__(self, action_id: str): + super().__init__(action_id) self.storagechain = StorageChain() @classmethod @@ -68,7 +68,14 @@ class ScanFileAction(BaseAction): break if not file.extension or f".{file.extension.lower()}" not in settings.RMT_MEDIAEXT: continue + # 检查缓存 + cache_key = f"{file.path}" + if self.check_cache(workflow_id, cache_key): + logger.info(f"{file.path} 已处理过,跳过") + continue self._fileitems.append(fileitem) + # 保存缓存 + self.save_cache(workflow_id, cache_key) if self._fileitems: context.fileitems.extend(self._fileitems) diff --git a/app/actions/scrape_file.py b/app/actions/scrape_file.py index 148fabb2..eb9d8e35 100644 --- a/app/actions/scrape_file.py +++ b/app/actions/scrape_file.py @@ -24,8 +24,8 @@ class ScrapeFileAction(BaseAction): _scraped_files = [] _has_error = False - def __init__(self): - super().__init__() + def __init__(self, action_id: str): + super().__init__(action_id) self.storagechain = StorageChain() self.mediachain = MediaChain() @@ -59,6 +59,11 @@ class ScrapeFileAction(BaseAction): continue if not self.storagechain.exists(fileitem): continue + # 检查缓存 + cache_key = f"{fileitem.path}" + if self.check_cache(workflow_id, cache_key): + logger.info(f"{fileitem.path} 已刮削,跳过") + continue meta = MetaInfoPath(Path(fileitem.path)) mediainfo = self.mediachain.recognize_media(meta) if not mediainfo: @@ -67,6 +72,8 @@ class ScrapeFileAction(BaseAction): continue self.mediachain.scrape_metadata(fileitem=fileitem, meta=meta, mediainfo=mediainfo) self._scraped_files.append(fileitem) + # 保存缓存 + self.save_cache(workflow_id, cache_key) self.job_done(f"成功刮削了 {len(self._scraped_files)} 个文件") return context diff --git a/app/actions/send_message.py b/app/actions/send_message.py index 8ac9185d..bebba726 100644 --- a/app/actions/send_message.py +++ b/app/actions/send_message.py @@ -19,8 +19,8 @@ class SendMessageAction(BaseAction): 发送消息 """ - def __init__(self): - super().__init__() + def __init__(self, action_id: str): + super().__init__(action_id) self.chain = ActionChain() @classmethod diff --git a/app/actions/transfer_file.py b/app/actions/transfer_file.py index e8e67ea9..cd753767 100644 --- a/app/actions/transfer_file.py +++ b/app/actions/transfer_file.py @@ -29,8 +29,8 @@ class TransferFileAction(BaseAction): _fileitems = [] _has_error = False - def __init__(self): - super().__init__() + def __init__(self, action_id: str): + super().__init__(action_id) self.transferchain = TransferChain() self.storagechain = StorageChain() self.transferhis = TransferHistoryOper() @@ -76,6 +76,11 @@ class TransferFileAction(BaseAction): if not download.completed: logger.info(f"下载任务 {download.download_id} 未完成") continue + # 检查缓存 + cache_key = f"{download.download_id}" + if self.check_cache(workflow_id, cache_key): + logger.info(f"{download.path} 已整理过,跳过") + continue fileitem = self.storagechain.get_file_item(storage="local", path=Path(download.path)) if not fileitem: logger.info(f"文件 {download.path} 不存在") @@ -92,11 +97,17 @@ class TransferFileAction(BaseAction): continue logger.info(f"整理文件 {download.path} 完成") self._fileitems.append(fileitem) + self.save_cache(workflow_id, cache_key) else: # 从 fileitems 中整理文件 for fileitem in copy.deepcopy(context.fileitems): if not check_continue(): break + # 检查缓存 + cache_key = f"{fileitem.path}" + if self.check_cache(workflow_id, cache_key): + logger.info(f"{fileitem.path} 已整理过,跳过") + continue transferd = self.transferhis.get_by_src(fileitem.path, storage=fileitem.storage) if transferd: # 已经整理过的文件不再整理 @@ -112,6 +123,8 @@ class TransferFileAction(BaseAction): # 从 fileitems 中移除已整理的文件 context.fileitems.remove(fileitem) self._fileitems.append(fileitem) + # 记录已整理的文件 + self.save_cache(workflow_id, cache_key) if self._fileitems: context.fileitems.extend(self._fileitems) diff --git a/app/api/endpoints/workflow.py b/app/api/endpoints/workflow.py index 46ef5bac..14aef75b 100644 --- a/app/api/endpoints/workflow.py +++ b/app/api/endpoints/workflow.py @@ -9,6 +9,7 @@ from app.core.config import global_vars from app.core.workflow import WorkFlowManager from app.db import get_db from app.db.models.workflow import Workflow +from app.db.systemconfig_oper import SystemConfigOper from app.db.user_oper import get_current_active_user from app.chain.workflow import WorkflowChain from app.scheduler import Scheduler @@ -84,8 +85,12 @@ def delete_workflow(workflow_id: int, workflow = Workflow.get(db, workflow_id) if not workflow: return schemas.Response(success=False, message="工作流不存在") + # 删除定时任务 Scheduler().remove_workflow_job(workflow) + # 删除工作流 Workflow.delete(db, workflow_id) + # 删除缓存 + SystemConfigOper().delete(f"WorkflowCache-{workflow_id}") return schemas.Response(success=True, message="删除成功") @@ -112,8 +117,9 @@ def start_workflow(workflow_id: int, workflow = Workflow.get(db, workflow_id) if not workflow: return schemas.Response(success=False, message="工作流不存在") + # 添加定时任务 Scheduler().update_workflow_job(workflow) - global_vars.workflow_resume(workflow_id) + # 更新状态 workflow.update_state(db, workflow_id, "W") return schemas.Response(success=True) @@ -128,7 +134,29 @@ def pause_workflow(workflow_id: int, workflow = Workflow.get(db, workflow_id) if not workflow: return schemas.Response(success=False, message="工作流不存在") + # 删除定时任务 Scheduler().remove_workflow_job(workflow) + # 停止工作流 global_vars.stop_workflow(workflow_id) + # 更新状态 workflow.update_state(db, workflow_id, "P") return schemas.Response(success=True) + + +@router.post("/{workflow_id}/reset", summary="重置工作流", response_model=schemas.Response) +def reset_workflow(workflow_id: int, + db: Session = Depends(get_db), + _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + """ + 重置工作流 + """ + workflow = Workflow.get(db, workflow_id) + if not workflow: + return schemas.Response(success=False, message="工作流不存在") + # 停止工作流 + global_vars.stop_workflow(workflow_id) + # 重置工作流 + workflow.reset(db, workflow_id) + # 删除缓存 + SystemConfigOper().delete(f"WorkflowCache-{workflow_id}") + return schemas.Response(success=True) diff --git a/app/chain/workflow.py b/app/chain/workflow.py index 8bdd6db0..f7ccc573 100644 --- a/app/chain/workflow.py +++ b/app/chain/workflow.py @@ -66,6 +66,7 @@ class WorkflowExecutor: # 初始上下文 if workflow.current_action and workflow.context: + logger.info(f"工作流已执行动作:{workflow.current_action}") # Base64解码 decoded_data = base64.b64decode(workflow.context["content"]) # 反序列化数据 @@ -73,7 +74,9 @@ class WorkflowExecutor: else: self.context = ActionContext() - # 初始化队列:入度为0的节点 + # 恢复工作流 + global_vars.workflow_resume(self.workflow.id) + # 初始化队列,添加入度为0的节点 for action_id in self.actions: if self.indegree[action_id] == 0: self.queue.append(action_id) diff --git a/app/core/workflow.py b/app/core/workflow.py index 18553ffe..30f23515 100644 --- a/app/core/workflow.py +++ b/app/core/workflow.py @@ -64,7 +64,7 @@ class WorkFlowManager(metaclass=Singleton): context = ActionContext() if action.type in self._actions: # 实例化 - action_obj = self._actions[action.type]() + action_obj = self._actions[action.type](action.id) # 执行 logger.info(f"执行动作: {action.id} - {action.name}") try: diff --git a/app/db/models/workflow.py b/app/db/models/workflow.py index 31c00f1c..183dbcd6 100644 --- a/app/db/models/workflow.py +++ b/app/db/models/workflow.py @@ -95,7 +95,7 @@ class Workflow(Base): @db_update def update_current_action(db, wid: int, action_id: str, context: dict): db.query(Workflow).filter(Workflow.id == wid).update({ - "current_action": f"{Workflow.current_action},{action_id}" if Workflow.current_action else action_id, + "current_action": Workflow.current_action + f",{action_id}" if Workflow.current_action else action_id, "context": context }) return True