feat:actions增加缓存机制

This commit is contained in:
jxxghp
2025-03-02 12:27:36 +08:00
parent 0d63dfb931
commit 2851f16395
16 changed files with 134 additions and 29 deletions

View File

@@ -1,6 +1,8 @@
from abc import ABC, abstractmethod
from typing import List, Any, Union
from app.chain import ChainBase
from app.db.systemconfig_oper import SystemConfigOper
from app.schemas import ActionContext, ActionParams
@@ -13,27 +15,35 @@ class BaseAction(ABC):
工作流动作基类
"""
# 动作ID
_action_id = None
# 完成标志
_done_flag = False
# 执行信息
_message = ""
# 缓存键值
_cache_key = "WorkflowCache-%s"
def __init__(self, action_id: str):
self._action_id = action_id
self.systemconfigoper = SystemConfigOper()
@classmethod
@property
@abstractmethod
def name(cls) -> str: # noqa
def name(cls) -> str: # noqa
pass
@classmethod
@property
@abstractmethod
def description(cls) -> str: # noqa
def description(cls) -> str: # noqa
pass
@classmethod
@property
@abstractmethod
def data(cls) -> dict: # noqa
def data(cls) -> dict: # noqa
pass
@property
@@ -65,6 +75,29 @@ class BaseAction(ABC):
self._message = message
self._done_flag = True
def check_cache(self, workflow_id: int, key: str) -> bool:
"""
检查是否处理过
"""
workflow_key = self._cache_key % workflow_id
workflow_cache = self.systemconfigoper.get(workflow_key) or {}
action_cache = workflow_cache.get(self._action_id) or []
return key in action_cache
def save_cache(self, workflow_id: int, data: Union[list, str]):
"""
保存缓存
"""
workflow_key = self._cache_key % workflow_id
workflow_cache = self.systemconfigoper.get(workflow_key) or {}
action_cache = workflow_cache.get(self._action_id) or []
if isinstance(data, list):
action_cache.extend(data)
else:
action_cache.append(data)
workflow_cache[self._action_id] = action_cache
self.systemconfigoper.set(workflow_key, workflow_cache)
@abstractmethod
def execute(self, workflow_id: int, params: ActionParams, context: ActionContext) -> ActionContext:
"""

View File

@@ -30,8 +30,8 @@ class AddDownloadAction(BaseAction):
_added_downloads = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.downloadchain = DownloadChain()
self.mediachain = MediaChain()
@@ -62,6 +62,11 @@ class AddDownloadAction(BaseAction):
for t in context.torrents:
if global_vars.is_workflow_stopped(workflow_id):
break
# 检查缓存
cache_key = f"{t.torrent_info.site}-{t.torrent_info.title}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{t.title} 已添加过下载,跳过")
continue
if not t.meta_info:
t.meta_info = MetaInfo(title=t.title, subtitle=t.description)
if not t.media_info:
@@ -97,6 +102,8 @@ class AddDownloadAction(BaseAction):
label=params.labels)
if did:
self._added_downloads.append(did)
# 保存缓存
self.save_cache(workflow_id, cache_key)
else:
self._has_error = True

View File

@@ -22,8 +22,8 @@ class AddSubscribeAction(BaseAction):
_added_subscribes = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.subscribechain = SubscribeChain()
self.subscribeoper = SubscribeOper()
@@ -53,6 +53,11 @@ class AddSubscribeAction(BaseAction):
for media in context.medias:
if global_vars.is_workflow_stopped(workflow_id):
break
# 检查缓存
cache_key = f"{media.type}-{media.title}-{media.year}-{media.season}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{media.title} {media.year} 已添加过订阅,跳过")
continue
mediainfo = MediaInfo()
mediainfo.from_dict(media.dict())
if self.subscribechain.exists(mediainfo):
@@ -69,6 +74,8 @@ class AddSubscribeAction(BaseAction):
username=settings.SUPERUSER)
if sid:
self._added_subscribes.append(sid)
# 保存缓存
self.save_cache(workflow_id, cache_key)
else:
self._has_error = True

View File

@@ -18,8 +18,8 @@ class FetchDownloadsAction(BaseAction):
_downloads = []
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.chain = ActionChain()
@classmethod

View File

@@ -31,8 +31,8 @@ class FetchMediasAction(BaseAction):
_medias = []
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.__inner_sources = [
{

View File

@@ -32,8 +32,8 @@ class FetchRssAction(BaseAction):
_rss_torrents = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.rsshelper = RssHelper()
self.chain = ActionChain()

View File

@@ -31,8 +31,8 @@ class FetchTorrentsAction(BaseAction):
_torrents = []
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.searchchain = SearchChain()
@classmethod

View File

@@ -29,8 +29,8 @@ class FilterTorrentsAction(BaseAction):
_torrents = []
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.torrenthelper = TorrentHelper()
self.chain = ActionChain()

View File

@@ -27,8 +27,8 @@ class ScanFileAction(BaseAction):
_fileitems = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.storagechain = StorageChain()
@classmethod
@@ -68,7 +68,14 @@ class ScanFileAction(BaseAction):
break
if not file.extension or f".{file.extension.lower()}" not in settings.RMT_MEDIAEXT:
continue
# 检查缓存
cache_key = f"{file.path}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{file.path} 已处理过,跳过")
continue
self._fileitems.append(fileitem)
# 保存缓存
self.save_cache(workflow_id, cache_key)
if self._fileitems:
context.fileitems.extend(self._fileitems)

View File

@@ -24,8 +24,8 @@ class ScrapeFileAction(BaseAction):
_scraped_files = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.storagechain = StorageChain()
self.mediachain = MediaChain()
@@ -59,6 +59,11 @@ class ScrapeFileAction(BaseAction):
continue
if not self.storagechain.exists(fileitem):
continue
# 检查缓存
cache_key = f"{fileitem.path}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{fileitem.path} 已刮削,跳过")
continue
meta = MetaInfoPath(Path(fileitem.path))
mediainfo = self.mediachain.recognize_media(meta)
if not mediainfo:
@@ -67,6 +72,8 @@ class ScrapeFileAction(BaseAction):
continue
self.mediachain.scrape_metadata(fileitem=fileitem, meta=meta, mediainfo=mediainfo)
self._scraped_files.append(fileitem)
# 保存缓存
self.save_cache(workflow_id, cache_key)
self.job_done(f"成功刮削了 {len(self._scraped_files)} 个文件")
return context

View File

@@ -19,8 +19,8 @@ class SendMessageAction(BaseAction):
发送消息
"""
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.chain = ActionChain()
@classmethod

View File

@@ -29,8 +29,8 @@ class TransferFileAction(BaseAction):
_fileitems = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.transferchain = TransferChain()
self.storagechain = StorageChain()
self.transferhis = TransferHistoryOper()
@@ -76,6 +76,11 @@ class TransferFileAction(BaseAction):
if not download.completed:
logger.info(f"下载任务 {download.download_id} 未完成")
continue
# 检查缓存
cache_key = f"{download.download_id}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{download.path} 已整理过,跳过")
continue
fileitem = self.storagechain.get_file_item(storage="local", path=Path(download.path))
if not fileitem:
logger.info(f"文件 {download.path} 不存在")
@@ -92,11 +97,17 @@ class TransferFileAction(BaseAction):
continue
logger.info(f"整理文件 {download.path} 完成")
self._fileitems.append(fileitem)
self.save_cache(workflow_id, cache_key)
else:
# 从 fileitems 中整理文件
for fileitem in copy.deepcopy(context.fileitems):
if not check_continue():
break
# 检查缓存
cache_key = f"{fileitem.path}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{fileitem.path} 已整理过,跳过")
continue
transferd = self.transferhis.get_by_src(fileitem.path, storage=fileitem.storage)
if transferd:
# 已经整理过的文件不再整理
@@ -112,6 +123,8 @@ class TransferFileAction(BaseAction):
# 从 fileitems 中移除已整理的文件
context.fileitems.remove(fileitem)
self._fileitems.append(fileitem)
# 记录已整理的文件
self.save_cache(workflow_id, cache_key)
if self._fileitems:
context.fileitems.extend(self._fileitems)

View File

@@ -9,6 +9,7 @@ from app.core.config import global_vars
from app.core.workflow import WorkFlowManager
from app.db import get_db
from app.db.models.workflow import Workflow
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_user
from app.chain.workflow import WorkflowChain
from app.scheduler import Scheduler
@@ -84,8 +85,12 @@ def delete_workflow(workflow_id: int,
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 删除定时任务
Scheduler().remove_workflow_job(workflow)
# 删除工作流
Workflow.delete(db, workflow_id)
# 删除缓存
SystemConfigOper().delete(f"WorkflowCache-{workflow_id}")
return schemas.Response(success=True, message="删除成功")
@@ -112,8 +117,9 @@ def start_workflow(workflow_id: int,
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 添加定时任务
Scheduler().update_workflow_job(workflow)
global_vars.workflow_resume(workflow_id)
# 更新状态
workflow.update_state(db, workflow_id, "W")
return schemas.Response(success=True)
@@ -128,7 +134,29 @@ def pause_workflow(workflow_id: int,
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 删除定时任务
Scheduler().remove_workflow_job(workflow)
# 停止工作流
global_vars.stop_workflow(workflow_id)
# 更新状态
workflow.update_state(db, workflow_id, "P")
return schemas.Response(success=True)
@router.post("/{workflow_id}/reset", summary="重置工作流", response_model=schemas.Response)
def reset_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
重置工作流
"""
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 停止工作流
global_vars.stop_workflow(workflow_id)
# 重置工作流
workflow.reset(db, workflow_id)
# 删除缓存
SystemConfigOper().delete(f"WorkflowCache-{workflow_id}")
return schemas.Response(success=True)

View File

@@ -66,6 +66,7 @@ class WorkflowExecutor:
# 初始上下文
if workflow.current_action and workflow.context:
logger.info(f"工作流已执行动作:{workflow.current_action}")
# Base64解码
decoded_data = base64.b64decode(workflow.context["content"])
# 反序列化数据
@@ -73,7 +74,9 @@ class WorkflowExecutor:
else:
self.context = ActionContext()
# 初始化队列入度为0的节点
# 恢复工作流
global_vars.workflow_resume(self.workflow.id)
# 初始化队列添加入度为0的节点
for action_id in self.actions:
if self.indegree[action_id] == 0:
self.queue.append(action_id)

View File

@@ -64,7 +64,7 @@ class WorkFlowManager(metaclass=Singleton):
context = ActionContext()
if action.type in self._actions:
# 实例化
action_obj = self._actions[action.type]()
action_obj = self._actions[action.type](action.id)
# 执行
logger.info(f"执行动作: {action.id} - {action.name}")
try:

View File

@@ -95,7 +95,7 @@ class Workflow(Base):
@db_update
def update_current_action(db, wid: int, action_id: str, context: dict):
db.query(Workflow).filter(Workflow.id == wid).update({
"current_action": f"{Workflow.current_action},{action_id}" if Workflow.current_action else action_id,
"current_action": Workflow.current_action + f",{action_id}" if Workflow.current_action else action_id,
"context": context
})
return True