From ab0008be86cc80177e51308a488621cb962e59c2 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 27 Feb 2025 13:09:01 +0800 Subject: [PATCH] fix actions --- app/actions/__init__.py | 13 ++++++------- app/actions/add_download.py | 4 ++++ app/actions/add_subscribe.py | 4 ++++ app/actions/fetch_downloads.py | 10 +++++++++- app/actions/fetch_medias.py | 4 ++++ app/actions/fetch_rss.py | 7 ++++++- app/actions/fetch_torrents.py | 6 +++++- app/actions/filter_medias.py | 4 ++++ app/actions/filter_torrents.py | 7 ++++++- app/actions/scrape_file.py | 10 ++++++++-- app/actions/send_event.py | 4 ++++ app/actions/send_message.py | 10 +++++++++- app/actions/transfer_file.py | 8 +++++++- app/api/endpoints/workflow.py | 9 +++++++++ app/core/workflow.py | 35 ++++++++++++++++++++++++++-------- app/schemas/workflow.py | 10 ++++------ 16 files changed, 116 insertions(+), 29 deletions(-) diff --git a/app/actions/__init__.py b/app/actions/__init__.py index 0444c90d..a28aab55 100644 --- a/app/actions/__init__.py +++ b/app/actions/__init__.py @@ -1,7 +1,5 @@ from abc import ABC, abstractmethod -from pydantic.main import BaseModel - from app.chain import ChainBase from app.schemas import ActionContext, ActionParams @@ -10,7 +8,7 @@ class ActionChain(ChainBase): pass -class BaseAction(BaseModel, ABC): +class BaseAction(ABC): """ 工作流动作基类 """ @@ -18,10 +16,6 @@ class BaseAction(BaseModel, ABC): # 完成标志 _done_flag = False - def __init__(self): - super().__init__() - self.chain = ActionChain() - @property @abstractmethod def name(self) -> str: @@ -32,6 +26,11 @@ class BaseAction(BaseModel, ABC): def description(self) -> str: pass + @property + @abstractmethod + def data(self) -> dict: + pass + @abstractmethod def execute(self, params: ActionParams, context: ActionContext) -> ActionContext: """ diff --git a/app/actions/add_download.py b/app/actions/add_download.py index 3315ba7e..a8716cec 100644 --- a/app/actions/add_download.py +++ b/app/actions/add_download.py @@ -37,6 +37,10 @@ class AddDownloadAction(BaseAction): def description(self) -> str: return "根据资源列表添加下载任务" + @property + def data(self) -> dict: + return AddDownloadParams().dict() + @property def success(self) -> bool: return True if self._added_downloads else False diff --git a/app/actions/add_subscribe.py b/app/actions/add_subscribe.py index 8aacae04..ce801bc4 100644 --- a/app/actions/add_subscribe.py +++ b/app/actions/add_subscribe.py @@ -33,6 +33,10 @@ class AddSubscribeAction(BaseAction): def description(self) -> str: return "根据媒体列表添加订阅" + @property + def data(self) -> dict: + return AddSubscribeParams().dict() + @property def success(self) -> bool: return True if self._added_subscribes else False diff --git a/app/actions/fetch_downloads.py b/app/actions/fetch_downloads.py index fe696502..ed27c920 100644 --- a/app/actions/fetch_downloads.py +++ b/app/actions/fetch_downloads.py @@ -1,4 +1,4 @@ -from app.actions import BaseAction +from app.actions import BaseAction, ActionChain from app.schemas import ActionParams, ActionContext from app.log import logger @@ -17,6 +17,10 @@ class FetchDownloadsAction(BaseAction): _downloads = [] + def __init__(self): + super().__init__() + self.chain = ActionChain() + @property def name(self) -> str: return "获取下载任务" @@ -25,6 +29,10 @@ class FetchDownloadsAction(BaseAction): def description(self) -> str: return "获取下载任务,更新任务状态" + @property + def data(self) -> dict: + return FetchDownloadsParams().dict() + @property def success(self) -> bool: if not self._downloads: diff --git a/app/actions/fetch_medias.py b/app/actions/fetch_medias.py index 4aba7a94..8e551b4b 100644 --- a/app/actions/fetch_medias.py +++ b/app/actions/fetch_medias.py @@ -89,6 +89,10 @@ class FetchMediasAction(BaseAction): def description(self) -> str: return "获取媒体数据" + @property + def data(self) -> dict: + return FetchMediasParams().dict() + @property def success(self) -> bool: return True if self.__medias else False diff --git a/app/actions/fetch_rss.py b/app/actions/fetch_rss.py index 5d14673e..3622bf63 100644 --- a/app/actions/fetch_rss.py +++ b/app/actions/fetch_rss.py @@ -2,7 +2,7 @@ from typing import Optional from pydantic import Field -from app.actions import BaseAction +from app.actions import BaseAction, ActionChain from app.core.config import settings from app.core.context import Context from app.core.metainfo import MetaInfo @@ -33,6 +33,7 @@ class FetchRssAction(BaseAction): def __init__(self): super().__init__() self.rsshelper = RssHelper() + self.chain = ActionChain() @property def name(self) -> str: @@ -42,6 +43,10 @@ class FetchRssAction(BaseAction): def description(self) -> str: return "请求RSS地址获取数据,并解析为资源列表" + @property + def data(self) -> dict: + return FetchRssParams().dict() + @property def success(self) -> bool: return True if self._rss_torrents else False diff --git a/app/actions/fetch_torrents.py b/app/actions/fetch_torrents.py index 3292f2bc..42e5f7da 100644 --- a/app/actions/fetch_torrents.py +++ b/app/actions/fetch_torrents.py @@ -38,6 +38,10 @@ class FetchTorrentsAction(BaseAction): def description(self) -> str: return "根据关键字搜索站点种子资源" + @property + def data(self) -> dict: + return FetchTorrentsParams().dict() + @property def success(self) -> bool: return True if self._torrents else False @@ -55,7 +59,7 @@ class FetchTorrentsAction(BaseAction): if params.season and torrent.meta_info.begin_season != params.season: continue # 识别媒体信息 - torrent.media_info = self.chain.recognize_media(torrent.meta_info) + torrent.media_info = self.searchchain.recognize_media(torrent.meta_info) if not torrent.media_info: logger.warning(f"{torrent.torrent_info.title} 未识别到媒体信息") continue diff --git a/app/actions/filter_medias.py b/app/actions/filter_medias.py index 44534582..e89b7d3f 100644 --- a/app/actions/filter_medias.py +++ b/app/actions/filter_medias.py @@ -32,6 +32,10 @@ class FilterMediasAction(BaseAction): def description(self) -> str: return "过滤媒体数据列表" + @property + def data(self) -> dict: + return FilterMediasParams().dict() + @property def success(self) -> bool: return True if self.__medias else False diff --git a/app/actions/filter_torrents.py b/app/actions/filter_torrents.py index 449455f1..f3f76576 100644 --- a/app/actions/filter_torrents.py +++ b/app/actions/filter_torrents.py @@ -2,7 +2,7 @@ from typing import Optional, List from pydantic import Field -from app.actions import BaseAction +from app.actions import BaseAction, ActionChain from app.helper.torrent import TorrentHelper from app.schemas import ActionParams, ActionContext @@ -30,6 +30,7 @@ class FilterTorrentsAction(BaseAction): def __init__(self): super().__init__() self.torrenthelper = TorrentHelper() + self.chain = ActionChain() @property def name(self) -> str: @@ -39,6 +40,10 @@ class FilterTorrentsAction(BaseAction): def description(self) -> str: return "过滤资源数据列表" + @property + def data(self) -> dict: + return FilterTorrentsParams().dict() + @property def success(self) -> bool: return self.done diff --git a/app/actions/scrape_file.py b/app/actions/scrape_file.py index e2c7bf9a..e3a367fa 100644 --- a/app/actions/scrape_file.py +++ b/app/actions/scrape_file.py @@ -1,3 +1,5 @@ +from pathlib import Path + from app.actions import BaseAction from app.schemas import ActionParams, ActionContext from app.chain.media import MediaChain @@ -33,6 +35,10 @@ class ScrapeFileAction(BaseAction): def description(self) -> str: return "刮削媒体信息和图片" + @property + def data(self) -> dict: + return ScrapeFileParams().dict() + @property def success(self) -> bool: return True if self.__scraped_files else False @@ -46,8 +52,8 @@ class ScrapeFileAction(BaseAction): continue if not self.storagechain.exists(fileitem): continue - meta = MetaInfoPath(fileitem.path) - mediainfo = self.chain.recognize_media(meta) + meta = MetaInfoPath(Path(fileitem.path)) + mediainfo = self.mediachain.recognize_media(meta) if not mediainfo: logger.info(f"{fileitem.path} 未识别到媒体信息,无法刮削") continue diff --git a/app/actions/send_event.py b/app/actions/send_event.py index 8b3f9f0a..fe56e648 100644 --- a/app/actions/send_event.py +++ b/app/actions/send_event.py @@ -27,6 +27,10 @@ class SendEventAction(BaseAction): def description(self) -> str: return "发送特定事件" + @property + def data(self) -> dict: + return SendEventParams().dict() + @property def success(self) -> bool: return self.__success diff --git a/app/actions/send_message.py b/app/actions/send_message.py index 25a96bc9..5dc5135a 100644 --- a/app/actions/send_message.py +++ b/app/actions/send_message.py @@ -2,7 +2,7 @@ from typing import List, Optional, Union from pydantic import Field -from app.actions import BaseAction +from app.actions import BaseAction, ActionChain from app.schemas import ActionParams, ActionContext, MessageChannel @@ -19,6 +19,10 @@ class SendMessageAction(BaseAction): 发送消息 """ + def __init__(self): + super().__init__() + self.chain = ActionChain() + @property def name(self) -> str: return "发送消息" @@ -27,6 +31,10 @@ class SendMessageAction(BaseAction): def description(self) -> str: return "发送特定消息" + @property + def data(self) -> dict: + return SendMessageParams().dict() + @property def success(self) -> bool: return self.done diff --git a/app/actions/transfer_file.py b/app/actions/transfer_file.py index c933cf09..ad7e09d8 100644 --- a/app/actions/transfer_file.py +++ b/app/actions/transfer_file.py @@ -1,3 +1,5 @@ +from pathlib import Path + from app.actions import BaseAction from app.schemas import ActionParams, ActionContext from app.chain.storage import StorageChain @@ -32,6 +34,10 @@ class TransferFileAction(BaseAction): def description(self) -> str: return "整理和转移文件" + @property + def data(self) -> dict: + return TransferFileParams().dict() + @property def success(self) -> bool: return True if self.__fileitems else False @@ -44,7 +50,7 @@ class TransferFileAction(BaseAction): if not download.completed: logger.info(f"下载任务 {download.download_id} 未完成") continue - fileitem = self.storagechain.get_file_item(storage="local", path=download.path) + fileitem = self.storagechain.get_file_item(storage="local", path=Path(download.path)) if not fileitem: logger.info(f"文件 {download.path} 不存在") continue diff --git a/app/api/endpoints/workflow.py b/app/api/endpoints/workflow.py index f4754efe..c853d241 100644 --- a/app/api/endpoints/workflow.py +++ b/app/api/endpoints/workflow.py @@ -5,6 +5,7 @@ from fastapi import APIRouter, Depends from sqlalchemy.orm import Session from app import schemas +from app.core.workflow import WorkFlowManager from app.db import get_db from app.db.models.workflow import Workflow from app.db.user_oper import get_current_active_user @@ -40,6 +41,14 @@ def create_workflow(workflow: schemas.Workflow, return schemas.Response(success=True, message="创建工作流成功") +@router.get("/actions", summary="所有动作", response_model=List[dict]) +def list_actions(_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + """ + 获取所有动作 + """ + return WorkFlowManager().list_actions() + + @router.get("/{workflow_id}", summary="工作流详情", response_model=schemas.Workflow) def get_workflow(workflow_id: int, db: Session = Depends(get_db), diff --git a/app/core/workflow.py b/app/core/workflow.py index 7fa57fb6..f86f8787 100644 --- a/app/core/workflow.py +++ b/app/core/workflow.py @@ -1,5 +1,5 @@ from time import sleep -from typing import Dict, Any, Tuple +from typing import Dict, Any, Tuple, List from app.helper.module import ModuleHelper from app.log import logger @@ -43,7 +43,10 @@ class WorkFlowManager(metaclass=Singleton): ) for action in actions: logger.debug(f"加载动作: {action.__name__}") - self._actions[action.__name__] = action + try: + self._actions[action.__name__] = action() + except Exception as err: + logger.error(f"加载动作失败: {action.__name__} - {err}") def stop(self): """ @@ -59,22 +62,22 @@ class WorkFlowManager(metaclass=Singleton): context = ActionContext() if action.type in self._actions: # 实例化 - action_obj = self._actions[action.type]() + action_obj = self._actions[action.type] # 执行 logger.info(f"执行动作: {action.id} - {action.name}") - result_context = action_obj.execute(action.params, context) + result_context = action_obj.execute(action.data, context) if action_obj.success: logger.info(f"{action.name} 执行成功") else: logger.error(f"{action.name} 执行失败") - if action.loop and action.loop_interval: + if action.data.loop and action.data.loop_interval: while not action_obj.done: # 等待 - logger.info(f"{action.name} 等待 {action.loop_interval} 秒后继续执行 ...") - sleep(action.loop_interval) + logger.info(f"{action.name} 等待 {action.data.loop_interval} 秒后继续执行 ...") + sleep(action.data.loop_interval) # 执行 logger.info(f"继续执行动作: {action.id} - {action.name}") - result_context = action_obj.execute(action.params, result_context) + result_context = action_obj.execute(action.data, result_context) if action_obj.success: logger.info(f"{action.name} 执行成功") else: @@ -84,3 +87,19 @@ class WorkFlowManager(metaclass=Singleton): else: logger.error(f"未找到动作: {action.type} - {action.name}") return False, context + + def list_actions(self) -> List[dict]: + """ + 获取所有动作 + """ + return [ + { + "type": key, + "name": action.name, + "description": action.description, + "data": { + "label": action.name, + **action.data + } + } for key, action in self._actions.items() + ] diff --git a/app/schemas/workflow.py b/app/schemas/workflow.py index 67f4753a..5d119445 100644 --- a/app/schemas/workflow.py +++ b/app/schemas/workflow.py @@ -36,7 +36,8 @@ class ActionParams(BaseModel): """ 动作基础参数 """ - pass + loop: Optional[bool] = Field(False, description="是否需要循环") + loop_interval: Optional[int] = Field(0, description="循环间隔 (秒)") class Action(BaseModel): @@ -47,10 +48,7 @@ class Action(BaseModel): type: Optional[str] = Field(None, description="动作类型 (类名)") name: Optional[str] = Field(None, description="动作名称") description: Optional[str] = Field(None, description="动作描述") - loop: Optional[bool] = Field(False, description="是否需要循环") - loop_interval: Optional[int] = Field(0, description="循环间隔 (秒)") - params: Optional[ActionParams] = Field({}, description="参数") - label: Optional[str] = Field(None, description="标签") + data: Optional[ActionParams] = Field({}, description="参数") position: Optional[dict] = Field({}, description="位置") @@ -76,4 +74,4 @@ class ActionFlow(BaseModel): id: Optional[str] = Field(None, description="流程ID") source: Optional[str] = Field(None, description="源动作") target: Optional[str] = Field(None, description="目标动作") - animated: Optional[bool] = Field(False, description="是否动画流程") + animated: Optional[bool] = Field(True, description="是否动画流程")