Compare commits

..

22 Commits

Author SHA1 Message Date
jxxghp
e662338d6f Merge pull request #3995 from KoWming/v2 2025-03-10 12:48:31 +08:00
KoWming
2c1d6817dd Update security.py 2025-03-10 12:46:06 +08:00
jxxghp
5d4a3fec1f v2.3.4
- 新增支持设定消息发送的时间范围
- 探索标签页支持拖动排序
- 修复演员头像不显示的问题
- 修复站点流控不生效的问题
- 修复短时间内重复保存设定后定时任务消失的问题
- 修复工作流执行数据叠加的问题
2025-03-10 10:08:34 +08:00
jxxghp
6603a30e7e fix MessageQueueManager 2025-03-10 10:02:32 +08:00
jxxghp
81d08ca517 fix MessageQueueManager 2025-03-10 08:24:28 +08:00
jxxghp
e04506a614 fix workflow message link 2025-03-09 21:07:52 +08:00
jxxghp
39756512ae feat: 支持消息发送时间范围 2025-03-09 19:34:05 +08:00
jxxghp
71c29ea5e7 fix ide warnings 2025-03-09 18:35:52 +08:00
jxxghp
87ce266b14 fix warnings 2025-03-09 16:48:32 +08:00
jxxghp
ed6d856c24 Merge remote-tracking branch 'origin/v2' into v2 2025-03-09 16:33:01 +08:00
jxxghp
d3ecbef946 fix warnings 2025-03-09 08:37:05 +08:00
jxxghp
7b24f5eb21 fix:站点流控 2025-03-07 08:19:28 +08:00
jxxghp
e1f82e338a fix:定时任务初始化加锁 2025-03-07 08:07:57 +08:00
jxxghp
a835d34a01 Merge pull request #3975 from so1ve/patch-1 2025-03-06 06:54:11 +08:00
Ray
79d70c9977 fix: 标签为"官组"的种子应识别为官种 2025-03-05 22:10:28 +08:00
jxxghp
aea82723cb Merge pull request #3965 from mackerel-12138/fix_s0_scrap 2025-03-05 11:56:22 +08:00
zhanglijun
d47ff0b31a 修复s0集信息错误 2025-03-04 23:18:41 +08:00
jxxghp
affcb9d5c3 fix bug 2025-03-04 14:22:32 +08:00
jxxghp
9be2686733 Merge pull request #3957 from thsrite/v2 2025-03-03 14:22:06 +08:00
thsrite
7126fed2b5 fix docker container log duplicate printing 2025-03-03 13:44:38 +08:00
jxxghp
5bc4330e1c 修复HDDolby 2025-03-02 14:55:18 +08:00
jxxghp
b25ac7116e 更新 hddolby.py 2025-03-02 14:41:55 +08:00
46 changed files with 671 additions and 489 deletions

View File

@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import List, Any, Union
from typing import Union
from app.chain import ChainBase
from app.db.systemconfig_oper import SystemConfigOper

View File

@@ -15,10 +15,10 @@ class AddDownloadParams(ActionParams):
"""
添加下载资源参数
"""
downloader: Optional[str] = Field(None, description="下载器")
save_path: Optional[str] = Field(None, description="保存路径")
labels: Optional[str] = Field(None, description="标签(,分隔)")
only_lack: Optional[bool] = Field(False, description="仅下载缺失的资源")
downloader: Optional[str] = Field(default=None, description="下载器")
save_path: Optional[str] = Field(default=None, description="保存路径")
labels: Optional[str] = Field(default=None, description="标签(,分隔)")
only_lack: Optional[bool] = Field(default=False, description="仅下载缺失的资源")
class AddDownloadAction(BaseAction):
@@ -34,6 +34,8 @@ class AddDownloadAction(BaseAction):
super().__init__(action_id)
self.downloadchain = DownloadChain()
self.mediachain = MediaChain()
self._added_downloads = []
self._has_error = False
@classmethod
@property

View File

@@ -26,6 +26,8 @@ class AddSubscribeAction(BaseAction):
super().__init__(action_id)
self.subscribechain = SubscribeChain()
self.subscribeoper = SubscribeOper()
self._added_subscribes = []
self._has_error = False
@classmethod
@property

View File

@@ -21,6 +21,7 @@ class FetchDownloadsAction(BaseAction):
def __init__(self, action_id: str):
super().__init__(action_id)
self.chain = ActionChain()
self._downloads = []
@classmethod
@property

View File

@@ -17,9 +17,9 @@ class FetchMediasParams(ActionParams):
"""
获取媒体数据参数
"""
source_type: Optional[str] = Field("ranking", description="来源")
sources: Optional[List[str]] = Field([], description="榜单")
api_path: Optional[str] = Field(None, description="API路径")
source_type: Optional[str] = Field(default="ranking", description="来源")
sources: Optional[List[str]] = Field(default=[], description="榜单")
api_path: Optional[str] = Field(default=None, description="API路径")
class FetchMediasAction(BaseAction):
@@ -34,6 +34,8 @@ class FetchMediasAction(BaseAction):
def __init__(self, action_id: str):
super().__init__(action_id)
self._medias = []
self._has_error = False
self.__inner_sources = [
{
"func": RecommendChain().tmdb_trending,

View File

@@ -15,13 +15,13 @@ class FetchRssParams(ActionParams):
"""
获取RSS资源列表参数
"""
url: str = Field(None, description="RSS地址")
proxy: Optional[bool] = Field(False, description="是否使用代理")
timeout: Optional[int] = Field(15, description="超时时间")
content_type: Optional[str] = Field(None, description="Content-Type")
referer: Optional[str] = Field(None, description="Referer")
ua: Optional[str] = Field(None, description="User-Agent")
match_media: Optional[str] = Field(None, description="匹配媒体信息")
url: str = Field(default=None, description="RSS地址")
proxy: Optional[bool] = Field(default=False, description="是否使用代理")
timeout: Optional[int] = Field(default=15, description="超时时间")
content_type: Optional[str] = Field(default=None, description="Content-Type")
referer: Optional[str] = Field(default=None, description="Referer")
ua: Optional[str] = Field(default=None, description="User-Agent")
match_media: Optional[str] = Field(default=None, description="匹配媒体信息")
class FetchRssAction(BaseAction):
@@ -36,6 +36,8 @@ class FetchRssAction(BaseAction):
super().__init__(action_id)
self.rsshelper = RssHelper()
self.chain = ActionChain()
self._rss_torrents = []
self._has_error = False
@classmethod
@property

View File

@@ -15,13 +15,13 @@ class FetchTorrentsParams(ActionParams):
"""
获取站点资源参数
"""
search_type: Optional[str] = Field("keyword", description="搜索类型")
name: Optional[str] = Field(None, description="资源名称")
year: Optional[str] = Field(None, description="年份")
type: Optional[str] = Field(None, description="资源类型 (电影/电视剧)")
season: Optional[int] = Field(None, description="季度")
sites: Optional[List[int]] = Field([], description="站点列表")
match_media: Optional[bool] = Field(False, description="匹配媒体信息")
search_type: Optional[str] = Field(default="keyword", description="搜索类型")
name: Optional[str] = Field(default=None, description="资源名称")
year: Optional[str] = Field(default=None, description="年份")
type: Optional[str] = Field(default=None, description="资源类型 (电影/电视剧)")
season: Optional[int] = Field(default=None, description="季度")
sites: Optional[List[int]] = Field(default=[], description="站点列表")
match_media: Optional[bool] = Field(default=False, description="匹配媒体信息")
class FetchTorrentsAction(BaseAction):
@@ -34,6 +34,7 @@ class FetchTorrentsAction(BaseAction):
def __init__(self, action_id: str):
super().__init__(action_id)
self.searchchain = SearchChain()
self._torrents = []
@classmethod
@property

View File

@@ -12,9 +12,9 @@ class FilterMediasParams(ActionParams):
"""
过滤媒体数据参数
"""
type: Optional[str] = Field(None, description="媒体类型 (电影/电视剧)")
vote: Optional[int] = Field(0, description="评分")
year: Optional[str] = Field(None, description="年份")
type: Optional[str] = Field(default=None, description="媒体类型 (电影/电视剧)")
vote: Optional[int] = Field(default=0, description="评分")
year: Optional[str] = Field(default=None, description="年份")
class FilterMediasAction(BaseAction):
@@ -24,6 +24,10 @@ class FilterMediasAction(BaseAction):
_medias = []
def __init__(self, action_id: str):
super().__init__(action_id)
self._medias = []
@classmethod
@property
def name(cls) -> str: # noqa

View File

@@ -13,13 +13,13 @@ class FilterTorrentsParams(ActionParams):
"""
过滤资源数据参数
"""
rule_groups: Optional[List[str]] = Field([], description="规则组")
quality: Optional[str] = Field(None, description="资源质量")
resolution: Optional[str] = Field(None, description="资源分辨率")
effect: Optional[str] = Field(None, description="特效")
include: Optional[str] = Field(None, description="包含规则")
exclude: Optional[str] = Field(None, description="排除规则")
size: Optional[str] = Field(None, description="资源大小范围MB")
rule_groups: Optional[List[str]] = Field(default=[], description="规则组")
quality: Optional[str] = Field(default=None, description="资源质量")
resolution: Optional[str] = Field(default=None, description="资源分辨率")
effect: Optional[str] = Field(default=None, description="特效")
include: Optional[str] = Field(default=None, description="包含规则")
exclude: Optional[str] = Field(default=None, description="排除规则")
size: Optional[str] = Field(default=None, description="资源大小范围MB")
class FilterTorrentsAction(BaseAction):
@@ -33,6 +33,7 @@ class FilterTorrentsAction(BaseAction):
super().__init__(action_id)
self.torrenthelper = TorrentHelper()
self.chain = ActionChain()
self._torrents = []
@classmethod
@property

View File

@@ -15,8 +15,8 @@ class ScanFileParams(ActionParams):
整理文件参数
"""
# 存储
storage: Optional[str] = Field("local", description="存储")
directory: Optional[str] = Field(None, description="目录")
storage: Optional[str] = Field(default="local", description="存储")
directory: Optional[str] = Field(default=None, description="目录")
class ScanFileAction(BaseAction):
@@ -30,6 +30,8 @@ class ScanFileAction(BaseAction):
def __init__(self, action_id: str):
super().__init__(action_id)
self.storagechain = StorageChain()
self._fileitems = []
self._has_error = False
@classmethod
@property

View File

@@ -28,6 +28,8 @@ class ScrapeFileAction(BaseAction):
super().__init__(action_id)
self.storagechain = StorageChain()
self.mediachain = MediaChain()
self._scraped_files = []
self._has_error = False
@classmethod
@property

View File

@@ -4,14 +4,15 @@ from pydantic import Field
from app.actions import BaseAction, ActionChain
from app.schemas import ActionParams, ActionContext, Notification
from core.config import settings
class SendMessageParams(ActionParams):
"""
发送消息参数
"""
client: Optional[List[str]] = Field([], description="消息渠道")
userid: Optional[Union[str, int]] = Field(None, description="用户ID")
client: Optional[List[str]] = Field(default=[], description="消息渠道")
userid: Optional[Union[str, int]] = Field(default=None, description="用户ID")
class SendMessageAction(BaseAction):
@@ -57,7 +58,7 @@ class SendMessageAction(BaseAction):
index += 1
# 发送消息
if not params.client:
params.client = [None]
params.client = [""]
for client in params.client:
self.chain.post_message(
Notification(
@@ -65,7 +66,7 @@ class SendMessageAction(BaseAction):
userid=params.userid,
title="【工作流执行结果】",
text=msg_text,
link="#/workflow"
link=settings.MP_DOMAIN("#/workflow")
)
)

View File

@@ -18,7 +18,7 @@ class TransferFileParams(ActionParams):
整理文件参数
"""
# 来源
source: Optional[str] = Field("downloads", description="来源")
source: Optional[str] = Field(default="downloads", description="来源")
class TransferFileAction(BaseAction):
@@ -34,6 +34,8 @@ class TransferFileAction(BaseAction):
self.transferchain = TransferChain()
self.storagechain = StorageChain()
self.transferhis = TransferHistoryOper()
self._fileitems = []
self._has_error = False
@classmethod
@property

View File

@@ -29,7 +29,7 @@ def search_by_id(mediaid: str,
mtype: str = None,
area: str = "title",
title: str = None,
year: int = None,
year: str = None,
season: str = None,
sites: str = None,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:

View File

@@ -24,7 +24,7 @@ from app.db.models import User
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_superuser
from app.helper.mediaserver import MediaServerHelper
from app.helper.message import MessageHelper
from app.helper.message import MessageHelper, MessageQueueManager
from app.helper.progress import ProgressHelper
from app.helper.rule import RuleHelper
from app.helper.sites import SitesHelper
@@ -479,6 +479,7 @@ def reload_module(_: User = Depends(get_current_active_superuser)):
"""
重新加载模块(仅管理员)
"""
MessageQueueManager().init_config()
ModuleManager().reload()
Scheduler().init()
Monitor().init()

View File

@@ -16,7 +16,7 @@ from app.core.meta import MetaBase
from app.core.module import ModuleManager
from app.db.message_oper import MessageOper
from app.db.user_oper import UserOper
from app.helper.message import MessageHelper
from app.helper.message import MessageHelper, MessageQueueManager
from app.helper.service import ServiceConfigHelper
from app.log import logger
from app.schemas import TransferInfo, TransferTorrent, ExistMediaInfo, DownloadingTorrent, CommingMessage, Notification, \
@@ -38,6 +38,9 @@ class ChainBase(metaclass=ABCMeta):
self.eventmanager = EventManager()
self.messageoper = MessageOper()
self.messagehelper = MessageHelper()
self.messagequeue = MessageQueueManager(
send_callback=self.run_module
)
self.useroper = UserOper()
@staticmethod
@@ -491,11 +494,6 @@ class ChainBase(metaclass=ABCMeta):
:param message: 消息体
:return: 成功或失败
"""
logger.info(f"发送消息channel={message.channel}"
f"source={message.source},"
f"title={message.title}, "
f"text={message.text}"
f"userid={message.userid}")
# 保存原消息
self.messagehelper.put(message, role="user", title=message.title)
self.messageoper.add(**message.dict())
@@ -545,13 +543,13 @@ class ChainBase(metaclass=ABCMeta):
# 按设定发送
self.eventmanager.send_event(etype=EventType.NoticeMessage,
data={**send_message.dict(), "type": send_message.mtype})
self.run_module("post_message", message=send_message)
self.messagequeue.send_message("post_message", message=send_message)
if not send_orignal:
return
# 发送消息事件
self.eventmanager.send_event(etype=EventType.NoticeMessage, data={**message.dict(), "type": message.mtype})
# 按原消息发送
self.run_module("post_message", message=message)
self.messagequeue.send_message("post_message", message=message)
def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None:
"""
@@ -563,7 +561,7 @@ class ChainBase(metaclass=ABCMeta):
note_list = [media.to_dict() for media in medias]
self.messagehelper.put(message, role="user", note=note_list, title=message.title)
self.messageoper.add(**message.dict(), note=note_list)
return self.run_module("post_medias_message", message=message, medias=medias)
return self.messagequeue.send_message("post_medias_message", message=message, medias=medias)
def post_torrents_message(self, message: Notification, torrents: List[Context]) -> None:
"""
@@ -575,7 +573,7 @@ class ChainBase(metaclass=ABCMeta):
note_list = [torrent.torrent_info.to_dict() for torrent in torrents]
self.messagehelper.put(message, role="user", note=note_list, title=message.title)
self.messageoper.add(**message.dict(), note=note_list)
return self.run_module("post_torrents_message", message=message, torrents=torrents)
return self.messagequeue.send_message("post_torrents_message", message=message, torrents=torrents)
def metadata_img(self, mediainfo: MediaInfo, season: int = None, episode: int = None) -> Optional[dict]:
"""

View File

@@ -1,5 +1,5 @@
import threading
from typing import List, Union, Optional, Generator
from typing import List, Union, Optional, Generator, Any
from app.chain import ChainBase
from app.core.cache import cached
@@ -27,8 +27,8 @@ class MediaServerChain(ChainBase):
"""
return self.run_module("mediaserver_librarys", server=server, username=username, hidden=hidden)
def items(self, server: str, library_id: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \
-> Optional[Generator]:
def items(self, server: str, library_id: Union[str, int],
start_index: int = 0, limit: Optional[int] = -1) -> Generator[Any, None, None]:
"""
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据

View File

@@ -312,11 +312,6 @@ class SearchChain(ChainBase):
for indexer in self.siteshelper.get_indexers():
# 检查站点索引开关
if not sites or indexer.get("id") in sites:
# 站点流控
state, msg = self.siteshelper.check(indexer.get("domain"))
if state:
logger.warn(msg)
continue
indexer_sites.append(indexer)
if not indexer_sites:
logger.warn('未开启任何有效站点,无法搜索资源')

View File

@@ -52,6 +52,7 @@ class SiteChain(ChainBase):
"1ptba.com": self.__indexphp_test,
"star-space.net": self.__indexphp_test,
"yemapt.org": self.__yema_test,
"hddolby.com": self.__hddolby_test,
}
def refresh_userdata(self, site: dict = None) -> Optional[SiteUserData]:
@@ -251,6 +252,32 @@ class SiteChain(ChainBase):
site.url = f"{site.url}index.php"
return self.__test(site)
@staticmethod
def __hddolby_test(site: Site) -> Tuple[bool, str]:
"""
判断站点是否已经登陆hddolby
"""
url = f"{site.url}api/v1/user/data"
headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/plain, */*",
"x-api-key": site.apikey,
}
res = RequestUtils(
headers=headers,
proxies=settings.PROXY if site.proxy else None,
timeout=site.timeout or 15
).get_res(url=url)
if res is None:
return False, "无法打开网站!"
if res.status_code == 200:
user_info = res.json()
if user_info and user_info.get("status") == 0:
return True, "连接成功"
return False, "APIKEY已过期"
else:
return False, f"错误:{res.status_code} {res.reason}"
@staticmethod
def __parse_favicon(url: str, cookie: str, ua: str) -> Tuple[str, Optional[str]]:
"""

View File

@@ -1262,7 +1262,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
订阅相关的下载和文件信息
"""
if not subscribe:
return
return None
# 返回订阅数据
subscribe_info = schemas.SubscrbieInfo()

View File

@@ -606,7 +606,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
logger.error(f"整理队列处理出现错误:{e} - {traceback.format_exc()}")
def __handle_transfer(self, task: TransferTask,
callback: Optional[Callable] = None) -> Tuple[bool, str]:
callback: Optional[Callable] = None) -> Optional[Tuple[bool, str]]:
"""
处理整理任务
"""
@@ -671,9 +671,17 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 获取集数据
if task.mediainfo.type == MediaType.TV and not task.episodes_info:
# 判断注意season为0的情况
season_num = task.mediainfo.season
if season_num is None and task.meta.season_seq:
if task.meta.season_seq.isdigit():
season_num = int(task.meta.season_seq)
# 默认值1
if season_num is None:
season_num = 1
task.episodes_info = self.tmdbchain.tmdb_episodes(
tmdbid=task.mediainfo.tmdb_id,
season=task.mediainfo.season or task.meta.begin_season or 1
season=season_num
)
# 查询整理目标目录

View File

@@ -363,7 +363,7 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
raise ValueError(f"配置项 '{field_name}' 的值 '{value}' 无法转换成正确的类型") from e
logger.error(
f"配置项 '{field_name}' 的值 '{value}' 无法转换成正确的类型,使用默认值 '{default}',错误信息: {e}")
return default, True
return default, True
@validator('*', pre=True, always=True)
def generic_type_validator(cls, value: Any, field): # noqa

View File

@@ -121,7 +121,7 @@ class ModuleManager(metaclass=Singleton):
获取实现了同一方法的模块列表
"""
if not self._running_modules:
return []
return
for _, module in self._running_modules.items():
if hasattr(module, method) \
and ObjectUtils.check_method(getattr(module, method)):
@@ -132,7 +132,7 @@ class ModuleManager(metaclass=Singleton):
获取指定类型的模块列表
"""
if not self._running_modules:
return []
return
for _, module in self._running_modules.items():
if hasattr(module, 'get_type') \
and module.get_type() == module_type:
@@ -143,7 +143,7 @@ class ModuleManager(metaclass=Singleton):
获取指定子类型的模块
"""
if not self._running_modules:
return []
return
for _, module in self._running_modules.items():
if hasattr(module, 'get_subtype') \
and module.get_subtype() == module_subtype:

View File

@@ -4,7 +4,8 @@ import hmac
import json
import os
import traceback
from datetime import datetime, timedelta
import datetime
from datetime import timedelta
from typing import Any, Union, Annotated, Optional
import jwt
@@ -69,13 +70,13 @@ def create_access_token(
if expires_delta is not None:
if expires_delta.total_seconds() <= 0:
raise ValueError("过期时间必须为正数")
expire = datetime.utcnow() + expires_delta
expire = datetime.datetime.now(datetime.UTC) + expires_delta
else:
expire = datetime.utcnow() + default_expire
expire = datetime.datetime.now(datetime.UTC) + default_expire
to_encode = {
"exp": expire,
"iat": datetime.utcnow(),
"iat": datetime.datetime.now(datetime.UTC),
"sub": str(userid),
"username": username,
"super_user": super_user,
@@ -102,7 +103,7 @@ def __set_or_refresh_resource_token_cookie(request: Request, response: Response,
decoded_token = jwt.decode(resource_token, settings.RESOURCE_SECRET_KEY, algorithms=[ALGORITHM])
exp = decoded_token.get("exp")
if exp:
remaining_time = datetime.utcfromtimestamp(exp) - datetime.utcnow()
remaining_time = datetime.datetime.fromtimestamp(exp, tz=datetime.UTC) - datetime.datetime.now(datetime.UTC)
# 根据剩余时长提前刷新令牌
if remaining_time < timedelta(seconds=(settings.RESOURCE_ACCESS_TOKEN_EXPIRE_SECONDS / 3)):
raise jwt.ExpiredSignatureError

View File

@@ -63,6 +63,8 @@ class WorkFlowManager(metaclass=Singleton):
if not context:
context = ActionContext()
if action.type in self._actions:
# 实例化之前,清理掉类对象的数据
# 实例化
action_obj = self._actions[action.type](action.id)
# 执行

View File

@@ -1,4 +1,4 @@
from typing import Callable, Any
from typing import Callable, Any, Optional
from playwright.sync_api import sync_playwright, Page
from cf_clearance import sync_cf_retry, sync_stealth
@@ -61,7 +61,7 @@ class PlaywrightHelper:
ua: str = None,
proxies: dict = None,
headless: bool = False,
timeout: int = 20) -> str:
timeout: int = 20) -> Optional[str]:
"""
获取网页源码
:param url: 网页地址

View File

@@ -1,9 +1,152 @@
from __future__ import annotations
import json
import queue
import threading
import time
from typing import Optional, Any, Union
from datetime import datetime
from typing import Any, Union
from typing import List, Optional, Callable
from app.utils.singleton import Singleton
from app.core.config import global_vars
from app.db.systemconfig_oper import SystemConfigOper
from app.schemas.types import SystemConfigKey
from app.utils.singleton import Singleton, SingletonClass
from app.log import logger
class MessageQueueManager(metaclass=SingletonClass):
"""
消息发送队列管理器
"""
schedule_periods: List[tuple[int, int, int, int]] = []
def __init__(
self,
send_callback: Optional[Callable] = None,
check_interval: int = 10
) -> None:
"""
消息队列管理器初始化
:param send_callback: 实际发送消息的回调函数
:param check_interval: 时间检查间隔(秒)
"""
self.init_config()
self.queue: queue.Queue[Any] = queue.Queue()
self.send_callback = send_callback
self.check_interval = check_interval
self._running = True
self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
self.thread.start()
def init_config(self):
"""
初始化配置
"""
self.schedule_periods = self._parse_schedule(
SystemConfigOper().get(SystemConfigKey.NotificationSendTime)
)
@staticmethod
def _parse_schedule(periods: Union[list, dict]) -> List[tuple[int, int, int, int]]:
"""
将字符串时间格式转换为分钟数元组
"""
parsed = []
if not periods:
return parsed
if not isinstance(periods, list):
periods = [periods]
for period in periods:
if not period:
continue
start_h, start_m = map(int, period['start'].split(':'))
end_h, end_m = map(int, period['end'].split(':'))
parsed.append((start_h, start_m, end_h, end_m))
return parsed
@staticmethod
def _time_to_minutes(time_str: str) -> int:
"""
'HH:MM' 格式转换为分钟数
"""
hours, minutes = map(int, time_str.split(':'))
return hours * 60 + minutes
def _is_in_scheduled_time(self, current_time: datetime) -> bool:
"""
检查当前时间是否在允许发送的时间段内
"""
if not self.schedule_periods:
return True
current_minutes = current_time.hour * 60 + current_time.minute
for period in self.schedule_periods:
s_h, s_m, e_h, e_m = period
start = s_h * 60 + s_m
end = e_h * 60 + e_m
if start <= end:
if start <= current_minutes <= end:
return True
else:
if current_minutes >= start or current_minutes <= end:
return True
return False
def send_message(self, *args, **kwargs) -> None:
"""
发送消息(立即发送或加入队列)
"""
if self._is_in_scheduled_time(datetime.now()):
self._send(*args, **kwargs)
else:
self.queue.put({
"args": args,
"kwargs": kwargs
})
logger.info(f"消息已加入队列,当前队列长度:{self.queue.qsize()}")
def _send(self, *args, **kwargs) -> None:
"""
实际发送消息(可通过回调函数自定义)
"""
if self.send_callback:
try:
logger.info(f"发送消息:{kwargs}")
self.send_callback(*args, **kwargs)
except Exception as e:
logger.error(f"发送消息错误:{str(e)}")
def _monitor_loop(self) -> None:
"""
后台线程循环检查时间并处理队列
"""
while self._running:
current_time = datetime.now()
if self._is_in_scheduled_time(current_time):
while not self.queue.empty():
if global_vars.is_system_stopped:
break
if not self._is_in_scheduled_time(datetime.now()):
break
try:
message = self.queue.get_nowait()
self._send(*message['args'], **message['kwargs'])
logger.info(f"队列剩余消息:{self.queue.qsize()}")
except queue.Empty:
break
time.sleep(self.check_interval)
def stop(self) -> None:
"""
停止队列管理器
"""
self._running = False
self.thread.join()
class MessageHelper(metaclass=Singleton):

View File

@@ -448,58 +448,6 @@ class PluginHelper(metaclass=Singleton):
if plugin_dir.exists():
shutil.rmtree(plugin_dir, ignore_errors=True)
@staticmethod
def __pip_uninstall_and_install_with_fallback(requirements_file: Path) -> Tuple[bool, str]:
"""
先卸载 requirements.txt 中的依赖,再按照自动降级策略重新安装,不使用 PIP 缓存
:param requirements_file: 依赖的 requirements.txt 文件路径
:return: (是否成功, 错误信息)
"""
# 读取 requirements.txt 文件中的依赖列表
try:
with open(requirements_file, "r", encoding="utf-8") as f:
dependencies = [line.strip() for line in f if line.strip() and not line.startswith("#")]
except Exception as e:
return False, f"无法读取 requirements.txt 文件:{str(e)}"
# 1. 先卸载所有依赖包
for dep in dependencies:
pip_uninstall_command = ["pip", "uninstall", "-y", dep]
logger.debug(f"尝试卸载依赖:{dep},命令:{' '.join(pip_uninstall_command)}")
success, message = SystemUtils.execute_with_subprocess(pip_uninstall_command)
if success:
logger.debug(f"依赖 {dep} 卸载成功,输出:{message}")
else:
error_message = f"卸载依赖 {dep} 失败,错误信息:{message}"
logger.error(error_message)
# 2. 重新安装所有依赖,使用自动降级策略
strategies = []
# 添加策略到列表中
if settings.PIP_PROXY:
strategies.append(("镜像站",
["pip", "install", "-r", str(requirements_file),
"-i", settings.PIP_PROXY, "--no-cache-dir"]))
if settings.PROXY_HOST:
strategies.append(("代理",
["pip", "install", "-r", str(requirements_file),
"--proxy", settings.PROXY_HOST, "--no-cache-dir"]))
strategies.append(("直连", ["pip", "install", "-r", str(requirements_file), "--no-cache-dir"]))
# 遍历策略进行安装
for strategy_name, pip_command in strategies:
logger.debug(f"[PIP] 尝试使用策略:{strategy_name} 安装依赖,命令:{' '.join(pip_command)}")
success, message = SystemUtils.execute_with_subprocess(pip_command)
if success:
logger.debug(f"[PIP] 策略:{strategy_name} 安装依赖成功,输出:{message}")
return True, message
else:
logger.error(f"[PIP] 策略:{strategy_name} 安装依赖失败,错误信息:{message}")
return False, "[PIP] 所有策略均安装依赖失败,请检查网络连接或 PIP 配置"
@staticmethod
def __pip_install_with_fallback(requirements_file: Path) -> Tuple[bool, str]:
"""

View File

@@ -246,12 +246,12 @@ class LoggerManager:
else:
# 使用默认日志文件
logfile = self._default_log_file
# 获取调用者的模块的logger
_logger = self._loggers.get(logfile)
if not _logger:
_logger = self.__setup_logger(log_file=logfile)
self._loggers[logfile] = _logger
with LoggerManager._lock: # 添加锁
# 获取调用者的模块的logger
_logger = self._loggers.get(logfile)
if not _logger:
_logger = self.__setup_logger(log_file=logfile)
self._loggers[logfile] = _logger
# 调用logger的方法打印日志
if hasattr(_logger, method):
log_method = getattr(_logger, method)

View File

@@ -3,7 +3,7 @@ import re
import traceback
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Union, Dict, Generator, Tuple
from typing import List, Optional, Union, Dict, Generator, Tuple, Any
from requests import Response
@@ -13,6 +13,7 @@ from app.log import logger
from app.schemas.types import MediaType
from app.utils.http import RequestUtils
from app.utils.url import UrlUtils
from schemas import MediaServerItem
class Emby:
@@ -545,7 +546,7 @@ class Emby:
return False
return False
def refresh_library_by_items(self, items: List[schemas.RefreshMediaItem]) -> bool:
def refresh_library_by_items(self, items: List[schemas.RefreshMediaItem]) -> Optional[bool]:
"""
按类型、名称、年份来刷新媒体库
:param items: 已识别的需要刷新媒体库的媒体信息列表
@@ -668,8 +669,8 @@ class Emby:
logger.error(f"连接/Users/{self.user}/Items/{itemid}出错:" + str(e))
return None
def get_items(self, parent: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \
-> Optional[Generator]:
def get_items(self, parent: Union[str, int], start_index: int = 0,
limit: Optional[int] = -1) -> Generator[MediaServerItem | None | Any, Any, None]:
"""
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据

View File

@@ -201,12 +201,12 @@ class Alist(StorageBase, metaclass=Singleton):
if resp is None:
logging.warning(f"请求获取目录 {fileitem.path} 的文件列表失败无法连接alist服务")
return
return None
if resp.status_code != 200:
logging.warning(
f"请求获取目录 {fileitem.path} 的文件列表失败,状态码:{resp.status_code}"
)
return
return None
result = resp.json()
@@ -214,7 +214,7 @@ class Alist(StorageBase, metaclass=Singleton):
logging.warning(
f'获取目录 {fileitem.path} 的文件列表失败,错误信息:{result["message"]}'
)
return
return None
return [
schemas.FileItem(
@@ -259,15 +259,15 @@ class Alist(StorageBase, metaclass=Singleton):
"""
if resp is None:
logging.warning(f"请求创建目录 {path} 失败无法连接alist服务")
return
return None
if resp.status_code != 200:
logging.warning(f"请求创建目录 {path} 失败,状态码:{resp.status_code}")
return
return None
result = resp.json()
if result["code"] != 200:
logging.warning(f'创建目录 {path} 失败,错误信息:{result["message"]}')
return
return None
return self.get_item(path)
@@ -349,15 +349,15 @@ class Alist(StorageBase, metaclass=Singleton):
"""
if resp is None:
logging.warning(f"请求获取文件 {path} 失败无法连接alist服务")
return
return None
if resp.status_code != 200:
logging.warning(f"请求获取文件 {path} 失败,状态码:{resp.status_code}")
return
return None
result = resp.json()
if result["code"] != 200:
logging.debug(f'获取文件 {path} 失败,错误信息:{result["message"]}')
return
return None
return schemas.FileItem(
storage=self.schema.value,
@@ -513,15 +513,15 @@ class Alist(StorageBase, metaclass=Singleton):
"""
if not resp:
logging.warning(f"请求获取文件 {path} 失败无法连接alist服务")
return
return None
if resp.status_code != 200:
logging.warning(f"请求获取文件 {path} 失败,状态码:{resp.status_code}")
return
return None
result = resp.json()
if result["code"] != 200:
logging.warning(f'获取文件 {path} 失败,错误信息:{result["message"]}')
return
return None
if result["data"]["raw_url"]:
download_url = result["data"]["raw_url"]
@@ -569,7 +569,7 @@ class Alist(StorageBase, metaclass=Singleton):
if resp.status_code != 200:
logging.warning(f"请求上传文件 {path} 失败,状态码:{resp.status_code}")
return
return None
new_item = self.get_item(Path(fileitem.path) / path.name)
if new_item and new_name and new_name != path.name:

View File

@@ -52,7 +52,7 @@ class FilterModule(_ModuleBase):
},
# 官种
"GZ": {
"include": [r'官方', r'官种'],
"include": [r'官方', r'官种', r'官组'],
"match": ["labels"]
},
# 特效字幕
@@ -259,7 +259,7 @@ class FilterModule(_ModuleBase):
return None if not matched else torrent
def __match_group(self, torrent: TorrentInfo, rule_group: Union[list, str]) -> bool:
def __match_group(self, torrent: TorrentInfo, rule_group: Union[list, str]) -> Optional[bool]:
"""
判断种子是否匹配规则组
"""

View File

@@ -122,6 +122,12 @@ class IndexerModule(_ModuleBase):
logger.warn(f"{site.get('name')} 不支持中文搜索")
continue
# 站点流控
state, msg = SitesHelper().check(StringUtils.get_url_domain(site.get("domain")))
if state:
logger.warn(msg)
continue
# 去除搜索关键字中的特殊字符
if search_word:
search_word = StringUtils.clear(search_word, replace_word=" ", allow_space=True)

View File

@@ -54,7 +54,7 @@ class IptSiteUserInfo(SiteParserBase):
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
html = etree.HTML(html_text)
if not StringUtils.is_valid_html_element(html):
return
return None
# seeding start
seeding_end_pos = 3
if html.xpath('//tr/td[text() = "Leechers"]'):

View File

@@ -65,7 +65,7 @@ class TNodeSiteUserInfo(SiteParserBase):
"""
seeding_info = json.loads(html_text)
if seeding_info.get("status") != 200:
return
return None
torrents = seeding_info.get("data", {}).get("torrents", [])

View File

@@ -162,7 +162,7 @@ class HddolbySpider:
'downloadvolumefactor': self.__get_downloadvolumefactor(result.get('promotion_time_type')),
'uploadvolumefactor': self.__get_uploadvolumefactor(result.get('promotion_time_type')),
'freedate': result.get('promotion_until'),
'page_url': self._pageurl % (self._domain, result.get('id')),
'page_url': self._pageurl % result.get('id'),
'labels': torrentLabels,
'category': category
}

View File

@@ -1,6 +1,6 @@
import json
from datetime import datetime
from typing import List, Union, Optional, Dict, Generator, Tuple
from typing import List, Union, Optional, Dict, Generator, Tuple, Any
from requests import Response
@@ -10,6 +10,7 @@ from app.log import logger
from app.schemas import MediaType
from app.utils.http import RequestUtils
from app.utils.url import UrlUtils
from schemas import MediaServerItem
class Jellyfin:
@@ -548,7 +549,7 @@ class Jellyfin:
logger.error(f"连接Items/Id/Ancestors出错" + str(e))
return None
def refresh_root_library(self) -> bool:
def refresh_root_library(self) -> Optional[bool]:
"""
通知Jellyfin刷新整个媒体库
"""
@@ -762,7 +763,7 @@ class Jellyfin:
return None
def get_items(self, parent: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \
-> Optional[Generator]:
-> Generator[MediaServerItem | None | Any, Any, None]:
"""
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据

View File

@@ -14,6 +14,7 @@ from app.log import logger
from app.schemas import MediaType
from app.utils.http import RequestUtils
from app.utils.url import UrlUtils
from schemas import MediaServerItem
class Plex:
@@ -367,7 +368,7 @@ class Plex:
return False
return self._plex.library.update()
def refresh_library_by_items(self, items: List[schemas.RefreshMediaItem]) -> bool:
def refresh_library_by_items(self, items: List[schemas.RefreshMediaItem]) -> Optional[bool]:
"""
按路径刷新媒体库 item: target_path
"""
@@ -512,7 +513,7 @@ class Plex:
)
def get_items(self, parent: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \
-> Optional[Generator]:
-> Generator[MediaServerItem | None, Any, None]:
"""
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
@@ -855,7 +856,7 @@ class Plex:
:param kwargs: 其他请求参数如headers, cookies, proxies等
"""
if not self._session:
return
return None
try:
url = UrlUtils.adapt_request_url(host=self._host, endpoint=endpoint)
kwargs.setdefault("headers", self.__get_request_headers())

View File

@@ -30,6 +30,9 @@ from app.utils.singleton import Singleton
from app.utils.timer import TimerUtils
lock = threading.Lock()
class SchedulerChain(ChainBase):
pass
@@ -56,85 +59,6 @@ class Scheduler(metaclass=Singleton):
"""
初始化定时服务
"""
# 各服务的运行状态
self._jobs = {
"cookiecloud": {
"name": "同步CookieCloud站点",
"func": SiteChain().sync_cookies,
"running": False,
},
"mediaserver_sync": {
"name": "同步媒体服务器",
"func": MediaServerChain().sync,
"running": False,
},
"subscribe_tmdb": {
"name": "订阅元数据更新",
"func": SubscribeChain().check,
"running": False,
},
"subscribe_search": {
"name": "订阅搜索补全",
"func": SubscribeChain().search,
"running": False,
"kwargs": {
"state": "R"
}
},
"new_subscribe_search": {
"name": "新增订阅搜索",
"func": SubscribeChain().search,
"running": False,
"kwargs": {
"state": "N"
}
},
"subscribe_refresh": {
"name": "订阅刷新",
"func": SubscribeChain().refresh,
"running": False,
},
"subscribe_follow": {
"name": "关注的订阅分享",
"func": SubscribeChain().follow,
"running": False,
},
"transfer": {
"name": "下载文件整理",
"func": TransferChain().process,
"running": False,
},
"clear_cache": {
"name": "缓存清理",
"func": self.clear_cache,
"running": False,
},
"user_auth": {
"name": "用户认证检查",
"func": self.user_auth,
"running": False,
},
"scheduler_job": {
"name": "公共定时服务",
"func": SchedulerChain().scheduler_job,
"running": False,
},
"random_wallpager": {
"name": "壁纸缓存",
"func": TmdbChain().get_trending_wallpapers,
"running": False,
},
"sitedata_refresh": {
"name": "站点数据刷新",
"func": SiteChain().refresh_userdatas,
"running": False,
},
"recommend_refresh": {
"name": "推荐缓存",
"func": RecommendChain().refresh_recommend,
"running": False,
}
}
# 停止定时服务
self.stop()
@@ -143,221 +67,302 @@ class Scheduler(metaclass=Singleton):
if settings.DEV:
return
# 创建定时服务
self._scheduler = BackgroundScheduler(timezone=settings.TZ,
executors={
'default': ThreadPoolExecutor(100)
})
# CookieCloud定时同步
if settings.COOKIECLOUD_INTERVAL \
and str(settings.COOKIECLOUD_INTERVAL).isdigit():
self._scheduler.add_job(
self.start,
"interval",
id="cookiecloud",
name="同步CookieCloud站点",
minutes=int(settings.COOKIECLOUD_INTERVAL),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=1),
kwargs={
'job_id': 'cookiecloud'
with lock:
# 各服务的运行状态
self._jobs = {
"cookiecloud": {
"name": "同步CookieCloud站点",
"func": SiteChain().sync_cookies,
"running": False,
},
"mediaserver_sync": {
"name": "同步媒体服务器",
"func": MediaServerChain().sync,
"running": False,
},
"subscribe_tmdb": {
"name": "订阅元数据更新",
"func": SubscribeChain().check,
"running": False,
},
"subscribe_search": {
"name": "订阅搜索补全",
"func": SubscribeChain().search,
"running": False,
"kwargs": {
"state": "R"
}
},
"new_subscribe_search": {
"name": "新增订阅搜索",
"func": SubscribeChain().search,
"running": False,
"kwargs": {
"state": "N"
}
},
"subscribe_refresh": {
"name": "订阅刷新",
"func": SubscribeChain().refresh,
"running": False,
},
"subscribe_follow": {
"name": "关注的订阅分享",
"func": SubscribeChain().follow,
"running": False,
},
"transfer": {
"name": "下载文件整理",
"func": TransferChain().process,
"running": False,
},
"clear_cache": {
"name": "缓存清理",
"func": self.clear_cache,
"running": False,
},
"user_auth": {
"name": "用户认证检查",
"func": self.user_auth,
"running": False,
},
"scheduler_job": {
"name": "公共定时服务",
"func": SchedulerChain().scheduler_job,
"running": False,
},
"random_wallpager": {
"name": "壁纸缓存",
"func": TmdbChain().get_trending_wallpapers,
"running": False,
},
"sitedata_refresh": {
"name": "站点数据刷新",
"func": SiteChain().refresh_userdatas,
"running": False,
},
"recommend_refresh": {
"name": "推荐缓存",
"func": RecommendChain().refresh_recommend,
"running": False,
}
)
# 媒体服务器同步
if settings.MEDIASERVER_SYNC_INTERVAL \
and str(settings.MEDIASERVER_SYNC_INTERVAL).isdigit():
self._scheduler.add_job(
self.start,
"interval",
id="mediaserver_sync",
name="同步媒体服务器",
hours=int(settings.MEDIASERVER_SYNC_INTERVAL),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5),
kwargs={
'job_id': 'mediaserver_sync'
}
)
# 新增订阅时搜索5分钟检查一次
self._scheduler.add_job(
self.start,
"interval",
id="new_subscribe_search",
name="新增订阅搜索",
minutes=5,
kwargs={
'job_id': 'new_subscribe_search'
}
)
# 检查更新订阅TMDB数据每隔6小时
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_tmdb",
name="订阅元数据更新",
hours=6,
kwargs={
'job_id': 'subscribe_tmdb'
}
)
# 创建定时服务
self._scheduler = BackgroundScheduler(timezone=settings.TZ,
executors={
'default': ThreadPoolExecutor(100)
})
# 订阅状态每隔24小时搜索一次
if settings.SUBSCRIBE_SEARCH:
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_search",
name="订阅搜索补全",
hours=24,
kwargs={
'job_id': 'subscribe_search'
}
)
if settings.SUBSCRIBE_MODE == "spider":
# 站点首页种子定时刷新模式
triggers = TimerUtils.random_scheduler(num_executions=32)
for trigger in triggers:
# CookieCloud定时同步
if settings.COOKIECLOUD_INTERVAL \
and str(settings.COOKIECLOUD_INTERVAL).isdigit():
self._scheduler.add_job(
self.start,
"cron",
id=f"subscribe_refresh|{trigger.hour}:{trigger.minute}",
name="订阅刷新",
hour=trigger.hour,
minute=trigger.minute,
"interval",
id="cookiecloud",
name="同步CookieCloud站点",
minutes=int(settings.COOKIECLOUD_INTERVAL),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=1),
kwargs={
'job_id': 'cookiecloud'
}
)
# 媒体服务器同步
if settings.MEDIASERVER_SYNC_INTERVAL \
and str(settings.MEDIASERVER_SYNC_INTERVAL).isdigit():
self._scheduler.add_job(
self.start,
"interval",
id="mediaserver_sync",
name="同步媒体服务器",
hours=int(settings.MEDIASERVER_SYNC_INTERVAL),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5),
kwargs={
'job_id': 'mediaserver_sync'
}
)
# 新增订阅时搜索5分钟检查一次
self._scheduler.add_job(
self.start,
"interval",
id="new_subscribe_search",
name="新增订阅搜索",
minutes=5,
kwargs={
'job_id': 'new_subscribe_search'
}
)
# 检查更新订阅TMDB数据每隔6小时
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_tmdb",
name="订阅元数据更新",
hours=6,
kwargs={
'job_id': 'subscribe_tmdb'
}
)
# 订阅状态每隔24小时搜索一次
if settings.SUBSCRIBE_SEARCH:
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_search",
name="订阅搜索补全",
hours=24,
kwargs={
'job_id': 'subscribe_search'
}
)
if settings.SUBSCRIBE_MODE == "spider":
# 站点首页种子定时刷新模式
triggers = TimerUtils.random_scheduler(num_executions=32)
for trigger in triggers:
self._scheduler.add_job(
self.start,
"cron",
id=f"subscribe_refresh|{trigger.hour}:{trigger.minute}",
name="订阅刷新",
hour=trigger.hour,
minute=trigger.minute,
kwargs={
'job_id': 'subscribe_refresh'
})
else:
# RSS订阅模式
if not settings.SUBSCRIBE_RSS_INTERVAL \
or not str(settings.SUBSCRIBE_RSS_INTERVAL).isdigit():
settings.SUBSCRIBE_RSS_INTERVAL = 30
elif int(settings.SUBSCRIBE_RSS_INTERVAL) < 5:
settings.SUBSCRIBE_RSS_INTERVAL = 5
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_refresh",
name="RSS订阅刷新",
minutes=int(settings.SUBSCRIBE_RSS_INTERVAL),
kwargs={
'job_id': 'subscribe_refresh'
})
else:
# RSS订阅模式
if not settings.SUBSCRIBE_RSS_INTERVAL \
or not str(settings.SUBSCRIBE_RSS_INTERVAL).isdigit():
settings.SUBSCRIBE_RSS_INTERVAL = 30
elif int(settings.SUBSCRIBE_RSS_INTERVAL) < 5:
settings.SUBSCRIBE_RSS_INTERVAL = 5
}
)
# 关注订阅分享每1小时
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_refresh",
name="RSS订阅刷新",
minutes=int(settings.SUBSCRIBE_RSS_INTERVAL),
id="subscribe_follow",
name="关注的订阅分享",
hours=1,
kwargs={
'job_id': 'subscribe_refresh'
'job_id': 'subscribe_follow'
}
)
# 关注订阅分享每1小时
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_follow",
name="关注的订阅分享",
hours=1,
kwargs={
'job_id': 'subscribe_follow'
}
)
# 下载器文件转移每5分钟
self._scheduler.add_job(
self.start,
"interval",
id="transfer",
name="下载文件整理",
minutes=5,
kwargs={
'job_id': 'transfer'
}
)
# 后台刷新TMDB壁纸
self._scheduler.add_job(
self.start,
"interval",
id="random_wallpager",
name="壁纸缓存",
minutes=30,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3),
kwargs={
'job_id': 'random_wallpager'
}
)
# 公共定时服务
self._scheduler.add_job(
self.start,
"interval",
id="scheduler_job",
name="公共定时服务",
minutes=10,
kwargs={
'job_id': 'scheduler_job'
}
)
# 缓存清理服务每隔24小时
self._scheduler.add_job(
self.start,
"interval",
id="clear_cache",
name="缓存清理",
hours=settings.CACHE_CONF["meta"] / 3600,
kwargs={
'job_id': 'clear_cache'
}
)
# 定时检查用户认证每隔10分钟
self._scheduler.add_job(
self.start,
"interval",
id="user_auth",
name="用户认证检查",
minutes=10,
kwargs={
'job_id': 'user_auth'
}
)
# 站点数据刷新
if settings.SITEDATA_REFRESH_INTERVAL:
# 下载器文件转移每5分钟
self._scheduler.add_job(
self.start,
"interval",
id="sitedata_refresh",
name="站点数据刷新",
minutes=settings.SITEDATA_REFRESH_INTERVAL * 60,
id="transfer",
name="下载文件整理",
minutes=5,
kwargs={
'job_id': 'sitedata_refresh'
'job_id': 'transfer'
}
)
# 推荐缓存
self._scheduler.add_job(
self.start,
"interval",
id="recommend_refresh",
name="推荐缓存",
hours=24,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3),
kwargs={
'job_id': 'recommend_refresh'
}
)
# 后台刷新TMDB壁纸
self._scheduler.add_job(
self.start,
"interval",
id="random_wallpager",
name="壁纸缓存",
minutes=30,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3),
kwargs={
'job_id': 'random_wallpager'
}
)
# 初始化工作流服务
self.init_workflow_jobs()
# 初始化插件服务
self.init_plugin_jobs()
# 公共定时服务
self._scheduler.add_job(
self.start,
"interval",
id="scheduler_job",
name="公共定时服务",
minutes=10,
kwargs={
'job_id': 'scheduler_job'
}
)
# 打印服务
logger.debug(self._scheduler.print_jobs())
# 缓存清理服务每隔24小时
self._scheduler.add_job(
self.start,
"interval",
id="clear_cache",
name="缓存清理",
hours=settings.CACHE_CONF["meta"] / 3600,
kwargs={
'job_id': 'clear_cache'
}
)
# 启动定时服务
self._scheduler.start()
# 定时检查用户认证每隔10分钟
self._scheduler.add_job(
self.start,
"interval",
id="user_auth",
name="用户认证检查",
minutes=10,
kwargs={
'job_id': 'user_auth'
}
)
# 站点数据刷新
if settings.SITEDATA_REFRESH_INTERVAL:
self._scheduler.add_job(
self.start,
"interval",
id="sitedata_refresh",
name="站点数据刷新",
minutes=settings.SITEDATA_REFRESH_INTERVAL * 60,
kwargs={
'job_id': 'sitedata_refresh'
}
)
# 推荐缓存
self._scheduler.add_job(
self.start,
"interval",
id="recommend_refresh",
name="推荐缓存",
hours=24,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3),
kwargs={
'job_id': 'recommend_refresh'
}
)
# 初始化工作流服务
self.init_workflow_jobs()
# 初始化插件服务
self.init_plugin_jobs()
# 打印服务
logger.debug(self._scheduler.print_jobs())
# 启动定时服务
self._scheduler.start()
def start(self, job_id: str, *args, **kwargs):
"""
@@ -496,7 +501,6 @@ class Scheduler(metaclass=Singleton):
"""
if not self._scheduler:
return
# 移除该工作流的全部服务
self.remove_workflow_job(workflow)
# 添加工作流服务
@@ -625,17 +629,18 @@ class Scheduler(metaclass=Singleton):
"""
关闭定时服务
"""
try:
if self._scheduler:
logger.info("正在停止定时任务...")
self._event.set()
self._scheduler.remove_all_jobs()
if self._scheduler.running:
self._scheduler.shutdown()
self._scheduler = None
logger.info("定时任务停止完成")
except Exception as e:
logger.error(f"停止定时任务失败::{str(e)} - {traceback.format_exc()}")
with lock:
try:
if self._scheduler:
logger.info("正在停止定时任务...")
self._event.set()
self._scheduler.remove_all_jobs()
if self._scheduler.running:
self._scheduler.shutdown()
self._scheduler = None
logger.info("定时任务停止完成")
except Exception as e:
logger.error(f"停止定时任务失败::{str(e)} - {traceback.format_exc()}")
@staticmethod
def clear_cache():

View File

@@ -1,4 +1,4 @@
from typing import Optional, Dict, List, Union
from typing import Optional, Dict, List, Union, Any
from pydantic import BaseModel, Field
@@ -235,9 +235,9 @@ class Context(BaseModel):
上下文
"""
# 元数据
meta_info: Optional[MetaInfo] = None
meta_info: Optional[Union[MetaInfo, Any]] = None
# 媒体信息
media_info: Optional[MediaInfo] = None
media_info: Optional[Union[MediaInfo, Any]] = None
# 种子信息
torrent_info: Optional[TorrentInfo] = None

View File

@@ -7,7 +7,7 @@ class DownloadTask(BaseModel):
"""
下载任务
"""
download_id: Optional[str] = Field(None, description="任务ID")
downloader: Optional[str] = Field(None, description="下载器")
path: Optional[str] = Field(None, description="下载路径")
completed: Optional[bool] = Field(False, description="是否完成")
download_id: Optional[str] = Field(default=None, description="任务ID")
downloader: Optional[str] = Field(default=None, description="下载器")
path: Optional[str] = Field(default=None, description="下载路径")
completed: Optional[bool] = Field(default=False, description="是否完成")

View File

@@ -11,7 +11,7 @@ class Event(BaseModel):
事件模型
"""
event_type: str = Field(..., description="事件类型")
event_data: Optional[dict] = Field({}, description="事件数据")
event_data: Optional[dict] = Field(default={}, description="事件数据")
priority: Optional[int] = Field(0, description="事件优先级")

View File

@@ -147,6 +147,8 @@ class SystemConfigKey(Enum):
UserSiteAuthParams = "UserSiteAuthParams"
# Follow订阅分享者
FollowSubscribers = "FollowSubscribers"
# 通知发送时间
NotificationSendTime = "NotificationSendTime"
# 处理进度Key字典

View File

@@ -13,18 +13,18 @@ class Workflow(BaseModel):
"""
工作流信息
"""
id: Optional[int] = Field(None, description="工作流ID")
name: Optional[str] = Field(None, description="工作流名称")
description: Optional[str] = Field(None, description="工作流描述")
timer: Optional[str] = Field(None, description="定时器")
state: Optional[str] = Field(None, description="状态")
current_action: Optional[str] = Field(None, description="已执行动作")
result: Optional[str] = Field(None, description="任务执行结果")
run_count: Optional[int] = Field(0, description="已执行次数")
actions: Optional[list] = Field([], description="任务列表")
flows: Optional[list] = Field([], description="任务流")
add_time: Optional[str] = Field(None, description="创建时间")
last_time: Optional[str] = Field(None, description="最后执行时间")
id: Optional[int] = Field(default=None, description="工作流ID")
name: Optional[str] = Field(default=None, description="工作流名称")
description: Optional[str] = Field(default=None, description="工作流描述")
timer: Optional[str] = Field(default=None, description="定时器")
state: Optional[str] = Field(default=None, description="状态")
current_action: Optional[str] = Field(default=None, description="已执行动作")
result: Optional[str] = Field(default=None, description="任务执行结果")
run_count: Optional[int] = Field(default=0, description="已执行次数")
actions: Optional[list] = Field(default=[], description="任务列表")
flows: Optional[list] = Field(default=[], description="任务流")
add_time: Optional[str] = Field(default=None, description="创建时间")
last_time: Optional[str] = Field(default=None, description="最后执行时间")
class Config:
orm_mode = True
@@ -34,51 +34,51 @@ class ActionParams(BaseModel):
"""
动作基础参数
"""
loop: Optional[bool] = Field(False, description="是否需要循环")
loop_interval: Optional[int] = Field(0, description="循环间隔 (秒)")
loop: Optional[bool] = Field(default=False, description="是否需要循环")
loop_interval: Optional[int] = Field(default=0, description="循环间隔 (秒)")
class Action(BaseModel):
"""
动作信息
"""
id: Optional[str] = Field(None, description="动作ID")
type: Optional[str] = Field(None, description="动作类型 (类名)")
name: Optional[str] = Field(None, description="动作名称")
description: Optional[str] = Field(None, description="动作描述")
position: Optional[dict] = Field({}, description="位置")
data: Optional[dict] = Field({}, description="参数")
id: Optional[str] = Field(default=None, description="动作ID")
type: Optional[str] = Field(default=None, description="动作类型 (类名)")
name: Optional[str] = Field(default=None, description="动作名称")
description: Optional[str] = Field(default=None, description="动作描述")
position: Optional[dict] = Field(default={}, description="位置")
data: Optional[dict] = Field(default={}, description="参数")
class ActionExecution(BaseModel):
"""
动作执行情况
"""
action: Optional[str] = Field(None, description="当前动作(名称)")
result: Optional[bool] = Field(None, description="执行结果")
message: Optional[str] = Field(None, description="执行消息")
action: Optional[str] = Field(default=None, description="当前动作(名称)")
result: Optional[bool] = Field(default=None, description="执行结果")
message: Optional[str] = Field(default=None, description="执行消息")
class ActionContext(BaseModel):
"""
动作基础上下文,各动作通用数据
"""
content: Optional[str] = Field(None, description="文本类内容")
torrents: Optional[List[Context]] = Field([], description="资源列表")
medias: Optional[List[MediaInfo]] = Field([], description="媒体列表")
fileitems: Optional[List[FileItem]] = Field([], description="文件列表")
downloads: Optional[List[DownloadTask]] = Field([], description="下载任务列表")
sites: Optional[List[Site]] = Field([], description="站点列表")
subscribes: Optional[List[Subscribe]] = Field([], description="订阅列表")
execute_history: Optional[List[ActionExecution]] = Field([], description="执行历史")
progress: Optional[int] = Field(0, description="执行进度(%")
content: Optional[str] = Field(default=None, description="文本类内容")
torrents: Optional[List[Context]] = Field(default=[], description="资源列表")
medias: Optional[List[MediaInfo]] = Field(default=[], description="媒体列表")
fileitems: Optional[List[FileItem]] = Field(default=[], description="文件列表")
downloads: Optional[List[DownloadTask]] = Field(default=[], description="下载任务列表")
sites: Optional[List[Site]] = Field(default=[], description="站点列表")
subscribes: Optional[List[Subscribe]] = Field(default=[], description="订阅列表")
execute_history: Optional[List[ActionExecution]] = Field(default=[], description="执行历史")
progress: Optional[int] = Field(default=0, description="执行进度(%")
class ActionFlow(BaseModel):
"""
工作流流程
"""
id: Optional[str] = Field(None, description="流程ID")
source: Optional[str] = Field(None, description="源动作")
target: Optional[str] = Field(None, description="目标动作")
animated: Optional[bool] = Field(True, description="是否动画流程")
id: Optional[str] = Field(default=None, description="流程ID")
source: Optional[str] = Field(default=None, description="源动作")
target: Optional[str] = Field(default=None, description="目标动作")
animated: Optional[bool] = Field(default=True, description="是否动画流程")

View File

@@ -3,7 +3,7 @@ import abc
class Singleton(abc.ABCMeta, type):
"""
类单例模式
类单例模式(按参数)
"""
_instances: dict = {}
@@ -19,3 +19,24 @@ class AbstractSingleton(abc.ABC, metaclass=Singleton):
"""
抽像类单例模式
"""
pass
class SingletonClass(abc.ABCMeta, type):
"""
类单例模式(按类)
"""
_instances: dict = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(SingletonClass, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class AbstractSingletonClass(abc.ABC, metaclass=SingletonClass):
"""
抽像类单例模式(按类)
"""
pass

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.3.3'
FRONTEND_VERSION = 'v2.3.3'
APP_VERSION = 'v2.3.4'
FRONTEND_VERSION = 'v2.3.4'