Compare commits

...

7 Commits

Author SHA1 Message Date
jxxghp
8bd6ccb0de fix 完善事件和消息发送 2025-03-01 18:34:39 +08:00
jxxghp
ed8895dfbb v2.3.2
- 任务工作流支持手动停止、支持导入导出流程数据、完善动作组件等
2025-03-01 15:51:15 +08:00
jxxghp
a55632051b fix fetch_medias action 2025-03-01 13:54:29 +08:00
jxxghp
7e347a458d add ScanFileAction 2025-02-28 21:23:44 +08:00
jxxghp
cce71f23e2 add ScanFileAction 2025-02-28 21:11:51 +08:00
jxxghp
d68461a127 更新 scheduler.py 2025-02-28 19:37:39 +08:00
jxxghp
1bd12a9411 feat:工作流手动中止 2025-02-28 19:02:38 +08:00
24 changed files with 422 additions and 160 deletions

View File

@@ -15,6 +15,8 @@ class BaseAction(ABC):
# 完成标志
_done_flag = False
# 执行信息
_message = ""
@classmethod
@property
@@ -49,14 +51,22 @@ class BaseAction(ABC):
"""
pass
def job_done(self):
@property
def message(self) -> str:
"""
执行信息
"""
return self._message
def job_done(self, message: str = None):
"""
标记动作完成
"""
self._message = message
self._done_flag = True
@abstractmethod
def execute(self, params: ActionParams, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: ActionParams, context: ActionContext) -> ActionContext:
"""
执行动作
"""

View File

@@ -1,8 +1,11 @@
from typing import Optional
from pydantic import Field
from app.actions import BaseAction
from app.chain.download import DownloadChain
from app.chain.media import MediaChain
from app.core.config import global_vars
from app.core.metainfo import MetaInfo
from app.log import logger
from app.schemas import ActionParams, ActionContext, DownloadTask, MediaType
@@ -12,9 +15,9 @@ class AddDownloadParams(ActionParams):
"""
添加下载资源参数
"""
downloader: str = Field(None, description="下载器")
save_path: str = Field(None, description="保存路径")
only_lack: bool = Field(False, description="仅下载缺失的资源")
downloader: Optional[str] = Field(None, description="下载器")
save_path: Optional[str] = Field(None, description="保存路径")
only_lack: Optional[bool] = Field(False, description="仅下载缺失的资源")
class AddDownloadAction(BaseAction):
@@ -50,12 +53,14 @@ class AddDownloadAction(BaseAction):
def success(self) -> bool:
return not self._has_error
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
将上下文中的torrents添加到下载任务中
"""
params = AddDownloadParams(**params)
for t in context.torrents:
if global_vars.is_workflow_stopped(workflow_id):
break
if not t.meta_info:
t.meta_info = MetaInfo(title=t.title, subtitle=t.description)
if not t.media_info:
@@ -99,5 +104,5 @@ class AddDownloadAction(BaseAction):
[DownloadTask(download_id=did, downloader=params.downloader) for did in self._added_downloads]
)
self.job_done()
self.job_done(f"已添加 {len(self._added_downloads)} 个下载任务")
return context

View File

@@ -1,6 +1,6 @@
from app.actions import BaseAction
from app.chain.subscribe import SubscribeChain
from app.core.config import settings
from app.core.config import settings, global_vars
from app.core.context import MediaInfo
from app.db.subscribe_oper import SubscribeOper
from app.log import logger
@@ -46,11 +46,13 @@ class AddSubscribeAction(BaseAction):
def success(self) -> bool:
return not self._has_error
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
将medias中的信息添加订阅如果订阅不存在的话
"""
for media in context.medias:
if global_vars.is_workflow_stopped(workflow_id):
break
mediainfo = MediaInfo()
mediainfo.from_dict(media.dict())
if self.subscribechain.exists(mediainfo):
@@ -75,5 +77,5 @@ class AddSubscribeAction(BaseAction):
for sid in self._added_subscribes:
context.subscribes.append(self.subscribeoper.get(sid))
self.job_done()
self.job_done(f"已添加 {len(self._added_subscribes)} 个订阅")
return context

View File

@@ -1,4 +1,5 @@
from app.actions import BaseAction, ActionChain
from app.core.config import global_vars
from app.schemas import ActionParams, ActionContext
from app.log import logger
@@ -29,7 +30,7 @@ class FetchDownloadsAction(BaseAction):
@classmethod
@property
def description(cls) -> str:
return "获取下载任务,更新任务状态"
return "获取下载队列中的任务状态"
@classmethod
@property
@@ -40,12 +41,14 @@ class FetchDownloadsAction(BaseAction):
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
更新downloads中的下载任务状态
"""
__all_complete = False
for download in self._downloads:
if global_vars.is_workflow_stopped(workflow_id):
break
logger.info(f"获取下载任务 {download.download_id} 状态 ...")
torrents = self.chain.list_torrents(hashs=[download.download_id])
if not torrents:
@@ -56,6 +59,9 @@ class FetchDownloadsAction(BaseAction):
if t.progress >= 100:
logger.info(f"下载任务 {download.download_id} 已完成")
download.completed = True
else:
logger.info(f"下载任务 {download.download_id} 未完成")
download.completed = False
if all([d.completed for d in self._downloads]):
self.job_done()
return context

View File

@@ -1,11 +1,11 @@
from typing import List
from typing import List, Optional
from pydantic import Field
from app.actions import BaseAction
from app.chain.recommend import RecommendChain
from app.schemas import ActionParams, ActionContext
from app.core.config import settings
from app.core.config import settings, global_vars
from app.core.event import eventmanager
from app.log import logger
from app.schemas import RecommendSourceEventData, MediaInfo
@@ -17,7 +17,9 @@ class FetchMediasParams(ActionParams):
"""
获取媒体数据参数
"""
sources: List[str] = Field([], description="媒体数据来源")
source_type: Optional[str] = Field("ranking", description="来源")
sources: Optional[List[str]] = Field([], description="榜单")
api_path: Optional[str] = Field(None, description="API路径")
class FetchMediasAction(BaseAction):
@@ -124,33 +126,45 @@ class FetchMediasAction(BaseAction):
return s
return None
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
获取媒体数据填充到medias
"""
params = FetchMediasParams(**params)
for name in params.sources:
source = self.__get_source(name)
if not source:
continue
logger.info(f"获取媒体数据 {source} ...")
results = []
if source.get("func"):
results = source['func']()
else:
# 调用内部API获取数据
api_url = f"http://127.0.0.1:{settings.PORT}/api/v1/{source['api_path']}?token={settings.API_TOKEN}"
res = RequestUtils(timeout=15).post_res(api_url)
if res:
results = res.json()
if results:
logger.info(f"{name} 获取到 {len(results)} 条数据")
self._medias.extend([MediaInfo(**r) for r in results])
else:
logger.error(f"{name} 获取数据失败")
if params.source_type == "ranking":
for name in params.sources:
if global_vars.is_workflow_stopped(workflow_id):
break
source = self.__get_source(name)
if not source:
continue
logger.info(f"获取媒体数据 {source} ...")
results = []
if source.get("func"):
results = source['func']()
else:
# 调用内部API获取数据
api_url = f"http://127.0.0.1:{settings.PORT}/api/v1/{source['api_path']}?token={settings.API_TOKEN}"
res = RequestUtils(timeout=15).post_res(api_url)
if res:
results = res.json()
if results:
logger.info(f"{name} 获取{len(results)}数据")
self._medias.extend([MediaInfo(**r) for r in results])
else:
logger.error(f"{name} 获取数据失败")
else:
# 调用内部API获取数据
api_url = f"http://127.0.0.1:{settings.PORT}{params.api_path}?token={settings.API_TOKEN}"
res = RequestUtils(timeout=15).post_res(api_url)
if res:
results = res.json()
if results:
logger.info(f"{params.api_path} 获取到 {len(results)} 条数据")
self._medias.extend([MediaInfo(**r) for r in results])
if self._medias:
context.medias.extend(self._medias)
self.job_done()
self.job_done(f"获取到 {len(self._medias)} 条媒数据")
return context

View File

@@ -3,7 +3,7 @@ from typing import Optional
from pydantic import Field
from app.actions import BaseAction, ActionChain
from app.core.config import settings
from app.core.config import settings, global_vars
from app.core.context import Context
from app.core.metainfo import MetaInfo
from app.helper.rss import RssHelper
@@ -55,7 +55,7 @@ class FetchRssAction(BaseAction):
def success(self) -> bool:
return not self._has_error
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
请求RSS地址获取数据并解析为资源列表
"""
@@ -86,6 +86,8 @@ class FetchRssAction(BaseAction):
# 组装种子
for item in rss_items:
if global_vars.is_workflow_stopped(workflow_id):
break
if not item.get("title"):
continue
torrentinfo = TorrentInfo(
@@ -103,8 +105,8 @@ class FetchRssAction(BaseAction):
self._rss_torrents.append(Context(meta_info=meta, media_info=mediainfo, torrent_info=torrentinfo))
if self._rss_torrents:
logger.info(f"获取 {len(self._rss_torrents)} 个RSS资源")
logger.info(f"获取 {len(self._rss_torrents)} 个RSS资源")
context.torrents.extend(self._rss_torrents)
self.job_done()
self.job_done(f"获取到 {len(self._rss_torrents)} 个资源")
return context

View File

@@ -1,9 +1,12 @@
import random
import time
from typing import Optional, List
from pydantic import Field
from app.actions import BaseAction
from app.chain.search import SearchChain
from app.core.config import global_vars
from app.log import logger
from app.schemas import ActionParams, ActionContext, MediaType
@@ -12,7 +15,8 @@ class FetchTorrentsParams(ActionParams):
"""
获取站点资源参数
"""
name: str = Field(None, description="资源名称")
search_type: Optional[str] = Field("keyword", description="搜索类型")
name: Optional[str] = Field(None, description="资源名称")
year: Optional[str] = Field(None, description="年份")
type: Optional[str] = Field(None, description="资源类型 (电影/电视剧)")
season: Optional[int] = Field(None, description="季度")
@@ -38,7 +42,7 @@ class FetchTorrentsAction(BaseAction):
@classmethod
@property
def description(cls) -> str:
return "根据关键字搜索站点种子资源"
return "搜索站点种子资源列表"
@classmethod
@property
@@ -49,29 +53,49 @@ class FetchTorrentsAction(BaseAction):
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
搜索站点,获取资源列表
"""
params = FetchTorrentsParams(**params)
torrents = self.searchchain.search_by_title(title=params.name, sites=params.sites, cache_local=False)
for torrent in torrents:
if params.year and torrent.meta_info.year != params.year:
continue
if params.type and torrent.media_info and torrent.media_info.type != MediaType(params.type):
continue
if params.season and torrent.meta_info.begin_season != params.season:
continue
# 识别媒体信息
torrent.media_info = self.searchchain.recognize_media(torrent.meta_info)
if not torrent.media_info:
logger.warning(f"{torrent.torrent_info.title} 未识别到媒体信息")
continue
self._torrents.append(torrent)
if params.search_type == "keyword":
# 按关键字搜索
torrents = self.searchchain.search_by_title(title=params.name, sites=params.sites, cache_local=False)
for torrent in torrents:
if global_vars.is_workflow_stopped(workflow_id):
break
if params.year and torrent.meta_info.year != params.year:
continue
if params.type and torrent.media_info and torrent.media_info.type != MediaType(params.type):
continue
if params.season and torrent.meta_info.begin_season != params.season:
continue
# 识别媒体信息
torrent.media_info = self.searchchain.recognize_media(torrent.meta_info)
if not torrent.media_info:
logger.warning(f"{torrent.torrent_info.title} 未识别到媒体信息")
continue
self._torrents.append(torrent)
else:
# 搜索媒体列表
for media in context.medias:
if global_vars.is_workflow_stopped(workflow_id):
break
torrents = self.searchchain.search_by_id(tmdbid=media.tmdb_id,
doubanid=media.douban_id,
mtype=MediaType(media.type),
sites=params.sites)
for torrent in torrents:
self._torrents.append(torrent)
# 随机休眠 10-60秒
sleep_time = random.randint(10, 60)
logger.info(f"随机休眠 {sleep_time} 秒 ...")
time.sleep(sleep_time)
if self._torrents:
context.torrents.extend(self._torrents)
logger.info(f"搜索到 {len(self._torrents)} 条资源")
logger.info(f"搜索到 {len(self._torrents)} 条资源")
self.job_done()
self.job_done(f"搜索到 {len(self._torrents)} 个资源")
return context

View File

@@ -3,6 +3,7 @@ from typing import Optional
from pydantic import Field
from app.actions import BaseAction
from app.core.config import global_vars
from app.schemas import ActionParams, ActionContext
@@ -42,12 +43,14 @@ class FilterMediasAction(BaseAction):
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
过滤medias中媒体数据
"""
params = FilterMediasParams(**params)
for media in context.medias:
if global_vars.is_workflow_stopped(workflow_id):
break
if params.type and media.type != params.type:
continue
if params.category and media.category != params.category:
@@ -61,5 +64,5 @@ class FilterMediasAction(BaseAction):
if self._medias:
context.medias = self._medias
self.job_done()
self.job_done(f"过滤后剩余 {len(self._medias)} 条媒体数据")
return context

View File

@@ -3,6 +3,7 @@ from typing import Optional, List
from pydantic import Field
from app.actions import BaseAction, ActionChain
from app.core.config import global_vars
from app.helper.torrent import TorrentHelper
from app.schemas import ActionParams, ActionContext
@@ -51,12 +52,14 @@ class FilterTorrentsAction(BaseAction):
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
过滤torrents中的资源
"""
params = FilterTorrentsParams(**params)
for torrent in context.torrents:
if global_vars.is_workflow_stopped(workflow_id):
break
if self.torrenthelper.filter_torrent(
torrent_info=torrent.torrent_info,
filter_params={
@@ -77,5 +80,5 @@ class FilterTorrentsAction(BaseAction):
context.torrents = self._torrents
self.job_done()
self.job_done(f"过滤后剩余 {len(self._torrents)} 个资源")
return context

77
app/actions/scan_file.py Normal file
View File

@@ -0,0 +1,77 @@
from pathlib import Path
from typing import Optional
from pydantic import Field
from app.actions import BaseAction
from app.chain.storage import StorageChain
from app.core.config import global_vars, settings
from app.log import logger
from app.schemas import ActionParams, ActionContext
class ScanFileParams(ActionParams):
"""
整理文件参数
"""
# 存储
storage: Optional[str] = Field("local", description="存储")
directory: Optional[str] = Field(None, description="目录")
class ScanFileAction(BaseAction):
"""
整理文件
"""
_fileitems = []
_has_error = False
def __init__(self):
super().__init__()
self.storagechain = StorageChain()
@classmethod
@property
def name(cls) -> str:
return "扫描目录"
@classmethod
@property
def description(cls) -> str:
return "扫描目录文件到队列"
@classmethod
@property
def data(cls) -> dict:
return ScanFileParams().dict()
@property
def success(self) -> bool:
return not self._has_error
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
扫描目录中的所有文件记录到fileitems
"""
params = ScanFileParams(**params)
if not params.storage or not params.directory:
return context
fileitem = self.storagechain.get_file_item(params.storage, Path(params.directory))
if not fileitem:
logger.error(f"目录不存在: 【{params.storage}{params.directory}")
self._has_error = True
return context
files = self.storagechain.list_files(fileitem, recursion=True)
for file in files:
if global_vars.is_workflow_stopped(workflow_id):
break
if not file.extension or f".{file.extension.lower()}" not in settings.RMT_MEDIAEXT:
continue
self._fileitems.append(fileitem)
if self._fileitems:
context.fileitems.extend(self._fileitems)
self.job_done(f"扫描到 {len(self._fileitems)} 个文件")
return context

View File

@@ -1,6 +1,7 @@
from pathlib import Path
from app.actions import BaseAction
from app.core.config import global_vars
from app.schemas import ActionParams, ActionContext
from app.chain.media import MediaChain
from app.chain.storage import StorageChain
@@ -47,11 +48,13 @@ class ScrapeFileAction(BaseAction):
def success(self) -> bool:
return not self._has_error
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
刮削fileitems中的所有文件
"""
for fileitem in context.fileitems:
if global_vars.is_workflow_stopped(workflow_id):
break
if fileitem in self._scraped_files:
continue
if not self.storagechain.exists(fileitem):
@@ -65,5 +68,5 @@ class ScrapeFileAction(BaseAction):
self.mediachain.scrape_metadata(fileitem=fileitem, meta=meta, mediainfo=mediainfo)
self._scraped_files.append(fileitem)
self.job_done()
self.job_done(f"成功刮削了 {len(self._scraped_files)} 个文件")
return context

View File

@@ -1,8 +1,7 @@
import copy
from app.actions import BaseAction
from app.schemas import ActionParams, ActionContext
from app.core.event import eventmanager
from app.schemas import ActionParams, ActionContext
from app.schemas.types import ChainEventType
class SendEventParams(ActionParams):
@@ -25,7 +24,7 @@ class SendEventAction(BaseAction):
@classmethod
@property
def description(cls) -> str:
return "发送队列中的所有事件"
return "发送任务执行事件"
@classmethod
@property
@@ -36,16 +35,14 @@ class SendEventAction(BaseAction):
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
发送events中的事件
发送工作流事件,以更插件干预工作流执行
"""
if context.events:
# 按优先级排序,优先级高的先发送
context.events.sort(key=lambda x: x.priority, reverse=True)
for event in copy.deepcopy(context.events):
eventmanager.send_event(etype=event.event_type, data=event.event_data)
context.events.remove(event)
# 触发资源下载事件,更新执行上下文
event = eventmanager.send_event(ChainEventType.WorkflowExecution, context)
if event and event.event_data:
context = event.event_data
self.job_done()
return context

View File

@@ -1,10 +1,9 @@
import copy
from typing import List, Optional, Union
from pydantic import Field
from app.actions import BaseAction, ActionChain
from app.schemas import ActionParams, ActionContext
from app.schemas import ActionParams, ActionContext, Notification
class SendMessageParams(ActionParams):
@@ -32,7 +31,7 @@ class SendMessageAction(BaseAction):
@classmethod
@property
def description(cls) -> str:
return "发送队列中的所有消息"
return "发送任务执行消息"
@classmethod
@property
@@ -43,19 +42,32 @@ class SendMessageAction(BaseAction):
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
发送messages中的消息
"""
for message in copy.deepcopy(context.messages):
if params.client:
message.source = params.client
if params.userid:
message.userid = params.userid
self.chain.post_message(message)
context.messages.remove(message)
context.messages = []
params = SendMessageParams(**params)
msg_text = f"当前进度:{context.progress}%"
index = 1
if context.execute_history:
for history in context.execute_history:
if not history.message:
continue
msg_text += f"\n{index}. {history.action}{history.message}"
index += 1
# 发送消息
if not params.client:
params.client = [None]
for client in params.client:
self.chain.post_message(
Notification(
source=client,
userid=params.userid,
title="【工作流执行结果】",
text=msg_text,
link="#/workflow"
)
)
self.job_done()
return context

View File

@@ -1,6 +1,12 @@
import copy
from pathlib import Path
from typing import Optional
from pydantic import Field
from app.actions import BaseAction
from app.core.config import global_vars
from app.db.transferhistory_oper import TransferHistoryOper
from app.schemas import ActionParams, ActionContext
from app.chain.storage import StorageChain
from app.chain.transfer import TransferChain
@@ -11,7 +17,8 @@ class TransferFileParams(ActionParams):
"""
整理文件参数
"""
pass
# 来源
source: Optional[str] = Field("downloads", description="来源")
class TransferFileAction(BaseAction):
@@ -26,6 +33,7 @@ class TransferFileAction(BaseAction):
super().__init__()
self.transferchain = TransferChain()
self.storagechain = StorageChain()
self.transferhis = TransferHistoryOper()
@classmethod
@property
@@ -35,7 +43,7 @@ class TransferFileAction(BaseAction):
@classmethod
@property
def description(cls) -> str:
return "整理下载队列中的文件"
return "整理队列中的文件"
@classmethod
@property
@@ -46,26 +54,64 @@ class TransferFileAction(BaseAction):
def success(self) -> bool:
return not self._has_error
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
从downloads中整理文件记录到fileitems
downloads / fileitems 中整理文件记录到fileitems
"""
for download in context.downloads:
if not download.completed:
logger.info(f"下载任务 {download.download_id} 未完成")
continue
fileitem = self.storagechain.get_file_item(storage="local", path=Path(download.path))
if not fileitem:
logger.info(f"文件 {download.path} 不存在")
continue
logger.info(f"开始整理文件 {download.path} ...")
state, errmsg = self.transferchain.do_transfer(fileitem, background=False)
if not state:
self._has_error = True
logger.error(f"整理文件 {download.path} 失败: {errmsg}")
continue
logger.info(f"整理文件 {download.path} 完成")
self._fileitems.append(fileitem)
def check_continue():
"""
检查是否继续整理文件
"""
if global_vars.is_workflow_stopped(workflow_id):
return False
return True
params = TransferFileParams(**params)
if params.source == "downloads":
# 从下载任务中整理文件
for download in context.downloads:
if global_vars.is_workflow_stopped(workflow_id):
break
if not download.completed:
logger.info(f"下载任务 {download.download_id} 未完成")
continue
fileitem = self.storagechain.get_file_item(storage="local", path=Path(download.path))
if not fileitem:
logger.info(f"文件 {download.path} 不存在")
continue
transferd = self.transferhis.get_by_src(fileitem.path, storage=fileitem.storage)
if transferd:
# 已经整理过的文件不再整理
continue
logger.info(f"开始整理文件 {download.path} ...")
state, errmsg = self.transferchain.do_transfer(fileitem, background=False)
if not state:
self._has_error = True
logger.error(f"整理文件 {download.path} 失败: {errmsg}")
continue
logger.info(f"整理文件 {download.path} 完成")
self._fileitems.append(fileitem)
else:
# 从 fileitems 中整理文件
for fileitem in copy.deepcopy(context.fileitems):
if not check_continue():
break
transferd = self.transferhis.get_by_src(fileitem.path, storage=fileitem.storage)
if transferd:
# 已经整理过的文件不再整理
continue
logger.info(f"开始整理文件 {fileitem.path} ...")
state, errmsg = self.transferchain.do_transfer(fileitem, background=False,
continue_callback=check_continue)
if not state:
self._has_error = True
logger.error(f"整理文件 {fileitem.path} 失败: {errmsg}")
continue
logger.info(f"整理文件 {fileitem.path} 完成")
# 从 fileitems 中移除已整理的文件
context.fileitems.remove(fileitem)
self._fileitems.append(fileitem)
if self._fileitems:
context.fileitems.extend(self._fileitems)

View File

@@ -5,6 +5,7 @@ from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app import schemas
from app.core.config import global_vars
from app.core.workflow import WorkFlowManager
from app.db import get_db
from app.db.models.workflow import Workflow
@@ -112,6 +113,7 @@ def start_workflow(workflow_id: int,
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
Scheduler().update_workflow_job(workflow)
global_vars.workflow_resume(workflow_id)
workflow.update_state(db, workflow_id, "W")
return schemas.Response(success=True)
@@ -127,5 +129,6 @@ def pause_workflow(workflow_id: int,
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
Scheduler().remove_workflow_job(workflow)
global_vars.stop_workflow(workflow_id)
workflow.update_state(db, workflow_id, "P")
return schemas.Response(success=True)

View File

@@ -905,7 +905,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
season: int = None, epformat: EpisodeFormat = None, min_filesize: int = 0,
downloader: str = None, download_hash: str = None,
force: bool = False, background: bool = True,
manual: bool = False) -> Tuple[bool, str]:
manual: bool = False, continue_callback: Callable = None) -> Tuple[bool, str]:
"""
执行一个复杂目录的整理操作
:param fileitem: 文件项
@@ -926,6 +926,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
:param force: 是否强制整理
:param background: 是否后台运行
:param manual: 是否手动整理
:param continue_callback: 继续处理回调
返回:成功标识,错误信息
"""
@@ -991,6 +992,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
for file_item, bluray_dir in file_items:
if global_vars.is_system_stopped:
break
if continue_callback and not continue_callback():
break
file_path = Path(file_item.path)
# 回收站及隐藏的文件不处理
if file_item.path.find('/@Recycle/') != -1 \
@@ -1111,6 +1114,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
for transfer_task in transfer_tasks:
if global_vars.is_system_stopped:
break
if continue_callback and not continue_callback():
break
# 更新进度
__process_msg = f"正在整理 {processed_num + fail_num + 1}/{total_num}{transfer_task.fileitem.name} ..."
logger.info(__process_msg)

View File

@@ -14,7 +14,7 @@ from app.core.workflow import WorkFlowManager
from app.db.models import Workflow
from app.db.workflow_oper import WorkflowOper
from app.log import logger
from app.schemas import ActionContext, ActionFlow, Action
from app.schemas import ActionContext, ActionFlow, Action, ActionExecution
class WorkflowExecutor:
@@ -33,6 +33,8 @@ class WorkflowExecutor:
self.step_callback = step_callback
self.actions = {action['id']: Action(**action) for action in workflow.actions}
self.flows = [ActionFlow(**flow) for flow in workflow.flows]
self.total_actions = len(self.actions)
self.finished_actions = 0
self.success = True
self.errmsg = ""
@@ -97,7 +99,8 @@ class WorkflowExecutor:
self.running_tasks += 1
# 已停机
if global_vars.is_system_stopped:
if global_vars.is_workflow_stopped(self.workflow.id):
global_vars.workflow_resume(self.workflow.id)
break
# 已执行的跳过
@@ -108,54 +111,66 @@ class WorkflowExecutor:
# 提交任务到线程池
future = self.executor.submit(
self.execute_node,
self.workflow.id,
node_id,
self.context
)
future.add_done_callback(self.on_node_complete)
def execute_node(self, node_id: int, context: ActionContext) -> Tuple[Action, bool, ActionContext]:
def execute_node(self, workflow_id: int, node_id: int,
context: ActionContext) -> Tuple[Action, bool, str, ActionContext]:
"""
执行单个节点操作返回修改后的上下文和节点ID
"""
action = self.actions[node_id]
state, result_ctx = self.workflowmanager.excute(action, context=context)
return action, state, result_ctx
state, message, result_ctx = self.workflowmanager.excute(workflow_id, action, context=context)
return action, state, message, result_ctx
def on_node_complete(self, future):
"""
节点完成回调:更新上下文、处理后继节点
"""
action, state, result_ctx = future.result()
action, state, message, result_ctx = future.result()
# 节点执行失败
if not state:
self.success = False
self.errmsg = f"{action.name} 失败"
try:
self.finished_actions += 1
# 更新当前进度
self.context.progress = round(self.finished_actions / self.total_actions) * 100
# 补充执行历史
self.context.execute_history.append(
ActionExecution(
action=action.name,
result=state,
message=message
)
)
# 节点执行失败
if not state:
self.success = False
self.errmsg = f"{action.name} 失败"
return
with self.lock:
# 更新主上下文
self.merge_context(result_ctx)
# 回调
if self.step_callback:
self.step_callback(action, self.context)
# 处理后继节点
successors = self.adjacency.get(action.id, [])
for succ_id in successors:
with self.lock:
self.indegree[succ_id] -= 1
if self.indegree[succ_id] == 0:
self.queue.append(succ_id)
finally:
# 标记任务完成
with self.lock:
self.running_tasks -= 1
return
with self.lock:
# 更新主上下文
self.merge_context(result_ctx)
# 回调
if self.step_callback:
self.step_callback(action, self.context)
# 处理后继节点
successors = self.adjacency.get(action.id, [])
for succ_id in successors:
with self.lock:
self.indegree[succ_id] -= 1
if self.indegree[succ_id] == 0:
self.queue.append(succ_id)
# 标记任务完成
with self.lock:
self.running_tasks -= 1
def merge_context(self, context: ActionContext):
"""
合并上下文
@@ -221,7 +236,7 @@ class WorkflowChain(ChainBase):
self.workflowoper.fail(workflow_id, result=executor.errmsg)
return False, executor.errmsg
else:
logger.info(f"工作流 {workflow.name} 执行成")
logger.info(f"工作流 {workflow.name} 执行")
self.workflowoper.success(workflow_id)
return True, ""

View File

@@ -607,6 +607,8 @@ class GlobalVar(object):
STOP_EVENT: threading.Event = threading.Event()
# webpush订阅
SUBSCRIPTIONS: List[dict] = []
# 需应急停止的工作流
EMERGENCY_STOP_WORKFLOWS: List[str] = []
def stop_system(self):
"""
@@ -633,6 +635,26 @@ class GlobalVar(object):
"""
self.SUBSCRIPTIONS.append(subscription)
def stop_workflow(self, workflow_id: str):
"""
停止工作流
"""
if workflow_id not in self.EMERGENCY_STOP_WORKFLOWS:
self.EMERGENCY_STOP_WORKFLOWS.append(workflow_id)
def workflow_resume(self, workflow_id: str):
"""
恢复工作流
"""
if workflow_id in self.EMERGENCY_STOP_WORKFLOWS:
self.EMERGENCY_STOP_WORKFLOWS.remove(workflow_id)
def is_workflow_stopped(self, workflow_id: str):
"""
是否停止工作流
"""
return self.is_system_stopped or workflow_id in self.EMERGENCY_STOP_WORKFLOWS
# 实例化配置
settings = Settings()

View File

@@ -1,6 +1,7 @@
from time import sleep
from typing import Dict, Any, Tuple, List
from app.core.config import global_vars
from app.helper.module import ModuleHelper
from app.log import logger
from app.schemas import Action, ActionContext
@@ -54,7 +55,8 @@ class WorkFlowManager(metaclass=Singleton):
"""
pass
def excute(self, action: Action, context: ActionContext = None) -> Tuple[bool, ActionContext]:
def excute(self, workflow_id: int, action: Action,
context: ActionContext = None) -> Tuple[bool, str, ActionContext]:
"""
执行工作流动作
"""
@@ -66,28 +68,30 @@ class WorkFlowManager(metaclass=Singleton):
# 执行
logger.info(f"执行动作: {action.id} - {action.name}")
try:
result_context = action_obj.execute(action.data, context)
result_context = action_obj.execute(workflow_id, action.data, context)
except Exception as err:
logger.error(f"{action.name} 执行失败: {err}")
return False, context
return False, f"{err}", context
loop = action.data.get("loop")
loop_interval = action.data.get("loop_interval")
if loop and loop_interval:
while not action_obj.done:
if global_vars.is_workflow_stopped(workflow_id):
break
# 等待
logger.info(f"{action.name} 等待 {loop_interval} 秒后继续执行 ...")
sleep(loop_interval)
# 执行
logger.info(f"继续执行动作: {action.id} - {action.name}")
result_context = action_obj.execute(action.data, result_context)
result_context = action_obj.execute(workflow_id, action.data, result_context)
if action_obj.success:
logger.info(f"{action.name} 执行成功")
else:
logger.error(f"{action.name} 执行失败!")
return action_obj.success, result_context
return action_obj.success, action_obj.message, result_context
else:
logger.error(f"未找到动作: {action.type} - {action.name}")
return False, context
return False, " ", context
def list_actions(self) -> List[dict]:
"""

View File

@@ -1,6 +1,6 @@
from datetime import datetime
from sqlalchemy import Column, Integer, JSON, Sequence, String
from sqlalchemy import Column, Integer, JSON, Sequence, String, and_
from app.db import Base, db_query, db_update
@@ -63,7 +63,7 @@ class Workflow(Base):
@staticmethod
@db_update
def fail(db, wid: int, result: str):
db.query(Workflow).filter(Workflow.id == wid).update({
db.query(Workflow).filter(and_(Workflow.id == wid, Workflow.state != "P")).update({
"state": 'F',
"result": result,
"last_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
@@ -73,7 +73,7 @@ class Workflow(Base):
@staticmethod
@db_update
def success(db, wid: int, result: str = None):
db.query(Workflow).filter(Workflow.id == wid).update({
db.query(Workflow).filter(and_(Workflow.id == wid, Workflow.state != "P")).update({
"state": 'S',
"result": result,
"run_count": Workflow.run_count + 1,

View File

@@ -514,7 +514,7 @@ class Scheduler(metaclass=Singleton):
trigger=CronTrigger.from_crontab(workflow.timer),
id=job_id,
name=workflow.name,
kwargs={"job_id": job_id, "workflow_id": job_id},
kwargs={"job_id": job_id, "workflow_id": workflow.id},
replace_existing=True
)
logger.info(f"注册工作流服务:{workflow.name} - {workflow.timer}")

View File

@@ -87,6 +87,8 @@ class ChainEventType(Enum):
MediaRecognizeConvert = "media.recognize.convert"
# 推荐数据源
RecommendSource = "recommend.source"
# 工作流执行
WorkflowExecution = "workflow.execution"
# 系统配置Key字典

View File

@@ -3,12 +3,10 @@ from typing import Optional, List
from pydantic import BaseModel, Field
from app.schemas.context import Context, MediaInfo
from app.schemas.file import FileItem
from app.schemas.download import DownloadTask
from app.schemas.file import FileItem
from app.schemas.site import Site
from app.schemas.subscribe import Subscribe
from app.schemas.message import Notification
from app.schemas.event import Event
class Workflow(BaseModel):
@@ -52,6 +50,15 @@ class Action(BaseModel):
data: Optional[dict] = Field({}, description="参数")
class ActionExecution(BaseModel):
"""
动作执行情况
"""
action: Optional[str] = Field(None, description="当前动作(名称)")
result: Optional[bool] = Field(None, description="执行结果")
message: Optional[str] = Field(None, description="执行消息")
class ActionContext(BaseModel):
"""
动作基础上下文,各动作通用数据
@@ -63,8 +70,8 @@ class ActionContext(BaseModel):
downloads: Optional[List[DownloadTask]] = Field([], description="下载任务列表")
sites: Optional[List[Site]] = Field([], description="站点列表")
subscribes: Optional[List[Subscribe]] = Field([], description="订阅列表")
messages: Optional[List[Notification]] = Field([], description="消息列表")
events: Optional[List[Event]] = Field([], description="事件列表")
execute_history: Optional[List[ActionExecution]] = Field([], description="执行历史")
progress: Optional[int] = Field(0, description="执行进度(%")
class ActionFlow(BaseModel):

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.3.1'
FRONTEND_VERSION = 'v2.3.1'
APP_VERSION = 'v2.3.2'
FRONTEND_VERSION = 'v2.3.2'