Compare commits

...

43 Commits

Author SHA1 Message Date
jxxghp
4086ba4763 更新 version.py 2025-02-28 12:30:45 +08:00
jxxghp
6a9cdf71d7 fix AddDownloadAction 2025-02-28 12:12:52 +08:00
jxxghp
a9644c4f86 fix actions 2025-02-28 11:56:26 +08:00
jxxghp
cf62ad5e8e fix actions 2025-02-28 11:15:24 +08:00
jxxghp
f8ed16666c fix actions execute 2025-02-27 20:39:42 +08:00
jxxghp
37926b4c19 fix actions 2025-02-27 18:58:11 +08:00
jxxghp
b080a2003f fix actions 2025-02-27 17:08:38 +08:00
jxxghp
ab0008be86 fix actions 2025-02-27 13:09:01 +08:00
jxxghp
4a42b0d000 fix import 2025-02-26 21:13:41 +08:00
jxxghp
e3d4b19dac fix actionid type 2025-02-26 20:28:10 +08:00
jxxghp
403d600db4 fix workflow edit api 2025-02-26 19:06:30 +08:00
jxxghp
835e6e8891 fix workflow scheduler 2025-02-26 18:32:25 +08:00
jxxghp
eec25113b5 fix workflow scheduler 2025-02-26 18:24:27 +08:00
jxxghp
a7c4161f91 fix workflow executor 2025-02-26 12:57:57 +08:00
jxxghp
799eb9e6ef add workflow executor 2025-02-26 08:37:37 +08:00
jxxghp
88993cb67b fix workflow api 2025-02-25 17:27:21 +08:00
jxxghp
0dc9c98c06 fix workflow api 2025-02-25 13:35:32 +08:00
jxxghp
c1c91cec44 fix workflow api 2025-02-25 13:25:56 +08:00
jxxghp
19b6927320 fix workflow process 2025-02-25 12:42:15 +08:00
jxxghp
0889ebc8b8 fix workflow schema 2025-02-25 08:25:19 +08:00
jxxghp
fb249c0ea5 fix workflow excute 2025-02-25 08:22:02 +08:00
jxxghp
feb22ff0a7 Merge pull request #3922 from WingGao/v2 2025-02-22 17:51:13 +08:00
WingGao
3c95156ce1 fix: alist不应该缓存失败的结果 2025-02-22 15:05:04 +08:00
jxxghp
8b6dca6a46 fix bug 2025-02-22 11:22:21 +08:00
jxxghp
43907eea26 fix 2025-02-22 11:12:14 +08:00
jxxghp
67145a80d0 add workflow apis 2025-02-22 10:35:57 +08:00
jxxghp
0b3138fec6 fix actions 2025-02-22 09:57:32 +08:00
jxxghp
b84896b4f9 Merge pull request #3919 from InfinityPacer/feature/plugin 2025-02-22 07:46:02 +08:00
InfinityPacer
efd046d2f8 fix(plugin): handle None response for online plugins retrieval 2025-02-22 00:34:35 +08:00
jxxghp
06fcf817bb Merge pull request #3917 from gtsicko/v2 2025-02-21 07:29:23 +08:00
gtsicko
16a94d9054 fix: 修复带路径的WECHAT_PROXY不生效 2025-02-20 23:41:14 +08:00
jxxghp
5bf502188d fix 2025-02-20 19:32:58 +08:00
jxxghp
5269b4bc82 fix #3914
feat:搜索支持指定站点
2025-02-20 13:03:12 +08:00
jxxghp
e3f8ed9886 add downloads path 2025-02-20 10:51:22 +08:00
jxxghp
74de554fb0 Merge pull request #3914 from TimoYoung/v2 2025-02-19 18:01:49 +08:00
jxxghp
b41de1a982 fix actions 2025-02-19 17:44:14 +08:00
Timo_Young
25f7d9ccdd Merge branch 'jxxghp:v2' into v2 2025-02-19 17:28:22 +08:00
yangyux
9646745181 fix: mtype为空且tmdbid在movie和tv中都存在时的识别错误问题 2025-02-19 17:27:38 +08:00
jxxghp
1317d9c4f0 fix actions 2025-02-19 16:43:42 +08:00
jxxghp
351029a842 fix AddDownloadAction 2025-02-19 15:24:13 +08:00
jxxghp
15e1fb61ac fix actions 2025-02-19 08:33:15 +08:00
jxxghp
1889a829b5 fix workflow process 2025-02-19 08:16:35 +08:00
jxxghp
53a14fce38 fix workflow process 2025-02-19 08:15:49 +08:00
44 changed files with 1580 additions and 184 deletions

View File

@@ -1,39 +1,45 @@
from abc import ABC, abstractmethod
from pydantic.main import BaseModel
from app.chain import ChainBase
from app.schemas import ActionContext, ActionParams
class BaseAction(BaseModel, ABC):
class ActionChain(ChainBase):
pass
class BaseAction(ABC):
"""
工作流动作基类
"""
# 完成标志
_done_flag = False
@classmethod
@property
@abstractmethod
def name(self) -> str:
def name(cls) -> str:
pass
@classmethod
@property
@abstractmethod
def description(cls) -> str:
pass
@classmethod
@property
@abstractmethod
def data(cls) -> dict:
pass
@property
@abstractmethod
def description(self) -> str:
pass
@abstractmethod
def execute(self, params: ActionParams, context: ActionContext) -> ActionContext:
"""
执行动作
"""
raise NotImplementedError
@property
@abstractmethod
def done(self) -> bool:
"""
判断动作是否完成
"""
pass
return self._done_flag
@property
@abstractmethod
@@ -42,3 +48,16 @@ class BaseAction(BaseModel, ABC):
判断动作是否成功
"""
pass
def job_done(self):
"""
标记动作完成
"""
self._done_flag = True
@abstractmethod
def execute(self, params: ActionParams, context: ActionContext) -> ActionContext:
"""
执行动作
"""
raise NotImplementedError

View File

@@ -0,0 +1,103 @@
from pydantic import Field
from app.actions import BaseAction
from app.chain.download import DownloadChain
from app.chain.media import MediaChain
from app.core.metainfo import MetaInfo
from app.log import logger
from app.schemas import ActionParams, ActionContext, DownloadTask, MediaType
class AddDownloadParams(ActionParams):
"""
添加下载资源参数
"""
downloader: str = Field(None, description="下载器")
save_path: str = Field(None, description="保存路径")
only_lack: bool = Field(False, description="仅下载缺失的资源")
class AddDownloadAction(BaseAction):
"""
添加下载资源
"""
# 已添加的下载
_added_downloads = []
_has_error = False
def __init__(self):
super().__init__()
self.downloadchain = DownloadChain()
self.mediachain = MediaChain()
@classmethod
@property
def name(cls) -> str:
return "添加下载"
@classmethod
@property
def description(cls) -> str:
return "根据资源列表添加下载任务"
@classmethod
@property
def data(cls) -> dict:
return AddDownloadParams().dict()
@property
def success(self) -> bool:
return not self._has_error
def execute(self, params: dict, context: ActionContext) -> ActionContext:
"""
将上下文中的torrents添加到下载任务中
"""
params = AddDownloadParams(**params)
for t in context.torrents:
if not t.meta_info:
t.meta_info = MetaInfo(title=t.title, subtitle=t.description)
if not t.media_info:
t.media_info = self.mediachain.recognize_media(meta=t.meta_info)
if not t.media_info:
self._has_error = True
logger.warning(f"{t.title} 未识别到媒体信息,无法下载")
continue
if params.only_lack:
exists_info = self.downloadchain.media_exists(t.media_info)
if exists_info:
if t.media_info.type == MediaType.MOVIE:
# 电影
logger.warning(f"{t.title} 媒体库中已存在,跳过")
continue
else:
# 电视剧
exists_seasons = exists_info.seasons or {}
if len(t.meta_info.season_list) > 1:
# 多季不下载
logger.warning(f"{t.meta_info.title} 有多季,跳过")
continue
else:
exists_episodes = exists_seasons.get(t.meta_info.begin_season)
if exists_episodes:
if set(t.meta_info.episode_list).issubset(exists_episodes):
logger.warning(f"{t.meta_info.title}{t.meta_info.begin_season} 季第 {t.meta_info.episode_list} 集已存在,跳过")
continue
did = self.downloadchain.download_single(context=t,
downloader=params.downloader,
save_path=params.save_path)
if did:
self._added_downloads.append(did)
else:
self._has_error = True
if self._added_downloads:
logger.info(f"已添加 {len(self._added_downloads)} 个下载任务")
context.downloads.extend(
[DownloadTask(download_id=did, downloader=params.downloader) for did in self._added_downloads]
)
self.job_done()
return context

View File

@@ -0,0 +1,79 @@
from app.actions import BaseAction
from app.chain.subscribe import SubscribeChain
from app.core.config import settings
from app.core.context import MediaInfo
from app.db.subscribe_oper import SubscribeOper
from app.log import logger
from app.schemas import ActionParams, ActionContext
class AddSubscribeParams(ActionParams):
"""
添加订阅参数
"""
pass
class AddSubscribeAction(BaseAction):
"""
添加订阅
"""
_added_subscribes = []
_has_error = False
def __init__(self):
super().__init__()
self.subscribechain = SubscribeChain()
self.subscribeoper = SubscribeOper()
@classmethod
@property
def name(cls) -> str:
return "添加订阅"
@classmethod
@property
def description(cls) -> str:
return "根据媒体列表添加订阅"
@classmethod
@property
def data(cls) -> dict:
return AddSubscribeParams().dict()
@property
def success(self) -> bool:
return not self._has_error
def execute(self, params: dict, context: ActionContext) -> ActionContext:
"""
将medias中的信息添加订阅如果订阅不存在的话
"""
for media in context.medias:
mediainfo = MediaInfo()
mediainfo.from_dict(media.dict())
if self.subscribechain.exists(mediainfo):
logger.info(f"{media.title} 已存在订阅")
continue
# 添加订阅
sid, message = self.subscribechain.add(mtype=mediainfo.type,
title=mediainfo.title,
year=mediainfo.year,
tmdbid=mediainfo.tmdb_id,
season=mediainfo.season,
doubanid=mediainfo.douban_id,
bangumiid=mediainfo.bangumi_id,
username=settings.SUPERUSER)
if sid:
self._added_subscribes.append(sid)
else:
self._has_error = True
if self._added_subscribes:
logger.info(f"已添加 {len(self._added_subscribes)} 个订阅")
for sid in self._added_subscribes:
context.subscribes.append(self.subscribeoper.get(sid))
self.job_done()
return context

View File

@@ -0,0 +1,61 @@
from app.actions import BaseAction, ActionChain
from app.schemas import ActionParams, ActionContext
from app.log import logger
class FetchDownloadsParams(ActionParams):
"""
获取下载任务参数
"""
pass
class FetchDownloadsAction(BaseAction):
"""
获取下载任务
"""
_downloads = []
def __init__(self):
super().__init__()
self.chain = ActionChain()
@classmethod
@property
def name(cls) -> str:
return "获取下载任务"
@classmethod
@property
def description(cls) -> str:
return "获取下载任务,更新任务状态"
@classmethod
@property
def data(cls) -> dict:
return FetchDownloadsParams().dict()
@property
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
"""
更新downloads中的下载任务状态
"""
__all_complete = False
for download in self._downloads:
logger.info(f"获取下载任务 {download.download_id} 状态 ...")
torrents = self.chain.list_torrents(hashs=[download.download_id])
if not torrents:
download.completed = True
continue
for t in torrents:
download.path = t.path
if t.progress >= 100:
logger.info(f"下载任务 {download.download_id} 已完成")
download.completed = True
if all([d.completed for d in self._downloads]):
self.job_done()
return context

View File

@@ -0,0 +1,156 @@
from typing import List
from pydantic import Field
from app.actions import BaseAction
from app.chain.recommend import RecommendChain
from app.schemas import ActionParams, ActionContext
from app.core.config import settings
from app.core.event import eventmanager
from app.log import logger
from app.schemas import RecommendSourceEventData, MediaInfo
from app.schemas.types import ChainEventType
from app.utils.http import RequestUtils
class FetchMediasParams(ActionParams):
"""
获取媒体数据参数
"""
sources: List[str] = Field([], description="媒体数据来源")
class FetchMediasAction(BaseAction):
"""
获取媒体数据
"""
_inner_sources = []
_medias = []
def __init__(self):
super().__init__()
self.__inner_sources = [
{
"func": RecommendChain().tmdb_trending,
"name": '流行趋势',
},
{
"func": RecommendChain().douban_movie_showing,
"name": '正在热映',
},
{
"func": RecommendChain().bangumi_calendar,
"name": 'Bangumi每日放送',
},
{
"func": RecommendChain().tmdb_movies,
"name": 'TMDB热门电影',
},
{
"func": RecommendChain().tmdb_tvs,
"name": 'TMDB热门电视剧',
},
{
"func": RecommendChain().douban_movie_hot,
"name": '豆瓣热门电影',
},
{
"func": RecommendChain().douban_tv_hot,
"name": '豆瓣热门电视剧',
},
{
"func": RecommendChain().douban_tv_animation,
"name": '豆瓣热门动漫',
},
{
"func": RecommendChain().douban_movies,
"name": '豆瓣最新电影',
},
{
"func": RecommendChain().douban_tvs,
"name": '豆瓣最新电视剧',
},
{
"func": RecommendChain().douban_movie_top250,
"name": '豆瓣电影TOP250',
},
{
"func": RecommendChain().douban_tv_weekly_chinese,
"name": '豆瓣国产剧集榜',
},
{
"func": RecommendChain().douban_tv_weekly_global,
"name": '豆瓣全球剧集榜',
}
]
# 广播事件,请示额外的推荐数据源支持
event_data = RecommendSourceEventData()
event = eventmanager.send_event(ChainEventType.RecommendSource, event_data)
# 使用事件返回的上下文数据
if event and event.event_data:
event_data: RecommendSourceEventData = event.event_data
if event_data.extra_sources:
self.__inner_sources.extend([s.dict() for s in event_data.extra_sources])
@classmethod
@property
def name(cls) -> str:
return "获取媒体数据"
@classmethod
@property
def description(cls) -> str:
return "获取榜单等媒体数据列表"
@classmethod
@property
def data(cls) -> dict:
return FetchMediasParams().dict()
@property
def success(self) -> bool:
return True if self._medias else False
def __get_source(self, source: str):
"""
获取数据源
"""
for s in self.__inner_sources:
if s['name'] == source:
return s
return None
def execute(self, params: dict, context: ActionContext) -> ActionContext:
"""
获取媒体数据填充到medias
"""
params = FetchMediasParams(**params)
for name in params.sources:
source = self.__get_source(name)
if not source:
continue
logger.info(f"获取媒体数据 {source} ...")
results = []
if source.get("func"):
results = source['func']()
else:
# 调用内部API获取数据
api_url = f"http://127.0.0.1:{settings.PORT}/api/v1/{source['api_path']}?token={settings.API_TOKEN}"
res = RequestUtils(timeout=15).post_res(api_url)
if res:
results = res.json()
if results:
logger.info(f"{name} 获取到 {len(results)} 条数据")
self._medias.extend([MediaInfo(**r) for r in results])
else:
logger.error(f"{name} 获取数据失败")
if self._medias:
context.medias.extend(self._medias)
self.job_done()
return context

View File

@@ -2,8 +2,13 @@ from typing import Optional
from pydantic import Field
from app.actions import BaseAction
from app.schemas import ActionParams, ActionContext
from app.actions import BaseAction, ActionChain
from app.core.config import settings
from app.core.context import Context
from app.core.metainfo import MetaInfo
from app.helper.rss import RssHelper
from app.log import logger
from app.schemas import ActionParams, ActionContext, TorrentInfo
class FetchRssParams(ActionParams):
@@ -13,8 +18,9 @@ class FetchRssParams(ActionParams):
url: str = Field(None, description="RSS地址")
proxy: Optional[bool] = Field(False, description="是否使用代理")
timeout: Optional[int] = Field(15, description="超时时间")
headers: Optional[dict] = Field(None, description="请求头")
recognize: Optional[bool] = Field(False, description="是否识别")
content_type: Optional[str] = Field(None, description="Content-Type")
referer: Optional[str] = Field(None, description="Referer")
ua: Optional[str] = Field(None, description="User-Agent")
class FetchRssAction(BaseAction):
@@ -22,21 +28,83 @@ class FetchRssAction(BaseAction):
获取RSS资源列表
"""
@property
def name(self) -> str:
return "获取RSS资源列表"
_rss_torrents = []
_has_error = False
@property
def description(self) -> str:
return "请求RSS地址获取数据并解析为资源列表"
async def execute(self, params: FetchRssParams, context: ActionContext) -> ActionContext:
pass
def __init__(self):
super().__init__()
self.rsshelper = RssHelper()
self.chain = ActionChain()
@classmethod
@property
def done(self) -> bool:
return True
def name(cls) -> str:
return "获取RSS资源"
@classmethod
@property
def description(cls) -> str:
return "订阅RSS地址获取资源"
@classmethod
@property
def data(cls) -> dict:
return FetchRssParams().dict()
@property
def success(self) -> bool:
return True
return not self._has_error
def execute(self, params: dict, context: ActionContext) -> ActionContext:
"""
请求RSS地址获取数据并解析为资源列表
"""
params = FetchRssParams(**params)
if not params.url:
return context
headers = {}
if params.content_type:
headers["Content-Type"] = params.content_type
if params.referer:
headers["Referer"] = params.referer
if params.ua:
headers["User-Agent"] = params.ua
rss_items = self.rsshelper.parse(url=params.url,
proxy=settings.PROXY if params.proxy else None,
timeout=params.timeout,
headers=headers)
if rss_items is None or rss_items is False:
logger.error(f'RSS地址 {params.url} 请求失败!')
self._has_error = True
return context
if not rss_items:
logger.error(f'RSS地址 {params.url} 未获取到RSS数据')
return context
# 组装种子
for item in rss_items:
if not item.get("title"):
continue
torrentinfo = TorrentInfo(
title=item.get("title"),
enclosure=item.get("enclosure"),
page_url=item.get("link"),
size=item.get("size"),
pubdate=item["pubdate"].strftime("%Y-%m-%d %H:%M:%S") if item.get("pubdate") else None,
)
meta = MetaInfo(title=torrentinfo.title, subtitle=torrentinfo.description)
mediainfo = self.chain.recognize_media(meta)
if not mediainfo:
logger.warning(f"{torrentinfo.title} 未识别到媒体信息")
continue
self._rss_torrents.append(Context(meta_info=meta, media_info=mediainfo, torrent_info=torrentinfo))
if self._rss_torrents:
logger.info(f"已获取 {len(self._rss_torrents)} 个RSS资源")
context.torrents.extend(self._rss_torrents)
self.job_done()
return context

View File

@@ -0,0 +1,77 @@
from typing import Optional, List
from pydantic import Field
from app.actions import BaseAction
from app.chain.search import SearchChain
from app.log import logger
from app.schemas import ActionParams, ActionContext, MediaType
class FetchTorrentsParams(ActionParams):
"""
获取站点资源参数
"""
name: str = Field(None, description="资源名称")
year: Optional[str] = Field(None, description="年份")
type: Optional[str] = Field(None, description="资源类型 (电影/电视剧)")
season: Optional[int] = Field(None, description="季度")
sites: Optional[List[int]] = Field([], description="站点列表")
class FetchTorrentsAction(BaseAction):
"""
搜索站点资源
"""
_torrents = []
def __init__(self):
super().__init__()
self.searchchain = SearchChain()
@classmethod
@property
def name(cls) -> str:
return "搜索站点资源"
@classmethod
@property
def description(cls) -> str:
return "根据关键字搜索站点种子资源"
@classmethod
@property
def data(cls) -> dict:
return FetchTorrentsParams().dict()
@property
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
"""
搜索站点,获取资源列表
"""
params = FetchTorrentsParams(**params)
torrents = self.searchchain.search_by_title(title=params.name, sites=params.sites, cache_local=False)
for torrent in torrents:
if params.year and torrent.meta_info.year != params.year:
continue
if params.type and torrent.media_info and torrent.media_info.type != MediaType(params.type):
continue
if params.season and torrent.meta_info.begin_season != params.season:
continue
# 识别媒体信息
torrent.media_info = self.searchchain.recognize_media(torrent.meta_info)
if not torrent.media_info:
logger.warning(f"{torrent.torrent_info.title} 未识别到媒体信息")
continue
self._torrents.append(torrent)
if self._torrents:
context.torrents.extend(self._torrents)
logger.info(f"搜索到 {len(self._torrents)} 条资源")
self.job_done()
return context

View File

@@ -0,0 +1,65 @@
from typing import Optional
from pydantic import Field
from app.actions import BaseAction
from app.schemas import ActionParams, ActionContext
class FilterMediasParams(ActionParams):
"""
过滤媒体数据参数
"""
type: Optional[str] = Field(None, description="媒体类型 (电影/电视剧)")
category: Optional[str] = Field(None, description="媒体类别 (二级分类)")
vote: Optional[int] = Field(0, description="评分")
year: Optional[str] = Field(None, description="年份")
class FilterMediasAction(BaseAction):
"""
过滤媒体数据
"""
_medias = []
@classmethod
@property
def name(cls) -> str:
return "过滤媒体数据"
@classmethod
@property
def description(cls) -> str:
return "对媒体数据列表进行过滤"
@classmethod
@property
def data(cls) -> dict:
return FilterMediasParams().dict()
@property
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
"""
过滤medias中媒体数据
"""
params = FilterMediasParams(**params)
for media in context.medias:
if params.type and media.type != params.type:
continue
if params.category and media.category != params.category:
continue
if params.vote and media.vote_average < params.vote:
continue
if params.year and media.year != params.year:
continue
self._medias.append(media)
if self._medias:
context.medias = self._medias
self.job_done()
return context

View File

@@ -0,0 +1,81 @@
from typing import Optional, List
from pydantic import Field
from app.actions import BaseAction, ActionChain
from app.helper.torrent import TorrentHelper
from app.schemas import ActionParams, ActionContext
class FilterTorrentsParams(ActionParams):
"""
过滤资源数据参数
"""
rule_groups: Optional[List[str]] = Field([], description="规则组")
quality: Optional[str] = Field(None, description="资源质量")
resolution: Optional[str] = Field(None, description="资源分辨率")
effect: Optional[str] = Field(None, description="特效")
include: Optional[str] = Field(None, description="包含规则")
exclude: Optional[str] = Field(None, description="排除规则")
size: Optional[str] = Field(None, description="资源大小范围MB")
class FilterTorrentsAction(BaseAction):
"""
过滤资源数据
"""
_torrents = []
def __init__(self):
super().__init__()
self.torrenthelper = TorrentHelper()
self.chain = ActionChain()
@classmethod
@property
def name(cls) -> str:
return "过滤资源"
@classmethod
@property
def description(cls) -> str:
return "对资源列表数据进行过滤"
@classmethod
@property
def data(cls) -> dict:
return FilterTorrentsParams().dict()
@property
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
"""
过滤torrents中的资源
"""
params = FilterTorrentsParams(**params)
for torrent in context.torrents:
if self.torrenthelper.filter_torrent(
torrent_info=torrent.torrent_info,
filter_params={
"quality": params.quality,
"resolution": params.resolution,
"effect": params.effect,
"include": params.include,
"exclude": params.exclude,
"size": params.size
}
):
if self.chain.filter_torrents(
rule_groups=params.rule_groups,
torrent_list=[torrent.torrent_info],
mediainfo=torrent.media_info
):
self._torrents.append(torrent)
context.torrents = self._torrents
self.job_done()
return context

View File

@@ -0,0 +1,69 @@
from pathlib import Path
from app.actions import BaseAction
from app.schemas import ActionParams, ActionContext
from app.chain.media import MediaChain
from app.chain.storage import StorageChain
from app.core.metainfo import MetaInfoPath
from app.log import logger
class ScrapeFileParams(ActionParams):
"""
刮削文件参数
"""
pass
class ScrapeFileAction(BaseAction):
"""
刮削文件
"""
_scraped_files = []
_has_error = False
def __init__(self):
super().__init__()
self.storagechain = StorageChain()
self.mediachain = MediaChain()
@classmethod
@property
def name(cls) -> str:
return "刮削文件"
@classmethod
@property
def description(cls) -> str:
return "刮削媒体信息和图片"
@classmethod
@property
def data(cls) -> dict:
return ScrapeFileParams().dict()
@property
def success(self) -> bool:
return not self._has_error
def execute(self, params: dict, context: ActionContext) -> ActionContext:
"""
刮削fileitems中的所有文件
"""
for fileitem in context.fileitems:
if fileitem in self._scraped_files:
continue
if not self.storagechain.exists(fileitem):
continue
meta = MetaInfoPath(Path(fileitem.path))
mediainfo = self.mediachain.recognize_media(meta)
if not mediainfo:
self._has_error = True
logger.info(f"{fileitem.path} 未识别到媒体信息,无法刮削")
continue
self.mediachain.scrape_metadata(fileitem=fileitem, meta=meta, mediainfo=mediainfo)
self._scraped_files.append(fileitem)
self.job_done()
return context

View File

@@ -1,42 +0,0 @@
from typing import Optional
from pydantic import Field
from app.actions import BaseAction
from app.schemas import ActionParams, ActionContext
class SearchTorrentsParams(ActionParams):
"""
搜索站点资源参数
"""
name: str = Field(None, description="资源名称")
year: Optional[int] = Field(None, description="年份")
type: Optional[str] = Field(None, description="资源类型 (电影/电视剧)")
season: Optional[int] = Field(None, description="季度")
recognize: Optional[bool] = Field(False, description="是否识别")
class SearchTorrentsAction(BaseAction):
"""
搜索站点资源
"""
@property
def name(self) -> str:
return "搜索站点资源"
@property
def description(self) -> str:
return "根据关键字搜索站点种子资源"
@property
def done(self) -> bool:
return True
@property
def success(self) -> bool:
return True
async def execute(self, params: SearchTorrentsParams, context: ActionContext) -> ActionContext:
pass

View File

@@ -0,0 +1,51 @@
import copy
from app.actions import BaseAction
from app.schemas import ActionParams, ActionContext
from app.core.event import eventmanager
class SendEventParams(ActionParams):
"""
发送事件参数
"""
pass
class SendEventAction(BaseAction):
"""
发送事件
"""
@classmethod
@property
def name(cls) -> str:
return "发送事件"
@classmethod
@property
def description(cls) -> str:
return "发送队列中的所有事件"
@classmethod
@property
def data(cls) -> dict:
return SendEventParams().dict()
@property
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
"""
发送events中的事件
"""
if context.events:
# 按优先级排序,优先级高的先发送
context.events.sort(key=lambda x: x.priority, reverse=True)
for event in copy.deepcopy(context.events):
eventmanager.send_event(etype=event.event_type, data=event.event_data)
context.events.remove(event)
self.job_done()
return context

View File

@@ -0,0 +1,61 @@
import copy
from typing import List, Optional, Union
from pydantic import Field
from app.actions import BaseAction, ActionChain
from app.schemas import ActionParams, ActionContext
class SendMessageParams(ActionParams):
"""
发送消息参数
"""
client: Optional[List[str]] = Field([], description="消息渠道")
userid: Optional[Union[str, int]] = Field(None, description="用户ID")
class SendMessageAction(BaseAction):
"""
发送消息
"""
def __init__(self):
super().__init__()
self.chain = ActionChain()
@classmethod
@property
def name(cls) -> str:
return "发送消息"
@classmethod
@property
def description(cls) -> str:
return "发送队列中的所有消息"
@classmethod
@property
def data(cls) -> dict:
return SendMessageParams().dict()
@property
def success(self) -> bool:
return self.done
def execute(self, params: dict, context: ActionContext) -> ActionContext:
"""
发送messages中的消息
"""
for message in copy.deepcopy(context.messages):
if params.client:
message.source = params.client
if params.userid:
message.userid = params.userid
self.chain.post_message(message)
context.messages.remove(message)
context.messages = []
self.job_done()
return context

View File

@@ -0,0 +1,74 @@
from pathlib import Path
from app.actions import BaseAction
from app.schemas import ActionParams, ActionContext
from app.chain.storage import StorageChain
from app.chain.transfer import TransferChain
from app.log import logger
class TransferFileParams(ActionParams):
"""
整理文件参数
"""
pass
class TransferFileAction(BaseAction):
"""
整理文件
"""
_fileitems = []
_has_error = False
def __init__(self):
super().__init__()
self.transferchain = TransferChain()
self.storagechain = StorageChain()
@classmethod
@property
def name(cls) -> str:
return "整理文件"
@classmethod
@property
def description(cls) -> str:
return "整理下载队列中的文件"
@classmethod
@property
def data(cls) -> dict:
return TransferFileParams().dict()
@property
def success(self) -> bool:
return not self._has_error
def execute(self, params: dict, context: ActionContext) -> ActionContext:
"""
从downloads中整理文件记录到fileitems
"""
for download in context.downloads:
if not download.completed:
logger.info(f"下载任务 {download.download_id} 未完成")
continue
fileitem = self.storagechain.get_file_item(storage="local", path=Path(download.path))
if not fileitem:
logger.info(f"文件 {download.path} 不存在")
continue
logger.info(f"开始整理文件 {download.path} ...")
state, errmsg = self.transferchain.do_transfer(fileitem, background=False)
if not state:
self._has_error = True
logger.error(f"整理文件 {download.path} 失败: {errmsg}")
continue
logger.info(f"整理文件 {download.path} 完成")
self._fileitems.append(fileitem)
if self._fileitems:
context.fileitems.extend(self._fileitems)
self.job_done()
return context

View File

@@ -31,6 +31,7 @@ def search_by_id(mediaid: str,
title: str = None,
year: int = None,
season: str = None,
sites: str = None,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
根据TMDBID/豆瓣ID精确搜索站点资源 tmdb:/douban:/bangumi:
@@ -39,6 +40,10 @@ def search_by_id(mediaid: str,
mtype = MediaType(mtype)
if season:
season = int(season)
if sites:
site_list = [int(site) for site in sites.split(",") if site]
else:
site_list = None
torrents = None
# 根据前缀识别媒体ID
if mediaid.startswith("tmdb:"):
@@ -48,11 +53,13 @@ def search_by_id(mediaid: str,
doubaninfo = MediaChain().get_doubaninfo_by_tmdbid(tmdbid=tmdbid, mtype=mtype)
if doubaninfo:
torrents = SearchChain().search_by_id(doubanid=doubaninfo.get("id"),
mtype=mtype, area=area, season=season)
mtype=mtype, area=area, season=season,
sites=site_list)
else:
return schemas.Response(success=False, message="未识别到豆瓣媒体信息")
else:
torrents = SearchChain().search_by_id(tmdbid=tmdbid, mtype=mtype, area=area, season=season)
torrents = SearchChain().search_by_id(tmdbid=tmdbid, mtype=mtype, area=area, season=season,
sites=site_list)
elif mediaid.startswith("douban:"):
doubanid = mediaid.replace("douban:", "")
if settings.RECOGNIZE_SOURCE == "themoviedb":
@@ -62,11 +69,13 @@ def search_by_id(mediaid: str,
if tmdbinfo.get('season') and not season:
season = tmdbinfo.get('season')
torrents = SearchChain().search_by_id(tmdbid=tmdbinfo.get("id"),
mtype=mtype, area=area, season=season)
mtype=mtype, area=area, season=season,
sites=site_list)
else:
return schemas.Response(success=False, message="未识别到TMDB媒体信息")
else:
torrents = SearchChain().search_by_id(doubanid=doubanid, mtype=mtype, area=area, season=season)
torrents = SearchChain().search_by_id(doubanid=doubanid, mtype=mtype, area=area, season=season,
sites=site_list)
elif mediaid.startswith("bangumi:"):
bangumiid = int(mediaid.replace("bangumi:", ""))
if settings.RECOGNIZE_SOURCE == "themoviedb":
@@ -74,7 +83,8 @@ def search_by_id(mediaid: str,
tmdbinfo = MediaChain().get_tmdbinfo_by_bangumiid(bangumiid=bangumiid)
if tmdbinfo:
torrents = SearchChain().search_by_id(tmdbid=tmdbinfo.get("id"),
mtype=mtype, area=area, season=season)
mtype=mtype, area=area, season=season,
sites=site_list)
else:
return schemas.Response(success=False, message="未识别到TMDB媒体信息")
else:
@@ -82,7 +92,8 @@ def search_by_id(mediaid: str,
doubaninfo = MediaChain().get_doubaninfo_by_bangumiid(bangumiid=bangumiid)
if doubaninfo:
torrents = SearchChain().search_by_id(doubanid=doubaninfo.get("id"),
mtype=mtype, area=area, season=season)
mtype=mtype, area=area, season=season,
sites=site_list)
else:
return schemas.Response(success=False, message="未识别到豆瓣媒体信息")
else:
@@ -133,12 +144,13 @@ def search_by_id(mediaid: str,
@router.get("/title", summary="模糊搜索资源", response_model=schemas.Response)
def search_by_title(keyword: str = None,
page: int = 0,
site: int = None,
sites: str = None,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
根据名称模糊搜索站点资源,支持分页,关键词为空是返回首页资源
"""
torrents = SearchChain().search_by_title(title=keyword, page=page, site=site)
torrents = SearchChain().search_by_title(title=keyword, page=page,
sites=[int(site) for site in sites.split(",") if site] if sites else None)
if not torrents:
return schemas.Response(success=False, message="未搜索到任何资源")
return schemas.Response(success=True, data=[torrent.to_dict() for torrent in torrents])

View File

@@ -1,3 +1,131 @@
from fastapi import APIRouter
from datetime import datetime
from typing import List, Any
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app import schemas
from app.core.workflow import WorkFlowManager
from app.db import get_db
from app.db.models.workflow import Workflow
from app.db.user_oper import get_current_active_user
from app.chain.workflow import WorkflowChain
from app.scheduler import Scheduler
router = APIRouter()
@router.get("/", summary="所有工作流", response_model=List[schemas.Workflow])
def list_workflows(db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
获取工作流列表
"""
return Workflow.list(db)
@router.post("/", summary="创建工作流", response_model=schemas.Response)
def create_workflow(workflow: schemas.Workflow,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
创建工作流
"""
if Workflow.get_by_name(db, workflow.name):
return schemas.Response(success=False, message="已存在相同名称的工作流")
if not workflow.add_time:
workflow.add_time = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
if not workflow.state:
workflow.state = "P"
Workflow(**workflow.dict()).create(db)
return schemas.Response(success=True, message="创建工作流成功")
@router.get("/actions", summary="所有动作", response_model=List[dict])
def list_actions(_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
获取所有动作
"""
return WorkFlowManager().list_actions()
@router.get("/{workflow_id}", summary="工作流详情", response_model=schemas.Workflow)
def get_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
获取工作流详情
"""
return Workflow.get(db, workflow_id)
@router.put("/{workflow_id}", summary="更新工作流", response_model=schemas.Response)
def update_workflow(workflow: schemas.Workflow,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
更新工作流
"""
wf = Workflow.get(db, workflow.id)
if not wf:
return schemas.Response(success=False, message="工作流不存在")
wf.update(db, workflow.dict())
return schemas.Response(success=True, message="更新成功")
@router.delete("/{workflow_id}", summary="删除工作流", response_model=schemas.Response)
def delete_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
删除工作流
"""
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
Scheduler().remove_workflow_job(workflow)
Workflow.delete(db, workflow_id)
return schemas.Response(success=True, message="删除成功")
@router.post("/{workflow_id}/run", summary="执行工作流", response_model=schemas.Response)
def run_workflow(workflow_id: int,
from_begin: bool = True,
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
执行工作流
"""
state, errmsg = WorkflowChain().process(workflow_id, from_begin=from_begin)
if not state:
return schemas.Response(success=False, message=errmsg)
return schemas.Response(success=True)
@router.post("/{workflow_id}/start", summary="启用工作流", response_model=schemas.Response)
def start_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
启用工作流
"""
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
Scheduler().update_workflow_job(workflow)
workflow.update_state(db, workflow_id, "W")
return schemas.Response(success=True)
@router.post("/{workflow_id}/pause", summary="停用工作流", response_model=schemas.Response)
def pause_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
停用工作流
"""
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
Scheduler().remove_workflow_job(workflow)
workflow.update_state(db, workflow_id, "P")
return schemas.Response(success=True)

View File

@@ -35,7 +35,8 @@ class SearchChain(ChainBase):
self.torrenthelper = TorrentHelper()
def search_by_id(self, tmdbid: int = None, doubanid: str = None,
mtype: MediaType = None, area: str = "title", season: int = None) -> List[Context]:
mtype: MediaType = None, area: str = "title", season: int = None,
sites: List[int] = None) -> List[Context]:
"""
根据TMDBID/豆瓣ID搜索资源精确匹配不过滤本地存在的资源
:param tmdbid: TMDB ID
@@ -43,6 +44,7 @@ class SearchChain(ChainBase):
:param mtype: 媒体,电影 or 电视剧
:param area: 搜索范围title or imdbid
:param season: 季数
:param sites: 站点ID列表
"""
mediainfo = self.recognize_media(tmdbid=tmdbid, doubanid=doubanid, mtype=mtype)
if not mediainfo:
@@ -55,25 +57,27 @@ class SearchChain(ChainBase):
season: NotExistMediaInfo(episodes=[])
}
}
results = self.process(mediainfo=mediainfo, area=area, no_exists=no_exists)
results = self.process(mediainfo=mediainfo, sites=sites, area=area, no_exists=no_exists)
# 保存到本地文件
bytes_results = pickle.dumps(results)
self.save_cache(bytes_results, self.__result_temp_file)
return results
def search_by_title(self, title: str, page: int = 0, site: int = None) -> List[Context]:
def search_by_title(self, title: str, page: int = 0,
sites: List[int] = None, cache_local: bool = True) -> List[Context]:
"""
根据标题搜索资源,不识别不过滤,直接返回站点内容
:param title: 标题,为空时返回所有站点首页内容
:param page: 页码
:param site: 站点ID
:param sites: 站点ID列表
:param cache_local: 是否缓存到本地
"""
if title:
logger.info(f'开始搜索资源,关键词:{title} ...')
else:
logger.info(f'开始浏览资源,站点:{site} ...')
logger.info(f'开始浏览资源,站点:{sites} ...')
# 搜索
torrents = self.__search_all_sites(keywords=[title], sites=[site] if site else None, page=page) or []
torrents = self.__search_all_sites(keywords=[title], sites=sites, page=page) or []
if not torrents:
logger.warn(f'{title} 未搜索到资源')
return []
@@ -81,8 +85,9 @@ class SearchChain(ChainBase):
contexts = [Context(meta_info=MetaInfo(title=torrent.title, subtitle=torrent.description),
torrent_info=torrent) for torrent in torrents]
# 保存到本地文件
bytes_results = pickle.dumps(contexts)
self.save_cache(bytes_results, self.__result_temp_file)
if cache_local:
bytes_results = pickle.dumps(contexts)
self.save_cache(bytes_results, self.__result_temp_file)
return contexts
def last_search_results(self) -> List[Context]:

View File

@@ -84,6 +84,12 @@ class StorageChain(ChainBase):
"""
return self.run_module("rename_file", fileitem=fileitem, name=name)
def exists(self, fileitem: schemas.FileItem) -> Optional[bool]:
"""
判断文件或目录是否存在
"""
return True if self.get_item(fileitem) else False
def get_item(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]:
"""
查询目录或文件

View File

@@ -1,10 +1,168 @@
from typing import List
import base64
import pickle
import threading
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from typing import List, Tuple
from pydantic.fields import Callable
from app.chain import ChainBase
from app.core.config import global_vars
from app.core.workflow import WorkFlowManager
from app.db.models import Workflow
from app.db.workflow_oper import WorkflowOper
from app.log import logger
from app.schemas import Workflow, ActionContext, Action
from app.schemas import ActionContext, ActionFlow, Action
class WorkflowExecutor:
"""
工作流执行器
"""
def __init__(self, workflow: Workflow, step_callback: Callable = None):
"""
初始化工作流执行器
:param workflow: 工作流对象
:param step_callback: 步骤回调函数
"""
# 工作流数据
self.workflow = workflow
self.step_callback = step_callback
self.actions = {action['id']: Action(**action) for action in workflow.actions}
self.flows = [ActionFlow(**flow) for flow in workflow.flows]
self.success = True
self.errmsg = ""
# 工作流管理器
self.workflowmanager = WorkFlowManager()
# 线程安全队列
self.queue = deque()
# 锁用于保证线程安全
self.lock = threading.Lock()
# 线程池
self.executor = ThreadPoolExecutor()
# 跟踪运行中的任务数
self.running_tasks = 0
# 构建邻接表、入度表
self.adjacency = defaultdict(list)
self.indegree = defaultdict(int)
for flow in self.flows:
source = flow.source
target = flow.target
self.adjacency[source].append(target)
self.indegree[target] += 1
# 初始化所有节点的入度确保未被引用的节点入度为0
for action_id in self.actions:
if action_id not in self.indegree:
self.indegree[action_id] = 0
# 初始上下文
if workflow.current_action and workflow.context:
# Base64解码
decoded_data = base64.b64decode(workflow.context["content"])
# 反序列化数据
self.context = pickle.loads(decoded_data)
else:
self.context = ActionContext()
# 初始化队列入度为0的节点
for action_id in self.actions:
if self.indegree[action_id] == 0:
self.queue.append(action_id)
def execute(self):
"""
执行工作流
"""
while True:
with self.lock:
# 退出条件:队列为空且无运行任务
if not self.queue and self.running_tasks == 0:
break
# 退出条件:出现了错误
if not self.success:
break
if not self.queue:
sleep(1)
continue
# 取出队首节点
node_id = self.queue.popleft()
# 标记任务开始
self.running_tasks += 1
# 已停机
if global_vars.is_system_stopped:
break
# 已执行的跳过
if (self.workflow.current_action
and node_id in self.workflow.current_action.split(',')):
continue
# 提交任务到线程池
future = self.executor.submit(
self.execute_node,
node_id,
self.context
)
future.add_done_callback(self.on_node_complete)
def execute_node(self, node_id: int, context: ActionContext) -> Tuple[Action, bool, ActionContext]:
"""
执行单个节点操作返回修改后的上下文和节点ID
"""
action = self.actions[node_id]
state, result_ctx = self.workflowmanager.excute(action, context=context)
return action, state, result_ctx
def on_node_complete(self, future):
"""
节点完成回调:更新上下文、处理后继节点
"""
action, state, result_ctx = future.result()
# 节点执行失败
if not state:
self.success = False
self.errmsg = f"{action.name} 失败"
# 标记任务完成
with self.lock:
self.running_tasks -= 1
return
with self.lock:
# 更新主上下文
self.merge_context(result_ctx)
# 回调
if self.step_callback:
self.step_callback(action, self.context)
# 处理后继节点
successors = self.adjacency.get(action.id, [])
for succ_id in successors:
with self.lock:
self.indegree[succ_id] -= 1
if self.indegree[succ_id] == 0:
self.queue.append(succ_id)
# 标记任务完成
with self.lock:
self.running_tasks -= 1
def merge_context(self, context: ActionContext):
"""
合并上下文
"""
for key, value in context.dict().items():
if not getattr(self.context, key, None):
setattr(self.context, key, value)
class WorkflowChain(ChainBase):
@@ -15,34 +173,57 @@ class WorkflowChain(ChainBase):
def __init__(self):
super().__init__()
self.workflowoper = WorkflowOper()
self.workflowmanager = WorkFlowManager()
def process(self, workflow_id: int) -> bool:
def process(self, workflow_id: int, from_begin: bool = True) -> Tuple[bool, str]:
"""
处理工作流
:param workflow_id: 工作流ID
:param from_begin: 是否从头开始默认为True
"""
def save_step(action: Action, context: ActionContext):
"""
保存上下文到数据库
"""
# 序列化数据
serialized_data = pickle.dumps(context)
# 使用Base64编码字节流
encoded_data = base64.b64encode(serialized_data).decode('utf-8')
self.workflowoper.step(workflow_id, action_id=action.id, context={
"content": encoded_data
})
# 重置工作流
if from_begin:
self.workflowoper.reset(workflow_id)
# 查询工作流数据
workflow = self.workflowoper.get(workflow_id)
if not workflow:
logger.warn(f"工作流 {workflow_id} 不存在")
return False
return False, "工作流不存在"
if not workflow.actions:
logger.warn(f"工作流 {workflow.name} 无动作")
return False
return False, "工作流无动作"
if not workflow.flows:
logger.warn(f"工作流 {workflow.name} 无流程")
return False, "工作流无流程"
logger.info(f"开始处理 {workflow.name},共 {len(workflow.actions)} 个动作 ...")
# 启用上下文
context = ActionContext()
self.workflowoper.start(workflow_id)
for act in workflow.actions:
action = Action(**act)
state, context = self.workflowmanager.excute(action, context)
self.workflowoper.step(workflow_id, action=action.name, context=context.dict())
if not state:
logger.error(f"动作 {action.name} 执行失败,工作流失败")
self.workflowoper.fail(workflow_id, result=f"动作 {action.name} 执行失败")
return False
logger.info(f"工作流 {workflow.name} 执行完成")
self.workflowoper.success(workflow_id)
return True
# 执行工作流
executor = WorkflowExecutor(workflow, step_callback=save_step)
executor.execute()
if not executor.success:
logger.info(f"工作流 {workflow.name} 执行失败{executor.errmsg}")
self.workflowoper.fail(workflow_id, result=executor.errmsg)
return False, executor.errmsg
else:
logger.info(f"工作流 {workflow.name} 执行成功")
self.workflowoper.success(workflow_id)
return True, ""
def get_workflows(self) -> List[Workflow]:
"""

View File

@@ -793,10 +793,9 @@ class PluginManager(metaclass=Singleton):
# 已安装插件
installed_apps = self.systemconfig.get(SystemConfigKey.UserInstalledPlugins) or []
# 获取在线插件
online_plugins = self.pluginhelper.get_plugins(market, package_version) or {}
if not online_plugins:
if not package_version:
logger.warning(f"获取插件库失败:{market},请检查 GitHub 网络连接")
online_plugins = self.pluginhelper.get_plugins(market, package_version)
if online_plugins is None:
logger.warning(f"获取{package_version if package_version else ''}插件库失败:{market},请检查 GitHub 网络连接")
return []
ret_plugins = []
add_time = len(online_plugins)

View File

@@ -1,7 +1,6 @@
from time import sleep
from typing import Dict, Any, Tuple
from typing import Dict, Any, Tuple, List
from app.actions import BaseAction
from app.helper.module import ModuleHelper
from app.log import logger
from app.schemas import Action, ActionContext
@@ -14,7 +13,7 @@ class WorkFlowManager(metaclass=Singleton):
"""
# 所有动作定义
_actions: Dict[str, BaseAction] = {}
_actions: Dict[str, Any] = {}
def __init__(self):
self.init()
@@ -44,7 +43,10 @@ class WorkFlowManager(metaclass=Singleton):
)
for action in actions:
logger.debug(f"加载动作: {action.__name__}")
self._actions[action.__name__] = action
try:
self._actions[action.__name__] = action
except Exception as err:
logger.error(f"加载动作失败: {action.__name__} - {err}")
def stop(self):
"""
@@ -58,20 +60,47 @@ class WorkFlowManager(metaclass=Singleton):
"""
if not context:
context = ActionContext()
if action.id in self._actions:
action_obj = self._actions[action.id]
if action.type in self._actions:
# 实例化
action_obj = self._actions[action.type]()
# 执行
logger.info(f"执行动作: {action.id} - {action.name}")
result_context = action_obj.execute(action.params, context)
logger.info(f"{action.name} 执行结果: {action_obj.success}")
if action.loop and action.loop_interval:
try:
result_context = action_obj.execute(action.data, context)
except Exception as err:
logger.error(f"{action.name} 执行失败: {err}")
return False, context
loop = action.data.get("loop")
loop_interval = action.data.get("loop_interval")
if loop and loop_interval:
while not action_obj.done:
logger.info(f"{action.name} 等待 {action.loop_interval} 秒后继续执行")
sleep(action.loop_interval)
# 等待
logger.info(f"{action.name} 等待 {loop_interval} 秒后继续执行 ...")
sleep(loop_interval)
# 执行
logger.info(f"继续执行动作: {action.id} - {action.name}")
result_context = action_obj.execute(action.params, result_context)
logger.info(f"{action.name} 执行结果: {action_obj.success}")
logger.info(f"{action.name} 执行")
result_context = action_obj.execute(action.data, result_context)
if action_obj.success:
logger.info(f"{action.name} 执行成")
else:
logger.error(f"{action.name} 执行失败!")
return action_obj.success, result_context
else:
logger.error(f"未找到动作: {action.id} - {action.name}")
logger.error(f"未找到动作: {action.type} - {action.name}")
return False, context
def list_actions(self) -> List[dict]:
"""
获取所有动作
"""
return [
{
"type": key,
"name": action.name,
"description": action.description,
"data": {
"label": action.name,
**action.data
}
} for key, action in self._actions.items()
]

View File

@@ -8,3 +8,4 @@ from .systemconfig import SystemConfig
from .transferhistory import TransferHistory
from .user import User
from .userconfig import UserConfig
from .workflow import Workflow

View File

@@ -19,7 +19,7 @@ class Workflow(Base):
timer = Column(String)
# 状态W-等待 R-运行中 P-暂停 S-成功 F-失败
state = Column(String, nullable=False, index=True, default='W')
# 当前执行动作
# 执行动作,分隔)
current_action = Column(String)
# 任务执行结果
result = Column(String)
@@ -27,6 +27,8 @@ class Workflow(Base):
run_count = Column(Integer, default=0)
# 任务列表
actions = Column(JSON, default=list)
# 任务流
flows = Column(JSON, default=list)
# 执行上下文
context = Column(JSON, default=dict)
# 创建时间
@@ -64,7 +66,6 @@ class Workflow(Base):
db.query(Workflow).filter(Workflow.id == wid).update({
"state": 'F',
"result": result,
"run_count": Workflow.run_count + 1,
"last_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
return True
@@ -82,6 +83,19 @@ class Workflow(Base):
@staticmethod
@db_update
def update_current_action(db, wid: int, action: str, context: dict):
db.query(Workflow).filter(Workflow.id == wid).update({"current_action": action, "context": context})
def reset(db, wid: int):
db.query(Workflow).filter(Workflow.id == wid).update({
"state": 'W',
"result": None,
"current_action": None,
})
return True
@staticmethod
@db_update
def update_current_action(db, wid: int, action_id: str, context: dict):
db.query(Workflow).filter(Workflow.id == wid).update({
"current_action": f"{Workflow.current_action},{action_id}" if Workflow.current_action else action_id,
"context": context
})
return True

View File

@@ -55,8 +55,14 @@ class WorkflowOper(DbOper):
"""
return Workflow.fail(self._db, wid, result)
def step(self, wid: int, action: str, context: dict) -> bool:
def step(self, wid: int, action_id: str, context: dict) -> bool:
"""
步进
"""
return Workflow.update_current_action(self._db, wid, action, context)
return Workflow.update_current_action(self._db, wid, action_id, context)
def reset(self, wid: int) -> bool:
"""
重置
"""
return Workflow.reset(self._db, wid)

View File

@@ -63,6 +63,7 @@ class PluginHelper(metaclass=Singleton):
return json.loads(res.text)
except json.JSONDecodeError:
logger.error(f"插件包数据解析失败:{res.text}")
return None
return {}
def get_plugin_package_version(self, pid: str, repo_url: str, package_version: str = None) -> Optional[str]:

View File

@@ -225,27 +225,27 @@ class RssHelper:
}
@staticmethod
def parse(url, proxy: bool = False, timeout: int = 15, headers: dict = None) -> Union[List[dict], None]:
def parse(url, proxy: bool = False, timeout: int = 15, headers: dict = None) -> Union[List[dict], None, bool]:
"""
解析RSS订阅URL获取RSS中的种子信息
:param url: RSS地址
:param proxy: 是否使用代理
:param timeout: 请求超时
:param headers: 自定义请求头
:return: 种子信息列表如为None代表Rss过期
:return: 种子信息列表如为None代表Rss过期如果为False则为错误
"""
# 开始处理
ret_array: list = []
if not url:
return []
return False
try:
ret = RequestUtils(proxies=settings.PROXY if proxy else None,
timeout=timeout, headers=headers).get_res(url)
if not ret:
return []
return False
except Exception as err:
logger.error(f"获取RSS失败{str(err)} - {traceback.format_exc()}")
return []
return False
if ret:
ret_xml = ""
try:
@@ -322,6 +322,7 @@ class RssHelper:
]
if ret_xml in _rss_expired_msg:
return None
return False
return ret_array
def get_rss_link(self, url: str, cookie: str, ua: str, proxy: bool = False) -> Tuple[str, str]:

View File

@@ -33,7 +33,7 @@ class RuleHelper:
return group
return None
def get_rule_group_by_media(self, media: MediaInfo, group_names: list = None) -> List[FilterRuleGroup]:
def get_rule_group_by_media(self, media: MediaInfo = None, group_names: list = None) -> List[FilterRuleGroup]:
"""
根据媒体信息获取规则组
"""
@@ -44,9 +44,9 @@ class RuleHelper:
for group in rule_groups:
if not group.media_type:
ret_groups.append(group)
elif not group.category and group.media_type == media.type.value:
elif media and not group.category and group.media_type == media.type.value:
ret_groups.append(group)
elif group.category == media.category:
elif media and group.category == media.category:
ret_groups.append(group)
return ret_groups

View File

@@ -445,6 +445,27 @@ class TorrentHelper(metaclass=Singleton):
logger.info(f"{torrent_info.title} 不匹配特效规则 {effect}")
return False
# 大小
size_range = filter_params.get("size")
if size_range:
if size_range.find("-") != -1:
# 区间
size_min, size_max = size_range.split("-")
size_min = float(size_min.strip()) * 1024 * 1024
size_max = float(size_max.strip()) * 1024 * 1024
if torrent_info.size < size_min or torrent_info.size > size_max:
return False
elif size_range.startswith(">"):
# 大于
size_min = float(size_range[1:].strip()) * 1024 * 1024
if torrent_info.size < size_min:
return False
elif size_range.startswith("<"):
# 小于
size_max = float(size_range[1:].strip()) * 1024 * 1024
if torrent_info.size > size_max:
return False
return True
@staticmethod

View File

@@ -67,14 +67,14 @@ class Alist(StorageBase, metaclass=Singleton):
return self.__generate_token
@property
@cached(maxsize=1, ttl=60 * 60 * 24 * 2 - 60 * 5)
@cached(maxsize=1, ttl=60 * 60 * 24 * 2 - 60 * 5, skip_empty=True)
def __generate_token(self) -> str:
"""
如果设置永久令牌则返回永久令牌,否则使用账号密码生成一个临时 token
缓存2天提前5分钟更新
"""
conf = self.get_conf()
token = conf.get("token")
token = conf.get("token")
if token:
return str(token)
resp: Response = RequestUtils(headers={

View File

@@ -207,13 +207,14 @@ class NexusPhpSiteUserInfo(SiteParserBase):
# 是否存在下页数据
next_page = None
next_page_text = html.xpath('//a[contains(.//text(), "下一页") or contains(.//text(), "下一頁") or contains(.//text(), ">")]/@href')
#防止识别到详情页
next_page_text = html.xpath(
'//a[contains(.//text(), "下一页") or contains(.//text(), "下一頁") or contains(.//text(), ">")]/@href')
# 防止识别到详情页
while next_page_text:
next_page = next_page_text.pop().strip()
if not next_page.startswith('details.php'):
break;
break
next_page = None
# fix up page url

View File

@@ -239,7 +239,9 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]):
path=torrent_path,
hash=torrent.get('hash'),
size=torrent.get('total_size'),
tags=torrent.get('tags')
tags=torrent.get('tags'),
progress=torrent.get('progress') * 100,
state="paused" if torrent.get('state') in ("paused", "pausedDL") else "downloading",
))
elif status == TorrentStatus.TRANSFER:
# 获取已完成且未整理的

View File

@@ -118,11 +118,12 @@ class TheMovieDbModule(_ModuleBase):
# 识别匹配
if not cache_info or not cache:
info = None
# 缓存没有或者强制不使用缓存
if tmdbid:
# 直接查询详情
info = self.tmdb.get_info(mtype=mtype, tmdbid=tmdbid)
elif meta:
if not info and meta:
info = {}
# 简体名称
zh_name = zhconv.convert(meta.cn_name, "zh-hans") if meta.cn_name else None
@@ -172,8 +173,8 @@ class TheMovieDbModule(_ModuleBase):
if info and not info.get("genres"):
info = self.tmdb.get_info(mtype=info.get("media_type"),
tmdbid=info.get("id"))
else:
logger.error("识别媒体信息时未提供元数据或tmdbid")
elif not info:
logger.error("识别媒体信息时未提供元数据或唯一且有效的tmdbid")
return None
# 保存到缓存

View File

@@ -578,7 +578,7 @@ class TmdbApi:
genre_ids.append(genre.get('id'))
return genre_ids
# 查询TMDB详ngeq
# 查询TMDB详
if mtype == MediaType.MOVIE:
tmdb_info = self.__get_movie_detail(tmdbid)
if tmdb_info:
@@ -588,13 +588,20 @@ class TmdbApi:
if tmdb_info:
tmdb_info['media_type'] = MediaType.TV
else:
tmdb_info = self.__get_tv_detail(tmdbid)
if tmdb_info:
tmdb_info_tv = self.__get_tv_detail(tmdbid)
tmdb_info_movie = self.__get_movie_detail(tmdbid)
if tmdb_info_tv and tmdb_info_movie:
tmdb_info = None
logger.warn(f"无法判断tmdb_id:{tmdbid} 是电影还是电视剧")
elif tmdb_info_tv:
tmdb_info = tmdb_info_tv
tmdb_info['media_type'] = MediaType.TV
elif tmdb_info_movie:
tmdb_info = tmdb_info_movie
tmdb_info['media_type'] = MediaType.MOVIE
else:
tmdb_info = self.__get_movie_detail(tmdbid)
if tmdb_info:
tmdb_info['media_type'] = MediaType.MOVIE
tmdb_info = None
logger.warn(f"tmdb_id:{tmdbid} 未查询到媒体信息")
if tmdb_info:
# 转换genreid

View File

@@ -246,7 +246,9 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]):
title=torrent.name,
path=Path(torrent.download_dir) / torrent.name,
hash=torrent.hashString,
tags=",".join(torrent.labels or [])
tags=",".join(torrent.labels or []),
progress=torrent.progress,
state="paused" if torrent.status == "stopped" else "downloading",
))
elif status == TorrentStatus.DOWNLOADING:
# 获取正在下载的任务

View File

@@ -32,13 +32,13 @@ class WeChat:
_proxy = None
# 企业微信发送消息URL
_send_msg_url = "/cgi-bin/message/send?access_token={access_token}"
_send_msg_url = "cgi-bin/message/send?access_token={access_token}"
# 企业微信获取TokenURL
_token_url = "/cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}"
_token_url = "cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}"
# 企业微信创建菜单URL
_create_menu_url = "/cgi-bin/menu/create?access_token={access_token}&agentid={agentid}"
_create_menu_url = "cgi-bin/menu/create?access_token={access_token}&agentid={agentid}"
# 企业微信删除菜单URL
_delete_menu_url = "/cgi-bin/menu/delete?access_token={access_token}&agentid={agentid}"
_delete_menu_url = "cgi-bin/menu/delete?access_token={access_token}&agentid={agentid}"
def __init__(self, WECHAT_CORPID: str = None, WECHAT_APP_SECRET: str = None,
WECHAT_APP_ID: str = None, WECHAT_PROXY: str = None, **kwargs):

View File

@@ -494,6 +494,9 @@ class Scheduler(metaclass=Singleton):
"""
更新工作流定时服务
"""
if not self._scheduler:
return
# 移除该工作流的全部服务
self.remove_workflow_job(workflow)
# 添加工作流服务
@@ -503,6 +506,7 @@ class Scheduler(metaclass=Singleton):
self._jobs[job_id] = {
"func": WorkflowChain().process,
"name": workflow.name,
"provider_name": "工作流",
"running": False,
}
self._scheduler.add_job(
@@ -547,7 +551,7 @@ class Scheduler(metaclass=Singleton):
"func": service["func"],
"name": service["name"],
"pid": pid,
"plugin_name": plugin_name,
"provider_name": plugin_name,
"kwargs": service.get("func_kwargs") or {},
"running": False,
}
@@ -584,14 +588,14 @@ class Scheduler(metaclass=Singleton):
# 将正在运行的任务提取出来 (保障一次性任务正常显示)
for job_id, service in self._jobs.items():
name = service.get("name")
plugin_name = service.get("plugin_name")
if service.get("running") and name and plugin_name:
provider_name = service.get("provider_name")
if service.get("running") and name and provider_name:
if name not in added:
added.append(name)
schedulers.append(schemas.ScheduleInfo(
id=job_id,
name=name,
provider=plugin_name,
provider=provider_name,
status="正在运行",
))
# 获取其他待执行任务
@@ -611,7 +615,7 @@ class Scheduler(metaclass=Singleton):
schedulers.append(schemas.ScheduleInfo(
id=job_id,
name=job.name,
provider=service.get("plugin_name", "[系统]"),
provider=service.get("provider_name", "[系统]"),
status=status,
next_run=next_run
))

View File

@@ -9,4 +9,5 @@ class DownloadTask(BaseModel):
"""
download_id: Optional[str] = Field(None, description="任务ID")
downloader: Optional[str] = Field(None, description="下载器")
path: Optional[str] = Field(None, description="下载路径")
completed: Optional[bool] = Field(False, description="是否完成")

View File

@@ -6,6 +6,15 @@ from pydantic import BaseModel, Field, root_validator
from app.schemas import MessageChannel, FileItem
class Event(BaseModel):
"""
事件模型
"""
event_type: str = Field(..., description="事件类型")
event_data: Optional[dict] = Field({}, description="事件数据")
priority: Optional[int] = Field(0, description="事件优先级")
class BaseEventData(BaseModel):
"""
事件数据的基类,所有具体事件数据类应继承自此类

View File

@@ -21,6 +21,8 @@ class TransferTorrent(BaseModel):
tags: Optional[str] = None
size: Optional[int] = 0
userid: Optional[str] = None
progress: Optional[float] = 0
state: Optional[str] = None
class DownloadingTorrent(BaseModel):

View File

@@ -8,21 +8,23 @@ from app.schemas.download import DownloadTask
from app.schemas.site import Site
from app.schemas.subscribe import Subscribe
from app.schemas.message import Notification
from app.schemas.event import Event
class Workflow(BaseModel):
"""
工作流信息
"""
id: Optional[str] = Field(None, description="工作流ID")
id: Optional[int] = Field(None, description="工作流ID")
name: Optional[str] = Field(None, description="工作流名称")
description: Optional[str] = Field(None, description="工作流描述")
timer: Optional[str] = Field(None, description="定时器")
state: Optional[str] = Field(None, description="状态")
current_action: Optional[str] = Field(None, description="当前执行动作")
current_action: Optional[str] = Field(None, description="执行动作")
result: Optional[str] = Field(None, description="任务执行结果")
run_count: Optional[int] = Field(0, description="已执行次数")
actions: Optional[list] = Field([], description="任务列表")
flows: Optional[list] = Field([], description="任务流")
add_time: Optional[str] = Field(None, description="创建时间")
last_time: Optional[str] = Field(None, description="最后执行时间")
@@ -34,19 +36,20 @@ class ActionParams(BaseModel):
"""
动作基础参数
"""
pass
loop: Optional[bool] = Field(False, description="是否需要循环")
loop_interval: Optional[int] = Field(0, description="循环间隔 (秒)")
class Action(BaseModel):
"""
动作信息
"""
id: Optional[str] = Field(None, description="动作ID (类名)")
id: Optional[str] = Field(None, description="动作ID")
type: Optional[str] = Field(None, description="动作类型 (类名)")
name: Optional[str] = Field(None, description="动作名称")
description: Optional[str] = Field(None, description="动作描述")
loop: Optional[bool] = Field(False, description="是否需要循环")
loop_interval: Optional[int] = Field(0, description="循环间隔 (秒)")
params: Optional[ActionParams] = Field({}, description="参数")
position: Optional[dict] = Field({}, description="位置")
data: Optional[dict] = Field({}, description="参数")
class ActionContext(BaseModel):
@@ -61,3 +64,14 @@ class ActionContext(BaseModel):
sites: Optional[List[Site]] = Field([], description="站点列表")
subscribes: Optional[List[Subscribe]] = Field([], description="订阅列表")
messages: Optional[List[Notification]] = Field([], description="消息列表")
events: Optional[List[Event]] = Field([], description="事件列表")
class ActionFlow(BaseModel):
"""
工作流流程
"""
id: Optional[str] = Field(None, description="流程ID")
source: Optional[str] = Field(None, description="源动作")
target: Optional[str] = Field(None, description="目标动作")
animated: Optional[bool] = Field(True, description="是否动画流程")

View File

@@ -7,9 +7,8 @@ Create Date: 2024-12-24 13:29:32.225532
"""
import contextlib
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import sqlite
from alembic import op
# revision identifiers, used by Alembic.
revision = '55390f1f77c1'

View File

@@ -0,0 +1,29 @@
"""2.1.2
Revision ID: 610bb05ddeef
Revises: 279a949d81b6
Create Date: 2025-02-24 07:52:00.042837
"""
import contextlib
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import sqlite
# revision identifiers, used by Alembic.
revision = '610bb05ddeef'
down_revision = '279a949d81b6'
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with contextlib.suppress(Exception):
op.add_column('workflow', sa.Column('flows', sa.JSON(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
pass

View File

@@ -7,9 +7,8 @@ Create Date: 2025-02-06 18:28:00.644571
"""
import contextlib
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import sqlite
from alembic import op
# revision identifiers, used by Alembic.
revision = 'ca5461f314f2'

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.3.0'
FRONTEND_VERSION = 'v2.3.0'
APP_VERSION = 'v2.3.1'
FRONTEND_VERSION = 'v2.3.1'