diff --git a/app/agent/tools/impl/_system_setting_utils.py b/app/agent/tools/impl/_system_setting_utils.py
index c434aa8b..774217f5 100644
--- a/app/agent/tools/impl/_system_setting_utils.py
+++ b/app/agent/tools/impl/_system_setting_utils.py
@@ -62,6 +62,10 @@ SYSTEMCONFIG_SETTING_METADATA = {
         "group": "custom_identifiers",
         "label": "自定义识别词",
     },
+    SystemConfigKey.EpisodeFormatRuleTable.value: {
+        "group": "transfer",
+        "label": "集数定位规则词表",
+    },
     SystemConfigKey.CustomReleaseGroups.value: {
         "group": "customization",
         "label": "自定义制作组/字幕组",
diff --git a/app/api/endpoints/transfer.py b/app/api/endpoints/transfer.py
index 3101ba20..d2d4fc4a 100644
--- a/app/api/endpoints/transfer.py
+++ b/app/api/endpoints/transfer.py
@@ -15,7 +15,13 @@ from app.db.models import User
 from app.db.models.transferhistory import TransferHistory
 from app.db.user_oper import get_current_active_superuser
 from app.helper.directory import DirectoryHelper
-from app.schemas import MediaType, FileItem, ManualTransferItem
+from app.log import logger
+from app.schemas import (
+    MediaType,
+    FileItem,
+    ManualTransferItem,
+    EpisodeFormatRecommendItem,
+)
 
 router = APIRouter()
 
@@ -233,6 +239,34 @@ def manual_transfer(
     return schemas.Response(success=True)
 
 
+@router.post(
+    "/episode-format/recommend",
+    summary="推荐集数定位模板",
+    response_model=schemas.Response,
+)
+def recommend_episode_format(
+    recommend_item: EpisodeFormatRecommendItem,
+    _: User = Depends(get_current_active_superuser),
+) -> Any:
+    """
+    Recommend an episode-locating template from the sample files of a directory
+    :param recommend_item: recommendation request carrying the file or directory item
+    :param _: token check (active superuser required)
+    """
+    target_path = recommend_item.fileitem.path if recommend_item.fileitem else None
+    logger.info(f"开始推荐集数定位模板:{target_path}")
+    state, errmsg, data = TransferChain().recommend_episode_format(
+        fileitem=recommend_item.fileitem
+    )
+    if not state:
+        logger.warn(f"推荐集数定位模板失败:{target_path} - {errmsg}")
+        return schemas.Response(success=False, message=errmsg)
+    logger.info(
+        f"推荐集数定位模板成功:{target_path} - 规则 {data.get('rule_name') if data else None}"
+    )
+    return schemas.Response(success=True, data=data)
+
+
 @router.get("/now", summary="立即执行下载器文件整理", response_model=schemas.Response)
 def now(_: Annotated[str, Depends(verify_apitoken)]) -> Any:
     """
diff --git a/app/chain/transfer.py b/app/chain/transfer.py
index b54c0594..09a5c81b 100755
--- a/app/chain/transfer.py
+++ b/app/chain/transfer.py
@@ -26,6 +26,7 @@ from app.db.models.transferhistory import TransferHistory
 from app.db.systemconfig_oper import SystemConfigOper
 from app.db.transferhistory_oper import TransferHistoryOper
 from app.helper.directory import DirectoryHelper
+from app.helper.episode_format import EpisodeFormatRuleHelper
 from app.helper.format import FormatParser
 from app.helper.progress import ProgressHelper
 from app.log import logger
@@ -820,6 +821,21 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
             else False
         )
 
+    @staticmethod
+    def __is_hidden_or_recycle_path(file_path: Optional[str]) -> bool:
+        """
+        Tell whether the path contains a recycle-bin, dot-prefixed or @eaDir component
+        """
+        if not file_path:
+            return False
+        normalized_path = file_path.replace("\\", "/")
+        return (
+            "/@Recycle/" in normalized_path
+            or "/#recycle/" in normalized_path
+            or "/." in normalized_path
+            or "/@eaDir" in normalized_path
+        )
+
     def __default_callback(
         self, task: TransferTask, transferinfo: TransferInfo, /
     ) -> Tuple[bool, str]:
@@ -1652,6 +1668,94 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
         """
         return self.run_module("recommend_name", meta=meta, mediainfo=mediainfo)
 
+    def recommend_episode_format(
+        self,
+        fileitem: FileItem,
+    ) -> Tuple[bool, str, Optional[dict]]:
+        """
+        Recommend an episode-locating template; returns (success, error message, payload)
+        """
+        if not fileitem or not fileitem.path:
+            logger.warn("推荐集数定位模板失败:缺少目录参数")
+            return False, "缺少目录参数", None
+
+        directory = self.__resolve_episode_format_directory(fileitem)
+        if not directory or directory.type != "dir":
+            logger.warn(f"推荐集数定位模板失败:目录不存在 - {fileitem.path}")
+            return False, "目录不存在", None
+
+        rules = self.__get_episode_format_rules()
+        sample_files = self.__get_episode_format_sample_files(directory)
+        logger.info(
+            f"开始匹配集数定位规则:{directory.path},规则数 {len(rules)},样本数 {len(sample_files)}"
+        )
+        state, errmsg, data = EpisodeFormatRuleHelper().recommend(
+            rules=rules,
+            sample_files=sample_files,
+        )
+        if not state:
+            logger.warn(f"集数定位模板推荐失败:{directory.path} - {errmsg}")
+            return state, errmsg, data
+        logger.info(
+            f"集数定位模板推荐成功:{directory.path} - 规则 {data.get('rule_name') if data else None}"
+        )
+        return state, errmsg, data
+
+    @staticmethod
+    def __get_episode_format_rules() -> List[schemas.EpisodeFormatRule]:
+        """
+        Load the enabled episode-locating rules from system config, sorted by their order field
+        """
+        rule_items = SystemConfigOper().get(SystemConfigKey.EpisodeFormatRuleTable) or []
+        rules: List[schemas.EpisodeFormatRule] = []
+        for item in rule_items:
+            if not isinstance(item, dict):
+                continue
+            try:
+                rule = schemas.EpisodeFormatRule(**item)
+            except Exception as err:
+                logger.warn(f"忽略无效的集数定位规则:{err}")
+                continue
+            if rule.enabled:
+                rules.append(rule)
+        return sorted(rules, key=lambda item: item.order)
+
+    def __resolve_episode_format_directory(
+        self, fileitem: FileItem
+    ) -> Optional[FileItem]:
+        """
+        Normalize a file or directory argument to a directory item (a file maps to its parent)
+        """
+        storage_chain = StorageChain()
+        if fileitem.type == "dir":
+            return storage_chain.get_item(fileitem)
+        source_path = Path(fileitem.path)
+        parent_item = FileItem(
+            storage=fileitem.storage,
+            path=source_path.parent.as_posix(),
+            type="dir",
+            name=source_path.parent.name,
+        )
+        return storage_chain.get_item(parent_item)
+
+    def __get_episode_format_sample_files(
+        self, directory: FileItem
+    ) -> List[FileItem]:
+        """
+        List the media files directly under the directory that may serve as recommendation samples
+        """
+        file_items = StorageChain().list_files(directory, recursion=False) or []
+        sample_files: List[FileItem] = []
+        for item in file_items:
+            if not item or item.type != "file":
+                continue
+            if not self.__is_media_file(item):
+                continue
+            if self.__is_hidden_or_recycle_path(item.path):
+                continue
+            sample_files.append(item)
+        return sample_files
+
     def process(self) -> bool:
         """
         获取下载器中的种子列表,并执行整理
diff --git a/app/helper/episode_format.py b/app/helper/episode_format.py
new file mode 100644
index 00000000..a12fc86d
--- /dev/null
+++ b/app/helper/episode_format.py
@@ -0,0 +1,160 @@
+import re
+from typing import List, Match, Optional, Tuple
+
+from app.helper.format import FormatParser
+from app.log import logger
+from app.schemas import EpisodeFormatRule, FileItem
+
+
+class EpisodeFormatRuleHelper:
+    """
+    Helper that derives an episode-locating template from regex rules and sample files
+    """
+
+    def recommend(
+        self,
+        rules: List[EpisodeFormatRule],
+        sample_files: List[FileItem],
+    ) -> Tuple[bool, str, Optional[dict]]:
+        """
+        Return (success, error message, payload) for the first rule that yields a validated template
+        """
+        if not rules:
+            return False, "未配置集数定位规则", None
+
+        if not sample_files:
+            return False, "目录中没有可用于识别的媒体文件", None
+
+        for index, rule in enumerate(rules):
+            matched_samples = self._match_rule(rule, sample_files)
+            if not matched_samples:
+                continue
+
+            sample_file, match_result = matched_samples[0]
+            episode_format = self._build_template(sample_file.name, match_result)
+            if not episode_format:
+                continue
+            if not self._validate_template(episode_format, matched_samples):
+                logger.warn(f"集数定位规则 {rule.name} 模板校验失败")
+                continue
+
+            logger.info(
+                f"集数定位规则命中:{rule.name},样本文件:{sample_file.name}"
+            )
+            return True, "", {
+                "rule_name": rule.name,
+                "rule_index": index,
+                "pattern": rule.pattern,
+                "episode_format": episode_format,
+                "sample_file": sample_file.name,
+                "min_file_size_mb": rule.min_file_size_mb,
+                "message": "已根据预定义规则生成集数定位模板",
+            }
+
+        return False, "未匹配到可用的集数定位规则", None
+
+    @staticmethod
+    def _match_rule(
+        rule: EpisodeFormatRule, sample_files: List[FileItem]
+    ) -> List[Tuple[FileItem, Match[str]]]:
+        """
+        Collect the samples that pass the size filter, match the rule and capture the ep group
+        """
+        try:
+            compiled_pattern = re.compile(
+                EpisodeFormatRuleHelper._normalize_pattern(rule.pattern)
+            )
+        except Exception as err:
+            logger.warn(f"集数定位规则 {rule.name} 编译失败:{err}")
+            return []
+
+        matched_samples: List[Tuple[FileItem, Match[str]]] = []
+        for item in sample_files:
+            if rule.min_file_size_mb and (item.size or 0) < rule.min_file_size_mb * 1024 * 1024:
+                continue
+            match_result = compiled_pattern.search(item.name or "")
+            # groupdict() lists every named group, with None for groups that did not
+            # participate, so test the captured value rather than the key's presence.
+            if not match_result or match_result.groupdict().get("ep") is None:
+                continue
+            matched_samples.append((item, match_result))
+        return matched_samples
+
+    def _build_template(self, file_name: str, match_result: Match[str]) -> Optional[str]:
+        """
+        Build a template from the matched sample: named groups become {name}, the rest stays literal
+        """
+        group_items = []
+        for group_name, group_value in match_result.groupdict().items():
+            if group_value is None:
+                continue
+            start, end = match_result.span(group_name)
+            if start < 0 or end < 0:
+                continue
+            group_items.append((start, end, group_name))
+
+        if not group_items or not any(group_name == "ep" for _, _, group_name in group_items):
+            return None
+
+        group_items.sort(key=lambda item: (item[0], -(item[1] - item[0])))
+        template_parts: List[str] = []
+        cursor = 0
+        for start, end, group_name in group_items:
+            if start < cursor:
+                continue
+            template_parts.append(self._escape_literal(file_name[cursor:start]))
+            template_parts.append(f"{{{group_name}}}")
+            cursor = end
+        template_parts.append(self._escape_literal(file_name[cursor:]))
+        return "".join(template_parts)
+
+    def _validate_template(
+        self,
+        episode_format: str,
+        matched_samples: List[Tuple[FileItem, Match[str]]],
+    ) -> bool:
+        """
+        Check that FormatParser reproduces, for every sample, the episode captured by the regex
+        """
+        parser = FormatParser(eformat=episode_format)
+        for item, match_result in matched_samples:
+            if not parser.match(item.name):
+                return False
+            result = parser.split_episode(file_name=item.name, file_meta=None)
+            if result[0] is None:
+                return False
+            expected_episode = match_result.groupdict().get("ep")
+            if not self._episode_matches(result[0], expected_episode):
+                return False
+        return True
+
+    @staticmethod
+    def _episode_matches(actual_episode: int, expected_episode: Optional[str]) -> bool:
+        """
+        Check that the first 1-4 digit run of the captured ep text equals the parsed episode number
+        """
+        if expected_episode is None:
+            return False
+        number_match = re.search(r"\d{1,4}", expected_episode)
+        if not number_match:
+            return False
+        return int(number_match.group()) == actual_episode
+
+    @staticmethod
+    def _normalize_pattern(pattern: str) -> str:
+        """
+        Rewrite PCRE-style named groups (?<name>...) into the (?P<name>...) syntax of Python re
+        """
+        return re.sub(r"\(\?<([a-zA-Z_][a-zA-Z0-9_]*)>", r"(?P<\1>", pattern)
+
+    def _escape_literal(self, text: str) -> str:
+        """
+        Turn sample text into a template literal by doubling every brace
+        """
+        escaped_parts: List[str] = []
+        for char in text:
+            if char in "{}":
+                escaped_parts.append(char * 2)
+            else:
+                escaped_parts.append(char)
+        return "".join(escaped_parts)
diff --git a/app/schemas/transfer.py b/app/schemas/transfer.py
index f4c60b56..0fb2fd73 100644
--- a/app/schemas/transfer.py
+++ b/app/schemas/transfer.py
@@ -166,6 +166,24 @@ class EpisodeFormat(BaseModel):
     offset: Optional[str] = None
 
 
+class EpisodeFormatRule(BaseModel):
+    """
+    Episode-locating rule
+    """
+    name: str
+    enabled: bool = True
+    order: int = 0
+    pattern: str
+    min_file_size_mb: int = 0
+
+
+class EpisodeFormatRecommendItem(BaseModel):
+    """
+    Request body of the episode-locating template recommendation
+    """
+    fileitem: FileItem
+
+
 class ManualTransferItem(BaseModel):
     # 文件项
     fileitem: FileItem = None
diff --git a/app/schemas/types.py b/app/schemas/types.py
index 81a82f92..ecc855a2 100644
--- a/app/schemas/types.py
+++ b/app/schemas/types.py
@@ -198,6 +198,8 @@ class SystemConfigKey(Enum):
     Customization = "Customization"
     # 自定义识别词
     CustomIdentifiers = "CustomIdentifiers"
+    # Episode-locating rule table
+    EpisodeFormatRuleTable = "EpisodeFormatRuleTable"
     # 转移屏蔽词
     TransferExcludeWords = "TransferExcludeWords"
     # 种子优先级规则