feat: 新增集数定位模板生成接口 (#5785)

This commit is contained in:
Album
2026-05-19 07:18:28 +08:00
committed by GitHub
parent 1f0eeb25e6
commit 9e9c398177
6 changed files with 321 additions and 1 deletions

View File

@@ -62,6 +62,10 @@ SYSTEMCONFIG_SETTING_METADATA = {
"group": "custom_identifiers",
"label": "自定义识别词",
},
SystemConfigKey.EpisodeFormatRuleTable.value: {
"group": "transfer",
"label": "集数定位规则词表",
},
SystemConfigKey.CustomReleaseGroups.value: {
"group": "customization",
"label": "自定义制作组/字幕组",

View File

@@ -15,7 +15,13 @@ from app.db.models import User
from app.db.models.transferhistory import TransferHistory
from app.db.user_oper import get_current_active_superuser
from app.helper.directory import DirectoryHelper
from app.schemas import MediaType, FileItem, ManualTransferItem
from app.log import logger
from app.schemas import (
MediaType,
FileItem,
ManualTransferItem,
EpisodeFormatRecommendItem,
)
router = APIRouter()
@@ -233,6 +239,34 @@ def manual_transfer(
return schemas.Response(success=True)
@router.post(
"/episode-format/recommend",
summary="推荐集数定位模板",
response_model=schemas.Response,
)
def recommend_episode_format(
recommend_item: EpisodeFormatRecommendItem,
_: User = Depends(get_current_active_superuser),
) -> Any:
"""
根据目录样本推荐集数定位模板
:param recommend_item: 推荐请求
:param _: Token校验
"""
target_path = recommend_item.fileitem.path if recommend_item.fileitem else None
logger.info(f"开始推荐集数定位模板:{target_path}")
state, errmsg, data = TransferChain().recommend_episode_format(
fileitem=recommend_item.fileitem
)
if not state:
logger.warn(f"推荐集数定位模板失败:{target_path} - {errmsg}")
return schemas.Response(success=False, message=errmsg)
logger.info(
f"推荐集数定位模板成功:{target_path} - 规则 {data.get('rule_name') if data else None}"
)
return schemas.Response(success=True, data=data)
@router.get("/now", summary="立即执行下载器文件整理", response_model=schemas.Response)
def now(_: Annotated[str, Depends(verify_apitoken)]) -> Any:
"""

View File

@@ -26,6 +26,7 @@ from app.db.models.transferhistory import TransferHistory
from app.db.systemconfig_oper import SystemConfigOper
from app.db.transferhistory_oper import TransferHistoryOper
from app.helper.directory import DirectoryHelper
from app.helper.episode_format import EpisodeFormatRuleHelper
from app.helper.format import FormatParser
from app.helper.progress import ProgressHelper
from app.log import logger
@@ -820,6 +821,21 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
else False
)
@staticmethod
def __is_hidden_or_recycle_path(file_path: Optional[str]) -> bool:
"""
判断是否隐藏或回收站路径
"""
if not file_path:
return False
normalized_path = file_path.replace("\\", "/")
return (
"/@Recycle/" in normalized_path
or "/#recycle/" in normalized_path
or "/." in normalized_path
or "/@eaDir" in normalized_path
)
def __default_callback(
self, task: TransferTask, transferinfo: TransferInfo, /
) -> Tuple[bool, str]:
@@ -1652,6 +1668,94 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
"""
return self.run_module("recommend_name", meta=meta, mediainfo=mediainfo)
def recommend_episode_format(
self,
fileitem: FileItem,
) -> Tuple[bool, str, Optional[dict]]:
"""
根据目录样本推荐集数定位模板
"""
if not fileitem or not fileitem.path:
logger.warn("推荐集数定位模板失败:缺少目录参数")
return False, "缺少目录参数", None
directory = self.__resolve_episode_format_directory(fileitem)
if not directory or directory.type != "dir":
logger.warn(f"推荐集数定位模板失败:目录不存在 - {fileitem.path}")
return False, "目录不存在", None
rules = self.__get_episode_format_rules()
sample_files = self.__get_episode_format_sample_files(directory)
logger.info(
f"开始匹配集数定位规则:{directory.path},规则数 {len(rules)},样本数 {len(sample_files)}"
)
state, errmsg, data = EpisodeFormatRuleHelper().recommend(
rules=rules,
sample_files=sample_files,
)
if not state:
logger.warn(f"集数定位模板推荐失败:{directory.path} - {errmsg}")
return state, errmsg, data
logger.info(
f"集数定位模板推荐成功:{directory.path} - 规则 {data.get('rule_name') if data else None}"
)
return state, errmsg, data
@staticmethod
def __get_episode_format_rules() -> List[schemas.EpisodeFormatRule]:
"""
获取启用的集数定位规则
"""
rule_items = SystemConfigOper().get(SystemConfigKey.EpisodeFormatRuleTable) or []
rules: List[schemas.EpisodeFormatRule] = []
for item in rule_items:
if not isinstance(item, dict):
continue
try:
rule = schemas.EpisodeFormatRule(**item)
except Exception as err:
logger.warn(f"忽略无效的集数定位规则:{err}")
continue
if rule.enabled:
rules.append(rule)
return sorted(rules, key=lambda item: item.order)
def __resolve_episode_format_directory(
self, fileitem: FileItem
) -> Optional[FileItem]:
"""
将文件或目录入参归一化为目录对象
"""
storage_chain = StorageChain()
if fileitem.type == "dir":
return storage_chain.get_item(fileitem)
source_path = Path(fileitem.path)
parent_item = FileItem(
storage=fileitem.storage,
path=source_path.parent.as_posix(),
type="dir",
name=source_path.parent.name,
)
return storage_chain.get_item(parent_item)
def __get_episode_format_sample_files(
self, directory: FileItem
) -> List[FileItem]:
"""
获取目录下可参与模板推荐的媒体文件
"""
file_items = StorageChain().list_files(directory, recursion=False) or []
sample_files: List[FileItem] = []
for item in file_items:
if not item or item.type != "file":
continue
if not self.__is_media_file(item):
continue
if self.__is_hidden_or_recycle_path(item.path):
continue
sample_files.append(item)
return sample_files
def process(self) -> bool:
"""
获取下载器中的种子列表,并执行整理

View File

@@ -0,0 +1,158 @@
import re
from typing import List, Match, Optional, Tuple
from app.helper.format import FormatParser
from app.log import logger
from app.schemas import EpisodeFormatRule, FileItem
class EpisodeFormatRuleHelper:
"""
集数定位规则辅助类
"""
def recommend(
self,
rules: List[EpisodeFormatRule],
sample_files: List[FileItem],
) -> Tuple[bool, str, Optional[dict]]:
"""
推荐集数定位模板
"""
if not rules:
return False, "未配置集数定位规则", None
if not sample_files:
return False, "目录中没有可用于识别的媒体文件", None
for index, rule in enumerate(rules):
matched_samples = self._match_rule(rule, sample_files)
if not matched_samples:
continue
sample_file, match_result = matched_samples[0]
episode_format = self._build_template(sample_file.name, match_result)
if not episode_format:
continue
if not self._validate_template(episode_format, matched_samples):
logger.warn(f"集数定位规则 {rule.name} 模板校验失败")
continue
logger.info(
f"集数定位规则命中:{rule.name},样本文件:{sample_file.name}"
)
return True, "", {
"rule_name": rule.name,
"rule_index": index,
"pattern": rule.pattern,
"episode_format": episode_format,
"sample_file": sample_file.name,
"min_file_size_mb": rule.min_file_size_mb,
"message": "已根据预定义规则生成集数定位模板",
}
return False, "未匹配到可用的集数定位规则", None
@staticmethod
def _match_rule(
rule: EpisodeFormatRule, sample_files: List[FileItem]
) -> List[Tuple[FileItem, Match[str]]]:
"""
获取规则命中的样本文件
"""
try:
compiled_pattern = re.compile(
EpisodeFormatRuleHelper._normalize_pattern(rule.pattern)
)
except Exception as err:
logger.warn(f"集数定位规则 {rule.name} 编译失败:{err}")
return []
matched_samples: List[Tuple[FileItem, Match[str]]] = []
for item in sample_files:
if rule.min_file_size_mb and (item.size or 0) < rule.min_file_size_mb * 1024 * 1024:
continue
match_result = compiled_pattern.search(item.name or "")
if not match_result or "ep" not in match_result.groupdict():
continue
matched_samples.append((item, match_result))
return matched_samples
def _build_template(self, file_name: str, match_result: Match[str]) -> Optional[str]:
"""
根据命中的样本生成模板
"""
group_items = []
for group_name, group_value in match_result.groupdict().items():
if group_value is None:
continue
start, end = match_result.span(group_name)
if start < 0 or end < 0:
continue
group_items.append((start, end, group_name))
if not group_items or not any(group_name == "ep" for _, _, group_name in group_items):
return None
group_items.sort(key=lambda item: (item[0], -(item[1] - item[0])))
template_parts: List[str] = []
cursor = 0
for start, end, group_name in group_items:
if start < cursor:
continue
template_parts.append(self._escape_literal(file_name[cursor:start]))
template_parts.append(f"{{{group_name}}}")
cursor = end
template_parts.append(self._escape_literal(file_name[cursor:]))
return "".join(template_parts)
def _validate_template(
self,
episode_format: str,
matched_samples: List[Tuple[FileItem, Match[str]]],
) -> bool:
"""
校验生成的模板是否可被现有格式解析器稳定消费
"""
parser = FormatParser(eformat=episode_format)
for item, match_result in matched_samples:
if not parser.match(item.name):
return False
result = parser.split_episode(file_name=item.name, file_meta=None)
if result[0] is None:
return False
expected_episode = match_result.groupdict().get("ep")
if not self._episode_matches(result[0], expected_episode):
return False
return True
@staticmethod
def _episode_matches(actual_episode: int, expected_episode: Optional[str]) -> bool:
"""
校验模板提取出的集数是否与正则命名组一致
"""
if expected_episode is None:
return False
number_match = re.search(r"\d{1,4}", expected_episode)
if not number_match:
return False
return int(number_match.group()) == actual_episode
@staticmethod
def _normalize_pattern(pattern: str) -> str:
"""
将 PCRE 风格命名组转为 Python re 可识别的语法
"""
return re.sub(r"\(\?<([a-zA-Z_][a-zA-Z0-9_]*)>", r"(?P<\1>", pattern)
def _escape_literal(self, text: str) -> str:
"""
将样本文本转为 parse 模板中的字面量
"""
escaped_parts: List[str] = []
for char in text:
if char in "{}":
escaped_parts.append(char * 2)
else:
escaped_parts.append(char)
return "".join(escaped_parts)

View File

@@ -166,6 +166,24 @@ class EpisodeFormat(BaseModel):
offset: Optional[str] = None
class EpisodeFormatRule(BaseModel):
"""
集数定位规则
"""
name: str
enabled: bool = True
order: int = 0
pattern: str
min_file_size_mb: int = 0
class EpisodeFormatRecommendItem(BaseModel):
"""
集数定位推荐请求
"""
fileitem: FileItem
class ManualTransferItem(BaseModel):
# 文件项
fileitem: FileItem = None

View File

@@ -198,6 +198,8 @@ class SystemConfigKey(Enum):
Customization = "Customization"
# 自定义识别词
CustomIdentifiers = "CustomIdentifiers"
# 集数定位规则词表
EpisodeFormatRuleTable = "EpisodeFormatRuleTable"
# 转移屏蔽词
TransferExcludeWords = "TransferExcludeWords"
# 种子优先级规则