Compare commits

...

50 Commits

Author SHA1 Message Date
jxxghp
17a21ed707 更新 version.py 2026-02-12 07:09:45 +08:00
jxxghp
f390647139 fix(site): 更新站点信息时同步更新domain域名 2026-02-12 06:59:13 +08:00
jxxghp
aacd91e196 Merge pull request #5487 from cddjr/bugfix/issue_5242 2026-02-11 16:02:54 +08:00
景大侠
258171c9c4 fix(telegram): 修复通知标题含特殊符号时异常显示**符号 2026-02-11 09:20:50 +08:00
jxxghp
812c5873aa Merge pull request #5486 from cddjr/feat/shared-sync-async-cache 2026-02-10 22:11:42 +08:00
景大侠
4c3d47f1f0 feat(cache): 同步/异步函数可共享缓存
- 缓存键支持自定义命名,使异步与同步函数可共享缓存结果
- 内存缓存改为类变量,实现多个cache装饰器共享同一缓存空间
- 重构AsyncMemoryBackend,减少重复代码
- 补齐部分模块的缓存清理功能
2026-02-10 18:46:49 +08:00
jxxghp
ba7b6ba869 Merge pull request #5485 from yubanmeiqin9048/patch-2 2026-02-10 17:41:51 +08:00
yubanmeiqin9048
d0471ae512 fix: 修复目标目录无视频文件时转移字幕和音频触发目录删除 2026-02-10 14:10:42 +08:00
jxxghp
636c4be9fb 更新 version.py 2026-02-07 08:13:43 +08:00
jxxghp
6bec765a9d Merge pull request #5474 from jxxghp/copilot/optimize-file-move-implementation 2026-02-06 22:20:11 +08:00
copilot-swe-agent[bot]
d61d16ccc4 Restore the optimization - accidentally reverted in previous commit
Co-authored-by: jxxghp <51039935+jxxghp@users.noreply.github.com>
2026-02-06 14:15:29 +00:00
copilot-swe-agent[bot]
f2a5715b24 Co-authored-by: jxxghp <51039935+jxxghp@users.noreply.github.com> 2026-02-06 14:11:15 +00:00
copilot-swe-agent[bot]
c064c3781f Optimize SystemUtils.move to avoid triggering directory monitoring
Co-authored-by: jxxghp <51039935+jxxghp@users.noreply.github.com>
2026-02-06 14:03:03 +00:00
copilot-swe-agent[bot]
bb4dffe2a4 Initial plan 2026-02-06 13:59:59 +00:00
jxxghp
37cf3eeef3 Merge pull request #5473 from cddjr/feat_transfer_files_filter 2026-02-06 21:04:52 +08:00
景大侠
40395b2999 feat: 在构造待整理文件列表时引入过滤逻辑以简化后续处理 2026-02-06 20:56:26 +08:00
景大侠
32afe6445f fix: 整理成功事件缺少历史记录ID 2026-02-06 20:33:13 +08:00
jxxghp
793a991913 Merge remote-tracking branch 'origin/v2' into v2 2026-02-05 14:16:55 +08:00
jxxghp
d278224ff1 fix:优化第三方插件存储类型的检测提示 2026-02-05 14:16:50 +08:00
jxxghp
9b4d0ce6a8 Merge pull request #5466 from DDSRem-Dev/dev 2026-02-05 06:56:25 +08:00
DDSRem
a1829fe590 feat: u115 global rate limiting strategy 2026-02-04 23:24:14 +08:00
jxxghp
2b2b39365c Merge pull request #5464 from ChanningHe/enhance/discord 2026-02-04 18:08:38 +08:00
ChanningHe
1147930f3f fix: [slack&discord&telegram] handle special characters in config names 2026-02-04 14:09:40 +09:00
ChanningHe
636f338ed7 enhance: [discord] add _user_chat_mapping to chat in channel 2026-02-04 13:42:33 +09:00
ChanningHe
72365d00b4 enhance: discord debug information 2026-02-04 12:54:17 +09:00
jxxghp
19d8086732 Merge pull request #5460 from cddjr/fix_download_hash_overridden 2026-02-03 21:23:04 +08:00
大虾
30488418e5 修复 整理时download_hash参数被覆盖
导致后续文件均识别成同一个媒体信息
2026-02-03 18:59:32 +08:00
jxxghp
2f0badd74a Merge pull request #5457 from cddjr/fix_5449 2026-02-02 23:45:07 +08:00
jxxghp
6045b0579b Merge pull request #5455 from cddjr/fix_transfer_result_incorrect 2026-02-02 23:44:32 +08:00
景大侠
498f1fec74 修复 整理视频可能导致误删字幕及音轨 2026-02-02 23:18:46 +08:00
景大侠
f6a541f2b9 修复 覆盖整理失败时误报成功 2026-02-02 21:50:35 +08:00
jxxghp
8ce78eabca 更新 version.py 2026-02-02 18:44:30 +08:00
jxxghp
2c34c5309f Merge pull request #5454 from CHANTXU64/v2 2026-02-02 18:02:45 +08:00
jxxghp
77e680168a Merge pull request #5452 from 0honus0/v2 2026-02-02 17:22:00 +08:00
jxxghp
8a7e59742f Merge pull request #5451 from cddjr/fix_specials_season 2026-02-02 17:21:29 +08:00
jxxghp
42bac14770 Merge pull request #5450 from CHANTXU64/v2 2026-02-02 17:20:40 +08:00
CHANTXU64
8323834483 feat: 优化RSS订阅和网页抓取中发布日期(PubDate)的获取兼容性
- app/helper/rss.py: 优化RSS解析,支持带命名空间的日期标签(如 pubDate/published/updated)。
- app/modules/indexer/spider/__init__.py: 优化网页抓取,增加日期格式校验并对非标准格式进行自动归一化。
2026-02-02 16:52:04 +08:00
景大侠
1751caef62 fix: 补充几处season的判空 2026-02-02 15:01:12 +08:00
0honus0
d622d1474d 根据意见增加尾部逗号 2026-02-02 07:00:57 +00:00
0honus0
f28be2e7de 增加登录按钮xpath支持nicept网站 2026-02-02 06:52:48 +00:00
jxxghp
17773913ae fix: 统一了数据库查询中 season 参数的非空判断逻辑,以正确处理 season=0 的情况。 2026-02-02 14:23:51 +08:00
jxxghp
d469c2d3f9 refactor: 统一将布尔判断 `if var:` / `if not var:` 更改为显式的 `if var is not None:` / `if var is None:` 以正确处理 None 值。 2026-02-02 13:49:32 +08:00
CHANTXU64
4e74d32882 Fix: TMDB 剧集详情页不显示第 0 季(特别篇) #5444 2026-02-02 10:28:22 +08:00
jxxghp
7b8cd37a9b feat(transfer): enhance job removal methods for thread safety and strict checks 2026-02-01 16:58:32 +08:00
jxxghp
eda306d726 Merge pull request #5448 from cddjr/feat_japanese_subtitles 2026-02-01 16:25:56 +08:00
景大侠
94f3b1fe84 feat: 支持整理日语字幕 2026-02-01 16:04:22 +08:00
jxxghp
c50e3ba293 Merge pull request #5445 from jxxghp/copilot/analyze-task-loss-reason 2026-02-01 08:42:17 +08:00
copilot-swe-agent[bot]
eff7818912 Improve documentation and fix validation bug in add_task
Co-authored-by: jxxghp <51039935+jxxghp@users.noreply.github.com>
2026-01-31 16:44:01 +00:00
copilot-swe-agent[bot]
270bcff8f3 Fix task loss issue in do_transfer multi-threading batch adding
Co-authored-by: jxxghp <51039935+jxxghp@users.noreply.github.com>
2026-01-31 16:38:55 +00:00
copilot-swe-agent[bot]
e04963c2dc Initial plan 2026-01-31 16:33:59 +00:00
48 changed files with 813 additions and 424 deletions

View File

@@ -63,7 +63,7 @@ class SearchMediaTool(MoviePilotTool):
if media_type:
if result.type != MediaType(media_type):
continue
if season and result.season != season:
if season is not None and result.season != season:
continue
filtered_results.append(result)

View File

@@ -80,7 +80,7 @@ class SearchTorrentsTool(MoviePilotTool):
if media_type and torrent.media_info:
if torrent.media_info.type != MediaType(media_type):
continue
if season and torrent.meta_info and torrent.meta_info.begin_season != season:
if season is not None and torrent.meta_info and torrent.meta_info.begin_season != season:
continue
# 使用正则表达式过滤标题(分辨率、质量等关键字)
if regex_pattern and torrent.torrent_info and torrent.torrent_info.title:

View File

@@ -195,7 +195,7 @@ async def seasons(mediaid: Optional[str] = None,
tmdbid = int(mediaid[5:])
seasons_info = await TmdbChain().async_tmdb_seasons(tmdbid=tmdbid)
if seasons_info:
if season:
if season is not None:
return [sea for sea in seasons_info if sea.season_number == season]
return seasons_info
if title:
@@ -207,11 +207,11 @@ async def seasons(mediaid: Optional[str] = None,
if settings.RECOGNIZE_SOURCE == "themoviedb":
seasons_info = await TmdbChain().async_tmdb_seasons(tmdbid=mediainfo.tmdb_id)
if seasons_info:
if season:
if season is not None:
return [sea for sea in seasons_info if sea.season_number == season]
return seasons_info
else:
sea = season or 1
sea = season if season is not None else 1
return [schemas.MediaSeason(
season_number=sea,
poster_path=mediainfo.poster_path,

View File

@@ -54,7 +54,7 @@ async def exists_local(title: Optional[str] = None,
判断本地是否存在
"""
meta = MetaInfo(title)
if not season:
if season is None:
season = meta.begin_season
# 返回对象
ret_info = {}
@@ -83,7 +83,7 @@ def exists(media_in: schemas.MediaInfo,
existsinfo: schemas.ExistMediaInfo = MediaServerChain().media_exists(mediainfo=mediainfo)
if not existsinfo:
return {}
if media_in.season:
if media_in.season is not None:
return {
media_in.season: existsinfo.seasons.get(media_in.season) or []
}
@@ -101,7 +101,7 @@ def not_exists(media_in: schemas.MediaInfo,
mtype = MediaType(media_in.type) if media_in.type else None
if mtype:
meta.type = mtype
if media_in.season:
if media_in.season is not None:
meta.begin_season = media_in.season
meta.type = MediaType.TV
if media_in.year:

View File

@@ -92,10 +92,14 @@ async def update_site(
# 校正地址格式
_scheme, _netloc = StringUtils.get_url_netloc(site_in.url)
site_in.url = f"{_scheme}://{_netloc}/"
site_in.domain = StringUtils.get_url_domain(site_in.url)
await site.async_update(db, site_in.model_dump())
# 通知站点更新
await eventmanager.async_send_event(EventType.SiteUpdated, {
"domain": site_in.domain
"site_id": site_in.id,
"domain": site_in.domain,
"name": site_in.name,
"site_url": site_in.url
})
return schemas.Response(success=True)

View File

@@ -199,7 +199,7 @@ async def subscribe_mediaid(
# 使用名称检查订阅
if title_check and title:
meta = MetaInfo(title)
if season:
if season is not None:
meta.begin_season = season
result = await Subscribe.async_get_by_title(db, title=meta.name, season=meta.begin_season)

View File

@@ -150,7 +150,7 @@ class MediaChain(ChainBase):
org_meta.year = year
org_meta.begin_season = season_number
org_meta.begin_episode = episode_number
if org_meta.begin_season or org_meta.begin_episode:
if org_meta.begin_season is not None or org_meta.begin_episode is not None:
org_meta.type = MediaType.TV
# 重新识别
return self.recognize_media(meta=org_meta)
@@ -958,10 +958,10 @@ class MediaChain(ChainBase):
year = None
if tmdbinfo.get('release_date'):
year = tmdbinfo['release_date'][:4]
elif tmdbinfo.get('seasons') and season:
elif tmdbinfo.get('seasons') and season is not None:
for seainfo in tmdbinfo['seasons']:
season_number = seainfo.get("season_number")
if not season_number:
if season_number is None:
continue
air_date = seainfo.get("air_date")
if air_date and season_number == season:

View File

@@ -49,7 +49,7 @@ class SearchChain(ChainBase):
logger.error(f'{tmdbid} 媒体信息识别失败!')
return []
no_exists = None
if season:
if season is not None:
no_exists = {
tmdbid or doubanid: {
season: NotExistMediaInfo(episodes=[])
@@ -129,7 +129,7 @@ class SearchChain(ChainBase):
logger.error(f'{tmdbid} 媒体信息识别失败!')
return []
no_exists = None
if season:
if season is not None:
no_exists = {
tmdbid or doubanid: {
season: NotExistMediaInfo(episodes=[])
@@ -181,7 +181,7 @@ class SearchChain(ChainBase):
# 过滤剧集
season_episodes = {sea: info.episodes
for sea, info in no_exists[mediakey].items()}
elif mediainfo.season:
elif mediainfo.season is not None:
# 豆瓣只搜索当前季
season_episodes = {mediainfo.season: []}
else:

View File

@@ -156,7 +156,7 @@ class StorageChain(ChainBase):
"""
判断是否包含蓝光必备的文件夹
"""
required_files = ("BDMV", "CERTIFICATE")
required_files = {"BDMV", "CERTIFICATE"}
return any(
item.type == "dir" and item.name in required_files
for item in fileitems or []
@@ -166,7 +166,7 @@ class StorageChain(ChainBase):
"""
删除媒体文件,以及不含媒体文件的目录
"""
media_exts = settings.RMT_MEDIAEXT + settings.DOWNLOAD_TMPEXT
media_exts = settings.RMT_MEDIAEXT + settings.DOWNLOAD_TMPEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
fileitem_path = Path(fileitem.path) if fileitem.path else Path("")
if len(fileitem_path.parts) <= 2:
logger.warn(f"{fileitem.storage}{fileitem.path} 根目录或一级目录不允许删除")

View File

@@ -144,7 +144,7 @@ class SubscribeChain(ChainBase):
metainfo.year = year
if mtype:
metainfo.type = mtype
if season:
if season is not None:
metainfo.type = MediaType.TV
metainfo.begin_season = season
# 识别媒体信息
@@ -174,7 +174,7 @@ class SubscribeChain(ChainBase):
# 豆瓣标题处理
meta = MetaInfo(mediainfo.title)
mediainfo.title = meta.name
if not season:
if season is None:
season = meta.begin_season
# 使用名称识别兜底
@@ -188,7 +188,7 @@ class SubscribeChain(ChainBase):
# 总集数
if mediainfo.type == MediaType.TV:
if not season:
if season is None:
season = 1
# 总集数
if not kwargs.get('total_episode'):
@@ -321,7 +321,7 @@ class SubscribeChain(ChainBase):
metainfo.year = year
if mtype:
metainfo.type = mtype
if season:
if season is not None:
metainfo.type = MediaType.TV
metainfo.begin_season = season
# 识别媒体信息
@@ -351,7 +351,7 @@ class SubscribeChain(ChainBase):
# 豆瓣标题处理
meta = MetaInfo(mediainfo.title)
mediainfo.title = meta.name
if not season:
if season is None:
season = meta.begin_season
# 使用名称识别兜底
@@ -365,7 +365,7 @@ class SubscribeChain(ChainBase):
# 总集数
if mediainfo.type == MediaType.TV:
if not season:
if season is None:
season = 1
# 总集数
if not kwargs.get('total_episode'):
@@ -530,7 +530,7 @@ class SubscribeChain(ChainBase):
# 生成元数据
meta = MetaInfo(subscribe.name)
meta.year = subscribe.year
meta.begin_season = subscribe.season or None
meta.begin_season = subscribe.season if subscribe.season is not None else None
try:
meta.type = MediaType(subscribe.type)
except ValueError:

View File

@@ -29,6 +29,7 @@ from app.log import logger
from app.schemas import StorageOperSelectionEventData
from app.schemas import TransferInfo, Notification, EpisodeFormat, FileItem, TransferDirectoryConf, \
TransferTask, TransferQueue, TransferJob, TransferJobTask
from app.schemas.exception import OperationInterrupted
from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, NotificationType, MessageChannel, \
SystemConfigKey, ChainEventType, ContentType
from app.utils.mixins import ConfigReloadMixin
@@ -111,12 +112,13 @@ class JobManager:
"""
return schemas.MetaInfo(**task.meta.to_dict())
def add_task(self, task: TransferTask, state: Optional[str] = "waiting"):
def add_task(self, task: TransferTask, state: Optional[str] = "waiting") -> bool:
"""
添加整理任务,自动分组到对应的作业中
:return: True表示任务已添加False表示任务无效或已存在重复
"""
if not any([task, task.meta, task.fileitem]):
return
if not all([task, task.meta, task.fileitem]):
return False
with job_lock:
__mediaid__ = self.__get_id(task)
if __mediaid__ not in self._job_view:
@@ -134,7 +136,8 @@ class JobManager:
else:
# 不重复添加任务
if any([t.fileitem == task.fileitem for t in self._job_view[__mediaid__].tasks]):
return
logger.debug(f"任务 {task.fileitem.name} 已存在,跳过重复添加")
return False
self._job_view[__mediaid__].tasks.append(
TransferJobTask(
fileitem=task.fileitem,
@@ -150,6 +153,7 @@ class JobManager:
self._season_episodes[__mediaid__] = list(set(self._season_episodes[__mediaid__]))
else:
self._season_episodes[__mediaid__] = task.meta.episode_list
return True
def running_task(self, task: TransferTask):
"""
@@ -221,7 +225,7 @@ class JobManager:
def remove_job(self, task: TransferTask) -> Optional[TransferJob]:
"""
移除任务对应的作业
移除任务对应的作业(强制,线程不安全)
"""
with job_lock:
__mediaid__ = self.__get_id(task)
@@ -232,68 +236,99 @@ class JobManager:
return self._job_view.pop(__mediaid__)
return None
def try_remove_job(self, task: TransferTask):
"""
尝试移除任务对应的作业(严格检查未完成作业,线程安全)
"""
with job_lock:
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
meta_done = True
if __metaid__ in self._job_view:
meta_done = all(
t.state in ["completed", "failed"] for t in self._job_view[__metaid__].tasks
)
media_done = True
if __mediaid__ in self._job_view:
media_done = all(
t.state in ["completed", "failed"] for t in self._job_view[__mediaid__].tasks
)
if meta_done and media_done:
__id__ = self.__get_id(task)
if __id__ in self._job_view:
# 移除季集信息
if __id__ in self._season_episodes:
self._season_episodes.pop(__id__)
self._job_view.pop(__id__)
def is_done(self, task: TransferTask) -> bool:
"""
检查任务对应的作业是否整理完成(不管成功还是失败)
"""
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
if __metaid__ in self._job_view:
meta_done = all(
task.state in ["completed", "failed"] for task in self._job_view[__metaid__].tasks
)
else:
meta_done = True
if __mediaid__ in self._job_view:
media_done = all(
task.state in ["completed", "failed"] for task in self._job_view[__mediaid__].tasks
)
else:
media_done = True
return meta_done and media_done
with job_lock:
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
if __metaid__ in self._job_view:
meta_done = all(
task.state in ["completed", "failed"] for task in self._job_view[__metaid__].tasks
)
else:
meta_done = True
if __mediaid__ in self._job_view:
media_done = all(
task.state in ["completed", "failed"] for task in self._job_view[__mediaid__].tasks
)
else:
media_done = True
return meta_done and media_done
def is_finished(self, task: TransferTask) -> bool:
"""
检查任务对应的作业是否已完成且有成功的记录
"""
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
if __metaid__ in self._job_view:
meta_finished = all(
task.state in ["completed", "failed"] for task in self._job_view[__metaid__].tasks
)
else:
meta_finished = True
if __mediaid__ in self._job_view:
tasks = self._job_view[__mediaid__].tasks
media_finished = all(
task.state in ["completed", "failed"] for task in tasks
) and any(
task.state == "completed" for task in tasks
)
else:
media_finished = True
return meta_finished and media_finished
with job_lock:
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
if __metaid__ in self._job_view:
meta_finished = all(
task.state in ["completed", "failed"] for task in self._job_view[__metaid__].tasks
)
else:
meta_finished = True
if __mediaid__ in self._job_view:
tasks = self._job_view[__mediaid__].tasks
media_finished = all(
task.state in ["completed", "failed"] for task in tasks
) and any(
task.state == "completed" for task in tasks
)
else:
media_finished = True
return meta_finished and media_finished
def is_success(self, task: TransferTask) -> bool:
"""
检查任务对应的作业是否全部成功
"""
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
if __metaid__ in self._job_view:
meta_success = all(
task.state in ["completed"] for task in self._job_view[__metaid__].tasks
)
else:
meta_success = True
if __mediaid__ in self._job_view:
media_success = all(
task.state in ["completed"] for task in self._job_view[__mediaid__].tasks
)
else:
media_success = True
return meta_success and media_success
with job_lock:
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
if __metaid__ in self._job_view:
meta_success = all(
task.state in ["completed"] for task in self._job_view[__metaid__].tasks
)
else:
meta_success = True
if __mediaid__ in self._job_view:
media_success = all(
task.state in ["completed"] for task in self._job_view[__mediaid__].tasks
)
else:
media_success = True
return meta_success and media_success
def get_all_torrent_hashes(self) -> set[str]:
"""
@@ -311,11 +346,13 @@ class JobManager:
检查指定种子的所有任务是否都已完成
"""
with job_lock:
for job in self._job_view.values():
for task in job.tasks:
if task.download_hash == download_hash:
if task.state not in ["completed", "failed"]:
return False
if any(
task.state not in {"completed", "failed"}
for job in self._job_view.values()
for task in job.tasks
if task.download_hash == download_hash
):
return False
return True
def is_torrent_success(self, download_hash: str) -> bool:
@@ -323,85 +360,95 @@ class JobManager:
检查指定种子的所有任务是否都已成功
"""
with job_lock:
for job in self._job_view.values():
for task in job.tasks:
if task.download_hash == download_hash:
if task.state not in ["completed"]:
return False
if any(
task.state != "completed"
for job in self._job_view.values()
for task in job.tasks
if task.download_hash == download_hash
):
return False
return True
def has_tasks(self, meta: MetaBase, mediainfo: Optional[MediaInfo] = None, season: Optional[int] = None) -> bool:
"""
判断作业是否还有任务正在处理
"""
if mediainfo:
__mediaid__ = self.__get_media_id(media=mediainfo, season=season)
if __mediaid__ in self._job_view:
return True
with job_lock:
if mediainfo:
__mediaid__ = self.__get_media_id(media=mediainfo, season=season)
if __mediaid__ in self._job_view:
return True
__metaid__ = self.__get_meta_id(meta=meta, season=season)
return __metaid__ in self._job_view and len(self._job_view[__metaid__].tasks) > 0
__metaid__ = self.__get_meta_id(meta=meta, season=season)
return __metaid__ in self._job_view and len(self._job_view[__metaid__].tasks) > 0
def success_tasks(self, media: MediaInfo, season: Optional[int] = None) -> List[TransferJobTask]:
"""
获取作业中所有成功的任务
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return []
return [task for task in self._job_view[__mediaid__].tasks if task.state == "completed"]
with job_lock:
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return []
return [task for task in self._job_view[__mediaid__].tasks if task.state == "completed"]
def all_tasks(self, media: MediaInfo, season: Optional[int] = None) -> List[TransferJobTask]:
"""
获取作业中全部任务
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return []
return self._job_view[__mediaid__].tasks
with job_lock:
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return []
return self._job_view[__mediaid__].tasks
def count(self, media: MediaInfo, season: Optional[int] = None) -> int:
"""
获取作业中成功总数
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return 0
return len([task for task in self._job_view[__mediaid__].tasks if task.state == "completed"])
with job_lock:
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return 0
return len([task for task in self._job_view[__mediaid__].tasks if task.state == "completed"])
def size(self, media: MediaInfo, season: Optional[int] = None) -> int:
"""
获取作业中所有成功文件总大小
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return 0
return sum([
task.fileitem.size if task.fileitem.size is not None
else (
SystemUtils.get_directory_size(Path(task.fileitem.path)) if task.fileitem.storage == "local" else 0)
for task in self._job_view[__mediaid__].tasks
if task.state == "completed"
])
with job_lock:
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return 0
return sum([
task.fileitem.size if task.fileitem.size is not None
else (
SystemUtils.get_directory_size(Path(task.fileitem.path)) if task.fileitem.storage == "local" else 0)
for task in self._job_view[__mediaid__].tasks
if task.state == "completed"
])
def total(self) -> int:
"""
获取所有任务总数
"""
return sum([len(job.tasks) for job in self._job_view.values()])
with job_lock:
return sum([len(job.tasks) for job in self._job_view.values()])
def list_jobs(self) -> List[TransferJob]:
"""
获取所有作业的任务列表
"""
return list(self._job_view.values())
with job_lock:
return list(self._job_view.values())
def season_episodes(self, media: MediaInfo, season: Optional[int] = None) -> List[int]:
"""
获取作业的季集清单
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
return self._season_episodes.get(__mediaid__) or []
with job_lock:
__mediaid__ = self.__get_media_id(media=media, season=season)
return self._season_episodes.get(__mediaid__) or []
class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
@@ -724,26 +771,30 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
return ret_status, ret_message
def put_to_queue(self, task: TransferTask):
def put_to_queue(self, task: TransferTask) -> bool:
"""
添加到待整理队列
:param task: 任务信息
:return: True表示任务已添加到队列False表示任务无效或已存在重复
"""
if not task:
return
# 维护整理任务视图
self.__put_to_jobview(task)
return False
# 维护整理任务视图,如果任务已存在则不添加到队列
if not self.__put_to_jobview(task):
return False
# 添加到队列
self._queue.put(TransferQueue(
task=task,
callback=self.__default_callback
))
return True
def __put_to_jobview(self, task: TransferTask):
def __put_to_jobview(self, task: TransferTask) -> bool:
"""
添加到作业视图
:return: True表示任务已添加False表示任务无效或已存在重复
"""
self.jobview.add_task(task)
return self.jobview.add_task(task)
def remove_from_queue(self, fileitem: FileItem):
"""
@@ -901,7 +952,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
# 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title
if not settings.SCRAP_FOLLOW_TMDB:
transfer_history = transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id,
transfer_history = transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id,
mtype=mediainfo.type.value)
if transfer_history and mediainfo.title != transfer_history.title:
mediainfo.title = transfer_history.title
@@ -999,8 +1050,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
finally:
# 移除已完成的任务
if self.jobview.is_done(task):
self.jobview.remove_job(task)
self.jobview.try_remove_job(task)
def get_queue_tasks(self) -> List[TransferJob]:
"""
@@ -1124,14 +1174,29 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
return True
def __get_trans_fileitems(
self, fileitem: FileItem, check: bool = True
self,
fileitem: FileItem,
predicate: Optional[Callable[[FileItem, bool], bool]],
verify_file_exists: bool = True,
) -> List[Tuple[FileItem, bool]]:
"""
获取整理目录或文件列表
获取整理文件列表
:param fileitem: 文件项
:param check: 检查文件是否存在默认为True
:param fileitem: 文件项
:param predicate: 用于筛选目录或文件项
该函数接收两个参数:
- `file_item`: 需要判断的文件项(类型为 `FileItem`
- `is_bluray_dir`: 表示该项是否为蓝光原盘目录(布尔值)
函数应返回 `True` 表示保留该项,`False` 表示过滤掉
若 `predicate` 为 `None`,则默认保留所有项
:param verify_file_exists: 验证目录或文件是否存在,默认值为 `True`
"""
if global_vars.is_system_stopped:
raise OperationInterrupted()
storagechain = StorageChain()
def __is_bluray_sub(_path: str) -> bool:
@@ -1149,7 +1214,12 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
return storagechain.get_file_item(storage=_storage, path=p.parent)
return None
if check:
def _apply_predicate(file_item: FileItem, is_bluray_dir: bool) -> List[Tuple[FileItem, bool]]:
if predicate is None or predicate(file_item, is_bluray_dir):
return [(file_item, is_bluray_dir)]
return []
if verify_file_exists:
latest_fileitem = storagechain.get_item(fileitem)
if not latest_fileitem:
logger.warn(f"目录或文件不存在:{fileitem.path}")
@@ -1159,28 +1229,30 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
# 是否蓝光原盘子目录或文件
if __is_bluray_sub(fileitem.path):
if dir_item := __get_bluray_dir(fileitem.storage, Path(fileitem.path)):
if bluray_dir := __get_bluray_dir(fileitem.storage, Path(fileitem.path)):
# 返回该文件所在的原盘根目录
return [(dir_item, True)]
return _apply_predicate(bluray_dir, True)
# 单文件
if fileitem.type == "file":
return [(fileitem, False)]
return _apply_predicate(fileitem, False)
# 是否蓝光原盘根目录
sub_items = storagechain.list_files(fileitem, recursion=False) or []
if storagechain.contains_bluray_subdirectories(sub_items):
# 当前目录是原盘根目录,不需要递归
return [(fileitem, True)]
return _apply_predicate(fileitem, True)
# 不是原盘根目录 递归获取目录内需要整理的文件项列表
return [
item
for sub_item in sub_items
for item in (
self.__get_trans_fileitems(sub_item, check=False)
self.__get_trans_fileitems(
sub_item, predicate, verify_file_exists=False
)
if sub_item.type == "dir"
else [(sub_item, False)]
else _apply_predicate(sub_item, False)
)
]
@@ -1230,22 +1302,47 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
transfer_exclude_words = SystemConfigOper().get(SystemConfigKey.TransferExcludeWords)
# 汇总错误信息
err_msgs: List[str] = []
# 递归获取待整理的文件/目录列表
file_items = self.__get_trans_fileitems(fileitem)
if not file_items:
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
return False, f"{fileitem.name} 没有找到可整理的媒体文件"
def _filter(file_item: FileItem, is_bluray_dir: bool) -> bool:
"""
过滤文件
# 有集自定义格式,过滤文件
if formaterHandler:
file_items = [f for f in file_items if formaterHandler.match(f[0].name)]
:return: True 表示保留False 表示排除
"""
if continue_callback and not continue_callback():
raise OperationInterrupted()
# 有集自定义格式,过滤文件
if formaterHandler and not formaterHandler.match(file_item.name):
return False
# 过滤后缀和大小(蓝光目录、附加文件不过滤)
if (
not is_bluray_dir
and not self.__is_subtitle_file(file_item)
and not self.__is_audio_file(file_item)
):
if not self.__is_media_file(file_item):
return False
if not self.__is_allow_filesize(file_item, min_filesize):
return False
# 回收站及隐藏的文件不处理
if (
file_item.path.find("/@Recycle/") != -1
or file_item.path.find("/#recycle/") != -1
or file_item.path.find("/.") != -1
or file_item.path.find("/@eaDir") != -1
):
logger.debug(f"{file_item.path} 是回收站或隐藏的文件")
return False
# 整理屏蔽词不处理
if self._is_blocked_by_exclude_words(file_item.path, transfer_exclude_words):
return False
return True
# 过滤后缀和大小(蓝光目录、附加文件不过滤大小)
file_items = [f for f in file_items if f[1] or
self.__is_subtitle_file(f[0]) or
self.__is_audio_file(f[0]) or
(self.__is_media_file(f[0]) and self.__is_allow_filesize(f[0], min_filesize))]
try:
# 获取经过筛选后的待整理文件项列表
file_items = self.__get_trans_fileitems(fileitem, predicate=_filter)
except OperationInterrupted:
return False, f"{fileitem.name} 已取消"
if not file_items:
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
@@ -1258,21 +1355,10 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
try:
for file_item, bluray_dir in file_items:
if global_vars.is_system_stopped:
break
raise OperationInterrupted()
if continue_callback and not continue_callback():
break
raise OperationInterrupted()
file_path = Path(file_item.path)
# 回收站及隐藏的文件不处理
if file_item.path.find('/@Recycle/') != -1 \
or file_item.path.find('/#recycle/') != -1 \
or file_item.path.find('/.') != -1 \
or file_item.path.find('/@eaDir') != -1:
logger.debug(f"{file_item.path} 是回收站或隐藏的文件")
continue
# 整理屏蔽词不处理
if self._is_blocked_by_exclude_words(file_item.path, transfer_exclude_words):
continue
# 整理成功的不再处理
if not force:
@@ -1335,8 +1421,11 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
# 获取下载Hash
if download_history and (not downloader or not download_hash):
downloader = download_history.downloader
download_hash = download_history.download_hash
_downloader = download_history.downloader
_download_hash = download_history.download_hash
else:
_downloader = downloader
_download_hash = download_hash
# 后台整理
transfer_task = TransferTask(
@@ -1350,19 +1439,25 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
scrape=scrape,
library_type_folder=library_type_folder,
library_category_folder=library_category_folder,
downloader=downloader,
download_hash=download_hash,
downloader=_downloader,
download_hash=_download_hash,
download_history=download_history,
manual=manual,
background=background
)
if background:
self.put_to_queue(task=transfer_task)
logger.info(f"{file_path.name} 已添加到整理队列")
if self.put_to_queue(task=transfer_task):
logger.info(f"{file_path.name} 已添加到整理队列")
else:
logger.debug(f"{file_path.name} 已在整理队列中,跳过")
else:
# 加入列表
self.__put_to_jobview(transfer_task)
transfer_tasks.append(transfer_task)
if self.__put_to_jobview(transfer_task):
transfer_tasks.append(transfer_task)
else:
logger.debug(f"{file_path.name} 已在整理列表中,跳过")
except OperationInterrupted:
return False, f"{fileitem.name} 已取消"
finally:
file_items.clear()
del file_items

View File

@@ -27,8 +27,6 @@ DEFAULT_CACHE_SIZE = 1024
# 默认缓存有效期
DEFAULT_CACHE_TTL = 365 * 24 * 60 * 60
lock = threading.Lock()
# 上下文变量来控制缓存行为
_fresh = contextvars.ContextVar('fresh', default=False)
@@ -297,14 +295,14 @@ class AsyncCacheBackend(CacheBackend):
"""
获取所有缓存键,类似 dict.keys()(异步)
"""
async for key, _ in await self.items(region=region):
async for key, _ in self.items(region=region):
yield key
async def values(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> AsyncGenerator[Any, None]:
"""
获取所有缓存值,类似 dict.values()(异步)
"""
async for _, value in await self.items(region=region):
async for _, value in self.items(region=region):
yield value
async def update(self, other: Dict[str, Any], region: Optional[str] = DEFAULT_CACHE_REGION,
@@ -332,7 +330,7 @@ class AsyncCacheBackend(CacheBackend):
弹出最后一个缓存项,类似 dict.popitem()(异步)
"""
items = []
async for item in await self.items(region=region):
async for item in self.items(region=region):
items.append(item)
if not items:
raise KeyError("popitem(): cache is empty")
@@ -364,6 +362,11 @@ class MemoryBackend(CacheBackend):
基于 `cachetools.TTLCache` 实现的缓存后端
"""
# 类变量 _region_caches 的互斥锁
_lock = threading.Lock()
# 存储各个 region 的缓存实例region -> TTLCache
_region_caches: Dict[str, Union[MemoryTTLCache, MemoryLRUCache]] = {}
def __init__(self, cache_type: Literal['ttl', 'lru'] = 'ttl',
maxsize: Optional[int] = None, ttl: Optional[int] = None):
"""
@@ -376,8 +379,6 @@ class MemoryBackend(CacheBackend):
self.cache_type = cache_type
self.maxsize = maxsize or DEFAULT_CACHE_SIZE
self.ttl = ttl or DEFAULT_CACHE_TTL
# 存储各个 region 的缓存实例region -> TTLCache
self._region_caches: Dict[str, Union[MemoryTTLCache, MemoryLRUCache]] = {}
def __get_region_cache(self, region: str) -> Optional[Union[MemoryTTLCache, MemoryLRUCache]]:
"""
@@ -400,7 +401,7 @@ class MemoryBackend(CacheBackend):
maxsize = kwargs.get("maxsize", self.maxsize)
region = self.get_region(region)
# 设置缓存值
with lock:
with self._lock:
# 如果该 key 尚未有缓存实例,则创建一个新的 TTLCache 实例
region_cache = self._region_caches.setdefault(
region,
@@ -445,7 +446,7 @@ class MemoryBackend(CacheBackend):
region_cache = self.__get_region_cache(region)
if region_cache is None:
return
with lock:
with self._lock:
del region_cache[key]
def clear(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> None:
@@ -458,13 +459,13 @@ class MemoryBackend(CacheBackend):
# 清理指定缓存区
region_cache = self.__get_region_cache(region)
if region_cache:
with lock:
with self._lock:
region_cache.clear()
logger.debug(f"Cleared cache for region: {region}")
else:
# 清除所有区域的缓存
for region_cache in self._region_caches.values():
with lock:
with self._lock:
region_cache.clear()
logger.info("Cleared all cache")
@@ -480,7 +481,7 @@ class MemoryBackend(CacheBackend):
yield from ()
return
# 使用锁保护迭代过程,避免在迭代时缓存被修改
with lock:
with self._lock:
# 创建快照避免并发修改问题
items_snapshot = list(region_cache.items())
for item in items_snapshot:
@@ -507,18 +508,7 @@ class AsyncMemoryBackend(AsyncCacheBackend):
:param maxsize: 缓存的最大条目数
:param ttl: 默认缓存存活时间,单位秒
"""
self.cache_type = cache_type
self.maxsize = maxsize or DEFAULT_CACHE_SIZE
self.ttl = ttl or DEFAULT_CACHE_TTL
# 存储各个 region 的缓存实例region -> TTLCache
self._region_caches: Dict[str, Union[MemoryTTLCache, MemoryLRUCache]] = {}
def __get_region_cache(self, region: str) -> Optional[Union[MemoryTTLCache, MemoryLRUCache]]:
"""
获取指定区域的缓存实例,如果不存在则返回 None
"""
region = self.get_region(region)
return self._region_caches.get(region)
self._backend = MemoryBackend(cache_type=cache_type, maxsize=maxsize, ttl=ttl)
async def set(self, key: str, value: Any, ttl: Optional[int] = None,
region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None:
@@ -530,18 +520,7 @@ class AsyncMemoryBackend(AsyncCacheBackend):
:param ttl: 缓存的存活时间,不传入为永久缓存,单位秒
:param region: 缓存的区
"""
ttl = ttl or self.ttl
maxsize = kwargs.get("maxsize", self.maxsize)
region = self.get_region(region)
# 设置缓存值
with lock:
# 如果该 key 尚未有缓存实例,则创建一个新的 TTLCache 实例
region_cache = self._region_caches.setdefault(
region,
MemoryTTLCache(maxsize=maxsize, ttl=ttl) if self.cache_type == 'ttl'
else MemoryLRUCache(maxsize=maxsize)
)
region_cache[key] = value
return self._backend.set(key=key, value=value, ttl=ttl, region=region, **kwargs)
async def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool:
"""
@@ -551,10 +530,7 @@ class AsyncMemoryBackend(AsyncCacheBackend):
:param region: 缓存的区
:return: 存在返回 True否则返回 False
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return False
return key in region_cache
return self._backend.exists(key=key, region=region)
async def get(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> Any:
"""
@@ -564,10 +540,7 @@ class AsyncMemoryBackend(AsyncCacheBackend):
:param region: 缓存的区
:return: 返回缓存的值,如果缓存不存在返回 None
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return None
return region_cache.get(key)
return self._backend.get(key=key, region=region)
async def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION):
"""
@@ -576,11 +549,7 @@ class AsyncMemoryBackend(AsyncCacheBackend):
:param key: 缓存的键
:param region: 缓存的区
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return
with lock:
del region_cache[key]
return self._backend.delete(key=key, region=region)
async def clear(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> None:
"""
@@ -588,19 +557,7 @@ class AsyncMemoryBackend(AsyncCacheBackend):
:param region: 缓存的区为None时清空所有区缓存
"""
if region:
# 清理指定缓存区
region_cache = self.__get_region_cache(region)
if region_cache:
with lock:
region_cache.clear()
logger.debug(f"Cleared cache for region: {region}")
else:
# 清除所有区域的缓存
for region_cache in self._region_caches.values():
with lock:
region_cache.clear()
logger.info("All cache cleared")
return self._backend.clear(region=region)
async def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> AsyncGenerator[Tuple[str, Any], None]:
"""
@@ -609,14 +566,7 @@ class AsyncMemoryBackend(AsyncCacheBackend):
:param region: 缓存的区
:return: 返回一个字典,包含所有缓存键值对
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return
# 使用锁保护迭代过程,避免在迭代时缓存被修改
with lock:
# 创建快照避免并发修改问题
items_snapshot = list(region_cache.items())
for item in items_snapshot:
for item in self._backend.items(region):
yield item
async def close(self) -> None:
@@ -1115,15 +1065,16 @@ def AsyncCache(cache_type: Literal['ttl', 'lru'] = 'ttl',
def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Optional[int] = None,
skip_none: Optional[bool] = True, skip_empty: Optional[bool] = False):
skip_none: Optional[bool] = True, skip_empty: Optional[bool] = False, shared_key: Optional[str] = None):
"""
自定义缓存装饰器,支持为每个 key 动态传递 maxsize 和 ttl
:param region: 缓存
:param maxsize: 缓存的最大条目数
:param region: 缓存区域的标识符,默认根据模块名、函数名等自动生成标识
:param maxsize: 缓存区内的最大条目数
:param ttl: 缓存的存活时间,单位秒,未传入则为永久缓存,单位秒
:param skip_none: 跳过 None 缓存,默认为 True
:param skip_empty: 跳过空值缓存(如 None, [], {}, "", set()),默认为 False
:param shared_key: 同步/异步函数共享缓存的键,默认使用函数名(异步函数名会标准化为同步格式,如移除 `async_` 前缀)
:return: 装饰器函数
"""
@@ -1173,6 +1124,17 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt
return False
return True
def __standardize_func_name() -> str:
"""
将异步函数名标准化为同步函数的命名,以生成统一的缓存键
"""
# XXX 假设异步函数名与同步版本仅差`async_`前缀或`_async`后缀当前MP代码大多符合否则需通过`shared_key`参数显式指定
return (
func.__name__.removeprefix("async_").removesuffix("_async")
if is_async
else func.__name__
)
def __get_cache_key(args, kwargs) -> str:
"""
根据函数和参数生成缓存键
@@ -1194,13 +1156,22 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt
bound.arguments[param] for param in signature.parameters if param in bound.arguments
]
# 使用有序参数生成缓存键
return f"{func.__name__}_{hashkey(*keys)}"
# 获取缓存区
cache_region = region if region is not None else f"{func.__module__}.{func.__name__}"
return f"{func_name}_{hashkey(*keys)}"
# 被装饰函数的上层名称(如类名或外层函数名)
enclosing_name = (
func.__qualname__[:last_dot]
if (last_dot := func.__qualname__.rfind(".")) != -1
else ""
)
# 检查是否为异步函数
is_async = inspect.iscoroutinefunction(func)
# 生成标准化后的函数名称,用于同步/异步函数共享缓存
func_name = shared_key if shared_key else __standardize_func_name()
# 获取缓存区
cache_region = (
region if region is not None else f"{func.__module__}:{enclosing_name}:{func_name}"
)
if is_async:
# 异步函数使用异步缓存后端

View File

@@ -465,7 +465,7 @@ class MediaInfo:
for seainfo in info.get('seasons'):
# 季
season = seainfo.get("season_number")
if not season:
if season is None:
continue
# 集
episode_count = seainfo.get("episode_count")
@@ -545,9 +545,9 @@ class MediaInfo:
# 识别标题中的季
meta = MetaInfo(info.get("title"))
# 季
if not self.season:
if self.season is None:
self.season = meta.begin_season
if self.season:
if self.season is not None:
self.type = MediaType.TV
elif not self.type:
self.type = MediaType.MOVIE
@@ -607,13 +607,13 @@ class MediaInfo:
# 剧集
if self.type == MediaType.TV and not self.seasons:
meta = MetaInfo(info.get("title"))
season = meta.begin_season or 1
season = meta.begin_season if meta.begin_season is not None else 1
episodes_count = info.get("episodes_count")
if episodes_count:
self.seasons[season] = list(range(1, episodes_count + 1))
# 季年份
if self.type == MediaType.TV and not self.season_years:
season = self.season or 1
season = self.season if self.season is not None else 1
self.season_years = {
season: self.year
}
@@ -667,7 +667,7 @@ class MediaInfo:
# 识别标题中的季
meta = MetaInfo(self.title)
# 季
if not self.season:
if self.season is None:
self.season = meta.begin_season
# 评分
if not self.vote_average:
@@ -703,7 +703,7 @@ class MediaInfo:
# 剧集
if self.type == MediaType.TV and not self.seasons:
meta = MetaInfo(self.title)
season = meta.begin_season or 1
season = meta.begin_season if meta.begin_season is not None else 1
episodes_count = info.get("total_episodes")
if episodes_count:
self.seasons[season] = list(range(1, episodes_count + 1))

View File

@@ -49,7 +49,7 @@ class MediaServerOper(DbOper):
if not item:
return None
if kwargs.get("season"):
if kwargs.get("season") is not None:
# 判断季是否存在
if not item.seasoninfo:
return None
@@ -75,7 +75,7 @@ class MediaServerOper(DbOper):
if not item:
return None
if kwargs.get("season"):
if kwargs.get("season") is not None:
# 判断季是否存在
if not item.seasoninfo:
return None

View File

@@ -104,14 +104,14 @@ class DownloadHistory(Base):
# TMDBID + 类型
if tmdbid and mtype:
# 电视剧某季某集
if season and episode:
if season is not None and episode:
return db.query(DownloadHistory).filter(DownloadHistory.tmdbid == tmdbid,
DownloadHistory.type == mtype,
DownloadHistory.seasons == season,
DownloadHistory.episodes == episode).order_by(
DownloadHistory.id.desc()).all()
# 电视剧某季
elif season:
elif season is not None:
return db.query(DownloadHistory).filter(DownloadHistory.tmdbid == tmdbid,
DownloadHistory.type == mtype,
DownloadHistory.seasons == season).order_by(
@@ -124,14 +124,14 @@ class DownloadHistory(Base):
# 标题 + 年份
elif title and year:
# 电视剧某季某集
if season and episode:
if season is not None and episode:
return db.query(DownloadHistory).filter(DownloadHistory.title == title,
DownloadHistory.year == year,
DownloadHistory.seasons == season,
DownloadHistory.episodes == episode).order_by(
DownloadHistory.id.desc()).all()
# 电视剧某季
elif season:
elif season is not None:
return db.query(DownloadHistory).filter(DownloadHistory.title == title,
DownloadHistory.year == year,
DownloadHistory.seasons == season).order_by(

View File

@@ -93,7 +93,7 @@ class Subscribe(Base):
def exists(cls, db: Session, tmdbid: Optional[int] = None, doubanid: Optional[str] = None,
season: Optional[int] = None):
if tmdbid:
if season:
if season is not None:
return db.query(cls).filter(cls.tmdbid == tmdbid,
cls.season == season).first()
return db.query(cls).filter(cls.tmdbid == tmdbid).first()
@@ -106,7 +106,7 @@ class Subscribe(Base):
async def async_exists(cls, db: AsyncSession, tmdbid: Optional[int] = None, doubanid: Optional[str] = None,
season: Optional[int] = None):
if tmdbid:
if season:
if season is not None:
result = await db.execute(
select(cls).filter(cls.tmdbid == tmdbid, cls.season == season)
)
@@ -148,7 +148,7 @@ class Subscribe(Base):
@classmethod
@db_query
def get_by_title(cls, db: Session, title: str, season: Optional[int] = None):
if season:
if season is not None:
return db.query(cls).filter(cls.name == title,
cls.season == season).first()
return db.query(cls).filter(cls.name == title).first()
@@ -156,7 +156,7 @@ class Subscribe(Base):
@classmethod
@async_db_query
async def async_get_by_title(cls, db: AsyncSession, title: str, season: Optional[int] = None):
if season:
if season is not None:
result = await db.execute(
select(cls).filter(cls.name == title, cls.season == season)
)
@@ -169,7 +169,7 @@ class Subscribe(Base):
@classmethod
@db_query
def get_by_tmdbid(cls, db: Session, tmdbid: int, season: Optional[int] = None):
if season:
if season is not None:
return db.query(cls).filter(cls.tmdbid == tmdbid,
cls.season == season).all()
else:
@@ -178,7 +178,7 @@ class Subscribe(Base):
@classmethod
@async_db_query
async def async_get_by_tmdbid(cls, db: AsyncSession, tmdbid: int, season: Optional[int] = None):
if season:
if season is not None:
result = await db.execute(
select(cls).filter(cls.tmdbid == tmdbid, cls.season == season)
)

View File

@@ -99,7 +99,7 @@ class SubscribeHistory(Base):
def exists(cls, db: Session, tmdbid: Optional[int] = None, doubanid: Optional[str] = None,
season: Optional[int] = None):
if tmdbid:
if season:
if season is not None:
return db.query(cls).filter(cls.tmdbid == tmdbid,
cls.season == season).first()
return db.query(cls).filter(cls.tmdbid == tmdbid).first()
@@ -112,7 +112,7 @@ class SubscribeHistory(Base):
async def async_exists(cls, db: AsyncSession, tmdbid: Optional[int] = None, doubanid: Optional[str] = None,
season: Optional[int] = None):
if tmdbid:
if season:
if season is not None:
result = await db.execute(
select(cls).filter(cls.tmdbid == tmdbid, cls.season == season)
)

View File

@@ -266,14 +266,14 @@ class TransferHistory(Base):
# TMDBID + 类型
if tmdbid and mtype:
# 电视剧某季某集
if season and episode:
if season is not None and episode:
return db.query(cls).filter(cls.tmdbid == tmdbid,
cls.type == mtype,
cls.seasons == season,
cls.episodes == episode,
cls.dest == dest).all()
# 电视剧某季
elif season:
elif season is not None:
return db.query(cls).filter(cls.tmdbid == tmdbid,
cls.type == mtype,
cls.seasons == season).all()
@@ -290,14 +290,14 @@ class TransferHistory(Base):
# 标题 + 年份
elif title and year:
# 电视剧某季某集
if season and episode:
if season is not None and episode:
return db.query(cls).filter(cls.title == title,
cls.year == year,
cls.seasons == season,
cls.episodes == episode,
cls.dest == dest).all()
# 电视剧某季
elif season:
elif season is not None:
return db.query(cls).filter(cls.title == title,
cls.year == year,
cls.seasons == season).all()
@@ -312,7 +312,7 @@ class TransferHistory(Base):
return db.query(cls).filter(cls.title == title,
cls.year == year).all()
# 类型 + 转移路径emby webhook season无tmdbid场景
elif mtype and season and dest:
elif mtype and season is not None and dest:
# 电视剧某季
return db.query(cls).filter(cls.type == mtype,
cls.seasons == season,

View File

@@ -92,7 +92,7 @@ class SubscribeOper(DbOper):
判断是否存在
"""
if tmdbid:
if season:
if season is not None:
return True if Subscribe.exists(self._db, tmdbid=tmdbid, season=season) else False
else:
return True if Subscribe.exists(self._db, tmdbid=tmdbid) else False
@@ -195,7 +195,7 @@ class SubscribeOper(DbOper):
判断是否存在订阅历史
"""
if tmdbid:
if season:
if season is not None:
return True if SubscribeHistory.exists(self._db, tmdbid=tmdbid, season=season) else False
else:
return True if SubscribeHistory.exists(self._db, tmdbid=tmdbid) else False

View File

@@ -125,7 +125,7 @@ class TransferHistoryOper(DbOper):
"""
新增转移成功历史记录
"""
self.add_force(
return self.add_force(
src=fileitem.path,
src_storage=fileitem.storage,
src_fileitem=fileitem.model_dump(),

View File

@@ -19,41 +19,42 @@ class CookieHelper:
"username": [
'//input[@name="username"]',
'//input[@id="form_item_username"]',
'//input[@id="username"]'
'//input[@id="username"]',
],
"password": [
'//input[@name="password"]',
'//input[@id="form_item_password"]',
'//input[@id="password"]',
'//input[@type="password"]'
'//input[@type="password"]',
],
"captcha": [
'//input[@name="imagestring"]',
'//input[@name="captcha"]',
'//input[@id="form_item_captcha"]',
'//input[@placeholder="驗證碼"]'
'//input[@placeholder="驗證碼"]',
],
"captcha_img": [
'//img[@alt="captcha"]/@src',
'//img[@alt="CAPTCHA"]/@src',
'//img[@alt="SECURITY CODE"]/@src',
'//img[@id="LAY-user-get-vercode"]/@src',
'//img[contains(@src,"/api/getCaptcha")]/@src'
'//img[contains(@src,"/api/getCaptcha")]/@src',
],
"submit": [
'//input[@type="submit"]',
'//button[@type="submit"]',
'//button[@lay-filter="login"]',
'//button[@lay-filter="formLogin"]',
'//input[@type="button"][@value="登录"]'
'//input[@type="button"][@value="登录"]',
'//input[@id="submit-btn"]',
],
"error": [
"//table[@class='main']//td[@class='text']/text()"
"//table[@class='main']//td[@class='text']/text()",
],
"twostep": [
'//input[@name="two_step_code"]',
'//input[@name="2fa_secret"]',
'//input[@name="otp"]'
'//input[@name="otp"]',
]
}

View File

@@ -382,7 +382,10 @@ class RssHelper:
size = int(size_attr)
# 发布日期
pubdate_nodes = item.xpath('.//pubDate | .//published | .//updated')
pubdate_nodes = item.xpath('./pubDate | ./published | ./updated')
if not pubdate_nodes:
pubdate_nodes = item.xpath('.//*[local-name()="pubDate"] | .//*[local-name()="published"] | .//*[local-name()="updated"]')
pubdate = ""
if pubdate_nodes and pubdate_nodes[0].text:
pubdate = StringUtils.get_time(pubdate_nodes[0].text)

View File

@@ -290,3 +290,11 @@ class BangumiModule(_ModuleBase):
if infos:
return [MediaInfo(bangumi_info=info) for info in infos]
return []
def clear_cache(self):
"""
清除缓存
"""
logger.info(f"开始清除{self.get_name()}缓存 ...")
self.bangumiapi.clear_cache()
logger.info(f"{self.get_name()}缓存清除完成")

View File

@@ -31,7 +31,7 @@ class BangumiApi(object):
self._req = RequestUtils(ua=settings.NORMAL_USER_AGENT, session=self._session)
self._async_req = AsyncRequestUtils(ua=settings.NORMAL_USER_AGENT)
@cached(maxsize=settings.CONF.bangumi, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.bangumi, ttl=settings.CONF.meta, shared_key="get")
def __invoke(self, url, key: Optional[str] = None, **kwargs):
req_url = self._base_url + url
params = {}
@@ -47,7 +47,7 @@ class BangumiApi(object):
print(e)
return None
@cached(maxsize=settings.CONF.bangumi, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.bangumi, ttl=settings.CONF.meta, shared_key="get")
async def __async_invoke(self, url, key: Optional[str] = None, **kwargs):
req_url = self._base_url + url
params = {}
@@ -300,6 +300,12 @@ class BangumiApi(object):
key="data",
_ts=datetime.strftime(datetime.now(), '%Y%m%d'), **kwargs)
def clear_cache(self):
"""
清除缓存
"""
self.__invoke.cache_clear()
def close(self):
if self._session:
self._session.close()

View File

@@ -139,9 +139,23 @@ class DiscordModule(_ModuleBase, _MessageBase[Discord]):
发送通知消息
:param message: 消息通知对象
"""
for conf in self.get_configs().values():
# DEBUG: Log entry and configs
configs = self.get_configs()
logger.debug(f"[Discord] post_message 被调用message.source={message.source}, "
f"message.userid={message.userid}, message.channel={message.channel}")
logger.debug(f"[Discord] 当前配置数量: {len(configs)}, 配置名称: {list(configs.keys())}")
logger.debug(f"[Discord] 当前实例数量: {len(self.get_instances())}, 实例名称: {list(self.get_instances().keys())}")
if not configs:
logger.warning("[Discord] get_configs() 返回空,没有可用的 Discord 配置")
return
for conf in configs.values():
logger.debug(f"[Discord] 检查配置: name={conf.name}, type={conf.type}, enabled={conf.enabled}")
if not self.check_message(message, conf.name):
logger.debug(f"[Discord] check_message 返回 False跳过配置: {conf.name}")
continue
logger.debug(f"[Discord] check_message 通过,准备发送到: {conf.name}")
targets = message.targets
userid = message.userid
if not userid and targets is not None:
@@ -150,13 +164,18 @@ class DiscordModule(_ModuleBase, _MessageBase[Discord]):
logger.warn("用户没有指定 Discord 用户ID消息无法发送")
return
client: Discord = self.get_instance(conf.name)
logger.debug(f"[Discord] get_instance('{conf.name}') 返回: {client is not None}")
if client:
client.send_msg(title=message.title, text=message.text,
logger.debug(f"[Discord] 调用 client.send_msg, userid={userid}, title={message.title[:50] if message.title else None}...")
result = client.send_msg(title=message.title, text=message.text,
image=message.image, userid=userid, link=message.link,
buttons=message.buttons,
original_message_id=message.original_message_id,
original_chat_id=message.original_chat_id,
mtype=message.mtype)
logger.debug(f"[Discord] send_msg 返回结果: {result}")
else:
logger.warning(f"[Discord] 未找到配置 '{conf.name}' 对应的 Discord 客户端实例")
def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None:
"""

View File

@@ -2,6 +2,7 @@ import asyncio
import re
import threading
from typing import Optional, List, Dict, Any, Tuple, Union
from urllib.parse import quote
import discord
from discord import app_commands
@@ -33,6 +34,9 @@ class Discord:
DISCORD_GUILD_ID: Optional[Union[str, int]] = None,
DISCORD_CHANNEL_ID: Optional[Union[str, int]] = None,
**kwargs):
logger.debug(f"[Discord] 初始化 Discord 实例: name={kwargs.get('name')}, "
f"GUILD_ID={DISCORD_GUILD_ID}, CHANNEL_ID={DISCORD_CHANNEL_ID}, "
f"TOKEN={'已配置' if DISCORD_BOT_TOKEN else '未配置'}")
if not DISCORD_BOT_TOKEN:
logger.error("Discord Bot Token 未配置!")
return
@@ -40,10 +44,14 @@ class Discord:
self._token = DISCORD_BOT_TOKEN
self._guild_id = self._to_int(DISCORD_GUILD_ID)
self._channel_id = self._to_int(DISCORD_CHANNEL_ID)
logger.debug(f"[Discord] 解析后的 ID: _guild_id={self._guild_id}, _channel_id={self._channel_id}")
base_ds_url = f"http://127.0.0.1:{settings.PORT}/api/v1/message/"
self._ds_url = f"{base_ds_url}?token={settings.API_TOKEN}"
if kwargs.get("name"):
self._ds_url = f"{self._ds_url}&source={kwargs.get('name')}"
# URL encode the source name to handle special characters in config names
encoded_name = quote(kwargs.get('name'), safe='')
self._ds_url = f"{self._ds_url}&source={encoded_name}"
logger.debug(f"[Discord] 消息回调 URL: {self._ds_url}")
intents = discord.Intents.default()
intents.message_content = True
@@ -59,6 +67,7 @@ class Discord:
self._thread: Optional[threading.Thread] = None
self._ready_event = threading.Event()
self._user_dm_cache: Dict[str, discord.DMChannel] = {}
self._user_chat_mapping: Dict[str, str] = {} # userid -> chat_id mapping for reply targeting
self._broadcast_channel = None
self._bot_user_id: Optional[int] = None
@@ -86,6 +95,9 @@ class Discord:
if not self._should_process_message(message):
return
# Update user-chat mapping for reply targeting
self._update_user_chat_mapping(str(message.author.id), str(message.channel.id))
cleaned_text = self._clean_bot_mention(message.content or "")
username = message.author.display_name or message.author.global_name or message.author.name
payload = {
@@ -112,6 +124,10 @@ class Discord:
except Exception as e:
logger.error(f"处理 Discord 交互响应失败:{e}")
# Update user-chat mapping for reply targeting
if interaction.user and interaction.channel:
self._update_user_chat_mapping(str(interaction.user.id), str(interaction.channel.id))
username = (interaction.user.display_name or interaction.user.global_name or interaction.user.name) \
if interaction.user else None
payload = {
@@ -168,13 +184,19 @@ class Discord:
original_message_id: Optional[Union[int, str]] = None,
original_chat_id: Optional[str] = None,
mtype: Optional['NotificationType'] = None) -> Optional[bool]:
logger.debug(f"[Discord] send_msg 被调用: userid={userid}, title={title[:50] if title else None}...")
logger.debug(f"[Discord] get_state() = {self.get_state()}, "
f"_ready_event.is_set() = {self._ready_event.is_set()}, "
f"_client = {self._client is not None}")
if not self.get_state():
logger.warning("[Discord] get_state() 返回 FalseBot 未就绪,无法发送消息")
return False
if not title and not text:
logger.warn("标题和内容不能同时为空")
return False
try:
logger.debug(f"[Discord] 准备异步发送消息...")
future = asyncio.run_coroutine_threadsafe(
self._send_message(title=title, text=text, image=image, userid=userid,
link=link, buttons=buttons,
@@ -182,7 +204,9 @@ class Discord:
original_chat_id=original_chat_id,
mtype=mtype),
self._loop)
return future.result(timeout=30)
result = future.result(timeout=30)
logger.debug(f"[Discord] 异步发送完成,结果: {result}")
return result
except Exception as err:
logger.error(f"发送 Discord 消息失败:{err}")
return False
@@ -254,7 +278,9 @@ class Discord:
original_message_id: Optional[Union[int, str]],
original_chat_id: Optional[str],
mtype: Optional['NotificationType'] = None) -> bool:
logger.debug(f"[Discord] _send_message: userid={userid}, original_chat_id={original_chat_id}")
channel = await self._resolve_channel(userid=userid, chat_id=original_chat_id)
logger.debug(f"[Discord] _resolve_channel 返回: {channel}, type={type(channel)}")
if not channel:
logger.error("未找到可用的 Discord 频道或私聊")
return False
@@ -264,11 +290,18 @@ class Discord:
content = None
if original_message_id and original_chat_id:
logger.debug(f"[Discord] 编辑现有消息: message_id={original_message_id}")
return await self._edit_message(chat_id=original_chat_id, message_id=original_message_id,
content=content, embed=embed, view=view)
await channel.send(content=content, embed=embed, view=view)
return True
logger.debug(f"[Discord] 发送新消息到频道: {channel}")
try:
await channel.send(content=content, embed=embed, view=view)
logger.debug("[Discord] 消息发送成功")
return True
except Exception as e:
logger.error(f"[Discord] 发送消息到频道失败: {e}")
return False
async def _send_list_message(self, embeds: List[discord.Embed],
userid: Optional[str],
@@ -515,26 +548,54 @@ class Discord:
return view
async def _resolve_channel(self, userid: Optional[str] = None, chat_id: Optional[str] = None):
# 优先使用明确的聊天 ID
"""
Resolve the channel to send messages to.
Priority order:
1. `chat_id` (original channel where user sent the message) - for contextual replies
2. `userid` mapping (channel where user last sent a message) - for contextual replies
3. Configured `_channel_id` (broadcast channel) - for system notifications
4. Any available text channel in configured guild - fallback
5. `userid` (DM) - for private conversations as a final fallback
"""
logger.debug(f"[Discord] _resolve_channel: userid={userid}, chat_id={chat_id}, "
f"_channel_id={self._channel_id}, _guild_id={self._guild_id}")
# Priority 1: Use explicit chat_id (reply to the same channel where user sent message)
if chat_id:
logger.debug(f"[Discord] 尝试通过 chat_id={chat_id} 获取原始频道")
channel = self._client.get_channel(int(chat_id))
if channel:
logger.debug(f"[Discord] 通过 get_channel 找到频道: {channel}")
return channel
try:
return await self._client.fetch_channel(int(chat_id))
channel = await self._client.fetch_channel(int(chat_id))
logger.debug(f"[Discord] 通过 fetch_channel 找到频道: {channel}")
return channel
except Exception as err:
logger.warn(f"通过 chat_id 获取 Discord 频道失败:{err}")
# 私聊
# Priority 2: Use user-chat mapping (reply to where the user last sent a message)
if userid:
dm = await self._get_dm_channel(str(userid))
if dm:
return dm
mapped_chat_id = self._get_user_chat_id(str(userid))
if mapped_chat_id:
logger.debug(f"[Discord] 从用户映射获取 chat_id={mapped_chat_id}")
channel = self._client.get_channel(int(mapped_chat_id))
if channel:
logger.debug(f"[Discord] 通过映射找到频道: {channel}")
return channel
try:
channel = await self._client.fetch_channel(int(mapped_chat_id))
logger.debug(f"[Discord] 通过 fetch_channel 找到映射频道: {channel}")
return channel
except Exception as err:
logger.warn(f"通过映射的 chat_id 获取 Discord 频道失败:{err}")
# 配置的广播频道
# Priority 3: Use configured broadcast channel (for system notifications)
if self._broadcast_channel:
logger.debug(f"[Discord] 使用缓存的广播频道: {self._broadcast_channel}")
return self._broadcast_channel
if self._channel_id:
logger.debug(f"[Discord] 尝试通过配置的 _channel_id={self._channel_id} 获取频道")
channel = self._client.get_channel(self._channel_id)
if not channel:
try:
@@ -544,9 +605,11 @@ class Discord:
channel = None
self._broadcast_channel = channel
if channel:
logger.debug(f"[Discord] 通过配置的频道ID找到频道: {channel}")
return channel
# 按 Guild 寻找一个可用文本频道
# Priority 4: Find any available text channel in guild (fallback)
logger.debug(f"[Discord] 尝试在 Guild 中寻找可用频道")
target_guilds = []
if self._guild_id:
guild = self._client.get_guild(self._guild_id)
@@ -554,22 +617,47 @@ class Discord:
target_guilds.append(guild)
else:
target_guilds = list(self._client.guilds)
logger.debug(f"[Discord] 目标 Guilds 数量: {len(target_guilds)}")
for guild in target_guilds:
for channel in guild.text_channels:
if guild.me and channel.permissions_for(guild.me).send_messages:
logger.debug(f"[Discord] 在 Guild 中找到可用频道: {channel}")
self._broadcast_channel = channel
return channel
# Priority 5: Fallback to DM (only if no channel available)
if userid:
logger.debug(f"[Discord] 回退到私聊: userid={userid}")
dm = await self._get_dm_channel(str(userid))
if dm:
logger.debug(f"[Discord] 获取到私聊频道: {dm}")
return dm
else:
logger.debug(f"[Discord] 无法获取用户 {userid} 的私聊频道")
return None
async def _get_dm_channel(self, userid: str) -> Optional[discord.DMChannel]:
logger.debug(f"[Discord] _get_dm_channel: userid={userid}")
if userid in self._user_dm_cache:
logger.debug(f"[Discord] 从缓存获取私聊频道: {self._user_dm_cache.get(userid)}")
return self._user_dm_cache.get(userid)
try:
user_obj = self._client.get_user(int(userid)) or await self._client.fetch_user(int(userid))
logger.debug(f"[Discord] 尝试获取/创建用户 {userid} 的私聊频道")
user_obj = self._client.get_user(int(userid))
logger.debug(f"[Discord] get_user 结果: {user_obj}")
if not user_obj:
user_obj = await self._client.fetch_user(int(userid))
logger.debug(f"[Discord] fetch_user 结果: {user_obj}")
if not user_obj:
logger.debug(f"[Discord] 无法找到用户 {userid}")
return None
dm = user_obj.dm_channel or await user_obj.create_dm()
dm = user_obj.dm_channel
logger.debug(f"[Discord] 用户现有 dm_channel: {dm}")
if not dm:
dm = await user_obj.create_dm()
logger.debug(f"[Discord] 创建新的 dm_channel: {dm}")
if dm:
self._user_dm_cache[userid] = dm
return dm
@@ -577,6 +665,25 @@ class Discord:
logger.error(f"获取 Discord 私聊失败:{err}")
return None
def _update_user_chat_mapping(self, userid: str, chat_id: str) -> None:
"""
Update user-chat mapping for reply targeting.
This ensures replies go to the same channel where the user sent the message.
:param userid: User ID
:param chat_id: Channel/Chat ID where the user sent the message
"""
if userid and chat_id:
self._user_chat_mapping[userid] = chat_id
logger.debug(f"[Discord] 更新用户频道映射: userid={userid} -> chat_id={chat_id}")
def _get_user_chat_id(self, userid: str) -> Optional[str]:
"""
Get the chat ID where the user last sent a message.
:param userid: User ID
:return: Chat ID or None if not found
"""
return self._user_chat_mapping.get(userid)
def _should_process_message(self, message: discord.Message) -> bool:
if isinstance(message.channel, discord.DMChannel):
return True

View File

@@ -154,7 +154,6 @@ class DoubanApi(metaclass=WeakSingleton):
_api_url = "https://api.douban.com/v2"
def __init__(self):
self.__clear_async_cache__ = False
self._session = requests.Session()
@classmethod
@@ -225,7 +224,7 @@ class DoubanApi(metaclass=WeakSingleton):
"""
return resp.json() if resp is not None else None
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True, shared_key="get")
def __invoke(self, url: str, **kwargs) -> dict:
"""
GET请求
@@ -237,14 +236,11 @@ class DoubanApi(metaclass=WeakSingleton):
).get_res(url=req_url, params=params)
return self._handle_response(resp)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True, shared_key="get")
async def __async_invoke(self, url: str, **kwargs) -> dict:
"""
GET请求异步版本
"""
if self.__clear_async_cache__:
self.__clear_async_cache__ = False
await self.__async_invoke.cache_clear()
req_url, params = self._prepare_get_request(url, **kwargs)
resp = await AsyncRequestUtils(
ua=choice(self._user_agents)
@@ -263,7 +259,7 @@ class DoubanApi(metaclass=WeakSingleton):
params.pop('_ts')
return req_url, params
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True, shared_key="post")
def __post(self, url: str, **kwargs) -> dict:
"""
POST请求
@@ -285,7 +281,7 @@ class DoubanApi(metaclass=WeakSingleton):
).post_res(url=req_url, data=params)
return self._handle_response(resp)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True, shared_key="post")
async def __async_post(self, url: str, **kwargs) -> dict:
"""
POST请求异步版本
@@ -865,7 +861,7 @@ class DoubanApi(metaclass=WeakSingleton):
清空LRU缓存
"""
self.__invoke.cache_clear()
self.__clear_async_cache__ = True
self.__post.cache_clear()
def close(self):
if self._session:

View File

@@ -21,7 +21,7 @@ class DoubanScraper:
# 电影元数据文件
doc = self.__gen_movie_nfo_file(mediainfo=mediainfo)
else:
if season:
if season is not None:
# 季元数据文件
doc = self.__gen_tv_season_nfo_file(mediainfo=mediainfo, season=season)
else:
@@ -41,7 +41,7 @@ class DoubanScraper:
:param episode: 集号
"""
ret_dict = {}
if season:
if season is not None:
# 豆瓣无季图片
return {}
if episode:

View File

@@ -421,7 +421,7 @@ class Emby:
if str(tmdb_id) != str(item_info.tmdbid):
return None, {}
# 查集的信息
if not season:
if season is None:
season = None
try:
url = f"{self._host}emby/Shows/{item_id}/Episodes"
@@ -437,12 +437,12 @@ class Emby:
season_episodes = {}
for res_item in res_items:
season_index = res_item.get("ParentIndexNumber")
if not season_index:
if season_index is None:
continue
if season and season != season_index:
if season is not None and season != season_index:
continue
episode_index = res_item.get("IndexNumber")
if not episode_index:
if episode_index is None:
continue
if season_index not in season_episodes:
season_episodes[season_index] = []

View File

@@ -440,7 +440,7 @@ class FanartModule(_ModuleBase):
return result
@classmethod
@cached(maxsize=settings.CONF.fanart, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.fanart, ttl=settings.CONF.meta, shared_key="get")
def __request_fanart(cls, media_type: MediaType, queryid: Union[str, int]) -> Optional[dict]:
if media_type == MediaType.MOVIE:
image_url = cls._movie_url % queryid
@@ -456,3 +456,11 @@ class FanartModule(_ModuleBase):
except Exception as err:
logger.error(f"获取{queryid}的Fanart图片失败{str(err)}")
return None
def clear_cache(self):
    """
    Clear the cached Fanart API responses.

    Empties the cache backing __request_fanart (decorated with @cached)
    so the next image lookup hits the Fanart API again; start and finish
    are written to the log.
    """
    logger.info(f"开始清除{self.get_name()}缓存 ...")
    # Drop every cached __request_fanart result.
    self.__request_fanart.cache_clear()
    logger.info(f"{self.get_name()}缓存清除完成")

View File

@@ -95,12 +95,11 @@ class FileManagerModule(_ModuleBase):
return False, f"{d.name} 的下载目录 {download_path} 与媒体库目录 {library_path} 不在同一磁盘,无法硬链接"
# 存储
storage_oper = self.__get_storage_oper(d.storage)
if not storage_oper:
return False, f"{d.name} 的存储类型 {d.storage} 不支持"
if not storage_oper.check():
return False, f"{d.name} 的存储测试不通过"
if d.transfer_type and d.transfer_type not in storage_oper.support_transtype():
return False, f"{d.name} 的存储不支持 {d.transfer_type} 整理方式"
if storage_oper:
if not storage_oper.check():
return False, f"{d.name} 的存储测试不通过"
if d.transfer_type and d.transfer_type not in storage_oper.support_transtype():
return False, f"{d.name} 的存储不支持 {d.transfer_type} 整理方式"
return True, ""

View File

@@ -3,7 +3,7 @@ import secrets
import time
from pathlib import Path
from threading import Lock
from typing import List, Optional, Tuple, Union, Dict
from typing import List, Optional, Tuple, Union
from hashlib import sha256
import oss2
@@ -20,7 +20,7 @@ from app.modules.filemanager.storages import transfer_process
from app.schemas.types import StorageSchema
from app.utils.singleton import WeakSingleton
from app.utils.string import StringUtils
from app.utils.limit import QpsRateLimiter
from app.utils.limit import QpsRateLimiter, RateStats
lock = Lock()
@@ -46,22 +46,23 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 文件块大小默认10MB
chunk_size = 10 * 1024 * 1024
# 流控重试间隔时间
retry_delay = 70
# 下载接口单独限流
download_endpoint = "/open/ufile/downurl"
# 风控触发后休眠时间(秒)
limit_sleep_seconds = 3600
def __init__(self):
super().__init__()
self._auth_state = {}
self.session = httpx.Client(follow_redirects=True, timeout=20.0)
self._init_session()
self.qps_limiter: Dict[str, QpsRateLimiter] = {
"/open/ufile/files": QpsRateLimiter(4),
"/open/folder/get_info": QpsRateLimiter(3),
"/open/ufile/move": QpsRateLimiter(2),
"/open/ufile/copy": QpsRateLimiter(2),
"/open/ufile/update": QpsRateLimiter(2),
"/open/ufile/delete": QpsRateLimiter(2),
}
# 接口限流
self._download_limiter = QpsRateLimiter(1)
self._api_limiter = QpsRateLimiter(3)
self._limit_until = 0.0
self._limit_lock = Lock()
# 总体 QPS/QPM/QPH 统计
self._rate_stats = RateStats(source="115")
def _init_session(self):
"""
@@ -209,8 +210,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
try:
resp = self.session.get(
f"{settings.U115_AUTH_SERVER}/u115/token",
params={"state": state}
f"{settings.U115_AUTH_SERVER}/u115/token", params={"state": state}
)
if resp is None:
return {}, "无法连接到授权服务器"
@@ -221,12 +221,14 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
if status == "completed":
data = result.get("data", {})
if data:
self.set_config({
"refresh_time": int(time.time()),
"access_token": data.get("access_token"),
"refresh_token": data.get("refresh_token"),
"expires_in": data.get("expires_in"),
})
self.set_config(
{
"refresh_time": int(time.time()),
"access_token": data.get("access_token"),
"refresh_token": data.get("refresh_token"),
"expires_in": data.get("expires_in"),
}
)
self._auth_state = {}
return {"status": 2, "tip": "授权成功"}, ""
return {}, "授权服务器返回数据不完整"
@@ -292,11 +294,24 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 错误日志标志
no_error_log = kwargs.pop("no_error_log", False)
# 重试次数
retry_times = kwargs.pop("retry_limit", 5)
retry_times = kwargs.pop("retry_limit", 3)
# qps 速率限制
if endpoint in self.qps_limiter:
self.qps_limiter[endpoint].acquire()
# 按接口类型限流
if endpoint == self.download_endpoint:
self._download_limiter.acquire()
else:
self._api_limiter.acquire()
self._rate_stats.record()
# 风控冷却期间阻止所有接口调用,统一等待
with self._limit_lock:
wait_until = self._limit_until
if wait_until > time.time():
wait_secs = wait_until - time.time()
logger.info(
f"【115】风控冷却中本请求等待 {wait_secs:.0f} 秒后再调用接口..."
)
time.sleep(wait_secs)
try:
resp = self.session.request(method, f"{self.base_url}{endpoint}", **kwargs)
@@ -310,13 +325,24 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
kwargs["retry_limit"] = retry_times
# 处理速率限制
if resp.status_code == 429:
reset_time = 5 + int(resp.headers.get("X-RateLimit-Reset", 60))
logger.debug(
f"【115】{method} 请求 {endpoint} 限流,等待{reset_time}秒后重试"
self._rate_stats.log_stats("warning")
if retry_times <= 0:
logger.error(
f"【115】{method} 请求 {endpoint} 触发限流(429),重试次数用尽!"
)
return None
with self._limit_lock:
self._limit_until = max(
self._limit_until,
time.time() + self.limit_sleep_seconds,
)
logger.warning(
f"【115】触发限流(429),全体接口进入风控冷却 {self.limit_sleep_seconds} 秒,随后重试..."
)
time.sleep(reset_time)
time.sleep(self.limit_sleep_seconds)
kwargs["retry_limit"] = retry_times - 1
kwargs["no_error_log"] = no_error_log
return self._request_api(method, endpoint, result_key, **kwargs)
# 处理请求错误
@@ -329,6 +355,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
)
return None
kwargs["retry_limit"] = retry_times - 1
kwargs["no_error_log"] = no_error_log
sleep_duration = 2 ** (5 - retry_times + 1)
logger.info(
f"【115】{method} 请求 {endpoint} 错误 {e},等待 {sleep_duration} 秒后重试..."
@@ -339,20 +366,27 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 返回数据
ret_data = resp.json()
if ret_data.get("code") not in (0, 20004):
error_msg = ret_data.get("message")
error_msg = ret_data.get("message", "")
if not no_error_log:
logger.warn(f"【115】{method} 请求 {endpoint} 出错:{error_msg}")
if "已达到当前访问上限" in error_msg:
self._rate_stats.log_stats("warning")
if retry_times <= 0:
logger.error(
f"【115】{method} 请求 {endpoint} 达到访问上限,重试次数用尽!"
f"【115】{method} 请求 {endpoint} 触发风控(访问上限),重试次数用尽!"
)
return None
kwargs["retry_limit"] = retry_times - 1
logger.info(
f"【115】{method} 请求 {endpoint} 达到访问上限,等待 {self.retry_delay} 秒后重试..."
with self._limit_lock:
self._limit_until = max(
self._limit_until,
time.time() + self.limit_sleep_seconds,
)
logger.warning(
f"【115】触发风控(访问上限),全体接口进入风控冷却 {self.limit_sleep_seconds} 秒,随后重试..."
)
time.sleep(self.retry_delay)
time.sleep(self.limit_sleep_seconds)
kwargs["retry_limit"] = retry_times - 1
kwargs["no_error_log"] = no_error_log
return self._request_api(method, endpoint, result_key, **kwargs)
return None
@@ -879,7 +913,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
"""
企业级复制实现(支持目录递归复制)
复制
"""
if fileitem.fileid is None:
fileitem = self.get_item(Path(fileitem.path))
@@ -912,7 +946,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
def move(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
"""
原子性移动操作实现
移动
"""
if fileitem.fileid is None:
fileitem = self.get_item(Path(fileitem.path))
@@ -950,7 +984,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
def usage(self) -> Optional[schemas.StorageUsage]:
"""
获取带有企业级配额信息的存储使用情况
存储使用情况
"""
try:
resp = self._request_api("GET", "/open/user/info", "data")

View File

@@ -295,6 +295,7 @@ class TransHandler:
elif overwrite_mode == 'never':
# 存在不覆盖
self.__update_result(result=result,
success=False,
message=f"媒体库存在同名文件,当前覆盖模式为不覆盖",
fileitem=fileitem,
target_item=target_item,
@@ -313,6 +314,9 @@ class TransHandler:
logger.info(
f"当前整理覆盖模式设置为 {overwrite_mode},仅保留最新版本,正在删除已有版本文件 ...")
self.__delete_version_files(target_oper, new_file)
else:
# 附加文件 总是需要覆盖
overflag = True
# 整理文件
new_item, err_msg = self.__transfer_file(fileitem=fileitem,
@@ -497,18 +501,23 @@ class TransHandler:
重命名字幕文件,补充附加信息
"""
# 字幕正则式
_zhcn_sub_re = r"([.\[(](((zh[-_])?(cn|ch[si]|sg|sc))|zho?" \
r"|chinese|(cn|ch[si]|sg|zho?|eng)[-_&]?(cn|ch[si]|sg|zho?|eng)" \
r"|简[体中]?)[.\])])" \
_zhcn_sub_re = r"([.\[(\s](((zh[-_])?(cn|ch[si]|sg|sc))|zho?" \
r"|chinese|(cn|ch[si]|sg|zho?)[-_&]?(cn|ch[si]|sg|zho?|eng|jap|ja|jpn)" \
r"|eng[-_&]?(cn|ch[si]|sg|zho?)|(jap|ja|jpn)[-_&]?(cn|ch[si]|sg|zho?)" \
r"|简[体中]?)[.\])\s])" \
r"|([\u4e00-\u9fa5]{0,3}[中双][\u4e00-\u9fa5]{0,2}[字文语][\u4e00-\u9fa5]{0,3})" \
r"|简体|简中|JPSC|sc_jp" \
r"|(?<![a-z0-9])gb(?![a-z0-9])"
_zhtw_sub_re = r"([.\[(](((zh[-_])?(hk|tw|cht|tc))" \
r"|(cht|eng)[-_&]?(cht|eng)" \
r"|繁[体中]?)[.\])])" \
_zhtw_sub_re = r"([.\[(\s](((zh[-_])?(hk|tw|cht|tc))" \
r"|cht[-_&]?(cht|eng|jap|ja|jpn)" \
r"|eng[-_&]?cht|(jap|ja|jpn)[-_&]?cht" \
r"|繁[体中]?)[.\])\s])" \
r"|繁体中[文字]|中[文字]繁体|繁体|JPTC|tc_jp" \
r"|(?<![a-z0-9])big5(?![a-z0-9])"
_eng_sub_re = r"[.\[(]eng[.\])]"
_ja_sub_re = r"([.\[(\s](ja-jp|jap|ja|jpn" \
r"|(jap|ja|jpn)[-_&]?eng|eng[-_&]?(jap|ja|jpn))[.\])\s])" \
r"|日本語|日語"
_eng_sub_re = r"[.\[(\s]eng[.\])\s]"
# 原文件后缀
file_ext = f".{sub_item.extension}"
@@ -520,12 +529,15 @@ class TransHandler:
new_file_type = ".chi.zh-cn"
elif re.search(_zhtw_sub_re, sub_item.name, re.I):
new_file_type = ".zh-tw"
elif re.search(_ja_sub_re, sub_item.name, re.I):
new_file_type = ".ja"
elif re.search(_eng_sub_re, sub_item.name, re.I):
new_file_type = ".eng"
# 添加默认字幕标识
if ((settings.DEFAULT_SUB == "zh-cn" and new_file_type == ".chi.zh-cn")
or (settings.DEFAULT_SUB == "zh-tw" and new_file_type == ".zh-tw")
or (settings.DEFAULT_SUB == "ja" and new_file_type == ".ja")
or (settings.DEFAULT_SUB == "eng" and new_file_type == ".eng")):
new_sub_tag = ".default" + new_file_type
else:
@@ -789,8 +801,8 @@ class TransHandler:
continue
if media_file.type != "file":
continue
media_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
if f".{media_file.extension.lower()}" not in media_exts:
# 当前只有视频文件需要保留最新版本,其余格式无需处理,以避免误删 (issue 5449)
if f".{media_file.extension.lower()}" not in settings.RMT_MEDIAEXT:
continue
# 识别文件中的季集信息
filemeta = MetaInfoPath(media_path)

View File

@@ -428,6 +428,12 @@ class SiteSpider:
if pubdate_str:
pubdate_str = pubdate_str.replace('\n', ' ').strip()
self.torrents_info['pubdate'] = self.__filter_text(pubdate_str, selector.get('filters'))
if self.torrents_info.get('pubdate'):
try:
if not isinstance(self.torrents_info['pubdate'], datetime.datetime):
datetime.datetime.strptime(str(self.torrents_info['pubdate']), '%Y-%m-%d %H:%M:%S')
except (ValueError, TypeError):
self.torrents_info['pubdate'] = StringUtils.unify_datetime_str(str(self.torrents_info['pubdate']))
def __get_date_elapsed(self, torrent: Any):
# torrent date elapsed text

View File

@@ -29,7 +29,7 @@ class TNodeSpider(metaclass=SingletonClass):
self._ua = indexer.get('ua')
self._timeout = indexer.get('timeout') or 15
@cached(region="indexer_spider", maxsize=1, ttl=60 * 60 * 24, skip_empty=True)
@cached(region="indexer_spider", maxsize=1, ttl=60 * 60 * 24, skip_empty=True, shared_key="get_token")
def __get_token(self) -> Optional[str]:
if not self._domain:
return
@@ -43,7 +43,7 @@ class TNodeSpider(metaclass=SingletonClass):
return csrf_token.group(1)
return None
@cached(region="indexer_spider", maxsize=1, ttl=60 * 60 * 24, skip_empty=True)
@cached(region="indexer_spider", maxsize=1, ttl=60 * 60 * 24, skip_empty=True, shared_key="get_token")
async def __async_get_token(self) -> Optional[str]:
if not self._domain:
return

View File

@@ -409,7 +409,7 @@ class Jellyfin:
if tmdb_id and item_info.tmdbid:
if str(tmdb_id) != str(item_info.tmdbid):
return None, {}
if not season:
if season is None:
season = None
url = f"{self._host}Shows/{item_id}/Episodes"
params = {
@@ -427,12 +427,12 @@ class Jellyfin:
season_episodes = {}
for res_item in res_items:
season_index = res_item.get("ParentIndexNumber")
if not season_index:
if season_index is None:
continue
if season and season != season_index:
if season is not None and season != season_index:
continue
episode_index = res_item.get("IndexNumber")
if not episode_index:
if episode_index is None:
continue
if not season_episodes.get(season_index):
season_episodes[season_index] = []

View File

@@ -287,7 +287,7 @@ class Plex:
episodes = videos.episodes()
season_episodes = {}
for episode in episodes:
if season and episode.seasonNumber != int(season):
if season is not None and episode.seasonNumber != int(season):
continue
if episode.seasonNumber not in season_episodes:
season_episodes[episode.seasonNumber] = []

View File

@@ -1,6 +1,7 @@
import re
from threading import Lock
from typing import List, Optional
from urllib.parse import quote
import requests
from slack_bolt import App
@@ -42,7 +43,9 @@ class Slack:
# 标记消息来源
if kwargs.get("name"):
self._ds_url = f"{self._ds_url}&source={kwargs.get('name')}"
# URL encode the source name to handle special characters
encoded_name = quote(kwargs.get('name'), safe='')
self._ds_url = f"{self._ds_url}&source={encoded_name}"
# 注册消息响应
@slack_app.event("message")

View File

@@ -2,7 +2,7 @@ import asyncio
import re
import threading
from typing import Optional, List, Dict, Callable
from urllib.parse import urljoin
from urllib.parse import urljoin, quote
from telebot import TeleBot, apihelper
from telebot.types import BotCommand, InlineKeyboardMarkup, InlineKeyboardButton, InputMediaPhoto
@@ -65,7 +65,9 @@ class Telegram:
# 标记渠道来源
if kwargs.get("name"):
self._ds_url = f"{self._ds_url}&source={kwargs.get('name')}"
# URL encode the source name to handle special characters
encoded_name = quote(kwargs.get('name'), safe='')
self._ds_url = f"{self._ds_url}&source={encoded_name}"
@_bot.message_handler(commands=['start', 'help'])
def send_welcome(message):
@@ -235,10 +237,14 @@ class Telegram:
return False
try:
if title and text:
caption = f"**{title}**\n{text}"
elif title:
caption = f"**{title}**"
# 标准化标题后再加粗,避免**符号被显示为文本
bold_title = (
f"**{standardize(title).removesuffix('\n')}**" if title else None
)
if bold_title and text:
caption = f"{bold_title}\n{text}"
elif bold_title:
caption = bold_title
elif text:
caption = text
else:

View File

@@ -798,7 +798,7 @@ class TheMovieDbModule(_ModuleBase):
if not tmdb_info:
return []
return [schemas.TmdbSeason(**sea)
for sea in tmdb_info.get("seasons", []) if sea.get("season_number")]
for sea in tmdb_info.get("seasons", []) if sea.get("season_number") is not None]
def tmdb_group_seasons(self, group_id: str) -> List[schemas.TmdbSeason]:
"""
@@ -1168,7 +1168,7 @@ class TheMovieDbModule(_ModuleBase):
if not tmdb_info:
return []
return [schemas.TmdbSeason(**sea)
for sea in tmdb_info.get("seasons", []) if sea.get("season_number")]
for sea in tmdb_info.get("seasons", []) if sea.get("season_number") is not None]
async def async_tmdb_group_seasons(self, group_id: str) -> List[schemas.TmdbSeason]:
"""

View File

@@ -167,7 +167,7 @@ class TmdbApi:
"""
记录匹配调试日志
"""
if season_number and season_year:
if season_number is not None and season_year:
logger.debug(f"正在识别{mtype.value}{name}, 季集={season_number}, 季集年份={season_year} ...")
else:
logger.debug(f"正在识别{mtype.value}{name}, 年份={year} ...")
@@ -473,7 +473,7 @@ class TmdbApi:
info = self._set_media_type(info, MediaType.MOVIE)
else:
# 有当前季和当前季集年份,使用精确匹配
if season_year and season_number:
if season_year and season_number is not None:
self._log_match_debug(mtype, name, season_year, season_number, season_year)
info = self.__search_tv_by_season(name,
season_year,
@@ -697,7 +697,7 @@ class TmdbApi:
return {}
ret_seasons = {}
for season_info in tv_info.get("seasons") or []:
if not season_info.get("season_number"):
if season_info.get("season_number") is None:
continue
ret_seasons[season_info.get("season_number")] = season_info
return ret_seasons
@@ -1625,6 +1625,9 @@ class TmdbApi:
"""
清除缓存
"""
self.match_web.cache_clear()
self.discover.discover_movies.cache_clear()
self.discover.discover_tv_shows.cache_clear()
self.tmdb.cache_clear()
# 私有异步方法
@@ -2028,7 +2031,7 @@ class TmdbApi:
info = self._set_media_type(info, MediaType.MOVIE)
else:
# 有当前季和当前季集年份,使用精确匹配
if season_year and season_number:
if season_year and season_number is not None:
self._log_match_debug(mtype, name, season_year, season_number, season_year)
info = await self.__async_search_tv_by_season(name,
season_year,

View File

@@ -40,8 +40,6 @@ class TMDb(object):
self._reset = None
self._timeout = 15
self.__clear_async_cache__ = False
@property
def page(self):
return self._page
@@ -129,7 +127,6 @@ class TMDb(object):
return req
def cache_clear(self):
self.__clear_async_cache__ = True
return self.request.cache_clear()
def _validate_api_key(self):
@@ -200,7 +197,7 @@ class TMDb(object):
if rate_limit_result:
logger.warning("达到请求频率限制,将在 %d 秒后重试..." % rate_limit_result)
time.sleep(rate_limit_result)
return self._request_obj(action, params, call_cached, method, data, json, key)
return self._request_obj(action, params, False, method, data, json, key)
json_data = req.json()
self._process_json_response(json_data, is_async=False)
@@ -215,10 +212,6 @@ class TMDb(object):
self._validate_api_key()
url = self._build_url(action, params)
if self.__clear_async_cache__:
self.__clear_async_cache__ = False
await self.async_request.cache_clear()
async with async_fresh(not call_cached or method == "POST"):
req = await self.async_request(method, url, data, json,
_ts=datetime.strftime(datetime.now(), '%Y%m%d'))
@@ -232,7 +225,7 @@ class TMDb(object):
if rate_limit_result:
logger.warning("达到请求频率限制,将在 %d 秒后重试..." % rate_limit_result)
await asyncio.sleep(rate_limit_result)
return await self._async_request_obj(action, params, call_cached, method, data, json, key)
return await self._async_request_obj(action, params, False, method, data, json, key)
json_data = req.json()
self._process_json_response(json_data, is_async=True)

View File

@@ -162,3 +162,12 @@ class TheTvDbModule(_ModuleBase):
except Exception as err:
logger.error(f"用标题搜索TVDB剧集失败 ({title}): {str(err)}")
return []
def clear_cache(self):
    """
    Clear this module's cached TVDB data.

    Delegates to the underlying TVDB client's clear_cache() when a client
    instance is available; start and finish are written to the log.
    """
    logger.info(f"开始清除{self.get_name()}缓存 ...")
    # Walrus guards against a missing/None TVDB client instance.
    if tvdb := self.tvdb:
        tvdb.clear_cache()
    logger.info(f"{self.get_name()}缓存清除完成")

View File

@@ -618,3 +618,9 @@ class TVDB:
"""
url = self.url.construct('user/favorites')
return self.request.make_request(url)
def clear_cache(self):
    """
    Clear cached responses by emptying the cache attached to
    request.make_request.
    """
    self.request.make_request.cache_clear()

View File

@@ -29,3 +29,10 @@ class RateLimitExceededException(LimitException):
这个异常通常用于本地限流逻辑(例如 RateLimiter当系统检测到函数调用频率过高时触发限流并抛出该异常。
"""
pass
class OperationInterrupted(KeyboardInterrupt):
    """
    Raised to signal that an operation was interrupted.

    Subclasses KeyboardInterrupt (a BaseException), so generic
    ``except Exception`` handlers will not swallow it.
    """

View File

@@ -98,8 +98,14 @@ class ExponentialBackoffRateLimiter(BaseRateLimiter):
每次触发限流时,等待时间会成倍增加,直到达到最大等待时间
"""
def __init__(self, base_wait: float = 60.0, max_wait: float = 600.0, backoff_factor: float = 2.0,
source: str = "", enable_logging: bool = True):
def __init__(
self,
base_wait: float = 60.0,
max_wait: float = 600.0,
backoff_factor: float = 2.0,
source: str = "",
enable_logging: bool = True,
):
"""
初始化 ExponentialBackoffRateLimiter 实例
:param base_wait: 基础等待时间(秒),默认值为 60 秒1 分钟)
@@ -156,7 +162,9 @@ class ExponentialBackoffRateLimiter(BaseRateLimiter):
current_time = time.time()
with self.lock:
self.next_allowed_time = current_time + self.current_wait
self.current_wait = min(self.current_wait * self.backoff_factor, self.max_wait)
self.current_wait = min(
self.current_wait * self.backoff_factor, self.max_wait
)
wait_time = self.next_allowed_time - current_time
self.log_warning(f"触发限流,将在 {wait_time:.2f} 秒后允许继续调用")
@@ -168,8 +176,13 @@ class WindowRateLimiter(BaseRateLimiter):
如果超过允许的最大调用次数,则限流直到窗口期结束
"""
def __init__(self, max_calls: int, window_seconds: float,
source: str = "", enable_logging: bool = True):
def __init__(
self,
max_calls: int,
window_seconds: float,
source: str = "",
enable_logging: bool = True,
):
"""
初始化 WindowRateLimiter 实例
:param max_calls: 在时间窗口内允许的最大调用次数
@@ -190,7 +203,10 @@ class WindowRateLimiter(BaseRateLimiter):
current_time = time.time()
with self.lock:
# 清理超出时间窗口的调用记录
while self.call_times and current_time - self.call_times[0] > self.window_seconds:
while (
self.call_times
and current_time - self.call_times[0] > self.window_seconds
):
self.call_times.popleft()
if len(self.call_times) < self.max_calls:
@@ -225,8 +241,12 @@ class CompositeRateLimiter(BaseRateLimiter):
当任意一个限流策略触发限流时,都会阻止调用
"""
def __init__(self, limiters: List[BaseRateLimiter], source: str = "", enable_logging: bool = True):
def __init__(
self,
limiters: List[BaseRateLimiter],
source: str = "",
enable_logging: bool = True,
):
"""
初始化 CompositeRateLimiter 实例
:param limiters: 要组合的限流器列表
@@ -263,7 +283,9 @@ class CompositeRateLimiter(BaseRateLimiter):
# 通用装饰器:自定义限流器实例
def rate_limit_handler(limiter: BaseRateLimiter, raise_on_limit: bool = False) -> Callable:
def rate_limit_handler(
limiter: BaseRateLimiter, raise_on_limit: bool = False
) -> Callable:
"""
通用装饰器,允许用户传递自定义的限流器实例,用于处理限流逻辑
该装饰器可灵活支持任意继承自 BaseRateLimiter 的限流器
@@ -344,8 +366,14 @@ def rate_limit_handler(limiter: BaseRateLimiter, raise_on_limit: bool = False) -
# 装饰器:指数退避限流
def rate_limit_exponential(base_wait: float = 60.0, max_wait: float = 600.0, backoff_factor: float = 2.0,
raise_on_limit: bool = False, source: str = "", enable_logging: bool = True) -> Callable:
def rate_limit_exponential(
base_wait: float = 60.0,
max_wait: float = 600.0,
backoff_factor: float = 2.0,
raise_on_limit: bool = False,
source: str = "",
enable_logging: bool = True,
) -> Callable:
"""
装饰器,用于应用指数退避限流策略
通过逐渐增加调用等待时间控制调用频率。每次触发限流时,等待时间会成倍增加,直到达到最大等待时间
@@ -359,14 +387,21 @@ def rate_limit_exponential(base_wait: float = 60.0, max_wait: float = 600.0, bac
:return: 装饰器函数
"""
# 实例化 ExponentialBackoffRateLimiter并传入相关参数
limiter = ExponentialBackoffRateLimiter(base_wait, max_wait, backoff_factor, source, enable_logging)
limiter = ExponentialBackoffRateLimiter(
base_wait, max_wait, backoff_factor, source, enable_logging
)
# 使用通用装饰器逻辑包装该限流器
return rate_limit_handler(limiter, raise_on_limit)
# 装饰器:时间窗口限流
def rate_limit_window(max_calls: int, window_seconds: float,
raise_on_limit: bool = False, source: str = "", enable_logging: bool = True) -> Callable:
def rate_limit_window(
max_calls: int,
window_seconds: float,
raise_on_limit: bool = False,
source: str = "",
enable_logging: bool = True,
) -> Callable:
"""
装饰器,用于应用时间窗口限流策略
在固定的时间窗口内限制调用次数,当调用次数超过最大值时,触发限流,直到时间窗口结束
@@ -407,3 +442,63 @@ class QpsRateLimiter:
self.next_call_time = max(now, self.next_call_time) + self.interval
if sleep_duration > 0:
time.sleep(sleep_duration)
class RateStats:
    """
    Request-rate statistics: records request timestamps and derives
    QPS / QPM / QPH (number of requests in the last second / minute / hour).

    Thread-safe: all access to the timestamp deque is guarded by a lock.
    """

    def __init__(self, window_seconds: float = 7200, source: str = ""):
        """
        :param window_seconds: retention window in seconds (default 2 hours);
                               must cover the largest interval queried (QPH = 3600s)
        :param source: label prepended to log output for identification
        """
        self._window = window_seconds
        self._source = source
        self._lock = threading.Lock()
        # Wall-clock (time.time) timestamps of recorded requests,
        # oldest on the left; pruned on each record().
        self._timestamps: deque = deque()

    def record(self) -> None:
        """
        Record one request and prune entries older than the retention window.
        """
        t = time.time()
        with self._lock:
            self._timestamps.append(t)
            # Evict from the left while the oldest entry is outside the window.
            while self._timestamps and t - self._timestamps[0] > self._window:
                self._timestamps.popleft()

    def _count_since(self, seconds: float) -> int:
        """
        Count recorded requests whose age is at most `seconds`.
        """
        t = time.time()
        with self._lock:
            return sum(1 for ts in self._timestamps if t - ts <= seconds)

    # NOTE: these return int counts; the previous `-> float` annotations
    # contradicted _count_since's return type and have been corrected.
    def get_qps(self) -> int:
        """
        Requests within the last 1 second.
        """
        return self._count_since(1.0)

    def get_qpm(self) -> int:
        """
        Requests within the last 1 minute.
        """
        return self._count_since(60.0)

    def get_qph(self) -> int:
        """
        Requests within the last 1 hour.
        """
        return self._count_since(3600.0)

    def log_stats(self, level: str = "info") -> None:
        """
        Log the current QPS/QPM/QPH at the given level
        (falls back to logger.info when `level` is not a logger method name).
        """
        qps, qpm, qph = self.get_qps(), self.get_qpm(), self.get_qph()
        msg = f"QPS={qps} QPM={qpm} QPH={qph}"
        if self._source:
            msg = f"[{self._source}] {msg}"
        log_fn = getattr(logger, level, logger.info)
        log_fn(msg)

View File

@@ -166,10 +166,8 @@ class SystemUtils:
移动
"""
try:
# 当前目录改名
temp = src.replace(src.parent / dest.name)
# 移动到目标目录
shutil.move(temp, dest)
# 直接移动到目标路径,避免中间改名步骤触发目录监控
shutil.move(src, dest)
return 0, ""
except Exception as err:
return -1, str(err)

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.9.8'
FRONTEND_VERSION = 'v2.9.8'
# Application version string
APP_VERSION = 'v2.9.11'
# Frontend version string
FRONTEND_VERSION = 'v2.9.11'