Compare commits

...

13 Commits

Author SHA1 Message Date
jxxghp
d9ed7b09c7 v2.3.0
- 站点资源浏览支持关键字和分类搜索,优化了界面,修改了站点卡片点击时的交互行为
- 优化了APP模式下更多菜单、滚动条等多处UI细节
2025-02-18 17:05:24 +08:00
jxxghp
4dcb18f00e fix: site browse api 2025-02-18 16:32:10 +08:00
jxxghp
0a52fe0a7a refactor: site browse api 2025-02-17 19:01:05 +08:00
jxxghp
e5a4d11cf9 fix workflow 2025-02-17 15:08:24 +08:00
jxxghp
6c233f13de fix workflow chain 2025-02-17 12:38:29 +08:00
jxxghp
00aee3496c add workflow oper 2025-02-17 11:54:11 +08:00
jxxghp
77ae40e3d6 fix workflow 2025-02-17 11:40:32 +08:00
jxxghp
68cba44476 fix modules load 2025-02-16 17:24:17 +08:00
jxxghp
b86d06f632 add workflow lifecycle 2025-02-16 16:53:38 +08:00
jxxghp
0b7cf305a0 add action templates 2025-02-16 13:45:15 +08:00
jxxghp
21ae36bc3a add action templates 2025-02-16 12:52:29 +08:00
jxxghp
4e2d9e9165 Merge pull request #3899 from Mister-album/v2-sync 2025-02-15 08:10:15 +08:00
Mister-album
6cee308894 添加为指定字幕添加.default后缀设置为默认字幕功能 2025-02-14 19:58:29 +08:00
32 changed files with 605 additions and 80 deletions

View File

@@ -2,7 +2,7 @@ from abc import ABC, abstractmethod
from pydantic.main import BaseModel
from app.schemas import ActionContext
from app.schemas import ActionContext, ActionParams
class BaseAction(BaseModel, ABC):
@@ -21,5 +21,24 @@ class BaseAction(BaseModel, ABC):
pass
@abstractmethod
async def execute(self, params: dict, context: ActionContext) -> ActionContext:
def execute(self, params: ActionParams, context: ActionContext) -> ActionContext:
"""
执行动作
"""
raise NotImplementedError
@property
@abstractmethod
def done(self) -> bool:
"""
判断动作是否完成
"""
pass
@property
@abstractmethod
def success(self) -> bool:
"""
判断动作是否成功
"""
pass

View File

View File

View File

View File

42
app/actions/fetch_rss.py Normal file
View File

@@ -0,0 +1,42 @@
from typing import Optional
from pydantic import Field
from app.actions import BaseAction
from app.schemas import ActionParams, ActionContext
class FetchRssParams(ActionParams):
"""
获取RSS资源列表参数
"""
url: str = Field(None, description="RSS地址")
proxy: Optional[bool] = Field(False, description="是否使用代理")
timeout: Optional[int] = Field(15, description="超时时间")
headers: Optional[dict] = Field(None, description="请求头")
recognize: Optional[bool] = Field(False, description="是否识别")
class FetchRssAction(BaseAction):
"""
获取RSS资源列表
"""
@property
def name(self) -> str:
return "获取RSS资源列表"
@property
def description(self) -> str:
return "请求RSS地址获取数据并解析为资源列表"
async def execute(self, params: FetchRssParams, context: ActionContext) -> ActionContext:
pass
@property
def done(self) -> bool:
return True
@property
def success(self) -> bool:
return True

View File

View File

View File

@@ -0,0 +1,42 @@
from typing import Optional
from pydantic import Field
from app.actions import BaseAction
from app.schemas import ActionParams, ActionContext
class SearchTorrentsParams(ActionParams):
"""
搜索站点资源参数
"""
name: str = Field(None, description="资源名称")
year: Optional[int] = Field(None, description="年份")
type: Optional[str] = Field(None, description="资源类型 (电影/电视剧)")
season: Optional[int] = Field(None, description="季度")
recognize: Optional[bool] = Field(False, description="是否识别")
class SearchTorrentsAction(BaseAction):
"""
搜索站点资源
"""
@property
def name(self) -> str:
return "搜索站点资源"
@property
def description(self) -> str:
return "根据关键字搜索站点种子资源"
@property
def done(self) -> bool:
return True
@property
def success(self) -> bool:
return True
async def execute(self, params: SearchTorrentsParams, context: ActionContext) -> ActionContext:
pass

View File

View File

View File

View File

@@ -1,4 +1,4 @@
from typing import List, Any
from typing import List, Any, Dict
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
@@ -259,8 +259,41 @@ def site_icon(site_id: int,
})
@router.get("/category/{site_id}", summary="站点分类", response_model=List[schemas.SiteCategory])
def site_category(site_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
获取站点分类
"""
site = Site.get(db, site_id)
if not site:
raise HTTPException(
status_code=404,
detail=f"站点 {site_id} 不存在",
)
indexer = SitesHelper().get_indexer(site.domain)
if not indexer:
raise HTTPException(
status_code=404,
detail=f"站点 {site.domain} 不支持",
)
category: Dict[str, List[dict]] = indexer.get('category') or []
if not category:
return []
result = []
for cats in category.values():
for cat in cats:
if cat not in result:
result.append(cat)
return result
@router.get("/resource/{site_id}", summary="站点资源", response_model=List[schemas.TorrentInfo])
def site_resource(site_id: int,
keyword: str = None,
cat: str = None,
page: int = 0,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_superuser)) -> Any:
"""
@@ -272,7 +305,7 @@ def site_resource(site_id: int,
status_code=404,
detail=f"站点 {site_id} 不存在",
)
torrents = TorrentsChain().browse(domain=site.domain)
torrents = TorrentsChain().browse(domain=site.domain, keyword=keyword, cat=cat, page=page)
if not torrents:
return []
return [torrent.to_dict() for torrent in torrents]

View File

@@ -322,13 +322,16 @@ class ChainBase(metaclass=ABCMeta):
return self.run_module("search_torrents", site=site, keywords=keywords,
mtype=mtype, page=page)
def refresh_torrents(self, site: dict) -> List[TorrentInfo]:
def refresh_torrents(self, site: dict, keyword: str = None, cat: str = None, page: int = 0) -> List[TorrentInfo]:
"""
获取站点最新一页的种子,多个站点需要多线程处理
:param site: 站点
:param keyword: 标题
:param cat: 分类
:param page: 页码
:reutrn: 种子资源列表
"""
return self.run_module("refresh_torrents", site=site)
return self.run_module("refresh_torrents", site=site, keyword=keyword, cat=cat, page=page)
def filter_torrents(self, rule_groups: List[str],
torrent_list: List[TorrentInfo],

View File

@@ -73,17 +73,20 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
logger.info(f'种子缓存数据清理完成')
@cached(cache=TTLCache(maxsize=128, ttl=595))
def browse(self, domain: str) -> List[TorrentInfo]:
def browse(self, domain: str, keyword: str = None, cat: str = None, page: int = 0) -> List[TorrentInfo]:
"""
浏览站点首页内容返回种子清单TTL缓存10分钟
:param domain: 站点域名
:param keyword: 搜索标题
:param cat: 搜索分类
:param page: 页码
"""
logger.info(f'开始获取站点 {domain} 最新种子 ...')
site = self.siteshelper.get_indexer(domain)
if not site:
logger.error(f'站点 {domain} 不存在!')
return []
return self.refresh_torrents(site=site)
return self.refresh_torrents(site=site, keyword=keyword, cat=cat, page=page)
@cached(cache=TTLCache(maxsize=128, ttl=295))
def rss(self, domain: str) -> List[TorrentInfo]:

51
app/chain/workflow.py Normal file
View File

@@ -0,0 +1,51 @@
from typing import List
from app.chain import ChainBase
from app.core.workflow import WorkFlowManager
from app.db.workflow_oper import WorkflowOper
from app.log import logger
from app.schemas import Workflow, ActionContext, Action
class WorkflowChain(ChainBase):
"""
工作流链
"""
def __init__(self):
super().__init__()
self.workflowoper = WorkflowOper()
self.workflowmanager = WorkFlowManager()
def process(self, workflow_id: int) -> bool:
"""
处理工作流
"""
workflow = self.workflowoper.get(workflow_id)
if not workflow:
logger.warn(f"工作流 {workflow_id} 不存在")
return False
if not workflow.actions:
logger.warn(f"工作流 {workflow.name} 无动作")
return False
logger.info(f"开始处理 {workflow.name},共 {len(workflow.actions)} 个动作 ...")
# 启用上下文
context = ActionContext()
self.workflowoper.start(workflow_id)
for act in workflow.actions:
action = Action(**act)
state, context = self.workflowmanager.excute(action, context)
self.workflowoper.step(workflow_id, action=action.name, context=context.dict())
if not state:
logger.error(f"动作 {action.name} 执行失败,工作流失败")
self.workflowoper.fail(workflow_id, result=f"动作 {action.name} 执行失败")
return False
logger.info(f"工作流 {workflow.name} 执行完成")
self.workflowoper.success(workflow_id)
return True
def get_workflows(self) -> List[Workflow]:
"""
获取工作流列表
"""
return self.workflowoper.list_enabled()

View File

@@ -255,6 +255,8 @@ class ConfigModel(BaseModel):
)
# 启用分词搜索
TOKENIZED_SEARCH: bool = False
# 为指定默认字幕添加.default后缀
DEFAULT_SUB: Optional[str] = "zh-cn"
class Settings(BaseSettings, ConfigModel, LogConfigModel):

View File

@@ -1,24 +1,77 @@
from time import sleep
from typing import Dict, Any, Tuple
from app.actions import BaseAction
from app.helper.module import ModuleHelper
from app.log import logger
from app.schemas import Action, ActionContext
from app.utils.singleton import Singleton
class WorkFlowManager:
class WorkFlowManager(metaclass=Singleton):
"""
工作流管理器
"""
# 所有动作定义
_actions: Dict[str, BaseAction] = {}
def __init__(self):
self.workflows = {}
self.init()
def register(self, workflow):
def init(self):
"""
注册工作流
:param workflow: 工作流对象
:return:
初始化
"""
self.workflows[workflow.name] = workflow
def get_workflow(self, name):
def filter_func(obj: Any):
"""
过滤函数,确保只加载新定义的类
"""
if not isinstance(obj, type):
return False
if not hasattr(obj, 'execute') or not hasattr(obj, "name"):
return False
if obj.__name__ == "BaseAction":
return False
return obj.__module__.startswith("app.actions")
# 加载所有动作
self._actions = {}
actions = ModuleHelper.load(
"app.actions",
filter_func=lambda _, obj: filter_func(obj)
)
for action in actions:
logger.debug(f"加载动作: {action.__name__}")
self._actions[action.__name__] = action
def stop(self):
"""
获取工作流
:param name: 工作流名称
:return:
停止
"""
return self.workflows.get(name)
pass
def excute(self, action: Action, context: ActionContext = None) -> Tuple[bool, ActionContext]:
"""
执行工作流动作
"""
if not context:
context = ActionContext()
if action.id in self._actions:
action_obj = self._actions[action.id]
logger.info(f"执行动作: {action.id} - {action.name}")
result_context = action_obj.execute(action.params, context)
logger.info(f"{action.name} 执行结果: {action_obj.success}")
if action.loop and action.loop_interval:
while not action_obj.done:
logger.info(f"{action.name} 等待 {action.loop_interval} 秒后继续执行")
sleep(action.loop_interval)
logger.info(f"继续执行动作: {action.id} - {action.name}")
result_context = action_obj.execute(action.params, result_context)
logger.info(f"{action.name} 执行结果: {action_obj.success}")
logger.info(f"{action.name} 执行完成")
return action_obj.success, result_context
else:
logger.error(f"未找到动作: {action.id} - {action.name}")
return False, context

View File

@@ -2,7 +2,7 @@ from datetime import datetime
from sqlalchemy import Column, Integer, JSON, Sequence, String
from app.db import Base
from app.db import Base, db_query, db_update
class Workflow(Base):
@@ -17,8 +17,8 @@ class Workflow(Base):
description = Column(String)
# 定时器
timer = Column(String)
# 状态:N-新建 R-运行中 P-暂停 S-成功 F-失败
state = Column(String, nullable=False, index=True, default='N')
# 状态:W-等待 R-运行中 P-暂停 S-成功 F-失败
state = Column(String, nullable=False, index=True, default='W')
# 当前执行动作
current_action = Column(String)
# 任务执行结果
@@ -33,3 +33,55 @@ class Workflow(Base):
add_time = Column(String, default=datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
# 最后执行时间
last_time = Column(String)
@staticmethod
@db_query
def get_enabled_workflows(db):
return db.query(Workflow).filter(Workflow.state != 'P').all()
@staticmethod
@db_query
def get_by_name(db, name: str):
return db.query(Workflow).filter(Workflow.name == name).first()
@staticmethod
@db_update
def update_state(db, wid: int, state: str):
db.query(Workflow).filter(Workflow.id == wid).update({"state": state})
return True
@staticmethod
@db_update
def start(db, wid: int):
db.query(Workflow).filter(Workflow.id == wid).update({
"state": 'R'
})
return True
@staticmethod
@db_update
def fail(db, wid: int, result: str):
db.query(Workflow).filter(Workflow.id == wid).update({
"state": 'F',
"result": result,
"run_count": Workflow.run_count + 1,
"last_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
return True
@staticmethod
@db_update
def success(db, wid: int, result: str = None):
db.query(Workflow).filter(Workflow.id == wid).update({
"state": 'S',
"result": result,
"run_count": Workflow.run_count + 1,
"last_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
return True
@staticmethod
@db_update
def update_current_action(db, wid: int, action: str, context: dict):
db.query(Workflow).filter(Workflow.id == wid).update({"current_action": action, "context": context})
return True

62
app/db/workflow_oper.py Normal file
View File

@@ -0,0 +1,62 @@
from typing import List, Tuple
from app.db import DbOper
from app.db.models.workflow import Workflow
class WorkflowOper(DbOper):
"""
工作流管理
"""
def add(self, **kwargs) -> Tuple[bool, str]:
"""
新增工作流
"""
wf = Workflow(**kwargs)
if not wf.get_by_name(self._db, kwargs.get("name")):
wf.create(self._db)
return True, "新增工作流成功"
return False, "工作流已存在"
def get(self, wid: int) -> Workflow:
"""
查询单个工作流
"""
return Workflow.get(self._db, wid)
def list_enabled(self) -> List[Workflow]:
"""
获取启用的工作流列表
"""
return Workflow.get_enabled_workflows(self._db)
def get_by_name(self, name: str) -> Workflow:
"""
按名称获取工作流
"""
return Workflow.get_by_name(self._db, name)
def start(self, wid: int) -> bool:
"""
启动
"""
return Workflow.start(self._db, wid)
def success(self, wid: int, result: str = None) -> bool:
"""
成功
"""
return Workflow.success(self._db, wid, result)
def fail(self, wid: int, result: str) -> bool:
"""
失败
"""
return Workflow.fail(self._db, wid, result)
def step(self, wid: int, action: str, context: dict) -> bool:
"""
步进
"""
return Workflow.update_current_action(self._db, wid, action, context)

View File

@@ -23,6 +23,7 @@ class ModuleHelper:
"""
submodules: list = []
loaded_modules = set()
packages = importlib.import_module(package_path)
for importer, package_name, _ in pkgutil.iter_modules(packages.__path__):
try:
@@ -35,6 +36,9 @@ class ModuleHelper:
if name.startswith('_'):
continue
if isinstance(obj, type) and filter_func(name, obj):
if name in loaded_modules:
continue
loaded_modules.add(name)
submodules.append(obj)
except Exception as err:
logger.debug(f'加载模块 {package_name} 失败:{str(err)} - {traceback.format_exc()}')

View File

@@ -676,11 +676,15 @@ class FileManagerModule(_ModuleBase):
".zh-tw": ".繁体中文"
}
new_sub_tag_list = [
new_file_type if t == 0 else "%s%s(%s)" % (new_file_type,
new_sub_tag_dict.get(
new_file_type, ""
),
t) for t in range(6)
(".default" + new_file_type if (
(settings.DEFAULT_SUB == "zh-cn" and new_file_type == ".chi.zh-cn") or
(settings.DEFAULT_SUB == "zh-tw" and new_file_type == ".zh-tw") or
(settings.DEFAULT_SUB == "eng" and new_file_type == ".eng")
) else new_file_type) if t == 0 else "%s%s(%s)" % (new_file_type,
new_sub_tag_dict.get(
new_file_type, ""
),
t) for t in range(6)
]
for new_sub_tag in new_sub_tag_list:
new_file: Path = target_file.with_name(target_file.stem + new_sub_tag + file_ext)

View File

@@ -76,12 +76,14 @@ class IndexerModule(_ModuleBase):
def search_torrents(self, site: dict,
keywords: List[str] = None,
mtype: MediaType = None,
cat: str = None,
page: int = 0) -> List[TorrentInfo]:
"""
搜索一个站点
:param site: 站点
:param keywords: 搜索关键词列表
:param mtype: 媒体类型
:param cat: 分类
:param page: 页码
:return: 资源列表
"""
@@ -156,6 +158,7 @@ class IndexerModule(_ModuleBase):
search_word=search_word,
indexer=site,
mtype=mtype,
cat=cat,
page=page
)
if error_flag:
@@ -204,30 +207,37 @@ class IndexerModule(_ModuleBase):
def __spider_search(indexer: dict,
search_word: str = None,
mtype: MediaType = None,
cat: str = None,
page: int = 0) -> Tuple[bool, List[dict]]:
"""
根据关键字搜索单个站点
:param: indexer: 站点配置
:param: search_word: 关键字
:param: cat: 分类
:param: page: 页码
:param: mtype: 媒体类型
:param: timeout: 超时时间
:return: 是否发生错误, 种子列表
"""
_spider = SiteSpider(indexer=indexer,
mtype=mtype,
keyword=search_word,
mtype=mtype,
cat=cat,
page=page)
return _spider.is_error, _spider.get_torrents()
def refresh_torrents(self, site: dict) -> Optional[List[TorrentInfo]]:
def refresh_torrents(self, site: dict,
keyword: str = None, cat: str = None, page: int = 0) -> Optional[List[TorrentInfo]]:
"""
获取站点最新一页的种子,多个站点需要多线程处理
:param site: 站点
:param keyword: 关键字
:param cat: 分类
:param page: 页码
:reutrn: 种子资源列表
"""
return self.search_torrents(site=site)
return self.search_torrents(site=site, keywords=[keyword], cat=cat, page=page)
def refresh_userdata(self, site: dict) -> Optional[SiteUserData]:
"""

View File

@@ -7,6 +7,7 @@ import pytz
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.base import JobLookupError
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from app import schemas
from app.chain import ChainBase
@@ -16,13 +17,14 @@ from app.chain.site import SiteChain
from app.chain.subscribe import SubscribeChain
from app.chain.tmdb import TmdbChain
from app.chain.transfer import TransferChain
from app.chain.workflow import WorkflowChain
from app.core.config import settings
from app.core.event import EventManager
from app.core.plugin import PluginManager
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.sites import SitesHelper
from app.log import logger
from app.schemas import Notification, NotificationType
from app.schemas import Notification, NotificationType, Workflow
from app.schemas.types import EventType, SystemConfigKey
from app.utils.singleton import Singleton
from app.utils.timer import TimerUtils
@@ -345,6 +347,10 @@ class Scheduler(metaclass=Singleton):
}
)
# 初始化工作流服务
self.init_workflow_jobs()
# 初始化插件服务
self.init_plugin_jobs()
# 打印服务
@@ -401,52 +407,42 @@ class Scheduler(metaclass=Singleton):
for pid in PluginManager().get_running_plugin_ids():
self.update_plugin_job(pid)
def update_plugin_job(self, pid: str):
def init_workflow_jobs(self):
"""
更新插件定时服务
初始化工作流定时服务
"""
if not self._scheduler or not pid:
for workflow in WorkflowChain().get_workflows() or []:
self.update_workflow_job(workflow)
def remove_workflow_job(self, workflow: Workflow):
"""
移除工作流服务
"""
if not self._scheduler:
return
# 移除该插件的全部服务
self.remove_plugin_job(pid)
# 获取插件服务列表
with self._lock:
try:
plugin_services = PluginManager().get_plugin_services(pid=pid)
except Exception as e:
logger.error(f"运行插件 {pid} 服务失败:{str(e)} - {traceback.format_exc()}")
job_id = f"workflow-{workflow.id}"
service = self._jobs.pop(job_id, None)
if not service:
return
# 获取插件名称
plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name")
# 开始注册插件服务
for service in plugin_services:
try:
sid = f"{service['id']}"
job_id = sid.split("|")[0]
self.remove_plugin_job(pid, job_id)
self._jobs[job_id] = {
"func": service["func"],
"name": service["name"],
"pid": pid,
"plugin_name": plugin_name,
"kwargs": service.get("func_kwargs") or {},
"running": False,
}
self._scheduler.add_job(
self.start,
service["trigger"],
id=sid,
name=service["name"],
**(service.get("kwargs") or {}),
kwargs={"job_id": job_id},
replace_existing=True
)
logger.info(f"注册插件{plugin_name}服务:{service['name']} - {service['trigger']}")
except Exception as e:
logger.error(f"注册插件{plugin_name}服务失败:{str(e)} - {service}")
SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务注册失败",
message=str(e),
role="system")
try:
# 在调度器中查找并移除对应的 job
job_removed = False
for job in list(self._scheduler.get_jobs()):
if job_id == job.id:
try:
self._scheduler.remove_job(job.id)
job_removed = True
except JobLookupError:
pass
break
if job_removed:
logger.info(f"移除工作流服务:{service.get('name')}")
except Exception as e:
logger.error(f"移除工作流服务失败:{str(e)} - {job_id}: {service}")
SchedulerChain().messagehelper.put(title=f"工作流 {workflow.name} 服务移除失败",
message=str(e),
role="system")
def remove_plugin_job(self, pid: str, job_id: str = None):
"""
@@ -494,6 +490,83 @@ class Scheduler(metaclass=Singleton):
message=str(e),
role="system")
def update_workflow_job(self, workflow: Workflow):
"""
更新工作流定时服务
"""
# 移除该工作流的全部服务
self.remove_workflow_job(workflow)
# 添加工作流服务
with self._lock:
try:
job_id = f"workflow-{workflow.id}"
self._jobs[job_id] = {
"func": WorkflowChain().process,
"name": workflow.name,
"running": False,
}
self._scheduler.add_job(
self.start,
trigger=CronTrigger.from_crontab(workflow.timer),
id=job_id,
name=workflow.name,
kwargs={"job_id": job_id, "workflow_id": job_id},
replace_existing=True
)
logger.info(f"注册工作流服务:{workflow.name} - {workflow.timer}")
except Exception as e:
logger.error(f"注册工作流服务失败:{workflow.name} - {str(e)}")
SchedulerChain().messagehelper.put(title=f"工作流 {workflow.name} 服务注册失败",
message=str(e),
role="system")
def update_plugin_job(self, pid: str):
"""
更新插件定时服务
"""
if not self._scheduler or not pid:
return
# 移除该插件的全部服务
self.remove_plugin_job(pid)
# 获取插件服务列表
with self._lock:
try:
plugin_services = PluginManager().get_plugin_services(pid=pid)
except Exception as e:
logger.error(f"运行插件 {pid} 服务失败:{str(e)} - {traceback.format_exc()}")
return
# 获取插件名称
plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name")
# 开始注册插件服务
for service in plugin_services:
try:
sid = f"{service['id']}"
job_id = sid.split("|")[0]
self.remove_plugin_job(pid, job_id)
self._jobs[job_id] = {
"func": service["func"],
"name": service["name"],
"pid": pid,
"plugin_name": plugin_name,
"kwargs": service.get("func_kwargs") or {},
"running": False,
}
self._scheduler.add_job(
self.start,
service["trigger"],
id=sid,
name=service["name"],
**(service.get("kwargs") or {}),
kwargs={"job_id": job_id},
replace_existing=True
)
logger.info(f"注册插件{plugin_name}服务:{service['name']} - {service['trigger']}")
except Exception as e:
logger.error(f"注册插件{plugin_name}服务失败:{str(e)} - {service}")
SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务注册失败",
message=str(e),
role="system")
def list(self) -> List[schemas.ScheduleInfo]:
"""
当前所有任务

View File

@@ -20,3 +20,4 @@ from .exception import *
from .system import *
from .event import *
from .workflow import *
from .download import *

12
app/schemas/download.py Normal file
View File

@@ -0,0 +1,12 @@
from typing import Optional
from pydantic import BaseModel, Field
class DownloadTask(BaseModel):
"""
下载任务
"""
download_id: Optional[str] = Field(None, description="任务ID")
downloader: Optional[str] = Field(None, description="下载器")
completed: Optional[bool] = Field(False, description="是否完成")

View File

@@ -115,3 +115,9 @@ class SiteUserData(BaseModel):
class SiteAuth(BaseModel):
site: Optional[str] = None
params: Optional[Dict[str, Union[int, str]]] = Field(default_factory=dict)
class SiteCategory(BaseModel):
id: Optional[int] = None
cat: Optional[str] = None
desc: Optional[str] = None

View File

@@ -1,13 +1,20 @@
from abc import ABC, abstractmethod
from typing import Optional
from typing import Optional, List
from pydantic import BaseModel, Field
from app.schemas.context import Context, MediaInfo
from app.schemas.file import FileItem
from app.schemas.download import DownloadTask
from app.schemas.site import Site
from app.schemas.subscribe import Subscribe
from app.schemas.message import Notification
class Workflow(BaseModel):
"""
工作流信息
"""
id: Optional[str] = Field(None, description="工作流ID")
name: Optional[str] = Field(None, description="工作流名称")
description: Optional[str] = Field(None, description="工作流描述")
timer: Optional[str] = Field(None, description="定时器")
@@ -19,17 +26,38 @@ class Workflow(BaseModel):
add_time: Optional[str] = Field(None, description="创建时间")
last_time: Optional[str] = Field(None, description="最后执行时间")
class Config:
orm_mode = True
class ActionParams(BaseModel):
"""
动作基础参数
"""
pass
class Action(BaseModel):
"""
动作信息
"""
id: Optional[str] = Field(None, description="动作ID (类名)")
name: Optional[str] = Field(None, description="动作名称")
description: Optional[str] = Field(None, description="动作描述")
loop: Optional[bool] = Field(False, description="是否需要循环")
loop_interval: Optional[int] = Field(0, description="循环间隔 (秒)")
params: Optional[ActionParams] = Field({}, description="参数")
class ActionContext(BaseModel, ABC):
class ActionContext(BaseModel):
"""
动作上下文
动作基础上下文,各动作通用数据
"""
pass
content: Optional[str] = Field(None, description="文本类内容")
torrents: Optional[List[Context]] = Field([], description="资源列表")
medias: Optional[List[MediaInfo]] = Field([], description="媒体列表")
fileitems: Optional[List[FileItem]] = Field([], description="文件列表")
downloads: Optional[List[DownloadTask]] = Field([], description="下载任务列表")
sites: Optional[List[Site]] = Field([], description="站点列表")
subscribes: Optional[List[Subscribe]] = Field([], description="订阅列表")
messages: Optional[List[Notification]] = Field([], description="消息列表")

View File

@@ -3,6 +3,7 @@ from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.startup.workflow_initializer import init_workflow, stop_workflow
from app.startup.modules_initializer import shutdown_modules, start_modules
from app.startup.plugins_initializer import init_plugins_async
from app.startup.routers_initializer import init_routers
@@ -16,6 +17,8 @@ async def lifespan(app: FastAPI):
print("Starting up...")
# 启动模块
start_modules(app)
# 初始化工作流动作
init_workflow(app)
# 初始化路由
init_routers(app)
# 初始化插件
@@ -35,3 +38,6 @@ async def lifespan(app: FastAPI):
print(f"Error during plugin installation shutdown: {e}")
# 清理模块
shutdown_modules(app)
# 关闭工作流
stop_workflow(app)

View File

@@ -0,0 +1,17 @@
from fastapi import FastAPI
from app.core.workflow import WorkFlowManager
def init_workflow(_: FastAPI):
"""
初始化动作
"""
WorkFlowManager()
def stop_workflow(_: FastAPI):
"""
停止动作
"""
WorkFlowManager().stop()

View File

@@ -63,3 +63,5 @@ OCR_HOST=https://movie-pilot.org
PLUGIN_MARKET=https://github.com/jxxghp/MoviePilot-Plugins,https://github.com/thsrite/MoviePilot-Plugins,https://github.com/InfinityPacer/MoviePilot-Plugins,https://github.com/honue/MoviePilot-Plugins
# 搜索多个名称true/false为true时搜索时会同时搜索中英文及原始名称搜索结果会更全面但会增加搜索时间为false时其中一个名称搜索到结果或全部名称搜索完毕即停止
SEARCH_MULTIPLE_NAME=true
# 为指定字幕添加.default后缀设置为默认字幕支持为'zh-cn''zh-tw''eng'添加默认字幕未定义或设置为None则不添加
DEFAULT_SUB=None

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.2.9'
FRONTEND_VERSION = 'v2.2.9'
APP_VERSION = 'v2.3.0'
FRONTEND_VERSION = 'v2.3.0'