Compare commits

...

22 Commits

Author SHA1 Message Date
jxxghp
8896867bb3 更新 fetch_medias.py 2025-03-02 14:23:37 +08:00
jxxghp
ba7c9eec7b fix 2025-03-02 13:16:46 +08:00
jxxghp
9b95fde8d1 v2.3.3
- 增加了多个索引和认证站点支持
- HDDolby切换为使用API(需要调整站点设置,否则无法正常刷新站点数据)
- 调整了IYUU认证使用的域名地址
- 继续完善任务工作流
2025-03-02 12:48:32 +08:00
jxxghp
2851f16395 feat:actions增加缓存机制 2025-03-02 12:27:36 +08:00
jxxghp
0d63dfb931 fix actions 2025-03-02 11:15:52 +08:00
jxxghp
37558e3135 更新 hddolby.py 2025-03-02 10:24:17 +08:00
jxxghp
96021e42a2 fix 2025-03-02 10:08:03 +08:00
jxxghp
c32b845515 feat:actions增加识别选项 2025-03-02 09:45:24 +08:00
jxxghp
147d980c54 fix hddolby 2025-03-02 08:51:09 +08:00
jxxghp
f91c43dde9 fix hddolby 2025-03-02 08:08:46 +08:00
jxxghp
4cf5cb06a0 fix hddolby 2025-03-02 08:06:25 +08:00
jxxghp
8e4b4c3144 add hddolby userdata api 2025-03-01 21:28:15 +08:00
jxxghp
c302013696 add hddolby api 2025-03-01 21:24:01 +08:00
jxxghp
37cb94c59d add hddolby api 2025-03-01 21:08:37 +08:00
jxxghp
01f7c6bc2b fix 2025-03-01 18:55:16 +08:00
jxxghp
8bd6ccb0de fix 完善事件和消息发送 2025-03-01 18:34:39 +08:00
jxxghp
ed8895dfbb v2.3.2
- 任务工作流支持手动停止、支持导入导出流程数据、完善动作组件等
2025-03-01 15:51:15 +08:00
jxxghp
a55632051b fix fetch_medias action 2025-03-01 13:54:29 +08:00
jxxghp
7e347a458d add ScanFileAction 2025-02-28 21:23:44 +08:00
jxxghp
cce71f23e2 add ScanFileAction 2025-02-28 21:11:51 +08:00
jxxghp
d68461a127 更新 scheduler.py 2025-02-28 19:37:39 +08:00
jxxghp
1bd12a9411 feat:工作流手动中止 2025-02-28 19:02:38 +08:00
32 changed files with 1045 additions and 254 deletions

View File

@@ -1,6 +1,8 @@
from abc import ABC, abstractmethod
from typing import List, Any, Union
from app.chain import ChainBase
from app.db.systemconfig_oper import SystemConfigOper
from app.schemas import ActionContext, ActionParams
@@ -13,25 +15,35 @@ class BaseAction(ABC):
工作流动作基类
"""
# 动作ID
_action_id = None
# 完成标志
_done_flag = False
# 执行信息
_message = ""
# 缓存键值
_cache_key = "WorkflowCache-%s"
def __init__(self, action_id: str):
self._action_id = action_id
self.systemconfigoper = SystemConfigOper()
@classmethod
@property
@abstractmethod
def name(cls) -> str:
def name(cls) -> str: # noqa
pass
@classmethod
@property
@abstractmethod
def description(cls) -> str:
def description(cls) -> str: # noqa
pass
@classmethod
@property
@abstractmethod
def data(cls) -> dict:
def data(cls) -> dict: # noqa
pass
@property
@@ -49,14 +61,45 @@ class BaseAction(ABC):
"""
pass
def job_done(self):
@property
def message(self) -> str:
"""
执行信息
"""
return self._message
def job_done(self, message: str = None):
"""
标记动作完成
"""
self._message = message
self._done_flag = True
def check_cache(self, workflow_id: int, key: str) -> bool:
"""
检查是否处理过
"""
workflow_key = self._cache_key % workflow_id
workflow_cache = self.systemconfigoper.get(workflow_key) or {}
action_cache = workflow_cache.get(self._action_id) or []
return key in action_cache
def save_cache(self, workflow_id: int, data: Union[list, str]):
"""
保存缓存
"""
workflow_key = self._cache_key % workflow_id
workflow_cache = self.systemconfigoper.get(workflow_key) or {}
action_cache = workflow_cache.get(self._action_id) or []
if isinstance(data, list):
action_cache.extend(data)
else:
action_cache.append(data)
workflow_cache[self._action_id] = action_cache
self.systemconfigoper.set(workflow_key, workflow_cache)
@abstractmethod
def execute(self, params: ActionParams, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: ActionParams, context: ActionContext) -> ActionContext:
"""
执行动作
"""

View File

@@ -1,8 +1,11 @@
from typing import Optional
from pydantic import Field
from app.actions import BaseAction
from app.chain.download import DownloadChain
from app.chain.media import MediaChain
from app.core.config import global_vars
from app.core.metainfo import MetaInfo
from app.log import logger
from app.schemas import ActionParams, ActionContext, DownloadTask, MediaType
@@ -12,9 +15,10 @@ class AddDownloadParams(ActionParams):
"""
添加下载资源参数
"""
downloader: str = Field(None, description="下载器")
save_path: str = Field(None, description="保存路径")
only_lack: bool = Field(False, description="仅下载缺失的资源")
downloader: Optional[str] = Field(None, description="下载器")
save_path: Optional[str] = Field(None, description="保存路径")
labels: Optional[str] = Field(None, description="标签(,分隔)")
only_lack: Optional[bool] = Field(False, description="仅下载缺失的资源")
class AddDownloadAction(BaseAction):
@@ -26,50 +30,58 @@ class AddDownloadAction(BaseAction):
_added_downloads = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.downloadchain = DownloadChain()
self.mediachain = MediaChain()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "添加下载"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "根据资源列表添加下载任务"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return AddDownloadParams().dict()
@property
def success(self) -> bool:
return not self._has_error
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
将上下文中的torrents添加到下载任务中
"""
params = AddDownloadParams(**params)
_started = False
for t in context.torrents:
if global_vars.is_workflow_stopped(workflow_id):
break
# 检查缓存
cache_key = f"{t.torrent_info.site}-{t.torrent_info.title}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{t.torrent_info.title} 已添加过下载,跳过")
continue
if not t.meta_info:
t.meta_info = MetaInfo(title=t.title, subtitle=t.description)
t.meta_info = MetaInfo(title=t.torrent_info.title, subtitle=t.torrent_info.description)
if not t.media_info:
t.media_info = self.mediachain.recognize_media(meta=t.meta_info)
if not t.media_info:
self._has_error = True
logger.warning(f"{t.title} 未识别到媒体信息,无法下载")
logger.warning(f"{t.torrent_info.title} 未识别到媒体信息,无法下载")
continue
if params.only_lack:
exists_info = self.downloadchain.media_exists(t.media_info)
if exists_info:
if t.media_info.type == MediaType.MOVIE:
# 电影
logger.warning(f"{t.title} 媒体库中已存在,跳过")
logger.warning(f"{t.torrent_info.title} 媒体库中已存在,跳过")
continue
else:
# 电视剧
@@ -85,19 +97,23 @@ class AddDownloadAction(BaseAction):
logger.warning(f"{t.meta_info.title}{t.meta_info.begin_season} 季第 {t.meta_info.episode_list} 集已存在,跳过")
continue
_started = True
did = self.downloadchain.download_single(context=t,
downloader=params.downloader,
save_path=params.save_path)
save_path=params.save_path,
label=params.labels)
if did:
self._added_downloads.append(did)
else:
self._has_error = True
# 保存缓存
self.save_cache(workflow_id, cache_key)
if self._added_downloads:
logger.info(f"已添加 {len(self._added_downloads)} 个下载任务")
context.downloads.extend(
[DownloadTask(download_id=did, downloader=params.downloader) for did in self._added_downloads]
)
elif _started:
self._has_error = True
self.job_done()
self.job_done(f"已添加 {len(self._added_downloads)} 个下载任务")
return context

View File

@@ -1,6 +1,6 @@
from app.actions import BaseAction
from app.chain.subscribe import SubscribeChain
from app.core.config import settings
from app.core.config import settings, global_vars
from app.core.context import MediaInfo
from app.db.subscribe_oper import SubscribeOper
from app.log import logger
@@ -22,41 +22,50 @@ class AddSubscribeAction(BaseAction):
_added_subscribes = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.subscribechain = SubscribeChain()
self.subscribeoper = SubscribeOper()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "添加订阅"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "根据媒体列表添加订阅"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return AddSubscribeParams().dict()
@property
def success(self) -> bool:
return not self._has_error
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
将medias中的信息添加订阅如果订阅不存在的话
"""
_started = False
for media in context.medias:
if global_vars.is_workflow_stopped(workflow_id):
break
# 检查缓存
cache_key = f"{media.type}-{media.title}-{media.year}-{media.season}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{media.title} {media.year} 已添加过订阅,跳过")
continue
mediainfo = MediaInfo()
mediainfo.from_dict(media.dict())
if self.subscribechain.exists(mediainfo):
logger.info(f"{media.title} 已存在订阅")
continue
# 添加订阅
_started = True
sid, message = self.subscribechain.add(mtype=mediainfo.type,
title=mediainfo.title,
year=mediainfo.year,
@@ -67,13 +76,15 @@ class AddSubscribeAction(BaseAction):
username=settings.SUPERUSER)
if sid:
self._added_subscribes.append(sid)
else:
self._has_error = True
# 保存缓存
self.save_cache(workflow_id, cache_key)
if self._added_subscribes:
logger.info(f"已添加 {len(self._added_subscribes)} 个订阅")
for sid in self._added_subscribes:
context.subscribes.append(self.subscribeoper.get(sid))
elif _started:
self._has_error = True
self.job_done()
self.job_done(f"已添加 {len(self._added_subscribes)} 个订阅")
return context

View File

@@ -1,4 +1,5 @@
from app.actions import BaseAction, ActionChain
from app.core.config import global_vars
from app.schemas import ActionParams, ActionContext
from app.log import logger
@@ -17,35 +18,37 @@ class FetchDownloadsAction(BaseAction):
_downloads = []
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.chain = ActionChain()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "获取下载任务"
@classmethod
@property
def description(cls) -> str:
return "获取下载任务,更新任务状态"
def description(cls) -> str: # noqa
return "获取下载队列中的任务状态"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return FetchDownloadsParams().dict()
@property
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
更新downloads中的下载任务状态
"""
__all_complete = False
for download in self._downloads:
if global_vars.is_workflow_stopped(workflow_id):
break
logger.info(f"获取下载任务 {download.download_id} 状态 ...")
torrents = self.chain.list_torrents(hashs=[download.download_id])
if not torrents:
@@ -56,6 +59,9 @@ class FetchDownloadsAction(BaseAction):
if t.progress >= 100:
logger.info(f"下载任务 {download.download_id} 已完成")
download.completed = True
else:
logger.info(f"下载任务 {download.download_id} 未完成")
download.completed = False
if all([d.completed for d in self._downloads]):
self.job_done()
return context

View File

@@ -1,11 +1,11 @@
from typing import List
from typing import List, Optional
from pydantic import Field
from app.actions import BaseAction
from app.chain.recommend import RecommendChain
from app.schemas import ActionParams, ActionContext
from app.core.config import settings
from app.core.config import settings, global_vars
from app.core.event import eventmanager
from app.log import logger
from app.schemas import RecommendSourceEventData, MediaInfo
@@ -17,7 +17,9 @@ class FetchMediasParams(ActionParams):
"""
获取媒体数据参数
"""
sources: List[str] = Field([], description="媒体数据来源")
source_type: Optional[str] = Field("ranking", description="来源")
sources: Optional[List[str]] = Field([], description="榜单")
api_path: Optional[str] = Field(None, description="API路径")
class FetchMediasAction(BaseAction):
@@ -26,11 +28,11 @@ class FetchMediasAction(BaseAction):
"""
_inner_sources = []
_medias = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.__inner_sources = [
{
@@ -98,22 +100,22 @@ class FetchMediasAction(BaseAction):
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "获取媒体数据"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "获取榜单等媒体数据列表"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return FetchMediasParams().dict()
@property
def success(self) -> bool:
return True if self._medias else False
return not self._has_error
def __get_source(self, source: str):
"""
@@ -124,33 +126,49 @@ class FetchMediasAction(BaseAction):
return s
return None
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
获取媒体数据填充到medias
"""
params = FetchMediasParams(**params)
for name in params.sources:
source = self.__get_source(name)
if not source:
continue
logger.info(f"获取媒体数据 {source} ...")
results = []
if source.get("func"):
results = source['func']()
try:
if params.source_type == "ranking":
for name in params.sources:
if global_vars.is_workflow_stopped(workflow_id):
break
source = self.__get_source(name)
if not source:
continue
logger.info(f"获取媒体数据 {source} ...")
results = []
if source.get("func"):
results = source['func']()
else:
# 调用内部API获取数据
api_url = f"http://127.0.0.1:{settings.PORT}/api/v1/{source['api_path']}?token={settings.API_TOKEN}"
res = RequestUtils(timeout=15).post_res(api_url)
if res:
results = res.json()
if results:
logger.info(f"{name} 获取到 {len(results)} 条数据")
self._medias.extend([MediaInfo(**r) for r in results])
else:
logger.error(f"{name} 获取数据失败")
else:
# 调用内部API获取数据
api_url = f"http://127.0.0.1:{settings.PORT}/api/v1/{source['api_path']}?token={settings.API_TOKEN}"
api_url = f"http://127.0.0.1:{settings.PORT}{params.api_path}?token={settings.API_TOKEN}"
res = RequestUtils(timeout=15).post_res(api_url)
if res:
results = res.json()
if results:
logger.info(f"{name} 获取到 {len(results)} 条数据")
self._medias.extend([MediaInfo(**r) for r in results])
else:
logger.error(f"{name} 获取数据失败")
if results:
logger.info(f"{params.api_path} 获取到 {len(results)} 条数据")
self._medias.extend([MediaInfo(**r) for r in results])
except Exception as e:
logger.error(f"获取媒体数据失败: {e}")
self._has_error = True
if self._medias:
context.medias.extend(self._medias)
self.job_done()
self.job_done(f"获取到 {len(self._medias)} 条媒数据")
return context

View File

@@ -3,7 +3,7 @@ from typing import Optional
from pydantic import Field
from app.actions import BaseAction, ActionChain
from app.core.config import settings
from app.core.config import settings, global_vars
from app.core.context import Context
from app.core.metainfo import MetaInfo
from app.helper.rss import RssHelper
@@ -21,6 +21,7 @@ class FetchRssParams(ActionParams):
content_type: Optional[str] = Field(None, description="Content-Type")
referer: Optional[str] = Field(None, description="Referer")
ua: Optional[str] = Field(None, description="User-Agent")
match_media: Optional[str] = Field(None, description="匹配媒体信息")
class FetchRssAction(BaseAction):
@@ -31,31 +32,31 @@ class FetchRssAction(BaseAction):
_rss_torrents = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.rsshelper = RssHelper()
self.chain = ActionChain()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "获取RSS资源"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "订阅RSS地址获取资源"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return FetchRssParams().dict()
@property
def success(self) -> bool:
return not self._has_error
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
请求RSS地址获取数据并解析为资源列表
"""
@@ -86,6 +87,8 @@ class FetchRssAction(BaseAction):
# 组装种子
for item in rss_items:
if global_vars.is_workflow_stopped(workflow_id):
break
if not item.get("title"):
continue
torrentinfo = TorrentInfo(
@@ -96,15 +99,17 @@ class FetchRssAction(BaseAction):
pubdate=item["pubdate"].strftime("%Y-%m-%d %H:%M:%S") if item.get("pubdate") else None,
)
meta = MetaInfo(title=torrentinfo.title, subtitle=torrentinfo.description)
mediainfo = self.chain.recognize_media(meta)
if not mediainfo:
logger.warning(f"{torrentinfo.title} 未识别到媒体信息")
continue
mediainfo = None
if params.match_media:
mediainfo = self.chain.recognize_media(meta)
if not mediainfo:
logger.warning(f"{torrentinfo.title} 未识别到媒体信息")
continue
self._rss_torrents.append(Context(meta_info=meta, media_info=mediainfo, torrent_info=torrentinfo))
if self._rss_torrents:
logger.info(f"获取 {len(self._rss_torrents)} 个RSS资源")
logger.info(f"获取 {len(self._rss_torrents)} 个RSS资源")
context.torrents.extend(self._rss_torrents)
self.job_done()
self.job_done(f"获取到 {len(self._rss_torrents)} 个资源")
return context

View File

@@ -1,9 +1,12 @@
import random
import time
from typing import Optional, List
from pydantic import Field
from app.actions import BaseAction
from app.chain.search import SearchChain
from app.core.config import global_vars
from app.log import logger
from app.schemas import ActionParams, ActionContext, MediaType
@@ -12,11 +15,13 @@ class FetchTorrentsParams(ActionParams):
"""
获取站点资源参数
"""
name: str = Field(None, description="资源名称")
search_type: Optional[str] = Field("keyword", description="搜索类型")
name: Optional[str] = Field(None, description="资源名称")
year: Optional[str] = Field(None, description="年份")
type: Optional[str] = Field(None, description="资源类型 (电影/电视剧)")
season: Optional[int] = Field(None, description="季度")
sites: Optional[List[int]] = Field([], description="站点列表")
match_media: Optional[bool] = Field(False, description="匹配媒体信息")
class FetchTorrentsAction(BaseAction):
@@ -26,52 +31,73 @@ class FetchTorrentsAction(BaseAction):
_torrents = []
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.searchchain = SearchChain()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "搜索站点资源"
@classmethod
@property
def description(cls) -> str:
return "根据关键字搜索站点种子资源"
def description(cls) -> str: # noqa
return "搜索站点种子资源列表"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return FetchTorrentsParams().dict()
@property
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
搜索站点,获取资源列表
"""
params = FetchTorrentsParams(**params)
torrents = self.searchchain.search_by_title(title=params.name, sites=params.sites, cache_local=False)
for torrent in torrents:
if params.year and torrent.meta_info.year != params.year:
continue
if params.type and torrent.media_info and torrent.media_info.type != MediaType(params.type):
continue
if params.season and torrent.meta_info.begin_season != params.season:
continue
# 识别媒体信息
torrent.media_info = self.searchchain.recognize_media(torrent.meta_info)
if not torrent.media_info:
logger.warning(f"{torrent.torrent_info.title} 未识别到媒体信息")
continue
self._torrents.append(torrent)
if params.search_type == "keyword":
# 按关键字搜索
torrents = self.searchchain.search_by_title(title=params.name, sites=params.sites, cache_local=False)
for torrent in torrents:
if global_vars.is_workflow_stopped(workflow_id):
break
if params.year and torrent.meta_info.year != params.year:
continue
if params.type and torrent.media_info and torrent.media_info.type != MediaType(params.type):
continue
if params.season and torrent.meta_info.begin_season != params.season:
continue
# 识别媒体信息
if params.match_media:
torrent.media_info = self.searchchain.recognize_media(torrent.meta_info)
if not torrent.media_info:
logger.warning(f"{torrent.torrent_info.title} 未识别到媒体信息")
continue
self._torrents.append(torrent)
else:
# 搜索媒体列表
for media in context.medias:
if global_vars.is_workflow_stopped(workflow_id):
break
torrents = self.searchchain.search_by_id(tmdbid=media.tmdb_id,
doubanid=media.douban_id,
mtype=MediaType(media.type),
sites=params.sites)
for torrent in torrents:
self._torrents.append(torrent)
# 随机休眠 5-30秒
sleep_time = random.randint(5, 30)
logger.info(f"随机休眠 {sleep_time} 秒 ...")
time.sleep(sleep_time)
if self._torrents:
context.torrents.extend(self._torrents)
logger.info(f"搜索到 {len(self._torrents)} 条资源")
logger.info(f"搜索到 {len(self._torrents)} 条资源")
self.job_done()
self.job_done(f"搜索到 {len(self._torrents)} 个资源")
return context

View File

@@ -3,6 +3,8 @@ from typing import Optional
from pydantic import Field
from app.actions import BaseAction
from app.core.config import global_vars
from app.log import logger
from app.schemas import ActionParams, ActionContext
@@ -11,7 +13,6 @@ class FilterMediasParams(ActionParams):
过滤媒体数据参数
"""
type: Optional[str] = Field(None, description="媒体类型 (电影/电视剧)")
category: Optional[str] = Field(None, description="媒体类别 (二级分类)")
vote: Optional[int] = Field(0, description="评分")
year: Optional[str] = Field(None, description="年份")
@@ -25,41 +26,42 @@ class FilterMediasAction(BaseAction):
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "过滤媒体数据"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "对媒体数据列表进行过滤"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return FilterMediasParams().dict()
@property
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
过滤medias中媒体数据
"""
params = FilterMediasParams(**params)
for media in context.medias:
if global_vars.is_workflow_stopped(workflow_id):
break
if params.type and media.type != params.type:
continue
if params.category and media.category != params.category:
continue
if params.vote and media.vote_average < params.vote:
continue
if params.year and media.year != params.year:
continue
self._medias.append(media)
if self._medias:
context.medias = self._medias
logger.info(f"过滤后剩余 {len(self._medias)} 条媒体数据")
self.job_done()
context.medias = self._medias
self.job_done(f"过滤后剩余 {len(self._medias)} 条媒体数据")
return context

View File

@@ -3,7 +3,9 @@ from typing import Optional, List
from pydantic import Field
from app.actions import BaseAction, ActionChain
from app.core.config import global_vars
from app.helper.torrent import TorrentHelper
from app.log import logger
from app.schemas import ActionParams, ActionContext
@@ -27,36 +29,38 @@ class FilterTorrentsAction(BaseAction):
_torrents = []
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.torrenthelper = TorrentHelper()
self.chain = ActionChain()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "过滤资源"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "对资源列表数据进行过滤"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return FilterTorrentsParams().dict()
@property
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
过滤torrents中的资源
"""
params = FilterTorrentsParams(**params)
for torrent in context.torrents:
if global_vars.is_workflow_stopped(workflow_id):
break
if self.torrenthelper.filter_torrent(
torrent_info=torrent.torrent_info,
filter_params={
@@ -75,7 +79,9 @@ class FilterTorrentsAction(BaseAction):
):
self._torrents.append(torrent)
logger.info(f"过滤后剩余 {len(self._torrents)} 个资源")
context.torrents = self._torrents
self.job_done()
self.job_done(f"过滤后剩余 {len(self._torrents)} 个资源")
return context

84
app/actions/scan_file.py Normal file
View File

@@ -0,0 +1,84 @@
from pathlib import Path
from typing import Optional
from pydantic import Field
from app.actions import BaseAction
from app.chain.storage import StorageChain
from app.core.config import global_vars, settings
from app.log import logger
from app.schemas import ActionParams, ActionContext
class ScanFileParams(ActionParams):
"""
整理文件参数
"""
# 存储
storage: Optional[str] = Field("local", description="存储")
directory: Optional[str] = Field(None, description="目录")
class ScanFileAction(BaseAction):
"""
整理文件
"""
_fileitems = []
_has_error = False
def __init__(self, action_id: str):
super().__init__(action_id)
self.storagechain = StorageChain()
@classmethod
@property
def name(cls) -> str: # noqa
return "扫描目录"
@classmethod
@property
def description(cls) -> str: # noqa
return "扫描目录文件到队列"
@classmethod
@property
def data(cls) -> dict: # noqa
return ScanFileParams().dict()
@property
def success(self) -> bool:
return not self._has_error
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
扫描目录中的所有文件记录到fileitems
"""
params = ScanFileParams(**params)
if not params.storage or not params.directory:
return context
fileitem = self.storagechain.get_file_item(params.storage, Path(params.directory))
if not fileitem:
logger.error(f"目录不存在: 【{params.storage}{params.directory}")
self._has_error = True
return context
files = self.storagechain.list_files(fileitem, recursion=True)
for file in files:
if global_vars.is_workflow_stopped(workflow_id):
break
if not file.extension or f".{file.extension.lower()}" not in settings.RMT_MEDIAEXT:
continue
# 检查缓存
cache_key = f"{file.path}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{file.path} 已处理过,跳过")
continue
self._fileitems.append(fileitem)
# 保存缓存
self.save_cache(workflow_id, cache_key)
if self._fileitems:
context.fileitems.extend(self._fileitems)
self.job_done(f"扫描到 {len(self._fileitems)} 个文件")
return context

View File

@@ -1,6 +1,7 @@
from pathlib import Path
from app.actions import BaseAction
from app.core.config import global_vars
from app.schemas import ActionParams, ActionContext
from app.chain.media import MediaChain
from app.chain.storage import StorageChain
@@ -23,47 +24,61 @@ class ScrapeFileAction(BaseAction):
_scraped_files = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.storagechain = StorageChain()
self.mediachain = MediaChain()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "刮削文件"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "刮削媒体信息和图片"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return ScrapeFileParams().dict()
@property
def success(self) -> bool:
return not self._has_error
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
刮削fileitems中的所有文件
"""
# 失败次数
_failed_count = 0
for fileitem in context.fileitems:
if global_vars.is_workflow_stopped(workflow_id):
break
if fileitem in self._scraped_files:
continue
if not self.storagechain.exists(fileitem):
continue
# 检查缓存
cache_key = f"{fileitem.path}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{fileitem.path} 已刮削过,跳过")
continue
meta = MetaInfoPath(Path(fileitem.path))
mediainfo = self.mediachain.recognize_media(meta)
if not mediainfo:
self._has_error = True
_failed_count += 1
logger.info(f"{fileitem.path} 未识别到媒体信息,无法刮削")
continue
self.mediachain.scrape_metadata(fileitem=fileitem, meta=meta, mediainfo=mediainfo)
self._scraped_files.append(fileitem)
# 保存缓存
self.save_cache(workflow_id, cache_key)
self.job_done()
if not self._scraped_files and _failed_count:
self._has_error = True
self.job_done(f"成功刮削 {len(self._scraped_files)} 个文件,失败 {_failed_count}")
return context

View File

@@ -1,8 +1,7 @@
import copy
from app.actions import BaseAction
from app.schemas import ActionParams, ActionContext
from app.core.event import eventmanager
from app.schemas import ActionParams, ActionContext
from app.schemas.types import ChainEventType
class SendEventParams(ActionParams):
@@ -19,33 +18,31 @@ class SendEventAction(BaseAction):
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "发送事件"
@classmethod
@property
def description(cls) -> str:
return "发送队列中的所有事件"
def description(cls) -> str: # noqa
return "发送任务执行事件"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return SendEventParams().dict()
@property
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
发送events中的事件
发送工作流事件,以更插件干预工作流执行
"""
if context.events:
# 按优先级排序,优先级高的先发送
context.events.sort(key=lambda x: x.priority, reverse=True)
for event in copy.deepcopy(context.events):
eventmanager.send_event(etype=event.event_type, data=event.event_data)
context.events.remove(event)
# 触发资源下载事件,更新执行上下文
event = eventmanager.send_event(ChainEventType.WorkflowExecution, context)
if event and event.event_data:
context = event.event_data
self.job_done()
return context

View File

@@ -1,10 +1,9 @@
import copy
from typing import List, Optional, Union
from pydantic import Field
from app.actions import BaseAction, ActionChain
from app.schemas import ActionParams, ActionContext
from app.schemas import ActionParams, ActionContext, Notification
class SendMessageParams(ActionParams):
@@ -20,42 +19,55 @@ class SendMessageAction(BaseAction):
发送消息
"""
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.chain = ActionChain()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "发送消息"
@classmethod
@property
def description(cls) -> str:
return "发送队列中的所有消息"
def description(cls) -> str: # noqa
return "发送任务执行消息"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return SendMessageParams().dict()
@property
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
发送messages中的消息
"""
for message in copy.deepcopy(context.messages):
if params.client:
message.source = params.client
if params.userid:
message.userid = params.userid
self.chain.post_message(message)
context.messages.remove(message)
context.messages = []
params = SendMessageParams(**params)
msg_text = f"当前进度:{context.progress}%"
index = 1
if context.execute_history:
for history in context.execute_history:
if not history.message:
continue
msg_text += f"\n{index}. {history.action}{history.message}"
index += 1
# 发送消息
if not params.client:
params.client = [None]
for client in params.client:
self.chain.post_message(
Notification(
source=client,
userid=params.userid,
title="【工作流执行结果】",
text=msg_text,
link="#/workflow"
)
)
self.job_done()
return context

View File

@@ -1,6 +1,12 @@
import copy
from pathlib import Path
from typing import Optional
from pydantic import Field
from app.actions import BaseAction
from app.core.config import global_vars
from app.db.transferhistory_oper import TransferHistoryOper
from app.schemas import ActionParams, ActionContext
from app.chain.storage import StorageChain
from app.chain.transfer import TransferChain
@@ -11,7 +17,8 @@ class TransferFileParams(ActionParams):
"""
整理文件参数
"""
pass
# 来源
source: Optional[str] = Field("downloads", description="来源")
class TransferFileAction(BaseAction):
@@ -22,53 +29,109 @@ class TransferFileAction(BaseAction):
_fileitems = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.transferchain = TransferChain()
self.storagechain = StorageChain()
self.transferhis = TransferHistoryOper()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "整理文件"
@classmethod
@property
def description(cls) -> str:
return "整理下载队列中的文件"
def description(cls) -> str: # noqa
return "整理队列中的文件"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return TransferFileParams().dict()
@property
def success(self) -> bool:
return not self._has_error
def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
从downloads中整理文件记录到fileitems
downloads / fileitems 中整理文件记录到fileitems
"""
for download in context.downloads:
if not download.completed:
logger.info(f"下载任务 {download.download_id} 未完成")
continue
fileitem = self.storagechain.get_file_item(storage="local", path=Path(download.path))
if not fileitem:
logger.info(f"文件 {download.path} 不存在")
continue
logger.info(f"开始整理文件 {download.path} ...")
state, errmsg = self.transferchain.do_transfer(fileitem, background=False)
if not state:
self._has_error = True
logger.error(f"整理文件 {download.path} 失败: {errmsg}")
continue
logger.info(f"整理文件 {download.path} 完成")
self._fileitems.append(fileitem)
def check_continue():
"""
检查是否继续整理文件
"""
if global_vars.is_workflow_stopped(workflow_id):
return False
return True
params = TransferFileParams(**params)
# 失败次数
_failed_count = 0
if params.source == "downloads":
# 从下载任务中整理文件
for download in context.downloads:
if global_vars.is_workflow_stopped(workflow_id):
break
if not download.completed:
logger.info(f"下载任务 {download.download_id} 未完成")
continue
# 检查缓存
cache_key = f"{download.download_id}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{download.path} 已整理过,跳过")
continue
fileitem = self.storagechain.get_file_item(storage="local", path=Path(download.path))
if not fileitem:
logger.info(f"文件 {download.path} 不存在")
continue
transferd = self.transferhis.get_by_src(fileitem.path, storage=fileitem.storage)
if transferd:
# 已经整理过的文件不再整理
continue
logger.info(f"开始整理文件 {download.path} ...")
state, errmsg = self.transferchain.do_transfer(fileitem, background=False)
if not state:
_failed_count += 1
logger.error(f"整理文件 {download.path} 失败: {errmsg}")
continue
logger.info(f"整理文件 {download.path} 完成")
self._fileitems.append(fileitem)
self.save_cache(workflow_id, cache_key)
else:
# 从 fileitems 中整理文件
for fileitem in copy.deepcopy(context.fileitems):
if not check_continue():
break
# 检查缓存
cache_key = f"{fileitem.path}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{fileitem.path} 已整理过,跳过")
continue
transferd = self.transferhis.get_by_src(fileitem.path, storage=fileitem.storage)
if transferd:
# 已经整理过的文件不再整理
continue
logger.info(f"开始整理文件 {fileitem.path} ...")
state, errmsg = self.transferchain.do_transfer(fileitem, background=False,
continue_callback=check_continue)
if not state:
_failed_count += 1
logger.error(f"整理文件 {fileitem.path} 失败: {errmsg}")
continue
logger.info(f"整理文件 {fileitem.path} 完成")
# 从 fileitems 中移除已整理的文件
context.fileitems.remove(fileitem)
self._fileitems.append(fileitem)
# 记录已整理的文件
self.save_cache(workflow_id, cache_key)
if self._fileitems:
context.fileitems.extend(self._fileitems)
elif _failed_count:
self._has_error = True
self.job_done()
self.job_done(f"整理成功 {len(self._fileitems)} 个文件,失败 {_failed_count}")
return context

View File

@@ -5,9 +5,11 @@ from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app import schemas
from app.core.config import global_vars
from app.core.workflow import WorkFlowManager
from app.db import get_db
from app.db.models.workflow import Workflow
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_user
from app.chain.workflow import WorkflowChain
from app.scheduler import Scheduler
@@ -83,8 +85,12 @@ def delete_workflow(workflow_id: int,
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 删除定时任务
Scheduler().remove_workflow_job(workflow)
# 删除工作流
Workflow.delete(db, workflow_id)
# 删除缓存
SystemConfigOper().delete(f"WorkflowCache-{workflow_id}")
return schemas.Response(success=True, message="删除成功")
@@ -111,7 +117,9 @@ def start_workflow(workflow_id: int,
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 添加定时任务
Scheduler().update_workflow_job(workflow)
# 更新状态
workflow.update_state(db, workflow_id, "W")
return schemas.Response(success=True)
@@ -126,6 +134,29 @@ def pause_workflow(workflow_id: int,
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 删除定时任务
Scheduler().remove_workflow_job(workflow)
# 停止工作流
global_vars.stop_workflow(workflow_id)
# 更新状态
workflow.update_state(db, workflow_id, "P")
return schemas.Response(success=True)
@router.post("/{workflow_id}/reset", summary="重置工作流", response_model=schemas.Response)
def reset_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
重置工作流
"""
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 停止工作流
global_vars.stop_workflow(workflow_id)
# 重置工作流
workflow.reset(db, workflow_id)
# 删除缓存
SystemConfigOper().delete(f"WorkflowCache-{workflow_id}")
return schemas.Response(success=True)

View File

@@ -347,7 +347,7 @@ class ChainBase(metaclass=ABCMeta):
torrent_list=torrent_list, mediainfo=mediainfo)
def download(self, content: Union[Path, str], download_dir: Path, cookie: str,
episodes: Set[int] = None, category: str = None,
episodes: Set[int] = None, category: str = None, label: str = None,
downloader: str = None
) -> Optional[Tuple[Optional[str], Optional[str], Optional[str], str]]:
"""
@@ -357,11 +357,12 @@ class ChainBase(metaclass=ABCMeta):
:param cookie: cookie
:param episodes: 需要下载的集数
:param category: 种子分类
:param label: 标签
:param downloader: 下载器
:return: 下载器名称、种子Hash、种子文件布局、错误原因
"""
return self.run_module("download", content=content, download_dir=download_dir,
cookie=cookie, episodes=episodes, category=category,
cookie=cookie, episodes=episodes, category=category, label=label,
downloader=downloader)
def download_added(self, context: Context, download_dir: Path, torrent_path: Path = None) -> None:

View File

@@ -209,7 +209,8 @@ class DownloadChain(ChainBase):
save_path: str = None,
userid: Union[str, int] = None,
username: str = None,
media_category: str = None) -> Optional[str]:
media_category: str = None,
label: str = None) -> Optional[str]:
"""
下载及发送通知
:param context: 资源上下文
@@ -222,6 +223,7 @@ class DownloadChain(ChainBase):
:param userid: 用户ID
:param username: 调用下载的用户名/插件名
:param media_category: 自定义媒体类别
:param label: 自定义标签
"""
# 发送资源下载事件,允许外部拦截下载
event_data = ResourceDownloadEventData(
@@ -310,6 +312,7 @@ class DownloadChain(ChainBase):
episodes=episodes,
download_dir=download_dir,
category=_media.category,
label=label,
downloader=downloader or _site_downloader)
if result:
_downloader, _hash, _layout, error_msg = result

View File

@@ -905,7 +905,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
season: int = None, epformat: EpisodeFormat = None, min_filesize: int = 0,
downloader: str = None, download_hash: str = None,
force: bool = False, background: bool = True,
manual: bool = False) -> Tuple[bool, str]:
manual: bool = False, continue_callback: Callable = None) -> Tuple[bool, str]:
"""
执行一个复杂目录的整理操作
:param fileitem: 文件项
@@ -926,6 +926,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
:param force: 是否强制整理
:param background: 是否后台运行
:param manual: 是否手动整理
:param continue_callback: 继续处理回调
返回:成功标识,错误信息
"""
@@ -991,6 +992,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
for file_item, bluray_dir in file_items:
if global_vars.is_system_stopped:
break
if continue_callback and not continue_callback():
break
file_path = Path(file_item.path)
# 回收站及隐藏的文件不处理
if file_item.path.find('/@Recycle/') != -1 \
@@ -1111,6 +1114,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
for transfer_task in transfer_tasks:
if global_vars.is_system_stopped:
break
if continue_callback and not continue_callback():
break
# 更新进度
__process_msg = f"正在整理 {processed_num + fail_num + 1}/{total_num}{transfer_task.fileitem.name} ..."
logger.info(__process_msg)

View File

@@ -14,7 +14,7 @@ from app.core.workflow import WorkFlowManager
from app.db.models import Workflow
from app.db.workflow_oper import WorkflowOper
from app.log import logger
from app.schemas import ActionContext, ActionFlow, Action
from app.schemas import ActionContext, ActionFlow, Action, ActionExecution
class WorkflowExecutor:
@@ -33,6 +33,8 @@ class WorkflowExecutor:
self.step_callback = step_callback
self.actions = {action['id']: Action(**action) for action in workflow.actions}
self.flows = [ActionFlow(**flow) for flow in workflow.flows]
self.total_actions = len(self.actions)
self.finished_actions = 0
self.success = True
self.errmsg = ""
@@ -64,6 +66,7 @@ class WorkflowExecutor:
# 初始上下文
if workflow.current_action and workflow.context:
logger.info(f"工作流已执行动作:{workflow.current_action}")
# Base64解码
decoded_data = base64.b64decode(workflow.context["content"])
# 反序列化数据
@@ -71,7 +74,9 @@ class WorkflowExecutor:
else:
self.context = ActionContext()
# 初始化队列入度为0的节点
# 恢复工作流
global_vars.workflow_resume(self.workflow.id)
# 初始化队列添加入度为0的节点
for action_id in self.actions:
if self.indegree[action_id] == 0:
self.queue.append(action_id)
@@ -89,7 +94,7 @@ class WorkflowExecutor:
if not self.success:
break
if not self.queue:
sleep(1)
sleep(0.1)
continue
# 取出队首节点
node_id = self.queue.popleft()
@@ -97,7 +102,8 @@ class WorkflowExecutor:
self.running_tasks += 1
# 已停机
if global_vars.is_system_stopped:
if global_vars.is_workflow_stopped(self.workflow.id):
global_vars.workflow_resume(self.workflow.id)
break
# 已执行的跳过
@@ -108,54 +114,66 @@ class WorkflowExecutor:
# 提交任务到线程池
future = self.executor.submit(
self.execute_node,
self.workflow.id,
node_id,
self.context
)
future.add_done_callback(self.on_node_complete)
def execute_node(self, node_id: int, context: ActionContext) -> Tuple[Action, bool, ActionContext]:
def execute_node(self, workflow_id: int, node_id: int,
context: ActionContext) -> Tuple[Action, bool, str, ActionContext]:
"""
执行单个节点操作返回修改后的上下文和节点ID
"""
action = self.actions[node_id]
state, result_ctx = self.workflowmanager.excute(action, context=context)
return action, state, result_ctx
state, message, result_ctx = self.workflowmanager.excute(workflow_id, action, context=context)
return action, state, message, result_ctx
def on_node_complete(self, future):
"""
节点完成回调:更新上下文、处理后继节点
"""
action, state, result_ctx = future.result()
action, state, message, result_ctx = future.result()
# 节点执行失败
if not state:
self.success = False
self.errmsg = f"{action.name} 失败"
try:
self.finished_actions += 1
# 更新当前进度
self.context.progress = round(self.finished_actions / self.total_actions) * 100
# 补充执行历史
self.context.execute_history.append(
ActionExecution(
action=action.name,
result=state,
message=message
)
)
# 节点执行失败
if not state:
self.success = False
self.errmsg = f"{action.name} 失败"
return
with self.lock:
# 更新主上下文
self.merge_context(result_ctx)
# 回调
if self.step_callback:
self.step_callback(action, self.context)
# 处理后继节点
successors = self.adjacency.get(action.id, [])
for succ_id in successors:
with self.lock:
self.indegree[succ_id] -= 1
if self.indegree[succ_id] == 0:
self.queue.append(succ_id)
finally:
# 标记任务完成
with self.lock:
self.running_tasks -= 1
return
with self.lock:
# 更新主上下文
self.merge_context(result_ctx)
# 回调
if self.step_callback:
self.step_callback(action, self.context)
# 处理后继节点
successors = self.adjacency.get(action.id, [])
for succ_id in successors:
with self.lock:
self.indegree[succ_id] -= 1
if self.indegree[succ_id] == 0:
self.queue.append(succ_id)
# 标记任务完成
with self.lock:
self.running_tasks -= 1
def merge_context(self, context: ActionContext):
"""
合并上下文
@@ -221,7 +239,7 @@ class WorkflowChain(ChainBase):
self.workflowoper.fail(workflow_id, result=executor.errmsg)
return False, executor.errmsg
else:
logger.info(f"工作流 {workflow.name} 执行成")
logger.info(f"工作流 {workflow.name} 执行")
self.workflowoper.success(workflow_id)
return True, ""

View File

@@ -607,6 +607,8 @@ class GlobalVar(object):
STOP_EVENT: threading.Event = threading.Event()
# webpush订阅
SUBSCRIPTIONS: List[dict] = []
# 需应急停止的工作流
EMERGENCY_STOP_WORKFLOWS: List[str] = []
def stop_system(self):
"""
@@ -633,6 +635,26 @@ class GlobalVar(object):
"""
self.SUBSCRIPTIONS.append(subscription)
def stop_workflow(self, workflow_id: str):
    """
    Request an emergency stop for a workflow.

    Adds the workflow id to EMERGENCY_STOP_WORKFLOWS (deduplicated) so that
    running executors polling is_workflow_stopped() can break out of their
    loops cooperatively.

    :param workflow_id: id of the workflow to stop
        NOTE(review): annotated as str, but callers appear to pass the numeric
        workflow.id — confirm the intended key type.
    """
    if workflow_id not in self.EMERGENCY_STOP_WORKFLOWS:
        self.EMERGENCY_STOP_WORKFLOWS.append(workflow_id)
def workflow_resume(self, workflow_id: str):
    """
    Clear a previously requested emergency stop for a workflow.

    Removes the workflow id from EMERGENCY_STOP_WORKFLOWS (no-op if it is
    not present), allowing the workflow to run again.

    :param workflow_id: id of the workflow to resume
    """
    if workflow_id in self.EMERGENCY_STOP_WORKFLOWS:
        self.EMERGENCY_STOP_WORKFLOWS.remove(workflow_id)
def is_workflow_stopped(self, workflow_id: str):
    """
    Whether a workflow should stop.

    True when the whole system is shutting down (is_system_stopped) OR an
    emergency stop was requested for this specific workflow id.

    :param workflow_id: id of the workflow to check
    :return: bool — stop flag for cooperative cancellation
    """
    return self.is_system_stopped or workflow_id in self.EMERGENCY_STOP_WORKFLOWS
# 实例化配置
settings = Settings()

View File

@@ -1,6 +1,7 @@
from time import sleep
from typing import Dict, Any, Tuple, List
from app.core.config import global_vars
from app.helper.module import ModuleHelper
from app.log import logger
from app.schemas import Action, ActionContext
@@ -54,7 +55,8 @@ class WorkFlowManager(metaclass=Singleton):
"""
pass
def excute(self, action: Action, context: ActionContext = None) -> Tuple[bool, ActionContext]:
def excute(self, workflow_id: int, action: Action,
context: ActionContext = None) -> Tuple[bool, str, ActionContext]:
"""
执行工作流动作
"""
@@ -62,32 +64,34 @@ class WorkFlowManager(metaclass=Singleton):
context = ActionContext()
if action.type in self._actions:
# 实例化
action_obj = self._actions[action.type]()
action_obj = self._actions[action.type](action.id)
# 执行
logger.info(f"执行动作: {action.id} - {action.name}")
try:
result_context = action_obj.execute(action.data, context)
result_context = action_obj.execute(workflow_id, action.data, context)
except Exception as err:
logger.error(f"{action.name} 执行失败: {err}")
return False, context
return False, f"{err}", context
loop = action.data.get("loop")
loop_interval = action.data.get("loop_interval")
if loop and loop_interval:
while not action_obj.done:
if global_vars.is_workflow_stopped(workflow_id):
break
# 等待
logger.info(f"{action.name} 等待 {loop_interval} 秒后继续执行 ...")
sleep(loop_interval)
# 执行
logger.info(f"继续执行动作: {action.id} - {action.name}")
result_context = action_obj.execute(action.data, result_context)
result_context = action_obj.execute(workflow_id, action.data, result_context)
if action_obj.success:
logger.info(f"{action.name} 执行成功")
else:
logger.error(f"{action.name} 执行失败!")
return action_obj.success, result_context
return action_obj.success, action_obj.message, result_context
else:
logger.error(f"未找到动作: {action.type} - {action.name}")
return False, context
return False, " ", context
def list_actions(self) -> List[dict]:
"""

View File

@@ -1,6 +1,6 @@
from datetime import datetime
from sqlalchemy import Column, Integer, JSON, Sequence, String
from sqlalchemy import Column, Integer, JSON, Sequence, String, and_
from app.db import Base, db_query, db_update
@@ -63,7 +63,7 @@ class Workflow(Base):
@staticmethod
@db_update
def fail(db, wid: int, result: str):
db.query(Workflow).filter(Workflow.id == wid).update({
db.query(Workflow).filter(and_(Workflow.id == wid, Workflow.state != "P")).update({
"state": 'F',
"result": result,
"last_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
@@ -73,7 +73,7 @@ class Workflow(Base):
@staticmethod
@db_update
def success(db, wid: int, result: str = None):
db.query(Workflow).filter(Workflow.id == wid).update({
db.query(Workflow).filter(and_(Workflow.id == wid, Workflow.state != "P")).update({
"state": 'S',
"result": result,
"run_count": Workflow.run_count + 1,
@@ -88,6 +88,7 @@ class Workflow(Base):
"state": 'W',
"result": None,
"current_action": None,
"run_count": 0,
})
return True
@@ -95,7 +96,7 @@ class Workflow(Base):
@db_update
def update_current_action(db, wid: int, action_id: str, context: dict):
db.query(Workflow).filter(Workflow.id == wid).update({
"current_action": f"{Workflow.current_action},{action_id}" if Workflow.current_action else action_id,
"current_action": Workflow.current_action + f",{action_id}" if Workflow.current_action else action_id,
"context": context
})
return True

View File

@@ -10,6 +10,7 @@ from app.log import logger
from app.modules import _ModuleBase
from app.modules.indexer.parser import SiteParserBase
from app.modules.indexer.spider.haidan import HaiDanSpider
from app.modules.indexer.spider.hddolby import HddolbySpider
from app.modules.indexer.spider.mtorrent import MTorrentSpider
from app.modules.indexer.spider.tnode import TNodeSpider
from app.modules.indexer.spider.torrentleech import TorrentLeech
@@ -153,6 +154,12 @@ class IndexerModule(_ModuleBase):
keyword=search_word,
mtype=mtype
)
elif site.get('parser') == "HDDolby":
error_flag, result = HddolbySpider(site).search(
keyword=search_word,
mtype=mtype,
page=page
)
else:
error_flag, result = self.__spider_search(
search_word=search_word,

View File

@@ -32,6 +32,7 @@ class SiteSchema(Enum):
TNode = "TNode"
MTorrent = "MTorrent"
Yema = "Yema"
HDDolby = "HDDolby"
class SiteParserBase(metaclass=ABCMeta):
@@ -155,11 +156,17 @@ class SiteParserBase(metaclass=ABCMeta):
解析站点信息
:return:
"""
# 获取站点首页html
self._index_html = self._get_page_content(url=self._site_url)
# 检查是否已经登录
if not self._parse_logged_in(self._index_html):
return
# Cookie模式时获取站点首页html
if self.request_mode == "apikey":
if not self.apikey and not self.token:
logger.warn(f"{self._site_name} 未设置cookie 或 apikey/token跳过后续操作")
return
self._index_html = {}
else:
# 检查是否已经登录
self._index_html = self._get_page_content(url=self._site_url)
if not self._parse_logged_in(self._index_html):
return
# 解析站点页面
self._parse_site_page(self._index_html)
# 解析用户基础信息
@@ -293,9 +300,13 @@ class SiteParserBase(metaclass=ABCMeta):
req_headers = None
proxies = settings.PROXY if self._proxy else None
if self._ua or headers or self._addition_headers:
req_headers = {
"User-Agent": f"{self._ua}"
}
if self.request_mode == "apikey":
req_headers = {}
else:
req_headers = {
"User-Agent": f"{self._ua}"
}
if headers:
req_headers.update(headers)

View File

@@ -0,0 +1,157 @@
# -*- coding: utf-8 -*-
import json
from typing import Optional, Tuple
from app.modules.indexer.parser import SiteParserBase, SiteSchema
from app.utils.string import StringUtils
class HDDolbySiteUserInfo(SiteParserBase):
    """
    HDDolby user-data parser working against the site's JSON API.

    Uses apikey request mode: user data is fetched from
    ``api.<domain>/api/v1/user/data`` and seeding data from
    ``api.<domain>/api/v1/user/peers``, authenticated via the ``x-api-key``
    header instead of cookies.
    """

    schema = SiteSchema.HDDolby
    request_mode = "apikey"

    # Mapping of the site's numeric user-class codes to display names.
    HDDolby_sysRoleList = {
        "0": "Peasant",
        "1": "User",
        "2": "Power User",
        "3": "Elite User",
        "4": "Crazy User",
        "5": "Insane User",
        "6": "Veteran User",
        "7": "Extreme User",
        "8": "Ultimate User",
        "9": "Nexus Master",
        "10": "VIP",
        "11": "Retiree",
        "12": "Helper",
        "13": "Seeder",
        "14": "Transferrer",
        "15": "Uploader",
        "16": "Torrent Manager",
        "17": "Forum Moderator",
        "18": "Coder",
        "19": "Moderator",
        "20": "Administrator",
        "21": "Sysop",
        "22": "Staff Leader",
    }

    def _parse_site_page(self, html_text: str):
        """
        Configure the API endpoints and headers used by the base parser.

        Rewrites the base URL to the ``api.`` subdomain and points the
        user/seeding pages at the JSON API; pages that do not exist in API
        mode are set to None so the base class skips them.
        """
        # Switch the base URL to the dedicated API host.
        self._base_url = f"https://api.{StringUtils.get_url_domain(self._base_url)}"
        self._user_traffic_page = None
        self._user_detail_page = None
        self._user_basic_page = "api/v1/user/data"
        self._user_basic_params = {}
        self._user_basic_headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/plain, */*"
        }
        self._sys_mail_unread_page = None
        self._user_mail_unread_page = None
        self._mail_unread_params = {}
        self._torrent_seeding_page = "api/v1/user/peers"
        self._torrent_seeding_params = {}
        self._torrent_seeding_headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/plain, */*"
        }
        # All API requests are authenticated with the apikey header.
        self._addition_headers = {
            "x-api-key": self.apikey,
        }

    def _parse_logged_in(self, html_text):
        """
        Login check.

        API mode has no HTML login state to inspect, so this always returns
        True for now (detection deliberately skipped, pending improvement).
        """
        return True

    def _parse_user_base_info(self, html_text: str):
        """
        Parse the user's basic info from the user-data API response.

        Traffic and detail parsing are merged here because the API returns
        everything in a single payload, e.g.::

            {
                "id": "1",
                "added": "2019-03-03 15:30:36",
                "last_access": "2025-02-18 19:48:04",
                "class": "22",
                "uploaded": "852071699418375",
                "downloaded": "1885536536176",
                "seedbonus": "99774808.0",
                "sebonus": "3739023.7",
                "unread_messages": "0",
            }
        """
        if not html_text:
            return None
        detail = json.loads(html_text)
        # status == 0 indicates success for this API.
        if not detail or detail.get("status") != 0:
            return
        user_infos = detail.get("data")
        if not user_infos:
            return
        # "data" is a list; the first entry is the current user.
        user_info = user_infos[0]
        self.userid = user_info.get("id")
        self.username = user_info.get("username")
        self.user_level = self.HDDolby_sysRoleList.get(user_info.get("class") or "1")
        self.join_at = user_info.get("added")
        self.upload = int(user_info.get("uploaded") or '0')
        self.download = int(user_info.get("downloaded") or '0')
        self.ratio = round(self.upload / self.download, 2) if self.download else 0
        self.bonus = float(user_info.get("seedbonus") or "0")
        self.message_unread = int(user_info.get("unread_messages") or '0')

    def _parse_user_traffic_info(self, html_text: str):
        """
        Traffic info is included in the basic-info payload; nothing to do.
        """
        pass

    def _parse_user_detail_info(self, html_text: str):
        """
        Detail info is included in the basic-info payload; nothing to do.
        """
        pass

    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
        """
        Parse the user's seeding info from the peers API response.

        Accumulates seeding count/size and per-torrent [seeders, size] pairs
        into the parser state. Returns None (no next page: the API returns
        everything at once).
        """
        if not html_text:
            return None
        seeding_info = json.loads(html_text)
        if not seeding_info or seeding_info.get("status") != 0:
            return None
        torrents = seeding_info.get("data", [])
        page_seeding_size = 0
        page_seeding_info = []
        for info in torrents:
            # Coerce to int: the API may omit "size" or return it as a string,
            # which would otherwise break the accumulation below.
            size = int(info.get("size") or 0)
            seeder = info.get("seeders") or 1
            page_seeding_size += size
            page_seeding_info.append([seeder, size])
        self.seeding += len(torrents)
        self.seeding_size += page_seeding_size
        self.seeding_info.extend(page_seeding_info)
        return None

    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
        """
        Unread message links are not exposed by the API; nothing to do.
        """
        pass

    def _parse_message_content(self, html_text) -> Tuple[Optional[str], Optional[str], Optional[str]]:
        """
        Message content parsing is not supported in API mode; nothing to do.
        """
        pass

View File

@@ -0,0 +1,211 @@
from typing import Tuple, List
from app.core.config import settings
from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas import MediaType
from app.utils.http import RequestUtils
from app.utils.string import StringUtils
class HddolbySpider:
"""
HDDolby API
"""
_indexerid = None
_domain = None
_domain_host = None
_name = ""
_proxy = None
_cookie = None
_ua = None
_apikey = None
_size = 40
_pageurl = None
_timeout = 15
_searchurl = None
# 分类
_movie_category = [401, 405]
_tv_category = [402, 403, 404, 405]
# 标签
_labels = {
"gf": "官方",
"gy": "国语",
"yy": "粤语",
"ja": "日语",
"ko": "韩语",
"zz": "中文字幕",
"jz": "禁转",
"xz": "限转",
"diy": "DIY",
"sf": "首发",
"yq": "应求",
"m0": "零魔",
"yc": "原创",
"gz": "官字",
"db": "Dolby Vision",
"hdr10": "HDR10",
"hdrm": "HDR10+",
"tx": "特效",
"lz": "连载",
"wj": "完结",
"hdrv": "HDR Vivid",
"hlg": "HLG",
"hq": "高码率",
"hfr": "高帧率",
}
def __init__(self, indexer: dict):
self.systemconfig = SystemConfigOper()
if indexer:
self._indexerid = indexer.get('id')
self._domain = indexer.get('domain')
self._domain_host = StringUtils.get_url_domain(self._domain)
self._name = indexer.get('name')
if indexer.get('proxy'):
self._proxy = settings.PROXY
self._cookie = indexer.get('cookie')
self._ua = indexer.get('ua')
self._apikey = indexer.get('apikey')
self._timeout = indexer.get('timeout') or 15
self._searchurl = f"https://api.{self._domain_host}/api/v1/torrent/search"
self._pageurl = f"{self._domain}details.php?id=%s&hit=1"
def search(self, keyword: str, mtype: MediaType = None, page: int = 0) -> Tuple[bool, List[dict]]:
"""
搜索
"""
if mtype == MediaType.TV:
categories = self._tv_category
elif mtype == MediaType.MOVIE:
categories = self._movie_category
else:
categories = list(set(self._movie_category + self._tv_category))
# 输入参数
params = {
"keyword": keyword,
"page_number": page,
"page_size": 100,
"categories": categories,
"visible": 1,
}
res = RequestUtils(
headers={
"Content-Type": "application/json",
"Accept": "application/json, text/plain, */*",
"x-api-key": self._apikey
},
cookies=self._cookie,
proxies=self._proxy,
referer=f"{self._domain}",
timeout=self._timeout
).post_res(url=self._searchurl, json=params)
torrents = []
if res and res.status_code == 200:
results = res.json().get('data', []) or []
for result in results:
"""
{
"id": 120202,
"promotion_time_type": 0,
"promotion_until": "0000-00-00 00:00:00",
"category": 402,
"medium": 6,
"codec": 1,
"standard": 2,
"team": 10,
"audiocodec": 14,
"leechers": 0,
"seeders": 1,
"name": "[DBY] Lost S06 2010 Complete 1080p Netflix WEB-DL AVC DDP5.1-DBTV",
"small_descr": "lost ",
"times_completed": 0,
"size": 33665425886,
"added": "2025-02-18 19:47:56",
"url": 0,
"hr": 0,
"tmdb_type": "tv",
"tmdb_id": 4607,
"imdb_id": null,
"tags": "gf"
}
"""
# 类别
category_value = result.get('category')
if category_value in self._tv_category:
category = MediaType.TV.value
elif category_value in self._movie_category:
category = MediaType.MOVIE.value
else:
category = MediaType.UNKNOWN.value
# 标签
torrentLabelIds = result.get('tags', "").split(";") or []
torrentLabels = []
for labelId in torrentLabelIds:
if self._labels.get(labelId) is not None:
torrentLabels.append(self._labels.get(labelId))
# 种子信息
torrent = {
'title': result.get('name'),
'description': result.get('small_descr'),
'enclosure': self.__get_download_url(result.get('id'), result.get('downhash')),
'pubdate': result.get('added'),
'size': result.get('size'),
'seeders': result.get('seeders'),
'peers': result.get('leechers'),
'grabs': result.get('times_completed'),
'downloadvolumefactor': self.__get_downloadvolumefactor(result.get('promotion_time_type')),
'uploadvolumefactor': self.__get_uploadvolumefactor(result.get('promotion_time_type')),
'freedate': result.get('promotion_until'),
'page_url': self._pageurl % (self._domain, result.get('id')),
'labels': torrentLabels,
'category': category
}
torrents.append(torrent)
elif res is not None:
logger.warn(f"{self._name} 搜索失败,错误码:{res.status_code}")
return True, []
else:
logger.warn(f"{self._name} 搜索失败,无法连接 {self._domain}")
return True, []
return False, torrents
@staticmethod
def __get_downloadvolumefactor(discount: int) -> float:
"""
获取下载系数
"""
discount_dict = {
2: 0,
5: 0.5,
6: 1,
7: 0.3
}
if discount:
return discount_dict.get(discount, 1)
return 1
@staticmethod
def __get_uploadvolumefactor(discount: int) -> float:
"""
获取上传系数
"""
discount_dict = {
3: 2,
4: 2,
6: 2
}
if discount:
return discount_dict.get(discount, 1)
return 1
def __get_download_url(self, torrent_id: int, downhash: str) -> str:
"""
获取下载链接返回base64编码的json字符串及URL
"""
return f"{self._domain}download.php?id={torrent_id}&downhash={downhash}"

View File

@@ -78,7 +78,7 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]):
server.reconnect()
def download(self, content: Union[Path, str], download_dir: Path, cookie: str,
episodes: Set[int] = None, category: str = None,
episodes: Set[int] = None, category: str = None, label: str = None,
downloader: str = None) -> Optional[Tuple[Optional[str], Optional[str], Optional[str], str]]:
"""
根据种子文件,选择并添加下载任务
@@ -87,6 +87,7 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]):
:param cookie: cookie
:param episodes: 需要下载的集数
:param category: 分类
:param label: 标签
:param downloader: 下载器
:return: 下载器名称、种子Hash、种子文件布局、错误原因
"""
@@ -118,7 +119,9 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]):
# 生成随机Tag
tag = StringUtils.generate_random_str(10)
if settings.TORRENT_TAG:
if label:
tags = label.split(',') + [tag]
elif settings.TORRENT_TAG:
tags = [tag, settings.TORRENT_TAG]
else:
tags = [tag]

View File

@@ -79,7 +79,7 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]):
server.reconnect()
def download(self, content: Union[Path, str], download_dir: Path, cookie: str,
episodes: Set[int] = None, category: str = None,
episodes: Set[int] = None, category: str = None, label: str = None,
downloader: str = None) -> Optional[Tuple[Optional[str], Optional[str], Optional[str], str]]:
"""
根据种子文件,选择并添加下载任务
@@ -88,6 +88,7 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]):
:param cookie: cookie
:param episodes: 需要下载的集数
:param category: 分类TR中未使用
:param label: 标签
:param downloader: 下载器
:return: 下载器名称、种子Hash、种子文件布局、错误原因
"""
@@ -118,8 +119,11 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]):
# 如果要选择文件则先暂停
is_paused = True if episodes else False
# 标签
if settings.TORRENT_TAG:
if label:
labels = label.split(',')
elif settings.TORRENT_TAG:
labels = [settings.TORRENT_TAG]
else:
labels = None

View File

@@ -514,7 +514,7 @@ class Scheduler(metaclass=Singleton):
trigger=CronTrigger.from_crontab(workflow.timer),
id=job_id,
name=workflow.name,
kwargs={"job_id": job_id, "workflow_id": job_id},
kwargs={"job_id": job_id, "workflow_id": workflow.id},
replace_existing=True
)
logger.info(f"注册工作流服务:{workflow.name} - {workflow.timer}")

View File

@@ -87,6 +87,8 @@ class ChainEventType(Enum):
MediaRecognizeConvert = "media.recognize.convert"
# 推荐数据源
RecommendSource = "recommend.source"
# 工作流执行
WorkflowExecution = "workflow.execution"
# 系统配置Key字典

View File

@@ -3,12 +3,10 @@ from typing import Optional, List
from pydantic import BaseModel, Field
from app.schemas.context import Context, MediaInfo
from app.schemas.file import FileItem
from app.schemas.download import DownloadTask
from app.schemas.file import FileItem
from app.schemas.site import Site
from app.schemas.subscribe import Subscribe
from app.schemas.message import Notification
from app.schemas.event import Event
class Workflow(BaseModel):
@@ -52,6 +50,15 @@ class Action(BaseModel):
data: Optional[dict] = Field({}, description="参数")
class ActionExecution(BaseModel):
"""
动作执行情况
"""
action: Optional[str] = Field(None, description="当前动作(名称)")
result: Optional[bool] = Field(None, description="执行结果")
message: Optional[str] = Field(None, description="执行消息")
class ActionContext(BaseModel):
"""
动作基础上下文,各动作通用数据
@@ -63,8 +70,8 @@ class ActionContext(BaseModel):
downloads: Optional[List[DownloadTask]] = Field([], description="下载任务列表")
sites: Optional[List[Site]] = Field([], description="站点列表")
subscribes: Optional[List[Subscribe]] = Field([], description="订阅列表")
messages: Optional[List[Notification]] = Field([], description="消息列表")
events: Optional[List[Event]] = Field([], description="事件列表")
execute_history: Optional[List[ActionExecution]] = Field([], description="执行历史")
progress: Optional[int] = Field(0, description="执行进度(%")
class ActionFlow(BaseModel):

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.3.1'
FRONTEND_VERSION = 'v2.3.1'
APP_VERSION = 'v2.3.3'
FRONTEND_VERSION = 'v2.3.3'